公众号:Java小咖秀,网站:javaxks.com
作者:大风的博客, 链接: blog.csdn.net/qq330983778…
设计
之前学习 Redis 的时候发现有赞团队之前分享过一篇关于延时队列的设计:有赞延时队列 现在就尝试实现一下
业务流程
首先我们分析下这个流程
- 用户提交任务。首先将任务推送至延迟队列中。
- 延迟队列接收到任务后,首先将任务推送至 job pool 中,然后计算其执行时间。
- 然后生成延迟任务(仅仅包含任务 id)放入某个桶中
- 时间组件时刻轮询各个桶,当时间到达的时候从 job pool 中获得任务元信息。
- 监测任务的合法性如果已经删除则 pass。继续轮询。如果任务合法则再次计算时间
- 如果合法则计算时间,如果时间合法:根据 topic 将任务放入对应的 ready queue,然后从 bucket 中移除。如果时间不合法,则重新计算时间再次放入 bucket,并移除之前的 bucket 中的内容
- 消费端轮询对应 topic 的 ready queue。获取 job 后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的 job 按照其设定的 TTR,重新计算执行时间,并将其放入 bucket。
- 完成消费后,发送 finish 消息,服务端根据 job id 删除对应信息。
用户任务池延时任务时间循环待完成任务提交任务提交延时任务轮询任务任务已经到达时间用户领取任务设置其完成超时时间, 然后保存进延时任务中任务超时任务完成或者任务删除检测到任务不存在队列中移除用户任务池延时任务时间循环待完成任务
对象
我们现在可以了解到中间存在的几个组件
- 延迟队列,为 Redis 延迟队列。实现消息传递
- Job pool 任务池保存 job 元信息。根据文章描述使用 K/V 的数据结构,key 为 ID,value 为 job
- Delay Bucket 用来保存业务的延迟任务。文章中描述使用轮询方式放入某一个 Bucket 可以知道其并没有使用 topic 来区分,个人这里默认使用顺序插入
- Timer 时间组件,负责扫描各个 Bucket。根据文章描述存在多个 Timer,但是同一个 Timer 同一时间只能扫描一个 Bucket
- Ready Queue 负责存放需要被完成的任务,但是根据描述根据 Topic 的不同存在多个 Ready Queue。
其中 Timer 负责轮询,Job pool、Delay Bucket、Ready Queue 都是不同职责的集合。
任务状态
- ready:可执行状态,
- delay:不可执行状态,等待时钟周期。
- reserved:已被消费者读取,但没有完成消费。
- deleted:已被消费完成或者已被删除。
对外提供的接口
接口 | 描述 | 数据 |
---|---|---|
add | 添加任务 | Job 数据 |
pop | 取出待处理任务 | topic 就是任务分组 |
finish | 完成任务 | 任务 ID |
delete | 删除任务 | 任务 ID |
额外的内容
- 首先根据状态状态描述,finish 和 delete 操作都是将任务设置成 deleted 状态。
- 根据文章描述的操作,在执行 finish 或者 delete 的操作的时候任务已经从元数据中移除,此时 deleted 状态可能只存在极短时间,所以实际实现中就直接删除了。
- 文章中并没有说明响应超时后如何处理,所以个人现在将其重新投入了待处理队列。
- 文章中因为使用了集群,所以使用 redis 的 setnx 锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这里因为是简单的实现,所以就很简单的每个桶设置一个时间队列处理。也是为了方便简单处理。关于分布式锁可以看我之前的文章里面有描述。
实现
现在我们根据设计内容完成设计。这一块设计我们分四步完成
任务及相关对象
目前需要两个对象,一个是任务对象(job)一个负责保存任务引用的对象(delay job)
任务对象
1 | less复制代码@Data |
任务引用对象
1 | kotlin复制代码@Data |
容器
目前我们需要完成三个容器的创建,Job 任务池、延迟任务容器、待完成任务容器
job 任务池,为普通的 K/V 结构,提供基础的操作
1 | java复制代码@Component |
延迟任务,使用可排序的 ZSet 保存数据,提供取出最小值等操作
1 | typescript复制代码@Slf4j |
待完成任务,内部使用 topic 进行细分,每个 topic 对应一个 list 集合
1 | typescript复制代码@Component |
轮询处理
设置了线程池为每个 bucket 设置一个轮询操作
1 | java复制代码@Component |
测试请求
1 | typescript复制代码/** |
测试
添加延迟任务
通过 postman 请求:localhost:8000/delay/add
此时这条延时任务被添加进了线程池中
1 | arduino复制代码2019-08-12 21:21:36.589 INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000} |
根据设置 10 秒钟之后任务会被添加至 ReadyQueue 中
1 | ini复制代码2019-08-12 21:21:46.744 INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue : 执行队列添加任务:DelayJob(jodId=3, delayDate=1565616106609, topic=test) |
获得任务
这时候我们请求 localhost:8000/delay/pop
这个时候任务被响应,修改状态的同时设置其超时时间,然后放置在 DelayBucket 中
1 | arduino复制代码2019-08-09 19:36:02.342 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue : 执行队列取出任务:{"delayDate":1565321728704,"jodId":1,"topic":"测试"} |
按照设计在 30 秒后,任务假如没有被消费将会重新放置在 ReadyQueue 中
1 | arduino复制代码2019-08-12 21:21:48.239 INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue : 执行队列取出任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"} |
任务的删除 / 消费
现在我们请求:localhost:8000/delay/delete
此时在 Job pool 中此任务将会被移除,此时元数据已经不存在,但任务还在 DelayBucket 中循环,然而在循环中当检测到元数据已经不存的话此延时任务会被移除。
1 | arduino复制代码2019-08-12 21:21:54.880 INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool : 任务池移除任务:3 |
本篇文章涉及的源码下载地址:gitee.com/daifyutils/…
本文转载自: 掘金