Warning: mysqli_query(): (HY000/1030): Got error 28 from storage engine in /www/wwwroot/www.sreguide.com/wp-includes/wp-db.php on line 2007

Warning: mysqli_query(): (HY000/1030): Got error 28 from storage engine in /www/wwwroot/www.sreguide.com/wp-includes/wp-db.php on line 2007
kubernetes watchCache 缓存滑动窗口 底层设计与源码剖析 | 专注go语言|微服务|分布式|kubernetes系统研发与源码分析
Warning: mysqli_query(): (HY000/1030): Got error 28 from storage engine in /www/wwwroot/www.sreguide.com/wp-includes/wp-db.php on line 2007

Warning: mysqli_query(): (HY000/1030): Got error 28 from storage engine in /www/wwwroot/www.sreguide.com/wp-includes/wp-db.php on line 2007

kubernetes watchCache 缓存滑动窗口 底层设计与源码剖析

watchCache 缓存滑动窗口

背景

kubernetes  watchCache 缓存滑动窗口 底层设计与源码剖析
  • 通常在运维开发中, 为了避免后端存储的压力,都会增加一个缓存,缓存资源数据,避免每次都从后端获取
  • 今天介绍另外一个好玩的东西watchCache
  • watchCache是一个固定大小的用于监测缓存数据变化的数组

主要作用是缓存固定大小的最新数据。因为有些情况下, 我们并不一定需要全量的数据, 比如在k8s中开始watch一个资源对象时, 其实只需要把所watch的版本之后最近一批被修改的数据返回给你就可以了, 并不需要全部的数据

数据结构

  • 事件类型

// EventType identifies the kind of change carried by an Event.
type EventType string

const (
    // Added marks the creation of an object.
    Added EventType = "ADDED"
    // Moidifed marks the modification of an object. The identifier keeps its
    // historical misspelling so existing references still compile, but the
    // value is now the correctly spelled "MODIFIED".
    Moidifed EventType = "MODIFIED"
    // Deleted marks the removal of an object.
    Deleted EventType = "DELETED"
    // Error marks an error event.
    Error EventType = "ERROR"
)
// Event pairs an event type with the object the event refers to.
type Event struct {
    Type   EventType
    Object Object
}
  • 存储
// Store is the storage interface for Object values.
//
// Get looks an item up from an object, GetByKey from its key; both report
// whether the item exists. Replace swaps the whole content for the given list;
// its string argument is the resource version passed by watchCache.Replace.
type Store interface {
    Add(obj interface{}) error
    Update(obj interface{}) error
    Delete(obj interface{}) error
    List() []interface{}
    Get(obj interface{}) (item interface{}, exits bool, err error)
    GetByKey(key string) (item interface{}, exits bool, err error)
    Replace([]interface{}, string) error
}
  • 存储事件
// storeElement holds an object together with its already computed key, labels
// and fields, so they do not have to be recomputed on every list/watch.
type storeElement struct {
    Key           string
    Object        Object
    Labels        Set
    Fields        Set
    Uninitialized bool
}
  • Versioner
// Versioner is the interface for reading and writing Object version information.
type Versioner interface {
    // UpdateObject updates the object with the given resource version.
    // NOTE(review): not called anywhere in this file — confirm the exact
    // semantics against the implementation.
    UpdateObject(obj Object, resourceVersion uint64) error
    ObjectResourceVersion(obj Object) (uint64, error)
    ParseResourceVersion(resourceVersion string) (uint64, error)
}
  • watchCacheEvent
// watchCacheEvent is an observed change event together with the object state
// before and after the change.
type watchCacheEvent struct {
    // Type is the kind of event.
    Type EventType
    // Object and the Obj* fields describe the object after the change.
    Object           Object
    ObjLabels        Set
    ObjFields        Set
    ObjUninitialized bool
    // PrevObject and the PrevObj* fields keep the state before the change.
    PrevObject    Object
    PrevObjFields Set
    PrevObjLables Set
    // PrevObjUninitialized reports whether the previous object was uninitialized.
    PrevObjUninitialized bool
    Key                  string
    ResourceVersion      uint64
}
  • watchCacheElement
// watchCacheElement is one slot of the event window: an event and the
// resource version it was recorded at.
type watchCacheElement struct {
    resourceVersion uint64
    watchCacheEvent *watchCacheEvent
}
  • watchCache
