这一讲,我们主要来讲延迟消息。
这一次我们结合业务来讲。
业务背景
在电商中,下单后,有一些用户付款后,会主动退款。也有一些用户下单后,未付款。但是这部分未付款的订单,会占用着商品库存。
我们目前的电商App,下单后,会在订单表创建对应的订单数据。这些订单的状态,有一些是未付款的,但是未付款的订单占用着商品库存。为了让商品库存能正常恢复,我们现在的处理方案是:
- 启动一个定时任务,每30分钟,定时扫描一遍订单表
- 如果订单是已付款,则跳过,不处理
- 如果订单是未付款,但未超过30分钟,不处理
- 如果订单是未付款,且超过30分钟,就取消订单
(补充:取消订单,其实就是下单的逆向流程)
方案缺点
这个方案有什么缺点?
- 第一,每次定时任务去扫描全部订单,但是订单未付款且超时30分钟的只有一小部分。就是做很多无用功。
- 第二,如果订单表的数量超级超级大,这个时候,扫描的时间巨长,浪费cpu资源。
- 第三,这样子频繁查询数据库,给数据库造成很多不必要的压力。
解决方案
那针对上述的缺点,我们有没有好的解决方案:
- 第一,避免扫描全表
- 第二,谁没付款,就去取消谁,不要做多余的动作
- 第三,要保证近实时取消订单。(近实时:1s左右)
说了这么多,我摊牌了,不装了,就是为了引入RocketMQ的延迟消息
简单总结一下:创建订单的时候,发送一条延时30分钟的消息。到30分钟后,消费者拿到信息,再去判断订单是否已付款,如果付款就跳过不处理,没付款,那就取消订单。
这种方案:没有多余的扫描数据库操作;谁没付款,就去取消谁。多好呀!在生产上,赶紧用起来。
生产者
上面,介绍的都是方法论,下面就是具体的实操环节了。
下面,简单用一个demo介绍一下生产者
1 | java复制代码public class Producer { |
这里强调一下,不是延迟发送哈,是延迟消费。发送是立马就发送的,只是消费的时候,延迟30分钟。
补充知识点
延迟级别是从1开始的,不是从0开始。然后你可能会发现,最多延迟2小时。如果你想延迟3小时,对不起,RocketMQ不支持。告辞!!!
消费者
1 | java复制代码public class Consumer { |
总结:延迟消费者和普通的消费者相同,一毛一样。延迟消息的核心点:生产者多了一个延迟级别。
知其然知其所以然
上面,你已经知道怎么使用了。
如果面试官问你:RocketMQ的延迟消息底层原理是什么?
那你接着看下去。
看图说话。
- 第一,生产者发送的消息,因为带了延迟级别,因此会被分发到叫SCHEDULE_TOPIC_XXXX的Topic中。里面有18的队列,一个队列对应着一个延迟级别。比如queueId=delayLevel-1。
- 第二,定时器,每100毫秒,扫描所有延迟级别里面的延迟消息message,如果消费时间已经大于当前时间,那定时器就会把延迟消息message,发送到真正的topic(就是代码写的topic,比如上面代码的:delayTopic),根据负载均衡策略,把message发送到具体某个队列。
- 第三,有消息后,消费者进行消息和后续处理。
上面这里,是一个总体流程图。
然后,我们对照代码,来进一步深刻认识一下。其实,就是加深理解。
第一步:生产者发送的消息到SCHEDULE_TOPIC_XXXX的topic
org.apache.rocketmq.store.CommitLog#putMessage
1 | ini复制代码 //真正的topic |
第二步:定时器扫描信息
- 1,org.apache.rocketmq.store.schedule.ScheduleMessageService#start
1 | kotlin复制代码public void start() { |
2,org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup
1 | ini复制代码public void executeOnTimeup() { |
第三步: 消费者后续处理(略)
最后用一张图来总结
好了,写完了,下期见,拜拜。
有问题的话,欢迎留言交流。
每日一问
RocketMQ不支持自定义延迟时间,那Kafka支持延迟消息吗?如果支持,支持自定义延迟时间吗?如要你实现自定义延迟时间,你会怎么实现?说说你的思路
欢迎留言
后续文章
- RocketMQ-入门(已更新)
- RocketMQ-架构和角色(已更新)
- RocketMQ-消息发送(已更新)
- RocketMQ-消费信息
- RocketMQ-消费者的广播模式和集群模式(已更新)
- RocketMQ-顺序消息(已更新)
- RocketMQ-延迟消息(已更新)
- RocketMQ-批量消息
- RocketMQ-过滤消息
- RocketMQ-事务消息
- RocketMQ-消息存储
- RocketMQ-高可用
- RocketMQ-高性能
- RocketMQ-主从复制
- RocketMQ-刷盘机制
- RocketMQ-幂等性
- RocketMQ-消息重试
- RocketMQ-死信队列
…
欢迎各位入(guan)股(zhu),后续文章干货多多。
本文转载自: 掘金