查看原文
其他

分布式做系统 Elastic-Job-Lite 源码分析 —— 作业初始化

老艿艿 芋道源码 2019-05-13

点击上方“芋道源码”,选择“置顶公众号”

技术文章第一时间送达!

源码精品专栏

 

摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/job-init/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. 作业注册表

  • 3. 作业调度器

    • 3.1 创建

    • 3.2 初始化

  • 666. 彩蛋


1. 概述

本文主要分享 Elastic-Job-Lite 作业初始化

涉及到主要类的类图如下( 打开大图 ):


你行好事会因为得到赞赏而愉悦 
同理,开源项目贡献者会因为 Star 而更加有动力 
为 Elastic-Job 点赞!传送门

2. 作业注册表

作业注册表( JobRegistry ),维护了单个 Elastic-Job-Lite 进程内作业相关信息,可以理解成其专属的 Spring IOC 容器。因此,其本身是一个单例

public final class JobRegistry {

    /**
     * 单例
     */

    private static volatile JobRegistry instance;
    /**
     * 作业调度控制器集合
     * key:作业名称
     */

    private Map<String, JobScheduleController> schedulerMap = new ConcurrentHashMap<>();
    /**
     * 注册中心集合
     * key:作业名称
     */

    private Map<String, CoordinatorRegistryCenter> regCenterMap = new ConcurrentHashMap<>();
    /**
     * 作业运行实例集合
     * key:作业名称
     */

    private Map<String, JobInstance> jobInstanceMap = new ConcurrentHashMap<>();
    /**
     * 运行中作业集合
     * key:作业名字
     */

    private Map<String, Boolean> jobRunningMap = new ConcurrentHashMap<>();
    /**
     * 作业总分片数量集合
     * key:作业名字
     */

    private Map<String, Integer> currentShardingTotalCountMap = new ConcurrentHashMap<>();

    /**
     * 获取作业注册表实例.
     * 
     * @return 作业注册表实例
     */

    public static JobRegistry getInstance() {
        if (null == instance) {
            synchronized (JobRegistry.class) {
                if (null == instance) {
                    instance = new JobRegistry();
                }
            }
        }
        return instance;
    }

    // .... 省略方法
}
  • instance 是一个单例,通过 #getInstance() 方法获取该单例。该单例的创建方式为双重检验锁模式

  • Map集合属性全部作业名称作为 KEY,通过作业名称,可以获得作业相关信息。

  • 省略的方法,下文在实际调用时,进行解析。

3. 作业调度器

作业调度器( JobScheduler ),创建并初始化后,进行作业调度。

Elastic-Job-Lite 使用 Quartz 作为调度内核。

3.1 创建

public class JobScheduler {
    /**
     * Lite作业配置
     */

    private final LiteJobConfiguration liteJobConfig;
    /**
     * 注册中心
     */

    private final CoordinatorRegistryCenter regCenter;
    /**
     * 调度器门面对象
     */

    @Getter
    private final SchedulerFacade schedulerFacade;
    /**
     * 作业门面对象
     */

    private final JobFacade jobFacade;

