前言
在前几篇文章中,主要认识了下 RabbitMQ 的组成和基本使用,并通过 SpringBoot 工程整合了 RabbitMQ,做了一个完整的 Demo。一般说,引入了新的中间件,数据的风险性就又要多一层考虑,那么 Rabbitmq 的消息它是怎么知道它有没有被消费者消费的呢?生产者又怎么确保自己发送成功了呢,我们在这篇文章中将对这些问题进行演示学习。
一、为什么要进行消息确认?
在mq 中,消费者和生产者并不直接进行通信,生产者只负责把消息发送到队列,消费者只负责从队列获取消息(不管是push还是pull)。
- 消费者从队列中获取到消息之后,这条消息就不存在队列中了,但是如果此时消费者所在的信道因为网络中断没有消费到,那这条消息就被永远的丢失了,所以,我们希望等待消费者成功消费掉这个消息之后再删除这条消息。
- 而在发送消息的时候也是这样的,生产者发消息给交换机,也不能保证消息准确发送过去了,消息就像石沉大海一样,所以这样需要一个消息确认。
这个机制就是 消息确认机制。
二、消息确认流程
在流程图中,我们可与看到消息确认是分为生产者确认和消费者确认的。
这两个机制都是受到 TCP 协议的启发,它们对数据安全非常重要。
补充:
- 在RabbitMQ 中有两种事务机制来确保消息的安全送达,分别是事务机制和确认机制。事务机制需要每个消息或每组消息发布提交的通道设置为事务性的,非常耗费性能,降低了消息吞吐量。因此,实际中通常采用确认机制即可。
三、生产者确认
由生产者发送到 consumer 的链路为 producer -> broker -> exchange -> queue -> consumer 。
在编码时我们可以用两个选项用来控制消息投递的可靠性:
- 消息从 producer 到 RabbitMQ broker cluster 成功,则会返回一个
confirmCallback
; - 消息从 exchange 到 queue 投递失败,则会返回一个
returnCallback
我们可以利用这两个 callback 接口来控制消息的一致性和处理一部分的异常情况。
3.1 代码准备
3.1.1 配置文件
1、在配置文件中需要添加:
1 | yaml复制代码spring: |
它有三个值:
- NONE:禁用发布确认模式,是默认值
- CORRELATED:发布消息成功到交换器后触发回调方法
- SIMPLE:值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
3.1.2 配置类
思考:如何在项目启动的时候从数据库中加载这些配置并进行绑定,后续我们来研究下。
1 | typescript复制代码@Configuration |
3.1.3 回调接口
1 | typescript复制代码@Slf4j |
3.2 交换机确认–生产消费测试
使用交换机回调,就配置 publisher-confirm-type: 为CORRELATED
1 | less复制代码 @GetMapping("/sendMsg/{message}") |
当生产者获取不到消息的时候进入回调函数执行 false 的代码。
实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause。
- correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
- ack:消息投递到broker 的状态,true表示成功。
- cause:表示投递失败的原因。
3.1.5 队列确认–回退接口
交换机接收到消息后可以判断当前的路径发送没有问题,但是不能保证消息能够发送到路由队列的。而发送者是不知道这个消息有没有送达队列的,因此,我们需要在队列中进行消息确认。这就是回退消息。
实现接口ReturnCallback
,重写 returnedMessage()
方法,方法有五个参数message
(消息体)、replyCode
(响应code)、replyText
(响应内容)、exchange
(交换机)、routingKey
(队列)。
添加注解:
1 | sql复制代码publisher-returns: true |
执行代码:
1 | arduino复制代码rabbitTemplate.convertAndSend(ConfirmConfig.confirm_exchange_name, ConfirmConfig.confirm_routing_key,message+"1",new CorrelationData("1")); |
回退执行
1 | css复制代码消息 (Body:'hello22' MessageProperties [headers={spring_returned_message_correlation=2}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) ,被交换机 confirm_exchange 退回原因 NO_ROUTE |
阶段小结:
生产者的消息确认机制有两种:
- 一个是从生产者发送到交换机的确认回调;
- 一个是从交换机发送到队列的确认回调;
生产者发送到交换机,需要配置一个参数 publisher-confirm-type ,它默认是 none,没有开启,可以把它改为 correlated ,即消息成功发送后会触发一个回调;然后我们根据 ack 的一个状态进行判断,如果为 true ,则代表发送成功。
还有一个交换机到队列的回调,将 publisher-returns 改为 true 即可,触发 returnedMessage 。
四、消费者确认
首先介绍消息消费的前提,rabbitmq 消费消息有两种模式,一个是推送 push ,一个是自己拉取pull。
- 推模式:消息中间件主动将消息推送给消费者
- 拉模式:消费者主动从消息中间件拉取消息。
但实际使用中,拉取消息是会降低系统吞吐量的,以及消费者很难实时获取消息,因此,一般使用的是push 模式。
在 mq 推消息给消费者不是等消费者消费完一个再推一个,而是根据prefetch_count 参数来决定可以推多个消息到消费者的缓存里面。
在消费者确认中,为了保证数据不会丢失,RabbitMQ 支持消息确定ACK。ACK 机制是消费者从 RabbitMQ 收到消息并处理完成后,返回给RabbitMQ,RabbitMQ 收到反馈后才将此消息从队列中删除。
4.1 自动确认
自动确认是指消费者在消费消息的时候,当消费者收到消息后,消息就会被 RabbitMQ 从队列中删除掉。这种模式认为 “发送即成功”。这是不安全的,因为消费者可能在业务中并没有成功消费完就中断了。
下面我们通过 debug 来测试下这个逻辑。可以看到在debug 状态下,消费者还未消费,该队列中就没有任何数据了。
4.2 手动确认 autoAck:false
手动确认又分为肯定确认和否定确认。
4.2.1 肯定确认 BasicAck
1 | bash复制代码// false 表示只确认 b.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认) |
当消费者消费完数据后,队列中一共还剩18条,有一条消息待确认。
4.2.2 否定确认: BasicNack、BasicReject
否定确认的场景不多,但有时候某个消费者因为某种原因无法立即处理某条消息时,就需要否定确认了.
否定确认时,需要指定是丢弃掉这条消息,还是让这条消息重新排队,过一会再来,又或者是让这条消息重新排队,并尽快让另一个消费者接收并处理它.
1 | php复制代码 丢弃:requeue: false:channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: false); |
1 | php复制代码重新排队( requeue: true): channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: true); |
一般来说,如果出现异常,就使用channel.BasicNack 把消费失败的消息重新放入到队列中去。
4.3 springboot 版本确认
Springboot 的确认模式有三种,配置如下:
1 | ini复制代码spring.rabbitmq.listener.simple.acknowledge-mode=manual |
- NONE : 不确认 :
- 1、默认所有消息消费成功,会不断的向消费者推送消息
- 2、因为 rabbitmq 认为所有消息都被消费成功。所以队列中存在丢失消息风险。
- AUTO:自动确认
- 1、根据消息处理逻辑是否抛出异常自动发送 ack(正常)和nack(异常)给服务端,如果消费者本身逻辑没有处理好这条数据就存在丢失消息的风险。
- 2、使用自动确认模式时,需要考虑的另一件事情就是消费者过载。
- MANUAL:手动确认
- 1、手动确认在业务失败后进行一些操作,消费者调用 ack、nack、reject 几种方法进行确认,如果消息未被 ACK 则发送到下一个消费者或重回队列。
- 2、ack 用于肯定确认;nack 用于 否定确认 ;reject 用于否定确认(一次只能拒绝单条消息)
1 | less复制代码@Component |
发现它可以将消息返回给队列,然后又消费这个数据,不断消费,造成了死循环,消息无限投递。
这时候可以改成 false,然后配置下死信队列,将该消息发送到死信队列中。
总结
本文主要对消息的确认进行了 springboot 微服务版本的测试,通过两个服务之间的互相调用来验证 rabbitmq 的消息确认可行性。在后面的文章中,我们将对rabbitmq 更多细节进行深入研究。
本文转载自: 掘金