数据库中间件 MyCAT源码分析 —— XA分布式事务
原文地址:http://www.yunai.me/MyCAT/xa-distributed-transaction/
MyCat-Server
带注释代码地址 :https://github.com/YunaiV/Mycat-Server
😈本系列每 1-1 周更新一篇,欢迎订阅、关注、收藏 公众号
QQ :7685413
1. 概述
2. XA 概念
3. MyCAT 代码实现
3.1 JDBC Demo 代码
3.2 MyCAT 开启 XA 事务
3.3 MyCAT 接收 SQL
3.4 MySQL 接收 COMMIT
3.4.1 单节点事务 or 多节点事务
3.4.2 协调日志
3.4.3 MultiNodeCoordinator
3.5 MyCAT 启动回滚 XA事务
4. MyCAT 实现缺陷
4.1 协调日志写入性能
4.2 数据节点未全部 PREPARE 就进行 COMMIT
4.3 MyCAT 启动回滚 PREPARE 的 XA事务
4.4 单节点事务未记录协调日志
4.5 XA COMMIT 部分节点挂了重新恢复后,未进一步处理
5. 彩蛋
1. 概述
数据库拆分后,业务上会碰到需要分布式事务的场景。MyCAT 基于 XA 实现分布式事务。国内目前另外一款很火的数据库中间件 Sharding-JDBC 准备基于 TCC 实现分布式事务。
本文内容分成三部分:
XA 概念简述
MyCAT 代码如何实现 XA
MyCAT 在实现 XA 存在的一些缺陷
2. XA 概念
> X/Open 组织(即现在的 Open Group )定义了分布式事务处理模型。 X/Open DTP 模型( 1994 )包括:
应用程序( AP )
事务管理器( TM )
资源管理器( RM )
通信资源管理器( CRM )
一般,常见的事务管理器( TM )是交易中间件,常见的资源管理器( RM )是数据库,常见的通信资源管理器( CRM)是消息中间件,下图是X/Open DTP模型:
一般的编程方式是这样的:
配置 TM ,通过 TM 或者 RM 提供的方式,把 RM 注册到 TM。可以理解为给 TM 注册 RM 作为数据源。一个 TM 可以注册多个 RM。
AP 从 TM 获取资源管理器的代理(例如:使用JTA接口,从TM管理的上下文中,获取出这个TM所管理的RM的JDBC连接或JMS连接)
AP 向 TM 发起一个全局事务。这时,TM 会通知各个 RM。XID(全局事务ID)会通知到各个RM。AP 通过 TM 中获取的连接,间接操作 RM 进行业务操作。这时,TM 在每次 AP 操作时把 XID(包括所属分支的信息)传递给 RM,RM 正是通过这个 XID 关联来操作和事务的关系的。
AP 结束全局事务时,TM 会通知 RM 全局事务结束。开始二段提交,也就是prepare - commit的过程。
XA协议指的是TM(事务管理器)和RM(资源管理器)之间的接口。目前主流的关系型数据库产品都是实现了XA接口的。JTA(Java Transaction API)是符合X/Open DTP模型的,事务管理器和资源管理器之间也使用了XA协议。 本质上也是借助两阶段提交协议来实现分布式事务的,下面分别来看看XA事务成功和失败的模型图:
😈 看到这里是不是有种黑人问号的感觉?淡定!我们接下来看 MyCAT 代码层面是如何实现 XA 的。另外,有兴趣对概念了解更多的,可以参看如下文章:
《XA事务处理》
《XA Transaction SQL Syntax》
《MySQL XA 事务支持调研》
3. MyCAT 代码实现
MyCAT :TM,协调者。
数据节点 :RM,参与者。
3.1 JDBC Demo 代码
public class MyCATXAClientDemo {
public static void main(String[] args) throws ClassNotFoundException, SQLException {
// 1. 获得数据库连接
Class.forName("com.mysql.jdbc.Driver");
Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:8066/dbtest", "root", "123456");
conn.setAutoCommit(false);
// 2. 开启 MyCAT XA 事务
conn.prepareStatement("set xa=on").execute();
// 3. 插入 SQL
// 3.1 SQL1 A库
long uid = Math.abs(new Random().nextLong());
String username = UUID.randomUUID().toString();
String password = UUID.randomUUID().toString();
String sql1 = String.format("insert into t_user(id, username, password) VALUES (%d, '%s', '%s')",
uid, username, password);
conn.prepareStatement(sql1).execute();
// 3.2 SQL2 B库
long orderId = Math.abs(new Random().nextLong());
String nickname = UUID.randomUUID().toString();
String sql2 = String.format("insert into t_order(id, uid, nickname) VALUES(%d, %s, '%s')", orderId, uid, nickname);
conn.prepareStatement(sql2).execute();
// 4. 提交 XA 事务
conn.commit();
}
}
setxa=on
MyCAT 开启 XA 事务。conn.commit
提交 XA 事务。
3.2 MyCAT 开启 XA 事务
当 MyCAT 接收到 setxa=on
命令时,开启 XA 事务,并生成 XA 事务编号。XA 事务编号生成算法为 UUID。核心代码如下:
// SetHandler.java
public static void handle(String stmt, ServerConnection c, int offset) {
int rs = ServerParseSet.parse(stmt, offset);
switch (rs & 0xff) {
// ... 省略代码
case XA_FLAG_ON: {
if (c.isAutocommit()) {
c.writeErrMessage(ErrorCode.ERR_WRONG_USED, "set xa cmd on can't used in autocommit connection ");
return;
}
c.getSession2().setXATXEnabled(true);
c.write(c.writeToBuffer(OkPacket.OK, c.allocate()));
break;
}
case XA_FLAG_OFF: {
c.writeErrMessage(ErrorCode.ERR_WRONG_USED,
"set xa cmd off not for external use ");
return;
}
// ... 省略代码
}
}
// NonBlockingSession.java
public void setXATXEnabled(boolean xaTXEnabled) {
if (xaTXEnabled) {
if (this.xaTXID == null) {
xaTXID = genXATXID(); // 😈😈😈获得 XA 事务编号
}
} else {
this.xaTXID = null;
}
}
private String genXATXID() {
return MycatServer.getInstance().getXATXIDGLOBAL();
}
// MycatServer.java
public String getXATXIDGLOBAL() {
return "'" + getUUID() + "'";
}
public static String getUUID() { // 😈😈😈
String s = UUID.randomUUID().toString();
return s.substring(0, 8) + s.substring(9, 13) + s.substring(14, 18) + s.substring(19, 23) + s.substring(24);
}
3.3 MyCAT 接收 SQL
此处 SQL 指的是 insert
、 update
、 delete
操作。
当向某个数据节点第一次发起 SQL 时,会在 SQL 前面附加 XA START'xaTranId'
,并设置该数据节点连接事务状态为 TxState.TX_STARTED_STATE
(分布式事务状态,下文会专门整理)。核心代码如下:
// MySQLConnection.java
private void synAndDoExecute(String xaTxID, RouteResultsetNode rrn,
int clientCharSetIndex, int clientTxIsoLation,
boolean clientAutoCommit) {
String xaCmd = null;
boolean conAutoComit = this.autocommit;
String conSchema = this.schema;
// never executed modify sql,so auto commit
boolean expectAutocommit = !modifiedSQLExecuted || isFromSlaveDB() || clientAutoCommit;
if (expectAutocommit == false && xaTxID != null && xaStatus == TxState.TX_INITIALIZE_STATE) { // 😈😈😈
xaCmd = "XA START " + xaTxID + ';';
this.xaStatus = TxState.TX_STARTED_STATE;
}
// .... 省略代码
StringBuilder sb = new StringBuilder();
// .... 省略代码
if (xaCmd != null) {
sb.append(xaCmd);
}
// and our query sql to multi command at last
sb.append(rrn.getStatement() + ";");
// syn and execute others
this.sendQueryCmd(sb.toString());
}
举个 变量 sb
的例子:
SET names utf8;SET autocommit=0;XA START '1f2da7353e8846e5833b8d8dd041cfb1','db2';insert into t_user(id, username, password) VALUES (3400, 'b7c5ec1f-11cc-4599-851c-06ad617fec42', 'd2694679-f6a2-4623-a339-48d4a868be90');
3.4 MySQL 接收 COMMIT
3.4.1 单节点事务 or 多节点事务
COMMIT
执行时,MyCAT 会判断 XA 事务里,涉及到的数据库节点数量。
如果节点数量为 1,单节点事务,使用
CommitNodeHandler
处理。如果节点数量 > 1,多节点事务,使用
MultiNodeCoordinator
处理。
CommitNodeHandler
相比 MultiNodeCoordinator
来说,只有一个数据节点,不需要进行多节点协调,逻辑会相对简单,有兴趣的同学可以另外看。我们主要分析 MultiNodeCoordinator
。
3.4.2 协调日志
协调日志,记录协调过程中各数据节点 XA 事务状态,处理MyCAT异常奔溃或者数据节点部分XA COMMIT,另外部分 XA PREPARE下的状态恢复。
XA 事务共有种:
TXINITIALIZESTATE :事务初始化
TXSTARTEDSTATE :事务开始完成
TXPREPAREDSTATE :事务准备完成
TXCOMMITEDSTATE :事务提交完成
TXROLLBACKEDSTATE :事务回滚完成
状态变更流 :TXINITIALIZESTATE => TXSTARTEDSTATE => TXPREPAREDSTATE => TXCOMMITEDSTATE / TXROLLBACKEDSTATE 。
协调日志包含两个部分:
CoordinatorLogEntry :协调者日志
ParticipantLogEntry :参与者日志。此处,数据节点扮演参与者的角色。下文中,可能会出现参与者与数据节点混用的情况,望见谅。
一次 XA 事务,对应一条 CoordinatorLogEntry
。一条 CoordinatorLogEntry
包含 N条 ParticipantLogEntry
。 核心代码如下:
// CoordinatorLogEntry :协调者日志
public class CoordinatorLogEntry implements Serializable {
/**
* XA 事务编号
*/
public final String id;
/**
* 参与者日志数组
*/
public final ParticipantLogEntry[] participants;
}
// ParticipantLogEntry :参与者日志
public class ParticipantLogEntry implements Serializable {
/**
* XA 事务编号
*/
public String coordinatorId;
/**
* 数据库 uri
*/
public String uri;
/**
* 过期描述
*/
public long expires;
/**
* XA 事务状态
*/
public int txState;
/**
* 参与者名字
*/
public String resourceName;
}
MyCAT 记录协调日志以 JSON格式 到文件。每行包含一条 CoordinatorLogEntry
。举个例子:
{"id":"'e827b3fe666c4d968961350d19adda31'","participants":[{"uri":"127.0.0.1","state":"3","expires":0,"resourceName":"db3"},{"uri":"127.0.0.1","state":"3","expires":0,"resourceName":"db1"}]}
{"id":"'f00b61fa17cb4ec5b8264a6d82f847d0'","participants":[{"uri":"127.0.0.1","state":"3","expires":0,"resourceName":"db2"},{"uri":"127.0.0.1","state":"3","expires":0,"resourceName":"db1"}]}
实现类为:
// XA 协调者日志 存储接口:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/backend/mysql/xa/recovery/Repository.java
public interface Repository {}
// XA 协调者日志 文件存储:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/backend/mysql/xa/recovery/impl/FileSystemRepository.java
public class FileSystemRepository implements Repository {}
// XA 协调者日志 文件存储:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/backend/mysql/xa/recovery/impl/InMemoryRepository.java
public class InMemoryRepository implements Repository {}
目前日志文件写入的方式性能较差,这里我们不做分析,在【4. MyCAT 实现缺陷】里一起讲。
3.4.3 MultiNodeCoordinator
敲敲敲,这里是本文的重点之一噢。😈
第一阶段:发起 PREPARE。
public void executeBatchNodeCmd(SQLCtrlCommand cmdHandler) {
this.cmdHandler = cmdHandler;
final int initCount = session.getTargetCount();
runningCount.set(initCount);
nodeCount = initCount;
failed.set(false);
faileCount.set(0);
//recovery nodes log
ParticipantLogEntry[] participantLogEntry = new ParticipantLogEntry[initCount];
// 执行
int started = 0;
for (RouteResultsetNode rrn : session.getTargetKeys()) {
if (rrn == null) {
continue;
}
final BackendConnection conn = session.getTarget(rrn);
if (conn != null) {
conn.setResponseHandler(this);
//process the XA_END XA_PREPARE Command
MySQLConnection mysqlCon = (MySQLConnection) conn;
String xaTxId = null;
if (session.getXaTXID() != null) {
xaTxId = session.getXaTXID() + ",'" + mysqlCon.getSchema() + "'";
}
if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE) { // XA 事务
//recovery Log
participantLogEntry[started] = new ParticipantLogEntry(xaTxId, conn.getHost(), 0, conn.getSchema(), ((MySQLConnection) conn).getXaStatus());
String[] cmds = new String[]{"XA END " + xaTxId, // XA END 命令
"XA PREPARE " + xaTxId}; // XA PREPARE 命令
mysqlCon.execBatchCmd(cmds);
} else { // 非 XA 事务
// recovery Log
participantLogEntry[started] = new ParticipantLogEntry(xaTxId, conn.getHost(), 0, conn.getSchema(), ((MySQLConnection) conn).getXaStatus());
cmdHandler.sendCommand(session, conn);
}
++started;
}
}
// xa recovery log
if (session.getXaTXID() != null) {
CoordinatorLogEntry coordinatorLogEntry = new CoordinatorLogEntry(session.getXaTXID(), false, participantLogEntry);
inMemoryRepository.put(session.getXaTXID(), coordinatorLogEntry);
fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries());
}
if (started < nodeCount) { // TODO 疑问:如何触发
runningCount.set(started);
LOGGER.warn("some connection failed to execute " + (nodeCount - started));
/**
* assumption: only caused by front-end connection close. <br/>
* Otherwise, packet must be returned to front-end
*/
failed.set(true);
}
}
向各数据节点发送
XAEND
+XA PREPARE
指令。举个 变量cmds
例子:
XA END '4cbb18214d0b47adbdb0658598666677','db3';XA PREPARE '4cbb18214d0b47adbdb0658598666677','db3';
记录协调日志。每条参与者日志状态为
TxState.TX_STARTED_STATE
。
第二阶段:发起 COMMIT。
@Override
public void okResponse(byte[] ok, BackendConnection conn) {
// process the XA Transatcion 2pc commit
if (conn instanceof MySQLConnection) {
MySQLConnection mysqlCon = (MySQLConnection) conn;
switch (mysqlCon.getXaStatus()) {
case TxState.TX_STARTED_STATE:
//if there have many SQL execute wait the okResponse,will come to here one by one
//should be wait all nodes ready ,then send xa commit to all nodes.
if (mysqlCon.batchCmdFinished()) {
String xaTxId = session.getXaTXID();
String cmd = "XA COMMIT " + xaTxId + ",'" + mysqlCon.getSchema() + "'";
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Start execute the cmd :" + cmd + ",current host:" + mysqlCon.getHost() + ":" + mysqlCon.getPort());
}
// recovery log
CoordinatorLogEntry coordinatorLogEntry = inMemoryRepository.get(xaTxId);
for (int i = 0; i < coordinatorLogEntry.participants.length; i++) {
LOGGER.debug("[In Memory CoordinatorLogEntry]" + coordinatorLogEntry.participants[i]);
if (coordinatorLogEntry.participants[i].resourceName.equals(conn.getSchema())) {
coordinatorLogEntry.participants[i].txState = TxState.TX_PREPARED_STATE;
}
}
inMemoryRepository.put(xaTxId, coordinatorLogEntry);
fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries());
// send commit
mysqlCon.setXaStatus(TxState.TX_PREPARED_STATE);
mysqlCon.execCmd(cmd);
}
return;
case TxState.TX_PREPARED_STATE: {
// recovery log
String xaTxId = session.getXaTXID();
CoordinatorLogEntry coordinatorLogEntry = inMemoryRepository.get(xaTxId);
for (int i = 0; i < coordinatorLogEntry.participants.length; i++) {
if (coordinatorLogEntry.participants[i].resourceName.equals(conn.getSchema())) {
coordinatorLogEntry.participants[i].txState = TxState.TX_COMMITED_STATE;
}
}
inMemoryRepository.put(xaTxId, coordinatorLogEntry);
fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries());
// XA reset status now
mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE);
break;
}
default:
}
}
// 释放连接
if (this.cmdHandler.relaseConOnOK()) {
session.releaseConnection(conn);
} else {
session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), false);
}
// 是否所有节点都完成commit,如果是,则返回Client 成功
if (this.finished()) {
cmdHandler.okResponse(session, ok);
if (cmdHandler.isAutoClearSessionCons()) {
session.clearResources(false);
}
/* 1. 事务提交后,xa 事务结束 */
if (session.getXaTXID() != null) {
session.setXATXEnabled(false);
}
/* 2. preAcStates 为true,事务结束后,需要设置为true。preAcStates 为ac上一个状态 */
if (session.getSource().isPreAcStates()) {
session.getSource().setAutocommit(true);
}
}
}
mysqlCon.batchCmdFinished()
每个数据节点,第一次返回的是XAEND
成功,第二次返回的是XA PREPARE
。在XA PREPARE
成功后,记录该数据节点的参与者日志状态为TxState.TX_PREPARED_STATE
。之后,向该数据节点发起XA COMMIT
命令。XA COMMIT
返回成功后,记录该数据节点的事务参与者日志状态为TxState.TX_COMMITED_STATE
。当所有数据节点(参与者)都执行完成
XA COMMIT
返回,即this.finished()==true
,返回 MySQL Client XA 事务提交成功。
[x] XA PREPARE
和 XA COMMIT
,数据节点可能返回失败,目前暂时没模拟出来,对应方法为 #errorResponse(....)
。
3.5 MyCAT 启动回滚 XA事务
MyCAT 启动时,会回滚处于TxState.TXPREPAREDSTATE的 ParticipantLogEntry
对应的数据节点的 XA 事务。代码如下:
// MycatServer.java
private void performXARecoveryLog() {
// fetch the recovery log
CoordinatorLogEntry[] coordinatorLogEntries = getCoordinatorLogEntries();
for (int i = 0; i < coordinatorLogEntries.length; i++) {
CoordinatorLogEntry coordinatorLogEntry = coordinatorLogEntries[i];
boolean needRollback = false;
for (int j = 0; j < coordinatorLogEntry.participants.length; j++) {
ParticipantLogEntry participantLogEntry = coordinatorLogEntry.participants[j];
if (participantLogEntry.txState == TxState.TX_PREPARED_STATE) {
needRollback = true;
break;
}
}
if (needRollback) {
for (int j = 0; j < coordinatorLogEntry.participants.length; j++) {
ParticipantLogEntry participantLogEntry = coordinatorLogEntry.participants[j];
//XA rollback
String xacmd = "XA ROLLBACK " + coordinatorLogEntry.id + ';';
OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(new String[0], new XARollbackCallback());
outloop:
for (SchemaConfig schema : MycatServer.getInstance().getConfig().getSchemas().values()) {
for (TableConfig table : schema.getTables().values()) {
for (String dataNode : table.getDataNodes()) {
PhysicalDBNode dn = MycatServer.getInstance().getConfig().getDataNodes().get(dataNode);
if (dn.getDbPool().getSource().getConfig().getIp().equals(participantLogEntry.uri)
&& dn.getDatabase().equals(participantLogEntry.resourceName)) {
//XA STATE ROLLBACK
participantLogEntry.txState = TxState.TX_ROLLBACKED_STATE;
SQLJob sqlJob = new SQLJob(xacmd, dn.getDatabase(), resultHandler, dn.getDbPool().getSource());
sqlJob.run();
break outloop;
}
}
}
}
}
}
}
// init into in memory cached
for (int i = 0; i < coordinatorLogEntries.length; i++) {
MultiNodeCoordinator.inMemoryRepository.put(coordinatorLogEntries[i].id, coordinatorLogEntries[i]);
}
// discard the recovery log
MultiNodeCoordinator.fileRepository.writeCheckpoint(MultiNodeCoordinator.inMemoryRepository.getAllCoordinatorLogEntries());
}
4. MyCAT 实现缺陷
MyCAT 1.6.5 版本实现弱XA事务,相对来说,笔者认为距离实际生产使用存在一些差距。下面罗列可能存在的缺陷,如有错误,麻烦指出。🙂希望 MyCAT 在分布式事务的实现上,能够越来越给力。
4.1 协调日志写入性能
1、 CoordinatorLogEntry
、 ParticipantLogEntry
在每次写入文件时,是将内存中所有的日志全部重新写入,导致写入性能随着 XA 事务次数的增加,性能会越来越糟糕,导致 XA 事务整体性能会非常差。另外,该方法是同步的,也加大了写入的延迟。
建议:先获得可写入文件的 OFFSET,写入协调日志到文件,内存维护好 XA事务编号 与 OFFSET 的映射关系,从而实现顺序写入 + 并行写入。
2、内存里维护了所有的协调日志,占用内存会越来越大,并且无释放机制。即使重启,协调日志也会重新加载到内存。
建议:已完全回滚或者提交的协调日志不放入内存。另外有文件存储好 XA事务编号 与 OFFSET 的映射关系。
3、协调日志只写入单个文件。
建议:分拆协调日志文件。
PS:有兴趣的同学可以看下 RocketMQ
对 CommitLog
的存储,性能上很赞!
4.2 数据节点未全部 PREPARE 就进行 COMMIT
XA 事务定义,需要等待所有参与者全部 XA PREPARE
成功完成后发起 XA COMMIT
。目前 MyCAT 是某个数据节点 XA PREPARE
完成后立即进行 XA COMMIT
。比如说:第一个数据节点提交了 XAEND;XA PREPARE
时,第二个数据节在进行 XAEND;XA PREAPRE;
前挂了,第一个节点依然会 XA COMMIT
成功。
建议:按照严格的 XA 事务定义。
4.3 MyCAT 启动回滚 PREPARE 的 XA事务
1、MyCAT 启动时,回滚所有的 PREPARE
的 XA 事务,可能某个 XA 事务,部分 COMMIT
,部分 PREPARE
。此时直接回滚,会导致数据不一致。
建议:当判断到某个 XA 事务存在 PREPARE
的参与者,同时判断该 XA 事务里其他参与者的事务状态以及数据节点里 XA 事务状态,比如参与者为 MySQL
时,可以使用 XA RECOVER
查询处于 PREPARE
所有的 XA 事务。
2、回滚 PREPARE
是异步进行的,在未进行完成时已经设置文件里回滚成功。如果异步过程中失败,会导致 XA 事务状态不一致。
建议:回调成功后,更新该 XA 事务状态。
4.4 单节点事务未记录协调日志
该情况较为极端。发起 XA PREPARE
完后,MyCAT 挂了。重启后,该 XA 事务在 MyCAT 里就“消失“了,参与者的该 XA 事务一直处于 PREPARE
状态。从理论上来说,需要回滚该 XA 事务。
建议:记录协调日志。
4.5 XA COMMIT 部分节点挂了重新恢复后,未进一步处理
当一部分节点 XA COMMIT
完成,另外一部分此时挂了。在管理员重启挂掉的节点,其对应的 XA 事务未进一步处理,导致数据不一致。
建议:😈木有建议。也很好奇,如果是这样的情况,如何处理较为合适。如有大大知道,烦请告知下。
5. 彩蛋
例行“彩蛋”?
《Mycat源码篇 : MyCat事务管理机制分析》 来自 MyCAT Committer 的文章
《MySQL · 捉虫动态 · 连接断开导致XA事务丢失》
《分布式系统事务一致性解决方案》
《MySQL数据库分布式事务XA优缺点与改进方案》
《深入理解分布式系统的2PC和3PC》
【分布式事务.xmind】 笔者拙作
《RocketMQ 源码分析 —— 事务消息》 笔者拙作