查看原文
其他

etcd watch 机制源码解析——客户端篇

小徐先生1212 小徐先生的编程世界 2024-03-30

0 前言

前一周和大家分享的主题是——”如何基于 etcd 实现 grpc 服务的注册与发现“. 这项能力背后依赖的是 etcd 中的 watch 机制,即针对于指定范围的数据设置 watch 监听回调,一旦数据发生变更,创建 watch 的应用方将能通过回调事件感知到对应的变更信息.

接下来两周,我想和大家一起针对 etcd watch 机制进行深挖,按照惯例,在原理分析之余,我会填充大量的源码内容对涉及的原理概念加以佐证.

本系列分为两部分:第一部分从 etcd 客户端的视角出发,对 watch 机制的实现链路进行解析;第二部分则进入 etcd 服务端的领域,揭晓 etcd watch 机制最底层的实现细节.

本文是其中的第一篇,下图是本文讨论内容的目录树结构:

 

1 背景

1.1 etcd

etcd 是一个分布式 KV 存储组件,协议层通过 raft 算法保证了服务的强一致性和高可用性,同时,etcd 还提供了针对于存储数据的 watch 监听回调功能,基于这一特性,etcd 适合用于作为配置管理中心或者服务注册/发现模块.

(如果大家对 etcd 协议层使用到分布式共识算法 raft 感兴趣的话,可以翻看我之前发表的两篇文章:”两万字长文解析raft算法原理“ 和 ”raft 工程化案例之 etcd 源码实现“.)

etcd 官方文档:https://etcd.io/

etcd 的开源地址为 https://github.com/etcd-io/etcd

本文走读的 etcd 源码版本为 v3.5.8.

 

1.2 watch 机制

所谓 watch 机制,指的是应用方可以针对存储在 etcd 中特定范围的数据创建 watch 监听器,在 watch 过程中,当对应数据发生变化时,etcd 会根据 watch 记录追溯到应用方,对变更事件进行同步.

建立在这样的机制之上,watch 机制主要能够胜任以下的几类应用场景.

 

1.3 分布式锁

在分布式场景中,当不同的物理节点尝试对共同的临界资源进行并发保护,形成串行访问的秩序时,需要使用到分布式锁技术. 分布式锁的实现模式可以分为两大类:

  • • 主动轮询型:类比于单机锁中的主动轮询 + cas 乐观锁模型,取锁方持续对分布式锁发出尝试获取动作,如果失败就发起重试,直到取锁成功;

  • • watch 回调型:在取锁方经历一轮取锁而不得的尝试后,会 watch 订阅分布式锁的释放事件,随后不再发起主动取锁的尝试;直到分布式锁被释放后,取锁方感知到这一变化后,才会重新发起尝试取锁的动作.

在单机环境中,主动轮询和 watch 回调两种锁模型各有优劣,所谓的”优“和”劣“也是相对而言,需要对 cpu 空转以及阻塞协程两种行为的损耗做出权衡. (大家对这部分概念如果不清晰,可以阅读一下我之前发表的文章——”Golang 单机锁实现原理“).

然而,在分布式场景中,我个人的理解是,watch 回调型的分布式锁是要优于主动轮询模型的. 这是因为分布式场景中”轮询“这一动作的成本相比于单机锁而言要高很多,背后存在的行为可能是一次甚至多次网络 IO 请求. 这种情况下,取锁方基于 watch 回调的方式,在确保锁被释放、自身有机会取锁的情况下,才会重新发出尝试取锁的请求,这样能在很大程度上避免无意义的轮询损耗.

当然,基于 watch 回调模型实现的分布式锁背后可能还存在其他的问题,比如:当有多个应用方同时 watch 同一把锁的释放事件,最终锁被释放时可能会引发“惊群效应”. 针对于分布式锁的实现策略本身也是一个有深度的话题,后续我会单开一个篇章进行讨论.

在这里,我真正想说的是,etcd 的 watch 机制,正是满足了 watch 回调类型分布式锁的实现条件. 我们把在 etcd 中添加一组 kv 对这一行为标志为”加锁成功“,则对于取锁而不得者,可以选择使用 etcd 的 watch 机制,创建一个 watch 监听对应 kv 数据的释放事件,直到被 watch 回调之后才再次尝试取锁. 这一流程正是 watch 回调型分布式锁的执行思路.

 

