Spring Boot 整合:Redis 延时队列的实现方案(基于有赞的设计)
>>号外:关注“Java精选”公众号,回复“面试资料”,免费领取资料!“”小程序,3000+ 道面试题在线刷,最新、最全 Java 面试题!
设计
业务流程
对象
任务状态
对外提供的接口
接口 | 描述 | 数据 |
---|---|---|
add | 添加任务 | Job数据 |
pop | 取出待处理任务 | topic(即任务分组) |
finish | 完成任务 | 任务ID |
delete | 删除任务 | 任务ID |
额外的内容
实现
任务及相关对象
/**
* Full description of a delayed task: identity, business topic, timing
* parameters, payload, retry counter and current status.
* Accessors and constructors are generated by the Lombok annotations below.
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Job implements Serializable {
/**
* Unique identifier of the delayed job, used to look the job up.
* Written to JSON through ToStringSerializer (i.e. as a string rather than a number).
*/
@JsonSerialize(using = ToStringSerializer.class)
private Long id;
/**
* Job type (the concrete business category).
*/
private String topic;
/**
* Delay before the job becomes due. DelayJob adds this value to
* System.currentTimeMillis(), so it is a duration in milliseconds.
*/
private long delayTime;
/**
* Execution timeout (time-to-run) of the job.
* NOTE(review): presumably milliseconds like delayTime - confirm against the consumer.
*/
private long ttrTime;
/**
* Message body of the job, consumed by the business logic that handles it.
*/
private String message;
/**
* Number of retries.
*/
private int retryCount;
/**
* Current status of the job (JobStatus is declared elsewhere).
*/
private JobStatus status;
}
/**
 * Lightweight reference to a {@link Job}: the job id, the absolute time at
 * which the job becomes due, and the job's topic.
 *
 * <p>Accessors and the all-args constructor are generated by Lombok.
 */
@Data
@AllArgsConstructor
public class DelayJob implements Serializable {

    /**
     * Unique identifier of the delayed job.
     * The field keeps its original spelling ("jodId") because the generated
     * accessors and the serialized form are derived from the field name.
     */
    private long jodId;

    /**
     * Absolute execution time of the job, in epoch milliseconds.
     */
    private long delayDate;

    /**
     * Job type (the concrete business category).
     */
    private String topic;

    /**
     * Builds the delayed reference for a job, scheduling it
     * {@code job.getDelayTime()} milliseconds from now.
     *
     * @param job the job to schedule
     * @throws NullPointerException if {@code job} or its id is {@code null}
     */
    public DelayJob(Job job) {
        this.jodId = job.getId();
        this.delayDate = System.currentTimeMillis() + job.getDelayTime();
        this.topic = job.getTopic();
    }

    /**
     * Rebuilds a delayed reference from a sorted-set entry. The topic is left
     * unset because it is not part of the entry.
     *
     * @param value the member; its string form must be the numeric job id
     * @param score the member's score, i.e. the absolute execution time in
     *              epoch milliseconds
     * @throws NumberFormatException if {@code value} is not a valid long
     */
    public DelayJob(Object value, Double score) {
        this.jodId = Long.parseLong(String.valueOf(value));
        // The score already is the absolute due time (jobs are stored with
        // getDelayDate() as their score), so "now" must not be added again.
        this.delayDate = score.longValue();
    }
}
容器
/**
 * Job pool: keeps the full {@link Job} objects in a single Redis hash,
 * keyed by job id.
 */
@Component
@Slf4j
public class JobPool {

    /** Key of the Redis hash that holds every job. */
    private static final String NAME = "job.pool";

    @Autowired
    private RedisTemplate redisTemplate;

    /** Returns the hash operations bound to the pool key. */
    private BoundHashOperations getPool() {
        return redisTemplate.boundHashOps(NAME);
    }

    /**
     * Stores a job in the pool under its id, replacing any previous entry.
     *
     * @param job the job to store
     */
    public void addJob(Job job) {
        log.info("任务池添加任务:{}", JSON.toJSONString(job));
        getPool().put(job.getId(), job);
    }

    /**
     * Looks a job up by id.
     *
     * @param jobId id of the job
     * @return the stored job, or {@code null} when the hash has no entry for
     *         the id or the entry is not a {@link Job}
     */
    public Job getJob(Long jobId) {
        Object stored = getPool().get(jobId);
        return stored instanceof Job ? (Job) stored : null;
    }

    /**
     * Removes a job from the pool.
     *
     * @param jobId id of the job to remove
     */
    public void removeDelayJob(Long jobId) {
        log.info("任务池移除任务:{}", jobId);
        getPool().delete(jobId);
    }
}
/**
 * Delay buckets: a fixed number of Redis sorted sets named {@code bucket0},
 * {@code bucket1}, ... Each member is a {@link DelayJob} whose score is its
 * {@code delayDate}; new jobs are spread over the buckets round-robin.
 */
