查看原文
其他

Message Lifecycle:Pulsar 里的信息传递究竟是什么样子

胖鱼 StreamNative 2021-10-19


上周日,TGIP-CN 002 由郭斯杰继续为我们带来了关于「Message Lifecycle」的分享。首先来回顾一下 Pulsar 最近更新的新特性。


主要有三个方面:


1.Producer Group
https://github.com/apache/pulsar/pull/6209

2.KeyValue Schema
https://github.com/apache/pulsar/pull/6325

3.Memory usage based throttling
https://github.com/apache/pulsar/pull/6178


后续大家还想了解关于 Pulsar 的任何问题,都可以去下边这个 repo 下提 issue。同时关于每一次直播的各种参考细节,也可以在此 repo 下找到。
https://github.com/streamnative/tgip-cn


接下来我们就一起看看 Pulsar 里的数据流动细节吧。




Data Flow

数据如何从生产端向消费端流动


>>> Pulsar 集群


1. Brokers + Bookies

前边我们提到过,broker 是各零件之间进行交换的对象。因 Pulsar 为分层架构模式,使用了 BookKeeper 作为额外的存储系统,bookies 就是 BookKeeper 里的存储节点。


Brokers+Bookies 构成 Pulsar 的两个层次,共同完成了 Pulsar 的数据服务。


Broker:整个消息层的生产和消费,无存储状态

Bookie:数据持久化保存的节点,有存储状态


2. ZooKeeper

ZooKeeper 在 Pulsar 里的作用是存储 Pulsar 系统里元数据的存储和集群的管理以及节点的发现等,节点发现是指发现集群里有多少个 broker,有多少 bookie。


ZooKeeper 作为元数据系统,有好有坏,当然 Pulsar 也不会完全依赖于 ZooKeeper,目前 Pulsar 有很好的元数据抽象,你可以把 ZooKeeper 换为其他的强一致系统,比如 ETCD。


以上这三个组件,构成了 Pulsar 的集群。Brokers+bookies 为数据服务,ZooKeeper 为元数据服务。



>>> 数据流动过程


Producer 通过生产消息到一个 topic ,一个 topic 中可能有 N 个 partition,每个 partition 给一个 broker 服务;那 consumer 会根据要消费的 topic 以及具体的 partition 找到所服务的 broker 并进行消费。



1. Write Path(写过程)


数据的生产是怎么发生的呢?首先客户端会调用 Pulsar 提供给客户端的 API,这就是 producer。创建消息后,可以放进 pillow 里,也可以指定 key,放在一条消息里面,这条消息通过应用客户端传给 producer。


生产端的内部有一个叫 MessageWriter 的类,这个 MessageWriter 默认是 round-robin 的过程,相当于每发一条消息会轮询的去找一个 partition 去发送,为了提高效率,在一定的时间内只会选择一个 partition。


有时你会给 message 指定 key ,一旦指定它会根据 key 的 hash 去找出相应的 partition,如果你觉得 Pulsar 提供的  MessageWriter 不是很好,也可以自定义 MessageWriter。



一旦 partition 被选择出来,客户端会跟 broker 打交道,找出这个 partition 究竟是为哪个 broker 服务,这个寻找的过程叫做 Topic Discovery。Broker 收到消息后会调用 BookKeeper 的客户端并发去写多个副本。


整个并发写的流程是被封装到 BookKeeper 的客户端中,所以可以认为是 broker 调用 BookKeeper 的客户端去写 bookies,当 broker 收到了两个副本的 ACK 之后它会认为这条消息已经写成功,broker 会返回客户端,告知这条消息已经被持久化完成。


整个写的过程消息是从 producer 写到 broker,然后经过 broker 到 BookKeeper 上。整个过程中客户端都不会跟 ZooKeeper 打交道,你也不会跟 BookKeeper 打交道,唯一打交道的只有 broker。


2. Read Path(读过程)


a. Broker 已经缓存了消息

Broker 可能已经缓存了部分消息,这时 consumer 在订阅某个 topic 时根据订阅的策略来消费哪几个 partition ,比如 Failover 订阅是消费其中的某一个 partition,这时它同样会经过  Topic Discovery 的过程,找到 consumer 要连接的 broker 。



Consumer 在连接到 broker 后建立长连接,这些消息完全存到 bookie 后,从内存里拿出来通过推得方式 dispatch 给 consumer,consumer 收到消息后会放到消费端的 receiver queue ,之后应用端可以去 receiver queue 消费数据,消费完成后,consumer 会重新发起一个 flow request。


b. Broker 已经没有缓存消息

上边这种方式整个延迟是比较短的,但如果需要做回滚消费的操作,这时候 broker 已经不再缓存这部分数据,而去从 bookie 去读数据,数据读出后再 dispatch 给 consumer,这个读取过程可以选择任意一个存储节点去读消息。



整个存储架构没有主节点的说法,可以避免某个节点由于网络或者性能导致的高延迟读取,这也是分层存储带来的好处。那 broker 如何从 bookie 中读消息呢?其实是在 BookKeeper Client 中完成。


3.Failure Handing(故障处理)



a. Producer 端产生 failure

当出现「发消息网络断开、broker crash」等情况时,这个时候 producer 有 pending queue ,会在设置的 timeout 期间内对断开网络的连接进行重试。


b. Broker 端产生 failure

因为 broker 是没有状态的,所以它不保存任何数据,一旦 crash 后,topic 的 ownership 会被其他的 broker 掌管,这时候你的服务可以快速被恢复。