1.4 配置中心

watch 机制的另一类应用场景是分布式配置中心. 比如我在前文 “基于 etcd 实现 grpc 服务注册与发现” 当中聊到的,我们可以将 etcd 作为 grpc 服务的注册中心. 同一个 grpc 服务组在 etcd 中会以相同的服务名作为共同的标识键前缀,与各服务节点的信息建立好映射关系,以实现所谓的“服务注册”功能.

在客户端使用“服务发现”功能时,则会在 etcd 中通过服务名取得对应的服务节点列表缓存在本地,然后在客户端本地基于负载均衡策略选择 endpoint 进行连接请求. 在这个过程中,客户端还会利用到 etcd 的 watch 功能,在服务端节点发生变化时,及时感知到变更事件,然后对本地缓存的服务端节点列表进行更新,以保证客户端能够始终持有实时性较高的服务端地址信息.

 

 

2 客户端架构

2.1 探讨范围

首先,我们再次明确本文的讨论范围.

etcd 本身是客户端+服务端实现的 C-S 架构. 本文的讨论内容仅限于客户端部分,其中会涉及的内容点包括:

  • • 应用方如何利用 etcd 客户端 sdk 与 etcd 服务端搭建通信架构

  • • etcd 客户端如何将应用方创建 watch 的请求发往 etcd 服务端

  • • 当 etcd 服务端传来 watch 回调事件时, etcd 客户端如何将事件准确推送到创建 watch 的应用方手中

关于更底层的部分,etcd 服务端是基于何种方式实现对数据变更的感知、如何与已有的 watch 建立关联以及如何寻找到归属的应用方身份,这些实现细节对于本文而言暂时只是个黑匣子,我将会在下周推出的 ”etcd watch 机制——服务端篇“ 一文中给出答案.

 

2.2 C-S 长连接

首先,etcd 客户端和 etcd 服务端之间,是通过一条 grpc 长连接保持通信的. 这是因为两者之间的交互可能非常频繁,这条长连接双工管道,在客户端 -> 服务端的方向上,可能存在来自应用方多次发送的创建/删除 watch 的请求;在服务端 -> 客户端的方向上,可能持续发送因为因数据变更引起的 watch 回调事件. 由此可见,在双方请求如此频繁的情况下,使用 grpc 长连接是更加合适的方式. 倘若每笔请求都单独建立连接处理,这样引起的性能损耗无疑会高昂很多.

 

2.3 创建 watch 链路

当应用方首次尝试通过 etcd 客户端发起创建 watch 的请求时,首先会进行 etcd 客户端与 etcd 服务端间通信架构的初始化,在之后的请求中可以统一复用. 在这部分准备工作中,客户端侧会创建并异步运行 grpc 长连接代理对象 watcherGrpcStream;同时会启动协程 serveWatchClient,持续轮询处理来自 etcd 服务端的响应.

在创建 watch 的主链路中,etcd 客户端会创建好一个 channel(称之为 upch) 提前返回给应用方,后续 watch 监听的数据发生变更时,应用方可以通过这个 channel 接收到 watch 回调事件.

接下来,etcd 客户端会通过 watcherGrpcStream 将创建 watch 的请求通过 grpc 长连接推送到 etcd 服务端,并且 watcherGrpcStream 会针对每个 watch 异步启动一个 watchSubStream 进行对应 watch 下回调事件的监听和处理.

 

2.4 watch 回调链路

每当 watch 监听的数据发生变更后,etcd 服务端会通过 grpc 长连接将变更事件推送到 etcd 客户端.

etcd 客户端会通过常驻的 serveWatchClient 协程接收到 watch 回调事件,接下来 watcherGrpcStream 会根据回调事件所属的 watch 将其分配给对应的 watchSubStream,最终通过 endpointManager 的周转,并通过应用方持有的 upch,将回调事件推送到应用方的手中.

 

3 核心数据结构