    public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final ElasticJobListener... elasticJobListeners) {
        this(regCenter, liteJobConfig, new JobEventBus(), elasticJobListeners);
    }

    public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig, 
                        final ElasticJobListener... elasticJobListeners) 
{
        this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
    }

    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        // 添加 作业运行实例
        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
        // 设置 Lite作业配置
        this.liteJobConfig = liteJobConfig;
        this.regCenter = regCenter;
        // 设置 作业监听器
        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        // 设置 调度器门面对象
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
        // 设置 作业门面对象
        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
    }
}
  • 调用 #JobRegistry#addJobInstance() 方法添加作业运行实例( JobInstance )

    // JobRegistry.java
    /**
    * 作业运行实例集合
    * key:作业名称
    */

    private Map<String, JobInstance> jobInstanceMap = new ConcurrentHashMap<>();
    /**
    * 添加作业实例.
    *
    * @param jobName 作业名称
    * @param jobInstance 作业实例
    */

    public void addJobInstance(final String jobName, final JobInstance jobInstance) {
       jobInstanceMap.put(jobName, jobInstance);
    }

    // JobInstance.java
    public final class JobInstance {private static final String DELIMITER = "@-@";

    /**
     * 作业实例主键.
     */

    private final String jobInstanceId;

    public JobInstance() {
        jobInstanceId = IpUtils.getIp()
                + DELIMITER
                + ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; // PID
    }
    }
    • jobInstanceId 格式:${IP}@-@${PID}。其中 PID 为进程编号。同一个 Elastic-Job-Lite 实例,不同的作业使用相同的作业实例主键。

  • 设置作业监听器,在《Elastic-Job-Lite 源码解析 —— 作业监听器》详细分享。

  • SchedulerFacade,为调度器提供内部服务的门面类。

    public final class SchedulerFacade {/**
     * 作业名称
     */

    private final String jobName;
    /**
     * 作业配置服务
     */

    private final ConfigurationService configService;
    /**
     * 作业分片服务
     */

    private final ShardingService shardingService;
    /**
     * 主节点服务
     */

    private final LeaderService leaderService;
    /**
     * 作业服务器服务
     */

    private final ServerService serverService;
    /**
     * 作业运行实例服务
     */

    private final InstanceService instanceService;
    /**
     * 执行作业服务
     */

    private final ExecutionService executionService;
    /**
     * 作业监控服务
     */

    private final MonitorService monitorService;
    /**
     * 调解作业不一致状态服务
     */

    private final ReconcileService reconcileService;
    /**
     * 作业注册中心的监听器管理者
     */

    private ListenerManager listenerManager;

    public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List&lt;ElasticJobListener&gt; elasticJobListeners) {
        this.jobName = jobName;
        // .... 省略 new XXXXX() 对象
    }
  • LiteJobFacade,为作业提供内部服务的门面类。

    public final class LiteJobFacade implements JobFacade {
        /**
         * 作业配置服务
         */

        private final ConfigurationService configService;
        /**
         * 作业分片服务
         */

        private final ShardingService shardingService;
        /**
         * 执行作业服务
         */

        private final ExecutionService executionService;
        /**
         * 作业运行时上下文服务
         */

        private final ExecutionContextService executionContextService;
        /**
         * 作业失效转移服务
         */

        private final FailoverService failoverService;
        /**
         * 作业监听器数组
         */

        private final List<ElasticJobListener> elasticJobListeners;
        /**
         * 作业事件总线
         */

        private final JobEventBus jobEventBus;public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List&lt;ElasticJobListener&gt; elasticJobListeners, final JobEventBus jobEventBus) {
        // .... 省略 new XXXXX() 对象
        failoverService = new FailoverService(regCenter, jobName);
        this.elasticJobListeners = elasticJobListeners;
        this.jobEventBus = jobEventBus;
    }
    }

SchedulerFacade 和 LiteJobFacade,看起来很相近,实际差别很大。它们分别为调度器、作业提供需要的方法。下文也会体现这一特点。

3.2 初始化

作业调度器创建后,调用 #init() 方法初始化,作业方开始调度。

/**
* 初始化作业.
*/

public void init() {
   // 更新 作业配置
   LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
   // 设置 当前作业分片总数
   JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
   // 创建 作业调度控制器
   JobScheduleController jobScheduleController = new JobScheduleController(
           createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
   // 添加 作业调度控制器
   JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
   // 注册 作业启动信息
   schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
   // 调度作业
   jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}

3.2.1 更新作业配置

// SchedulerFacade.java
/**
* 更新作业配置.
*
* @param liteJobConfig 作业配置
* @return 更新后的作业配置
*/

public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
   // 更新 作业配置
   configService.persist(liteJobConfig);
   // 读取 作业配置
   return configService.load(false);
}
  • 从《Elastic-Job 源码分析 —— 作业配置》的「3.2 持久化作业配置」,调用 ConfigService#persist(…) 方法也不一定会更新作业配置,因此调用 ConfigService#load(…) 方法返回的可能是本地的作业配置,也可能是注册中心存储的作业配置。

3.2.2 设置当前作业分片总数