// watchCache keeps a fixed-size window of recent change events next to a
// store that holds the current objects.
type watchCache struct {
    sync.RWMutex

    // cond is used to wait until the cache has been refreshed.
    cond *sync.Cond

    // capacity is the size of the event window.
    capacity int

    // keyFunc computes the key of an object.
    keyFunc func(Object) (string, error)

    // getAttrsFunc returns an object's attributes: the first Set holds its
    // labels and the second its fields.
    getAttrsFunc func(Object) (Set, Set, bool, error)

    // cache is the event buffer; startIndex and endIndex are logical
    // positions that are mapped onto cache slots modulo capacity.
    cache      []watchCacheElement
    startIndex int
    endIndex   int

    // store holds the object data.
    store Store

    resourceVersion uint64

    onEvent   func(*watchCacheEvent)
    onReplace func()

    versioner Versioner

    clock Clock
}

转换流程

kubernetes  watchCache 缓存滑动窗口 底层设计与源码剖析
  • 调用对外暴露的Get/Add/Update/Delete接口, 进行数据格式化: Event(事件数据)、updateFunc(后端存储的具体操作)、resourceVersion(当前数据的版本, 通过versioner实现)
  • 调用processEvent开始进行逻辑处理
    • 通过传入的Event和store获取的Object(存储中当前最新的数据)组装成storeElement
    • 将storeElement和resourceVersion组合成watchCacheEvent
    • 调用updateCache更新缓存
    • 调用updateFunc, 更新后端存储数据

完整代码

package cacher

import (
    "fmt"
    "sort"
    "sync"
    "time"
)

// EventType identifies the kind of change carried by an Event.
type EventType string

const (
    // Added marks the creation of an object.
    Added EventType = "ADDED"
    // Moidifed marks the modification of an object. The identifier keeps its
    // historical misspelling so existing references still compile, but the
    // value is now the correctly spelled "MODIFIED".
    Moidifed EventType = "MODIFIED"
    // Deleted marks the removal of an object.
    Deleted EventType = "DELETED"
    // Error marks an error event.
    Error EventType = "ERROR"
)

// blockTimeout bounds how long waitUtilFreshAndBlock waits for the cache to
// reach the requested resource version.
const blockTimeout = 3 * time.Second

// Event pairs an event type with the object the event refers to.
type Event struct {
    Type   EventType
    Object Object
}

// Clock is a stateless wrapper around the wall-clock helpers of package time.
type Clock struct{}

// Now reports the current time.
func (c Clock) Now() time.Time {
    now := time.Now()
    return now
}

// After returns a channel that receives the current time once d has elapsed.
func (c Clock) After(d time.Duration) <-chan time.Time {
    timer := time.NewTimer(d)
    return timer.C
}

// Since reports how much time has passed since ts.
func (c Clock) Since(ts time.Time) time.Duration {
    return time.Now().Sub(ts)
}

// Object is the abstract base type for resources.
type Object interface{}

// Set is the string map returned for an object's labels or fields when its
// attributes are queried.
type Set map[string]string

// Store is the storage interface for Object values.
//
// Get looks an item up from an object, GetByKey from its key; both report
// whether the item exists. Replace swaps the whole content for the given list;
// its string argument is the resource version passed by watchCache.Replace.
type Store interface {
    Add(obj interface{}) error
    Update(obj interface{}) error
    Delete(obj interface{}) error
    List() []interface{}
    Get(obj interface{}) (item interface{}, exits bool, err error)
    GetByKey(key string) (item interface{}, exits bool, err error)
    Replace([]interface{}, string) error
}

// storeElement holds an object together with its already computed key, labels
// and fields, so they do not have to be recomputed on every list/watch.
type storeElement struct {
    Key           string
    Object        Object
    Labels        Set
    Fields        Set
    Uninitialized bool
}

// Versioner is the interface for reading and writing Object version information.
type Versioner interface {
    // UpdateObject updates the object with the given resource version.
    // NOTE(review): not called anywhere in this file — confirm the exact
    // semantics against the implementation.
    UpdateObject(obj Object, resourceVersion uint64) error
    ObjectResourceVersion(obj Object) (uint64, error)
    ParseResourceVersion(resourceVersion string) (uint64, error)
}

// watchCacheEvent is an observed change event together with the object state
// before and after the change.
type watchCacheEvent struct {
    // Type is the kind of event.
    Type EventType
    // Object and the Obj* fields describe the object after the change.
    Object           Object
    ObjLabels        Set
    ObjFields        Set
    ObjUninitialized bool
    // PrevObject and the PrevObj* fields keep the state before the change.
    PrevObject    Object
    PrevObjFields Set
    PrevObjLables Set
    // PrevObjUninitialized reports whether the previous object was uninitialized.
    PrevObjUninitialized bool
    Key                  string
    ResourceVersion      uint64
}