聊完原理后,接下来进入源码走读环节,本周会介绍一下对 etcd 客户端模块涉及到的核心数据结构,各数据结构彼此间的关联关系如下图所示.

 

 

3.1 Client

Client 是对 etcd 客户端模块的抽象,其中包含了:

  • • Watcher:etcd 客户端监听回调模块

  • • conn:grpc 连接代理,用于后续和 etcd server 建立 grpc 长连接

type Client struct {
    // ... watch 监听回调模块
    Watcher
    // ...


    // grpc 连接代理. 用于后续和 etcd server 建立 grpc 长连接.
    conn *grpc.ClientConn


    // ...
}

 

3.2 endpointManager

endpointManager 是 etcd 的节点管理器,其中除了内置 etcd client 之外,每个 endpointManager 都会明确自身监听的 endpoint 范围,通过 target 字段进行标识.

type endpointManager struct {
    // Client is an initialized etcd client.
    client *clientv3.Client
    target string
}

 

3.3 watcher

watcher 是 etcd 客户端的监听回调模块,其中内置了用于和 grpc 服务端建立长连接的 remote; 此外,其中的 streams 字段是一个 map,里面通过 ctxKey 映射了多笔和服务端间通信的长连接代理对象 watchGrpcStream. 通常情况下,etcd 客户端会复用同一笔 watchGrpcStream.

type watcher struct {
    remote   pb.WatchClient
   
    // mu protects the grpc streams map
    mu sync.Mutex


    // streams holds all the active grpc streams keyed by ctx value.
    streams map[string]*watchGrpcStream
    // ...
}

 

3.4 watchGrpcStream

watchGrpcStream 是对 etcd 客户端和 etcd 服务端之间 grpc 长连接的抽象,同时也是用以处理应用方创建/删除 watch 请求以及服务端 watch 回调事件的中枢模块,其中核心字段包括:

  • • owner:标识了其所属的 watcher 模块

  • • remote:用于建立和服务端之间的通信长连接的桩代码客户端

  • • substreams:key 为 watchID,val 为对应了每个 watch 的子处理流 watcherStream

  • • reqc:用于接受应用方发送的创建或者删除 watch 请求

  • • respc:用于接受来自服务端的响应或者 watch 回调事件

type watchGrpcStream struct {
    owner    *watcher
    remote   pb.WatchClient
    // ...
    
    // substreams holds all active watchers on this grpc stream
    substreams map[int64]*watcherStream
    // ...


    // reqc sends a watch request from Watch() to the main goroutine
    reqc chan watchStreamRequest
    // respc receives data from the watch client
    respc chan *pb.WatchResponse
    // ...
}

 

3.5 watcherStream

watcherStream 是对某个特定 watch 的处理流的抽象,核心字段包括:

  • • initReq:应用方创建 watch 时的传递请求参数

  • • outc:将 watch 回调事件推送更上层 endpointManager 中时使用的 chan

  • • recv:用于接收来自 watchGrpcStream 分配的 watch 回调事件的 chan

  • • buf:用于缓存 watch 回调事件的缓冲区

type watcherStream struct {
    // initReq is the request that initiated this request
    initReq watchRequest


    // outc publishes watch responses to subscriber
    outc chan WatchResponse
    // recvc buffers watch responses before publishing
    recvc chan *WatchResponse
    // ...
    buf []*WatchResponse
}

 

4 创建 watch 链路

第 4 章以应用方通过 etcd 客户端 sdk 发出一笔创建 watch 请求的链路为主线进行源码走读.

4.1 整体链路

首先给出整体链路的流程图,分为 etcd 客户端向 etcd 服务端发出请求以及 etcd 客户端从 etcd 服务端接收响应两个方向:

  • • 应用方通过 etcd 客户端 sdk 向 etcd 服务端方向发出创建 watch 请求:

 

  • • etcd 客户端从 etcd 服务端接收到创建 watch 的响应,最终组装成一个 watch channel 返回到 endpointManager 手中:

 

4.2 endpointManager.NewWatchChannel

endpointManager.NewWatchChannel 方法是应用方创建 watch 的入口,实现对 endpointManager 指定 target 范围的数据进行 watch 监听.