// JobRegistry.java
private Map<String, Integer> currentShardingTotalCountMap = new ConcurrentHashMap<>();
/**
* 设置当前分片总数.
*
* @param jobName 作业名称
* @param currentShardingTotalCount 当前分片总数
*/

public void setCurrentShardingTotalCount(final String jobName, final int currentShardingTotalCount) {
   currentShardingTotalCountMap.put(jobName, currentShardingTotalCount);
}

3.2.3 创建作业调度控制器

public void init() {
   // .... 省略
   // 创建 作业调度控制器
   JobScheduleController jobScheduleController = new JobScheduleController(
           createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
   // .... 省略
}
  • JobScheduleController,作业调度控制器,提供对 Quartz 方法的封装:

    public final class JobScheduleController {/**
     * Quartz 调度器
     */

    private final Scheduler scheduler;
    /**
     * 作业信息
     */

    private final JobDetail jobDetail;
    /**
     * 触发器编号
     * 目前使用工作名字( jobName )
     */

    private final String triggerIdentity;

    public void scheduleJob(final String cron) {} // 调度作业
    public synchronized void rescheduleJob(final String cron) {} // 重新调度作业
    private CronTrigger createTrigger(final String cron) {} // 创建触发器
    public synchronized boolean isPaused() {} // 判断作业是否暂停
    public synchronized void pauseJob() {} // 暂停作业
    public synchronized void resumeJob() {} // 恢复作业
    public synchronized void triggerJob() {} // 立刻启动作业
    public synchronized void shutdown() {} // 关闭调度器
    }
  • 调用 #createScheduler() 方法创建 Quartz 调度器:

    // JobScheduler.java
    private Scheduler createScheduler() {
       Scheduler result;
       try {
           StdSchedulerFactory factory = new StdSchedulerFactory();
           factory.initialize(getBaseQuartzProperties());
           result = factory.getScheduler();
           result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
       } catch (final SchedulerException ex) {
           throw new JobSystemException(ex);
       }
       return result;
    }

    private Properties getBaseQuartzProperties() {
       Properties result = new Properties();
       result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
       result.put("org.quartz.threadPool.threadCount""1"); // Quartz 线程数:1
       result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
       result.put("org.quartz.jobStore.misfireThreshold""1");
       result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName()); // 作业关闭钩子
       result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString()); // 关闭时,清理所有资源
       return result;
    }
    • org.quartz.threadPool.threadCount = 1,即 Quartz 执行作业线程数量为 1。原因:一个作业( ElasticJob )的调度,需要配置独有的一个作业调度器( JobScheduler ),两者是 1 : 1 的关系。

    • org.quartz.plugin.shutdownhook.class 设置作业优雅关闭钩子:JobShutdownHookPlugin。

    • 触发器监听器( TriggerListener ),在《Elastic-Job-Lite 源码解析 —— 作业执行》详细分享。

  • 调用 #createJobDetail() 方法创建 Quartz 作业:

    // JobScheduler.java
    private JobDetail createJobDetail(final String jobClass) {
       // 创建 Quartz 作业
       JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
       //
       result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
       // 创建 Elastic-Job 对象
       Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
       if (elasticJobInstance.isPresent()) {
           result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
       } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
           try {
               result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
           } catch (final ReflectiveOperationException ex) {
               throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
           }
       }
       return result;
    }

    protected Optional<ElasticJob> createElasticJobInstance() {
       return Optional.absent();
    }

    // SpringJobScheduler.java
    @Override
    protected Optional<ElasticJob> createElasticJobInstance() {
       return Optional.fromNullable(elasticJob);
    }
    • 创建 Quartz 作业设置了 LiteJob 类,这样 Quartz 触发作业执行时,LiteJob 会去调用 Elastic-Job 作业对象。在《Elastic-Job-Lite 源码解析 —— 作业执行》详细分享。

    • 在 Spring 里,Elastic-Job 如果已经创建好注入到 SpringJobScheduler,无需进行创建。

    • Jodetail.jobDataMap 属性里添加了作业门面对象( LiteJobFacade )、Elastic-Job 对象,Quartz 触发作业时,会设置到 LiteJob 对象里。

3.2.4 注册作业启动信息

