apiserver启动分析

kube-apiserver

apiserver是k8s的中心组件,它以RESTful API的形式提供了各类资源对象的操作接口。其他组件或者客户端(如kubectl)都会去调用它。apiserver支持同时提供https(默认监听6443端口)和http(默认监听8080端口)。其中http API是非安全结构,不做任何认证授权机制。

它的主要功能是:

  1. 认证授权
    • 通过认证插件认证客户端:客户端发送请求,apiserver通过配置的一个或多个认证插件,轮流调用这些插件,直到有一个插件认证通过。主要通过检查HTTP请求实现,可以从HTTP头或者客户端证书中获取用户信息,插件获得客户端的用户名、用户id和归属组来进行认证。
    • 通过授权插件授权客户端:apiserver通过配置一个或多个授权插件,对已经认证的客户端进行认证,决定其是否能对请求的资源执行相应的操作
    • 通过准入控制插件验证修改资源请求:当客户端请求尝试创建、修改、删除资源时,请求需要经过准入控。制插件的验证。apiserver会配置多个准入控制插件,请求需要经过所有准入控制插件的验证。如果请求是读取数据,则不会做准入控制验证。
  2. 变更通知:
    • 客户端通过创建到apiserver的HTTP连接来监听资源变更,通过连接,客户端会接收到监听对象的一系列变更通知。当对象发生变化,apiserver会把新版本的对象发送至所有监听该对象的客户端。
  3. 组件通信:
    • k8s系统的组件,只能通过apiserver进行通信,组件间不会直接通信。apiserver是唯一与etcd通信的组件,其他组件不会与etcd做通信,而是通过与apiserver通信,来进行对资源对象的修改。

kube-apiserver架构

apiserver提供了3种HTTP Server服务,APIExtensionsServer、KubeAPIServer、AggregatorServer。不同服务的应用场景不同,提供的资源也不同。

  • APIExtensionsServer:API扩展服务(扩展器)。提供了CRD自定义服务,可通过CRD对Kubernetes资源进行扩展。该服务通过CustomResourceDefinitions对象进行管理,并通过extensionsapiserver.Scheme资源注册表管理CRD相关资源。
  • KubeAPIServer:API核心服务。提供了Kubernetes内置核心资源服务,不允许开发者随意更改相关资源。API核心服务通过Master对象进行管理,并通过legacyscheme.Scheme资源注册表管理Master相关资源。
  • AggregatorServer:将用户扩展的API注册到apiserver,为其提供服务发现。通过API聚合层APIAggregator(AA)将扩展API的访问请求转发到用户扩展的服务上。

  • GenericAPIServer:APIExtensionsServer、KubeAPIServer、AggregatorServer都基于GenericAPIServer进行创建。通过GenericAPIServer可以将Kubernetes资源与REST API进行映射。

apiserver启动流程分析

相关源码参照kubernetes 1.20.4

注册k8s所支持的资源

首先,kubernetes支持的资源需要注册到Scheme资源注册表中,启动时,才能从Scheme中拿到资源信息,并启动、运行AggregatorServer、APIExtensionsServer、KubeAPIServer。资源的注册,是通过Go的import和init触发的。

  1. cmd/kube-apiserver/app/server.go中,导入了"k8s.io/kubernetes/pkg/api/legacyscheme"包,该包中定义了全局的Scheme资源注册表Codec编解码器ParameterCodec参数编解码器
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    package legacyscheme
    import (
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/serializer"
    )
    var (
    Scheme = runtime.NewScheme()
    Codecs = serializer.NewCodecFactory(Scheme)
    ParameterCodec = runtime.NewParameterCodec(Scheme)
    )
  2. 通过导入"k8s.io/kubernetes/pkg/controlplane"注册资源。controlplane导入了apiserver支持的API groups包,通过导入包的机制,触发其初始化函数,进行资源的注册。例如,在controlplane的import_known_versions.go,导入了k8s.io/kubernetes/pkg/apis/admission/install。这个install文件实际上就是将相应的资源通过AddToScheme方法,添加到Scheme资源注册表中。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    package install
    import (
    "k8s.io/apimachinery/pkg/runtime"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/kubernetes/pkg/api/legacyscheme"
    "k8s.io/kubernetes/pkg/apis/admission"
    v1 "k8s.io/kubernetes/pkg/apis/admission/v1"
    "k8s.io/kubernetes/pkg/apis/admission/v1beta1"
    )

    func init() {
    Install(legacyscheme.Scheme)
    }

    // Install registers the API group and adds types to a scheme
    func Install(scheme *runtime.Scheme) {
    utilruntime.Must(admission.AddToScheme(scheme))
    utilruntime.Must(v1beta1.AddToScheme(scheme))
    utilruntime.Must(v1.AddToScheme(scheme))
    utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion))
    }

Cobra命令行解析

apiserver.go的main方法主要是通过cobra进行执行启动命令。通过调用NewAPIServerCommand()方法,对apiserver配置进行默认值填充和验证。在该方法中,会通过NewServerRunOptions配置参数默认值,然后将options传给command。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func NewAPIServerCommand() *cobra.Command {
// 资源组件配置默认值填充
s := options.NewServerRunOptions()
// 构建命令行命令,将参数传递给command
cmd := &cobra.Command{
......
// 设置apiserver默认启动的options
completedOptions, err := Complete(s)
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
// cobra生成的命令实现的功能方法
RunE: func(cmd *cobra.Command, args []string) error {
......
return Run(completedOptions, genericapiserver.SetupSignalHandler())
}
}
......
return cmd

apiserver的main方法中,最终会调用cobra.Execute()执行这个命令进行启动。启动时,cobra会调用Run()方法,Run()方法主要做了3件事server:=CreateServerChainserver.PrepareRunserver.Run

1
2
3
4
5
6
7
8
9
func main() {
rand.Seed(time.Now().UnixNano())
command := app.NewAPIServerCommand()
logs.InitLogs()
defer logs.FlushLogs()
if err := command.Execute(); err != nil {
os.Exit(1)
}
}

创建ServerChain

CreateServerChain主要对KubeAPIServer、APIExtensionServer、AggregatorServer进行了创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
// 根据options的SSHUser和SSHKeyfile,与Node进行连接
nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
if err != nil {
return nil, err
}
// 创建运行apiserver需要的资源
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
if err != nil {
return nil, err
}

// 创建APIExtensionServer需要的资源
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
if err != nil {
return nil, err
}

// 创建APIExtensionServer
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
if err != nil {
return nil, err
}

// 创建KubeAPIServer
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
if err != nil {
return nil, err
}

// 创建AggregatorServer需要的资源
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
// 创建AggregatorServer
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
}

