kubernetes shared informer: System Design and Source Code Analysis

shared informer

Overview

An informer handles the list/watch (lw) operations for a single resource type. But if a controller needs to work with several resource types, it would have to create one more informer per resource it watches (for example, JobController must handle Job events, and it must also create the corresponding Pods when a Job's Pods are not satisfied).

A shared informer is a collection of informers: within one client there is only one informer per resource type. If other controllers care about the same resource, each simply registers its own handler function, and when an event occurs the shared informer dispatches it to all of them (somewhat like the observer pattern, except that the message passing internally makes heavy use of Go channels).

Components

  • A controller calls ctx.InformerFactory to obtain the informer it cares about
  • sharedInformerFactory returns the requested informer from its cache, or creates it if it does not exist yet
  • The controller adds its own ResourceEventHandler
  • sharedInformer receives the event callbacks from the Reflector and triggers each controller's ResourceEventHandler

sharedInformerFactory

sharedInformerFactory (a factory pattern) is responsible for creating the informers the various controllers need, and it caches them: if an informer for the type already exists it is returned, otherwise a new one is created.

sharedInformer

sharedInformer's core functionality is similar to that of other informers; the difference is that it allows multiple ResourceEventHandlers to be registered.

sharedProcessor

sharedProcessor is responsible for registering the ResourceEventHandlers of all controllers; it also receives events from the informer and distributes them to every processorListener.

It is also responsible for starting and stopping all the listeners.

processorListener

processorListener is a wrapper around the handler a controller passes in. Internally it buffers all events handed over by the sharedProcessor in a growing ring buffer, and it also maintains its own event channel, nextCh, through which it pulls events and delivers them to the controller.

Code

sharedInformerFactory

  • Struct
type sharedInformerFactory struct {
    // client used to access the k8s apiserver
    client           kubernetes.Interface
    // watches all namespaces by default
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    lock             sync.Mutex
    defaultResync    time.Duration
    customResync     map[reflect.Type]time.Duration

    // stores the shared informers that have already been created
    informers map[reflect.Type]cache.SharedIndexInformer
    // startedInformers is used for tracking which informers have been started.
    // This allows Start() to be called multiple times safely.
    startedInformers map[reflect.Type]bool
}
  • Factory function
// The factory is handed to every informer; when an informer is requested, the caller passes in that informer's newFunc — if no informer of this type exists yet it is created via newFunc, otherwise the cached one is returned directly
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}

sharedInformer

  • 结构体
type sharedIndexInformer struct {
    indexer    Indexer
    controller Controller
    // receives the ResourceEventHandlers passed in by all controllers
    processor             *sharedProcessor
    cacheMutationDetector CacheMutationDetector

    // watches the apiserver for changes to the corresponding resource
    listerWatcher ListerWatcher
    objectType    runtime.Object

    // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
    // shouldResync to check if any of our listeners need a resync.
    resyncCheckPeriod time.Duration
    // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
    // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
    // value).
    defaultEventHandlerResyncPeriod time.Duration
    // clock allows for testability
    clock clock.Clock

    started, stopped bool
    startedLock      sync.Mutex

    // blockDeltas gives a way to stop all event distribution so that a late event handler
    // can safely join the shared informer.
    blockDeltas sync.Mutex
}
  • run

The logic is the same as other informers: create a queue, build a Reflector, then list/watch the apiserver for changes and invoke the callbacks.

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
        // (the queue construction and Reflector setup are elided from this excerpt)
        // Note: on startup this fetches all the processorListeners and starts each
        // one's pop and run methods, which consume the events the informer adds
        // and carry out the business logic
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    // controller.Run starts the controller and the Reflector
    s.controller.Run(stopCh)
}
  • AddEventHandlerWithResyncPeriod
// wraps the ResourceEventHandler passed in by a controller and registers it with the sharedProcessor
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {

        // determine the effective resync period: the minimum across all currently registered handlers
    if resyncPeriod > 0 {
        if resyncPeriod < minimumResyncPeriod {
            glog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
            resyncPeriod = minimumResyncPeriod
        }

        if resyncPeriod < s.resyncCheckPeriod {
            if s.started {
                glog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
                resyncPeriod = s.resyncCheckPeriod
            } else {
                // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
                // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
                // accordingly
                // update the resync period
                s.resyncCheckPeriod = resyncPeriod
                s.processor.resyncCheckPeriodChanged(resyncPeriod)
            }
        }
    }

    // create a new processorListener
    listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

    // if the sharedInformer has not started yet, just add the listener to the processor (sharedProcessor)
    if !s.started {
        s.processor.addListener(listener)
        return
    }

    // in order to safely join, we have to
    // 1. stop sending add/update/delete notifications
    // 2. do a list against the store
    // 3. send synthetic "Add" events to the new handler
    // 4. unblock
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // if the informer has already started, call indexer.List() and deliver the results to the new listener
    s.processor.addListener(listener)
    for _, item := range s.indexer.List() {
        listener.add(addNotification{newObj: item})
    }
}

