查看原文
其他

使用 SpringBoot2.X 实现 Quartz 动态任务的分布式调度

MarkerHub 2022-11-04

小Hub领读:

常见的分布式定时任务框架:elastic-job、xxl-job、quartz ,你都熟悉吗,今天先来学一下Quartz吧!


作者:Sam Sho

https://blog.csdn.net/u012228718/article/details/90041675

见名知意,该篇番外主要是要解决如下几个问题:

1、使用 SpringBoot2.x 版本集成 Quartz

2、Quartz 的任务动态实现:

  • 调度任务可以通过页面进行新增、删除、启动、暂定等操作

  • 任务数据使用数据库保存

  • 任务之间实现简单的依赖

3、Quartz 实现分布式调度,使用其本身提供的基于数据库的实现

SpringBoot2 集成 Quartz

1、SpringBoot 不同的版本对于 Quartz 的集成有一定的差别,本文使用 2.1.2.RELEASE 版本。其实通过分析 SpringBoot 对于 Quartz 的自动化配置源码,也有助于我们理解 Quartz 的使用

2、SpringBoot-2.1.2.RELEASE 版本已经集成了对于 Quartz 的自动化配置,其源码路径为 org.springframework.boot.autoconfigure.quartz

集成简单实现

Pom 依赖

  1. # Web工程

  2. <dependency>

  3. <groupId>org.springframework.boot</groupId>

  4. <artifactId>spring-boot-starter-web</artifactId>

  5. </dependency>


  6. # quartz

  7. <dependency>

  8. <groupId>org.springframework.boot</groupId>

  9. <artifactId>spring-boot-starter-quartz</artifactId>

  10. </dependency>


  11. # 数据库JDBC

  12. <dependency>

  13. <groupId>org.springframework.boot</groupId>

  14. <artifactId>spring-boot-starter-jdbc</artifactId>

  15. </dependency>


  16. # 使用MySql

  17. <dependency>

  18. <groupId>mysql</groupId>

  19. <artifactId>mysql-connector-java</artifactId>

  20. <scope>runtime</scope>

  21. </dependency>

编码功能实现

1、由于 Springboot2 的自动化配置,不需要做任何配置,直接写 JobDetail、Trigger、Job 即可实现

  1. # Job 实现

  2. @DisallowConcurrentExecution

  3. public class DemoJob extends QuartzJobBean {

  4. @Override

  5. protected void executeInternal(JobExecutionContext context) throws JobExecutionException {

  6. System.out.println("~~ DemoJob 启动运行汇总~~");

  7. }

  8. }


  9. # JobDetail、Trigger Bean配置

  10. @Configuration

  11. public class QuartzJobConfig {


  12. @Bean

  13. public JobDetailFactoryBean jobDetailFactoryBean() {

  14. JobDetailFactoryBean jobDetail = new JobDetailFactoryBean();

  15. jobDetail.setName("DemoJob");

  16. jobDetail.setGroup("DemoJob_Group");

  17. jobDetail.setJobClass(DemoJob.class);

  18. jobDetail.setDurability(true);

  19. return jobDetail;

  20. }


  21. @Bean

  22. public CronTriggerFactoryBean cronTriggerFactoryBean() {

  23. CronTriggerFactoryBean trigger = new CronTriggerFactoryBean();

  24. trigger.setJobDetail(jobDetailFactoryBean().getObject());

  25. trigger.setCronExpression("*/10 * * * * ?");

  26. trigger.setName("DemoJob");

  27. trigger.setMisfireInstruction(0);


  28. return trigger;

  29. }

  30. }

2、这样就实现了 SpringBoot2.x 版本集成 Quartz 功能,在进行下一步之前,我们先对自动化配置的源码简单分析一下。

自动配置实现分析

SpringBoot 关于 Quartz 的自动配置的类一共有 6 个,分别为:

1、 JobStoreType:是一个枚举类,定义了 jobsStore 的类型,基于内存和数据库

2、 QuartzAutoConfiguration:自动配置类,配置了 Quartz 的主要组件,如 SchedulerFactoryBean

3、 QuartzDataSource:是一个注解类。如果需要注入 Quartz 配置的数据库操作类,需要使用此注解标注。参考 QuartzAutoConfiguration中的用法