return aggregatorServer, nil
}

创建通用配置

ServerRunOptions包含了apiserver启动需要的所有参数,主要配置主要包括:

  • genericoptions,比如Etcd、SecureServing、Audit、Features等
  • kubeoptions,Admission、Authentication、Authorization、CloudProvider等
  • metrics:采集的配置
  • logs:日志的配置
  • kubeletConfig:kubeletclient的config,比如port,node的hostname,ip,连接超时时间等
  • SSHKeyfile
  • SSHUser
  • ProxyClientCertFile
  • ProxyClientKeyFile

CreateKubeAPIServerConfig主要完成apiserver的基础配置,这些配置信息来源于kube-apiserver.yaml中的command

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// cmd/kube-apiserver/app/server.go
func CreateKubeAPIServerConfig(
s completedServerRunOptions,
nodeTunneler tunneler.Tunneler,
proxyTransport *http.Transport,
) (
*controlplane.Config,
aggregatorapiserver.ServiceResolver,
[]admission.PluginInitializer,
error,
) {
// 根据配置信息,初始化genericConfig等信息
genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
if err != nil {
return nil, nil, nil, err
}
......
// 存储配置
if _, port, err := net.SplitHostPort(s.Etcd.StorageConfig.Transport.ServerList[0]); err == nil && port != "0" && len(port) != 0 {
if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.Transport.ServerList}.CheckEtcdServers); err != nil {
return nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
}
}
......
// ip范围设置
serviceIPRange, apiServerServiceIP, err := controlplane.ServiceIPRange(s.PrimaryServiceClusterIPRange)
if err != nil {
return nil, nil, nil, err
}
......
config := &controlplane.Config{
GenericConfig: genericConfig,
ExtraConfig: controlplane.ExtraConfig{
......
},
}
// 省略了其他配置
.......
return config, serviceResolver, pluginInitializers, nil
}

创建APIExtensionsServer

逻辑概述

  1. 通过c.GenericConfig.New()创建一个名为apiextensions-apiserver的genericServer。
    1. 创建http请求处理链,用作对请求的处理,一般用来进行filter操作,比如身份验证和授权。
    2. 实例化一个genericAPIServer对象s,genericAPIServer
    3. 创建并将SharedInformerFactory添加到s的PostStartHook
    4. 创建将对请求的限流添加到s的PostStartHook
    5. 创建将healthz检测添加到s的PostStartHook
    6. 通过installAPI,将api暴露
  2. 创建CustomResourceDefinitions(CRD)对象。
  3. 实例化GroupNameapiextensions.k8s.ioAPIGroupInfo,只对apiextensions.k8s.io的资源进行管理。
  4. APIGroupInfo添加RESTStorageRESTStorage通过Storage Backend访问etcd。
  5. 通过s.GenericAPIServer.InstallAPIGroup将CRD的资源服务加入到gorestful container中。其调用链为”InstallAPIGroup->InstallAPIGroups->InstallAPIResources->InstallREST->APIInstaller.Install”。大体逻辑为遍历APIGroupInfo,注册APIGroupInstallAPIGroup遍历时将Group、Resource、Version,与路径/apis封装为APIGroupVersion。然后调用InstallREST方法,InstallREST调用Install将遍历APIGroupInfo,根据rest.Storage确定APIGroupInfo的Verbs(GET、WATCH、CREATE、DELETE、UPDATE等),APIGroupInfo进行绑定,然后将其封装为APIResource。通过Verbs确定能对资源做哪些操作,再将这些操作的path和http请求类型,请求处理的handler,注册为一条条的route记录。然后再将routes放到webseivice中,通过DiscoveryGroupManager对该webseivice设置服务发现,最后将webservice放到gorestful container中。
  6. 定义CRD的ClientSet,并为其创建SharedInformerFactory
  7. 定义crdHandler,为暴露的api注册handler,包括crdInformer的AddFun、UpdateFunc、DeleteFunc等
  8. 定义对clientset的一系列controller控制器
  9. 将informer和controller添加到PostStartHook中。PostStartHookFunc是一个钩子函数,在server启动时触发。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
// 创建genericServer
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
if err != nil {
return nil, err
}
// 实例化CRD,CRD中包括了GenericServer和SharedInformerFactory
s := &CustomResourceDefinitions{
GenericAPIServer: genericServer,
}

apiResourceConfig := c.GenericConfig.MergedResourceConfig
// 实例化APIGroupInfo,将GroupName设置为apiextensions.k8s.io
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)

// 分别为v1.apiextensions和v1beta.apiextensions的apiGroupInfo添加RESTStorage。用于与etcd交互的
if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
storage := map[string]rest.Storage{}
customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
if err != nil {
return nil, err
}
storage["customresourcedefinitions"] = customResourceDefinitionStorage
storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
}
if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
......
apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
}

// 在路由中注册APIGroupInfo,对外暴露API
// InstallAPIGroup方法直接返回s.InstallAPIGroups(apiGroupInfo)
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
.....

// 为'/apis'资源组的service提供处理方法handler
crdHandler, err := NewCustomResourceDefinitionHandler(
versionDiscoveryHandler,
groupDiscoveryHandler,
s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
delegateHandler,
c.ExtraConfig.CRDRESTOptionsGetter,
c.GenericConfig.AdmissionControl,
establishingController,
c.ExtraConfig.ServiceResolver,
c.ExtraConfig.AuthResolverWrapper,
c.ExtraConfig.MasterCount,
s.GenericAPIServer.Authorizer,
c.GenericConfig.RequestTimeout,
time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
apiGroupInfo.StaticOpenAPISpec,
c.GenericConfig.MaxRequestBodyBytes,
)
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)