// watchCacheElement is one slot of the event window: an event and the
// resource version it was recorded at.
type watchCacheElement struct {
    resourceVersion uint64
    watchCacheEvent *watchCacheEvent
}

// watchCache keeps a fixed-size window of recent change events next to a
// store that holds the current objects.
type watchCache struct {
    sync.RWMutex

    // cond is used to wait until the cache has been refreshed.
    cond *sync.Cond

    // capacity is the size of the event window.
    capacity int

    // keyFunc computes the key of an object.
    keyFunc func(Object) (string, error)

    // getAttrsFunc returns an object's attributes: the first Set holds its
    // labels and the second its fields.
    getAttrsFunc func(Object) (Set, Set, bool, error)

    // cache is the event buffer; startIndex and endIndex are logical
    // positions that are mapped onto cache slots modulo capacity.
    cache      []watchCacheElement
    startIndex int
    endIndex   int

    // store holds the object data.
    store Store

    resourceVersion uint64

    onEvent   func(*watchCacheEvent)
    onReplace func()

    versioner Versioner

    clock Clock
}

// Get looks up the stored element that corresponds to obj.
//
// The key is derived from obj with keyFunc and the lookup is delegated to the
// underlying store. It returns the stored item, whether it exists, and any
// error from the type check, the key computation or the store.
func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
    object, ok := obj.(Object)
    if !ok {
        // Same wording as objectToVersionRuntimeObject so both entry points
        // report this failure identically.
        return nil, false, fmt.Errorf("obj does not implement Object interface: %v", obj)
    }
    key, err := w.keyFunc(object)
    if err != nil {
        return nil, false, fmt.Errorf("couldn't compute key: %w", err)
    }
    return w.store.Get(&storeElement{Key: key, Object: object})
}

// Add records obj as an Added event and inserts the resulting element into
// the underlying store.
func (w *watchCache) Add(obj interface{}) error {
    object, version, err := w.objectToVersionRuntimeObject(obj)
    if err != nil {
        return err
    }
    return w.processEvent(
        Event{Type: Added, Object: object},
        version,
        func(elem *storeElement) error { return w.store.Add(elem) },
    )
}

// Update records obj as a Moidifed event and updates the resulting element in
// the underlying store.
func (w *watchCache) Update(obj interface{}) error {
    object, version, err := w.objectToVersionRuntimeObject(obj)
    if err != nil {
        return err
    }
    return w.processEvent(
        Event{Type: Moidifed, Object: object},
        version,
        func(elem *storeElement) error { return w.store.Update(elem) },
    )
}

// Delete records obj as a Deleted event and removes the corresponding element
// from the underlying store.
//
// It returns an error when obj's resource version cannot be read or when
// processing the event fails.
func (w *watchCache) Delete(obj interface{}) error {
    object, resourceVersion, err := w.objectToVersionRuntimeObject(obj)
    if err != nil {
        return err
    }
    event := Event{Type: Deleted, Object: object}

    // The store callback must remove the element; calling Add here would
    // re-insert the object that was just deleted.
    f := func(elem *storeElement) error { return w.store.Delete(elem) }
    return w.processEvent(event, resourceVersion, f)
}

// SetOnEvent installs the callback that processEvent invokes for each event.
// The assignment is made under the write lock.
func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
    w.Lock()
    w.onEvent = onEvent
    w.Unlock()
}

// SetOnReplace installs the callback that Replace invokes after the store
// content has been swapped. The assignment is made under the write lock.
func (w *watchCache) SetOnReplace(onReplace func()) {
    w.Lock()
    w.onReplace = onReplace
    w.Unlock()
}

// objectToVersionRuntimeObject checks that obj is an Object and reads its
// resource version through the versioner. It returns the object and version,
// or an error when either step fails.
func (w *watchCache) objectToVersionRuntimeObject(obj interface{}) (Object, uint64, error) {
    object, isObject := obj.(Object)
    if !isObject {
        return nil, 0, fmt.Errorf("obj does not implement Object interface: %v", obj)
    }
    version, err := w.versioner.ObjectResourceVersion(object)
    if err == nil {
        return object, version, nil
    }
    return nil, 0, err
}