4、 QuartzDataSourceInitializer:该类主要用于数据源初始化后的一些操作,根据不同平台类型的数据库进行选择不同的数据库脚本

5、 QuartzProperties:该类对应了在 application.yml配置文件以 spring.quartz开头的相关配置

6、 SchedulerFactoryBeanCustomizer:在自动配置的基础上自定义配置需要实现的此接口。

自动化配置分析 QuartzAutoConfiguration

1、初始化注入任务以及配置:构造函数实现

  • 注入了属性配置文件类: QuartzProperties

  • 注入了自定义扩展配置: SchedulerFactoryBeanCustomizer

  • 注入了 Quartz 的任务组件: JobDetail、 Trigger、 Calendar。所以我们只需要进行 JobDetail、Trigger Bean 配置,会自动注入进 QuartzAutoConfiguration类中,这边是通过 ObjectProvider的使用实现的。

  1. public QuartzAutoConfiguration(QuartzProperties properties,

  2. ObjectProvider<SchedulerFactoryBeanCustomizer> customizers,

  3. ObjectProvider<JobDetail[]> jobDetails,

  4. ObjectProvider<Map<String, Calendar>> calendars,

  5. ObjectProvider<Trigger[]> triggers, ApplicationContext applicationContext) {

  6. this.properties = properties;

  7. this.customizers = customizers;

  8. this.jobDetails = jobDetails.getIfAvailable();

  9. this.calendars = calendars.getIfAvailable();

  10. this.triggers = triggers.getIfAvailable();

  11. this.applicationContext = applicationContext;

  12. }

2、配置 SchedulerFactoryBean 的详细信息。这个类是一个 FactoryBean

  • 配置 JobFactory,内部设置了 applicationContext与 spring 容器结合

  • 配置各种属性,是通过 QuartzProperties类实现

  • 配置注入进来的 JobDetail、 Trigger、 Calendar

  • 配置自定配置,是通过 SchedulerFactoryBeanCustomizer实现。这边包括自定义,也包括基于数据库实现的 JobStore配置。

  1. @Bean

  2. @ConditionalOnMissingBean

  3. public SchedulerFactoryBean quartzScheduler() {


  4. # 配置 `JobFactory`

  5. SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();

  6. SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();

  7. jobFactory.setApplicationContext(this.applicationContext);

  8. schedulerFactoryBean.setJobFactory(jobFactory);


  9. # 开始配置各种属性

  10. if (this.properties.getSchedulerName() != null) {

  11. schedulerFactoryBean.setSchedulerName(this.properties.getSchedulerName());

  12. }

  13. schedulerFactoryBean.setAutoStartup(this.properties.isAutoStartup());

  14. schedulerFactoryBean

  15. .setStartupDelay((int) this.properties.getStartupDelay().getSeconds());

  16. schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(

  17. this.properties.isWaitForJobsToCompleteOnShutdown());

  18. schedulerFactoryBean

  19. .setOverwriteExistingJobs(this.properties.isOverwriteExistingJobs());

  20. if (!this.properties.getProperties().isEmpty()) {

  21. schedulerFactoryBean

  22. .setQuartzProperties(asProperties(this.properties.getProperties()));

  23. }


  24. # 配置 jobDetails、triggers等

  25. if (this.jobDetails != null && this.jobDetails.length > 0) {

  26. schedulerFactoryBean.setJobDetails(this.jobDetails);

  27. }

  28. if (this.calendars != null && !this.calendars.isEmpty()) {

  29. schedulerFactoryBean.setCalendars(this.calendars);

  30. }

  31. if (this.triggers != null && this.triggers.length > 0) {

  32. schedulerFactoryBean.setTriggers(this.triggers);

  33. }


  34. # 自定义配置

  35. customize(schedulerFactoryBean);

  36. return schedulerFactoryBean;

  37. }

3、基于数据库实现的 JobStore 配置,内部类 JdbcStoreTypeConfiguration

  • @ConditionalOnSingleCandidate(DataSource.class) 指定 pring 容器中有且只有一个指明的 DataSourceBean 时生效

  • 通过 dataSourceCustomizer方法实现 schedulerFactoryBean的数据库相关配置,该方法返回一个 SchedulerFactoryBeanCustomizer

  • 配置 QuartzDataSourceInitializer 数据库初始化 Bean