// 定义controller,CRD用来处理请求
discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
finalizingController := finalizer.NewCRDFinalizer(
s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
crdClient.ApiextensionsV1(),
crdHandler,
)
openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
// controller和informer添加到PostStartHook中,在server启动的时候,运行Controller
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
s.Informers.Start(context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
if s.GenericAPIServer.OpenAPIVersionedService != nil && s.GenericAPIServer.StaticOpenAPISpec != nil {
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
}
go namingController.Run(context.StopCh)
go establishingController.Run(context.StopCh)
go nonStructuralSchemaController.Run(5, context.StopCh)
go apiApprovalController.Run(5, context.StopCh)
go finalizingController.Run(5, context.StopCh)

discoverySyncedCh := make(chan struct{})
go discoveryController.Run(context.StopCh, discoverySyncedCh)
select {
case <-context.StopCh:
case <-discoverySyncedCh:
}

return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
return s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced(), nil
}, context.StopCh)
})
return s, nil
}

创建GenericServer

  1. 配置项检查
  2. 定义用于身份验证和授权的handlerChainBuilder请求处理链
  3. 创建handler,handler用于创建gorestful container,配置路由并将handler与路由关联。
  4. 创建SharedInformerFactory,用于给资源创建informer。并将其添加到PostStartrHook中
  5. 创建用于对资源请求的隔离、限流、健康检查的处理,并添加到PostStartHook中。
  6. 通过InstallAPI,进行资源暴露
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// k8s.io/apiserver/pkg/server/config.go
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
// completedConfig检查
......
// go-restful请求处理链,用来处理身份验证和授权等过滤
handlerChainBuilder := func(handler http.Handler) http.Handler {
return c.BuildHandlerChainFunc(handler, c.Config)
}

// apiServer用来处理请求的handler
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
// 根据config,配置genericAPIServer,并将apiServerHandler绑定到crd.genericAPIServer中
s := &GenericAPIServer{
......
}
......

// 添加hook
for name, preconfiguredPostStartHook := range c.PostStartHooks {
if err := s.AddPostStartHook(name, preconfiguredPostStartHook.hook); err != nil {
return nil, err
}
}

// 创建informers
genericApiServerHookName := "generic-apiserver-start-informers"
if c.SharedInformerFactory != nil {
if !s.isPostStartHookRegistered(genericApiServerHookName) {
err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error {
// context.StopCh给informer的channel提供信息
c.SharedInformerFactory.Start(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
}
// 添加healthz检查
err := s.addReadyzChecks(healthz.NewInformerSyncHealthz(c.SharedInformerFactory))
if err != nil {
return nil, err
}
}

const priorityAndFairnessConfigConsumerHookName = "priority-and-fairness-config-consumer"
if s.isPostStartHookRegistered(priorityAndFairnessConfigConsumerHookName) {
} else if c.FlowControl != nil {
err := s.AddPostStartHook(priorityAndFairnessConfigConsumerHookName, func(context PostStartHookContext) error {
go c.FlowControl.MaintainObservations(context.StopCh)
go c.FlowControl.Run(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
// TODO(yue9944882): plumb pre-shutdown-hook for request-management system?
} else {
klog.V(3).Infof("Not requested to run hook %s", priorityAndFairnessConfigConsumerHookName)
}

if c.FlowControl != nil {
// 添加APF对请求进行分类和隔离(限流机制,保护api被恶意请求消耗资源然后死掉)
const priorityAndFairnessFilterHookName = "priority-and-fairness-filter"
if !s.isPostStartHookRegistered(priorityAndFairnessFilterHookName) {
err := s.AddPostStartHook(priorityAndFairnessFilterHookName, func(context PostStartHookContext) error {
genericfilters.StartPriorityAndFairnessWatermarkMaintenance(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
}
} else {
// 默认添加max-in-flight,避免受到CPU和内存过载的影响
const maxInFlightFilterHookName = "max-in-flight-filter"
if !s.isPostStartHookRegistered(maxInFlightFilterHookName) {
err := s.AddPostStartHook(maxInFlightFilterHookName, func(context PostStartHookContext) error {
genericfilters.StartMaxInFlightWatermarkMaintenance(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
}
}

// 添加healthz检查
for _, delegateCheck := range delegationTarget.HealthzChecks() {
skip := false
for _, existingCheck := range c.HealthzChecks {
if existingCheck.Name() == delegateCheck.Name() {
skip = true
break
}
}
if skip {
continue
}
s.AddHealthChecks(delegateCheck)
}
// api的path集合
s.listedPathProvider = routes.ListedPathProviders{s.listedPathProvider, delegationTarget}
// 暴露api
installAPI(s, c.Config)

// use the UnprotectedHandler from the delegation target to ensure that we don't attempt to double authenticator, authorize,
// or some other part of the filter chain in delegation cases.
if delegationTarget.UnprotectedHandler() == nil && c.EnableIndex {
s.Handler.NonGoRestfulMux.NotFoundHandler(routes.IndexLister{
StatusCode: http.StatusNotFound,
PathProvider: s.listedPathProvider,
})
}

return s, nil
}
InstallAPI
  • routes.Index{}.Install(),暴露了"/","/index.html",用于获取apiservice索引页面。可以通过浏览器访问”http://ip:8080“查看。
  • routes.Profiling{}.Install(),暴露了"/debug/pprof"性能分析页面。包含的path有:"/debug/pprof/","/debug/pprof/profile","/debug/pprof/profile","/debug/pprof/profile"。可以通过浏览器访问”http://ip:8080/debug/pprof/“查看。
  • routes.MetricsWithReset{}.Install()routes.DefaultMetrics{}.Install(),暴露了"/metrics"。可以通过浏览器访问”http://ip:8080/“查看。
  • 添加服务注册管理
  • 添加APF流量控制机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// apiserver/pkg/server/config.go
func installAPI(s *GenericAPIServer, c *Config) {
if c.EnableIndex {
routes.Index{}.Install(s.listedPathProvider, s.Handler.NonGoRestfulMux)
}
if c.EnableProfiling {
routes.Profiling{}.Install(s.Handler.NonGoRestfulMux)
if c.EnableContentionProfiling {
goruntime.SetBlockProfileRate(1)
}
routes.DebugFlags{}.Install(s.Handler.NonGoRestfulMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
}
if c.EnableMetrics {
if c.EnableProfiling {
routes.MetricsWithReset{}.Install(s.Handler.NonGoRestfulMux)
} else {
routes.DefaultMetrics{}.Install(s.Handler.NonGoRestfulMux)
}
}

routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer)

if c.EnableDiscovery {
s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())
}
if c.FlowControl != nil && feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) {
c.FlowControl.Install(s.Handler.NonGoRestfulMux)
}
}

InstallAPIGroups的主要处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// InstallAPIGroups的主要处理
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
......
openAPIModels, err := s.getOpenAPIModels(APIGroupPrefix, apiGroupInfos...)
for _, apiGroupInfo := range apiGroupInfos {
if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
return fmt.Errorf("unable to install api resources: %v", err)
}
......
s.DiscoveryGroupManager.AddGroup(apiGroup)
s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
}
return nil
}
installAPIResources
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//k8s.io/apiserver/pkg/server/genericapiserver.go
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
var resourceInfos []*storageversion.ResourceInfo
for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
......

apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)

......
// 注册restful api的处理方法
r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
......

resourceInfos = append(resourceInfos, r...)
}
}
getAPIGroupVersion

APIGroupInfo封装为APIGroupVersion,为其添加一个RESTful Storage。这个Storage主要用于存放该组资源的信息,从版本到资源再到存储的映射。

1
2
3
4
5
6
7
8
9
10
11
//apiserver/pkg/server/genericapiserver.go
func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion, apiPrefix string) *genericapi.APIGroupVersion {
storage := make(map[string]rest.Storage)
for k, v := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] {
storage[strings.ToLower(k)] = v
}
version := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
version.Root = apiPrefix
version.Storage = storage
return version
}
InstallREST

InstallRESTREST handlers(storage, watch, proxy and redirect)注册到RESTful Container中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// k8s.io/apiserver/pkg/endpoints/groupversion.go
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
// 为API resources注册handlers,同时创建一个restful.WebService,把handlers注册到WebService中。Install()调用registerResourceHandlers,为资源绑定action和path
apiResources, resourceInfos, ws, registrationErrors := installer.Install()
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
// 将webservice注册到container中
container.Add(ws)
return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}
APIInstaller.Install()绑定资源的path和handler

