基于etcd实现分布式锁

1. etcd实现mutex

1.1 Etcd实现mutex的关键特性

1.1.1 Lease租约

image.png

租约是etcd v3 api的特性,是客户端想服务端创建一个短时的可过期的契约(租约),在进行etcd键值对操作的时候,如果客户端没有定时进行Lease的续约,过期后就会删除所有与该Lease绑定的键值对

注意Lease是递增的

1.1.2 Revision

Revision是etcd集群的一个64位计数器,键空间的每次修改都会递增

1.1.3 watch

image.png

为了避免客户端反复的轮训,etcd提供了一种watch机制,客户端可以通过watch给定的key值,当key数据发生变化的时候,服务端会主动进行event的推送,从而感知到数据的变化

1.1.4 事务机制

事务机制是etcd提供的同时执行多条命令的一种事务操作,我们可以拼接多个操作,作为一个事务进行执行,etcd会保证事务操作的原子性

1.2 基于etcd的 mutex实现

1.2.1 mutex实现核心流程

基于etcd实现分布式锁

etcd的分布式锁实现位于concurrency中的mutex.go, 利用lease、事务操作、watch机制实现了一个相对公平、可重入的分布式锁, 其核心流程如下

1.首先建立连接创建session

2.通过 session创建Lease

3.进行事务操作,事务操作中会先进行当前节点锁的拼接,如果锁节点不存在就创建,否则只获取

根据上面的获取锁的结果来进行当前节点锁结果的判定,即当前版本下是否有别的节点创建过锁,如果没有则成功, 这里会返回revision即当前的版本和当前版本之前的所有存在的KV对

4.根据前缀获取当前版本下(小于revision)的所有节点

5.watch获取的KV里面的第一个节点

6.如果节点释放锁或者超时,则watch到删除事件

7.重新获取所有节点,进行锁的判定,如果当前没有节点了,就返回成功

1.2.2 Lock加锁的核心流程

image.png
func (m *Mutex) Lock(ctx context.Context) error {
    // tryAcquire其实就是我们之前说的,进行事务操作返回本次事务的操作结果
    // resp里面会包含revision和所有存在的kvs
    resp, err := m.tryAcquire(ctx)
    if err != nil {
        return err
    }
    // if no key on prefix / the minimum rev is key, already hold the lock
    // 如果发现当前版本下没有键值对,即键值对的长度为0
    // 或者键值对的头节点是当前节点, 就创建锁成功
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }
    client := m.s.Client()
    // wait for deletion revisions prior to myKey
    // 等待之前版本的键值对删除,还记得这个地方的revision,其实是一个递增的饿值,
    // 这里只关注之前创建的所有的节点, 如果返回nil,则获取锁成功
    hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
    } else {
        m.hdr = hdr
    }
    return werr
}

1.2.3 可重入事务加锁

image.png
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
    s := m.s
    client := m.s.Client()

    // 首先通过lease来进行节点的拼接,比如最终可能拼接成{lock}0000001
    // 通过leaseID来实现锁的公平性
    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    // 下面的cmp/put/get/getOwner操作其实就是组成etcd锁创建的事务操作
    // 这里通过cmp来先进行对比的原因,是因为如果重入加锁,实际上就啥也不做,只获取当前的状态即可
    cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
    // put self in lock waiters via myKey; oldest waiter holds lock
    put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    // reuse key in case this session already holds the lock
    get := v3.OpGet(m.myKey)
    // fetch current holder to complete uncontended path with only one RPC
    getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    // 组装etcd的事务操作
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
    if err != nil {
        return nil, err
    }
    m.myRev = resp.Header.Revision
    if !resp.Succeeded {
        m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
    }
    return resp, nil
}

1.2.4通过获取watch等待锁释放实现

image.png
    // wait for deletion revisions prior to myKey
    hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)

// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
    // getOpts参数主要用于设定maxCreateRev,其实就是我们说的创建的节点是在当前节点操作之前的才进行关注
    getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
    for {
        // 通过锁前缀获取当前的所有锁节点
        resp, err := client.Get(ctx, pfx, getOpts...)
        if err != nil {
            return nil, err
        }
        // 如果之前的全部节点都删除了,则表示当前节点获取到了锁
        if len(resp.Kvs) == 0 {
            return resp.Header, nil
        }
        lastKey := string(resp.Kvs[0].Key)
        // 否则进行watch等待之前锁的释放
        if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
            return nil, err
        }
    }
}

1.2.5watch感知节点操作触发锁检查

func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
    cctx, cancel := context.WithCancel(ctx)
    defer cancel()

    var wr v3.WatchResponse
    // 通过watch api创建一个watch 管道
    wch := client.Watch(cctx, key, v3.WithRev(rev))
    for wr = range wch {
        for _, ev := range wr.Events {
            if ev.Type == mvccpb.DELETE {
                // 如果watch到删除事件则回调处理
                return nil
            }
        }
    }
    if err := wr.Err(); err != nil {
        return err
    }
    if err := ctx.Err(); err != nil {
        return err
    }
    return fmt.Errorf("lost watcher waiting for delete")
}

1.2.6etcd锁的惊群与公平性问题

image.png

惊群问题主要是指在节点删除后,会触发当前集群中所有的节点进行感知,在zk中通过只watch前序节点进行解决,但在etcd中却无法实现,主要是因为zk是依靠临时顺序节点,节点顺序进行的,而etcd基于revision和leaseId来实现

这会早就一个问题,就是如果你早就申请了lease,但是一直没进行锁的申请,你的leaseId是最小的,所以无法通过leaseId来进行排序判断锁的前后关系,来实现公平性

所以etcd里面的锁的公平性其实主要是通过revision来实现,通过检查当前版本之前的所有kv的数量,来进行公平性,因为revision只要变更就会进行递增

原创文章,作者:baxiaoshi,如若转载,请注明出处:http://www.sreguide.com/%e5%88%86%e5%b8%83%e5%bc%8f/%e5%9f%ba%e4%ba%8eetcd%e5%ae%9e%e7%8e%b0%e5%88%86%e5%b8%83%e5%bc%8f%e9%94%81.html

发表评论

电子邮件地址不会被公开。 必填项已用*标注

联系我们

QQ: 52866169