使用 SpringBoot2.X 实现 Quartz 动态任务的分布式调度
小Hub领读:
常见的分布式定时任务框架:elastic-job、xxl-job、quartz ,你都熟悉吗,今天先来学一下Quartz吧!
作者:Sam Sho
https://blog.csdn.net/u012228718/article/details/90041675
见名知意,该篇番外主要是要解决如下几个问题:
1、使用 SpringBoot2.x 版本集成 Quartz
2、Quartz 的任务动态实现:
调度任务可以通过页面进行新增、删除、启动、暂定等操作
任务数据使用数据库保存
任务之间实现简单的依赖
3、Quartz 实现分布式调度,使用其本身提供的基于数据库的实现
SpringBoot2 集成 Quartz
1、SpringBoot 不同的版本对于 Quartz 的集成有一定的差别,本文使用 2.1.2.RELEASE
版本。其实通过分析 SpringBoot 对于 Quartz 的自动化配置源码,也有助于我们理解 Quartz 的使用
2、SpringBoot-2.1.2.RELEASE 版本已经集成了对于 Quartz 的自动化配置,其源码路径为 org.springframework.boot.autoconfigure.quartz
集成简单实现
Pom 依赖
# Web工程
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
# quartz
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
# 数据库JDBC
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
# 使用MySql
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
编码功能实现
1、由于 Springboot2 的自动化配置,不需要做任何配置,直接写 JobDetail、Trigger、Job 即可实现
# Job 实现
@DisallowConcurrentExecution
public class DemoJob extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
System.out.println("~~ DemoJob 启动运行汇总~~");
}
}
# JobDetail、Trigger Bean配置
@Configuration
public class QuartzJobConfig {
@Bean
public JobDetailFactoryBean jobDetailFactoryBean() {
JobDetailFactoryBean jobDetail = new JobDetailFactoryBean();
jobDetail.setName("DemoJob");
jobDetail.setGroup("DemoJob_Group");
jobDetail.setJobClass(DemoJob.class);
jobDetail.setDurability(true);
return jobDetail;
}
@Bean
public CronTriggerFactoryBean cronTriggerFactoryBean() {
CronTriggerFactoryBean trigger = new CronTriggerFactoryBean();
trigger.setJobDetail(jobDetailFactoryBean().getObject());
trigger.setCronExpression("*/10 * * * * ?");
trigger.setName("DemoJob");
trigger.setMisfireInstruction(0);
return trigger;
}
}
2、这样就实现了 SpringBoot2.x 版本集成 Quartz 功能,在进行下一步之前,我们先对自动化配置的源码简单分析一下。
自动配置实现分析
SpringBoot 关于 Quartz 的自动配置的类一共有 6 个,分别为:
1、 JobStoreType
:是一个枚举类,定义了 jobsStore 的类型,基于内存和数据库
2、 QuartzAutoConfiguration
:自动配置类,配置了 Quartz 的主要组件,如 SchedulerFactoryBean
3、 QuartzDataSource
:是一个注解类。如果需要注入 Quartz 配置的数据库操作类,需要使用此注解标注。参考 QuartzAutoConfiguration
中的用法
4、 QuartzDataSourceInitializer
:该类主要用于数据源初始化后的一些操作,根据不同平台类型的数据库进行选择不同的数据库脚本
5、 QuartzProperties
:该类对应了在 application.yml
配置文件以 spring.quartz
开头的相关配置
6、 SchedulerFactoryBeanCustomizer
:在自动配置的基础上自定义配置需要实现的此接口。
自动化配置分析 QuartzAutoConfiguration
1、初始化注入任务以及配置:构造函数实现
注入了属性配置文件类:
QuartzProperties
注入了自定义扩展配置:
SchedulerFactoryBeanCustomizer
注入了 Quartz 的任务组件:
JobDetail
、Trigger
、Calendar
。所以我们只需要进行JobDetail、Trigger
Bean 配置,会自动注入进QuartzAutoConfiguration
类中,这边是通过ObjectProvider
的使用实现的。
public QuartzAutoConfiguration(QuartzProperties properties,
ObjectProvider<SchedulerFactoryBeanCustomizer> customizers,
ObjectProvider<JobDetail[]> jobDetails,
ObjectProvider<Map<String, Calendar>> calendars,
ObjectProvider<Trigger[]> triggers, ApplicationContext applicationContext) {
this.properties = properties;
this.customizers = customizers;
this.jobDetails = jobDetails.getIfAvailable();
this.calendars = calendars.getIfAvailable();
this.triggers = triggers.getIfAvailable();
this.applicationContext = applicationContext;
}
2、配置 SchedulerFactoryBean
的详细信息。这个类是一个 FactoryBean
。
配置
JobFactory
,内部设置了applicationContext
与 spring 容器结合配置各种属性,是通过
QuartzProperties
类实现配置注入进来的
JobDetail
、Trigger
、Calendar
配置自定配置,是通过
SchedulerFactoryBeanCustomizer
实现。这边包括自定义,也包括基于数据库实现的JobStore
配置。
@Bean
@ConditionalOnMissingBean
public SchedulerFactoryBean quartzScheduler() {
# 配置 `JobFactory`
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();
jobFactory.setApplicationContext(this.applicationContext);
schedulerFactoryBean.setJobFactory(jobFactory);
# 开始配置各种属性
if (this.properties.getSchedulerName() != null) {
schedulerFactoryBean.setSchedulerName(this.properties.getSchedulerName());
}
schedulerFactoryBean.setAutoStartup(this.properties.isAutoStartup());
schedulerFactoryBean
.setStartupDelay((int) this.properties.getStartupDelay().getSeconds());
schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(
this.properties.isWaitForJobsToCompleteOnShutdown());
schedulerFactoryBean
.setOverwriteExistingJobs(this.properties.isOverwriteExistingJobs());
if (!this.properties.getProperties().isEmpty()) {
schedulerFactoryBean
.setQuartzProperties(asProperties(this.properties.getProperties()));
}
# 配置 jobDetails、triggers等
if (this.jobDetails != null && this.jobDetails.length > 0) {
schedulerFactoryBean.setJobDetails(this.jobDetails);
}
if (this.calendars != null && !this.calendars.isEmpty()) {
schedulerFactoryBean.setCalendars(this.calendars);
}
if (this.triggers != null && this.triggers.length > 0) {
schedulerFactoryBean.setTriggers(this.triggers);
}
# 自定义配置
customize(schedulerFactoryBean);
return schedulerFactoryBean;
}
3、基于数据库实现的 JobStore
配置,内部类 JdbcStoreTypeConfiguration
@ConditionalOnSingleCandidate(DataSource.class)
指定 pring 容器中有且只有一个指明的DataSource
Bean 时生效通过
dataSourceCustomizer
方法实现schedulerFactoryBean
的数据库相关配置,该方法返回一个SchedulerFactoryBeanCustomizer
。配置
QuartzDataSourceInitializer
数据库初始化 Bean
4、通过后置工厂处理器 DataSourceInitializerSchedulerDependencyPostProcessor
实现对于 QuartzDataSourceInitializer
这个 Bean 的依赖关系(dependsOn)
@Bean
@Order(0)
public SchedulerFactoryBeanCustomizer dataSourceCustomizer(
QuartzProperties properties, DataSource dataSource,
@QuartzDataSource ObjectProvider<DataSource> quartzDataSource,
ObjectProvider<PlatformTransactionManager> transactionManager) {
return (schedulerFactoryBean) -> {
# 判断是否为 JobStore
if (properties.getJobStoreType() == JobStoreType.JDBC) {
# 获取 DataSource
DataSource dataSourceToUse = getDataSource(dataSource, quartzDataSource);
# 配置 DataSource 和 TransactionManager管理
schedulerFactoryBean.setDataSource(dataSourceToUse);
PlatformTransactionManager txManager = transactionManager.getIfUnique();
if (txManager != null) {
schedulerFactoryBean.setTransactionManager(txManager);
}
}
};
}
支持功能配置 QuartzProperties
1、 @ConfigurationProperties("spring.quartz")
以 spring.quartz
开头的配置
2、SpringBoot 已经做了相应的默认值处理,即使不做任何配置,也是没有问题的。
3、比较简单,直接贴码。属性的具体含义,任务调度框架(3)Quartz 简单使用
spring:
quartz:
scheduler-name: springboot-quartz-jdbc-dynamic
auto-startup: false
startup-delay: 5s
overwrite-existing-jobs: false
wait-for-jobs-to-complete-on-shutdown: true
job-store-type: memory
# jdbc:
# initialize-schema: embedded
# schema: classpath:org/quartz/impl/jdbcjobstore/tables_@@platform@@.sql
# comment-prefix: --
properties: {
org.quartz.scheduler.instanceName: springboot-quartz-jdbc-dynamic,
org.quartz.scheduler.instanceId: AUTO,
org.quartz.threadPool.class: org.springframework.scheduling.quartz.SimpleThreadPoolTaskExecutor,
org.quartz.threadPool.threadCount: 25,
org.quartz.threadPool.threadPriority: 5,
org.quartz.jobStore.misfireThreshold: 60000,
# org.quartz.jobStore.tablePrefix: QRTZ_,
# org.quartz.jobStore.isClustered: true,
# org.quartz.jobStore.clusterCheckinInterval: 20000,
# org.quartz.jobStore.maxMisfiresToHandleAtATime: 1,
# org.quartz.jobStore.txIsolationLevelSerializable: false
}
小结
1、至此,我们完成了 SpringBoot 与 Quartz 的集成,并且简单运行了我们的调度任务
2、简单分析了 SpringBoot 对于 Quartz 的自动配置,由于各个版本的差别,这边使用的是 SpringBoot-2.1.2.RELEASE
Quartz 实现分布式调度
回顾分析
任务调度框架(4)Quartz 分布式实现 已经对 Quartz 自身的分布式实现做了简单的介绍,这边主要基于 SpringBoot 怎么做。
配置简单实现
1、上述完成的 SpringBoot 与 Quartz 的集成,可以看到有几个先关的配置:
job-store-type
可以选择JDBC
完成分布式JdbcJobStore
切换jdbc.XXX
主要是对于初始化 SQL 的配置。树妖是对于 quartz 提供的 11 张表的初始化 sql对于
JdbcJobStore
的一些特殊配置,如表前缀、集群指定、数据库检查等,基于RamJobStore
时,这些是不允许配置的。
spring:
quartz:
job-store-type: memory
jdbc:
initialize-schema: embedded
schema: classpath:org/quartz/impl/jdbcjobstore/tables_@@platform@@.sql
comment-prefix: --
properties: {
org.quartz.jobStore.misfireThreshold: 60000,
org.quartz.jobStore.tablePrefix: QRTZ_,
org.quartz.jobStore.isClustered: true,
org.quartz.jobStore.clusterCheckinInterval: 20000,
org.quartz.jobStore.maxMisfiresToHandleAtATime: 1,
org.quartz.jobStore.txIsolationLevelSerializable: false
}
2、以上就配置好了 Quartz 实现分布式调度,就是这么简单
3、【注意】在尝试的时候, jdbc.xxx
配置没有生效,个人是自己手动初始化的表。
Quartz 的任务动态实现
1、以上我们简单完成了 SpringBoot 集成与基于 JDBC 的分布式,但是我们的任务还是基于 Bean 配置的:
新增任务需要手动硬编码,增加
JobDetail
、Trigger
的 Bean 配置上线后的任务无法修改,需要修改代码,停止应用
2、所以,所谓的动态任务主要是三个问题:
任务数据使用数据库保存,包括任务的基本信息与 trigger 信息
调度任务可以通过页面进行新增、修改、删除、启动、暂停、重启等操作
任务之间实现简单的依赖,如 A 任务依赖于 B 任务,那么 A 任务必须等到 B 任务执行完成才会自动执行
数据使用数据库保存
1、简单,把任务调度分为两个模块:
基本任务(BatchTask)与任务计划(BatchSchedule),BatchTask 与 BatchSchedule 是一对多关系,代替 Quartz 中 jobGroup 的概念。
任务计划(BatchSchedule) 中可能需要用到配置的一些参数,定义任务计划参数(BatchScheduleParam)
2、具体的实体如下,数据库相关表结构略
基本任务(BatchTask)
public class BatchTask extends AbstractDataEntity {
/**
* 任务编码:唯一
*/
private String code;
/**
* 任务名称
*/
private String name;
/**
* 任务描述
*/
private String description;
/**
* 前置任务
*/
private List<BatchTask> previous;
}
任务计划(BatchSchedule)
public class BatchSchedule extends AbstractDataEntity {
/**
* 计划编码
*/
private String code;
/**
* 计划名称
*/
private String name;
/**
* 计划状态: 整个生命周期状态
*/
private Integer status;
/**
* 执行表达式类型
*/
private Integer cronType;
/**
* 执行表达式
*/
private String cronExpression;
/**
* 描述
*/
private String description;
/**
* 处理业务类
*/
private String interfaceName;
/**
* 任务编码(任务组的概念)
*/
private String taskCode;
/**
* 开始时间(最近)
*/
private Date startDate;
/**
* 结束时间(最近)
*/
private Date endDate;
/**
* 前置计划列表
*/
private List<BatchSchedule> dependencies;
/**
* 参数列表
*/
private List<BatchScheduleParam> params;
}
计划参数(BatchScheduleParam)
public class BatchScheduleParam {
/**
* 任务计划ID
*/
private String scheduleId;
/**
* 任务计划code
*/
private String scheduleCode;
/**
* 参数名
*/
private String paramName;
/**
* 参数值
*/
private String paramValue;
}
3、有了这些实体,无非就是根据实体对基本任务与任务计划做 CRUD,这个比较简单,不赘述。
任务计划的动态管理
手动配置实现的原理
1、手动配置是需要编码实现 JobDetailFactoryBean
、 CronTriggerFactoryBean
,然后通过 SpringBoot 的自动化配置,设置到schedulerFactoryBean
对象的对应属性中。
schedulerFactoryBean.setJobDetails(this.jobDetails);
schedulerFactoryBean.setTriggers(this.triggers);
2、 SchedulerFactoryBean
源码中,通过 afterPropertiesSet()
方法中方法,注册到 Scheduler
对象中
最终核心是通过
Scheduler.scheduleJob(jobDetail,trigger);
添加
// Initialize the Scheduler instance...
this.scheduler = prepareScheduler(prepareSchedulerFactory());
try {
registerListeners();
registerJobsAndTriggers(); # 注册JobsAndTriggers
}
protected void registerJobsAndTriggers() throws SchedulerException {
// Register JobDetails.
if (this.jobDetails != null) {
for (JobDetail jobDetail : this.jobDetails) {
addJobToScheduler(jobDetail);
}
}
// Register Triggers.
if (this.triggers != null) {
for (Trigger trigger : this.triggers) {
addTriggerToScheduler(trigger);
}
}
}
private boolean addJobToScheduler(JobDetail jobDetail) throws SchedulerException {
if (this.overwriteExistingJobs || getScheduler().getJobDetail(jobDetail.getKey()) == null) {
# 最终是通过Scheduler.addJob(jobDetail, true); 添加
getScheduler().addJob(jobDetail, true);
return true;
}
else {
return false;
}
}
private boolean addTriggerToScheduler(Trigger trigger) throws SchedulerException {
# 最终是通过Scheduler.scheduleJob(jobDetail, trigger); 添加(只是一部分功能)
getScheduler().scheduleJob(jobDetail, trigger);
}
动态管理:创建计划任务引擎类
1、 Scheduler
在 SpringBoot 中已经通过 SchedulerFactoryBean
自动配置好了,直接注入即可使用。 2、具体可以 参考源码
public class QuartzScheduleEngine {
@Autowired
private Scheduler scheduler;
/**
* 新增计划任务: 主要是添加 jobDetail 和 trigger
*
* @param batchSchedule
*/
public Date addJob(BatchSchedule batchSchedule) throws Exception {
String cronExpression = batchSchedule.getCronExpression();
String name = batchSchedule.getCode();
String group = batchSchedule.getTaskCode();
String interfaceName = batchSchedule.getInterfaceName();
// 校验数据
this.checkNotNull(batchSchedule);
// 添加 1-JobDetail
// 校验 JobDetail 是否存在
JobKey jobKey = JobKey.jobKey(name, group);
if (scheduler.checkExists(jobKey)) {
if (Strings.isNullOrEmpty(cronExpression)) {
// 已经存在并且执行一次,立即执行
scheduler.triggerJob(jobKey);
} else {
throw new Exception("任务计划 JobKey 已经在执行队列中,不需要重复启动");
}
} else {
// 构建 JobDetail
Class<? extends Job> jobClazz = (Class<? extends Job>) Class.forName(interfaceName);
JobDetail jobDetail = JobBuilder.newJob(jobClazz).withIdentity(jobKey).build();
jobDetail.getJobDataMap().put(BatchSchedule.SCHEDULE_KEY, batchSchedule.toString());
// 添加 2-Trigger
// 校验 Trigger 是否存在
TriggerKey triggerKey = TriggerKey.triggerKey(name, group);
Trigger trigger = scheduler.getTrigger(triggerKey);
if (Objects.nonNull(trigger)) {
throw new Exception("任务计划 Trigger 已经在执行队列中,不需要重复启动");
}
// 构建 Trigger
trigger = getTrigger(cronExpression, triggerKey);
return scheduler.scheduleJob(jobDetail, trigger);
}
return new Date();
}
/**
* 修改
*
* @param batchSchedule
*/
public void updateCronExpression(BatchSchedule batchSchedule) throws Exception {
updateJobCronExpression(batchSchedule);
}
/**
* 更新Job的执行表达式
*
* @param batchSchedule
* @throws SchedulerException
*/
public Date updateJobCronExpression(BatchSchedule batchSchedule) throws SchedulerException {
checkNotNull(batchSchedule);
String name = batchSchedule.getCode();
String group = batchSchedule.getTaskCode();
TriggerKey triggerKey = TriggerKey.triggerKey(name, group);
// 在队列中才需要修改
if (scheduler.checkExists(triggerKey)) {
// 构建 Trigger
String cronExpression = batchSchedule.getCronExpression();
Trigger trigger = this.getTrigger(cronExpression, triggerKey);
return scheduler.rescheduleJob(triggerKey, trigger);
}
return null;
}
/**
* 构建 Trigger
*
* @param cronExpression
* @param triggerKey
* @return
*/
private Trigger getTrigger(String cronExpression, TriggerKey triggerKey) {
Trigger trigger;
if (Strings.isNullOrEmpty(cronExpression.trim())) {
trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).build();
} else {
cronExpression = cronExpression.replaceAll("#", " ");
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();
}
return trigger;
}
/**
* 暂停计划任务
*
* @param batchSchedule
*/
public void pauseJob(BatchSchedule batchSchedule) throws Exception {
checkNotNull(batchSchedule);
JobKey jobKey = JobKey.jobKey(batchSchedule.getCode(), batchSchedule.getTaskCode());
if (!scheduler.checkExists(jobKey)) {
throw new Exception("任务计划不在执行队列中,不能暂停");
}
scheduler.pauseJob(jobKey);
}
/**
* 从暂停中恢复
*
* @param batchSchedule
*/
public void resumeJob(BatchSchedule batchSchedule) throws Exception {
checkNotNull(batchSchedule);
JobKey jobKey = JobKey.jobKey(batchSchedule.getCode(), batchSchedule.getTaskCode());
if (!scheduler.checkExists(jobKey)) {
throw new Exception("任务计划不在执行队列中,不能恢复");
}
scheduler.resumeJob(jobKey);
}
/**
* 删除计划任务
*
* @param batchSchedule
*/
public boolean deleteJob(BatchSchedule batchSchedule) throws SchedulerException {
boolean flag = true;
checkNotNull(batchSchedule);
JobKey jobKey = JobKey.jobKey(batchSchedule.getCode(), batchSchedule.getTaskCode());
if (scheduler.checkExists(jobKey)) {
flag = scheduler.deleteJob(jobKey);
}
return flag;
}
/**
* 添加任务监听
*
* @param jobListener
* @param matcher
* @throws SchedulerException
*/
public void addJobListener(JobListener jobListener, Matcher<JobKey> matcher) throws SchedulerException {
scheduler.getListenerManager().addJobListener(jobListener, matcher);
}
/**
* 执行一次(可用于测试)
*
* @param batchSchedule
*/
public void runJobOnce(BatchSchedule batchSchedule) throws SchedulerException {
checkNotNull(batchSchedule);
JobKey jobKey = JobKey.jobKey(batchSchedule.getCode(), batchSchedule.getTaskCode());
scheduler.triggerJob(jobKey);
}
private void checkNotNull(BatchSchedule batchSchedule) {
Preconditions.checkNotNull(batchSchedule, "计划为空");
Preconditions.checkState(!StringUtils.isEmpty(batchSchedule.getCode()), "计划编号为空");
Preconditions.checkState(!StringUtils.isEmpty(batchSchedule.getTaskCode()), "计划所属任务为空");
Preconditions.checkState(!StringUtils.isEmpty(batchSchedule.getInterfaceName()), "任务执行业务类为空");
}
public SchedulerMetaData getMetaData() throws SchedulerException {
SchedulerMetaData metaData = scheduler.getMetaData();
return metaData;
}
}
任务状态与计划依赖
1、使用 JobListener
实现,需要自定义配置的支持
public class CustomGlobalJobListener extends JobListenerSupport {
@Override
public String getName() {
return this.getClass().getName();
}
/**
* Scheduler 在 JobDetail 将要被执行时调用这个方法。
*
* @param context
*/
@Override
public void jobToBeExecuted(JobExecutionContext context) {
getLog().debug("计划 {} : ~~~ 【RUNNING】 更新正在运行中状态 ~~~ ");
}
/**
* Scheduler 在 JobDetail 即将被执行,但又被 TriggerListener 否决了时调用这个方法
*
* @param context
*/
@Override
public void jobExecutionVetoed(JobExecutionContext context) {
}
/**
* Scheduler 在 JobDetail 被执行之后调用这个方法
*
* @param context
* @param jobException
*/
@Override
public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
getLog().debug("计划 {} : ~~~ 【COMPLETE | ERROR】 更新已经结束状态 ~~~ ");
// 唤醒子任务
batchScheduleService.notifyChildren(scheduleJob);
}
}
2、自定义实现,可以实现 SchedulerFactoryBeanCustomizer
接口
@Configuration
public class SchedulerFactoryBeanCustomizerConfig implements SchedulerFactoryBeanCustomizer {
@Bean
public CustomGlobalJobListener globalJobListener() {
return new CustomGlobalJobListener();
}
@Override
public void customize(SchedulerFactoryBean schedulerFactoryBean) {
schedulerFactoryBean.setGlobalJobListeners(globalJobListener());
}
}
3、计划依赖:
如计划有依赖其他计划,则该计划一般不允许手动运行,需要等待所依赖的计划完成后在监听器中自动唤醒
目前只简单实现了单个计划依赖,没有实现复杂功能。后期可以扩展:多计划依赖,依赖排序等功能。
小结
至此,Quartz 的任务动态实现已经完成,主要可以分为三个部分:
1、任务与计划定义,使用数据库保存
2、动态计划的管理,使用 Quartz 本身的 API 实现
3、任务计划状态监控,使用 JobListener
监听器实现
4、计划依赖,使用 JobListener
监听器实现。
参考
1、lotso-web:使用 SpringBoot2.X 实现 Quartz 动态任务的分布式调度
2、源码地址 LearningJobSchedule
(完)
MarkerHub文章索引:(点击阅读原文直达)
https://github.com/MarkerHub/JavaIndex
【推荐阅读】