kubernetes service controller系统设计与底层源码剖析

Service Controller

kubernetes service controller系统设计与底层源码剖析

Kubernetes Service定义分为两个部分

  • 一组pod的逻辑分组(微服务中的一个服务)
  • 一组pod的访问方式和访问策略

由于访问策略由kube-proxy控制, 所以Service Controller实际上只负责与cloud.LoadBalancer交互, 更新那些服务类型为LoadBalancer的service

Service Controller

Controller核心数据结构

// ServiceController holds the state the service controller works with: the
// cloud provider handles, the informer listers, a local service cache and the
// work queue of service keys waiting to be synced.
type ServiceController struct {
    // Cloud provider interface.
    cloud               cloudprovider.Interface
    // All known node objects.
    knownHosts          []*v1.Node
    // Services that need updating. Two sources: 1) every service currently in
    // the cache, 2) services whose previous update failed.
    servicesToUpdate    []*v1.Service
    // API client; persistUpdate uses it to write service status back.
    kubeClient          clientset.Interface
    // Cluster name handed to the balancer calls.
    clusterName         string
    // Load balancer interface of the cloud provider.
    balancer            cloudprovider.LoadBalancer
    // Locally cached state of each service.
    cache               *serviceCache
    // Lister used by syncService to fetch the current service object.
    serviceLister       corelisters.ServiceLister
    // Run waits on this (and nodeListerSynced) before starting workers.
    serviceListerSynced cache.InformerSynced
    eventBroadcaster    record.EventBroadcaster
    // Recorder used to emit events on the service being processed.
    eventRecorder       record.EventRecorder
    nodeLister          corelisters.NodeLister
    nodeListerSynced    cache.InformerSynced
    // services that need to be synced
    queue workqueue.RateLimitingInterface
}

按照功能, 将ServiceController的字段分为下面几个部分

  • 数据存储
    • nodeLister和serviceLister分别用于node和service的获取
  • cloud交互
    • balancer用于绑定nodes到对应的cloud的loadbalancer
    • clusterName指定当前的集群环境, 提供给balancer
  • 数据缓存
    • cache缓存service当前的状态, 用于前后变更状态的对比
    • servicesToUpdate存放需要更新loadbalancer的service, 主要用于变更失败之后的重试和从cache同步所有的service
    • knownHosts所有的node节点列表

数据流

kubernetes service controller系统设计与底层源码剖析

Service Controller监听svc和node的变化,主要业务逻辑分为两个部分

  • service
    • 监听service变化, 将service和nodes更新到cloud的LoadBalancer(仅当type是LoadBalancer时才会创建/更新; 否则如果cloud上还存在对应的loadbalancer, 则将其删除)
    • 监听node变化, 更新node和所有的service到loadbalancer(注意loadbalancer里面绑定的node是全部的node, 并没有根据pod去只绑定有当前service的节点)

启动

  • 初始化
    // Register handlers on the service informer: adds and deletes are always
    // enqueued, updates only when needsUpdate says so.
    serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc: s.enqueueService,
            UpdateFunc: func(old, cur interface{}) {
                oldSvc, ok1 := old.(*v1.Service)
                curSvc, ok2 := cur.(*v1.Service)
                // Check whether the current service needs an update; if it
                // does, add it to the queue.
                if ok1 && ok2 && s.needsUpdate(oldSvc, curSvc) {
                    s.enqueueService(cur)
                }
            },
            DeleteFunc: s.enqueueService,
        },
        serviceSyncPeriod,
    )
    s.serviceLister = serviceInformer.Lister()
    s.serviceListerSynced = serviceInformer.Informer().HasSynced

    // init is mainly responsible for initializing the load balancer
    // (per the original author's note; its body is not shown here).
    if err := s.init(); err != nil {
        return nil, err
    }
    return s, nil
  • 启动Run
// Run starts the service controller and blocks until stopCh is closed.
//
// It waits for the service and node informer caches to sync and returns
// early if that wait reports failure. It then launches `workers` goroutines
// driving s.worker and one goroutine driving s.nodeSyncLoop. The work queue
// is shut down when Run returns.
func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
    defer runtime.HandleCrash()
    defer s.queue.ShutDown()

    glog.Info("Starting service controller")
    defer glog.Info("Shutting down service controller")

    synced := controller.WaitForCacheSync("service", stopCh, s.serviceListerSynced, s.nodeListerSynced)
    if !synced {
        return
    }

    // Start the workers.
    for remaining := workers; remaining > 0; remaining-- {
        go wait.Until(s.worker, time.Second, stopCh)
    }

    // Start the node sync loop.
    go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh)

    <-stopCh
}
  • 同步service
// syncService reconciles the service identified by key ("namespace/name").
//
// The service is looked up through the lister. If it is not found, the
// deletion path runs so load balancer resources get cleaned up. Any other
// lookup error is logged and returned. Otherwise the cache entry for the key
// is fetched (or created) and handed to processServiceUpdate.
func (s *ServiceController) syncService(key string) error {
    begin := time.Now()
    defer func() {
        glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(begin))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    // service holds the latest service info known to the lister.
    service, err := s.serviceLister.Services(namespace).Get(name)
    if errors.IsNotFound(err) {
        // Absence from the store means the watcher caught the deletion;
        // make sure the load balancer info is cleaned up.
        glog.Infof("Service has been deleted %v. Attempting to cleanup load balancer resources", key)
        return s.processServiceDeletion(key)
    }
    if err != nil {
        glog.Infof("Unable to retrieve service %v from store: %v", key, err)
        return err
    }

    // Get or create the cache entry, then process the update.
    return s.processServiceUpdate(s.cache.getOrCreate(key), service, key)
}
  • 调用loadbalancer更新本地缓存
