[Kubernetes (k8s) Source Code Analysis] The kube-controller-manager startup flow


Preface

    

    Kubernetes is an open-source descendant of Google's Borg: a container-based cluster management system that automates resource management and maximizes resource utilization across multiple data centers. As a platform for distributed systems, it provides transparent service registration and discovery, a built-in intelligent load balancer, strong failure detection and self-healing, rolling upgrades of services, and online scaling.

    It also ships with management tooling covering development, deployment and testing, and operations monitoring.

 

    The Service is the core of the distributed cluster architecture. A Service has:

- a unique assigned name
- a virtual IP (the cluster IP, service IP, or VIP) and a port number
- the ability to serve remote clients
- a mapping onto the group of containers that actually provide the service

 

    To associate a Service with its Pods, Kubernetes attaches labels to each Pod. Each Pod runs a Pause container, and the business containers share the Pause container's network stack and mounted volumes.

 

    The Master node runs the cluster-management processes: kube-apiserver, kube-controller-manager, and kube-scheduler.

    The smallest unit a Node manages is the Pod; the Node is responsible for creating, starting, monitoring, restarting, and destroying Pods, and for implementing a software-mode load balancer.

 

   Each Pod is assigned its own IP address, but that address disappears when the Pod is destroyed. This raises a question: if a group of Pods forms a cluster to provide a service, how do clients reach it?

    A Service can be viewed as the external access point for a group of Pods providing the same service; which Pods a Service targets is defined by its label selector.
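
In Go terms, the selector is just a label map on the ServiceSpec. A minimal sketch using the k8s.io/api types; the my-service name and the app: my-app label are illustrative:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// A Service that fronts every Pod carrying the label app=my-app.
	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "my-service"},
		Spec: corev1.ServiceSpec{
			Selector: map[string]string{"app": "my-app"}, // the label selector
			Ports: []corev1.ServicePort{{
				Port:       80,                   // the Service (VIP) port
				TargetPort: intstr.FromInt(8080), // the container port behind it
			}},
		},
	}
	fmt.Printf("%s selects pods with labels %v\n", svc.Name, svc.Spec.Selector)
}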

 

   Controller Manager: the cluster's internal management and control center, whose main purpose is to automate failure detection and recovery.

 

1. Entry point: the main function

 

    NewCMServer initializes the CMServer struct with default values; the main body of execution is app.Run, covered in 1.1 below.

func main() {
	s := options.NewCMServer()
	s.AddFlags(pflag.CommandLine, app.KnownControllers(), app.ControllersDisabledByDefault.List())

	flag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	verflag.PrintAndExitIfRequested()

	if err := app.Run(s); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

 

 

1.1 The Run function

    The two most important calls here are StartControllers and leaderelection.RunOrDie.

// Run runs the CMServer. This should never exit.
func Run(s *options.CMServer) error {
	......
	run := func(stop <-chan struct{}) {
		......
		if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers()); err != nil {
			glog.Fatalf("error starting controllers: %v", err)
		}
		ctx.InformerFactory.Start(ctx.Stop)
		select {}
	}

	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   s.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				glog.Fatalf("leaderelection lost")
			},
		},
	})
	panic("unreachable")
}
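
RunOrDie never returns while the process holds (or is contending for) leadership: the run closure is only invoked once this instance wins the election, and losing the lease is fatal. Conceptually the loop has the shape of the following plain-Go sketch; tryAcquireOrRenew and retryPeriod are illustrative stand-ins for the real lock logic in client-go, not its actual API:

package main

import (
	"fmt"
	"time"
)

const retryPeriod = 2 * time.Second

// tryAcquireOrRenew stands in for the real logic, which atomically
// updates a lock record via the apiserver.
func tryAcquireOrRenew() bool { return true }

// runOrDie sketches the shape of leaderelection.RunOrDie: contend for the
// lock, fire the started callback once, keep renewing, and report loss.
func runOrDie(onStartedLeading func(stop <-chan struct{}), onStoppedLeading func()) {
	stop := make(chan struct{})
	for !tryAcquireOrRenew() {
		time.Sleep(retryPeriod) // someone else is leading; keep trying
	}
	go onStartedLeading(stop) // we are the leader: start the controllers
	for tryAcquireOrRenew() {
		time.Sleep(retryPeriod) // renew the lease within RenewDeadline
	}
	close(stop)
	onStoppedLeading() // kube-controller-manager responds with glog.Fatalf
}

func main() {
	runOrDie(
		func(stop <-chan struct{}) { fmt.Println("leading"); <-stop },
		func() { fmt.Println("leadership lost") },
	)
}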

 

1.1.1 The NewControllerInitializers function

   Initializes the full set of sub-controller init functions. Some of the notable ones:

- replicationcontroller: syncs ReplicationController objects from the apiserver and manages their pods
- nodecontroller: fetches node information from the cloud provider, then watches for node updates
- servicecontroller: syncs service information from the cloud provider and manages the services across all nodes
- resourcequota: calls syncResourceQuota to sync resource quota usage; ResourceQuota partitions resources by namespace
- namespace: periodically syncs namespace information from the apiserver
- persistentvolume-binder: periodically syncs volume binding information
- serviceaccount: manages the accounts within each namespace

// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc. This allows for structured downstream composition and subdivision.
func NewControllerInitializers() map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefulset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["csrsigning"] = startCSRSigningController
	controllers["csrapproving"] = startCSRApprovingController
	controllers["ttl"] = startTTLController
	controllers["bootstrapsigner"] = startBootstrapSignerController
	controllers["tokencleaner"] = startTokenCleanerController
	controllers["service"] = startServiceController
	controllers["node"] = startNodeController
	controllers["route"] = startRouteController
	controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
	controllers["attachdetach"] = startAttachDetachController

	return controllers
}
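
Every entry in the map shares the InitFunc signature: the returned bool reports whether the controller actually started (false when, for example, its API group is not served), and a non-nil error aborts startup. A self-contained toy version of the registry pattern; ControllerContext's fields are elided here:

package main

import "fmt"

// ControllerContext and InitFunc mirror the shape of the real types:
// each named controller is an init function returning (started, error).
type ControllerContext struct{ /* clients, informers, options ... */ }

type InitFunc func(ctx ControllerContext) (bool, error)

func main() {
	controllers := map[string]InitFunc{
		"demo": func(ctx ControllerContext) (bool, error) { return true, nil },
	}
	for name, initFn := range controllers {
		started, err := initFn(ControllerContext{})
		fmt.Println(name, started, err)
	}
}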

 

1.1.2 The StartControllers function

   Starts each of the controllers initialized in 1.1.1.

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
	// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
	// If this fails, just return here and fail since other controllers won't be able to get credentials.
	if _, err := startSATokenController(ctx); err != nil {
		return err
	}

	for controllerName, initFn := range controllers {
		if !ctx.IsControllerEnabled(controllerName) {
			glog.Warningf("%q is disabled", controllerName)
			continue
		}

		time.Sleep(wait.Jitter(ctx.Options.ControllerStartInterval.Duration, ControllerStartJitter))

		glog.V(1).Infof("Starting %q", controllerName)
		started, err := initFn(ctx)
		if err != nil {
			glog.Errorf("Error starting %q", controllerName)
			return err
		}
		if !started {
			glog.Warningf("Skipping %q", controllerName)
			continue
		}
		glog.Infof("Started %q", controllerName)
	}

	return nil
}
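
Note the jittered sleep between starts: it staggers the controllers' initial LIST/WATCH requests so they don't all hit the apiserver at once. A plain-Go sketch mirroring the documented behavior of wait.Jitter from k8s.io/apimachinery/pkg/util/wait:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitter mirrors what wait.Jitter computes: a duration between d and
// d*(1+maxFactor), so parallel callers spread out instead of converging.
func jitter(d time.Duration, maxFactor float64) time.Duration {
	if maxFactor <= 0.0 {
		maxFactor = 1.0 // suggested default, as in the real helper
	}
	return d + time.Duration(rand.Float64()*maxFactor*float64(d))
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(jitter(100*time.Millisecond, 1.0)) // e.g. 100ms..200ms
	}
}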

 

2. Deployment controller analysis

 

    The init function registered under the deployment key is startDeploymentController:

controllers["deployment"] = startDeploymentController

The DeploymentController struct:

type DeploymentController struct {
	// rsControl is used for adopting/releasing replica sets.
	rsControl     controller.RSControlInterface
	client        clientset.Interface
	eventRecorder record.EventRecorder

	// To allow injection of syncDeployment for testing.
	syncHandler func(dKey string) error
	// used for unit testing
	enqueueDeployment func(deployment *extensions.Deployment)

	// dLister can list/get deployments from the shared informer's store
	dLister extensionslisters.DeploymentLister
	// rsLister can list/get replica sets from the shared informer's store
	rsLister extensionslisters.ReplicaSetLister
	// podLister can list/get pods from the shared informer's store
	podLister corelisters.PodLister

	// dListerSynced returns true if the Deployment store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	dListerSynced cache.InformerSynced
	// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsListerSynced cache.InformerSynced
	// podListerSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podListerSynced cache.InformerSynced

	// Deployments that need to be synced
	queue workqueue.RateLimitingInterface
}
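
The three listers mean that reads in the sync path are served from the shared informer caches rather than by querying the apiserver. A hedged sketch of such cache-backed reads; the default namespace and nginx name are illustrative:

import (
	"k8s.io/apimachinery/pkg/labels"
	extensionslisters "k8s.io/client-go/listers/extensions/v1beta1"
)

// cachedReads shows the lister read path: both calls below are served from
// the shared informer caches, not from the apiserver.
func cachedReads(dLister extensionslisters.DeploymentLister, rsLister extensionslisters.ReplicaSetLister) error {
	// Get one Deployment from the cache.
	d, err := dLister.Deployments("default").Get("nginx")
	if err != nil {
		return err
	}
	// List all ReplicaSets in the same namespace from the cache.
	rss, err := rsLister.ReplicaSets(d.Namespace).List(labels.Everything())
	if err != nil {
		return err
	}
	_ = rss
	return nil
}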

 

3. The startDeploymentController function

 

Creates the controller object via NewDeploymentController, as follows:

func startDeploymentController(ctx ControllerContext) (bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"}] {
		return false, nil
	}
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Extensions().V1beta1().Deployments(),
		ctx.InformerFactory.Extensions().V1beta1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	if err != nil {
		return true, fmt.Errorf("error creating Deployment controller: %v", err)
	}
	go dc.Run(int(ctx.Options.ConcurrentDeploymentSyncs), ctx.Stop)
	return true, nil
}

3.1 The NewDeploymentController function

 

- Create the eventBroadcaster and configure its logging and recording sinks
- Register the client's RateLimiter for metrics tracking
- Create the DeploymentController object
- Register the event callbacks for dInformer, rsInformer, and podInformer
- Set dc.syncHandler to the dc.syncDeployment function

func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer, rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	// TODO: remove the wrapper when every clients have moved to use the clientset.
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.CoreV1().RESTClient()).Events("")})

	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
		if err := metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
			return nil, err
		}
	}
	dc := &DeploymentController{
		client:        client,
		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	dc.rsControl = controller.RealRSControl{
		KubeClient: client,
		Recorder:   dc.eventRecorder,
	}

	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addDeployment,
		UpdateFunc: dc.updateDeployment,
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: dc.deleteDeployment,
	})
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addReplicaSet,
		UpdateFunc: dc.updateReplicaSet,
		DeleteFunc: dc.deleteReplicaSet,
	})
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: dc.deletePod,
	})

	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

	dc.dLister = dInformer.Lister()
	dc.rsLister = rsInformer.Lister()
	dc.podLister = podInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced
	dc.rsListerSynced = rsInformer.Informer().HasSynced
	dc.podListerSynced = podInformer.Informer().HasSynced
	return dc, nil
}
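
All of those event callbacks eventually funnel into dc.enqueueDeployment, which reduces the object to a namespace/name string and adds it to the work queue. A hedged sketch of that step using the standard cache.MetaNamespaceKeyFunc helper; the real controller uses an equivalent deletion-aware key function:

// enqueue reduces a Deployment to its "namespace/name" key and queues it;
// the worker later reverses this with cache.SplitMetaNamespaceKey.
func (dc *DeploymentController) enqueue(deployment *extensions.Deployment) {
	key, err := cache.MetaNamespaceKeyFunc(deployment)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for deployment %#v: %v", deployment, err))
		return
	}
	dc.queue.Add(key)
}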

 

3.2 The Run function

 

- WaitForCacheSync waits for the informer caches to finish their initial sync
- Then the worker goroutines are started

 

// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dc.queue.ShutDown()

	glog.Infof("Starting deployment controller")
	defer glog.Infof("Shutting down deployment controller")

	if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

	<-stopCh
}
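
wait.Until re-runs the worker every second until the stop channel closes, so a worker that returns (for example because the queue shut down) is restarted as long as the controller is alive. A simplified plain-Go sketch of that contract, without the real helper's jitter and timer-reuse details:

import "time"

// until mirrors the documented contract of wait.Until: run f, then wait
// period, repeating until stopCh is closed.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		default:
		}
		f()
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}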

3.3 The worker function

 

Pulls a key from the queue and hands it to syncHandler (i.e. syncDeployment) for processing.

func (dc *DeploymentController) worker() {
	for dc.processNextWorkItem() {
	}
}

func (dc *DeploymentController) processNextWorkItem() bool {
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	err := dc.syncHandler(key.(string))
	dc.handleErr(err, key)

	return true
}
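
handleErr, not shown above, is where the rate-limited queue earns its keep: keys whose sync failed are requeued with exponential backoff, and keys that succeeded have their backoff history cleared. A hedged sketch of the pattern; maxRetries is an illustrative constant, not necessarily the value used upstream:

const maxRetries = 15 // illustrative; the real controller uses a similar bound

func (dc *DeploymentController) handleErr(err error, key interface{}) {
	if err == nil {
		dc.queue.Forget(key) // success: clear the key's backoff history
		return
	}
	if dc.queue.NumRequeues(key) < maxRetries {
		dc.queue.AddRateLimited(key) // retry with exponential backoff
		return
	}
	utilruntime.HandleError(err)
	dc.queue.Forget(key) // give up and drop the key
}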

3.4 The syncDeployment function

 

- SplitMetaNamespaceKey splits the key on / to recover the namespace and name
- Fetch the Deployment object (in k8s 1.9 its type is *v1beta1.Deployment)
- getReplicaSetsForDeployment returns the list of all ReplicaSet objects owned by the Deployment
- getPodMapForDeployment returns the Pods for the Deployment and its ReplicaSets
- checkPausedConditions checks for the paused state
- If a rollback is requested, perform the rollback
- isScalingEvent decides whether the replica count needs adjusting
- Otherwise dispatch on the update strategy: rolloutRecreate or rolloutRolling

 

func (dc *DeploymentController) syncDeployment(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	......
	d := deployment.DeepCopy()

	rsList, err := dc.getReplicaSetsForDeployment(d)
	podMap, err := dc.getPodMapForDeployment(d, rsList)

	if err = dc.checkPausedConditions(d); err != nil {
		return err
	}
	if d.Spec.Paused {
		return dc.sync(d, rsList, podMap)
	}

	if d.Spec.RollbackTo != nil {
		return dc.rollback(d, rsList, podMap)
	}

	scalingEvent, err := dc.isScalingEvent(d, rsList, podMap)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(d, rsList, podMap)
	}

	switch d.Spec.Strategy.Type {
	case extensions.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(d, rsList, podMap)
	case extensions.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(d, rsList, podMap)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
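
cache.SplitMetaNamespaceKey is simply the inverse of the key function used when enqueuing. A plain-Go sketch of what it does:

import (
	"fmt"
	"strings"
)

// splitKey mirrors cache.SplitMetaNamespaceKey:
// "kube-system/coredns" -> ("kube-system", "coredns").
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil // cluster-scoped object: name only
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}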

 

 

 

3.5 The rolloutRecreate function

 

- getAllReplicaSetsAndSyncRevision fetches the old and new ReplicaSets; on this first call the new RS is not created if it doesn't already exist
- Filter down to the old ReplicaSets whose replica count is not 0 and scale them to 0
- scaleUpNewReplicaSetForRecreate scales the new RS up to the specified replica count

 

func (dc *DeploymentController) rolloutRecreate(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
	// Don't create a new RS if not already existed, so that we avoid scaling up before scaling down.
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)
	activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

	// scale down old replica sets.
	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
	if err != nil {
		return err
	}
	if scaledDown {
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// Do not process a deployment when it has old pods running.
	if oldPodsRunning(newRS, oldRSs, podMap) {
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// If we need to create a new RS, create it now.
	if newRS == nil {
		newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true)
		if err != nil {
			return err
		}
		allRSs = append(oldRSs, newRS)
	}

	// scale up new replica set.
	if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
		return err
	}

	if util.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status.
	return dc.syncRolloutStatus(allRSs, newRS, d)
}
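
For context, a Deployment lands in this code path when its strategy type is Recreate. A minimal hedged sketch using the extensions/v1beta1 types of that era; the field values are illustrative:

import (
	extensions "k8s.io/api/extensions/v1beta1"
)

// recreateSpec builds a DeploymentSpec that syncDeployment would route to
// rolloutRecreate: kill all old pods first, then bring up the new ones.
func recreateSpec(replicas int32) extensions.DeploymentSpec {
	return extensions.DeploymentSpec{
		Replicas: &replicas,
		Strategy: extensions.DeploymentStrategy{
			Type: extensions.RecreateDeploymentStrategyType,
		},
	}
}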


