kubernetes informer系统设计与底层源码剖析

Informer

概况

k8s是一个事件驱动的系统,核心通过apiserver进行event broadcast进行事件分发

客户端 controller通过list watch监听数据变化, 触发对应资源类型的改变, 作出对应的操作

为了减轻对apiserver的压力, 客户端实现了一个缓存机制informer, informer负责从apiserver端同步数据, 更新本地缓存数据, 同时watch数据的变化

并且接受其他组件的ResourceEventHandler, 再事件发生时, 调用对应事件类型的处理函数, 来进行业务逻辑的处理

初识

kubernetes informer系统设计与底层源码剖析

informer就是为了减轻核心apiserver数据交互的压力而抽象出来的一个 cache 层, 客户端对apiserver数据的 读取和 监听 操作都通过本地informer进行

组成

kubernetes informer系统设计与底层源码剖析
  • Reflector 负责获取和监听服务端数据变化
  • store 负责Reflector 数据的存储
    • items 主要是存放key-value的数据索引
    • indices 存储对象的所有索引映射
    • indexer 存储通过当前索引生成的索引映射
  • DeltaFIFO 是一个事件队列,Refletor接收数据变化之后, 将数据传给store和ResouceEventHandler

通信

informer在创建的时候, 会包装客户端 controller 的 Config 的Process方法, 接收两个参数 objType 和 ResourceEventHandler , 通过objType将获取的事件进行数据转换, ResourceEventHandler则是由对应的调用者去实现不同事件的处理方式

代码

创建Informer

func NewInformer(
    lw ListerWatcher,
    objType runtime.Object,
    resyncPeriod time.Duration,
    h ResourceEventHandler,
) (Store, Controller) {
    // NewStore创建一个threadSafeMap用于存储客户端的所有事件
    // 默认传递的keyFUNC是通过namespace+resouce_name生成对象key
    clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)

// 创建DeltaFIFO队列, 用于获取对应的事件
// 第一个参数是keyFunc生成对象key, 第二参数则是KeyListerGetter, 用于实现从上面提的store中通过key获取对象列表
    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    lw,
        ObjectType:       objType,
        FullResyncPeriod: resyncPeriod,
        RetryOnError:     false,

        Process: func(obj interface{}) error {
            // Process主要作用是根据事件的类型, 转发到EventResourceHandler不同的处理函数
            for _, d := range obj.(Deltas) {
                switch d.Type {
                case Sync, Added, Updated:
                                // 调用store的Get方法查询对应的object是否存在
                    if old, exists, err := clientState.Get(d.Object); err == nil && exists {
                        // 如果已经存在对应的数据,  就更新
                        if err := clientState.Update(d.Object); err != nil {
                            return err
                        }
                        // 调用传递进来的对应的ResourceEventHandler里面的对应的方法
                        h.OnUpdate(old, d.Object)
                    } else {
                        // 如果数据不存在, 就直接调用add方法
                        if err := clientState.Add(d.Object); err != nil {
                            return err
                        }
                        h.OnAdd(d.Object)
                    }
                case Deleted:
                    if err := clientState.Delete(d.Object); err != nil {
                        return err
                    }
                    h.OnDelete(d.Object)
                }
            }
            return nil
        },
    }
    // New最后返回一个对应的controller, controller负责从DeltaFIFO队列中取出数据传递给process
    return clientState, New(cfg)
}

informer这些代码是关键代码, 注意对应数据的传递以及变量的共享

controller

  • 数据结构
