查看原文
其他

源码分析ElasticJob启动流程(基于Spring)

丁威 中间件兴趣圈 2022-11-10

本文主要目的:简单梳理了基于Spring ElasticJob的启动流程,从下文开始,将重点剖析ElasticJob的核心实现细节,例如选主、分片、失效转移机制等等。

Spring中使用Elastic-Job

1<job:simple id="areaSyncJob" class="full class path"
2     registry-center-ref="regCenter" cron="${elastic.exp.job.gisAMapArea.cron}"
3     disabled="${elastic.exp.job.gisAMapArea.disabled}"
4     sharding-total-count="${elastic.exp.job.areaSyncJob.shardingtotalcount}"
5     sharding-item-parameters="${elastic.exp.job.areaSyncJob.shardingitemparameters}"
6     overwrite="true" job-parameter="1" monitor-execution="true"
7    description="高德行政区域数据同步" event-trace-rdb-data-source="dataSource" />

从上篇中我们知道该标签的解析类为Si-mpleJobBeanDefinitionParser,最终会构建SpringJobScheduler实例,并在初始化实例后调用init方法,开始Job任务的生命周期(启动、运行、调度)。

Spring启动流程核心类图

  • JobScheduler ElasticJob调度任务管理实例

  • LiteJobConfiguration liteJobConfig job配置文件

  • CoordinatorRegistryCenter regCe-nter 分布式协调注册中心

  • SchedulerFacade schedulerFaca-de 任务调度门面类

  • JobFacade jobFacade job门面类

  • JobScheduleController

    quartz job封装类,封装了quartz a-pi,包括调度任务、重新调度任务、暂停任务、恢复任务、触发任务,是ElasticJob与Quartz的桥梁

Spring启动序列图

  • AbstractJobBeanDefinitionParse调用parseInternal()方法解析标签

  • 在SimpleJobBeanDefinitionParser类中调用SprinJobScheduler的init()方法

  • 第二步实际调用的是SpringJobSch-eduler的父类JobScheduler的init()方法。

  • 利用StdSchedulerFactory创建Qua-rtz的调度器Scheduler

  • 创建Quartz的JobDetail示例

  • 根据Scheduler、JobDetail、jobna-me创建JobScheduleController实例

  • 注册启动信息,ElasticJob的任务服务器启动流程就在这里定义,下文详细分析

  • 启动调度任务,受Quartz框架的定时调度

作业服务器启动流程

上面第7步,ElasticJob注册启动信息,其源码如下:
SchedulerFacade#registerStartUpInfo

1/**
2     * 注册作业启动信息.
3     * 
4     * @param enabled 作业是否启用
5     */

6    public void registerStartUpInfo(final boolean enabled) {
7        listenerManager.startAllListeners();
8        leaderService.electLeader();
9        serverService.persistOnline(enabled);
10        instanceService.persistOnline();
11        shardingService.setReshardingFlag();
12        monitorService.listen();
13        if (!reconcileService.isRunning()) {
14            reconcileService.startAsync();
15        }
16    }
  • 启动ElasticJob所有zk事件监听管理器。

  • 选主。

  • 注册并持久化作业服务器信息。

  • 注册并持久化作业运行实例信息。

  • 设置是否需要重新分片。

  • 启动调解分布式作业不一致状态服务。

ElasticJob事件监听管理器类图


  • ElectionListenerManager:主节点选举监听管理器

  • ShardingListenerManager:分片监听管理器。

  • FailoverListenerManager:失效转移监听管理器。

  • MonitorExecutionListenerManager
    幂等性监听管理器。

  • ShutdownListenerManager:运行实例关闭监听管理器。

  • TriggerListenerManager:作业触发监听管理器。

  • RescheduleListenerManager:重调度监听管理器。

  • GuaranteeListenerManager:保证分布式任务全部开始和结束状态监听管理器。

本文就到此为止,从下篇文章开始将重点介绍分布式调度任务所需要解决的问题的实现原理,例如如何选主、分片、失效转移等。


更多文章请关注微信公众号:


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存