installer.Install()调用install.registerResourceHandlers(),根据rest.Storage判断资源可以执行的操作,然后将其放入actions中,再对actions遍历,使用restful的RouteFunction为action创建handler。然后将action和handler注册到route中,再将route注册到webservice中。InstallREST()通过container.Add(ws)会把这个ws注册到restful.Container中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
......
// 为资源添加action
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
......
// 为action创建handler
for _, action := range actions {
......
switch action.Verb {
case "GET":
var handler restful.RouteFunction
if isGetterWithOptions {
handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
} else {
handler = restfulGetResource(getter, exporter, reqScope)
}
if needOverride {
// need change the reported verb
handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
} else {
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
}
if enableWarningHeaders {
handler = utilwarning.AddWarningsHandler(handler, warnings)
}
doc := "read the specified " + kind
//
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Writes(producedObject)
case "LIST":
......
case "PUT":
......
......
default:
return nil, nil, fmt.Errorf("unrecognized action verb: %s", action.Verb)
}
// 将route注册到webservice中
for _, route := range routes {
route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
Group: reqScope.Kind.Group,
Version: reqScope.Kind.Version,
Kind: reqScope.Kind.Kind,
})
route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
ws.Route(route)
}
}
......
return &apiResource, resourceInfo, nil
}

创建KubeAPIServer

kube-apiserver的创建逻辑与创建apiextensions-apiserver类似,kube-apiserver主要是为了对"/api""/apis"下的资源进行注册暴露。

逻辑概述

  1. 通过GenericConfig.New()方法创建一个为kube-apiserver,方法逻辑与创建apiextensions-apiserver完全一致。
  2. 判断是否支持logs,支持的话,就添加logs的路由。
  3. 实例化apiserver对象Instance,将认证配置封装到apiserver对象中。
  4. 调用InstallLegacyAPI方法,注册apiserver的核心资源(core),主要是"/api/v1"下的每个resource。核心资源的均声明于k8s.io/client-go/kubernetes/typed/core/v1/下。
  5. []RESTStorageProvider中每一项StorageProvider在实例化时,都会创建一个带有groupNameapiGroupInfo,并为不同version创建Storage。将[]RESTStorageProvider传入InstallAPIs
  6. 调用InstallAPIs方法中,遍历RESTStorageProvider列表,为每种资源封装apiGroupInfo,然后调用InstallAPIGroups将服务进行注册。
  7. 最后为apiserver添加用于身份认证、检测的PostStartHook
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// k8s.io/apiserver/pkg/server/config.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
......
// 实例化kube-apiserver
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
......
// 注册logs路由
if c.ExtraConfig.EnableLogsSupport {
routes.Logs{}.Install(s.Handler.GoRestfulContainer)
}