4、通过后置工厂处理器 DataSourceInitializerSchedulerDependencyPostProcessor 实现对于 QuartzDataSourceInitializer这个 Bean 的依赖关系(dependsOn)

  1. @Bean

  2. @Order(0)

  3. public SchedulerFactoryBeanCustomizer dataSourceCustomizer(

  4. QuartzProperties properties, DataSource dataSource,

  5. @QuartzDataSource ObjectProvider<DataSource> quartzDataSource,

  6. ObjectProvider<PlatformTransactionManager> transactionManager) {


  7. return (schedulerFactoryBean) -> {

  8. # 判断是否为 JobStore

  9. if (properties.getJobStoreType() == JobStoreType.JDBC) {

  10. # 获取 DataSource

  11. DataSource dataSourceToUse = getDataSource(dataSource, quartzDataSource);


  12. # 配置 DataSource 和 TransactionManager管理

  13. schedulerFactoryBean.setDataSource(dataSourceToUse);

  14. PlatformTransactionManager txManager = transactionManager.getIfUnique();

  15. if (txManager != null) {

  16. schedulerFactoryBean.setTransactionManager(txManager);

  17. }

  18. }

  19. };

  20. }

支持功能配置 QuartzProperties

1、 @ConfigurationProperties("spring.quartz") 以 spring.quartz开头的配置

2、SpringBoot 已经做了相应的默认值处理,即使不做任何配置,也是没有问题的。

3、比较简单,直接贴码。属性的具体含义,任务调度框架(3)Quartz 简单使用

  1. spring:

  2. quartz:

  3. scheduler-name: springboot-quartz-jdbc-dynamic

  4. auto-startup: false

  5. startup-delay: 5s

  6. overwrite-existing-jobs: false

  7. wait-for-jobs-to-complete-on-shutdown: true

  8. job-store-type: memory

  9. # jdbc:

  10. # initialize-schema: embedded

  11. # schema: classpath:org/quartz/impl/jdbcjobstore/tables_@@platform@@.sql

  12. # comment-prefix: --

  13. properties: {

  14. org.quartz.scheduler.instanceName: springboot-quartz-jdbc-dynamic,

  15. org.quartz.scheduler.instanceId: AUTO,

  16. org.quartz.threadPool.class: org.springframework.scheduling.quartz.SimpleThreadPoolTaskExecutor,

  17. org.quartz.threadPool.threadCount: 25,

  18. org.quartz.threadPool.threadPriority: 5,

  19. org.quartz.jobStore.misfireThreshold: 60000,

  20. # org.quartz.jobStore.tablePrefix: QRTZ_,

  21. # org.quartz.jobStore.isClustered: true,

  22. # org.quartz.jobStore.clusterCheckinInterval: 20000,

  23. # org.quartz.jobStore.maxMisfiresToHandleAtATime: 1,

  24. # org.quartz.jobStore.txIsolationLevelSerializable: false

  25. }

小结

1、至此,我们完成了 SpringBoot 与 Quartz 的集成,并且简单运行了我们的调度任务

2、简单分析了 SpringBoot 对于 Quartz 的自动配置,由于各个版本的差别,这边使用的是 SpringBoot-2.1.2.RELEASE

Quartz 实现分布式调度

回顾分析

任务调度框架(4)Quartz 分布式实现 已经对 Quartz 自身的分布式实现做了简单的介绍,这边主要基于 SpringBoot 怎么做。

配置简单实现

