数据库中间件 MyCAT源码分析:【单库单表】插入
本文主要基于 MyCAT 1.6.5 正式版
1. 概述
2. 接收请求,解析 SQL
3. 获得路由结果
4. 获得 MySQL 连接,执行 SQL
5. 响应执行 SQL 结果
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】】搞基嗨皮。
1. 概述
内容形态以 顺序图 + 核心代码 为主。
如果有地方表述不错误或者不清晰,欢迎留言。
对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。
微信号:wangwenbin-server。
本文讲解 【单库单表】插入 所涉及到的代码。交互如下图:
整个过程,MyCAT Server 流程如下:
接收 MySQL Client 请求,解析 SQL。
获得路由结果,进行路由。
获得 MySQL 连接,执行 SQL。
响应执行结果,发送结果给 MySQL Client。
我们逐个步骤分析,一起来看看源码。
2. 接收请求,解析 SQL
【 1 - 2 】
接收一条 MySQL 命令。在【1】之前,还有请求数据读取、拆成单条 MySQL SQL。
【 3 】
不同 MySQL 命令,分发到不同的方法执行。核心代码如下:
1: // ⬇️⬇️⬇️【FrontendCommandHandler.java】
2: public class FrontendCommandHandler implements NIOHandler {
3:
4:
5: public void handle(byte[] data) {
6:
7: // .... 省略部分代码
8: switch (data[4]) //
9: {
10: case MySQLPacket.COM_INIT_DB:
11: commands.doInitDB();
12: source.initDB(data);
13: break;
14: case MySQLPacket.COM_QUERY: // 查询命令
15: // 计数查询命令
16: commands.doQuery();
17: // 执行查询命令
18: source.query(data);
19: break;
20: case MySQLPacket.COM_PING:
21: commands.doPing();
22: source.ping();
23: break;
24: // .... 省略部分case
25: }
26: }
27:
28: }
INSERT
/SELECT
/UPDATE
/DELETE
等 SQL 归属于 MySQLPacket.COM_QUERY
,详细可见:《MySQL协议分析#4.2 客户端命令请求报文(客户端 -> 服务器)》。
【 4 】
将 二进制数组 解析成 SQL。核心代码如下:
1: // ⬇️⬇️⬇️【FrontendConnection.java】
2: public void query(byte[] data) {
3: // 取得语句
4: String sql = null;
5: try {
6: MySQLMessage mm = new MySQLMessage(data);
7: mm.position(5);
8: sql = mm.readString(charset);
9: } catch (UnsupportedEncodingException e) {
10: writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
11: return;
12: }
13: // 执行语句
14: this.query( sql );
15: }
【 5 】
解析 SQL 类型。核心代码如下:
1: // ⬇️⬇️⬇️【ServerQueryHandler.java】
2:
3: public void query(String sql) {
4: // 解析 SQL 类型
5: int rs = ServerParse.parse(sql);
6: int sqlType = rs & 0xff;
7:
8: switch (sqlType) {
9: //explain sql
10: case ServerParse.EXPLAIN:
11: ExplainHandler.handle(sql, c, rs >>> 8);
12: break;
13: // .... 省略部分case
14: break;
15: case ServerParse.SELECT:
16: SelectHandler.handle(sql, c, rs >>> 8);
17: break;
18: // .... 省略部分case
19: default:
20: if(readOnly){
21: LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());
22: c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");
23: break;
24: }
25: c.execute(sql, rs & 0xff);
26: }
27: }
28:
29:
30: // ⬇️⬇️⬇️【ServerParse.java】
31: public static int parse(String stmt) {
32: int length = stmt.length();
33: //FIX BUG FOR SQL SUCH AS /XXXX/SQL
34: int rt = -1;
35: for (int i = 0; i < length; ++i) {
36: switch (stmt.charAt(i)) {
37: // .... 省略部分case case 'I':
38: case 'i':
39: rt = insertCheck(stmt, i);
40: if (rt != OTHER) {
41: return rt;
42: }
43: continue;
44: // .... 省略部分case
45: case 'S':
46: case 's':
47: rt = sCheck(stmt, i);
48: if (rt != OTHER) {
49: return rt;
50: }
51: continue;
52: // .... 省略部分case
53: default:
54: continue;
55: }
56: }
57: return OTHER;
58: }
【 6 】
执行 SQL,详细解析见下文,核心代码如下:
1: // ⬇️⬇️⬇️【ServerConnection.java】
2: public class ServerConnection extends FrontendConnection {
3: public void execute(String sql, int type) {
4: // .... 省略代码
5: SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);
6: if (schema == null) {
7: writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,
8: "Unknown MyCAT Database '" + db + "'");
9: return;
10: }
11:
12: // .... 省略代码
13:
14: // 路由到后端数据库,执行 SQL
15: routeEndExecuteSQL(sql, type, schema);
16: }
17:
18: public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {
19: // 路由计算
20: RouteResultset rrs = null;
21: try {
22: rrs = MycatServer
23: .getInstance()
24: .getRouterservice()
25: .route(MycatServer.getInstance().getConfig().getSystem(),
26: schema, type, sql, this.charset, this);
27:
28: } catch (Exception e) {
29: StringBuilder s = new StringBuilder();
30: LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e);
31: String msg = e.getMessage();
32: writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);
33: return;
34: }
35:
36: // 执行 SQL
37: if (rrs != null) {
38: // session执行
39: session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);
40: }
41:
42: }
43:
44: }
3. 获得路由结果
【 1 - 2 】【 12 】
获得路由主流程。核心代码如下:
1: // ⬇️⬇️⬇️【RouteService.java】
2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,
3: int sqlType, String stmt, String charset, ServerConnection sc)
4: throws SQLNonTransientException {
5: RouteResultset rrs = null;
6: // .... 省略代码
7: int hintLength = RouteService.isHintSql(stmt);
8: if(hintLength != -1){ // TODO 待读:hint
9: // .... 省略代码
10: }
11: } else {
12: stmt = stmt.trim();
13: rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,
14: charset, sc, tableId2DataNodeCache);
15: }
16:
17: // .... 省略代码 return rrs;
18: }
19: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】
20:
21: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
22: String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {
23:
24: // .... 省略代码
25:
26: // 处理一些路由之前的逻辑;全局序列号,父子表插入
27: if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) {
28: return null;
29: }
30:
31: // .... 省略代码
32:
33: // 检查是否有分片
34: if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
35: rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
36: } else {
37: RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);
38: if (returnedSet == null) {
39: rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);
40: }
41: }
42:
43: return rrs;
44: }
路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。
【 3 - 6 】
路由前置处理。当符合如下三种情况下,进行处理:
{ 1 } 使用全局序列号:
insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')
{ 2 } ER 子表插入
{ 3 } 主键使用自增 ID 插入:
insert into table (name) values ('name')
===>
insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')
情况 { 1 } { 3 } 情况类似,使用全局序列号。
核心代码如下:
1: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】
2: private boolean beforeRouteProcess(SchemaConfig schema, int sqlType, String origSQL, ServerConnection sc)
3: throws SQLNonTransientException {
4: return // 处理 id 使用 全局序列号
5: RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc)
6: // 处理 ER 子表
7: || (sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc))
8: // 处理 id 自增长
9: || (sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc));
10: }
RouterUtil.java
处理 SQL 考虑性能,实现会比较 C-style,代码咱就不贴了,传送门:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/route/util/RouterUtil.java。 (😈该仓库从官方 Fork,逐步完善中文注释,欢迎 Star)
【 7 - 11 】
当前置路由处理全局序列号时,添加到全局序列处理器(MyCATSequnceProcessor
)。该处理器会异步生成 ID,替换 SQL 内的 NEXT VALUE FOR MYCATSEQ_
正则。例如:
insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')
===>
insert into table (id, name) values (868348974560579584, 'name')
异步处理完后,调用 ServerConnection#routeEndExecuteSQL(sql, type, schema)
方法重新执行 SQL。
核心代码如下:
1: // ⬇️⬇️⬇️【RouterUtil.java】
2: public static void processSQL(ServerConnection sc,SchemaConfig schema,String sql,int sqlType){
3: SessionSQLPair sessionSQLPair = new SessionSQLPair(sc.getSession2(), schema, sql, sqlType);
4: MycatServer.getInstance().getSequnceProcessor().addNewSql(sessionSQLPair);
5: }
6: // ⬇️⬇️⬇️【MyCATSequnceProcessor.java】
7: public class MyCATSequnceProcessor {
8: private LinkedBlockingQueue<SessionSQLPair> seqSQLQueue = new LinkedBlockingQueue<SessionSQLPair>();
9: private volatile boolean running=true;
10:
11: public void addNewSql(SessionSQLPair pair) {
12: seqSQLQueue.add(pair);
13: }
14:
15: private void executeSeq(SessionSQLPair pair) {
16: try {
17:
18: // 使用Druid解析器实现sequence处理 @兵临城下
19: DruidSequenceHandler sequenceHandler = new DruidSequenceHandler(MycatServer
20: .getInstance().getConfig().getSystem().getSequnceHandlerType());
21:
22: // 生成可执行 SQL :目前主要是生成 id
23: String charset = pair.session.getSource().getCharset();
24: String executeSql = sequenceHandler.getExecuteSql(pair.sql,charset == null ? "utf-8":charset);
25:
26: // 执行 SQL
27: pair.session.getSource().routeEndExecuteSQL(executeSql, pair.type,pair.schema);
28: } catch (Exception e) {
29: LOGGER.error("MyCATSequenceProcessor.executeSeq(SesionSQLPair)",e);
30: pair.session.getSource().writeErrMessage(ErrorCode.ER_YES,"mycat sequnce err." + e);
31: return;
32: }
33: }
34:
35: class ExecuteThread extends Thread {
36:
37: public ExecuteThread() {
38: setDaemon(true); // 设置为后台线程,防止throw RuntimeExecption进程仍然存在的问题
39: }
40:
41: public void run() {
42: while (running) {
43: try {
44: SessionSQLPair pair=seqSQLQueue.poll(100,TimeUnit.MILLISECONDS);
45: if(pair!=null){
46: executeSeq(pair);
47: }
48: } catch (Exception e) {
49: LOGGER.warn("MyCATSequenceProcessor$ExecutorThread",e);
50: }
51: }
52: }
53: }
54: }
❓此处有个疑问:MyCATSequnceProcessor
是单线程,会不会插入性能有一定的影响?后续咱做下性能测试。
4. 获得 MySQL 连接,执行 SQL
【 1 - 8 】
获得 MySQL 连接。
PhysicalDBNode :物理数据库节点。
PhysicalDatasource :物理数据库数据源。
【 9 - 13 】
发送 SQL 到 MySQL Server,执行 SQL。
5. 响应执行 SQL 结果
【 1 - 4 】
处理 MySQL Server 响应数据包。
【 5 - 8 】
发送插入成功结果给 MySQL Client。