// GetAllEventsSinceThreadUnsafe returns the cached events newer than
// resourceVersion, so list/watch requests can be answered from the in-memory
// window instead of querying the backend each time.
//
// Behavior by input:
//   - resourceVersion == 0: every item currently in the store is returned as
//     an Added event stamped with the cache's current resource version.
//   - resourceVersion older than the window covers: an error is returned.
//   - otherwise: the events whose resource version is greater than
//     resourceVersion are returned in window order.
//
// The method takes no lock itself; callers are expected to hold the
// watchCache lock while calling it.
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
    size := w.endIndex - w.startIndex
    // With an empty window nothing older than the current version has been
    // dropped, so the oldest retrievable version is the next one.
    oldest := w.resourceVersion + 1
    if size > 0 {
        oldest = w.cache[w.startIndex%w.capacity].resourceVersion
    }
    if resourceVersion == 0 {
        allItems := w.store.List()
        result := make([]*watchCacheEvent, len(allItems))
        for i, item := range allItems {
            elem, ok := item.(*storeElement)
            if !ok {
                // Report the offending item; elem is nil when the assertion fails.
                return nil, fmt.Errorf("not a storeElement: %v", item)
            }
            objLabels, objFields, objUninitialized, err := w.getAttrsFunc(elem.Object)
            if err != nil {
                return nil, err
            }
            result[i] = &watchCacheEvent{
                Type:             Added,
                Object:           elem.Object,
                ObjLabels:        objLabels,
                ObjFields:        objFields,
                ObjUninitialized: objUninitialized,
                Key:              elem.Key,
                ResourceVersion:  w.resourceVersion,
            }
        }
        return result, nil
    }
    if resourceVersion < oldest-1 {
        return nil, fmt.Errorf("too old resource version: %d (%d)", resourceVersion, oldest-1)
    }
    // Binary search for the first event newer than resourceVersion.
    // NOTE(review): this assumes events sit in the window in increasing
    // resourceVersion order — confirm that callers always append in order.
    f := func(i int) bool {
        return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion
    }
    first := sort.Search(size, f)
    result := make([]*watchCacheEvent, size-first)
    for i := 0; i < size-first; i++ {
        result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent
    }
    return result, nil
}

// Replace swaps the whole content of the store for objs and resets the event
// window.
//
// resourceVersion is parsed through the versioner and becomes the cache's
// current version once the store has been replaced. Every entry of objs is
// converted to a storeElement (key, labels, fields) before the lock is taken.
// After a successful replace the onReplace callback, if set, is invoked and
// all waiters on the condition variable are woken.
//
// It returns an error when the version cannot be parsed, when an entry cannot
// be converted, or when the store rejects the replacement. The window indices
// are reset before the store call, so they stay reset even if that call fails.
func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
    version, err := w.versioner.ParseResourceVersion(resourceVersion)
    if err != nil {
        return err
    }

    // Length 0 with capacity len(objs): the slice is filled with append, so a
    // non-zero length would leave nil entries in front of the real elements.
    toReplace := make([]interface{}, 0, len(objs))
    for _, obj := range objs {
        object, ok := obj.(Object)
        if !ok {
            return fmt.Errorf("didn't get Object for replace: %v", obj)
        }
        key, err := w.keyFunc(object)
        if err != nil {
            return fmt.Errorf("couldn't compute key: %w", err)
        }
        objLabels, objFields, objUninitialized, err := w.getAttrsFunc(object)
        if err != nil {
            return err
        }
        toReplace = append(toReplace, &storeElement{
            Key:           key,
            Object:        object,
            Labels:        objLabels,
            Fields:        objFields,
            Uninitialized: objUninitialized,
        })
    }

    w.Lock()
    defer w.Unlock()

    // Empty the event window.
    w.startIndex = 0
    w.endIndex = 0
    if err := w.store.Replace(toReplace, resourceVersion); err != nil {
        return err
    }
    w.resourceVersion = version
    if w.onReplace != nil {
        w.onReplace()
    }
    w.cond.Broadcast()
    return nil
}