1、上述完成的 SpringBoot 与 Quartz 的集成,可以看到有几个先关的配置:

  • job-store-type 可以选择 JDBC完成分布式 JdbcJobStore切换

  • jdbc.XXX 主要是对于初始化 SQL 的配置。树妖是对于 quartz 提供的 11 张表的初始化 sql

  • 对于 JdbcJobStore 的一些特殊配置,如表前缀、集群指定、数据库检查等,基于 RamJobStore时,这些是不允许配置的。

  1. spring:

  2. quartz:

  3. job-store-type: memory

  4. jdbc:

  5. initialize-schema: embedded

  6. schema: classpath:org/quartz/impl/jdbcjobstore/tables_@@platform@@.sql

  7. comment-prefix: --

  8. properties: {

  9. org.quartz.jobStore.misfireThreshold: 60000,

  10. org.quartz.jobStore.tablePrefix: QRTZ_,

  11. org.quartz.jobStore.isClustered: true,

  12. org.quartz.jobStore.clusterCheckinInterval: 20000,

  13. org.quartz.jobStore.maxMisfiresToHandleAtATime: 1,

  14. org.quartz.jobStore.txIsolationLevelSerializable: false

  15. }

2、以上就配置好了 Quartz 实现分布式调度,就是这么简单

3、【注意】在尝试的时候, jdbc.xxx配置没有生效,个人是自己手动初始化的表。

Quartz 的任务动态实现

1、以上我们简单完成了 SpringBoot 集成与基于 JDBC 的分布式,但是我们的任务还是基于 Bean 配置的:

  • 新增任务需要手动硬编码,增加 JobDetail、 Trigger的 Bean 配置

  • 上线后的任务无法修改,需要修改代码,停止应用

2、所以,所谓的动态任务主要是三个问题:

  • 任务数据使用数据库保存,包括任务的基本信息与 trigger 信息

  • 调度任务可以通过页面进行新增、修改、删除、启动、暂停、重启等操作

  • 任务之间实现简单的依赖,如 A 任务依赖于 B 任务,那么 A 任务必须等到 B 任务执行完成才会自动执行

数据使用数据库保存

1、简单,把任务调度分为两个模块:

  • 基本任务(BatchTask)与任务计划(BatchSchedule),BatchTask 与 BatchSchedule 是一对多关系,代替 Quartz 中 jobGroup 的概念。

  • 任务计划(BatchSchedule) 中可能需要用到配置的一些参数,定义任务计划参数(BatchScheduleParam)

2、具体的实体如下,数据库相关表结构略

  • 基本任务(BatchTask)

  1. public class BatchTask extends AbstractDataEntity {

  2. /**

  3. * 任务编码:唯一

  4. */

  5. private String code;


  6. /**

  7. * 任务名称

  8. */

  9. private String name;


  10. /**

  11. * 任务描述

  12. */

  13. private String description;


  14. /**

  15. * 前置任务

  16. */

  17. private List<BatchTask> previous;

  18. }

  • 任务计划(BatchSchedule)

  1. public class BatchSchedule extends AbstractDataEntity {

  2. /**

  3. * 计划编码

  4. */

  5. private String code;


  6. /**

  7. * 计划名称

  8. */

  9. private String name;


  10. /**

  11. * 计划状态: 整个生命周期状态

  12. */

  13. private Integer status;


  14. /**

  15. * 执行表达式类型

  16. */

  17. private Integer cronType;


  18. /**

  19. * 执行表达式

  20. */

  21. private String cronExpression;


  22. /**

  23. * 描述

  24. */

  25. private String description;


  26. /**

  27. * 处理业务类

  28. */

  29. private String interfaceName;


  30. /**

  31. * 任务编码(任务组的概念)

  32. */

  33. private String taskCode;


  34. /**

  35. * 开始时间(最近)

  36. */

  37. private Date startDate;


  38. /**

  39. * 结束时间(最近)

  40. */

  41. private Date endDate;


  42. /**

  43. * 前置计划列表

  44. */

  45. private List<BatchSchedule> dependencies;


  46. /**

  47. * 参数列表

  48. */

  49. private List<BatchScheduleParam> params;

  50. }

  • 计划参数(BatchScheduleParam)

  1. public class BatchScheduleParam {


  2. /**

  3. * 任务计划ID

  4. */

  5. private String scheduleId;


  6. /**

  7. * 任务计划code

  8. */

  9. private String scheduleCode;


  10. /**

  11. * 参数名

  12. */

  13. private String paramName;


  14. /**

  15. * 参数值

  16. */

  17. private String paramValue;

  18. }