/**
* 注册作业启动信息.

* @param enabled 作业是否启用
*/

public void registerStartUpInfo(final boolean enabled) {
   // 开启 所有监听器
   listenerManager.startAllListeners();
   // 选举 主节点
   leaderService.electLeader();
   // 持久化 作业服务器上线信息
   serverService.persistOnline(enabled);
   // 持久化 作业运行实例上线相关信息
   instanceService.persistOnline();
   // 设置 需要重新分片的标记
   shardingService.setReshardingFlag();
   // 初始化 作业监听服务
   monitorService.listen();
   // 初始化 调解作业不一致状态服务
   if (!reconcileService.isRunning()) {
       reconcileService.startAsync();
   }
}
  • 开启所有监听器。每个功能模块都有其相应的监听器,在模块对应「文章」详细分享。

  • 选举主节点,在《Elastic-Job-Lite 源码解析 —— 主节点选举》详细分享。

  • 调用 ServerService#persistOnline() 方法,持久化作业服务器上线信息。

    public final class ServerService {
        /**
         * 持久化作业服务器上线信息.
         * 
         * @param enabled 作业是否启用
         */

        public void persistOnline(final boolean enabled) {
            if (!JobRegistry.getInstance().isShutdown(jobName)) {
                jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? "" : ServerStatus.DISABLED.name());
            }
        }
    }
    • 当作业配置设置作业禁用时( LiteJobConfiguration.disabled = true ),作业调度但调度作业分片为空。不太好理解?《Elastic-Job-Lite 源码解析 —— 作业分片》详细分享。

  • 调用 InstanceService#persistOnline() 方法,持久化作业运行实例上线相关信息:

    public final class InstanceService {
        /**
         * 持久化作业运行实例上线相关信息.
         */

        public void persistOnline() {
            jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
        }
    }
  • 设置需要重新分片的标记,在《Elastic-Job-Lite 源码解析 —— 作业分片》详细分享。

  • 初始化作业监听服务,在《Elastic-Job-Lite 源码解析 —— 作业监控服务》详细分享。

  • 初始化调解作业不一致状态服务,在《Elastic-Job-Lite 源码解析 —— 自诊断修复》详细分享。

3.2.5 调度作业

// JobScheduler.java
public void init() {
   // .... 省略部分代码
   // 调度作业
   jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}

// JobScheduleController.java
/**
* 调度作业.
*
* @param cron CRON表达式
*/

public void scheduleJob(final String cron) {
   try {
       if (!scheduler.checkExists(jobDetail.getKey())) {
           scheduler.scheduleJob(jobDetail, createTrigger(cron));
       }
       scheduler.start();
   } catch (final SchedulerException ex) {
       throw new JobSystemException(ex);
   }
}
  • 调用 #scheduleJob() 方法后,该 Elastic-Job 作业开始被调度。

666. 彩蛋

作业初始化,如果你对 Quartz 不是特别了解,可以再看 Quartz 再重新理解。

下一篇,《Elastic-Job-Lite 源码解析 —— 作业执行》 起航!

道友,分享一波微信朋友圈支持支持支持,可好?




如果你对 Dubbo 感兴趣,欢迎加入我的知识星球一起交流。


知识星球



目前在知识星球(https://t.zsxq.com/2VbiaEu)更新了如下 Dubbo 源码解析如下:

01. 调试环境搭建
02. 项目结构一览
03. 配置 Configuration
04. 核心流程一览

05. 拓展机制 SPI

06. 线程池

07. 服务暴露 Export

08. 服务引用 Refer

09. 注册中心 Registry

10. 动态编译 Compile

11. 动态代理 Proxy

12. 服务调用 Invoke

13. 调用特性 

14. 过滤器 Filter

15. NIO 服务器

16. P2P 服务器

17. HTTP 服务器

18. 序列化 Serialization

19. 集群容错 Cluster

20. 优雅停机

21. 日志适配

22. 状态检查

23. 监控中心 Monitor

24. 管理中心 Admin

25. 运维命令 QOS

26. 链路追踪 Tracing

...
一共 60 篇++


源码不易↓↓↓↓

点赞支持老艿艿↓↓

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存