// processEvent turns event into a watchCacheEvent, appends it to the event
// window and applies updateFunc to the store.
//
// Steps:
//  1. Compute the key and attributes of event.Object and build a storeElement.
//  2. Under the write lock, fetch the element currently stored under that key
//     and, if present, copy it into the Prev* fields of the event.
//  3. Invoke the onEvent callback if one is set.
//  4. Append the event to the window, advance the cache's resource version and
//     wake all waiters on the condition variable.
//  5. Apply updateFunc to the new element and return its error.
//
// An error is returned when the key or attributes cannot be computed, when the
// store lookup fails or yields an unexpected type, or when updateFunc fails.
func (w *watchCache) processEvent(event Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
    key, err := w.keyFunc(event.Object)
    if err != nil {
        return fmt.Errorf("couldn't compute key: %w", err)
    }
    elem := &storeElement{Key: key, Object: event.Object}
    elem.Labels, elem.Fields, elem.Uninitialized, err = w.getAttrsFunc(event.Object)
    if err != nil {
        return err
    }

    wcEvent := &watchCacheEvent{
        Type:             event.Type,
        Object:           elem.Object,
        ObjLabels:        elem.Labels,
        ObjFields:        elem.Fields,
        ObjUninitialized: elem.Uninitialized,
        Key:              key,
        ResourceVersion:  resourceVersion,
    }

    w.Lock()
    defer w.Unlock()
    previous, exists, err := w.store.Get(elem)
    if err != nil {
        return err
    }
    if exists {
        previousElem, ok := previous.(*storeElement)
        if !ok {
            return fmt.Errorf("not a storeElement: %v", previous)
        }
        wcEvent.PrevObject = previousElem.Object
        wcEvent.PrevObjLables = previousElem.Labels
        wcEvent.PrevObjFields = previousElem.Fields
        wcEvent.PrevObjUninitialized = previousElem.Uninitialized
    }

    if w.onEvent != nil {
        w.onEvent(wcEvent)
    }

    w.updateCache(resourceVersion, wcEvent)
    w.resourceVersion = resourceVersion
    w.cond.Broadcast()
    return updateFunc(elem)
}

// updateCache appends event to the window. When the window already holds
// capacity events, the oldest one is dropped by advancing startIndex before
// the new event is written to its slot.
func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) {
    if w.endIndex-w.startIndex == w.capacity {
        // Window is full: give up the oldest slot.
        w.startIndex++
    }
    slot := w.endIndex % w.capacity
    w.cache[slot] = watchCacheElement{
        resourceVersion: resourceVersion,
        watchCacheEvent: event,
    }
    w.endIndex++
}

// WaitUntilFreshAndGet waits until the cache has reached resourceVersion and
// then looks key up in the store. It returns the item, whether it exists, the
// cache's current resource version, and any error from the wait or the lookup.
func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
    // waitUtilFreshAndBlock returns with the read lock held on every path, so
    // it has to be released here whether or not the wait succeeded.
    if err := w.waitUtilFreshAndBlock(resourceVersion); err != nil {
        w.RUnlock()
        return nil, false, 0, err
    }
    defer w.RUnlock()

    item, found, err := w.store.GetByKey(key)
    return item, found, w.resourceVersion, err
}

// WaitUtilFreshAndList waits until the cache has reached resourceVersion and
// then returns all items in the store together with the cache's current
// resource version.
func (w *watchCache) WaitUtilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) {
    // waitUtilFreshAndBlock returns with the read lock held on every path, so
    // it has to be released here whether or not the wait succeeded.
    if err := w.waitUtilFreshAndBlock(resourceVersion); err != nil {
        w.RUnlock()
        return nil, 0, err
    }
    defer w.RUnlock()

    items := w.store.List()
    return items, w.resourceVersion, nil
}

// GetByKey fetches the item stored under key directly from the underlying
// store, without waiting for any resource version.
func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
    item, found, err := w.store.GetByKey(key)
    return item, found, err
}

// waitUtilFreshAndBlock blocks until the cache's resourceVersion is at least
// the requested one, or until blockTimeout has elapsed since the call began,
// in which case an error is returned.
//
// It acquires the read lock and returns with it still held on both the
// success and the timeout path; the caller is responsible for RUnlock.
func (w *watchCache) waitUtilFreshAndBlock(resourceVersion uint64) error {
    startTime := w.clock.Now()
    // Wake all waiters once blockTimeout has elapsed so the loop below can
    // re-check the deadline instead of waiting on the condition forever.
    go func() {
        <-w.clock.After(blockTimeout)
        w.cond.Broadcast()
    }()

    w.RLock()
    for w.resourceVersion < resourceVersion {
        if w.clock.Since(startTime) >= blockTimeout {
            return fmt.Errorf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion)
        }
        // Wait releases and re-acquires the condition's Locker; newWatchCache
        // builds cond over RLocker(), which pairs with the RLock taken above.
        w.cond.Wait()
    }
    return nil
}