// beta v1.20,开启SA服务发现,用于SA的token验证。
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceAccountIssuerDiscovery) {
md, err := serviceaccount.NewOpenIDMetadata(
c.ExtraConfig.ServiceAccountIssuerURL,
c.ExtraConfig.ServiceAccountJWKSURI,
c.GenericConfig.ExternalAddress,
c.ExtraConfig.ServiceAccountPublicKeys,
)
if err != nil {
......
} else {
// 注册apiserver提供JWKS的URL路径,该JWKS包含可用于签署Kubernetes服务帐户密钥的公钥。
routes.NewOpenIDMetadataServer(md.ConfigJSON, md.PublicKeysetJSON).
Install(s.Handler.GoRestfulContainer)
}
}
// apiserver实例包含genericAPIServer和认证配置(验证普通客户端的身份的ClientCA、用来确定用户名的头信息RequestHeaderUsernameHeaders、kube-apiserver用来确定组的头信息RequestHeaderGroupHeaders、用来确定user.extra的头文件RequestHeaderExtraHeaderPrefixes、允许充当前端代理的Subject主题RequestHeaderAllowedNames、用来验证前端代理的requesttheaderca)
m := &Instance{
GenericAPIServer: s,
ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
}
// LegacyAPI
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.ExtraConfig.StorageFactory,
ProxyTransport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
EventTTL: c.ExtraConfig.EventTTL,
ServiceIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryServiceIPRange: c.ExtraConfig.SecondaryServiceIPRange,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,
ExtendExpiration: c.ExtraConfig.ExtendExpiration,
ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
APIAudiences: c.GenericConfig.Authentication.APIAudiences,
}
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
return nil, err
}
}

// 每一种资源的RESTStorageProvider存放了其不同version和path.
restStorageProviders := []RESTStorageProvider{
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
discoveryrest.StorageProvider{},
extensionsrest.RESTStorageProvider{},
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{},
appsrest.StorageProvider{},
admissionregistrationrest.RESTStorageProvider{},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}
// 暴露api
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
if c.ExtraConfig.Tunneler != nil {
m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
}

m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
......
})
if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.APIServerIdentity) {
m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-controller", func(hookContext genericapiserver.PostStartHookContext) error {
......
}
m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-garbage-collector", func(hookContext genericapiserver.PostStartHookContext) error {
......
}
}
return m, nil
}

InstallLegacyAPI

  1. 通过NewLegacyRESTStorage,声明一个资源组对象apiGroupInfo。然后为核心组的资源创建存储restStorage,比如通过调用'podstore.NewStorage'为pod资源创建一个podStorage,然后将其放入restStorageMap。最后将apiGroupInfo资源版本设置为v1,并将restStorageMap,赋值给核心组的VersionedResourcesStorageMap``(VersionedResourcesStorageMap map[string]map[string]rest.Storage,对外显示为Version/Resource/RestStorage),例如可以通过"http://ip:8080/api/v1/pods"查看存储的pod对象。
  2. /api下的资源注册路由,并绑定处理方法,以便对外提供RESTful API。调用链为”InstallLegacyAPI->InstallLegacyAPIGroup->installAPIResources->InstallREST->APIInstaller.Install”,逻辑与APIExtensionsServer注册路由的逻辑一致。
  3. 将routes添加到services中,然后将service添加到container中,对外提供服务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// pkg/controlplane/instance.go
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
// 为LegacyAPI中的资源定义RESTStorage,包括对Pod的资源存储类型
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
if err != nil {
return fmt.Errorf("error building core storage: %v", err)
}

controllerName := "bootstrap-controller"
// 声明一个RESTClient
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
// 初始化bootstrap-controller,Controller是核心引导Kubernetes控制器循环的控制器管理器,它管理创建“Kubernetes”服务、“default”、“kube-system”和“kube-public”名称空间,并提供对服务IP的IP修复检查
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)

// Install '/api'下的路由
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering group versions: %v", err)
}
return nil
}
NewLegacyRESTStorage

为核心组定义apiGroupInfo,并对组内的资源及其子资源创建restStorage,然后放到restStorageMap中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.APIGroupInfo{
PrioritizedVersions: legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
Scheme: legacyscheme.Scheme,
ParameterCodec: legacyscheme.ParameterCodec,
NegotiatedSerializer: legacyscheme.Codecs,
}
......
podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
...
eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
...
limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
...
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage, err := namespacestore.NewREST(restOptionsGetter)
...
podStorage, err := podstore.NewStorage(restOptionsGetter,nodeStorage.KubeletConnectionInfo,c.ProxyTransport,podDisruptionClient)
......

restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
......
}
......
// 将核心组的version设置为v1
apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
return restStorage, apiGroupInfo, nil
}
InstallLegacyAPIGroup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//apiserver/pkg/server/genericapiserver.go
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
if !s.legacyAPIGroupPrefixes.Has(apiPrefix) {
return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List())
}
// 从apiGroupInfos中取出前缀为`api`的apiGroupInfo,通过utilopenapi.ToProtoModels,将其格式化为OpenAPI
openAPIModels, err := s.getOpenAPIModels(apiPrefix, apiGroupInfo)
if err != nil {
return fmt.Errorf("unable to get openapi models: %v", err)
}
// 将REST (存储、监视、代理和重定向)注册到restful Container中
if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
return err
}

s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())
return nil
}

InstallAPIs

对每个apiGroupsInfo,调用InstallAPIGroups,对/apis下的APIGroup进行暴露。通过installAPIResources为每个resource注册pathstoragehandler,并将其注册到routewebservice中。这部分处理流程与installAPIResources部分的流程一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// pkg/server/genericapiserver.go
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
//空Group或空Version返回错误处理
......
// 从apiGroupInfos中取出"/apis"前缀的APIGroupInfo,通过utilopenapi.ToProtoModels,将其格式化为OpenAPI
openAPIModels, err := s.getOpenAPIModels(APIGroupPrefix, apiGroupInfos...)
......
for _, apiGroupInfo := range apiGroupInfos {
// 调用installAPIResources
if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
return fmt.Errorf("unable to install api resources: %v", err)
}
// 设置discovery
apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
continue
}
apiVersionsForDiscovery = append(apiVersionsForDiscovery, metav1.GroupVersionForDiscovery{
GroupVersion: groupVersion.String(),
Version: groupVersion.Version,
})
}
preferredVersionForDiscovery := metav1.GroupVersionForDiscovery{
GroupVersion: apiGroupInfo.PrioritizedVersions[0].String(),
Version: apiGroupInfo.PrioritizedVersions[0].Version,
}
apiGroup := metav1.APIGroup{
Name: apiGroupInfo.PrioritizedVersions[0].Group,
Versions: apiVersionsForDiscovery,
PreferredVersion: preferredVersionForDiscovery,
}