@Slf4j
@Component
public class DelayBucket {

    @Autowired
    private RedisTemplate redisTemplate;

    /** Round-robin counter used to choose the bucket for the next job. */
    private static final AtomicInteger index = new AtomicInteger(0);

    /** Number of buckets, read from the {@code thread.size} property. */
    @Value("${thread.size}")
    private int bucketsSize;

    private List <String> bucketNames = new ArrayList <>();

    /**
     * Fills the list of bucket names ({@code bucket0 .. bucket(n-1)}) and
     * exposes it as a bean.
     *
     * @return the list of bucket names
     */
    @Bean
    public List <String> createBuckets() {
        for (int i = 0; i < bucketsSize; i++) {
            bucketNames.add("bucket" + i);
        }
        return bucketNames;
    }

    /**
     * Picks the name of the bucket that receives the next job.
     *
     * @return the bucket name
     */
    private String getThisBucketName() {
        int thisIndex = index.addAndGet(1);
        // floorMod keeps the slot non-negative after the counter wraps past
        // Integer.MAX_VALUE; plain % would yield a negative list index.
        int slot = Math.floorMod(thisIndex, bucketsSize);
        return bucketNames.get(slot);
    }

    /**
     * Returns the sorted-set operations bound to a bucket.
     *
     * @param bucketName key of the bucket
     * @return operations bound to that key
     */
    private BoundZSetOperations getBucket(String bucketName) {
        return redisTemplate.boundZSetOps(bucketName);
    }

    /**
     * Puts a delayed job into the next bucket, scored by its execution time.
     *
     * @param job the delayed job
     */
    public void addDelayJob(DelayJob job) {
        log.info("添加延迟任务:{}", JSON.toJSONString(job));
        String thisBucketName = getThisBucketName();
        BoundZSetOperations bucket = getBucket(thisBucketName);
        bucket.add(job, job.getDelayDate());
    }

    /**
     * Returns the first entry of a bucket, i.e. the job with the lowest score
     * (earliest execution time), without removing it.
     *
     * @param index position of the bucket in the bucket list
     * @return the first delayed job, or {@code null} when the bucket is empty
     *         or its first member is not a {@link DelayJob}
     */
    public DelayJob getFirstDelayTime(Integer index) {
        String name = bucketNames.get(index);
        BoundZSetOperations bucket = getBucket(name);
        // Only the head is needed; the end index is inclusive, so 0..0 is one element.
        Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 0);
        if (CollectionUtils.isEmpty(set)) {
            return null;
        }
        ZSetOperations.TypedTuple typedTuple = set.iterator().next();
        Object value = typedTuple.getValue();
        if (value instanceof DelayJob) {
            return (DelayJob) value;
        }
        return null;
    }

    /**
     * Removes a delayed job from a bucket.
     *
     * @param index    position of the bucket in the bucket list
     * @param delayJob the member to remove
     */
    public void removeDelayTime(Integer index, DelayJob delayJob) {
        String name = bucketNames.get(index);
        BoundZSetOperations bucket = getBucket(name);
        bucket.remove(delayJob);
    }
}
/**
 * Ready queue: one Redis list per topic holding the {@link DelayJob}s that
 * are waiting to be picked up by a consumer.
 */
@Component
@Slf4j
public class ReadyQueue {

    @Autowired
    private RedisTemplate redisTemplate;

    /** Prefix of the per-topic list keys; the topic is appended directly. */
    private String NAME = "process.queue";

    /** Builds the Redis key of the list for a topic. */
    private String getKey(String topic) {
        return NAME + topic;
    }

    /**
     * Returns the list operations bound to a topic's queue.
     *
     * @param topic the job topic
     * @return operations bound to that topic's list
     */
    private BoundListOperations getQueue(String topic) {
        return redisTemplate.boundListOps(getKey(topic));
    }

    /**
     * Appends a job to the queue of its topic (pushed at the head of the list).
     *
     * @param delayJob the job to enqueue
     */
    public void pushJob(DelayJob delayJob) {
        log.info("执行队列添加任务:{}", delayJob);
        BoundListOperations listOperations = getQueue(delayJob.getTopic());
        listOperations.leftPush(delayJob);
    }

    /**
     * Removes and returns the oldest job of a topic.
     *
     * @param topic the job topic
     * @return the job, or {@code null} when the list is empty or the popped
     *         element is not a {@link DelayJob}
     */
    public DelayJob popJob(String topic) {
        BoundListOperations listOperations = getQueue(topic);
        // Jobs are pushed on the left, so the oldest one sits at the right end.
        // Popping from the right gives FIFO order; popping from the left would
        // serve the newest job first and let older jobs starve under load.
        Object o = listOperations.rightPop();
        if (o instanceof DelayJob) {
            DelayJob delayJob = (DelayJob) o;
            log.info("执行队列取出任务:{}", JSON.toJSONString(delayJob));
            return delayJob;
        }
        return null;
    }
}
轮询处理
/**
 * Starts the polling workers once the application context has been
 * refreshed: one {@code DelayJobHandler} per index in
 * {@code [0, thread.size)}, each on its own pool thread.
 */
