Canal 初次启动时如何定位同步位点(文末附流程图)
点击上方“中间件兴趣圈”,选择“设为星标”
做积极的人,越努力越幸运!
1、Canal定位启动位点
在一个 Canal Instance 实例启动时,在向 MySQL 发送 dump 命令之前,首先先得计算该从 binlog 的什么位置开始同步,初次启动时如何寻找位点等。其代码如下图所示:
从这里可以看成,将调用 findStartPosition 方法查找启动时需要从那个位置开始同步 binglog ,该方法是一个抽象方法,具体实现在其子类中,我们将重点关注一下其子类 MysqlEventParser。
在MySQL中定位binlog日志可以分为gtid、binlog文件名+position两种方式,故Canal查找position的方式也分两种情况进行展开,由于篇幅问题,本节将暂不考虑gtid。
这里主要是调用 findStartPositionInternal 方法进行查找位点,这里还有一个标记 needTransactionPosition,表示查出来的位点是不是一个事务的开始或结束。
接下来重点探讨 Canal在启动时如何定位解析位点的。
1.1 查找位点
Step1:使用位点存储管理器中查看已解析过的位点数据,Canal 提供了多种日志管理实现,这部分稍后会详细展开。
Step2:这里分如下两种情况
如果日志位点管理器(LogPositionManager)中并未存储相关的位点信息,例如初次启动时的处理逻辑。
如果日志位点管理器中已存储相关的位点信息的处理逻辑。
由于初次启动时日志位点管理器并没有存储其位点信息,故我们先看位点管理器并未存储位点的情况。
Step3:如果当前连接的是主节点,则尝试使用 masterPosition,如果当前连接的是从节点(发生了切换),即使用 standbyPosition,那这两个位点信息是从哪来的的呢?原来在 Canal Instance 实例启动之前,可以手动通过 positions 属性手动设置开始解析位点。
Step4:如果在启动时未手动设置初始解析位点,则从当前 binlog 日志最后的位点开始同步,其实现原理是向 MySQL 服务器发送 show master status\G 命令,其命令输出结果如下图所示:
接下来再关注一下如果从日志位点管理器中查找到位点的处理逻辑,在进入该流程的探究之前,先看一下表示位点的实体类,一睹其结构。
会在 LogIdentity 中记录该日志位点是由哪个 slaveId 以及所连接的 MySQL 服务器信息。
Step5:如果从日志位点管理器中查询到位点,则需要判断当前连接的服务器地址与日志位点中记录的是否一致,如果不一致则说明发生了故障切换,为了确保数据不丢失,提供了回退时间的机制,其具体实现关键点如下:
如果解析 dump 出现的次数超过其阔值,可能是基于VIP模式发生了漂移,此时可以根据 serverId 来判断是否发生了切换,如何切换了,则按时间回退来重新寻找位点。
如果查找到的位点连接的信息与当前连接的信息不符合,说明发生了切换,则需要回退指定的时间,即根据时间区重新定位位点,至于回退多久的时间,可以通过参数 fallbackIntervalInSeconds 进行设置,默认为 60s。
Canal Instance 启动时如何定位同步位点的流程就介绍到这里了,接下来我们再来看一下 Canal 如何基于时间戳来定位 binlog 位点。
为了流程的完整性,在学习如何根据时间戳查找binlog位点之前,我们先来看一下从位点管理器中查询到对应的位点信息后的处理流程。
1.2 基于时间戳从查找 binlog 位点
基于时间戳查找 binlog 位点的实现方法为 MysqlEventParser 的 findByStartTimeStamp,接下来我们来看一下其实现原理。
Step1:首先先查询最大的位点与最小位点,最小位点可发送SQL:show binlog events limit 1。
Step2:然后从最后一个文件开始,尝试根据开始时间戳进行日志查找,等下会详细介绍如果从一个binlog日志定位 endposition。
Step3:如果找到一个合适的endposition,则结束寻找。如果没有找到一个合适的endposition,则尝试向前一个文件进行解析,首先解析出要查找的最小文件的名称,例如(mysql-bin.000036),从文件名称序号,然后减1,再判断该文件名是否小于这次可查找的最小文件名,如果不大于,则向前继续选择,否则结束查找,返回null。
接下来我们看一下如果在一个binlog文件中根据时间戳查找合适的位点。
通过向 MySQL Master 发送 dump 命令,建立连接,一条一条从 binlog 日志中解析事件,一条日志日志进行匹配,每从master获取一个logevent,调用 SinkFunction 的 seek 方法。
Step1:如果 justForPositionTimestamp 参数为 true,表示在查询位点时只考虑时间戳,并不考虑事务,在按开始时间戳寻找的方法中该参数为 false,即不会进入该方法。
Step2:获取当前日志的基本信息,例如所在的binlog日志文件、日志偏移量、日志写入时间戳、master serverId。
Step3:如果记录日志的时间戳大于等于待查找的时间戳,返回 false,停止在文件中的停止,是否继续查找其他文件取决在在当前文件中是否已查到符合条件的日志(LogEvent),即是否查找到小于或等于要查找的时间戳。
温馨提示:按照时间戳去查找,其设计理念就是查找小于待查找时间戳中的最大时间戳的LogEvent。
Step4:如果当前的解析的日志偏移量小于此次待查找的最大偏移量,同样结束本文件的查找(针对查找的第一个文件)。因为在查询的时候,首先会查询当前最大偏移量,即查找时的快照,新的内容不在本次查找范围内。
Step5:重点查找事件类型为TRANSACTIONEND与TRANSACTIONBEGIN ,即事务结束与事务开始的事件,并将其存储在 logPostion 中,表示该文件中满足查找条件的事件,但并不是只要找到一条就退出,而是继续向后找,直到找到最合适的事件。
由于源码剖析不够直观,为了更好的理解按照时间戳查找日志位点,再给出其流程图,如下:
2、总结
阅读源码不那么直观,故先来一张流程图对其进行一个总结,辅助大家了解定位位点的核心步骤。
原创不易,如果对你有所帮助请你为本文点个【在看】吧,这将是我写作更多优质文章的最强动力。
欢迎加入我的知识星球,一起交流源码,探讨架构,揭秘亿级订单的架构设计与实践经验,打造高质量的技术交流圈,为广大星友提供高质量问答服务,长按如下二维码加入。