type controller struct {
    config         Config
    reflector      *Reflector
    reflectorMutex sync.RWMutex
    clock          clock.Clock
}
  • 运行方法
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    // controller使用NewInformer中传递的Config对象的Queue和lw来进行Reflector的实例化
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue, // 这里就是reflector中的store
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group
    defer wg.Wait()
    // 启动Reflector
    wg.StartWithChannel(stopCh, r.Run)

    // processLoop不端的从队列中获取数据, 然后调用对应的处理方法
    wait.Until(c.processLoop, time.Second, stopCh)
}
func (c *controller) processLoop() {
    for {
        // Pop的时候, 我们可以传递一个处理函数, 该处理函数就是Config里面的process, 根据不同的事件类型调用下面的eventHandler对应的方法
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == FIFOClosedError {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                // 如果允许重试,可以重新将对象数据放回到队列中进行处理
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

Reflector

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    // ResourceVersion为0表示当前数据直接从apiserver的cache中获取, 然后从当前的返回的数据中取出当前的resourceVersion, 后续调用Watch进行数据变化的监听
    options := metav1.ListOptions{ResourceVersion: "0"}
    // 获取当前类型的所有数据
    list, err := r.listerWatcher.List(options)
    // 获取当前数据的resourceVersion
    resourceVersion = listMetaInterface.GetResourceVersion()
    items, err := meta.ExtractList(list)

    // 将获取的数据和resourceVersion,替换到当前的store里面, 这里的store就是Queue
    if err := r.syncWith(items, resourceVersion); err != nil {
        return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
    }
    // 设置最后同步的资源版本
    r.setLastSyncResourceVersion(resourceVersion)
            // 通过之前获取的resouceVersion生成一个http request的watch继续watch后续数据的额变化
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any wachers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timeoutSeconds,
        }

        w, err := r.listerWatcher.Watch(options)
        // 生成一个 watcher后续watcher会监听数据变化, 同时提供w.ResultChan给watchHandler获取watch到的事件
        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
            }
            return nil
        }
    }
}

// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    found := make([]interface{}, 0, len(items))
    for _, item := range items {
        found = append(found, item)
    }
    return r.store.Replace(found, resourceVersion)
}

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
            // 监听到数据变化 , 然后获取对应数据的变化, 调用store对应的方法, 修改数据
        case event, ok := <-w.ResultChan():
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
                    //. 设置新的resouceVersion后续请求使用新的watch opyions
                    // 注意这里是使用的指针直接进行了对应资源的修改
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }

    watchDuration := r.clock.Now().Sub(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        r.metrics.numberOfShortWatches.Inc()
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
    glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
    return nil
}
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    found := make([]interface{}, 0, len(items))
    for _, item := range items {
        found = append(found, item)
    }
    return r.store.Replace(found, resourceVersion)
}

threadSafeMap

kubernetes informer系统设计与底层源码剖析

threadSafeMap其实就相当于informer中的一个 数据库 , 负责存储对应数据的key->object的映射, 同时接收外部传递进来的indexers, 为当前object构建对应的 索引 , 上图中updateIndices和indexer, 当cache (就是上面的store)接收到一个数据的时候, 首先通过keyFunc生成object对应的key, 然后就调用threadSafeMap中的增删改查方法, 在增改删中都会调用跟indices有关的方法(增改是updateIndices, 删是deleteFromIndices), 更新本地索引

  • 结构体
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}

    // indexers maps a name to an IndexFunc
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}
  • 生成索引
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
    // if we got an old object, we need to remove it before we add it again
    if oldObj != nil {
        c.deleteFromIndices(oldObj, key)
    }
        // 遍历所有的indexer, 生成当前object的values, 然后吧所有的values都存放在Index中, 其中value就是key
    for name, indexFunc := range c.indexers {
        // 通过indexer的 indexFunc, 获取当前对象的key, 返回的是一个列表
        indexValues, err := indexFunc(newObj)
        if err != nil {
            panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
        }
        // 从indices中获取当前index的索引的数据
        index := c.indices[name]
        if index == nil {
            // 如果为空就至为一个值, 就新建一个索引
            index = Index{}
            c.indices[name] = index
        }

        // 注意,这个地方通过上面的indexFunc,生成了一个indexValues, 会在当前index将所有value的映射都设置为传递进来的**key**
        // **key**是由add或者update 等方法为当前对象生成的一个key, 该key映射到items里面对应的object
        for _, indexValue := range indexValues {
            // 遍历当前object的所有indexValues, 设置当前index的缓存的索引的值
            set := index[indexValue]
            if set == nil {
                set = sets.String{}
                index[indexValue] = set
            }
            set.Insert(key)
        }
    }
}

原创文章,作者:baxiaoshi,如若转载,请注明出处:http://www.sreguide.com/go%e8%af%ad%e8%a8%80/kubernetes-informer%e7%b3%bb%e7%bb%9f%e8%ae%be%e8%ae%a1%e4%b8%8e%e5%ba%95%e5%b1%82%e6%ba%90%e7%a0%81%e5%89%96%e6%9e%90.html

发表评论

电子邮件地址不会被公开。 必填项已用*标注

联系我们

QQ: 52866169