c. 存储节点产生 failure

存储节点只管理存储,如果只挂掉一个 bookie ,broker 是不会被感知的,除非所有的 bookie 都挂掉,没有足够的副本数量去写入数据。


正常情况下挂掉一两台节点,broker 会感知到然后新开一个 segment 到其他 bookie 上写入数据,因此 broker 挂掉不会影响数据的写入,也不会影响读取。


d. Consumer 端产生 failure

当 broker 挂掉,直到整个 TCP 断开,然后会重连到新的 broker 上去。


所以,Data Flow 总的来说就三大块:Producer、Broker、Consumer。Broker 与生产者和消费者联系沟通,Bookie 与 broker 进行沟通相连,以此来完成从生产端到消费端的处理。




Data Retention

数据保留/什么时候被确认和删除


>>> Subscription Initial Position


我们在上一期的直播时也提到过关于「Subscription」订阅和「Cursor」游标的概念。


消费者连接订阅时,如果这个订阅原来不在 Pulsar 系统里存在,那么应如何创建 cursor 呢。这里就出现了两个概念:Earliest /Latest。


Earliest:如果订阅对应的游标不存在,如果你在连接接消费者订阅时,指定 earliest,它会放到整个流的有效数据第一条(最早的数据)。


Latest:链接消费者订阅时,指定 latest,它会放到整个流的有效数据最末端。


默认行为是 latest,与 Kafka 的默认行为(类似于 earliest)是有区别的,所以这就是为什么一些从 Kafka 转过来的用户产生迷惑的地方。


Cursor 放置的位置,决定了你最终消费了什么数据。


Unsubscribe


一旦消费完毕,你不需要这些消息了,需要进行 unsubcribe,将 cursor 从这个流里拿掉,这样才能做到数据的完全清除。因为 cursor 不处理掉的话,这部分数据是会被保留下来的。


>>> Message Retention


Pulsar 是基于订阅的消息系统,一旦有订阅存在,订阅以后的所有消息都是需要被保留的,不能删除。



消费位置最早的订阅决定了你能保留消息多久。订阅之前的消息可被删除,这是 Pulsar 的默认行为,即消费完就可以被删除,释放空间留给之后的消息使用。


由于流计算的需求,有些数据消费完还不能删除,需要再额外保留个三五天。这种情况下,就需要有 retention 来进行数据保留设置。



添加 retention 后,该部分数据可以保留多大内存/多少天,应用在紫色区域的数据。前提是你的数据已被所有的 subscription 消费完毕,retention 设置才可以生效。


>>> TTL



由于后边未开始被消费的订阅还没有处理完全,所以仍需要进行操作。这些未处理消息没有被删除,但是所有的 consumer 均已下线,那么消息是永远不会被确认的。这样消息就会被一直累积。



为了保证整个生产线的正常运作,可以在这些「橘色消息」部分添加 TTL。TTL 作用范围是没被确认/消费的消息,通过移动 cursor 来实现确认目标。


TTL 的好处在于,可以保证消息的过期时间与 retention 的过期时间保持一致。一旦消息自动过期,会进入 retention 的处理流程,然后再利用 TTL 的特性进行数据清除。


补充概念:Message backlog 是指还没有被处理/消费的部分消息。Topic level 下的 backlog 是整个 topic 下最大的 backlog。


>>> Message Deletion



正常情况下,消息过期就会被删除,比如上图的 5-9 就会被删除。但事实 Pulsar 并不会这么处理。



由于 Pulsar 的分片架构模式,所以删除的行为不是「按个进行」,而是「按片进行」。上图中 S2 部分,有部分数据是已经过期需要删除的。在 Pulsar 的实际处理过程中,是不把 Msg 8和9 进行过期处理的,它会根据 S1-S2 之间的间隔进行过期分离然后处理掉。



所以真正意义上的 retention 是遵循了分片结构的模式,即 8、9、10 仍会被保留。


>>> Storage Size


Storage size 是计算所有没被删除的 segment 所占用的存储空间。所以跟上文提到的 Pulsar message deletion 一样,需要整个分片都被填满后/周期滚动,旧的 segment 才会被删除。


所以整个存储空间是按 segment 之间的存储力度进行计算的,同时 garbage collector 是在后台定期执行的,比如 1 小时/15 天。所以这就是为什么有时你发现 segment 已经被拿掉,但是 storage size 依旧没有变化的原因。




总结


综上来看,通过 data flow 和 data retention 的各组件概念+图例说明,进一步向大家展示了在 Pulsar 系统里,消息是如何在生产者和消费者之间传递,同时又是何时被确认和被删除的。


如果你想仔细回顾全篇内容,可以查看下方视频。想要获取更多关于本次直播的参考信息,可以查看:https://github.com/streamnative/tgip-cn/tree/master/episodes/002



希望通过以上的讲解,可以让大家对 Pulsar 的处理过程更加清晰明朗。


--- 我是分割线---


各位热爱 Pulsar 的小伙伴:


长期以来,为了第一时间与你分享 Pulsar 的最新资讯,我们每天早上在微信群发布「Pulsar 日报」,内容涵盖与 Pulsar 相关的研发进展、用户案例、技术博客和行业热点等。


为了更好地为你提供优质的服务,我们想了解一下:



我们会根据此投票结构,调整「Pulsar 日报」发布时间。非常感谢你的宝贵意见!


点击「阅读原文」可获取 TGIP-002 的 slides

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存