kubernetes 事件分发的抽象与核心代码分析

事件分发

背景

kubernetes 事件分发的抽象与核心代码分析

在运维系统里面通常一个数据变化,可能会触发多个联动系统的一些操作,每个系统自身可能又会有不通的操作,那么我们就需要一个中间的模块,帮我们完成事件的监听和统一处理

其实只做一件很简单的事情 监听某个地方的数据变化 和 把变化传递给关心的handler

设计

kubernetes 事件分发的抽象与核心代码分析
  • eventStorage: 负责监听底层各种事件, 传递给Broadcaster
  • Broadcaster: 自身维护一个watcher队列,接收到storage发送的event事件转发给所有wacther
  • watcher: 负责将接受到的事件直接传递给handler
  • handler: 声明的负责函数处理的具体方法

数据结构

  • Broadcaster
// 分发器
type Broadcaster struct {
    lock    sync.Mutex

    watchers        map[int64]*broadcasterWatcher
    nextWatcher     int64
    distributing    sync.WaitGroup

    incoming        chan Event

    watchQueueLength    int
    fullChannelBehavior FullChannelBehavior
}
  • Watcher
// 事件Watcher
type broadcasterWatcher struct {
    result  chan Event
    stopped chan struct{}
    stop    sync.Once
    id      int64    // Broadercaster里面的map里面的标识, 用于标识当前watcher
    m       *Broadcaster
}

关键实现

  • blcokQueue
// blockQueue用于阻塞队列,执行功能函数, 主要是产生一个新的watcher
func (m *Broadcaster) blockQueue (f func()) {
    var wg sync.WaitGroup
    wg.Add(1)
    m.incoming <- Event{
        Type:   internalRunFunctionMarker,
        object: functionFakeRuntimeObject(func() {
            defer wg.Done()
            f()
        }),
    }
    wg.Wait()
}
  • Watch
// Watch生成一个新的watcher, 后续handler处理只需要从当前watcher的chan里面获取数据, Broadcaster会把接受到的事件广播到所有的watcher中
func (m *Broadcaster) Watch() Interface {
    var w *broadcasterWatcher
    m.blockQueue(func() {
        m.lock.Lock()
        defer m.lock.Unlock()

        id := m.nextWatcher
        m.nextWatcher++
        w = &broadcasterWatcher{
            result: make(chan Event, m.watchQueueLength),
            stopped:make(chan struct{}),
            id:     id,
            m:      m,
        }
        m.watchers[id] = w
    })
    return w
}
  • loop
// 分发器循环将事件发送到所有的watcher
func (m *Broadcaster) loop () {
    for event := range m.incoming {    // 从事件的chan获取事件
        if event.Type == internalRunFunctionMarker {    // 当我们接受到的是一个函数类型的事件, 就执行该函数, 目前主要用于增加新的watch
            event.object.(functionFakeRuntimeObject)()
            continue
        }
        m.distribute(event)
    }
    m.closeAll()
    m.distributing.Done()
}
  • distribute
// 实现event消息分发到watcher
func (m *Broadcaster) distribute(event Event) {
    m.lock.Lock()
    defer m.lock.Unlock()
    if m.fullChannelBehavior == DropIfChannelFull {
        for _, w := range m.watchers {
            select {
            case w.result <- event:
            case <- w.stopped:
            default:
            }
        }
    } else {
        for _,w := range m.watchers {
            select {
            case w.result <- event:
            case <- w.result:
            }
        }
    }
}

完整代码

package watch

import "sync"

// 对象类型
type Object interface {

}

// 事件
type EventType string

type Event struct {
    Type    EventType
    object  Object
}



// 队列长度
const incomingQueueLength = 25

// 定义队列满之后的行为操作
type FullChannelBehavior int

// 定义一个包装函数用于阻塞队列实现功能函数执行
const internalRunFunctionMarker = "internal-do-function"

type functionFakeRuntimeObject func()

const (
    WaitIfChannelFull FullChannelBehavior = iota
    DropIfChannelFull
)


// 分发器
type Broadcaster struct {
    lock    sync.Mutex

    watchers        map[int64]*broadcasterWatcher
    nextWatcher     int64
    distributing    sync.WaitGroup

    incoming        chan Event

    watchQueueLength    int
    fullChannelBehavior FullChannelBehavior
}