在该方法中:

  • • 调用 Client.Get 方法,获取 target 数据范围的历史变更记录,如果应用方关注创建 watch 前的历史变更记录的话,则可以追溯到这部分变更事件

  • • 创建一个用于向应用方传递 watch 回调事件的 upch,提前返回到应用方手中

  • • 异步启动 endpointManager.watch 方法,持续监听更底层提供的 watch chan,取得 watch 回调事件后会将其投放到 upch 中供应用方获取.

func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
    key := m.target + "/"
    resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
    // ...


    // ...
    // 在创建 watcher 之初就获得的监听回调事件
    initUpdates := make([]*Update, 0, len(resp.Kvs))
    for _, kv := range resp.Kvs {
        var iup internal.Update
        if err := json.Unmarshal(kv.Value, &iup); err != nil {
            // ...
            continue
        }
        up := &Update{
            Op:       Add,
            Key:      string(kv.Key),
            Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata},
        }
        initUpdates = append(initUpdates, up)
    }


    upch := make(chan []*Update, 1)
    if len(initUpdates) > 0 {
        upch <- initUpdates
    }
    
    // 开启守护协程处理监听回调事件
    go m.watch(ctx, resp.Header.Revision+1, upch)
    return upch, nil
}

 

4.3 endpointManager.watch

该方法是 endpointManager 为一个 watch 启动的监听协程,核心逻辑包括:

  • • 通过 m.client.Watch 方法,获取到 watch 对应的 watch channel

  • • 基于 for + select 模型持续监听 watch channel,接收到来自更底层的 watch 回调事件后,会将其投放到 upch 中供更上层的应用方接收.

func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Update) {
    defer close(upch)


    opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()}
    key := m.target + "/"
    wch := m.client.Watch(ctx, key, opts...)
    for {
        select {
        // ...
        case wresp, ok := <-wch:
            // ...
            deltaUps := make([]*Update, 0, len(wresp.Events))
            for _, e := range wresp.Events {
                var iup internal.Update
                var err error
                var op Operation
                switch e.Type {
                case clientv3.EventTypePut:
                    err = json.Unmarshal(e.Kv.Value, &iup)
                    op = Add
                    if err != nil {
                        // ...
                        continue
                    }
                case clientv3.EventTypeDelete:
                    iup = internal.Update{Op: internal.Delete}
                    op = Delete
                default:
                    continue
                }
                up := &Update{Op: op, Key: string(e.Kv.Key), Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}}
                deltaUps = append(deltaUps, up)
            }
            if len(deltaUps) > 0 {
                upch <- deltaUps
            }
        }
    }
}

 

4.4 watcher.Watch

watcher.Watch 的核心逻辑包括:

  • • 倘若长连接代理对象 watcherGrpcStream 未初始化,调用 watcher.newWatcherGrpcStream 方法完成其初始化动作. watcherGrpcStream 本身是有生命周期的,在 watcher.newWatcherGrpcStream 方法中,会异步开启一个协程负责 watcherGrpcStream 的运行

  • • 将本次创建/删除 watch 的请求投递到 watcherGrpcStream 的 reqc 当中,供 watcherGrpcStream 运行协程处理

  • • 当创建 watch 请求处理完成后,对应的 watch channel 会通过对应 watch 的 watcherSubStream 从 watchRequest.retc 中传出. watcher.Watch 方法会持续监听 watchRequest.retc,直到取得创建好的 watch channel 后,返回给更上层的 endpointManager

 

// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
    // ...
    wr := &watchRequest{
        ctx:            ctx,
        createdNotify:  ow.createdNotify,
        key:            string(ow.key),
        end:            string(ow.end),
        rev:            ow.rev,
        progressNotify: ow.progressNotify,
        fragment:       ow.fragment,
        filters:        filters,
        prevKV:         ow.prevKV,
        retc:           make(chan chan WatchResponse, 1),
    }


    ok := false
    ctxKey := streamKeyFromCtx(ctx)
    var closeCh chan WatchResponse
    for {
        // find or allocate appropriate grpc watch stream
        w.mu.Lock()
        // ...
        // 通过 ctxKey 标识应用方的身份,创建对应于应用方的 grpcStream 长连接
        wgs := w.streams[ctxKey]
        if wgs == nil {
            wgs = w.newWatcherGrpcStream(ctx)
            w.streams[ctxKey] = wgs
        }
        donec := wgs.donec
        reqc := wgs.reqc
        w.mu.Unlock()
        // ...


        // submit request
        select {
        // 提交创建 watcher 的请求
        case reqc <- wr:
            ok = true
        // ...
        }


        if ok {
            select {
            // 创建 watcher 成功后,返回推送监听事件的 chan (ret)
            case ret := <-wr.retc:
                return ret
            // ...
        }
        // ...
    }
    // ...
}

 

