Kubernetes also provides management tooling covering development, deployment and testing, and operations monitoring.
A Service is the core of the distributed cluster architecture. A Service has:

- a uniquely assigned name;
- a virtual IP (clusterIP, serviceIP, or VIP) and a port number;
- the ability to provide a remote service;
- a mapping onto the group of container applications that provide that capability.
How are Services and Pods associated? Kubernetes attaches a label to each Pod. Every Pod also runs a Pause container, and the business containers in the Pod share the Pause container's network stack and mounted volumes.
The Master node runs the cluster-management processes: kube-apiserver, kube-controller-manager, and kube-scheduler.
The smallest unit a Node manages is the Pod. The Node is responsible for creating, starting, monitoring, restarting, and destroying Pods, and also implements a software-mode load balancer.
Every Pod is assigned its own IP address, but that address disappears when the Pod is destroyed. So the question arises: if a group of Pods forms a cluster to provide a service, how do clients access it?
A Service can be viewed as the external access interface of a group of Pods providing the same service; which Pods a Service covers is defined by a label selector.
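As a concrete illustration (a minimal sketch using the k8s.io API types, not taken from this article's source excerpts; names and labels are made up): the Selector field below is what binds the Service to every Pod carrying the label app=nginx, no matter how often those Pods are recreated and change IPs.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "nginx-service"}, // illustrative name
		Spec: v1.ServiceSpec{
			// Every Pod labeled app=nginx backs this Service.
			Selector: map[string]string{"app": "nginx"},
			Ports: []v1.ServicePort{{
				Port:       80,                  // the Service (VIP) port
				TargetPort: intstr.FromInt(80), // the container port
			}},
		},
	}
	fmt.Println(svc.Spec.Selector)
}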
Controller Manager: the management and control center inside the cluster; its main purpose is to automate failure detection and recovery across the cluster.
NewCMServer initializes the CMServer struct with default values; the actual execution body is app.Run, covered in section 1.1 below.
func main() {
	s := options.NewCMServer()
	s.AddFlags(pflag.CommandLine, app.KnownControllers(), app.ControllersDisabledByDefault.List())

	flag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	verflag.PrintAndExitIfRequested()

	if err := app.Run(s); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}
1.1 The Run function
The two most important calls are StartControllers and leaderelection.RunOrDie.
// Run runs the CMServer. This should never exit.
func Run(s *options.CMServer) error {
	......
	run := func(stop <-chan struct{}) {
		......
		if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers()); err != nil {
			glog.Fatalf("error starting controllers: %v", err)
		}
		ctx.InformerFactory.Start(ctx.Stop)
		select {}
	}

	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   s.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				glog.Fatalf("leaderelection lost")
			},
		},
	})
	panic("unreachable")
}
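RunOrDie is what makes it safe to run several kube-controller-manager replicas for high availability: only the elected leader executes run, and a replica that loses its lease crashes via glog.Fatalf so a restarted process can campaign again. A minimal sketch of the same pattern for any singleton controller; the lock rl is left hypothetical (its construction is elided above), and the duration values are assumed defaults, not taken from this excerpt:

leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
	Lock:          rl,               // hypothetical resourcelock; built elsewhere
	LeaseDuration: 15 * time.Second, // assumed values; tune per deployment
	RenewDeadline: 10 * time.Second,
	RetryPeriod:   2 * time.Second,
	Callbacks: leaderelection.LeaderCallbacks{
		// Only the elected leader runs the real work loop.
		OnStartedLeading: func(stop <-chan struct{}) { /* leader-only work */ },
		// On lease loss, exit so a fresh process can re-campaign.
		OnStoppedLeading: func() { glog.Fatalf("leaderelection lost") },
	},
})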
1.1.1 The NewControllerInitializers function

NewControllerInitializers registers a whole set of sub-controller init functions. The more notable ones (the full map follows the list):
- replicationcontroller: syncs ReplicationController objects from the apiserver and manages their Pods
- nodecontroller: fetches node information from the cloud provider, then watches for node updates
- servicecontroller: syncs service information from the cloud provider and manages the services on every node
- resourcequota: calls syncResourceQuota to sync resource quota information; ResourceQuota accounting is done per namespace
- namespace: periodically syncs namespace information from the apiserver
- persistentvolume-binder: periodically syncs volume binding and mount information
- serviceaccount: manages account information across namespaces

// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc. This allows for structured downstream composition and subdivision.
func NewControllerInitializers() map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefulset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["csrsigning"] = startCSRSigningController
	controllers["csrapproving"] = startCSRApprovingController
	controllers["ttl"] = startTTLController
	controllers["bootstrapsigner"] = startBootstrapSignerController
	controllers["tokencleaner"] = startTokenCleanerController
	controllers["service"] = startServiceController
	controllers["node"] = startNodeController
	controllers["route"] = startRouteController
	controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
	controllers["attachdetach"] = startAttachDetachController
	return controllers
}
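Every value in this map is an InitFunc. The excerpt does not show its definition, but judging from startDeploymentController below, its shape is roughly:

// Assumed from the init functions used here: reports whether the
// controller actually started, plus any startup error.
type InitFunc func(ctx ControllerContext) (bool, error)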
1.1.2 The StartControllers function
StartControllers launches each of the controllers initialized in 1.1.1:
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
	// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
	// If this fails, just return here and fail since other controllers won't be able to get credentials.
	if _, err := startSATokenController(ctx); err != nil {
		return err
	}

	for controllerName, initFn := range controllers {
		if !ctx.IsControllerEnabled(controllerName) {
			glog.Warningf("%q is disabled", controllerName)
			continue
		}

		time.Sleep(wait.Jitter(ctx.Options.ControllerStartInterval.Duration, ControllerStartJitter))

		glog.V(1).Infof("Starting %q", controllerName)
		started, err := initFn(ctx)
		if err != nil {
			glog.Errorf("Error starting %q", controllerName)
			return err
		}
		if !started {
			glog.Warningf("Skipping %q", controllerName)
			continue
		}
		glog.Infof("Started %q", controllerName)
	}

	return nil
}
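Two details are worth noting: controllers can be disabled by name (the IsControllerEnabled check), and startups are staggered with wait.Jitter so they do not all hit the apiserver at once. A self-contained sketch of the same start-loop pattern, in plain Go with hypothetical names rather than the kube code:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// initFunc mirrors the InitFunc shape: whether the controller
// started, plus any startup error.
type initFunc func() (started bool, err error)

func startControllers(controllers map[string]initFunc, interval time.Duration) error {
	for name, initFn := range controllers {
		// Jittered sleep: interval plus up to one extra interval,
		// so controllers do not start in lock-step.
		time.Sleep(interval + time.Duration(rand.Int63n(int64(interval)+1)))
		started, err := initFn()
		if err != nil {
			return fmt.Errorf("error starting %q: %v", name, err)
		}
		if !started {
			fmt.Printf("skipping %q\n", name)
			continue
		}
		fmt.Printf("started %q\n", name)
	}
	return nil
}

func main() {
	controllers := map[string]initFunc{
		"demo": func() (bool, error) { return true, nil },
	}
	if err := startControllers(controllers, 10*time.Millisecond); err != nil {
		fmt.Println(err)
	}
}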
Take the deployment controller as an example. Its init function is startDeploymentController, registered as:
controllers["deployment"] = startDeploymentController结构体DeploymentController
type DeploymentController struct {
	// rsControl is used for adopting/releasing replica sets.
	rsControl     controller.RSControlInterface
	client        clientset.Interface
	eventRecorder record.EventRecorder

	// To allow injection of syncDeployment for testing.
	syncHandler func(dKey string) error
	// used for unit testing
	enqueueDeployment func(deployment *extensions.Deployment)

	// dLister can list/get deployments from the shared informer's store
	dLister extensionslisters.DeploymentLister
	// rsLister can list/get replica sets from the shared informer's store
	rsLister extensionslisters.ReplicaSetLister
	// podLister can list/get pods from the shared informer's store
	podLister corelisters.PodLister

	// dListerSynced returns true if the Deployment store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	dListerSynced cache.InformerSynced
	// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsListerSynced cache.InformerSynced
	// podListerSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podListerSynced cache.InformerSynced

	// Deployments that need to be synced
	queue workqueue.RateLimitingInterface
}
startDeploymentController creates the controller via NewDeploymentController and then launches it:
func startDeploymentController(ctx ControllerContext) (bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"}] {
		return false, nil
	}
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Extensions().V1beta1().Deployments(),
		ctx.InformerFactory.Extensions().V1beta1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	if err != nil {
		return true, fmt.Errorf("error creating Deployment controller: %v", err)
	}
	go dc.Run(int(ctx.Options.ConcurrentDeploymentSyncs), ctx.Stop)
	return true, nil
}
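NewDeploymentController (not quoted here) also wires the three informers to the controller's workqueue, so that Deployment, ReplicaSet, and Pod changes enqueue the owning Deployment for a resync. A sketch of that wiring, reconstructed from the standard client-go informer pattern; treat the exact shape as an assumption:

dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dc.addDeployment,    // enqueue the new Deployment
	UpdateFunc: dc.updateDeployment, // enqueue on spec/status change
	DeleteFunc: dc.deleteDeployment, // enqueue so the sync loop can clean up
})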
dc.Run waits for all three informer caches to sync, then starts the configured number of worker goroutines:

// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dc.queue.ShutDown()

	glog.Infof("Starting deployment controller")
	defer glog.Infof("Shutting down deployment controller")

	if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

	<-stopCh
}
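Each worker drains the rate-limited queue and hands every key to syncHandler (syncDeployment in production). The excerpt omits dc.worker; a sketch of the standard workqueue worker loop it follows, reconstructed from the usual controller pattern, so details such as handleErr are assumptions:

func (dc *DeploymentController) worker() {
	for dc.processNextWorkItem() {
	}
}

func (dc *DeploymentController) processNextWorkItem() bool {
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	// syncHandler is syncDeployment in production (injectable for tests).
	err := dc.syncHandler(key.(string))
	dc.handleErr(err, key) // assumed: requeue with rate limiting on error
	return true
}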
When a Deployment uses the Recreate strategy, the sync loop dispatches to rolloutRecreate, which scales every old ReplicaSet down to zero before creating or scaling up the new one:

func (dc *DeploymentController) rolloutRecreate(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
	// Don't create a new RS if not already existed, so that we avoid scaling up before scaling down.
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)
	activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

	// scale down old replica sets.
	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
	if err != nil {
		return err
	}
	if scaledDown {
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// Do not process a deployment when it has old pods running.
	if oldPodsRunning(newRS, oldRSs, podMap) {
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// If we need to create a new RS, create it now.
	if newRS == nil {
		newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true)
		if err != nil {
			return err
		}
		allRSs = append(oldRSs, newRS)
	}

	// scale up new replica set.
	if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
		return err
	}

	if util.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status.
	return dc.syncRolloutStatus(allRSs, newRS, d)
}
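For context, a sketch of where rolloutRecreate is chosen: syncDeployment switches on the Deployment's strategy type, with RollingUpdate taking the rolloutRolling path instead. This is reconstructed from the controller's structure; treat the exact shape as an assumption:

switch d.Spec.Strategy.Type {
case extensions.RecreateDeploymentStrategyType:
	// Kill all old pods first, then bring up the new ReplicaSet (shown above).
	return dc.rolloutRecreate(d, rsList, podMap)
case extensions.RollingUpdateDeploymentStrategyType:
	// Incrementally scale the new ReplicaSet up and the old ones down.
	return dc.rolloutRolling(d, rsList, podMap)
}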