s.DiscoveryGroupManager.AddGroup(apiGroup)
s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
}
return nil
}

创建AggregatorServer

逻辑概述

  1. 创建aggregatorServer实例。
  2. 创建apiregistrationClient实例,实质为RESTClient
  3. 创建crdRegistrationController,它通过AutoAPIServiceRegistration来自动注册APIServices。aggregatorServer在启动时,会Run起5个goroutine,不断的对workequeue进行消费。在调用checkService()的时候,会把服务器的资源与cache上的做对比,然后根据比较结果,通过apiRegistrationClientapiService进行操作。
  4. 通过apiServicesToRegister将apiService放入到队列中等待消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// cmd/kube-apiserver/app/aggregator.go
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
// 创建aggregatorServer
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
if err != nil {
return nil, err
}

// 根据LoopbackClientConfig配置创建apiRegistrationClient,
// 这是一个RESTClient,REST是最基础的客户端。其对HTTP Request进行了封装,实现了RESTful风格的API。
// 这个client用于与集群通信
apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
}

// 创建autoRegistrationController,用于将CRD对应的APIService自动注册到apiserver中
autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
// 将每个apiService放入autoRegistrationController的sync队列中,在controller启动时,对apiServices进行添加
apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)

// 创建crdRegistrationController,crdRegistrationController用autoRegistrationController注册CRD GroupVersions,这样它们就会自动保持同步
crdRegistrationController := crdregistration.NewCRDRegistrationController(
apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
autoRegistrationController)

err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
go crdRegistrationController.Run(5, context.StopCh)
go func() {
if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
crdRegistrationController.WaitForInitialSync()
}
autoRegistrationController.Run(5, context.StopCh)
}()
return nil
})
if err != nil {
return nil, err
}

err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks(
makeAPIServiceAvailableHealthCheck(
"autoregister-completion",
apiServices,
aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),
),
)
if err != nil {
return nil, err
}

return aggregatorServer, nil
}
实例化aggregatorServer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
//kube-aggregator/pkg/apiserver/apiserver.go
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
// 使用aggregatorServer自己的OpenAPI handler
// 将默认的genericServer配置OpenAPI handler置为空
openAPIConfig := c.GenericConfig.OpenAPIConfig
c.GenericConfig.OpenAPIConfig = nil
// 创建kube-aggregator,此处逻辑与kube-apiserver和apiextensions-apiserver相同
genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
if err != nil {
return nil, err
}
// 创建一个apiregistrationClient,实质为一组RestClient
// 用于与group为'apiregistration.k8s'的资源进行交互。
apiregistrationClient, err := clientset.NewForConfig(c.GenericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
}

// informerFactory用于创建Informer
informerFactory := informers.NewSharedInformerFactory(
apiregistrationClient,
5*time.Minute,
)

s := &APIAggregator{
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
proxyTransport: c.ExtraConfig.ProxyTransport,
proxyHandlers: map[string]*proxyHandler{},
handledGroups: sets.String{},
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
APIRegistrationInformers: informerFactory,
serviceResolver: c.ExtraConfig.ServiceResolver,
openAPIConfig: openAPIConfig,
egressSelector: c.GenericConfig.EgressSelector,
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
}

// 定义一个apiGroupInfo,其groupName为"apiregistration.k8s.io",封装资源信息及访问存储的RESTStorage
apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
// 调用InstallAPIGroups,对APIService进行暴露
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}

enabledVersions := sets.NewString()
for v := range apiGroupInfo.VersionedResourcesStorageMap {
enabledVersions.Insert(v)
}
if !enabledVersions.Has(v1.SchemeGroupVersion.Version) {
return nil, fmt.Errorf("API group/version %s must be enabled", v1.SchemeGroupVersion.String())
}

apisHandler := &apisHandler{
codecs: aggregatorscheme.Codecs,
lister: s.lister,
discoveryGroup: discoveryGroup(enabledVersions),
}
// 绑定过滤器
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)

//实例化APIServiceRegistrationController,用来添加删除APIService
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s)
if len(c.ExtraConfig.ProxyClientCertFile) > 0 && len(c.ExtraConfig.ProxyClientKeyFile) > 0 {
aggregatorProxyCerts, err := dynamiccertificates.NewDynamicServingContentFromFiles("aggregator-proxy-cert", c.ExtraConfig.ProxyClientCertFile, c.ExtraConfig.ProxyClientKeyFile)
if err != nil {
return nil, err
}
if err := aggregatorProxyCerts.RunOnce(); err != nil {
return nil, err
}
aggregatorProxyCerts.AddListener(apiserviceRegistrationController)
s.proxyCurrentCertKeyContent = aggregatorProxyCerts.CurrentCertKeyContent

s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(context genericapiserver.PostStartHookContext) error {
go aggregatorProxyCerts.Run(1, context.StopCh)
return nil
})
}
// 创建AvailableConditionController,处理检查已注册API服务的可用性。
availableController, err := statuscontrollers.NewAvailableConditionController(
informerFactory.Apiregistration().V1().APIServices(),
c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
apiregistrationClient.ApiregistrationV1(),
c.ExtraConfig.ProxyTransport,
(func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),
s.serviceResolver,
c.GenericConfig.EgressSelector,
)
if err != nil {
return nil, err
}
// 将informer、resgistrationController、和用于可用性检测的apiServiceStatusAvailableController添加到PostStartHook中
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(context.StopCh)
c.GenericConfig.SharedInformerFactory.Start(context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
handlerSyncedCh := make(chan struct{})
go apiserviceRegistrationController.Run(context.StopCh, handlerSyncedCh)
select {
case <-context.StopCh:
case <-handlerSyncedCh:
}

return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
// if we end up blocking for long periods of time, we may need to increase threadiness.
go availableController.Run(5, context.StopCh)
return nil
})

if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
// 在AggregatorServer中开启一个goroutine用来更新资源的storage version
s.GenericAPIServer.AddPostStartHookOrDie("built-in-resources-storage-version-updater", func(hookContext genericapiserver.PostStartHookContext) error {
kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
return err
}
if err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
_, err := kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem).Get(
context.TODO(), s.GenericAPIServer.APIServerID, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}, hookContext.StopCh); err != nil {
return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v",
s.GenericAPIServer.APIServerID, err)
}

