这是我参与11月更文挑战的第10天,活动详情查看:2021最后一次更文挑战
一、死信队列
死信,在官网中对应的单词为“Dead Letter”,它是 RabbitMQ 的一种消息机制。
一般来说,生产者将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,如果它一直无法消费某条数据,那么可以把这条消息放入死信队列里面。等待条件满足了再从死信队列中取出来再次消费,从而避免消息丢失。
死信消息来源:
- 消息 TTL 过期
- 队列满了,无法再次添加数据
- 消息被拒绝(reject 或 nack),并且 requeue =false
二、死信实战(基础版)
在这种,我们使用基础版来完成死信队列的几种情况的演示。
定义一个生产者 向 普通交换机下的普通队列发送消息。
定义一个消费者向普通交换机下的普通队列消费消息,当它消息满足三大条件之一时,消息就发送到死信交换机下的死信队列。
死信交换机也不是什么特殊交换机,是自己命名的,只是专门用来接收死信消息。
2.1 发送 TTL 消息
消费者1:消费普通消息,如果普通消息它过期了,就将它转发到死信队列中。
1 | java复制代码public class Consumer1 { |
生产者:
1 | java复制代码public class Producer { |
先启动 消费者1 然后创建了声明。再关闭消费者1,开始发送带有 ttl 的消息。最后可以发现,20条消息全都被发送到死信队列里面了。
2.2 队列满了
消费者1添加以下代码,添加正常队列的长度限制。当消息超过 5条之后,再过来的消息就会变成死信队列。
1 | javascript复制代码 arguments.put("x-max-length",5); |
实现的效果如下:
2.3 消息被拒
1 | vbnet复制代码 DeliverCallback deliverCallback =(String a, Delivery b)->{ |
三、死信实战(SpringBoot 版)
在这小节种,通过 Springboot 项目再次熟悉下这三大场景。
业务流程:
- 1、正常业务消息被投递到正常业务的 Exchange,该 Exchange 根据路由键将消息路由绑定到正常队列
- 2、正常的消息变成死信消息之后,会被自动投递到该队列绑定的死信交换机上;
- 3、死信交换机收到消息后,将消息根据路由规则路由到指定的死信队列
- 4、消息到达死信队列后,可监听该死信队列,处理死信消息。
3.1 消息被拒
1、配置死信队列config
1 | typescript复制代码@Configuration |
2、生产 yml配置
1 | yaml复制代码# 服务端口 |
3、生产者发送消息
1 | less复制代码@Slf4j |
4、消费者 yml
1 | yaml复制代码# 服务端口 |
5、消费者
1 | java复制代码 @RabbitListener(queues = DeadConfig.normal_queue_name) |
在浏览器发送消息之后,被拒绝的消息进入到 死信队列中。
3.2 队列满了
在普通队列中添加 x-max-length
然后将之前的队列删除,重新启动完生产者和消费者,生成新的队列,关闭消费者后,让生产者发送消息,这时候没有消费者,队列在两条的时候会满了,然后多余的信息会发送到死信队列里面。如图所示:
该队列的详情信息也可以点进去看:
3.3 发送 TTL 消息
设置队列消息的过期时间 x-message-ttl ,即该时间到了之后该条消息就会被发送到 死信队列中。
先清除之前的队列,然后配置这个参数,设为 5秒
在发送完消息,该条消息在该队列中待了5秒之后进入死信队列中
总结
死信队列可以实现消息在未被正常消费的场景下,对这些消息进行其他处理,保证消息不会丢失。
这篇文章还是入门级别的对这死信队列做了不同场景的演示,这种业务场景是要求消息必须可靠的,通过死信队列为这些队列的消息的可靠性提供保障。
本文转载自: 掘金