APIServer源码分析之路由注册
前面我们对 Kube APIServer 的入口点和 go-restful 有一个基础了解后,我们就可以开始来了解下 APIExtensionServer
是如何实例化的了。
APIExtensionServer
APIExtensionServer
的创建流程大致包含以下几个步骤:
创建 GeneriAPIServer 实例化 CustomResourceDefinitions 实例化 APIGroupInfo InstallAPIGroup
三种类型的 Server 底层都需要依赖 GeneriAPIServer,第二步创建的 CustomResourceDefinitions 就是当前类型的 Server 对象,用于后续进行路由注册。APIGroupInfo 是用于每个版本、每个资源类型对应的存储对象。最后调用 InstallAPIGroup
进行路由注册,把每一个资源的版本,类型映射到一个 URI 地址中。代码如下所示:
// cmd/kube-apiserver/app/apiextensions.go
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
return apiextensionsConfig.Complete().New(delegateAPIServer)
}
// 真正的代码位于:/vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
// New 从指定的配置返回 CustomResourceDefinitions 的新实例
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
// APIExtensionsServer 依赖 GenericAPIServer
// 通过 GenericConfig 创建一个名为 apiextensions-apiserver 的 genericServer
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
// 实例化 CustomResourceDefinitions 对象
s := &CustomResourceDefinitions{
GenericAPIServer: genericServer,
}
apiResourceConfig := c.GenericConfig.MergedResourceConfig
// 实例化 APIGroupInfo 对象,APIGroup 指该 server 需要暴露的 API
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
// 如果开启了 v1 版本,将资源版本、资源、资源存储存放在APIGroupInfo的map中
if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
storage := map[string]rest.Storage{}
customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
storage["customresourcedefinitions"] = customResourceDefinitionStorage
storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
}
// 注册API
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
// 初始化 crd clientset 和 informers,用于初始化控制器
crdClient, err := clientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
s.Informers = externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
delegateHandler := delegationTarget.UnprotectedHandler()
if delegateHandler == nil {
delegateHandler = http.NotFoundHandler()
}
versionDiscoveryHandler := &versionDiscoveryHandler{
discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},
delegate: delegateHandler,
}
groupDiscoveryHandler := &groupDiscoveryHandler{
discovery: map[string]*discovery.APIGroupHandler{},
delegate: delegateHandler,
}
// 初始化控制器
establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
// 申请handler处理器
crdHandler, err := NewCustomResourceDefinitionHandler(
versionDiscoveryHandler,
groupDiscoveryHandler,
s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
// ......
)
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
// 初始化其他控制器
discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
finalizingController := finalizer.NewCRDFinalizer(
s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
crdClient.ApiextensionsV1(),
crdHandler,
)
// 初始化openapi控制器
openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
var openapiv3Controller *openapiv3controller.Controller
if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
openapiv3Controller = openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
}
// 将 informer 以及 controller 添加到 PostStartHook 中
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
s.Informers.Start(context.StopCh)
return nil
})
// 注册hook函数,启动前面实例化的各种controller
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
if s.GenericAPIServer.OpenAPIVersionedService != nil && s.GenericAPIServer.StaticOpenAPISpec != nil {
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh)
}
}
// 启动各种控制器
go namingController.Run(context.StopCh)
go establishingController.Run(context.StopCh)
go nonStructuralSchemaController.Run(5, context.StopCh)
go apiApprovalController.Run(5, context.StopCh)
go finalizingController.Run(5, context.StopCh)
discoverySyncedCh := make(chan struct{})
go discoveryController.Run(context.StopCh, discoverySyncedCh)
select {
case <-context.StopCh:
case <-discoverySyncedCh:
}
return nil
})
// ....
return s, nil
}
先通过 GenericConfig 创建一个名为 apiextensions-apiserver
的 genericServer,genericServer 提供了一个通用的 http server,定义了通用的模板,例如地址、端口、认证、授权、健康检查等等通用功能。无论是 APIServer 还是 APIExtensionsServer 都依赖于 genericServer,实现方式如下所示:
// vendor/k8s.io/apiserver/pkg/server/config.go
// New 创建了一个新的服务器,该服务器将处理链与传递的服务器逻辑地结合在一起。
// name被用来区分日志记录
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
// ...
handlerChainBuilder := func(handler http.Handler) http.Handler {
// BuildHandlerChainFunc 允许你通过装饰 apiHandler 来构建自定义处理程序链
// 目前默认的处理链函数为 DefaultBuildHandlerChain:里面包含了大量默认的处理方式
return c.BuildHandlerChainFunc(handler, c.Config)
}
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
s := &GenericAPIServer{
Handler: apiServerHandler,
listedPathProvider: apiServerHandler,
// ......
}
// ......
// 安装一些额外的路由,比如索引全部接口、metrics接口等
installAPI(s, c.Config)
return s, nil
}
// vendor/k8s.io/apiserver/pkg/server/handler.go
// HandlerChainBuilderFn 被用来包装正在使用提供的处理器链 GoRestfulContainer 处理器
// 它通常用于应用过滤,如身份验证和授权
type HandlerChainBuilderFn func(apiHandler http.Handler) http.Handler
// 该函数就是用来按照 go-restful 的模式初始化 Container
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
然后实例化 CRD 和 APIGroupInfo,其中的 APIGroupInfo 对象用于描述资源组信息,一个资源对应一个 APIGroupInfo 对象,每个资源对应一个资源存储对象:
// /vendor/k8s.io/apiextensions-apiserver/pkg/server/genericapiserver.go
func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
return APIGroupInfo{
PrioritizedVersions: scheme.PrioritizedVersionsForGroup(group),
// 这个map用于存储资源、资源存储对象的映射关系
// 格式:资源版本/资源/资源存储对象
// 资源存储对象 RESTStorage,负责资源的CRUD
// 后续将 RESTStorage 转换为 http 的 handler 函数
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
OptionsExternalVersion: &schema.GroupVersion{Version: "v1"},
Scheme: scheme,
ParameterCodec: parameterCodec,
NegotiatedSerializer: codecs,
}
}
然后需要注册 APIGroupInfo,通过 s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo)
来实现的,将 APIGroupInfo 中的资源对象注册到APIExtensionServerHandler 函数。其过程是:
遍历 APIGroupInfo 将资源组、资源版本、资源名称映射到 http path 请求路径 通过 InstallREST 函数将资源存储对象作为资源的 handlers 方法 最后用 go-restfu l的 ws.Route
将定义好的请求路径和 handlers 方法添加路由到 go-restful
详细的代码如下所示:
// /vendor/k8s.io/apiextensions-apiserver/pkg/server/genericapiserver.go
// 在API中暴露指定的 APIGroup
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
return s.InstallAPIGroups(apiGroupInfo)
}
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
// ...
// 获取 OpenAPI 模型
openAPIModels, err := s.getOpenAPIModels(APIGroupPrefix, apiGroupInfos...)
// 遍历所有的资源信息,一次安装资源版本处理器
for _, apiGroupInfo := range apiGroupInfos {
if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
return fmt.Errorf("unable to install api resources: %v", err)
}
apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
// ...
apiGroup := metav1.APIGroup{
Name: apiGroupInfo.PrioritizedVersions[0].Group,
Versions: apiVersionsForDiscovery,
PreferredVersion: preferredVersionForDiscovery,
}
s.DiscoveryGroupManager.AddGroup(apiGroup)
s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
}
return nil
}
// installAPIResources 用于安装 RESTStorage 以支持每个 api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
var resourceInfos []*storageversion.ResourceInfo
for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
// ...
apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes
// 核心就是调用 InstallREST,参数为go-restful的container对象
r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
// ...
resourceInfos = append(resourceInfos, r...)
}
// ...
return nil
}
// InstallREST 将 REST handlers (storage, watch, proxy and redirect) 注册到 restful 容器中。
// 预期提供的路径 root 前缀将服务于所有操作,root 不能以斜线结尾。
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
// 比如从 InstallAPI 调用链下来这里的 g.Root 为 /apis,这样就可以确定 handler 的前缀为 /apis/{goup}/{version}
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
// 实例化 APIInstaller 对象
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
// 调用 Install 函数:注册 api,返回 go-restful 的 WebService 对象
apiResources, resourceInfos, ws, registrationErrors := installer.Install()
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
// 将 WebService 添加到 Container 中,这需要了解 go-restful 框架的知识
container.Add(ws)
return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}
上面整个过程都是为了注册 API 来做的准备,核心是在 installer.Install()
函数中,该函数就是将 API 资源添加处理器的
// /vendor/k8s.io/apiserver/pkg/endpoints/installer.go
// 使用api installer 的前缀和版本创建一个新的 restful webservice 对象
// 这部分都属于 go-restful 框架的用法
func (a *APIInstaller) newWebService() *restful.WebService {
ws := new(restful.WebService)
ws.Path(a.prefix)
// a.prefix 包含 "prefix/group/version"
ws.Doc("API at " + a.prefix)
ws.Consumes("*/*")
mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer)
ws.Produces(append(mediaTypes, streamMediaTypes...)...)
ws.ApiVersion(a.group.GroupVersion.String())
return ws
}
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
var apiResources []metav1.APIResource
var resourceInfos []*storageversion.ResourceInfo
var errors []error
// 新建一个 WebService 对象(go-restful框架中的)
ws := a.newWebService()
// 将 paths 排序
paths := make([]string, len(a.group.Storage))
var i int = 0
for path := range a.group.Storage {
paths[i] = path
i++
}
sort.Strings(paths)
// 获取swagger spec规范
for _, path := range paths {
// 将 Storage 转换成 Router,然后将路由注册到 webservice
apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
if apiResource != nil {
apiResources = append(apiResources, *apiResource)
}
if resourceInfo != nil {
resourceInfos = append(resourceInfos, resourceInfo)
}
}
return apiResources, resourceInfos, ws, errors
}
这里最重要的就是 registerResourceHandlers
函数了,这个方法很长,核心功能是根据 storage 构造 handler,再将 handler 和 path 构造成 go-restful 框架的 Route 对象,最后 Route 添加到 webservice。
// /vendor/k8s.io/apiserver/pkg/endpoints/installer.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
// ......
// 是否是命名空间级别
var namespaceScoped bool
// ......
// storage 支持哪些 verbs 操作,用于了解每个 path 所支持的操作。
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
// ......
// 获取指定范围的操作列表
switch {
case !namespaceScoped:
// 处理非命名空间范围的资源,如节点
// ......
// 添加 actions 到资源路径:/api/apiVersion/resource
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
// ......
default:
// 命名空间级别的资源对象
namespaceParamName := "namespaces"
namespaceParam := ws.PathParameter("namespace", "object name and auth scope, such as for teams and projects").DataType("string")
namespacedPath := namespaceParamName + "/{namespace}/" + resource
namespaceParams := []*restful.Parameter{namespaceParam}
// ......
// 构造 action 列表
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
// ......
}
// 为 actions 创建 Routes 路由
// 配置go-restful产生的MIME类型
mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer)
allMediaTypes := append(mediaTypes, streamMediaTypes...)
ws.Produces(allMediaTypes...)
// ...
for _, action := range actions {
// ......
// 构造 go-restful 的 RouteBuilder 对象
routes := []*restful.RouteBuilder{}
// 如果是子资源,kind应该是prent的kind
if isSubresource {
parentStorage, ok := a.group.Storage[resource]
fqParentKind, err := GetResourceKind(a.group.GroupVersion, parentStorage, a.group.Typer)
kind = fqParentKind.Kind
}
// 根据不同的 Verb,注册到不同的 handler 中去
switch action.Verb {
case "GET":
var handler restful.RouteFunction // go-restful的处理器
// 初始化handler
if isGetterWithOptions {
handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
} else {
handler = restfulGetResource(getter, reqScope)
}
//...
// 构造 route(这都属于go-restful框架的使用方法)
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Writes(producedObject)
// 将route添加到routes
addParams(route, action.Params)
routes = append(routes, route)
// ... 其他verb处理方式基本一致
case "POST": // Create a resource.
var handler restful.RouteFunction
if isNamedCreater {
handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
} else {
handler = restfulCreateResource(creater, reqScope, admit)
}
// ......
}
// 遍历路由,加入到 WebService 中
for _, route := range routes {
route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
Group: reqScope.Kind.Group,
Version: reqScope.Kind.Version,
Kind: reqScope.Kind.Kind,
})
route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
// 将route加入到WebService中去
ws.Route(route)
}
}
// ......
return &apiResource, resourceInfo, nil
}
registerResourceHandlers 函数很长,但是我们可以先抛开细节,整体上了解下,可以看到就是先通过 Storage 判断支持哪些 Verbs 操作,然后生成 actions 列表,然后将每个 action 构造路由列表,最后也是将这些路由添加到 go-restful 的 WebService 中去,这里构造的路由绑定的处理器实现方式路由不同,比如 GET 方式的 handler 是通过 restfulGetResource
来实例化的,POST 方式的是通过 restfulCreateResource
来实例化的,实现方式基本差不多。
GET 方式 handler 函数 restfulGetResource
实现如下所示:
// /vendor/k8s.io/apiserver/pkg/endpoints/installer.go
func restfulGetResource(r rest.Getter, scope handlers.RequestScope) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.GetResource(r, &scope)(res.ResponseWriter, req.Request)
}
}
restfulGetResource 函数得到的就是 restful.RouteFunction,这是 go-restful 的方式,真正去处理的是 handlers.GetResource
这个函数,这个函数里面调用的是 getResourceHandler,该函数返回的就是一个 http 标准库 handler 函数,处理对应的路由请求。
GET 请求的处理过程比较简单,通过请求的查询构造出一个 metav1.GetOptions
,然后交给 Getter 接口处理,最后将查询结果转换后返回给请求者。
//vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go
// GetResource 返回一个函数,该函数处理从 rest.Storage 对象中检索单个资源的操作。
func GetResource(r rest.Getter, scope *RequestScope) http.HandlerFunc {
return getResourceHandler(scope,
func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
// 初始化需要的 GetOptions
options := metav1.GetOptions{}
// 获取查询的参数
if values := req.URL.Query(); len(values) > 0 {
// ...
// 将查询的参数进行解码,编程 GetOptions 对象
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &options); err != nil {
err = errors.NewBadRequest(err.Error())
return nil, err
}
}
// 然后使用 Getter 接口来处理
return r.Get(ctx, name, &options)
})
}
// getResourceHandler 是用于获取请求的 HTTP Handler函数
// 它委托给传入的 getterFunc 来执行实际的 get
func getResourceHandler(scope *RequestScope, getter getterFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// ...
namespace, name, err := scope.Namer.Name(req)
/// ...
ctx := req.Context()
ctx = request.WithNamespace(ctx, namespace)
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
// ...
// 使用 getterFunc 来执行实际的 get 操作,
result, err := getter(ctx, name, req, trace)
// ...
// 将结果转换为用户需要的格式返回给用户
transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
}
}
POST 的处理器也是类似的,对应的逻辑在 restfulCreateResource 中:
// /vendor/k8s.io/apiserver/pkg/endpoints/installer.go
func restfulCreateResource(r rest.Creater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
}
}
同样真正去处理的是 handlers.CreateResource
这个函数,这个函数里面调用的是 createHandler,该函数返回的就是一个 http 标准库 handler 函数,处理对应的路由请求。
//vendor/k8s.io/apiserver/pkg/endpoints/handlers/create.go
// CreateNamedResource returns a function that will handle a resource creation with name.
func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(r, scope, admission, true)
}
// CreateResource 返回将处理资源创建的函数
func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
}
createHandler 的实现代码比较长,主要做了一下几件事:
对查询串进行解码生成 metav1.CreateOptions
。对请求的 body 体中的数据进行解码,生成资源对象。解码的对象版本是 internal 版本,internal 版本是该资源对象所有版本字段的全集。针对不同版本的对象内部可以使用相同的代码进行处理。 对对象进行修改的准入控制,判断是否修需要修改对象。 交给 creater 接口创建资源对象。 将数据转换为期望的格式写入 response 中,调用 creater 接口返回的结果仍然是 internal 版本,编码时,会编码成用户请求的版本返回给用户。
//vendor/k8s.io/apiserver/pkg/endpoints/handlers/create.go
// 返回一个 http handler 函数,处理对应的路由请求
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
// 标准 http handler 函数
return func(w http.ResponseWriter, req *http.Request) {
// ...
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
// 找到合适的 Serializer
gv := scope.Kind.GroupVersion()
s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
// 将请求解码成 CreateOptions
options := &metav1.CreateOptions{}
values := req.URL.Query()
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, options); err != nil {
// ...
}
// ...
options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("CreateOptions"))
defaultGVK := scope.Kind
original := r.New()
// ...
// 找到合适的解码器
decoder := scope.Serializer.DecoderToVersion(decodeSerializer, scope.HubGroupVersion)
// 请请求体 body 进行解码
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
ctx = request.WithNamespace(ctx, namespace)
// 审计、准入、请求日志记录
ae := audit.AuditEventFrom(ctx)
admit = admission.WithAudit(admit, ae)
audit.LogRequestObject(req.Context(), obj, objGV, scope.Resource, scope.Subresource, scope.Serializer)
userInfo, _ := request.UserFrom(ctx)
admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
requestFunc := func() (runtime.Object, error) {
return r.Create(
ctx,
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
}
// 处理请求
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
// ...
// 执行准入控制的mutate操作,就是在创建对象的时候进行修改
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
return nil, err
}
}
// ......
// 调用创建方法
result, err := requestFunc()
return result, err
})
code := http.StatusCreated
status, ok := result.(*metav1.Status)
if ok && status.Code == 0 {
status.Code = int32(code)
}
// 将结果转换为用户需要的格式返回给用户
transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
}
}
KubeAPIServer
KubeAPIServer 是我们最核心的一个 Server,用于处理内置资源对象的请求,该 server 是通过下面的的方式进行创建的:
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
其中的 kubeAPIServerConfig 上面在介绍生成通用配置的时候就介绍过了,后面的参数 apiExtensionsServer.GenericAPIServer
是前面的 APIExtensionServer 生成的通用 APIServer,KubeAPIServer 也会依赖该 Server,所以直接拿过来使用。
// cmd/kube-apiserver/app/server.go
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
return kubeAPIServer, nil
}
同样的 CreateKubeAPIServer 函数是直接将 kubeAPIServerConfig 对象填充上默认对象后去进行实例化的。
// pkg/controlplane/instance.go
// New 从指定配置返回一个新的 Master 实例。
// 如果未设置,某些配置字段将被设置为默认值。
// 必须指定某些配置字段,包括:KubeltClientConfig
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
// 这里和 crd server 一样调用 GenericConfig 实例化一个名为 kube-apiserver 的 Server
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
// 配置日志路由支持
if c.ExtraConfig.EnableLogsSupport {
routes.Logs{}.Install(s.Handler.GoRestfulContainer)
}
// ......
m := &Instance{
GenericAPIServer: s,
ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
}
// 安装 LegacyAPI
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.ExtraConfig.StorageFactory,
// ......
}
// 如果启用了 restStorageProvider,InstallLegacyAPI 将为其安装旧版 API
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
return nil, err
}
}
// 如果具有相同名称的资源存在于这些组中的多个组中 (e.g. "deployments.apps"" and "deployments.extensions"),
// 此列表的顺序确定哪个更优先 (e.g. "deployments").
// 此优先级顺序用于本地发现,但最终在 `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go` 中聚合,并具有特定的优先级。
restStorageProviders := []RESTStorageProvider{
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
discoveryrest.StorageProvider{},
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{},
appsrest.StorageProvider{},
admissionregistrationrest.RESTStorageProvider{},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}
// 安装新版本的接口:/apis/
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
// ...
return m, nil
}
同样和 crd server 一样先调用 GenericConfig 实例化一个名为 kube-apiserver 的 Server,然后调用 m.InstallLegacyAPI 安装 LegacyAPI,此方法的主要功能是将 core API 注册到路由中,是 apiserver 初始化流程中最核心的方法之一。
// pkg/controlplane/instance.go
// InstallLegacyAPI 将安装旧版本的 APIs,其实就是 core API:/api/
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
// 为 LegacyAPI 中各个资源创建 RESTStorage
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
// 初始化 bootstrap 控制器
controllerName := "bootstrap-controller"
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
// 注册路由信息
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering group versions: %v", err)
}
return nil
}
将 API 注册到路由最终目的就是对外提供 RESTful API 来操作对应 resource,注册 API 主要分为两步,第一步是为 API 中的每个 resource 初始化 RESTStorage 以此操作后端存储中数据的变更,第二步是为每个 resource 根据其 verbs 构建对应的路由。m.InstallLegacyAPI
的主要逻辑为:
调用 legacyRESTStorageProvider.NewLegacyRESTStorage
为 LegacyAPI 中各个资源创建 RESTStorage,RESTStorage 的目的是将每种资源的访问路径及其后端存储的操作对应起来;初始化 bootstrap-controller
,并将其加入到 PostStartHook 中,bootstrap-controller
是 apiserver 中的一个 controller,主要功能是创建系统所需要的一些 namespace 以及创建 kubernetes service 并定期触发对应的 sync 操作,apiserver 在启动后会通过调用 PostStartHook 来启动bootstrap-controller
;在为资源创建完 RESTStorage 后,调用 m.GenericAPIServer.InstallLegacyAPIGroup
为 APIGroup 注册路由信息,InstallLegacyAPIGroup
方法的调用链非常深,主要为InstallLegacyAPIGroup--> installAPIResources --> InstallREST --> Install --> registerResourceHandlers
,这里和前面的 crd server 就是一样的方式了,最终核心的路由构造在registerResourceHandlers
方法内,该方法比较复杂,其主要功能是通过上一步骤构造的 REST Storage 判断该资源可以执行哪些操作(如 create、update等),将其对应的操作存入到 action 中,每一个 action 对应一个标准的 REST 操作,如 create 对应的 action 操作为 POST、update 对应的 action 操作为 PUT。最终根据 actions 数组依次遍历,对每一个操作添加一个 handler 方法,注册到 route 中去,再将 route 注册到 webservice 中去,webservice 最终会注册到 container 中,遵循 go-restful 的设计模式,这个流程和前面 crd server 完全一致。
然后后面的 m.InstallAPIs
方式也类似,用于注册安装新版的(/apis)的资源接口。
然后就是创建 AggregatorServer 服务,基本方式和前面两个 server 是类似的,CreateServerChain 流程的调用链如下所示:
|--> CreateKubeAPIServerConfig
|
CreateServerChain --|--> createAPIExtensionsConfig
|
| |--> c.GenericConfig.New
|--> createAPIExtensionsServer --> apiextensionsConfig.Complete().New --|
| |--> s.GenericAPIServer.InstallAPIGroup
|
| |--> c.GenericConfig.New --> legacyRESTStorageProvider.NewLegacyRESTStorage
| |
|--> CreateKubeAPIServer --> kubeAPIServerConfig.Complete().New --|--> m.InstallLegacyAPI
| |
| |--> m.InstallAPIs
|
|
|--> createAggregatorConfig
|
| |--> c.GenericConfig.New
| |
|--> createAggregatorServer --> aggregatorConfig.Complete().NewWithDelegate --|--> apiservicerest.NewRESTStorage
|
|--> s.GenericAPIServer.InstallAPIGroup
启动
在调用 CreateServerChain
完成各 Server 的初始化 后,然后就会调用 server.PrepareRun
完成服务启动前的准备工作,最后调用 prepared.Run
方法来启动安全的 http server。
server.PrepareRun
主要完成了健康检查、存活检查和OpenAPI
路由的注册工作,下面继续分析 prepared.Run
的流程,在 prepared.Run
中主要调用 s.NonBlockingRun
来完成启动工作。
// vendor/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
return s.runnable.Run(stopCh)
}
这里的 runnable 在前面的 PrepareRun 中进行了初始化操作,得到的是一个 preparedGenericAPIServer
对象:
type preparedAPIAggregator struct {
*APIAggregator
runnable runnable
}
func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
// ...
// 得到的是一个 preparedGenericAPIServer 对象
prepared := s.GenericAPIServer.PrepareRun()
// ...
return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
}
所以真正的启动方法 Run 是 preparedGenericAPIServer
中的 Run 方法:
// vendor/k8s.io/apiserver/pkg/server/generic/apiserver.go
// Run 启动一个安全的 http server.
// 只有当 stopCh 关闭或安全端口最初无法监听时,它才会返回
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
// ...
go func() {
defer delayedStopCh.Signal()
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
<-stopCh
// 一旦启动关闭,/readyz 应立即开始返回失败信息。
// 这将为负载均衡器提供一个由ShutdownDelayDuration定义的时间窗口,以便检测/readyz是否处于不可用状态,并停止向该服务器发送流量。
shutdownInitiatedCh.Signal()
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", shutdownInitiatedCh.Name())
time.Sleep(s.ShutdownDelayDuration)
}()
// 关闭socket
drainedCh := s.lifecycleSignals.InFlightRequestsDrained
stopHttpServerCh := delayedStopCh.Signaled()
shutdownTimeout := s.ShutdownTimeout
if s.ShutdownSendRetryAfter {
stopHttpServerCh = drainedCh.Signaled()
shutdownTimeout = 2 * time.Second
klog.V(1).InfoS("[graceful-termination] using HTTP Server shutdown timeout", "ShutdownTimeout", shutdownTimeout)
}
// 调用 NonBlockingRun 完成启动流程
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
// 后面是收到退出信号后做的一些收尾工作
httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening
go func() {
<-listenerStoppedCh
httpServerStoppedListeningCh.Signal()
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", httpServerStoppedListeningCh.Name())
}()
// ...
}
启动服务真正的核心是 s.NonBlockingRun 方法,其主要实现代码如下所示:
// /vendor/k8s.io/apiextensions-apiserver/pkg/server/genericapiserver.go
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
auditStopCh := make(chan struct{})
// 是否要启动审计日志
if s.AuditBackend != nil {
if err := s.AuditBackend.Run(auditStopCh); err != nil {
return nil, nil, fmt.Errorf("failed to run the audit backend: %v", err)
}
}
// 启动真正的 https server
internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
var listenerStoppedCh <-chan struct{}
if s.SecureServingInfo != nil && s.Handler != nil {
var err error
stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.ServeWithListenerStopped(s.Handler, shutdownTimeout, internalStopCh)
///
}
// 后面也是收到退出信息的一些处理
go func() {
<-stopCh
close(internalStopCh)
if stoppedCh != nil {
<-stoppedCh
}
s.HandlerChainWaitGroup.Wait()
close(auditStopCh)
}()
// 执行 postStartHooks
s.RunPostStartHooks(stopCh)
// 向 systemd 发送 ready 信号
if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
}
return stoppedCh, listenerStoppedCh, nil
}
s.NonBlockingRun
的主要逻辑为:
判断是否要启动审计日志服务; 调用 s.SecureServingInfo.Serve
配置并启动 https server;执行 postStartHooks; 向 systemd 发送 ready 信号;
以上就是 apiserver 的初始化以及启动流程过程的分析,这只是整体上的一个流程,其中还有很多细节需要去深入,比如上文提到的 API Resource RESTStorage 等,这些还需要去继续分析。