3、有了这些实体,无非就是根据实体对基本任务与任务计划做 CRUD,这个比较简单,不赘述。

任务计划的动态管理

手动配置实现的原理

1、手动配置是需要编码实现 JobDetailFactoryBean、 CronTriggerFactoryBean,然后通过 SpringBoot 的自动化配置,设置到
schedulerFactoryBean对象的对应属性中。

  1. schedulerFactoryBean.setJobDetails(this.jobDetails);

  2. schedulerFactoryBean.setTriggers(this.triggers);

2、 SchedulerFactoryBean源码中,通过 afterPropertiesSet()方法中方法,注册到 Scheduler对象中

  • 最终核心是通过 Scheduler.scheduleJob(jobDetail,trigger);添加

  1. // Initialize the Scheduler instance...

  2. this.scheduler = prepareScheduler(prepareSchedulerFactory());

  3. try {

  4. registerListeners();

  5. registerJobsAndTriggers(); # 注册JobsAndTriggers

  6. }


  7. protected void registerJobsAndTriggers() throws SchedulerException {


  8. // Register JobDetails.

  9. if (this.jobDetails != null) {

  10. for (JobDetail jobDetail : this.jobDetails) {

  11. addJobToScheduler(jobDetail);

  12. }

  13. }


  14. // Register Triggers.

  15. if (this.triggers != null) {

  16. for (Trigger trigger : this.triggers) {

  17. addTriggerToScheduler(trigger);

  18. }

  19. }

  20. }


  21. private boolean addJobToScheduler(JobDetail jobDetail) throws SchedulerException {

  22. if (this.overwriteExistingJobs || getScheduler().getJobDetail(jobDetail.getKey()) == null) {

  23. # 最终是通过Scheduler.addJob(jobDetail, true); 添加

  24. getScheduler().addJob(jobDetail, true);

  25. return true;

  26. }

  27. else {

  28. return false;

  29. }

  30. }


  31. private boolean addTriggerToScheduler(Trigger trigger) throws SchedulerException {

  32. # 最终是通过Scheduler.scheduleJob(jobDetail, trigger); 添加(只是一部分功能)

  33. getScheduler().scheduleJob(jobDetail, trigger);

  34. }

动态管理:创建计划任务引擎类