4.5 watchGrpcStream.run

在 watcher.newWatcherGrpcStream 方法中:

  • • 构造了一个 watcherGrpcStream 实例

  • • 调用 watcherGrpcStream.run 方法,异步启动了 watcherGrpcStream 的运行协程

func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
    ctx, cancel := context.WithCancel(&valCtx{inctx})
    wgs := &watchGrpcStream{
        owner:      w,
        remote:     w.remote,
        // ...
        substreams: make(map[int64]*watcherStream),
        respc:      make(chan *pb.WatchResponse),
        reqc:       make(chan watchStreamRequest),
        // ...
    }
    go wgs.run()
    return wgs
}

 

在 watchGrpcStream 的运行协程中:

  • • 通过 watchGrpcStream.newWatchClient 方法,构造了和 etcd 服务端之间的通信长连接

  • • 通过 for + select 模型,持续轮询处理来自更上层的请求以及来自更底层的响应

  • • 持续从 reqc 中接收来自更上层的请求. 每遇到一个创建 watch 的请求,会调用 go watchGrpcStream.serveSubstream 方法,异步启动一个服务于这个 watch 的 subStream

  • • 持续从 respc 中接收来自 etcd 服务端的响应事件,并且调用 watch.dispatchEvent 方法,根据事件归属的 watch 将其分配给所属的 watch subStream

func (w *watchGrpcStream) run() {
    var wc pb.Watch_WatchClient
    
    // 和 etcd grpc server 交互,构造 watch 客户端.
    if wc, closeErr = w.newWatchClient(); closeErr != nil {
        return
    }


    // ...
    var cur *pb.WatchResponse
    for {
        select {
        // 上层调用 Watch 方法创建监听器时,此处会接收到 watchRequest
        case req := <-w.reqc:
            switch wreq := req.(type) {
            case *watchRequest:
                outc := make(chan WatchResponse, 1)
                // TODO: pass custom watch ID?
                ws := &watcherStream{
                    initReq: *wreq,
                    id:      InvalidWatchID,
                    outc:    outc,
                    // unbuffered so resumes won't cause repeat events
                    recvc: make(chan *WatchResponse),
                }


                ws.donec = make(chan struct{})
                w.wg.Add(1)
                // 为每个一个 watch 启动一个 subStream 
                go w.serveSubstream(ws, w.resumec)
                // queue up for watcher creation/resume
                w.resuming = append(w.resuming, ws)
                if len(w.resuming) == 1 {
                    // 发送创建 watch 的请求
                    if err := wc.Send(ws.initReq.toPB()); err != nil {
                     // ...
                    }
                }
               // ...
            }


        // 从服务端接收到响应
        case pbresp := <-w.respc:
            if cur == nil || pbresp.Created || pbresp.Canceled {
                cur = pbresp
            } else if cur != nil && cur.WatchId == pbresp.WatchId {
                // 属于当前 watch 的响应事件
                cur.Events = append(cur.Events, pbresp.Events...)            
                cur.Fragment = pbresp.Fragment
            }


            switch {
            case pbresp.Created:
                // 将事件分配给对应的 watch subStream
                if len(w.resuming) != 0 {
                    if ws := w.resuming[0]; ws != nil {
                        w.addSubstream(pbresp, ws)
                        w.dispatchEvent(pbresp)
                        w.resuming[0] = nil
                    }
                }
                // ...             
            default:
                // 将事件分配给对应 watch subStream
                ok := w.dispatchEvent(cur)              
            }
        }
    }
}
    

 

// dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
    events := make([]*Event, len(pbresp.Events))
    for i, ev := range pbresp.Events {
        events[i] = (*Event)(ev)
    }
    // TODO: return watch ID?
    wr := &WatchResponse{
        Header:          *pbresp.Header,
        Events:          events,
        CompactRevision: pbresp.CompactRevision,
        Created:         pbresp.Created,
        Canceled:        pbresp.Canceled,
        cancelReason:    pbresp.CancelReason,
    }


    // ...
    return w.unicastResponse(wr, pbresp.WatchId)
}

 

// unicastResponse sends a watch response to a specific watch substream.
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
    ws, ok := w.substreams[watchId]
    if !ok {
        return false
    }
    select {
    case ws.recvc <- wr:
    case <-ws.donec:
        return false
    }
    return true
}

 

4.6 watchGrpcStream.openWatchClient

4.5 小节中聊到,当 watchGrpcStream 被初始化时,会在运行协程中同步调用 watchGrpcStream.newWatchClient 方法,此方法主要完成两项任务:

  • • 同步调用 watchGrpcStream.openWatchClient -> watchClient.Watch 方法链,建立和 etcd 服务端之间的 grpc 长连接

  • • 异步启动 watchGrpcStream.serveWatchClient 方法,创建出一个接收协程,持续轮询 grpc 长连接,接收处理 etcd 服务端侧返回的响应.

func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
    // ...
    wc, err := w.openWatchClient()
    // ...
    go w.serveWatchClient(wc)
    return wc
}

 

func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
    // ...
    for {
        // ...
        if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
            break
        }
        // ...
    }
    return ws, nil
}

 

func (c *watchClient) Watch(ctx context.Context, opts ...grpc.CallOption) (Watch_WatchClient, error) {
    stream, err := c.cc.NewStream(ctx, &_Watch_serviceDesc.Streams[0], "/etcdserverpb.Watch/Watch", opts...)
    if err != nil {
        return nil, err
    }
    x := &watchWatchClient{stream}
    return x, nil
}

 

当 etcd 服务端完成创建 watch 的处理后,对应的响应便会在接收协程中通过pb.Watch_WatchClient.Recv 方法接收到,并将其投递到 watchGrpcStream 的 respc 中,供 watchGrpcStream 的运行协程接收处理.

func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
    for {
        resp, err := wc.Recv()
        // ...
        select {
        case w.respc <- resp:
        case <-w.donec:
            return
        }
    }
}

 

如 4.5 小节中提到的,当 watchGrpcStream 的运行协程通过 respc 接收到 create 类型的响应事件时,也会通过 watchGrpcStream.dispatchEvent 方法,通过将其投递到 subStream 的 recvc 当中,分配给到其所属 watch 对应的 subStream.

func (w *watchGrpcStream) run() {
    var wc pb.Watch_WatchClient
    
    // 和 etcd grpc server 交互,构造 watch 客户端.
    if wc, closeErr = w.newWatchClient(); closeErr != nil {
        return
    }


    // ...
    var cur *pb.WatchResponse
    for {
        select {
        // ...
        // 从服务端接收到响应
        case pbresp := <-w.respc:
            if cur == nil || pbresp.Created || pbresp.Canceled {
                cur = pbresp
            } // ...


            switch {
            case pbresp.Created:
                // 将事件分配给对应的 watch subStream
                if len(w.resuming) != 0 {
                    if ws := w.resuming[0]; ws != nil {
                        w.addSubstream(pbresp, ws)
                        w.dispatchEvent(pbresp)
                        w.resuming[0] = nil
                    }
                }
                // ...             
            // ...      
            }
        }
    }
}

 

 

4.7 watchGrpcStream.serveSubstream

当某个 watch 对应的 subStream 通过 recvc 接收到 create 类型的 watchResponse,标志着 etcd 服务端侧已经完成了对创建 watch 请求的处理,此时 subStream 会将在 watchGrpcStream 运行协程提前创建好的 outc(endpointManager 所需要的 watch channel)投递到 subStream.initReq.retc 当中,呼应了 4.4 小节 watcher.Watch 方法监听 watchRequest.retc 的动作.