// 产生新的事件分发器
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
    m := &Broadcaster{
        watchers:           map[int64]*broadcasterWatcher{},
        incoming:           make(chan Event, incomingQueueLength),
        watchQueueLength:   queueLength,
        fullChannelBehavior:fullChannelBehavior,
    }
    m.distributing.Add(1)
    go m.loop()
    return m
}

// 分发器循环将事件发送到所有的watcher
func (m *Broadcaster) loop () {
    for event := range m.incoming {
        if event.Type == internalRunFunctionMarker {
            event.object.(functionFakeRuntimeObject)()
            continue
        }
        m.distribute(event)
    }
    m.closeAll()
    m.distributing.Done()
}

// blockQueue用于阻塞队列,执行功能函数, 主要是产生一个新的watcher
func (m *Broadcaster) blockQueue (f func()) {
    var wg sync.WaitGroup
    wg.Add(1)
    m.incoming <- Event{
        Type:   internalRunFunctionMarker,
        object: functionFakeRuntimeObject(func() {
            defer wg.Done()
            f()
        }),
    }
    wg.Wait()
}

// Action产生一个事件函数
func (m *Broadcaster) Action(action EventType, object Object) {
    m.incoming <- Event{action, object}
}

// Watch生成一个新的watcher
func (m *Broadcaster) Watch() Interface {
    var w *broadcasterWatcher
    m.blockQueue(func() {
        m.lock.Lock()
        defer m.lock.Unlock()

        id := m.nextWatcher
        m.nextWatcher++
        w = &broadcasterWatcher{
            result: make(chan Event, m.watchQueueLength),
            stopped:make(chan struct{}),
            id:     id,
            m:      m,
        }
        m.watchers[id] = w
    })
    return w
}

// 关闭所有的watcher
func (m *Broadcaster) closeAll() {
    m.lock.Lock()
    defer m.lock.Unlock()
    for _, w := range m.watchers {
        close(w.result)
    }
    m.watchers = map[int64]*broadcasterWatcher{}
}

// 停止分发器
func (m *Broadcaster) Shutdown()  {
    close(m.incoming)
    m.distributing.Wait()
}

// 实现event消息分发到watcher
func (m *Broadcaster) distribute(event Event) {
    m.lock.Lock()
    defer m.lock.Unlock()
    if m.fullChannelBehavior == DropIfChannelFull {
        for _, w := range m.watchers {
            select {
            case w.result <- event:
            case <- w.stopped:
            default:
            }
        }
    } else {
        for _,w := range m.watchers {
            select {
            case w.result <- event:
            case <- w.result:
            }
        }
    }
}

// 暂停制定的watch id
func (m *Broadcaster) stopWatching(id int64) {
    m.lock.Lock()
    defer m.lock.Unlock()
    w, ok := m.watchers[id]
    if !ok {
        return
    }
    delete(m.watchers, id)
    close(w.result)
}

// 事件Watcher
type broadcasterWatcher struct {
    result  chan Event
    stopped chan struct{}
    stop    sync.Once
    id      int64
    m       *Broadcaster
}

// broadcasterWatcher
func (mw *broadcasterWatcher) ResultChan() <- chan Event {
    return mw.result
}

// 关闭broadcasterWatcher
func (mw *broadcasterWatcher) Stop() {
    mw.stop.Do(func() {
        close(mw.stopped)
        mw.m.stopWatching(mw.id)
    })
}

总结

以上代码主要来自k8s, 上面的只是一个抽象的实现,具体的实现目前嗨没看到, 当etcd里面的数据变化, 就会被对应的storage识别到, 就会通过Broadcaster分发到对应的watcher, 比如http里面的watcher接受到数据变化就会把对应的数据做编码,然后发送给客户端

原创文章,作者:baxiaoshi,如若转载,请注明出处:http://www.sreguide.com/%e7%b3%bb%e7%bb%9f%e8%ae%be%e8%ae%a1/kubernetes-%e4%ba%8b%e4%bb%b6%e5%88%86%e5%8f%91%e7%9a%84%e6%8a%bd%e8%b1%a1%e4%b8%8e%e6%a0%b8%e5%bf%83%e4%bb%a3%e7%a0%81%e5%88%86%e6%9e%90.html

发表评论

电子邮件地址不会被公开。 必填项已用*标注

联系我们

QQ: 52866169