@Component
public class DelayTimer implements ApplicationListener <ContextRefreshedEvent> {

    @Autowired
    private DelayBucket delayBucket;

    @Autowired
    private JobPool jobPool;

    @Autowired
    private ReadyQueue readyQueue;

    /** Number of handlers / pool threads, read from {@code thread.size}. */
    @Value("${thread.size}")
    private int length;

    /** Pool running the handlers; {@code null} until the first refresh event. */
    private ExecutorService executorService;

    /**
     * Creates the worker pool and submits one handler per index.
     *
     * <p>A refresh event may be published more than once (for example by
     * child contexts or a repeated refresh). Only the first event starts the
     * workers; without this guard every further event would create another
     * pool and a duplicate set of handlers.
     *
     * @param contextRefreshedEvent the refresh event (not inspected)
     */
    @Override
    public synchronized void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if (executorService != null) {
            // Workers are already running.
            return;
        }
        executorService = new ThreadPoolExecutor(
                length,
                length,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue <Runnable>());
        for (int i = 0; i < length; i++) {
            executorService.execute(
                    new DelayJobHandler(
                            delayBucket,
                            jobPool,
                            readyQueue,
                            i));
        }
    }
}
测试请求
/**
 * REST endpoints for exercising the delay queue by hand; every call is
 * delegated to {@code JobService}.
 *
 * @author daify
 * @date 2019-07-29 10:26
 **/
@RestController
@RequestMapping("delay")
public class DelayController {

    @Autowired
    private JobService jobService;

    /**
     * Adds a job.
     *
     * @param request the job to add, bound from the request parameters
     * @return JSON form of the delayed job returned by the service
     */
    @RequestMapping(value = "add",method = RequestMethod.POST)
    public String addDefJob(Job request) {
        return JSON.toJSONString(jobService.addDefJob(request));
    }

    /**
     * Fetches a job for processing.
     *
     * @param topic the topic to fetch a job from
     * @return JSON form of the job returned by the service
     */
    @RequestMapping(value = "pop",method = RequestMethod.GET)
    public String getProcessJob(String topic) {
        return JSON.toJSONString(jobService.getProcessJob(topic));
    }

    /**
     * Marks a job that is being processed as finished.
     *
     * @param jobId id of the job
     * @return the literal {@code "success"}
     */
    @RequestMapping(value = "finish",method = RequestMethod.DELETE)
    public String finishJob(Long jobId) {
        jobService.finishJob(jobId);
        return "success";
    }

    /**
     * Deletes a job.
     *
     * @param jobId id of the job
     * @return the literal {@code "success"}
     */
    @RequestMapping(value = "delete",method = RequestMethod.DELETE)
    public String deleteJob(Long jobId) {
        jobService.deleteJob(jobId);
        return "success";
    }
}
测试
2019-08-12 21:21:36.589 INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}
2019-08-12 21:21:36.609 INFO 21444 --- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket : 添加延迟任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
2019-08-12 21:21:46.744 INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue : 执行队列添加任务:DelayJob(jodId=3, delayDate=1565616106609, topic=test)
2019-08-09 19:36:02.342 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue : 执行队列取出任务:{"delayDate":1565321728704,"jodId":1,"topic":"测试"}
2019-08-09 19:36:02.364 INFO 58456 --- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool : 任务池添加任务:{"delayTime":10000,"id":1,"message":"延迟10秒,超时30秒","retryCount":0,"status":"RESERVED","topic":"测试","ttrTime":30000}
2019-08-09 19:36:02.384 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket : 添加延迟任务:{"delayDate":1565321792364,"jodId":1,"topic":"测试"}
2019-08-12 21:21:48.239 INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue : 执行队列取出任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
2019-08-12 21:21:48.261 INFO 21444 --- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":10000}
2019-08-12 21:21:54.880 INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool : 任务池移除任务:3
2019-08-12 21:21:59.104 INFO 21444 --- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler : 移除不存在任务:{"delayDate":1565616118261,"jodId":3,"topic":"test"}
作者:大·风
blog.csdn.net/qq330983778/article/details/99341671
【附源码】使用 ZooKeeper 实现分布式队列、分布式锁和选举详解!
2021 年 6 月程序员工资平均 15052 元,你有拖后腿吗?
Java 中如何实现 HTTP 断点续传,超大文件上传服务器,附源码!
MySQL 中别再用 OFFSET 和 LIMIT 分页了,试试这种方式!
Java 线程中为什么不推荐使用 stop()、suspend() 方法来中断线程?