// 正常情况,一个apiserver只在启动时更新一次storage version
// storage version可能会被不同的代理修改或删除,每10分钟进行一次一致性的update操作
// 如果没发生改变,这个一致性的update操作不会进行循环,并且被apiserver阻塞住
go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) {
/** 所有apiserver (aggregator-apiserver, kube-apiserver
* apiextensions-apiserver)共享相同的generic apiserver配置。
* 当generic apiserver安装api时,使用相同的StorageVersion管理器注册所有内置资源。
**/
s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(
hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID)
return false, nil
}, hookContext.StopCh)
// StorageVersionManager完成第一轮更新时,PostStartHook会返回unblock/healthz。
// 处理程序不再阻塞write请求
wait.PollImmediateUntil(1*time.Second, func() (bool, error) {
return s.GenericAPIServer.StorageVersionManager.Completed(), nil
}, hookContext.StopCh)
return nil
})
}
return s, nil
}
kube-aggregator的clientset

Clientset根据为一个group中资源的不同Version,为其设置不同的restClient

1
2
3
4
5
6
//k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/clientset.go
type Clientset struct {
*discovery.DiscoveryClient
apiregistrationV1beta1 *apiregistrationv1beta1.ApiregistrationV1beta1Client
apiregistrationV1 *apiregistrationv1.ApiregistrationV1Client
}
创建autoRegistrationController
  • apiServiceInformer提供对APIServices共享的informer和list的访问。
  • apiServiceClient是一个ClientSet,ClientSet封装了对Resource和Version的管理方法。每一个Resource和Version都以函数的方式暴露给开发者。
    autoRegisterController负责将CRD(CustomResourceDefinition)对应的APIServices自动注册到apiserver上。其内部有一个queue,在启动时的时候,goroutine不断的对queue进行消费。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// kube-aggregator/pkg/controllers/autoregister/autoregister_controller.go
func NewAutoRegisterController(apiServiceInformer informers.APIServiceInformer, apiServiceClient apiregistrationclient.APIServicesGetter) *autoRegisterController {
// 实例化autoRegisterController
c := &autoRegisterController{
apiServiceLister: apiServiceInformer.Lister(),
apiServiceSynced: apiServiceInformer.Informer().HasSynced,
apiServiceClient: apiServiceClient,
apiServicesToSync: map[string]*v1.APIService{},

apiServicesAtStart: map[string]bool{},

syncedSuccessfullyLock: &sync.RWMutex{},
syncedSuccessfully: map[string]bool{},
// delaying_queue
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "autoregister"),
}
// 对比服务器etcd和cache中的资源
c.syncHandler = c.checkAPIService

apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cast := obj.(*v1.APIService)
c.queue.Add(cast.Name)
},
UpdateFunc: func(_, obj interface{}) {
cast := obj.(*v1.APIService)
c.queue.Add(cast.Name)
},
DeleteFunc: func(obj interface{}) {
cast, ok := obj.(*v1.APIService)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
return
}
cast, ok = tombstone.Obj.(*v1.APIService)
if !ok {
klog.V(2).Infof("Tombstone contained unexpected object: %#v", obj)
return
}
}
c.queue.Add(cast.Name)
},
})
return c
}
syncHandler(checkAPIService)

checkAPIService根据所需APIService对象列表同步当前APIService。

A. desired: not found B. desired: sync on start C. desired: sync always
1. current: lookup error error error error
2. current: not found - create once create
3. current: no sync - - -
4. current: sync on start, not present at start - - -
5. current: sync on start, present at start delete once update once update once
6. current: sync always delete update once update
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//kube-aggregator/pkg/controllers/autoregister/autoregister_controller.go
func (c *autoRegisterController) checkAPIService(name string) (err error) {
// 从cacheStorage中获取apiService
desired := c.GetAPIServiceToSync(name)
curr, err := c.apiServiceLister.Get(name)
// if we've never synced this service successfully, record a successful sync.
hasSynced := c.hasSyncedSuccessfully(name)
if !hasSynced {
defer func() {
if err == nil {
c.setSyncedSuccessfully(name)
}
}()
}

switch {
// we had a real error, just return it (1A,1B,1C)
case err != nil && !apierrors.IsNotFound(err):
return err

// we don't have an entry and we don't want one (2A)
case apierrors.IsNotFound(err) && desired == nil:
return nil

// the local object only wants to sync on start and has already synced (2B,5B,6B "once" enforcement)
case isAutomanagedOnStart(desired) && hasSynced:
return nil

// we don't have an entry and we do want one (2B,2C)
case apierrors.IsNotFound(err) && desired != nil:
_, err := c.apiServiceClient.APIServices().Create(context.TODO(), desired, metav1.CreateOptions{})
if apierrors.IsAlreadyExists(err) {
// created in the meantime, we'll get called again
return nil
}
return err

// we aren't trying to manage this APIService (3A,3B,3C)
case !isAutomanaged(curr):
return nil

// the remote object only wants to sync on start, but was added after we started (4A,4B,4C)
case isAutomanagedOnStart(curr) && !c.apiServicesAtStart[name]:
return nil

// the remote object only wants to sync on start and has already synced (5A,5B,5C "once" enforcement)
case isAutomanagedOnStart(curr) && hasSynced:
return nil

// we have a spurious APIService that we're managing, delete it (5A,6A)
case desired == nil:
opts := metav1.DeleteOptions{Preconditions: metav1.NewUIDPreconditions(string(curr.UID))}
err := c.apiServiceClient.APIServices().Delete(context.TODO(), curr.Name, opts)
if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
// deleted or changed in the meantime, we'll get called again
return nil
}
return err

// if the specs already match, nothing for us to do
case reflect.DeepEqual(curr.Spec, desired.Spec):
return nil
}