// storeElementKey returns the precomputed key of a *storeElement, or an error
// when obj is of any other type.
func storeElementKey(obj interface{}) (string, error) {
    elem, ok := obj.(*storeElement)
    if !ok {
        return "", fmt.Errorf("not a storeElement: %v", obj)
    }
    return elem.Key, nil
}

// memoryStore is a minimal map-backed Store keyed by storeElement.Key. It is
// guarded by its own mutex because watchCache.Get and watchCache.GetByKey
// reach the store without taking the watchCache lock.
type memoryStore struct {
    mu    sync.RWMutex
    items map[string]interface{}
}

// put stores obj under its key, overwriting any previous entry.
func (s *memoryStore) put(obj interface{}) error {
    key, err := storeElementKey(obj)
    if err != nil {
        return err
    }
    s.mu.Lock()
    s.items[key] = obj
    s.mu.Unlock()
    return nil
}

// Add inserts obj, replacing an entry with the same key.
func (s *memoryStore) Add(obj interface{}) error { return s.put(obj) }

// Update overwrites the entry stored under obj's key.
func (s *memoryStore) Update(obj interface{}) error { return s.put(obj) }

// Delete removes the entry stored under obj's key; a missing key is not an error.
func (s *memoryStore) Delete(obj interface{}) error {
    key, err := storeElementKey(obj)
    if err != nil {
        return err
    }
    s.mu.Lock()
    delete(s.items, key)
    s.mu.Unlock()
    return nil
}

// List returns all stored items in unspecified order.
func (s *memoryStore) List() []interface{} {
    s.mu.RLock()
    defer s.mu.RUnlock()
    out := make([]interface{}, 0, len(s.items))
    for _, item := range s.items {
        out = append(out, item)
    }
    return out
}

// Get looks up the entry that shares obj's key.
func (s *memoryStore) Get(obj interface{}) (interface{}, bool, error) {
    key, err := storeElementKey(obj)
    if err != nil {
        return nil, false, err
    }
    return s.GetByKey(key)
}

// GetByKey looks up the entry stored under key.
func (s *memoryStore) GetByKey(key string) (interface{}, bool, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    item, ok := s.items[key]
    return item, ok, nil
}

// Replace swaps the whole content for list. The resource version argument is
// not used by this store. Nothing is changed when an entry is not a
// *storeElement.
func (s *memoryStore) Replace(list []interface{}, _ string) error {
    items := make(map[string]interface{}, len(list))
    for _, obj := range list {
        key, err := storeElementKey(obj)
        if err != nil {
            return err
        }
        items[key] = obj
    }
    s.mu.Lock()
    s.items = items
    s.mu.Unlock()
    return nil
}

// newWatchCache builds a watchCache with an event window of the given
// capacity, backed by an in-memory store keyed by storeElement.Key.
//
// keyFunc computes an object's key, getAttrsFunc its labels/fields/
// uninitialized flag, and versioner reads and parses resource versions.
//
// Store is an interface, so it cannot be instantiated with a composite
// literal; a concrete memoryStore is used instead. The condition variable is
// built over the read locker because waiters hold only the read lock.
func newWatchCache(
    capacity int,
    keyFunc func(Object) (string, error),
    getAttrsFunc func(Object) (Set, Set, bool, error),
    versioner Versioner) *watchCache {
    wc := &watchCache{
        capacity:        capacity,
        keyFunc:         keyFunc,
        getAttrsFunc:    getAttrsFunc,
        cache:           make([]watchCacheElement, capacity),
        startIndex:      0,
        endIndex:        0,
        store:           &memoryStore{items: make(map[string]interface{})},
        resourceVersion: 0,
        clock:           Clock{},
        versioner:       versioner,
    }
    wc.cond = sync.NewCond(wc.RLocker())
    return wc
}

总结

目前暂时理解了这些, 有一点需要注意: 这里的watchCache会将所有增删改的操作都放到自己的cache里, 但在Get的时候并不会去读取缓存中的数据, 只有在watch中才会从当前的cache数组中读取对象。接下来继续看cache, 后面得反过来再了解下

原创文章,作者:baxiaoshi,如若转载,请注明出处:http://www.sreguide.com/uncategorized/kubernetes-watchcache-%e7%bc%93%e5%ad%98%e6%bb%91%e5%8a%a8%e7%aa%97%e5%8f%a3-%e5%ba%95%e5%b1%82%e8%ae%be%e8%ae%a1%e4%b8%8e%e6%ba%90%e7%a0%81%e5%89%96%e6%9e%90.html

发表评论

电子邮件地址不会被公开。 必填项已用*标注

联系我们

QQ: 52866169