Canal 如何保证数据库库事务的一致性
点击上方“中间件兴趣圈”,选择“设为星标”
做积极的人,越努力越幸运!
1、环形缓存区
关系型数据库讲究的是ACID 4个特性,故引入了数据库事务的概念,一个数据库事务中的多条SQL引发的多条数据变更要么全部成功,要么全部失败,即数据的一致性,那同样在数据同步的场景,在解析一个事务的 binlog 日志时,一次数据同步应该至少以事务为单位,一个事务内的所有 Event 应该作为一个批次提交到数据消费端,让消费端有能力一次同步一个事务中的数据,而不是一条一条变更日志的处理,这样容易造成数据不一致。
环形缓存区的引用就是为了解决将一个事务的完整数据一次提交到消费端,既然是多条消息,故一定需要用到缓存,环形缓存区就在这样的背景下被引入。
在 Canal 中关于事务 Event 的环形缓存区实现类为 EventTransactionBuffer。
1.1 类图
EventTransactionBuffer 的类图如下:
int bufferSize环形缓存区的长度,默认为 1024,该长度必须为 2 的幂次方,因为对位运算非常友好。
int indexMask环形缓存区下标掩码,其值为 bufferSize - 1 ,sequence * indexMask 能快速定位序号 sequence 所在环形缓存区中的具体下标。
CannalEntry.Entry[] entries环形缓存区数据数组,即缓存区实际存储数据的内存区域,为数组结构,长度为 bufferSize。
AtomicLong putSequence当前写入的序号,每调用 add 方法添加一条数据,该值增加一,可超过缓存区的实际长度。
AtomicLong flushSequence当前已处理的数据序号,flushSequence <= putSequence,(putSequence - flushSequence)表示未处理的数据,即缓存区累积的有效数据。
TransactionFlushCallback flushCallbackflush 回调函数,这个和环形缓存区本身关系不大,这个与 Canal 特定业务的,环形缓存区中收集到一个完整的事务变更日志列表后,将这部分内容传入业务回调方法,并重新利用这些缓存空间。
环形缓存区的重大要义就是循环利用。
1.2 环形缓存区存储实现
接下来我们通过其 add 方法来看一下环形缓存区的,在研究环形缓存区之前,将结合8个元素的环形缓存区进行讲解。
检测当前环形缓存区是否已满,如果未满,则向缓存区中添加一条数据。添加数据的具体逻辑:
获取下一个写入的序号 next,等于当前已写入的序号 + 1,即 putSequence + 1。
通过 next & indexMask 取得放入 CannalEntry.Entry[] entries 中的下标,与 next % bufferSize 效果等同。
如果已满,则首先将缓存区中的数据刷新,即将未处理的数据全部抽取,提交到数据消费方,然后释放缓存区,继续添加数据。
关键在于如何判断环形缓存区已满,具体算法如下:
再回到本示例中,一个事务只包含5条日志,在写满 5条日志后会即调用 flush 方法,将环形缓存区中下标为 0~4 的消息传入数据消费方,在 Canal 中会将这批消息一次传入 EventSink 组件。执行完 flush 方法后,flushSequence 等于4,其环形缓存区如下图所示:
思考,Canal 基于环形缓存区的实现,一定能保证一个事务的所有变更日志都一次提交到 EventSink 组件吗,大家可以简单思考一下,在文末的总结部分有笔者的思考。
答案是否定的,如果一个事务包含的日志条目超过了环形缓存区的长度,为了保证数据不丢失,会首先将环形缓存区的数据全部提交,然后接收新的数据,这样一个事务中的消息会被分成多次提交到 EventSink。
原创不易,如果对你有所帮助请你为本文点个【在看】吧,这将是我写作更多优质文章的最强动力。
欢迎加入我的知识星球,一起交流源码,探讨架构,揭秘亿级订单的架构设计与实践经验,打造高质量的技术交流圈,为广大星友提供高质量问答服务,长按如下二维码加入。