1、 Scheduler 在 SpringBoot 中已经通过 SchedulerFactoryBean自动配置好了,直接注入即可使用。 2、具体可以 参考源码

  1. public class QuartzScheduleEngine {



  2. @Autowired

  3. private Scheduler scheduler;



  4. /**

  5. * 新增计划任务: 主要是添加 jobDetail 和 trigger

  6. *

  7. * @param batchSchedule

  8. */

  9. public Date addJob(BatchSchedule batchSchedule) throws Exception {


  10. String cronExpression = batchSchedule.getCronExpression();

  11. String name = batchSchedule.getCode();

  12. String group = batchSchedule.getTaskCode();

  13. String interfaceName = batchSchedule.getInterfaceName();


  14. // 校验数据

  15. this.checkNotNull(batchSchedule);


  16. // 添加 1-JobDetail

  17. // 校验 JobDetail 是否存在

  18. JobKey jobKey = JobKey.jobKey(name, group);

  19. if (scheduler.checkExists(jobKey)) {

  20. if (Strings.isNullOrEmpty(cronExpression)) {

  21. // 已经存在并且执行一次,立即执行

  22. scheduler.triggerJob(jobKey);

  23. } else {

  24. throw new Exception("任务计划 JobKey 已经在执行队列中,不需要重复启动");

  25. }

  26. } else {


  27. // 构建 JobDetail

  28. Class<? extends Job> jobClazz = (Class<? extends Job>) Class.forName(interfaceName);

  29. JobDetail jobDetail = JobBuilder.newJob(jobClazz).withIdentity(jobKey).build();

  30. jobDetail.getJobDataMap().put(BatchSchedule.SCHEDULE_KEY, batchSchedule.toString());


  31. // 添加 2-Trigger

  32. // 校验 Trigger 是否存在

  33. TriggerKey triggerKey = TriggerKey.triggerKey(name, group);

  34. Trigger trigger = scheduler.getTrigger(triggerKey);

  35. if (Objects.nonNull(trigger)) {

  36. throw new Exception("任务计划 Trigger 已经在执行队列中,不需要重复启动");

  37. }


  38. // 构建 Trigger

  39. trigger = getTrigger(cronExpression, triggerKey);


  40. return scheduler.scheduleJob(jobDetail, trigger);

  41. }


  42. return new Date();

  43. }



  44. /**

  45. * 修改

  46. *

  47. * @param batchSchedule

  48. */

  49. public void updateCronExpression(BatchSchedule batchSchedule) throws Exception {

  50. updateJobCronExpression(batchSchedule);

  51. }



  52. /**

  53. * 更新Job的执行表达式

  54. *

  55. * @param batchSchedule

  56. * @throws SchedulerException

  57. */

  58. public Date updateJobCronExpression(BatchSchedule batchSchedule) throws SchedulerException {

  59. checkNotNull(batchSchedule);


  60. String name = batchSchedule.getCode();

  61. String group = batchSchedule.getTaskCode();

  62. TriggerKey triggerKey = TriggerKey.triggerKey(name, group);

  63. // 在队列中才需要修改

  64. if (scheduler.checkExists(triggerKey)) {

  65. // 构建 Trigger

  66. String cronExpression = batchSchedule.getCronExpression();

  67. Trigger trigger = this.getTrigger(cronExpression, triggerKey);

  68. return scheduler.rescheduleJob(triggerKey, trigger);

  69. }

  70. return null;

  71. }


  72. /**

  73. * 构建 Trigger

  74. *

  75. * @param cronExpression

  76. * @param triggerKey

  77. * @return

  78. */

  79. private Trigger getTrigger(String cronExpression, TriggerKey triggerKey) {

  80. Trigger trigger;

  81. if (Strings.isNullOrEmpty(cronExpression.trim())) {

  82. trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).build();

  83. } else {

  84. cronExpression = cronExpression.replaceAll("#", " ");

  85. CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);

  86. trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();

  87. }

  88. return trigger;

  89. }


  90. /**

  91. * 暂停计划任务

  92. *

  93. * @param batchSchedule

  94. */

  95. public void pauseJob(BatchSchedule batchSchedule) throws Exception {

  96. checkNotNull(batchSchedule);

  97. JobKey jobKey = JobKey.jobKey(batchSchedule.getCode(), batchSchedule.getTaskCode());

  98. if (!scheduler.checkExists(jobKey)) {

  99. throw new Exception("任务计划不在执行队列中,不能暂停");

  100. }

  101. scheduler.pauseJob(jobKey);

  102. }


  103. /**

  104. * 从暂停中恢复

  105. *

  106. * @param batchSchedule

  107. */

  108. public void resumeJob(BatchSchedule batchSchedule) throws Exception {

  109. checkNotNull(batchSchedule);

  110. JobKey jobKey = JobKey.jobKey(batchSchedule.getCode(), batchSchedule.getTaskCode());

  111. if (!scheduler.checkExists(jobKey)) {

  112. throw new Exception("任务计划不在执行队列中,不能恢复");

  113. }


  114. scheduler.resumeJob(jobKey);

  115. }


  116. /**

  117. * 删除计划任务

  118. *

  119. * @param batchSchedule

  120. */

  121. public boolean deleteJob(BatchSchedule batchSchedule) throws SchedulerException {

  122. boolean flag = true;

  123. checkNotNull(batchSchedule);

  124. JobKey jobKey = JobKey.jobKey(batchSchedule.getCode(), batchSchedule.getTaskCode());

  125. if (scheduler.checkExists(jobKey)) {

  126. flag = scheduler.deleteJob(jobKey);

  127. }


  128. return flag;

  129. }


  130. /**

  131. * 添加任务监听

  132. *

  133. * @param jobListener

  134. * @param matcher

  135. * @throws SchedulerException

  136. */

  137. public void addJobListener(JobListener jobListener, Matcher<JobKey> matcher) throws SchedulerException {

  138. scheduler.getListenerManager().addJobListener(jobListener, matcher);

  139. }


  140. /**

  141. * 执行一次(可用于测试)

  142. *

  143. * @param batchSchedule

  144. */

  145. public void runJobOnce(BatchSchedule batchSchedule) throws SchedulerException {

  146. checkNotNull(batchSchedule);

  147. JobKey jobKey = JobKey.jobKey(batchSchedule.getCode(), batchSchedule.getTaskCode());

  148. scheduler.triggerJob(jobKey);

  149. }


  150. private void checkNotNull(BatchSchedule batchSchedule) {

  151. Preconditions.checkNotNull(batchSchedule, "计划为空");

  152. Preconditions.checkState(!StringUtils.isEmpty(batchSchedule.getCode()), "计划编号为空");

  153. Preconditions.checkState(!StringUtils.isEmpty(batchSchedule.getTaskCode()), "计划所属任务为空");

  154. Preconditions.checkState(!StringUtils.isEmpty(batchSchedule.getInterfaceName()), "任务执行业务类为空");

  155. }



  156. public SchedulerMetaData getMetaData() throws SchedulerException {

  157. SchedulerMetaData metaData = scheduler.getMetaData();

  158. return metaData;

  159. }

  160. }

