Kubernetes ReplicaSet: Design and Source Code Analysis


The ReplicaSet controller's main responsibilities are:

  • Keep the number of pod replicas at a specified count over time (a minimal usage sketch follows this list)
  • Provide the underlying mechanism for Deployment rollout strategies
  • Serve as the target of an HPA, enabling automatic scaling of replicas
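
To make the controller's job concrete, here is a minimal client-go sketch that creates a ReplicaSet with three replicas; from then on the controller works to keep exactly three matching pods alive. The kubeconfig path, object names, and image are illustrative assumptions, and the context-free Create signature matches the older client-go era these excerpts come from (newer versions also take a context and options):

package main

import (
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func int32Ptr(i int32) *int32 { return &i }

func main() {
    // Load credentials from the default kubeconfig (illustrative; adjust for your cluster).
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(config)

    labels := map[string]string{"app": "nginx"}
    rs := &appsv1.ReplicaSet{
        ObjectMeta: metav1.ObjectMeta{Name: "nginx-rs", Namespace: "default"},
        Spec: appsv1.ReplicaSetSpec{
            // Desired replica count; the controller reconciles toward this number.
            Replicas: int32Ptr(3),
            // The selector is how the controller claims pods as its own.
            Selector: &metav1.LabelSelector{MatchLabels: labels},
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: labels},
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{Name: "nginx", Image: "nginx:1.17"}},
                },
            },
        },
    }
    created, err := client.AppsV1().ReplicaSets("default").Create(rs)
    if err != nil {
        panic(err)
    }
    fmt.Printf("created ReplicaSet %s/%s\n", created.Namespace, created.Name)
}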

ReplicaSet core business logic


The main flow of the ReplicaSet business logic is as follows:

  • Watch RS change events and enqueue the affected RS
    • The pod path matches a pod to its owning RS via the pod's controllerRef and the RS's label selector
    • The RS path watches RS changes directly and updates the corresponding add/del expectations
  • Workers synchronize state
    • A worker pops an RS key from the queue and syncs that RS's state
    • Based on whether the expectations are satisfied and on the current state, it decides which operations to perform
    • Finally it writes the updated status back to the apiserver

ReplicaSet implementation analysis


Data structures

type ReplicaSetController struct {
    // GroupVersionKind indicates the controller type.
    // Different instances of this struct may handle different GVKs.
    // For example, this struct can be used (with adapters) to handle ReplicationController.
    schema.GroupVersionKind

    kubeClient clientset.Interface
    podControl controller.PodControlInterface

    // A ReplicaSet is temporarily suspended after creating/deleting these many replicas.
    // It resumes normal action after observing the watch events for them.
    burstReplicas int
    // To allow injection of syncReplicaSet for testing.
    syncHandler func(rsKey string) error

    // A TTLCache of pod creates/deletes each rc expects to see.
    expectations *controller.UIDTrackingControllerExpectations

    // A store of ReplicaSets, populated by the shared informer passed to NewReplicaSetController
    rsLister appslisters.ReplicaSetLister
    // rsListerSynced returns true if the ReplicaSet store has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    rsListerSynced cache.InformerSynced

    // A store of pods, populated by the shared informer passed to NewReplicaSetController
    podLister corelisters.PodLister
    // podListerSynced returns true if the pod store has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    podListerSynced cache.InformerSynced

    // Controllers that need to be synced
    queue workqueue.RateLimitingInterface
}

The fields group naturally by function; the two that distinguish this controller from the others are burstReplicas and expectations:

  • burstReplicas caps how many pods a single RS may create or delete in one sync round; together with slowStartBatch it prevents one RS that needs a large number of pods from overwhelming the whole system
  • expectations uses the UIDTrackingControllerExpectations type; since the RS controller's whole job is controlling the pod replica count, the add/del operations an RS expects are exactly the add/del operations issued against its pods (a simplified sketch follows this list)
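
Below is a minimal, illustrative sketch of the expectations idea; the type and method names are hypothetical stand-ins, and the real controller.UIDTrackingControllerExpectations additionally tracks the UIDs of pods pending deletion plus an expiry timestamp. The controller records how many creates/deletes it has issued per rs key, the event handlers decrement the counts as watch events arrive, and a sync recomputes the replica diff only once the counts drain to zero:

package expectations

import "sync"

// sketchExpectations is a simplified stand-in for the real type: a
// thread-safe map from rs key to the number of outstanding operations.
type sketchExpectations struct {
    mu      sync.Mutex
    pending map[string]int // rsKey -> creates/deletes issued but not yet observed
}

func newSketchExpectations() *sketchExpectations {
    return &sketchExpectations{pending: map[string]int{}}
}

// Expect records that n operations were just issued for rsKey.
func (e *sketchExpectations) Expect(rsKey string, n int) {
    e.mu.Lock()
    defer e.mu.Unlock()
    e.pending[rsKey] += n
}

// Observed is called from the pod event handlers: one watch event seen.
func (e *sketchExpectations) Observed(rsKey string) {
    e.mu.Lock()
    defer e.mu.Unlock()
    if e.pending[rsKey] > 0 {
        e.pending[rsKey]--
    }
}

// Satisfied reports whether rsKey has no operations still in flight,
// i.e. whether a sync may safely recompute the replica diff.
func (e *sketchExpectations) Satisfied(rsKey string) bool {
    e.mu.Lock()
    defer e.mu.Unlock()
    return e.pending[rsKey] == 0
}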

ReplicaSet initialization

func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
        metrics.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
    }

    rsc := &ReplicaSetController{
        GroupVersionKind: gvk,
        kubeClient:       kubeClient,
        podControl:       podControl,
        burstReplicas:    burstReplicas,
        expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
        queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
    }

    // Watch ReplicaSets
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    rsc.enqueueReplicaSet,
        UpdateFunc: rsc.updateRS,
        // This will enter the sync loop and no-op, because the replica set has been deleted from the store.
        // Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
        // way of achieving this is by performing a `stop` operation on the replica set.
        DeleteFunc: rsc.enqueueReplicaSet,
    })
    rsc.rsLister = rsInformer.Lister()
    rsc.rsListerSynced = rsInformer.Informer().HasSynced

    // Watch Pods
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: rsc.addPod,
        // This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
        // overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
        // local storage, so it should be ok.
        UpdateFunc: rsc.updatePod,
        DeleteFunc: rsc.deletePod,
    })
    rsc.podLister = podInformer.Lister()
    rsc.podListerSynced = podInformer.Informer().HasSynced

    rsc.syncHandler = rsc.syncReplicaSet

    return rsc
}
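
For reference, the exported constructor is a thin wrapper over NewBaseController. The sketch below follows the shape of the upstream source from the same era as these excerpts (argument lists differ slightly across Kubernetes versions): it wires in an event recorder and the real PodControl implementation that actually talks to the apiserver.

// NewReplicaSetController configures a ReplicaSet controller with the default
// RealPodControl and an event recorder (shape follows the upstream source).
func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
    return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
        apps.SchemeGroupVersion.WithKind("ReplicaSet"),
        "replicaset_controller",
        "replicaset",
        controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
        },
    )
}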

ReplicaSet startup

// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer rsc.queue.ShutDown()

    controllerName := strings.ToLower(rsc.Kind)
    glog.Infof("Starting %v controller", controllerName)
    defer glog.Infof("Shutting down %v controller", controllerName)

    if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(rsc.worker, time.Second, stopCh)
    }

    <-stopCh
}
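
Run only starts the workers; the worker loop itself is not shown above. A sketch of the queue-draining loop, consistent with the shape of the upstream implementation: each worker blocks on the queue, passes the key to syncHandler, forgets the key on success, and requeues it with rate-limited backoff on failure.

// worker keeps processing items until the queue shuts down.
func (rsc *ReplicaSetController) worker() {
    for rsc.processNextWorkItem() {
    }
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
    key, quit := rsc.queue.Get()
    if quit {
        return false
    }
    defer rsc.queue.Done(key)

    err := rsc.syncHandler(key.(string))
    if err == nil {
        // Sync succeeded: drop any rate-limit history for this key.
        rsc.queue.Forget(key)
        return true
    }

    // Sync failed: requeue with rate limiting so the key is retried with backoff.
    utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
    rsc.queue.AddRateLimited(key)
    return true
}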

Pod add/update/delete handling

  • addPod
// When a pod is created, enqueue the replica set that manages it and update its expectations.
func (rsc *ReplicaSetController) addPod(obj interface{}) {
    pod := obj.(*v1.Pod)

    if pod.DeletionTimestamp != nil {
        // When the controller manager restarts, the informer replays every existing pod as
        // an Add event, which lands here. If a pod is already marked for deletion, hand it
        // straight to deletePod.
        rsc.deletePod(pod)
        return
    }

    // If it has a ControllerRef, that's all that matters.
    if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
        // Look up the rs that owns this pod
        rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
        if rs == nil {
            return
        }
        rsKey, err := controller.KeyFunc(rs)
        if err != nil {
            return
        }
        glog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
        // Record that one expected pod creation has been observed, then enqueue the rs
        rsc.expectations.CreationObserved(rsKey)
        rsc.enqueueReplicaSet(rs)
        return
    }

    // Orphan pod: search for ReplicaSets whose label selectors match it, and enqueue any that do
    rss := rsc.getPodReplicaSets(pod)
    if len(rss) == 0 {
        return
    }
    glog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
    for _, rs := range rss {
        rsc.enqueueReplicaSet(rs)
    }
}
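
Both branches above go through resolveControllerRef, which this excerpt omits. A sketch matching the upstream helper: it returns an rs only when the kind, name, and UID all line up, so a stale ownerReference that points at a deleted-and-recreated rs resolves to nil.

// resolveControllerRef returns the ReplicaSet referenced by a ControllerRef,
// or nil if the ref does not point at a live rs owned by this controller type.
func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.ReplicaSet {
    // Wrong kind: some other controller type owns this pod.
    if controllerRef.Kind != rsc.Kind {
        return nil
    }
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
    if err != nil {
        return nil
    }
    if rs.UID != controllerRef.UID {
        // Same name but different UID: the ref points at a deleted rs.
        return nil
    }
    return rs
}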
  • updatePod
func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
    curPod := cur.(*v1.Pod)
    oldPod := old.(*v1.Pod)
    // If the old and new pods carry the same resourceVersion, nothing changed and we can
    // skip. resourceVersion is Kubernetes' optimistic concurrency control, currently backed
    // by etcd's revision mechanism.
    if curPod.ResourceVersion == oldPod.ResourceVersion {
        return
    }

    // Check whether the labels changed and whether the pod is being deleted
    labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
    if curPod.DeletionTimestamp != nil {
        // Once a pod's DeletionTimestamp is set, react immediately instead of waiting for
        // the kubelet to actually delete the pod
        rsc.deletePod(curPod)
        if labelChanged {
            rsc.deletePod(oldPod)
        }
        return
    }

    curControllerRef := metav1.GetControllerOf(curPod)
    oldControllerRef := metav1.GetControllerOf(oldPod)
    controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
    if controllerRefChanged && oldControllerRef != nil {
        // The ownerReference changed: if the old rs still exists, enqueue it so it can react
        if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
            rsc.enqueueReplicaSet(rs)
        }
    }

    // If it has a ControllerRef, that's all that matters.
    if curControllerRef != nil {
        rs := rsc.resolveControllerRef(curPod.Namespace, curControllerRef)
        if rs == nil {
            return
        }
        glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
        rsc.enqueueReplicaSet(rs)
        // A compensation mechanism: if the pod has just turned Ready and the rs sets
        // MinReadySeconds > 0, the pod only counts as available after MinReadySeconds have
        // elapsed. Requeue the rs after that delay (plus a second of slack) so it observes
        // the pod becoming available and moves to its new status.
        if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
            glog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds)
            rsc.enqueueReplicaSetAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
        }
        return
    }

    // Orphan pod: if its labels or ownerReference changed it may now match some rs; enqueue every rs that matches
    if labelChanged || controllerRefChanged {
        rss := rsc.getPodReplicaSets(curPod)
        if len(rss) == 0 {
            return
        }
        glog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
        for _, rs := range rss {
            rsc.enqueueReplicaSet(rs)
        }
    }
}
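
The delayed requeue used for the MinReadySeconds check goes through enqueueReplicaSetAfter, which this excerpt omits; a sketch consistent with the upstream helper, which simply schedules the key on the delaying workqueue:

// enqueueReplicaSetAfter requeues an rs after the given delay.
func (rsc *ReplicaSetController) enqueueReplicaSetAfter(rs *apps.ReplicaSet, after time.Duration) {
    key, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
        return
    }
    // AddAfter hands the key to workers only once the delay has elapsed.
    rsc.queue.AddAfter(key, after)
}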
  • deletePod
func (rsc *ReplicaSetController) deletePod(obj interface{}) {
    pod, ok := obj.(*v1.Pod)

    // A delete event may deliver a cache.DeletedFinalStateUnknown tombstone instead of a
    // pod; unwrap it before resolving the owning rs
    if !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
            return
        }
        pod, ok = tombstone.Obj.(*v1.Pod)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
            return
        }
    }

    controllerRef := metav1.GetControllerOf(pod)
    if controllerRef == nil {
        // No controller should care about orphans being deleted.
        return
    }
    rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
    if rs == nil {
        return
    }
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
        return
    }
    glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
    rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
    rsc.enqueueReplicaSet(rs)
}

syncReplicaSet and manageReplicas

  • syncReplicaSet
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {

    startTime := time.Now()
    defer func() {
        glog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
    }()

    // Fetch the ReplicaSet object
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    if errors.IsNotFound(err) {
        glog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
        rsc.expectations.DeleteExpectations(key)
        return nil
    }
    if err != nil {
        return err
    }


    // Decide whether this rs needs a full sync.
    // Reading this, the purpose of expectations in these controllers finally clicks:
    // suppose an rs expects 5 pods. Once every create/delete I issued has been observed
    // locally, or the local record has expired, my expectations are "satisfied" and it is
    // safe to sync: compare the actual observed state against the spec, create pods if
    // there are too few, delete pods if there are too many. Until then, operations are
    // still in flight and recomputing the diff would double-count them.
    // (A simplified sketch of this decision follows this function.)
    rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
    // Build a selector from the rs spec
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err))
        return nil
    }

    // List all pods in the rs's namespace
    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    if err != nil {
        return err
    }
    // Ignore inactive pods.
    var filteredPods []*v1.Pod
    // Keep only pods that are still active
    for _, pod := range allPods {
        if controller.IsPodActive(pod) {
            filteredPods = append(filteredPods, pod)
        }
    }

    // Claim the pods matching this rs's selector (adopting orphans and releasing mismatches)
    filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
    if err != nil {
        return err
    }

    var manageReplicasErr error
    // Reconcile only when the expectations are satisfied (all observed, or the record
    // expired) and the rs is not being deleted; manageReplicas then creates or deletes pods
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
    }
    // Objects from the informer cache are shared, so deep-copy the rs before mutating it;
    // then compute the new status from the rs, the filtered pods, and any error from manageReplicas
    rs = rs.DeepCopy()
    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

    updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    if err != nil {
        return err
    }
    // If the update succeeded but ready replicas have not all become available yet,
    // requeue the rs after MinReadySeconds to re-check availability
    if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
        updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
        updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
        rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    }
    return manageReplicasErr
}
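
The SatisfiedExpectations call near the top of syncReplicaSet gates the whole reconcile. A simplified sketch of the decision it encodes (the real logic lives in k8s.io/kubernetes/pkg/controller; the type and function names here are illustrative):

package expectations

import "time"

// syncRecord is an illustrative stand-in for the stored expectation record:
// outstanding creates (add), outstanding deletes (del), and when it was set.
type syncRecord struct {
    add, del  int64
    timestamp time.Time
}

const expectationsTimeout = 5 * time.Minute

// needsSync mirrors the decision SatisfiedExpectations makes for an rs key.
func needsSync(rec *syncRecord, exists bool) bool {
    switch {
    case !exists:
        return true // new rs, or expectations were deleted: force a sync
    case rec.add <= 0 && rec.del <= 0:
        return true // every create/delete we issued has been observed
    case time.Since(rec.timestamp) > expectationsTimeout:
        return true // record expired: assume watch events were missed, resync
    default:
        return false // operations still in flight: skip the diff this round
    }
}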
  • manageReplicas
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    // Diff the number of claimed pods against the desired replica count
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
        return nil
    }
    // diff < 0: fewer pods than desired, so create more
    if diff < 0 {
        diff *= -1
        // Cap the batch at burstReplicas so one rs cannot flood the apiserver in a single sync
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        // Record how many creations we expect to observe
        rsc.expectations.ExpectCreations(rsKey, diff)
        glog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
        // Create the pods in slow-start batches (see the sketch of slowStartBatch after this function)
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            boolPtr := func(b bool) *bool { return &b }
            controllerRef := &metav1.OwnerReference{
                APIVersion:         rsc.GroupVersion().String(),
                Kind:               rsc.Kind,
                Name:               rs.Name,
                UID:                rs.UID,
                BlockOwnerDeletion: boolPtr(true),
                Controller:         boolPtr(true),
            }
            // Create one pod owned by this rs
            err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
            if err != nil && errors.IsTimeout(err) {
                return nil
            }
            return err
        })
        // Account for skipped pods: if apiserver calls failed, we created fewer pods than
        // we recorded in the expectations. The skipped pods will never generate watch events,
        // so decrement the creation expectation once per skipped pod; the next sync will
        // recompute the diff and retry the remaining creations.
        if skippedPods := diff - successfulCreations; skippedPods > 0 {
            glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
            for i := 0; i < skippedPods; i++ {
                // Decrement the expected number of creates because the informer won't observe this pod
                rsc.expectations.CreationObserved(rsKey)
            }
        }
        return err
    } else if diff > 0 {
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        glog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
        podsToDelete := getPodsToDelete(filteredPods, diff)
        rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

        errCh := make(chan error, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                defer wg.Done()
                if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
                    // Decrement the expected number of deletes because the informer won't observe this deletion
                    podKey := controller.PodKey(targetPod)
                    glog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
                    rsc.expectations.DeletionObserved(rsKey, podKey)
                    errCh <- err
                }
            }(pod)
        }
        wg.Wait()

        select {
        case err := <-errCh:
            // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
            if err != nil {
                return err
            }
        default:
        }
    }

    return nil
}
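
The slowStartBatch helper referenced above doubles the batch size after every fully successful round, so an rs whose pods consistently fail to create (for example, due to a quota violation) stops after a handful of calls instead of issuing hundreds. A sketch consistent with the upstream helper (the local min is a stand-in for the integer utility used upstream):

// slowStartBatch calls fn up to count times, in geometrically growing batches
// (initialBatchSize, then 2x, 4x, ...). It stops at the first batch that hits
// an error and reports how many calls succeeded.
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
    remaining := count
    successes := 0
    for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(2*batchSize, remaining) {
        errCh := make(chan error, batchSize)
        var wg sync.WaitGroup
        wg.Add(batchSize)
        for i := 0; i < batchSize; i++ {
            go func() {
                defer wg.Done()
                if err := fn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        curSuccesses := batchSize - len(errCh)
        successes += curSuccesses
        if len(errCh) > 0 {
            // Stop growing: the remaining calls are skipped and will be
            // retried on the next sync.
            return successes, <-errCh
        }
        remaining -= batchSize
    }
    return successes, nil
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}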

ReplicaSet controller summary


The core logic of the ReplicaSet controller barely differs from the controllers covered earlier, but reading it end to end finally makes the role of expectations, and the flow as a whole, clear:

  • Observed state: the state of the resource as the controller has observed it in the system
  • Expected state: the operations (add/del counts) the controller expects to perform, derived from the resource's definition
  • Target operations: the difference between the expected state and the observed state
  • Current state: the actual state of the resource after the target operations have run

The component-level flow of the implementation is:

  • expectations stores the add/del operation counts we expect to perform (thread-safe)
  • The controller reads the current expectations and combines them with the observed state to decide on the target operations
  • State synchronization writes the final state back based on the outcome of the target operations

Together these drive the target resource toward its desired state.

Original article by baxiaoshi. If you republish it, please credit the source: http://www.sreguide.com/go%e8%af%ad%e8%a8%80/kubernetes-replicaset-%e8%ae%be%e8%ae%a1%e4%b8%8e%e5%ba%95%e5%b1%82%e6%ba%90%e7%a0%81%e5%89%96%e6%9e%90.html