sharedProcessor

  • Struct
type sharedProcessor struct {
    listenersStarted bool
    listenersLock    sync.RWMutex
    listeners        []*processorListener    // registered listeners, added by the controllers
    syncingListeners []*processorListener
    clock            clock.Clock
    wg               wait.Group
}
  • run
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            // start every listener's run and pop methods
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
        // wait for the stop signal, then shut down all listeners
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}
  • distribute

Distributes an event to all listeners.

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    if sync {
        for _, listener := range p.syncingListeners {
            listener.add(obj)
        }
    } else {
        for _, listener := range p.listeners {
            listener.add(obj)
        }
    }
}

processorListener


Why is it designed this way?

  • Looking at earlier versions of the code, this spot originally used a slice to buffer events; it was later changed to RingGrowing
  • First, when the sharedProcessor distributes events, it should block as little as possible (otherwise one slow controller would hold back the progress of every controller)
  • Second, events should be consumed as quickly as possible; the informer uses select to watch for events and blocking channels to receive data in real time, which is where the two channels come from: nextCh (the internal forwarding channel) and addCh (the external channel)
  • But the two sides may not process at the same rate, so what happens to the excess data? That is where RingGrowing comes in: it buffers the surplus
  • When its capacity runs out, it simply grows

In my view this is the same idea as using an MQ for peak shaving in everyday business systems.

  • Struct
type processorListener struct {
    // nextCh is the listener's internal event channel
    nextCh chan interface{}
    // addCh receives the event notifications passed in by the sharedProcessor
    addCh  chan interface{}
    // the controller's ResourceEventHandler
    handler ResourceEventHandler
    // internal buffer queue
    pendingNotifications buffer.RingGrowing

    // requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
    requestedResyncPeriod time.Duration
    // resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
    // value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
    // informer's overall resync check period.
    resyncPeriod time.Duration
    // nextResync is the earliest time the listener should get a full resync
    nextResync time.Time
    // resyncLock guards access to resyncPeriod and nextResync
    resyncLock sync.Mutex
}
  • pop
// pop is the cleverest part of the design: it forwards events in real time via addCh -> RingGrowing -> nextCh
func (p *processorListener) pop() {
    // processorListener pulls events from addCh
    // and forwards each one to nextCh for the subsequent run to consume
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        // Initially (and whenever the buffer runs dry) nextCh is nil; a send on a
        // nil channel blocks forever, so this case is disabled until an event is
        // pending. After each send, ReadOne pulls the next buffered notification;
        // if the buffer is empty it returns ok == false and nextCh is reset to nil.
        case nextCh <- notification:
            // Notification dispatched
            // The elegant part: as long as pendingNotifications keeps yielding
            // events (the local buffer is non-empty), this case keeps firing and
            // each notification is sent on into nextCh (p.nextCh)
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        // events distributed by the sharedProcessor arrive on addCh
        case notificationToAdd, ok := <-p.addCh:
            // if notification is not nil, one event is already waiting to be sent
            // to nextCh, so the new event is written into the buffer for later
            if !ok {
                return
            }
            // notification was consumed by the previous ReadOne; take the new event
            // directly and re-enable the send case by setting nextCh = p.nextCh
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                // append the event to the ring buffer, growing it when capacity runs out
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}
  • run
// run is comparatively simple: it pulls events from nextCh and invokes the matching handler callback
func (p *processorListener) run() {
    // this call blocks until the channel is closed.  When a panic happens during the notification
    // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    // the next notification will be attempted.  This is usually better than the alternative of never
    // delivering again.
    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            // pull an event from nextCh and, based on its type, invoke the
            // corresponding method of the originally registered handler
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}

Original article by baxiaoshi. If reprinted, please credit the source: http://www.sreguide.com/uncategorized/kubernetes-shared-informer-%e7%b3%bb%e7%bb%9f%e8%ae%be%e8%ae%a1%e4%b8%8e%e6%ba%90%e7%a0%81%e5%88%86%e6%9e%90.html