func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
    // ...
    emptyWr := &WatchResponse{}
    for {
        // ...
        case wr, ok := <-ws.recvc:
            if !ok {
                // shutdown from closeSubstream
                return
            }


            if wr.Created {
                if ws.initReq.retc != nil {
                    ws.initReq.retc <- ws.outc
                    // ...
                }
            } else {
                // current progress of watch; <= store revision
                nextRev = wr.Header.Revision + 1
            }


            // ... 
            ws.buf = append(ws.buf, wr)


            // ...
        }
    }
    // lazily send cancel message if events on missin   

 

5 watch 回调链路

当应用方 watch 监听的数据发生变更后,etcd 服务端会通过 grpc 长连接将 watch 回调事件发送到 etcd 客户端. 这部分细节我们暂且按下不表,本章我们来着重分析一下,在 etcd 客户端接收到 watch 回调事件后,如何自底向上,一步步将其精准地交付到所属 watch 的应用方手中.

 

5.1 整体链路

watch 回调事件的起点和创建 watch 请求正好相反,是以 etcd 客户端接收协程 serveWatchClient 接收到来自 etcd 服务端的回调事件为起点,以 endpointManager 通过对应 watch subStream 的 outc(watch channel)接收到 watch 回调事件,并将其投递到 upch 中供应用方消费为终点. 整体方法链路图如下:

5.2 watchGrpcStream.serveWatchClient

在常驻的接收协程 watchGrpcStream.serveWatchClient 的轮询过程中,倘若通过 grpc 长连接接收到来自 etcd 客户端的 watch 回调事件,则会将其投递到 watchGrpcStream 的 respc 中,供运行协程 watchGrpcStream.run 进行消费处理.

 

func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
    for {
        resp, err := wc.Recv()
        // ...
        select {
        case w.respc <- resp:
        case <-w.donec:
            return
        }
    }
}

 

5.3 watchGrpcStream.run

在 watchGrpcStream 的运行协程通过从 resp 接收到来自 etcd 服务端的回调事件后,会根据事件所属的 watch 维度对事件进行聚合,然后通过 watchGrpcStream.dispatchEvent -> watchGrpcStream.unicastResponse 的方法链路,将事件分发到对应 watch subStream 的 recvc 当中.

func (w *watchGrpcStream) run() {
    var wc pb.Watch_WatchClient
    
    // ...
    var cur *pb.WatchResponse
    for {
        select {
        
        // 从服务端接收到响应
        case pbresp := <-w.respc:
            if cur == nil || pbresp.Created || pbresp.Canceled {
                cur = pbresp
            } else if cur != nil && cur.WatchId == pbresp.WatchId {
                // 属于当前 watch 的响应事件
                cur.Events = append(cur.Events, pbresp.Events...)            
                cur.Fragment = pbresp.Fragment
            }




            switch {
            // ...            
            default:
                // 将事件分配给对应 watch subStream
                ok := w.dispatchEvent(cur)              
            }
        }
   }
}

 

// dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
    events := make([]*Event, len(pbresp.Events))
    for i, ev := range pbresp.Events {
        events[i] = (*Event)(ev)
    }
    // TODO: return watch ID?
    wr := &WatchResponse{
        Header:          *pbresp.Header,
        Events:          events,
        CompactRevision: pbresp.CompactRevision,
        Created:         pbresp.Created,
        Canceled:        pbresp.Canceled,
        cancelReason:    pbresp.CancelReason,
    }




    // watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of InvalidWatchID to
    // indicate they should be broadcast.
    if wr.IsProgressNotify() && pbresp.WatchId == InvalidWatchID {
        return w.broadcastResponse(wr)
    }




    return w.unicastResponse(wr, pbresp.WatchId)
}

 

// unicastResponse sends a watch response to a specific watch substream.
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
    ws, ok := w.substreams[watchId]
    if !ok {
        return false
    }
    select {
    case ws.recvc <- wr:
    case <-ws.donec:
        return false
    }
    return true
}

 

5.4 watchGrpcStream.serveSubstream