任务状态与计划依赖

1、使用 JobListener实现,需要自定义配置的支持

  1. public class CustomGlobalJobListener extends JobListenerSupport {



  2. @Override

  3. public String getName() {

  4. return this.getClass().getName();

  5. }


  6. /**

  7. * Scheduler 在 JobDetail 将要被执行时调用这个方法。

  8. *

  9. * @param context

  10. */

  11. @Override

  12. public void jobToBeExecuted(JobExecutionContext context) {

  13. getLog().debug("计划 {} : ~~~ 【RUNNING】 更新正在运行中状态 ~~~ ");

  14. }


  15. /**

  16. * Scheduler 在 JobDetail 即将被执行,但又被 TriggerListener 否决了时调用这个方法

  17. *

  18. * @param context

  19. */

  20. @Override

  21. public void jobExecutionVetoed(JobExecutionContext context) {

  22. }


  23. /**

  24. * Scheduler 在 JobDetail 被执行之后调用这个方法

  25. *

  26. * @param context

  27. * @param jobException

  28. */

  29. @Override

  30. public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {

  31. getLog().debug("计划 {} : ~~~ 【COMPLETE | ERROR】 更新已经结束状态 ~~~ ");


  32. // 唤醒子任务

  33. batchScheduleService.notifyChildren(scheduleJob);

  34. }

  35. }

2、自定义实现,可以实现 SchedulerFactoryBeanCustomizer接口

  1. @Configuration

  2. public class SchedulerFactoryBeanCustomizerConfig implements SchedulerFactoryBeanCustomizer {


  3. @Bean

  4. public CustomGlobalJobListener globalJobListener() {

  5. return new CustomGlobalJobListener();

  6. }



  7. @Override

  8. public void customize(SchedulerFactoryBean schedulerFactoryBean) {

  9. schedulerFactoryBean.setGlobalJobListeners(globalJobListener());

  10. }

  11. }

3、计划依赖:

  • 如计划有依赖其他计划,则该计划一般不允许手动运行,需要等待所依赖的计划完成后在监听器中自动唤醒

  • 目前只简单实现了单个计划依赖,没有实现复杂功能。后期可以扩展:多计划依赖,依赖排序等功能。

小结

至此,Quartz 的任务动态实现已经完成,主要可以分为三个部分:

1、任务与计划定义,使用数据库保存

2、动态计划的管理,使用 Quartz 本身的 API 实现

3、任务计划状态监控,使用 JobListener监听器实现

4、计划依赖,使用 JobListener监听器实现。

参考

1、lotso-web:使用 SpringBoot2.X 实现 Quartz 动态任务的分布式调度

2、源码地址 LearningJobSchedule



(完)

MarkerHub文章索引:(点击阅读原文直达)

https://github.com/MarkerHub/JavaIndex


【推荐阅读】

从集成到ACK、消息重试、死信队列,Kafka你知多少?

精选41 道 Spring Boot 面试题,附答案!

为什么要前后端分离?这些接口规范你都懂吗?

SpringBoot-Vue 前后端分离开发首秀


好文!必须点赞

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存