// we have an entry and we have a desired, now we deconflict. Only a few fields matter. (5B,5C,6B,6C)
apiService := curr.DeepCopy()
apiService.Spec = desired.Spec
_, err = c.apiServiceClient.APIServices().Update(context.TODO(), apiService, metav1.UpdateOptions{})
if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
// deleted or changed in the meantime, we'll get called again
return nil
}
return err
}
创建CRDRegistrationController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//pkg/controlplane/controller/crdregistration/crdregistration_controller.go
func NewCRDRegistrationController(crdinformer crdinformers.CustomResourceDefinitionInformer, apiServiceRegistration AutoAPIServiceRegistration) *crdRegistrationController {
c := &crdRegistrationController{
crdLister: crdinformer.Lister(),
crdSynced: crdinformer.Informer().HasSynced,
apiServiceRegistration: apiServiceRegistration,
syncedInitialSet: make(chan struct{}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_autoregistration_controller"),
}
c.syncHandler = c.handleVersionUpdate
crdinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cast := obj.(*apiextensionsv1.CustomResourceDefinition)
c.enqueueCRD(cast)
},
UpdateFunc: func(oldObj, newObj interface{}) {
// Enqueue both old and new object to make sure we remove and add appropriate API services.
// The working queue will resolve any duplicates and only changes will stay in the queue.
c.enqueueCRD(oldObj.(*apiextensionsv1.CustomResourceDefinition))
c.enqueueCRD(newObj.(*apiextensionsv1.CustomResourceDefinition))
},
DeleteFunc: func(obj interface{}) {
cast, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
return
}
cast, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
if !ok {
klog.V(2).Infof("Tombstone contained unexpected object: %#v", obj)
return
}
}
c.enqueueCRD(cast)
},
})

return c
}

准备启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
// add post start hook before generic PrepareRun in order to be before /healthz installation
if s.openAPIConfig != nil {
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
go s.openAPIAggregationController.Run(context.StopCh)
return nil
})
}

prepared := s.GenericAPIServer.PrepareRun()

// delay OpenAPI setup until the delegate had a chance to setup their OpenAPI handlers
if s.openAPIConfig != nil {
specDownloader := openapiaggregator.NewDownloader()
openAPIAggregator, err := openapiaggregator.BuildAndRegisterAggregator(
&specDownloader,
s.GenericAPIServer.NextDelegate(),
s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(),
s.openAPIConfig,
s.GenericAPIServer.Handler.NonGoRestfulMux)
if err != nil {
return preparedAPIAggregator{}, err
}
s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator)
}
// 返回一个server和一个channel
return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//k8s.io/apiserver/pkg/server/genericapiserver.go
func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
s.delegationTarget.PrepareRun()
// 创建Swagger服务
if s.openAPIConfig != nil {
s.OpenAPIVersionedService, s.StaticOpenAPISpec = routes.OpenAPI{
Config: s.openAPIConfig,
}.Install(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
}
// 调用InstallPathHandler注册"/healthz"和"/livez"健康检查,并绑定相应的handler
s.installHealthz()
s.installLivez()

// 添加"/readyz"健康检查
err := s.addReadyzShutdownCheck(s.readinessStopCh)
if err != nil {
klog.Errorf("Failed to install readyz shutdown check %s", err)
}
s.installReadyz()

// Register audit backend preShutdownHook.
if s.AuditBackend != nil {
err := s.AddPreShutdownHook("audit-backend", func() error {
s.AuditBackend.Shutdown()
return nil
})
if err != nil {
klog.Errorf("Failed to add pre-shutdown hook for audit-backend %s", err)
}
}

return preparedGenericAPIServer{s}
}

启动

cobra执行启动server最后调用server.Run,最后会去执行genericServer.Run。这个server只在stopCh接收到信号时,才会退出。Run方法中调用了NonBlockingRun,创建一个https的server,对配置的addr(–insecure-bind-address和–insecure-port)绑定一个listener进行监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//k8s.io/apiserver/pkg/server/genericapiserver.go
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
delayedStopCh := make(chan struct{})

go func() {
defer close(delayedStopCh)

<-stopCh

// As soon as shutdown is initiated, /readyz should start returning failure.
// This gives the load balancer a window defined by ShutdownDelayDuration to detect that /readyz is red
// and stop sending traffic to this server.
close(s.readinessStopCh)

time.Sleep(s.ShutdownDelayDuration)
}()

// close socket after delayed stopCh
stoppedCh, err := s.NonBlockingRun(delayedStopCh)
if err != nil {
return err
}

<-stopCh

// run shutdown hooks directly. This includes deregistering from the kubernetes endpoint in case of kube-apiserver.
err = s.RunPreShutdownHooks()
if err != nil {
return err
}

// wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called).
<-delayedStopCh
// wait for stoppedCh that is closed when the graceful termination (server.Shutdown) is finished.
<-stoppedCh

// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
s.HandlerChainWaitGroup.Wait()

return nil
}

s.NonBlockingRun

NonBlockingRun主要执行步骤:

  1. 创建并启动审计日志服务
  2. 创建并启动https server
  3. 遍历PostStartHooks,并为每个PostStartHook开一个goroutine执行PostStartHookFunc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// k8s.io/apiserver/pkg/server/genericapiserver.go
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, error) {
// 创建审计的stop channel,用来在httpserver关闭后,不丢弃审计事件
auditStopCh := make(chan struct{})

// 启动审计的channel
if s.AuditBackend != nil {
if err := s.AuditBackend.Run(auditStopCh); err != nil {
return nil, fmt.Errorf("failed to run the audit backend: %v", err)
}
}

// 创建httpsServer
internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
if s.SecureServingInfo != nil && s.Handler != nil {
var err error
stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
if err != nil {
close(internalStopCh)
close(auditStopCh)
return nil, err
}
}

// Now that listener have bound successfully, it is the
// responsibility of the caller to close the provided channel to
// ensure cleanup.
go func() {
<-stopCh
close(internalStopCh)
if stoppedCh != nil {
<-stoppedCh
}
s.HandlerChainWaitGroup.Wait()
close(auditStopCh)
}()
// 执行postStartHooks
s.RunPostStartHooks(stopCh)

// 发送ready信号
if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
}

return stoppedCh, nil
}