kubernetes service controller系统设计与底层源码剖析
// processServiceUpdate brings the load balancer for service in line with its
// current spec and records the result in the local cache.
//
// If the cache entry already holds a service with a different UID, that
// older service's load balancer is deleted first. On failure a warning event
// is emitted on the service and the error is returned; the cache is written
// only on success.
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) error {
    // A cached state with a different UID is a different service object:
    // remove its load balancer before handling the new one.
    if prev := cachedService.state; prev != nil && prev.UID != service.UID {
        if err := s.processLoadBalancerDelete(cachedService, key); err != nil {
            return err
        }
    }

    // Remember the service now; the info is needed for service deletion.
    cachedService.state = service

    if err := s.createLoadBalancerIfNeeded(key, service); err != nil {
        var eventType, message string
        if wantsLoadBalancer(service) {
            eventType, message = "CreatingLoadBalancerFailed", "Error creating load balancer (will retry): "
        } else {
            eventType, message = "CleanupLoadBalancerFailed", "Error cleaning up load balancer (will retry): "
        }
        s.eventRecorder.Event(service, v1.EventTypeWarning, eventType, message+err.Error())
        return err
    }

    // Always update the cache upon success.
    // NOTE: the cached service is stored if and only if processing succeeded,
    // so a nil cached service implies it has not yet been processed
    // successfully.
    s.cache.set(key, cachedService)
    return nil
}
  • 检查创建loadbalancer
kubernetes service controller系统设计与底层源码剖析
// createLoadBalancerIfNeeded ensures the cloud load balancer matches what
// service asks for, then writes the resulting LoadBalancerStatus back if it
// changed.
//
// When the service wants a load balancer, one is ensured. When it does not,
// any existing load balancer for it is deleted and the new status is empty.
// Errors from the cloud calls are returned. A failure to persist the status
// is reported through runtime.HandleError and is not returned.
//
// Note: it would be safe to just call EnsureLoadBalancer, but on some clouds
// that requires a delete & create, which may involve service interruption.
// Also, we would like user-friendly events.
func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.Service) error {
    // Snapshot the current status so the write can be skipped if nothing changes.
    oldStatus := v1helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
    var newStatus *v1.LoadBalancerStatus

    if wantsLoadBalancer(service) {
        glog.V(2).Infof("Ensuring LB for service %s", key)

        // TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart
        s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuringLoadBalancer", "Ensuring load balancer")
        ensured, err := s.ensureLoadBalancer(service)
        if err != nil {
            return fmt.Errorf("failed to ensure load balancer for service %s: %v", key, err)
        }
        newStatus = ensured
        s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer")
    } else {
        // The service no longer wants a load balancer: look one up and
        // delete it if it exists.
        _, found, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service)
        if err != nil {
            return fmt.Errorf("error getting LB for service %s: %v", key, err)
        }

        if found {
            glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", key)
            s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
            if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
                return err
            }
            s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
        }

        // An empty status becomes the new state.
        newStatus = &v1.LoadBalancerStatus{}
    }

    // Write the state only if it changed.
    // TODO: Be careful here ... what if there were other changes to the service?
    if v1helper.LoadBalancerStatusEqual(oldStatus, newStatus) {
        glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus for service %s to registry.", key)
        return nil
    }

    // Work on a copy so the shared informer cache is not mutated.
    updated := service.DeepCopy()
    updated.Status.LoadBalancer = *newStatus

    if err := s.persistUpdate(updated); err != nil {
        runtime.HandleError(fmt.Errorf("failed to persist service %q updated status to apiserver, even after retries. Giving up: %v", key, err))
    }
    return nil
}
  • 更新 apiserver
// persistUpdate writes the service's status to the apiserver, trying up to
// clientRetryCount times and sleeping clientRetryInterval after each failed
// attempt that is retried.
//
// It returns nil on success and also when the service no longer exists. A
// conflict is returned immediately as an error without retrying. Otherwise
// the error from the last attempt is returned.
func (s *ServiceController) persistUpdate(service *v1.Service) error {
    var lastErr error
    for attempt := 0; attempt < clientRetryCount; attempt++ {
        _, lastErr = s.kubeClient.CoreV1().Services(service.Namespace).UpdateStatus(service)

        switch {
        case lastErr == nil:
            return nil
        case errors.IsNotFound(lastErr):
            // The object is gone and must not be recreated. Bail out so the
            // delete, which should arrive soon if it hasn't already, can be
            // processed.
            glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
                service.Namespace, service.Name, lastErr)
            return nil
        case errors.IsConflict(lastErr):
            // TODO: Try to resolve the conflict if the change was unrelated
            // to load balancer status. For now, just pass it up the stack.
            return fmt.Errorf("not persisting update to service '%s/%s' that has been changed since we received it: %v",
                service.Namespace, service.Name, lastErr)
        }

        glog.Warningf("Failed to persist updated LoadBalancerStatus to service '%s/%s' after creating its load balancer: %v",
            service.Namespace, service.Name, lastErr)
        time.Sleep(clientRetryInterval)
    }
    return lastErr
}

参考文章

  • http://dockone.io/article/2834
  • https://kubernetes.io/zh/docs/concepts/services-networking/service/

原创文章,作者:baxiaoshi,如若转载,请注明出处:http://www.sreguide.com/go%e8%af%ad%e8%a8%80/kubernetes-service-controller%e7%b3%bb%e7%bb%9f%e8%ae%be%e8%ae%a1%e4%b8%8e%e5%ba%95%e5%b1%82%e6%ba%90%e7%a0%81%e5%89%96%e6%9e%90.html

发表评论

电子邮件地址不会被公开。 必填项已用*标注

联系我们

QQ: 52866169