【Kubernetes/k8s Source Code Analysis】controller-manager: ReplicaSet source code analysis


ReplicaSet Overview

    Kubernetes recommends using ReplicaSet in place of ReplicationController. ReplicaSet is not fundamentally different from ReplicationController; the main distinction is that ReplicaSet supports set-based selectors (ReplicationController only supports equality-based ones).

    That said, it is recommended to let a Deployment manage ReplicaSets automatically, so you need not worry about incompatibility with other mechanisms (for example, ReplicaSet does not support rolling-update but Deployment does). Deployment also provides advanced features such as revision history, rollback, and pausing an upgrade.
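To make the selector distinction concrete, here is a minimal, self-contained sketch (illustrative only, not Kubernetes source) contrasting an equality-based match with a set-based "In" match over a pod's labels:

package main

import "fmt"

// equalityMatch mimics a ReplicationController selector: every required
// key must equal the required value exactly.
func equalityMatch(labels, required map[string]string) bool {
    for k, v := range required {
        if labels[k] != v {
            return false
        }
    }
    return true
}

// setMatch mimics a ReplicaSet matchExpressions "In" operator: the label's
// value must be one of the allowed values.
func setMatch(labels map[string]string, key string, in []string) bool {
    v, ok := labels[key]
    if !ok {
        return false
    }
    for _, allowed := range in {
        if v == allowed {
            return true
        }
    }
    return false
}

func main() {
    podLabels := map[string]string{"app": "web", "tier": "frontend"}
    fmt.Println(equalityMatch(podLabels, map[string]string{"app": "web"}))    // true
    fmt.Println(setMatch(podLabels, "tier", []string{"frontend", "backend"})) // true
}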

 

0. Entry point:

func NewControllerInitializers() map[string]InitFunc {
    controllers := map[string]InitFunc{}
    controllers["deployment"] = startDeploymentController
    controllers["replicaset"] = startReplicaSetController
    return controllers
}

0.1 The startReplicaSetController function

Checks whether the replicasets resource is available; if so, creates and initializes a ReplicaSetController and starts its Run method in a goroutine to drive the processing loop:

func startReplicaSetController(ctx ControllerContext) (bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"}] {
        return false, nil
    }
    go replicaset.NewReplicaSetController(
        ctx.InformerFactory.Extensions().V1beta1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    ).Run(int(ctx.Options.ConcurrentRSSyncs), ctx.Stop)
    return true, nil
}

0.2 The ReplicaSetController Run function

     Waits for the informer caches to sync, then starts a set of worker goroutines to process the queue:

// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer rsc.queue.ShutDown()

    controllerName := strings.ToLower(rsc.Kind)
    glog.Infof("Starting %v controller", controllerName)
    defer glog.Infof("Shutting down %v controller", controllerName)

    if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(rsc.worker, time.Second, stopCh)
    }

    <-stopCh
}
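For intuition, here is a minimal, self-contained sketch (assumed shape, not Kubernetes source) of this pattern: a re-run-until-stopped helper like wait.Until, plus a pool of workers that all exit when the stop channel closes:

package main

import (
    "fmt"
    "sync"
    "time"
)

// until re-runs f every period until stop is closed, like wait.Until.
func until(f func(), period time.Duration, stop <-chan struct{}) {
    for {
        select {
        case <-stop:
            return
        default:
            f()
        }
        select {
        case <-stop:
            return
        case <-time.After(period):
        }
    }
}

func main() {
    stop := make(chan struct{})
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        i := i
        go func() {
            defer wg.Done()
            until(func() { fmt.Printf("worker %d tick\n", i) }, 100*time.Millisecond, stop)
        }()
    }
    time.Sleep(300 * time.Millisecond)
    close(stop) // all workers return on the next select
    wg.Wait()
}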

0.3 The worker loop calls syncHandler, which is registered as syncReplicaSet in NewBaseController

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rsc *ReplicaSetController) worker() {
    for rsc.processNextWorkItem() {
    }
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
    key, quit := rsc.queue.Get()
    if quit {
        return false
    }
    defer rsc.queue.Done(key)

    err := rsc.syncHandler(key.(string))
    if err == nil {
        rsc.queue.Forget(key)
        return true
    }

    utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
    rsc.queue.AddRateLimited(key)

    return true
}
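The retry contract here is: on success, Forget resets the key's backoff; on failure, AddRateLimited re-queues it with increasing delay. A toy, self-contained sketch of that contract (backoffQueue is a stand-in for the real workqueue rate limiter, not its API):

package main

import (
    "errors"
    "fmt"
    "time"
)

type backoffQueue struct {
    failures map[string]int
}

// Forget resets the per-key failure count, clearing its backoff.
func (q *backoffQueue) Forget(key string) { delete(q.failures, key) }

// AddRateLimited records a failure and returns the next retry delay,
// growing exponentially per key like the default controller rate limiter.
func (q *backoffQueue) AddRateLimited(key string) time.Duration {
    q.failures[key]++
    return time.Duration(10*(1<<uint(q.failures[key]))) * time.Millisecond
}

func process(key string) error {
    if key == "bad/one" {
        return errors.New("sync failed")
    }
    return nil
}

func main() {
    q := &backoffQueue{failures: map[string]int{}}
    for _, key := range []string{"default/web", "bad/one", "bad/one"} {
        if err := process(key); err != nil {
            fmt.Printf("%s: requeue after %v\n", key, q.AddRateLimited(key))
            continue
        }
        q.Forget(key)
        fmt.Printf("%s: done\n", key)
    }
}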

 

1. The NewReplicaSetController function

    Path: pkg/controller/replicaset/replica_set.go

  1.1 Create the eventBroadcaster and configure it

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})

  1.2 Create the ReplicaSetController object, including the k8s client, work queue, etc.

rsc := &ReplicaSetController{
    GroupVersionKind: gvk,
    kubeClient:       kubeClient,
    podControl:       podControl,
    burstReplicas:    burstReplicas,
    expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
    queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
}
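The expectations field is worth a detour: before issuing N creates, the controller records them; each creation observed through the informer decrements the count, and a sync only trusts the local cache once expectations are satisfied. A simplified, self-contained sketch of just the creation side (the real UID-tracking variant also tracks deletes by pod UID):

package main

import (
    "fmt"
    "sync"
)

type expectations struct {
    mu   sync.Mutex
    adds map[string]int // outstanding creations per ReplicaSet key
}

// ExpectCreations records that n creates were just issued for key.
func (e *expectations) ExpectCreations(key string, n int) {
    e.mu.Lock()
    defer e.mu.Unlock()
    e.adds[key] += n
}

// CreationObserved is called when the informer sees a new pod for key.
func (e *expectations) CreationObserved(key string) {
    e.mu.Lock()
    defer e.mu.Unlock()
    if e.adds[key] > 0 {
        e.adds[key]--
    }
}

// Satisfied reports whether the cache can be trusted for key again.
func (e *expectations) Satisfied(key string) bool {
    e.mu.Lock()
    defer e.mu.Unlock()
    return e.adds[key] == 0
}

func main() {
    e := &expectations{adds: map[string]int{}}
    e.ExpectCreations("default/web", 2)
    fmt.Println(e.Satisfied("default/web")) // false: cache may be stale
    e.CreationObserved("default/web")
    e.CreationObserved("default/web")
    fmt.Println(e.Satisfied("default/web")) // true: safe to sync again
}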

  1.3 Register event handlers on rsInformer and podInformer, and register syncReplicaSet as the sync handler

rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    rsc.enqueueReplicaSet,
    UpdateFunc: rsc.updateRS,
    // This will enter the sync loop and no-op, because the replica set has been deleted from the store.
    // Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
    // way of achieving this is by performing a `stop` operation on the replica set.
    DeleteFunc: rsc.enqueueReplicaSet,
})
rsc.rsLister = rsInformer.Lister()
rsc.rsListerSynced = rsInformer.Informer().HasSynced

podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: rsc.addPod,
    // This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
    // overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
    // local storage, so it should be ok.
    UpdateFunc: rsc.updatePod,
    DeleteFunc: rsc.deletePod,
})
rsc.podLister = podInformer.Lister()
rsc.podListerSynced = podInformer.Informer().HasSynced

rsc.syncHandler = rsc.syncReplicaSet
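The pod handlers ultimately map each pod event back to the ReplicaSet that owns it (via the controller ownerReference) and enqueue that ReplicaSet's key. A simplified, self-contained sketch of that mapping (toy types, not the kube API):

package main

import "fmt"

type ownerRef struct {
    Kind       string
    Name       string
    Controller bool
}

type pod struct {
    Namespace string
    Name      string
    Owners    []ownerRef
}

// keyForOwningRS returns the "namespace/name" queue key of the controlling
// ReplicaSet, if the pod has one.
func keyForOwningRS(p pod) (string, bool) {
    for _, ref := range p.Owners {
        if ref.Controller && ref.Kind == "ReplicaSet" {
            return p.Namespace + "/" + ref.Name, true
        }
    }
    return "", false
}

func main() {
    p := pod{Namespace: "default", Name: "web-abc12",
        Owners: []ownerRef{{Kind: "ReplicaSet", Name: "web", Controller: true}}}
    if key, ok := keyForOwningRS(p); ok {
        fmt.Println("enqueue", key) // enqueue default/web
    }
}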

 

2. The syncReplicaSet function

  2.1 Split the key into namespace and name, and fetch the ReplicaSet object

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
    return err
}
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
if errors.IsNotFound(err) {
    glog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
    rsc.expectations.DeleteExpectations(key)
    return nil
}
if err != nil {
    return err
}
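SplitMetaNamespaceKey just parses the queue key, which is "namespace/name" for namespaced objects (or bare "name" for cluster-scoped ones). A minimal sketch of the equivalent logic:

package main

import (
    "fmt"
    "strings"
)

// splitKey mirrors the key format: "namespace/name", or just "name"
// for cluster-scoped objects.
func splitKey(key string) (namespace, name string, err error) {
    parts := strings.Split(key, "/")
    switch len(parts) {
    case 1:
        return "", parts[0], nil // cluster-scoped
    case 2:
        return parts[0], parts[1], nil
    default:
        return "", "", fmt.Errorf("unexpected key format: %q", key)
    }
}

func main() {
    ns, name, _ := splitKey("default/web")
    fmt.Println(ns, name) // default web
}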

    2.2 List all pods in the namespace and filter out the inactive ones (an active pod is one whose Status.Phase is neither Succeeded nor Failed and which is not marked for deletion)

// list all pods to include the pods that don't match the rs`s selector
// anymore but has the stale controller ref.
// TODO: Do the List and Filter in a single pass, or use an index.
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
if err != nil {
    return err
}
// Ignore inactive pods.
var filteredPods []*v1.Pod
for _, pod := range allPods {
    if controller.IsPodActive(pod) {
        filteredPods = append(filteredPods, pod)
    }
}
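A self-contained sketch of the IsPodActive predicate described above, with simplified types (the real check reads Status.Phase and DeletionTimestamp on v1.Pod):

package main

import "fmt"

type podPhase string

const (
    podSucceeded podPhase = "Succeeded"
    podFailed    podPhase = "Failed"
)

type pod struct {
    Phase             podPhase
    DeletionRequested bool // stands in for a non-nil DeletionTimestamp
}

// isPodActive: a pod is active unless it has finished or is being deleted.
func isPodActive(p pod) bool {
    return p.Phase != podSucceeded &&
        p.Phase != podFailed &&
        !p.DeletionRequested
}

func main() {
    fmt.Println(isPodActive(pod{Phase: "Running"}))                           // true
    fmt.Println(isPodActive(pod{Phase: podFailed}))                           // false
    fmt.Println(isPodActive(pod{Phase: "Running", DeletionRequested: true})) // false
}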

    2.3 The claimPods function

      claimPods tries to take ownership of the filtered pods; for a newly created ReplicaSet there are no pods to claim yet:

func (rsc *ReplicaSetController) claimPods(rs *extensions.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
    // If any adoptions are attempted, we should first recheck for deletion with
    // an uncached quorum read sometime after listing Pods (see #42639).
    canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
        fresh, err := rsc.kubeClient.ExtensionsV1beta1().ReplicaSets(rs.Namespace).Get(rs.Name, metav1.GetOptions{})
        if err != nil {
            return nil, err
        }
        if fresh.UID != rs.UID {
            return nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, fresh.UID, rs.UID)
        }
        return fresh, nil
    })
    cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
    return cm.ClaimPods(filteredPods)
}
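The "recheck before adopting" guard deserves emphasis: adoption is only allowed after re-fetching the controller fresh from the API (bypassing the cache) and confirming it still exists with the same UID and is not being deleted. A simplified sketch of that guard with toy types (rs and fetchFresh are illustrative, not the kube API):

package main

import (
    "errors"
    "fmt"
)

type rs struct {
    UID      string
    Deleting bool // stands in for a non-nil DeletionTimestamp
}

// makeCanAdopt returns a closure that refuses adoption if the controller is
// gone, replaced (different UID), or being deleted.
func makeCanAdopt(fetchFresh func() (*rs, error), cachedUID string) func() error {
    return func() error {
        fresh, err := fetchFresh()
        if err != nil {
            return err
        }
        if fresh.UID != cachedUID {
            return errors.New("original ReplicaSet is gone: UID changed")
        }
        if fresh.Deleting {
            return errors.New("ReplicaSet is being deleted; refusing adoption")
        }
        return nil
    }
}

func main() {
    canAdopt := makeCanAdopt(func() (*rs, error) { return &rs{UID: "abc"}, nil }, "abc")
    fmt.Println(canAdopt()) // <nil>: safe to adopt
}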

    2.3.1 NewPodControllerRefManager instantiates a PodControllerRefManager object

// NOTE: Once CanAdopt() is called, it will not be called again by the same
// PodControllerRefManager instance. Create a new instance if it makes
// sense to check CanAdopt() again (e.g. in a different sync pass).
func NewPodControllerRefManager(
    podControl PodControlInterface,
    controller metav1.Object,
    selector labels.Selector,
    controllerKind schema.GroupVersionKind,
    canAdopt func() error,
) *PodControllerRefManager {
    return &PodControllerRefManager{
        BaseControllerRefManager: BaseControllerRefManager{
            Controller:   controller,
            Selector:     selector,
            CanAdoptFunc: canAdopt,
        },
        controllerKind: controllerKind,
        podControl:     podControl,
    }
}

     2.3.2 The ClaimPods method of PodControllerRefManager

     ClaimPods tries to take ownership of a list of pods. The reconciliation rules are: adopt orphan pods if the selector matches; release owned objects if the selector no longer matches.

// ClaimPods tries to take ownership of a list of Pods.
//
// It will reconcile the following:
//   * Adopt orphans if the selector matches.
//   * Release owned objects if the selector no longer matches.
//
// Optional: If one or more filters are specified, a Pod will only be claimed if
// all filters return true.
//
// A non-nil error is returned if some form of reconciliation was attempted and
// failed. Usually, controllers should try again later in case reconciliation
// is still needed.
//
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of Pods that you now own is returned.
func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
    var claimed []*v1.Pod
    var errlist []error

     2.3.2.1 Adopt or release every pod

for _, pod := range pods {
    ok, err := m.ClaimObject(pod, match, adopt, release)
    if err != nil {
        errlist = append(errlist, err)
        continue
    }
    if ok {
        claimed = append(claimed, pod)
    }
}

     2.3.2.1.1 The ClaimObject function

  A pod's controller ownerReference looks like this in its metadata, for example:

  ownerReferences:
  - apiVersion: apps/v1
    blockOwnerDeletion: true
    controller: true
    kind: ReplicaSet
    name: mysql-hostpath-2-9964cc484
    uid: 73daf2e9-82e1-4bac-a78b-066b0df68ba4

     A pod cannot be claimed when its controllerRef UID does not match this controller's UID (it is owned by someone else). If the pod is owned by us but the selector no longer matches, it is released, unless the controller itself is being deleted:

if controllerRef != nil {
    if controllerRef.UID != m.Controller.GetUID() {
        // Owned by someone else. Ignore.
        return false, nil
    }
    if match(obj) {
        // We already own it and the selector matches.
        // Return true (successfully claimed) before checking deletion timestamp.
        // We're still allowed to claim things we already own while being deleted
        // because doing so requires taking no actions.
        return true, nil
    }
    // Owned by us but selector doesn't match.
    // Try to release, unless we're being deleted.
    if m.Controller.GetDeletionTimestamp() != nil {
        return false, nil
    }
    if err := release(obj); err != nil {
        // If the pod no longer exists, ignore the error.
        if errors.IsNotFound(err) {
            return false, nil
        }
        // Either someone else released it, or there was a transient error.
        // The controller should requeue and try again if it's still stale.
        return false, err
    }
    // Successfully released.
    return false, nil
}

    An orphan pod is not adopted if the controller is being deleted, the selector doesn't match, or the pod itself is already marked for deletion:

// It's an orphan.
if m.Controller.GetDeletionTimestamp() != nil || !match(obj) {
    // Ignore if we're being deleted or selector doesn't match.
    return false, nil
}
if obj.GetDeletionTimestamp() != nil {
    // Ignore if the object is being deleted
    return false, nil
}
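Putting both branches together, the claim decision can be summarized with a self-contained decision table (simplified types, illustrative only):

package main

import "fmt"

type claimInput struct {
    ownedByUs, ownedByOther bool // derived from the pod's controllerRef
    selectorMatches         bool
    controllerDeleting      bool // ReplicaSet has a DeletionTimestamp
    podDeleting             bool // pod has a DeletionTimestamp
}

// decide returns (claimed, action) for one pod, mirroring the two code
// branches shown above.
func decide(in claimInput) (bool, string) {
    switch {
    case in.ownedByOther:
        return false, "ignore: owned by someone else"
    case in.ownedByUs && in.selectorMatches:
        return true, "keep: already ours"
    case in.ownedByUs && in.controllerDeleting:
        return false, "skip release: we are being deleted"
    case in.ownedByUs:
        return false, "release: selector no longer matches"
    case in.controllerDeleting || !in.selectorMatches || in.podDeleting:
        return false, "ignore orphan"
    default:
        return true, "adopt orphan"
    }
}

func main() {
    claimed, action := decide(claimInput{selectorMatches: true})
    fmt.Println(claimed, action) // true adopt orphan
}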

     The AdoptPod function adopts a pod whose selector matches by patching a controller ownerReference onto it; whether adoption may proceed is decided by CanAdopt, which re-reads the ReplicaSet and checks that its UID is unchanged:

// AdoptPod sends a patch to take control of the pod. It returns the error if
// the patching fails.
func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error {
    if err := m.CanAdopt(); err != nil {
        return fmt.Errorf("can't adopt Pod %v/%v (%v): %v", pod.Namespace, pod.Name, pod.UID, err)
    }
    // Note that ValidateOwnerReferences() will reject this patch if another
    // OwnerReference exists with controller=true.
    addControllerPatch := fmt.Sprintf(
        `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`,
        m.controllerKind.GroupVersion(), m.controllerKind.Kind,
        m.Controller.GetName(), m.Controller.GetUID(), pod.UID)
    return m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(addControllerPatch))
}

    2.4 Calling manageReplicas

     After claiming pods, the core reconciliation work is delegated to manageReplicas, but only when the ReplicaSet needs a sync and is not being deleted:

var manageReplicasErr error
if rsNeedsSync && rs.DeletionTimestamp == nil {
    manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}

 

3. The manageReplicas function

   Reconciles the number of pods: delete the surplus, create the shortfall. First compute the difference:

diff := len(filteredPods) - int(*(rs.Spec.Replicas))

    3.1 If there are fewer pods than replicas (diff < 0), create the shortfall, capped at burstReplicas (500 by default)

      slowStartBatch throttles concurrent pod creation: it starts with a batch of SlowStartInitialBatchSize and, as long as every creation in a batch succeeds, doubles the batch size for the next round (a standalone sketch follows the snippet below):

if diff < 0 {
    diff *= -1
    if diff > rsc.burstReplicas {
        diff = rsc.burstReplicas
    }
    rsc.expectations.ExpectCreations(rsKey, diff)
    successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
        boolPtr := func(b bool) *bool { return &b }
        controllerRef := &metav1.OwnerReference{
            APIVersion:         rsc.GroupVersion().String(),
            Kind:               rsc.Kind,
            Name:               rs.Name,
            UID:                rs.UID,
            BlockOwnerDeletion: boolPtr(true),
            Controller:         boolPtr(true),
        }
        err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
        if err != nil && errors.IsTimeout(err) {
            return nil
        }
        return err
    })
    if skippedPods := diff - successfulCreations; skippedPods > 0 {
        for i := 0; i < skippedPods; i++ {
            // Decrement the expected number of creates because the informer won't observe this pod
            rsc.expectations.CreationObserved(rsKey)
        }
    }
    return err
}
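Here is a self-contained re-implementation of the slowStartBatch pattern described above: run fn `count` times, in batches that start small and double after each fully-successful batch, giving up after the first batch that reports an error. This is a sketch for illustration rather than a copy of the real function:

package main

import (
    "fmt"
    "sync"
)

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}

// slowStartBatch calls fn count times, in geometrically growing batches,
// stopping at the first batch in which any call fails. It returns the number
// of successful calls and the first error seen, if any.
func slowStartBatch(count, initialBatchSize int, fn func() error) (int, error) {
    remaining := count
    successes := 0
    for batch := min(remaining, initialBatchSize); batch > 0; batch = min(2*batch, remaining) {
        errCh := make(chan error, batch)
        var wg sync.WaitGroup
        wg.Add(batch)
        for i := 0; i < batch; i++ {
            go func() {
                defer wg.Done()
                if err := fn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        successes += batch - len(errCh)
        if len(errCh) > 0 {
            return successes, <-errCh // give up after a failing batch
        }
        remaining -= batch
    }
    return successes, nil
}

func main() {
    n, err := slowStartBatch(10, 1, func() error { return nil })
    fmt.Println(n, err) // 10 <nil>; batches of 1, 2, 4, 3
}

The slow start limits damage when pod creation fails systematically (for example, quota exhaustion): instead of 500 failing API calls, the controller stops after the first small batch.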

     In the opposite case (diff > 0, more pods than desired), the surplus pods are deleted concurrently.
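A hedged sketch of that scale-down fan-out: delete the surplus pods concurrently and report the first error. Note that the real controller ranks pods when choosing victims (e.g. not-ready and unscheduled pods first); here we simply accept whatever list is passed in:

package main

import (
    "fmt"
    "sync"
)

// deleteConcurrently fans out one goroutine per pod and returns the first
// delete error, if any.
func deleteConcurrently(pods []string, deletePod func(name string) error) error {
    errCh := make(chan error, len(pods))
    var wg sync.WaitGroup
    wg.Add(len(pods))
    for _, name := range pods {
        name := name
        go func() {
            defer wg.Done()
            if err := deletePod(name); err != nil {
                errCh <- err
            }
        }()
    }
    wg.Wait()
    select {
    case err := <-errCh:
        return err
    default:
        return nil
    }
}

func main() {
    surplus := []string{"web-aaa", "web-bbb"}
    err := deleteConcurrently(surplus, func(name string) error {
        fmt.Println("deleting", name)
        return nil
    })
    fmt.Println("err:", err)
}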

