Kubernetes Endpoint Controller: An In-Depth Look at How It Works

Endpoint Controller

Understanding Endpoints


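Endpoints differs from other resources. The controllers for Service, Job, and so on watch changes to their own resource, compute a desired state from the current state, and act on the difference. Endpoints, by contrast, is an aggregate resource built from Services and Pods: the controller computes each Endpoints object from a Service and the Pods it selects.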
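To make the "aggregate resource" idea concrete, here is a minimal sketch of deriving a Service's addresses from nothing but its selector and the current Pods. The `Pod` type and the `matches`/`endpointIPs` helpers are hypothetical simplifications, not the Kubernetes API; the real controller also handles ports, readiness, and hostnames.

```go
package main

import (
	"fmt"
	"sort"
)

// Pod is a hypothetical, trimmed-down stand-in for v1.Pod.
type Pod struct {
	Name   string
	Labels map[string]string
	IP     string
}

// matches reports whether every key/value pair in selector is present in labels
// (equality-based selection, as used by Service selectors).
func matches(selector, labels map[string]string) bool {
	for k, v := range selector {
		if got, ok := labels[k]; !ok || got != v {
			return false
		}
	}
	return true
}

// endpointIPs derives a Service's addresses from its selector and the current Pods.
// A nil selector yields no endpoints, mirroring how the controller skips
// selector-less Services.
func endpointIPs(selector map[string]string, pods []Pod) []string {
	if selector == nil {
		return nil
	}
	ips := []string{}
	for _, p := range pods {
		if p.IP == "" { // no IP assigned yet: nothing to route to
			continue
		}
		if matches(selector, p.Labels) {
			ips = append(ips, p.IP)
		}
	}
	sort.Strings(ips)
	return ips
}

func main() {
	pods := []Pod{
		{Name: "web-1", Labels: map[string]string{"app": "web"}, IP: "10.0.0.2"},
		{Name: "web-2", Labels: map[string]string{"app": "web"}}, // still pending, no IP
		{Name: "db-1", Labels: map[string]string{"app": "db"}, IP: "10.0.0.3"},
	}
	fmt.Println(endpointIPs(map[string]string{"app": "web"}, pods)) // [10.0.0.2]
}
```

Nobody edits the address list by hand; whenever a Pod or the Service changes, the list is simply recomputed.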

Endpoints Controller

Data Structure

type EndpointController struct {
    client clientset.Interface

    // serviceLister is able to list/get services and is populated by the shared informer passed to
    // NewEndpointController.
    serviceLister corelisters.ServiceLister
    // servicesSynced returns true if the service shared informer has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    servicesSynced cache.InformerSynced

    // podLister is able to list/get pods and is populated by the shared informer passed to
    // NewEndpointController.
    podLister corelisters.PodLister
    // podsSynced returns true if the pod shared informer has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    podsSynced cache.InformerSynced

    // endpointsLister is able to list/get endpoints and is populated by the shared informer passed to
    // NewEndpointController.
    endpointsLister corelisters.EndpointsLister
    // endpointsSynced returns true if the endpoints shared informer has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    endpointsSynced cache.InformerSynced

    // Services that need to be updated. A channel is inappropriate here,
    // because it allows services with lots of pods to be serviced much
    // more often than services with few pods; it also would cause a
    // service that's inserted multiple times to be processed more than
    // necessary.
    queue workqueue.RateLimitingInterface

    // workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes.
    workerLoopPeriod time.Duration
}

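The Endpoints Controller's data structure is fairly simple. At its core it watches Services and Pods, then uses podLister, serviceLister, and endpointsLister to look up the Pod-to-Service mapping and assemble or update the resulting Endpoints.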

Data Flow


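The core data flow is as follows:

  • Watch Pods: find the Services each Pod belongs to and add them to the queue
  • Watch Services: add the changed Service to the queue
  • A worker takes a Service from the queue and computes that Service's Endpoints
  • The result is written back to the apiserver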
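The struct comment above explains why a plain channel is not used: a Service that receives many Pod events should still be synced once, not once per event. The sketch below is a simplified, single-goroutine model of the deduplicating behavior client-go's workqueue provides (a waiting key is stored once; a key re-added while a worker holds it is queued again only after `Done`). The `dedupQueue` type is hypothetical; locking, blocking `Get`, and rate limiting are deliberately omitted.

```go
package main

import "fmt"

// dedupQueue is a hypothetical, simplified model of workqueue semantics.
type dedupQueue struct {
	order      []string        // FIFO of keys ready to be handed out
	dirty      map[string]bool // keys waiting to be processed
	processing map[string]bool // keys currently held by a worker
}

func newDedupQueue() *dedupQueue {
	return &dedupQueue{dirty: map[string]bool{}, processing: map[string]bool{}}
}

// Add enqueues key unless it is already waiting.
func (q *dedupQueue) Add(key string) {
	if q.dirty[key] {
		return
	}
	q.dirty[key] = true
	if q.processing[key] {
		return // re-queued by Done once the current worker finishes
	}
	q.order = append(q.order, key)
}

// Get pops the next key; ok is false when the queue is empty.
func (q *dedupQueue) Get() (key string, ok bool) {
	if len(q.order) == 0 {
		return "", false
	}
	key, q.order = q.order[0], q.order[1:]
	delete(q.dirty, key)
	q.processing[key] = true
	return key, true
}

// Done marks key as finished and re-queues it if it was added meanwhile.
func (q *dedupQueue) Done(key string) {
	delete(q.processing, key)
	if q.dirty[key] {
		q.order = append(q.order, key)
	}
}

func main() {
	q := newDedupQueue()
	// Three pod events for the same Service collapse into a single sync.
	q.Add("default/web")
	q.Add("default/web")
	q.Add("default/web")
	q.Add("default/db")
	for {
		key, ok := q.Get()
		if !ok {
			break
		}
		fmt.Println("syncService", key)
		q.Done(key)
	}
}
```

Because the queue holds keys rather than objects, the worker always reads the latest Service from the lister when it finally processes the key.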

Initialization

    e := &EndpointController{
        client:           client,
        queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
        workerLoopPeriod: time.Second,
    }

    serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: e.enqueueService,
        UpdateFunc: func(old, cur interface{}) {
            e.enqueueService(cur)
        },
        DeleteFunc: e.enqueueService,
    })
    e.serviceLister = serviceInformer.Lister()
    e.servicesSynced = serviceInformer.Informer().HasSynced

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    e.addPod,
        UpdateFunc: e.updatePod,
        DeleteFunc: e.deletePod,
    })
    e.podLister = podInformer.Lister()
    e.podsSynced = podInformer.Informer().HasSynced

    e.endpointsLister = endpointsInformer.Lister()
    e.endpointsSynced = endpointsInformer.Informer().HasSynced

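Combining the data structure with the data flow above, it is easy to see why it is designed this way: Endpoints maps Service ports to Pod ports, so the controller naturally has to watch both Services and Pods. The endpointsLister is added so that each sync can compare against the current Endpoints in the local cache instead of querying the apiserver every time a Service is updated.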
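The payoff of the endpointsLister shows up at the end of syncService: the desired state is compared with the cached object, and the apiserver is only called when something changed. A minimal sketch of that decision follows; the `Endpoints` type and the `decide`, `sameStrings`, and `sameLabels` helpers are hypothetical, and the comparison assumes both address lists are already in the same canonical order (the role RepackSubsets plays in the real code).

```go
package main

import "fmt"

// Endpoints is a hypothetical, trimmed-down stand-in for v1.Endpoints.
type Endpoints struct {
	ResourceVersion string
	Labels          map[string]string
	Addresses       []string
}

// sameStrings compares two slices element by element; nil and empty are equal.
func sameStrings(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// sameLabels compares two label maps; nil and empty are equal.
func sameLabels(a, b map[string]string) bool {
	if len(a) != len(b) {
		return false
	}
	for k, v := range a {
		if w, ok := b[k]; !ok || w != v {
			return false
		}
	}
	return true
}

// decide sketches the tail of syncService: cached is what the endpoints lister
// returned (nil for NotFound). Only "create" and "update" hit the apiserver.
func decide(cached *Endpoints, desired []string, svcLabels map[string]string) string {
	if cached == nil || cached.ResourceVersion == "" {
		return "create"
	}
	if sameStrings(cached.Addresses, desired) && sameLabels(cached.Labels, svcLabels) {
		return "skip"
	}
	return "update"
}

func main() {
	labels := map[string]string{"app": "web"}
	fmt.Println(decide(nil, []string{"10.0.0.2"}, labels)) // create
	cached := &Endpoints{ResourceVersion: "42", Labels: labels, Addresses: []string{"10.0.0.2"}}
	fmt.Println(decide(cached, []string{"10.0.0.2"}, labels))             // skip
	fmt.Println(decide(cached, []string{"10.0.0.2", "10.0.0.7"}, labels)) // update
}
```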

Pod Add, Update, and Delete Handlers


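All three handlers do one core thing: work out which Services the Pod belongs to and enqueue those Services so their Endpoints get recomputed. The difference is that updatePod also considers the Services the old Pod belonged to. If only the labels (or hostname/subdomain) changed, it enqueues the Services the Pod joined or left; if the Pod itself changed as well, it enqueues the union of the old and new sets.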
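The old/new set logic in updatePod is handled by determineNeededServiceUpdates in the upstream code. Below is a stdlib-only sketch of the same rule; the `set` type and the `newSet`, `neededServiceUpdates`, and `sortedKeys` helpers are hypothetical stand-ins (upstream uses `sets.String` from k8s.io/apimachinery). It is only consulted when the labels changed; otherwise updatePod enqueues just the new Pod's Services.

```go
package main

import (
	"fmt"
	"sort"
)

// set is a minimal string set standing in for sets.String.
type set map[string]struct{}

func newSet(items ...string) set {
	s := set{}
	for _, it := range items {
		s[it] = struct{}{}
	}
	return s
}

// neededServiceUpdates: if the pod itself changed (IP, readiness, ...), every old
// and new Service must be resynced (union); if only labels changed, only the
// Services the pod joined or left need a resync (symmetric difference).
func neededServiceUpdates(oldSvcs, newSvcs set, podChanged bool) set {
	out := set{}
	for k := range newSvcs {
		if _, inOld := oldSvcs[k]; podChanged || !inOld {
			out[k] = struct{}{}
		}
	}
	for k := range oldSvcs {
		if _, inNew := newSvcs[k]; podChanged || !inNew {
			out[k] = struct{}{}
		}
	}
	return out
}

// sortedKeys returns the set's members in a stable order for printing.
func sortedKeys(s set) []string {
	keys := make([]string, 0, len(s))
	for k := range s {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	oldSvcs := newSet("default/web", "default/canary")
	newSvcs := newSet("default/web", "default/stable")
	// Only labels changed: the pod left canary and joined stable.
	fmt.Println(sortedKeys(neededServiceUpdates(oldSvcs, newSvcs, false))) // [default/canary default/stable]
	// The pod itself changed too: every related Service is affected.
	fmt.Println(sortedKeys(neededServiceUpdates(oldSvcs, newSvcs, true))) // [default/canary default/stable default/web]
}
```

Skipping the shared Service in the labels-only case is safe because the Pod's address and readiness did not change, so that Service's Endpoints would come out identical.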

  • addPod
// When a pod is added, figure out what services it will be a member of and
// enqueue them. obj must have *v1.Pod type.
func (e *EndpointController) addPod(obj interface{}) {
    // Find all Services this pod belongs to and enqueue their keys
    pod := obj.(*v1.Pod)
    services, err := e.getPodServiceMemberships(pod)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
        return
    }
    for key := range services {
        e.queue.Add(key)
    }
}
  • updatePod
// When a pod is updated, figure out what services it used to be a member of
// and what services it will be a member of, and enqueue the union of these.
// old and cur must be *v1.Pod types.
func (e *EndpointController) updatePod(old, cur interface{}) {
    // If the ResourceVersion is unchanged, this is a periodic resync: do nothing
    newPod := cur.(*v1.Pod)
    oldPod := old.(*v1.Pod)
    if newPod.ResourceVersion == oldPod.ResourceVersion {
        // Periodic resync will send update events for all known pods.
        // Two different versions of the same pod will always have different RVs.
        return
    }

    podChangedFlag := podChanged(oldPod, newPod)

    // Check if the pod labels have changed, indicating a possible
    // change in the service membership
    labelsChanged := false
    // Compare labels (and hostname/subdomain); if either differs, set labelsChanged
    if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
        !hostNameAndDomainAreEqual(newPod, oldPod) {
        labelsChanged = true
    }

    // If both the pod and labels are unchanged, no update is needed
    if !podChangedFlag && !labelsChanged {
        return
    }

    // Get all Services the new pod belongs to
    services, err := e.getPodServiceMemberships(newPod)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err))
        return
    }

    // If labels changed, also get the Services the old pod belonged to
    if labelsChanged {
        oldServices, err := e.getPodServiceMemberships(oldPod)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err))
            return
        }
        // Compute the set of Services that need updating
        services = determineNeededServiceUpdates(oldServices, services, podChangedFlag)
    }

    // Enqueue every affected Service
    for key := range services {
        e.queue.Add(key)
    }
}
  • deletePod
// When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be an *v1.Pod, or a DeletedFinalStateUnknown marker item.
func (e *EndpointController) deletePod(obj interface{}) {
    // If obj is a Pod, reuse the addPod logic
    if _, ok := obj.(*v1.Pod); ok {
        // Enqueue all the services that the pod used to be a member
        // of. This happens to be exactly the same thing we do when a
        // pod is added.
        e.addPod(obj)
        return
    }
    // Otherwise, recover the pod from the DeletedFinalStateUnknown tombstone
    // If we reached here it means the pod was deleted but its final state is unrecorded.
    tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
    if !ok {
        utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
        return
    }
    pod, ok := tombstone.Obj.(*v1.Pod)
    if !ok {
        utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod: %#v", obj))
        return
    }
    glog.V(4).Infof("Enqueuing services of deleted pod %s/%s having final state unrecorded", pod.Namespace, pod.Name)
    e.addPod(pod)
}
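All three handlers rely on getPodServiceMemberships. Upstream, it asks the service lister for the Services selecting the pod (GetPodServices, which skips Services with a nil selector) and turns each into a "namespace/name" key. A stdlib-only sketch of that lookup, with hypothetical `Service`/`Pod` types and hypothetical `selects`/`podServiceKeys` helpers:

```go
package main

import (
	"fmt"
	"sort"
)

// Service and Pod are hypothetical, trimmed-down stand-ins for the v1 types.
type Service struct {
	Namespace, Name string
	Selector        map[string]string
}

type Pod struct {
	Namespace, Name string
	Labels          map[string]string
}

// selects reports whether a non-nil selector matches the pod's labels.
func selects(selector, labels map[string]string) bool {
	if selector == nil {
		return false // nil selectors match nothing, not everything
	}
	for k, v := range selector {
		if got, ok := labels[k]; !ok || got != v {
			return false
		}
	}
	return true
}

// podServiceKeys returns the "namespace/name" keys of every Service in the
// pod's namespace whose selector matches the pod.
func podServiceKeys(pod Pod, services []Service) []string {
	keys := []string{}
	for _, svc := range services {
		if svc.Namespace == pod.Namespace && selects(svc.Selector, pod.Labels) {
			keys = append(keys, svc.Namespace+"/"+svc.Name)
		}
	}
	sort.Strings(keys)
	return keys
}

func main() {
	pod := Pod{Namespace: "default", Name: "web-1", Labels: map[string]string{"app": "web", "track": "stable"}}
	services := []Service{
		{Namespace: "default", Name: "web", Selector: map[string]string{"app": "web"}},
		{Namespace: "default", Name: "web-canary", Selector: map[string]string{"app": "web", "track": "canary"}},
		{Namespace: "default", Name: "external-db"}, // no selector: endpoints managed by hand
		{Namespace: "other", Name: "web", Selector: map[string]string{"app": "web"}},
	}
	fmt.Println(podServiceKeys(pod, services)) // [default/web]
}
```

This reverse lookup is a linear scan over the namespace's Services, which is cheap because it reads from the informer's local cache rather than the apiserver.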

Service Add, Update, and Delete Handlers

  • Registering the handlers
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: e.enqueueService,
    UpdateFunc: func(old, cur interface{}) {
        e.enqueueService(cur)
    },
    DeleteFunc: e.enqueueService,
})
  • enqueueService

Adds the Service's key to the queue:

// obj could be an *v1.Service, or a DeletedFinalStateUnknown marker item.
func (e *EndpointController) enqueueService(obj interface{}) {
    // Compute the Service's key (namespace/name) and add it to the queue
    key, err := controller.KeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
        return
    }

    e.queue.Add(key)
}
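The queue stores string keys, not objects. As far as I know, `controller.KeyFunc` is client-go's `DeletionHandlingMetaNamespaceKeyFunc`, which yields "namespace/name" for namespaced objects, and syncService reverses it with `cache.SplitMetaNamespaceKey`. A stdlib-only sketch of that round trip; `metaKey` and `splitKey` are hypothetical helpers, and tombstone handling is omitted.

```go
package main

import (
	"fmt"
	"strings"
)

// metaKey builds "namespace/name", or just "name" for cluster-scoped objects.
func metaKey(namespace, name string) string {
	if namespace == "" {
		return name
	}
	return namespace + "/" + name
}

// splitKey is the inverse applied at the top of syncService.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
	key := metaKey("default", "web")
	ns, name, err := splitKey(key)
	fmt.Println(key, ns, name, err) // default/web default web <nil>
}
```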

SyncService

func (e *EndpointController) syncService(key string) error {
    startTime := time.Now()
    defer func() {
        glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    // If the Service cannot be found, treat it as deleted and delete its Endpoints
    service, err := e.serviceLister.Services(namespace).Get(name)
    if err != nil {
        // Delete the corresponding endpoint, as the service has been deleted.
        // TODO: Please note that this will delete an endpoint when a
        // service is deleted. However, if we're down at the time when
        // the service is deleted, we will miss that deletion, so this
        // doesn't completely solve the problem. See #6877.
        err = e.client.CoreV1().Endpoints(namespace).Delete(name, nil)
        if err != nil && !errors.IsNotFound(err) {
            return err
        }
        return nil
    }

    // Services without a selector get no Endpoints from this controller
    if service.Spec.Selector == nil {
        // services without a selector receive no endpoints from this controller;
        // these services will receive the endpoints that are created out-of-band via the REST API.
        return nil
    }

    // List the pods matched by the Service's selector
    glog.V(5).Infof("About to update endpoints for service %q", key)
    pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
    if err != nil {
        // Since we're getting stuff from a local cache, it is
        // basically impossible to get this error.
        return err
    }

    // If the user specified the older (deprecated) annotation, we have to respect it.
    // i.e. whether not-ready pods are still published as endpoints
    tolerateUnreadyEndpoints := service.Spec.PublishNotReadyAddresses
    if v, ok := service.Annotations[TolerateUnreadyEndpointsAnnotation]; ok {
        b, err := strconv.ParseBool(v)
        if err == nil {
            tolerateUnreadyEndpoints = b
        } else {
            utilruntime.HandleError(fmt.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err))
        }
    }

    // Endpoint subsets to build
    subsets := []v1.EndpointSubset{}
    var totalReadyEps int = 0
    var totalNotReadyEps int = 0

    for _, pod := range pods {
        // Skip pods that have not been assigned an IP yet
        if len(pod.Status.PodIP) == 0 {
            glog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
            continue
        }
        // Skip pods that are being deleted (unless not-ready endpoints are tolerated)
        if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil {
            glog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
            continue
        }

        // Build the EndpointAddress for this pod
        epa := *podToEndpointAddress(pod)

        // Use the pod's hostname only if the pod is in this Service's subdomain
        hostname := pod.Spec.Hostname
        if len(hostname) > 0 && pod.Spec.Subdomain == service.Name && service.Namespace == pod.Namespace {
            // Set the hostname on the pod's EndpointAddress
            epa.Hostname = hostname
        }

        // Allow headless service not to have ports.
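        if len(service.Spec.Ports) == 0 {
            // The Service defines no ports: only a headless Service (ClusterIP: None) gets port-less endpoints
            if service.Spec.ClusterIP == api.ClusterIPNone {
                var readyEps, notReadyEps int
                subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
                // Accumulate the counters instead of overwriting them with this pod's values
                totalReadyEps = totalReadyEps + readyEps
                totalNotReadyEps = totalNotReadyEps + notReadyEps
                // No need to repack subsets for headless service without ports.
            }
            // A Service with no ports that is not headless gets no endpoints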
        } else {
            // For each Service port, resolve the pod's port and add an endpoint
            for i := range service.Spec.Ports {
                servicePort := &service.Spec.Ports[i]

                portName := servicePort.Name
                portProto := servicePort.Protocol
                // Resolve the target port on the pod (a number, or a container port name)
                portNum, err := podutil.FindPort(pod, servicePort)
                if err != nil {
                    glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
                    continue
                }

                var readyEps, notReadyEps int
                // Add the address/port pair and get this pod's ready / not-ready counts
                epp := &v1.EndpointPort{Name: portName, Port: int32(portNum), Protocol: portProto}
                subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
                totalReadyEps = totalReadyEps + readyEps
                totalNotReadyEps = totalNotReadyEps + notReadyEps
            }
        }
    }
    // Canonicalize subsets: group addresses (ready and not-ready) that share the same ports
    subsets = endpoints.RepackSubsets(subsets)

    // See if there's actually an update here.
    // Fetch the current Endpoints from the cache; if none exists, prepare a new object to create
    currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
    if err != nil {
        if errors.IsNotFound(err) {
            // Not found: start from a new object, which is created below
            currentEndpoints = &v1.Endpoints{
                ObjectMeta: metav1.ObjectMeta{
                    Name:   service.Name,
                    Labels: service.Labels,
                },
            }
        } else {
            return err
        }
    }

    // An empty ResourceVersion means the object does not exist in the apiserver yet
    createEndpoints := len(currentEndpoints.ResourceVersion) == 0

    // If the Endpoints already exist and neither subsets nor labels changed, do nothing
    if !createEndpoints &&
        apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) &&
        apiequality.Semantic.DeepEqual(currentEndpoints.Labels, service.Labels) {
        glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
        return nil
    }
    // Deep-copy so the object cached by the informer is never mutated
    newEndpoints := currentEndpoints.DeepCopy()
    newEndpoints.Subsets = subsets
    newEndpoints.Labels = service.Labels
    if newEndpoints.Annotations == nil {
        newEndpoints.Annotations = make(map[string]string)
    }

    glog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
    // Create or update the Endpoints
    if createEndpoints {
        // No previous endpoints, create them
        _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(newEndpoints)
    } else {
        // Pre-existing
        _, err = e.client.CoreV1().Endpoints(service.Namespace).Update(newEndpoints)
    }
    if err != nil {
        if createEndpoints && errors.IsForbidden(err) {
            // A request is forbidden primarily for two reasons:
            // 1. namespace is terminating, endpoint creation is not allowed by default.
            // 2. policy is misconfigured, in which case no service would function anywhere.
            // Given the frequency of 1, we log at a lower level.
            glog.V(5).Infof("Forbidden from creating endpoints: %v", err)
        }
        return err
    }
    return nil
}
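The heart of syncService is the per-pod loop: skip pods without an IP, skip terminating pods unless not-ready endpoints are tolerated, resolve the target port (numeric ports are used as-is, named ports are looked up among the pod's container ports by name and protocol, as podutil.FindPort does), then file the address as ready or not ready. A stdlib-only sketch for a single Service port; every type and helper here (`Pod`, `ContainerPort`, `ServicePort`, `findPort`, `classify`) is hypothetical, and the restart-policy check the real addEndpointSubset applies to not-ready pods is left out.

```go
package main

import (
	"fmt"
	"strconv"
)

// Hypothetical, trimmed-down stand-ins for the v1 types used by syncService.
type ContainerPort struct {
	Name     string
	Port     int
	Protocol string
}

type Pod struct {
	Name        string
	IP          string
	Ready       bool // stands in for the PodReady condition
	Terminating bool // stands in for DeletionTimestamp != nil
	Ports       []ContainerPort
}

type ServicePort struct {
	Protocol   string
	TargetPort string // a number such as "8080", or a port name such as "http"
}

// findPort: numeric target ports are used as-is; named ones are looked up
// among the pod's container ports, matching both name and protocol.
func findPort(pod Pod, sp ServicePort) (int, error) {
	if n, err := strconv.Atoi(sp.TargetPort); err == nil {
		return n, nil
	}
	for _, cp := range pod.Ports {
		if cp.Name == sp.TargetPort && cp.Protocol == sp.Protocol {
			return cp.Port, nil
		}
	}
	return 0, fmt.Errorf("pod %s has no port named %q", pod.Name, sp.TargetPort)
}

// classify returns "ip:port" strings for ready and not-ready addresses.
func classify(pods []Pod, sp ServicePort, tolerateUnready bool) (ready, notReady []string) {
	for _, p := range pods {
		if p.IP == "" {
			continue // no IP assigned yet
		}
		if !tolerateUnready && p.Terminating {
			continue // being deleted
		}
		port, err := findPort(p, sp)
		if err != nil {
			continue // this pod does not expose the target port
		}
		addr := p.IP + ":" + strconv.Itoa(port)
		if tolerateUnready || p.Ready {
			ready = append(ready, addr)
		} else {
			notReady = append(notReady, addr)
		}
	}
	return ready, notReady
}

func main() {
	http := []ContainerPort{{Name: "http", Port: 8080, Protocol: "TCP"}}
	pods := []Pod{
		{Name: "a", IP: "10.0.0.2", Ready: true, Ports: http},
		{Name: "b", IP: "10.0.0.3", Ready: false, Ports: http},
		{Name: "c", IP: "", Ready: true},
		{Name: "d", IP: "10.0.0.4", Ready: true, Terminating: true, Ports: http},
	}
	ready, notReady := classify(pods, ServicePort{Protocol: "TCP", TargetPort: "http"}, false)
	fmt.Println(ready, notReady) // [10.0.0.2:8080] [10.0.0.3:8080]
}
```

Keeping not-ready pods in a separate list, instead of dropping them, lets consumers see them while kube-proxy routes only to the ready addresses.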

Summary

Endpoints, together with Services, their associated Pods, and kube-proxy, implement dynamic load balancing that provides access to a group of Pods.


What scenarios open up if we integrate this Service/Endpoints mechanism into our own operations platform?

Scenario 1: Automatically bringing new hosts online

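Once a newly provisioned host has finished its setup steps, listen for that event and automatically attach the host to the load balancer.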

Scenario 2: Automatically taking faulty hosts offline

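When the monitoring system detects that a host's service check is failing, remove that host from the load balancer.

Combine log and monitoring analysis: when business metrics drop or response latency rises on one host while the other hosts are healthy, automatically take the faulty host offline.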

Not a Silver Bullet

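Load balancers have always been a hard area for operations development, not because the scenario is especially complex but because they are the entry point for the whole site. Every operation must be handled with great care: making the entire site unreachable because of the operations platform is unacceptable.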
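Both scenarios can reuse the Endpoints controller's pattern: compute the desired member set (healthy hosts), diff it against the current load balancer members, and apply only the difference. The sketch below is a hypothetical illustration, not part of Kubernetes or any load balancer API. The `maxDetach` guard is an assumption added in the spirit of the caution above: if a health-check glitch would remove too many hosts at once, nothing is detached and a human should be alerted instead.

```go
package main

import (
	"fmt"
	"sort"
)

// reconcile returns the hosts to attach and detach. If more than maxDetach
// hosts would be removed, detach is suppressed and blocked is true.
func reconcile(current, healthy []string, maxDetach int) (attach, detach []string, blocked bool) {
	cur := map[string]bool{}
	for _, h := range current {
		cur[h] = true
	}
	ok := map[string]bool{}
	for _, h := range healthy {
		if ok[h] {
			continue // ignore duplicates
		}
		ok[h] = true
		if !cur[h] {
			attach = append(attach, h) // healthy but not yet behind the LB
		}
	}
	for h := range cur {
		if !ok[h] {
			detach = append(detach, h) // behind the LB but failing checks
		}
	}
	sort.Strings(attach)
	sort.Strings(detach)
	if len(detach) > maxDetach {
		return attach, nil, true
	}
	return attach, detach, false
}

func main() {
	attach, detach, blocked := reconcile(
		[]string{"10.0.1.1", "10.0.1.2", "10.0.1.3"},
		[]string{"10.0.1.1", "10.0.1.3", "10.0.1.4"},
		1)
	fmt.Println(attach, detach, blocked) // [10.0.1.4] [10.0.1.2] false
}
```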

Original article by baxiaoshi. If you repost it, please credit the source: http://www.sreguide.com/go%e8%af%ad%e8%a8%80/kubernetes-endpoint-controller-%e5%ba%95%e5%b1%82%e5%8e%9f%e7%90%86%e5%89%96%e6%9e%90.html