watch subStream 的运行协程中,会通过 recvc 接收到 watch 回调事件,然后根据 watch 维度进行聚合,将事件追加到缓冲区 watcherStream.buf 当中. 之后每轮循环会将 buf 中的事件推送到 outc(watch channel)当中,供更上层的 endpointManager 接收处理.

func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
    // ...
    emptyWr := &WatchResponse{}
    for {
        curWr := emptyWr
        outc := ws.outc


        if len(ws.buf) > 0 {
            curWr = ws.buf[0]
        } else {
            outc = nil
        }
        select {
        case outc <- *curWr:
            if ws.buf[0].Err() != nil {
                return
            }
            ws.buf[0] = nil
            ws.buf = ws.buf[1:]
        case wr, ok := <-ws.recvc:
            if !ok {
                // shutdown from closeSubstream
                return
            }


            if wr.Created {
                // ...
            } else {
                // current progress of watch; <= store revision
                nextRev = wr.Header.Revision + 1
            }


            // ... 
            ws.buf = append(ws.buf, wr)


            // ...
        }
    }
    // lazily send cancel message if events on missin  

 

5.5 endpointManager.watch

最后,endpointManager 的监听协程在接收到来自 watch channel 的 watch 回调事件后,会将其投递到 upch 中,供应用方接收处理,呼应了本文 4.2 小节中的 endpointManager.NewWatchChannel 方法.

func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Update) {
    defer close(upch)




    opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()}
    key := m.target + "/"
    wch := m.client.Watch(ctx, key, opts...)
    for {
        select {
        // ...
        case wresp, ok := <-wch:
            // ...
            deltaUps := make([]*Update, 0, len(wresp.Events))
            for _, e := range wresp.Events {
                var iup internal.Update
                var err error
                var op Operation
                switch e.Type {
                case clientv3.EventTypePut:
                    err = json.Unmarshal(e.Kv.Value, &iup)
                    op = Add
                    if err != nil {
                        // ...
                        continue
                    }
                case clientv3.EventTypeDelete:
                    iup = internal.Update{Op: internal.Delete}
                    op = Delete
                default:
                    continue
                }
                up := &Update{Op: op, Key: string(e.Kv.Key), Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}}
                deltaUps = append(deltaUps, up)
            }
            if len(deltaUps) > 0 {
                upch <- deltaUps
            }
        }
    }
}

 

6 总结

本文和大家一起梳理了 etcd 客户端模块对 watch 机制的原理. etcd 客户端和 etcd 服务端通过 grpc 长连接保持通信,并在客户端侧巧妙运用了 goroutine 和 channel 的能力,构筑了一套边界清晰的松耦合交互架构. 其中总共包括四部分常驻协程,彼此之前通过 for + select 的方式建立了异步通信机制:

  • • endpointManager:为每个 watch 开启一个监听协程,通过 watch channel 接收来自底层 watch subStream 传递的 watch 回调事件,将其投递到 upch 中,供应用方接收处理

  • • watchGrpcStream:是 etcd 客户端与 etcd 服务端间长连接的代理对象. 通常情况下全局复用同一个实例. watchGrpcStream 一方面负责接收来自上层应用方发送的创建/取消 watch 等请求,将其通过 grpc 长连接发往 etcd 服务端;一方面负责接收来底层接收协程 serveWatchClient 传递的 etcd 服务端的响应事件,将其通过 recvc 分配到对应 watch 的 subStream 中

  • • watcherSubStream:是对应于每个 watch 存在一个处理协程,负责通过 recvc 接收来自 watchGrpcStream 分配的事件,并投递到对应的 channel 中供上层 endpointManager 接收处理.(如果是创建 watch 响应事件会将其投递到 retc 中;如果是 watch 回调事件,会投递到 outc (watch channel)中)

  • • serveWatchClient:是与每个 watchGrpcStream 一一对应的接收协程. 负责轮询 grpc 长连接,接收来自 etcd 服务端的响应事件,并通过 respc 传递给 watchGrpcStream.

到目前为止,etcd 服务端侧对于 watch 机制的实现链路对于我们而言仍然是个黑匣子,其底层的实现细节,我将在下周分享的”etcd watch 机制源码解析——服务端篇“当中为大家揭晓.


继续滑动看下一个
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存