分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业数据存储
点击上方“芋道源码”,选择“置顶公众号”
技术文章第一时间送达!
源码精品专栏
摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/job-storage/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文基于 Elastic-Job V2.1.5 版本分享
1. 概述
2. JobNodePath
3. JobNodeStorage
4. ConfigurationNode
5. ServerNode
6. InstanceNode
7. ShardingNode
8. LeaderNode
9. FailoverNode
10. GuaranteeNode
666. 彩蛋
1. 概述
本文主要分享 Elastic-Job-Lite 作业数据存储。
涉及到主要类的类图如下( 打开大图 ):
你行好事会因为得到赞赏而愉悦
同理,开源项目贡献者会因为 Star 而更加有动力
为 Elastic-Job 点赞!传送门
2. JobNodePath
JobNodePath,作业节点路径类。作业节点是在普通的节点前加上作业名称的前缀。
在 Zookeeper 看一个作业的数据存储:
[zk: localhost:2181(CONNECTED) 65] ls /elastic-job-example-lite-java/javaSimpleJob
[leader, servers, config, instances, sharding]
elastic-job-example-lite-java
:作业节点集群名,使用ZookeeperConfiguration.namespace
属性配置。javaSimpleJob
:作业名字,使用JobCoreConfiguration.jobName
属性配置。config
/servers
/instances
/sharding
/leader
:不同服务的数据存储节点路径。
JobNodePath,注释很易懂,点击链接查看。这里我们梳理下 JobNodePath 和其它节点路径类的关系:
Zookeeper 路径 | JobNodePath 静态属性 | JobNodePath 方法 | 节点路径类 |
---|---|---|---|
config | CONFIG_NODE | #getConfigNodePath() | ConfigurationNode |
servers | SERVERS_NODE | #getServerNodePath() | ServerNode |
instances | INSTANCES_NODE | #getInstancesNodePath() | InstanceNode |
sharding | SHARDING_NODE | #getShardingNodePath() | ShardingNode |
leader | / | #getFullPath(node) | LeaderNode |
leader/failover | / | #getFullPath(node) | FailoverNode |
guarantee | / | #getFullPath(node) | GuaranteeNode |
3. JobNodeStorage
JobNodeStorage,作业节点数据访问类。
Elastic-Job-Lite 使用注册中心存储作业节点数据,JobNodeStorage 对注册中心提供的方法做下简单的封装提供调用。举个例子:
// JobNodeStorage.java
private final CoordinatorRegistryCenter regCenter;
private final JobNodePath jobNodePath;
/**
* 判断作业节点是否存在.
*
* @param node 作业节点名称
* @return 作业节点是否存在
*/
public boolean isJobNodeExisted(final String node) {
return regCenter.isExisted(jobNodePath.getFullPath(node));
}
// JobNodePath.java
/**
* 获取节点全路径.
*
* @param node 节点名称
* @return 节点全路径
*/
public String getFullPath(final String node) {
return String.format("/%s/%s", jobName, node);
}
传递的参数
node
只是简单的作业节点名称,通过调用JobNodePath#getFullPath(…)
方法获取节点全路径。其它方法类似,有兴趣的同学点击链接查看。
4. ConfigurationNode
ConfigurationNode,配置节点路径。
在 Zookeeper 看一个作业的配置节点数据存储:
[zk: localhost:2181(CONNECTED) 67] get /elastic-job-example-lite-java/javaSimpleJob/config
{"jobName":"javaSimpleJob","jobClass":"com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob","jobType":"SIMPLE","cron":"0/5 * * * * ?","shardingTotalCount":3,"shardingItemParameters":"0\u003dBeijing,1\u003dShanghai,2\u003dGuangzhou","jobParameter":"","failover":true,"misfire":true,"description":"","jobProperties":{"job_exception_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler","executor_service_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"},"monitorExecution":true,"maxTimeDiffSeconds":-1,"monitorPort":-1,"jobShardingStrategyClass":"","reconcileIntervalMinutes":10,"disabled":false,"overwrite":true}
/config
是持久节点,存储Lite作业配置( LiteJobConfiguration ) JSON化字符串。
ConfigurationNode 代码如下:
public final class ConfigurationNode {
static final String ROOT = "config";
}
ConfigurationNode 如何读取、存储,在《Elastic-Job-Lite 源码分析 —— 作业配置》的「3.」作业配置服务已经详细解析。
5. ServerNode
ServerNode,服务器节点路径。
在 Zookeeper 看一个作业的服务器节点数据存储:
[zk: localhost:2181(CONNECTED) 72] ls /elastic-job-example-lite-java/javaSimpleJob/servers
[192.168.16.164, 169.254.93.156, 192.168.252.57, 192.168.16.137, 192.168.3.2, 192.168.43.31]
[zk: localhost:2181(CONNECTED) 73] get /elastic-job-example-lite-java/javaSimpleJob/servers/192.168.16.164
/servers/
目录下以IP
为数据节点路径存储每个服务器节点。如果相同IP服务器有多个服务器节点,只存储一个IP
数据节点。/servers/${IP}
是持久节点,不存储任何信息,只是空串(""
);
ServerNode 代码如下:
public final class ServerNode {
/**
* 服务器信息根节点.
*/
public static final String ROOT = "servers";
private static final String SERVERS = ROOT + "/%s";
}
ServerNode 如何存储,在《Elastic-Job-Lite 源码分析 —— 作业初始化》的「3.2.4」注册作业启动信息已经详细解析。
6. InstanceNode
InstanceNode,运行实例节点路径。
在 Zookeeper 看一个作业的运行实例节点数据存储:
[zk: localhost:2181(CONNECTED) 81] ls /elastic-job-example-lite-java/javaSimpleJob/instances
[192.168.16.137@-@56010]
[zk: localhost:2181(CONNECTED) 82] get /elastic-job-example-lite-java/javaSimpleJob/instances
/instances
目录下以作业实例主键(JOB_INSTANCE_ID
) 为数据节点路径存储每个运行实例节点。/instances/${JOB_INSTANCE_ID}
是临时节点,不存储任何信息,只是空串(""
);JOB_INSTANCE_ID
生成方式:// JobInstance.java
jobInstanceId = IpUtils.getIp()
+ DELIMITER
+ ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; // PID
InstanceNode 代码如下:
public final class InstanceNode {
/**
* 运行实例信息根节点.
*/
public static final String ROOT = "instances";
private static final String INSTANCES = ROOT + "/%s";
/**
* 获取当前运行实例节点路径
*
* @return 当前运行实例节点路径
*/
String getLocalInstanceNode() {
return String.format(INSTANCES, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
}
}
InstanceNode 如何存储,在《Elastic-Job-Lite 源码分析 —— 作业初始化》的「3.2.4」注册作业启动信息已经详细解析。
7. ShardingNode
ShardingNode,分片节点路径。
在 Zookeeper 看一个作业的分片节点数据存储:
[zk: localhost:2181(CONNECTED) 1] ls /elastic-job-example-lite-java/javaSimpleJob/sharding
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 2] ls /elastic-job-example-lite-java/javaSimpleJob/sharding/0
[running, instance, misfire]
[zk: localhost:2181(CONNECTED) 3] get /elastic-job-example-lite-java/javaSimpleJob/sharding/0/instance
192.168.16.137@-@56010
/sharding/${ITEM_ID}
目录下以作业分片项序号(ITEM_ID
) 为数据节点路径存储作业分片项的instance
/running
/misfire
/disable
数据节点信息。/sharding/${ITEM_ID}/instance
是临时节点,存储该作业分片项分配到的作业实例主键(JOB_INSTANCE_ID
)。在《Elastic-Job-Lite 源码分析 —— 作业分片》详细解析。/sharding/${ITEM_ID}/running
是临时节点,当该作业分片项正在运行,存储空串(""
);当该作业分片项不在运行,移除该数据节点。《Elastic-Job-Lite 源码分析 —— 作业执行》的「4.6」执行普通触发的作业已经详细解析。/sharding/${ITEM_ID}/misfire
是永久节点,当该作业分片项被错过执行,存储空串(""
);当该作业分片项重新执行,移除该数据节点。《Elastic-Job-Lite 源码分析 —— 作业执行》的「4.7」执行被错过触发的作业已经详细解析。/sharding/${ITEM_ID}/disable
是永久节点,当该作业分片项被禁用,存储空串(""
);当该作业分片项被开启,移除数据节点。
ShardingNode,代码如下:
public final class ShardingNode {
/**
* 执行状态根节点.
*/
public static final String ROOT = "sharding";
static final String INSTANCE_APPENDIX = "instance";
public static final String INSTANCE = ROOT + "/%s/" + INSTANCE_APPENDIX;
static final String RUNNING_APPENDIX = "running";
static final String RUNNING = ROOT + "/%s/" + RUNNING_APPENDIX;
static final String MISFIRE = ROOT + "/%s/misfire";
static final String DISABLED = ROOT + "/%s/disabled";
static final String LEADER_ROOT = LeaderNode.ROOT + "/" + ROOT;
static final String NECESSARY = LEADER_ROOT + "/necessary";
static final String PROCESSING = LEADER_ROOT + "/processing";
}
LEADER_ROOT / NECESSARY / PROCESSING 放在「4.7」LeaderNode 解析。
8. LeaderNode
LeaderNode,主节点路径。
在 leader
目录下一共有三个存储子节点:
election
:主节点选举。sharding
:作业分片项分配。failover
:作业失效转移。
主节点选举
在 Zookeeper 看一个作业的 leader/election
节点数据存储:
[zk: localhost:2181(CONNECTED) 1] ls /elastic-job-example-lite-java/javaSimpleJob/leader/election
[latch, instance]
[zk: localhost:2181(CONNECTED) 2] get /elastic-job-example-lite-java/javaSimpleJob/leader/election/instance
192.168.16.137@-@1910
/leader/election/instance
是临时节点,当作业集群完成选举后,存储主作业实例主键(JOB_INSTANCE_ID
)。/leader/election/latch
主节点选举分布式锁,是 Apache Curator 针对 Zookeeper 实现的分布式锁的一种,笔者暂未了解存储形式,无法解释。在《Elastic-Job-Lite 源码分析 —— 注册中心》的「3.1」在主节点执行操作进行了简单解析。
作业分片项分配
在 Zookeeper 看一个作业的 leader/sharding
节点数据存储:
[zk: localhost:2181(CONNECTED) 1] ls /elastic-job-example-lite-java/javaSimpleJob/leader/sharding
[necessary, processing]
[zk: localhost:2181(CONNECTED) 2] 个get /elastic-job-example-lite-java/javaSimpleJob/leader/sharding
[zk: localhost:2181(CONNECTED) 3] 个get /elastic-job-example-lite-java/javaSimpleJob/leader/processing
/leader/sharding/necessary
是永久节点,当相同作业有新的作业节点加入或者移除时,存储空串(""
),标记需要进行作业分片项重新分配;当重新分配完成后,移除该数据节点。/leader/sharding/processing
是临时节点,当开始重新分配作业分片项时,存储空串(""
),标记正在进行重新分配;当重新分配完成后,移除该数据节点。当且仅当作业节点为主节点时,才可以执行作业分片项分配,《Elastic-Job-Lite 源码分析 —— 作业分片》详细解析。
作业失效转移
作业失效转移数据节点在 FailoverNode,放在「9」FailoverNode 解析。
这里大家可能会和我一样比较疑惑,为什么 /leader/failover
放在 /leader
目录下,而不独立成为一个根目录?经过确认,作业失效转移 设计到分布式锁,统一存储在 /leader
目录下。
LeaderNode,代码如下:
public final class LeaderNode {
/**
* 主节点根路径.
*/
public static final String ROOT = "leader";
static final String ELECTION_ROOT = ROOT + "/election";
static final String INSTANCE = ELECTION_ROOT + "/instance";
static final String LATCH = ELECTION_ROOT + "/latch";
}
9. FailoverNode
FailoverNode,失效转移节点路径。
在 Zookeeper 看一个作业的失效转移节点数据存储:
[zk: localhost:2181(CONNECTED) 2] ls /elastic-job-example-lite-java/javaSimpleJob/leader/failover
[latch, items]
[zk: localhost:2181(CONNECTED) 4] ls /elastic-job-example-lite-java/javaSimpleJob/leader/failover/items
[0]
/leader/failover/latch
作业失效转移分布式锁,和/leader/failover/latch
是一致的。/leader/items/${ITEM_ID}
是永久节点,当某台作业节点 CRASH 时,其分配的作业分片项标记需要进行失效转移,存储其分配的作业分片项的/leader/items/${ITEM_ID}
为空串(""
);当失效转移标记,移除/leader/items/${ITEM_ID}
,存储/sharding/${ITEM_ID}/failover
为空串(""
),临时节点,需要进行失效转移执行。《Elastic-Job-Lite 源码分析 —— 作业失效转移》详细解析。
FailoverNode 代码如下:
public final class FailoverNode {
static final String FAILOVER = "failover";
static final String LEADER_ROOT = LeaderNode.ROOT + "/" + FAILOVER;
static final String ITEMS_ROOT = LEADER_ROOT + "/items";
static final String ITEMS = ITEMS_ROOT + "/%s";
static final String LATCH = LEADER_ROOT + "/latch";
private static final String EXECUTION_FAILOVER = ShardingNode.ROOT + "/%s/" + FAILOVER;
static String getItemsNode(final int item) {
return String.format(ITEMS, item);
}
static String getExecutionFailoverNode(final int item) {
return String.format(EXECUTION_FAILOVER, item);
}
}
10. GuaranteeNode
GuaranteeNode,保证分布式任务全部开始和结束状态节点路径。在《Elastic-Job-Lite 源码分析 —— 作业监听器》详细解析。
666. 彩蛋
旁白君:芋道君,你又水更了!
芋道君:屁屁屁,劳资怼死你!如下是作业数据存储整理,哼哼哈兮!
道友,赶紧上车,分享一波朋友圈!
如果你对 Dubbo / Netty 等等源码与原理感兴趣,欢迎加入我的知识星球一起交流。长按下方二维码噢:
目前在知识星球更新了《Dubbo 源码解析》目录如下:
01. 调试环境搭建
02. 项目结构一览
03. 配置 Configuration
04. 核心流程一览
05. 拓展机制 SPI
06. 线程池
07. 服务暴露 Export
08. 服务引用 Refer
09. 注册中心 Registry
10. 动态编译 Compile
11. 动态代理 Proxy
12. 服务调用 Invoke
13. 调用特性
14. 过滤器 Filter
15. NIO 服务器
16. P2P 服务器
17. HTTP 服务器
18. 序列化 Serialization
19. 集群容错 Cluster
20. 优雅停机
21. 日志适配
22. 状态检查
23. 监控中心 Monitor
24. 管理中心 Admin
25. 运维命令 QOS
26. 链路追踪 Tracing
... 一共 69+ 篇
目前在知识星球更新了《Netty 源码解析》目录如下:
01. 调试环境搭建
02. NIO 基础
03. Netty 简介
04. 启动 Bootstrap
05. 事件轮询 EventLoop
06. 通道管道 ChannelPipeline
07. 通道 Channel
08. 字节缓冲区 ByteBuf
09. 通道处理器 ChannelHandler
10. 编解码 Codec
11. 工具类 Util
... 一共 61+ 篇
目前在知识星球更新了《数据库实体设计》目录如下:
01. 商品模块
02. 交易模块
03. 营销模块
04. 公用模块
... 一共 17+ 篇
源码不易↓↓↓↓↓
点赞支持老艿艿↓↓