总文档 :文章目录
Github : github.com/black-ant
前言
本篇文章会尝试着说一说RabbitMQ 稍微深一点的技术点 , 期望能讲清楚 , 讲明白 .
RabbitMQ 介绍可以参考MQ总览 , 这里就不过多的描述了 , 我们围绕其中四种方式来依次聊一下.
一 . 基础使用
1.1 配置详情
1 | xml复制代码<dependency> |
application.yml
1 | yaml复制代码spring: |
1.2 消费者
1 | java复制代码@RabbitListener(bindings = @QueueBinding(value = @Queue("DirectA"),key = "ONE",exchange = @Exchange(name = "DirectExchange", type = ExchangeTypes.DIRECT))) |
1.3 生产者
1 | java复制代码rabbitTemplate.convertAndSend("DirectExchange", "ONE", "发送消息 :" + msg); |
通过图片我们可以看到 , DirectExchange 实际上绑定了三个 Queue , 所以我多个消费者都收到了消息
1.4 四种发送模式
fanout => 发送到该交换机的消息发送到所有绑定的队列
1 | java复制代码 @RabbitListener(bindings = @QueueBinding( |
direct => 消息路由到 BindingKey 和 RoutingKey 完全相同的队列
1 | java复制代码 @RabbitListener(bindings = @QueueBinding( |
top => BindingKey 使用 */# 模糊匹配
1 | java复制代码 @RabbitListener(bindings = @QueueBinding( |
header => 根据消息内容种的header 属性进行匹配
1.5 底层用法
1.5.1 创建连接发送消息
1 | java复制代码// 以下是一个非Spring发送的完整流程 ,在 SpringBoot 中 , 这些流程被代理了 |
1.5.2 构建一个Binding
1 | java复制代码 |
1.5.3 连接工厂
RabbitTemplate 提供了三个连接工厂 :
- PooledChannelConnectionFactory
- 常用 ,基于连接池的连接工厂 (commons-pool2)
- 支持简单的 publisher 确认
- ThreadChannelConnectionFactory
- 需要使用作用域操作 , 就可以确保严格的消息顺序
- 支持简单的 publisher 确认
- 此工厂确保同一线程上的所有操作使用相同的通道
- CachingConnectionFactory
- 可以通过 CacheMode 打开多个连接(共享连接 ,区别于连接池)
- 可以由相关的发布者确认
- 支持简单的 publisher 确认
PS : 前面2个需要 spring-rabbit 包
三种连接池提供了不同的功能, 我们可以根据自己的项目来选择和定制
PooledChannelConnectionFactory 的构建方式
1 | java复制代码@Bean |
CachingConnectionFactory 创建一个新连接
1 | java复制代码CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost"); |
1.5.4 发送消息
1 | java复制代码void send(Message message) throws AmqpException; |
1.5.5 接收消息
1 | java复制代码Message receive() throws AmqpException; |
1.5.6 开启注解
1 | java复制代码@EnableRabbit |
二 . 源码分析
2.1 基础接口
1 | java复制代码 |
2.2 主要类简述
之前说了非Spring 加载的底层用法 , 主要涉及到ConnectionFactory ,AmqpAdmin ,Queue , 这一栏只要说说前置处理的相关类
2.2.1 连接工厂系列
我们先来看看这三个类 :
ConnectionFactory :
这里的 ConnectionFactory 主要是 CachingConnectionFactory , 该类类似于缓存连接池 , CHANNEL(默认)从所有createConnection()调用中返回相同的连接,并忽略对Connection.close()的调用 ,并且缓存CHANNEL
默认情况下,将只缓存一个通道,并根据需要创建和处理进一步请求的通道.
1 | java复制代码// 默认缓存大小 |
2.2.2 框架成员系列
2.2.3 投递成员系列 (Admin)
AmqpAdmin
AmqpAdmin 是一个接口 , 其核心实现类是 RabbitAdmin , 实现了可移植的AMQP管理操作
只要应用程序上下文中存在 RabbitAdmin,就可以自动声明和绑定队列
1 | java复制代码// 方法 declareExchanges : |
后续可以看到 , 在开局扫描注解的时候 , 会通过该类完成相关的初始化
RabbitAdmin
RabbitAdmin 是比较底层的类 , 其主要在 这些地方被调用
RabbitAdmin 最核心的就是多个声明的逻辑 ,
1 | java复制代码 |
2.2.4 event 事件系列
Rabbit 中主要有以下事件 :
- AsyncConsumerStartedEvent : 消费者启动时
- AsyncConsumerRestartedEvent : 失败后重启消费者
- AsyncConsumerTerminatedEvent : 消费者停止
- AsyncConsumerStoppedEvent : 消费者停止 (SimpleMessageListenerContainer 使用)
- ConsumeOkEvent : 当从代理接收到 consumeOk 时
- ListenerContainerIdleEvent :
- MissingQueueEvent : 丢失 Queue
2.2.5 Transaction 事务处理
两种方式的不同主要在于一个是通过提供一个外部事务 (通过设置 channelTransacted + 一个TransactionManager) , 一个是内部逻辑 (通过设置 channelTransacted + 事务声明 (例如 @Transaction)).
方式一 :
在 RabbitTemplate 和 SimpleMessageListenerContainer 中,都有一个标志 channelTransacted,如果该标志为真,则告诉框架使用事务通道,并通过提交或回滚(取决于结果)结束所有操作(发送或接收) ,同时有一个异常信号发出回滚。
1 | java复制代码// 这种方式的体现主要在 Consumer 中, 以BlockingQueueConsumer 为例 |
方式二 :
提供一个外部事务,其中包含一个 Spring 的 PlatformTransactionManager 实现,作为正在进行的操作的上下文。如果在框架发送或接收消息时已经有一个事务在进行中,并且 channelTransacted 标志为 true,则消息传递事务的提交或回滚将推迟到当前事务结束时。如果 channelTransacted 标志为 false,则不会对消息传递操作应用事务语义(它是自动记录的)
1 | java复制代码@Bean |
2.3 RabbitListener 注解详细分析
实际上整个过程中 ,我们并没有主动去建立一个 Queue 或者 Exchange , 那么注解为我们做了什么?
我们先看下注解中提供了哪些方法 :
- id :
- containerFactory : container 工厂类
- queues : 声明 queues
- queuesToDeclare : 如果在应用程序上下文中有一个RabbitAdmin,队列将在broker上使用默认绑定
- exclusive :
- priority :
- admin : AmqpAdmin 类
- bindings : 绑定的体系
- group :
- returnExceptions : 将异常返回给发送方, 异常包装在 RemoteInvocationResult 对象中
- errorHandler : 异常处理 Handler
- concurrency : 为该侦听器设置侦听器容器的并发性
- autoStartup : 是否在 ApplicationContext 启动时启动
- executor : 为这个监听器的容器设置任务执行器bean的名称;覆盖容器工厂上设置的任何执行程序
- **ackMode :**AcknowledgeMode
- replyPostProcessor :
自动持久化
1 | java复制代码 @RabbitListener(bindings = @QueueBinding( |
声明并绑定了一个匿名(排他的、自动删除的)队列
1 | java复制代码// 这里 Queue 没有设置任何东西 |
注解获取 Header
1 | java复制代码@RabbitListener(queues = "myQueue") |
监听多个 Queue
1 | java复制代码@RabbitListener(queues = { "queue1", "queue2" } ) |
SendTo 发送额外消息
- @SendTo : 会将接收到的消息发送到指定的路由目的地,所有订阅该消息的用户都能收到,属于广播。
- @SendToUser : 消息目的地有UserDestinationMessageHandler来处理,会将消息路由到发送者对应的目的地。
1 | java复制代码@SendTo("#{environment['my.send.to']}") |
定义回复类型
1 | java复制代码@RabbitListener(queues = "q1", messageConverter = "delegating", |
核心类一 : RabbitListenerAnnotationBeanPostProcessor
其中会根据注释的参数 ,通过创建的AMQP消息监听器容器调用RabbitListenerContainerFactory , 同时自动检测容器中的任何RabbitListenerConfigurer实例,允许自定义注册表,默认的容器工厂或对端点注册的细粒度控制
1 | java复制代码// RabbitListenerAnnotationBeanPostProcessor |
上面已经看到 , 最终会在postProcessAfterInitialization 方法中调用 processAmqpListener 来处理注解 ,我们来看看 processAmqpListener 方法
1 | java复制代码// processAmqpListener |
看看RabbitTemplate 里面是怎么操作的
1 | java复制代码 |
以上大概可以看到 , 注解是怎么把消息注册进去的 , 下面我们看看消息的发送和接受过程 :
2.4 发送消息
通常我们发送消息会使用如下代码 :
1 | java复制代码rabbitTemplate.convertAndSend("DirectExchange", "ONE", "发送消息 :" + msg); |
我们看看这个里面做了什么 :
1 | java复制代码 |
2.5 接收消息
接收消息肯定是通过代理类来做的 ,我们找一下具体的代理类 :
1 | java复制代码 |
2.6 消息监听
RabbitMQ 中提供给了2个消息监听器 : SimpleMessageListenerContainer (SMLC)和 DirectMessageListenerContainer (DMLC) , 他们具有如下属性 :
- ackTimeout : 设置 messagesPerAck后 ,此值作为ack 替代方案 , 会与上次 ack 之后的时间 , 确保真实性
- acknowledgeMode : 有三种类型 NONE , MANUAL , AUTO
- NONE : 没有发送任何 ack
- MANUAL : 监听器必须通过调用 Channel.basicAck ()来确认所有消息
- AUTO : 容器自动确认消息,除非 MessageListener 抛出异常
- adviceChain :应 用于监听器执行的 AOP 通知数组
- afterReceivePostProcessors : 在调用监听器之前调用的 MessagePostProcessor 实例数组
- alwaysRequeueWithTxManagerRollback : true 表示在配置事务管理器时始终在回滚时重新查询消息
- autoDeclare : 设置为 true (默认值)时,如果容器检测到在启动过程中至少有一个队列丢失(可能是因为它是自动删除队列或过期队列) ,则使用 RabbitAdmin 重新声明所有 AMQP 对象(队列、交换器、绑定) ,但如果队列因任何原因丢失,则重新声明将继续进行。
- autoStartup : 指示容器应该在 ApplicationContext 启动时启动 , 默认true
- batchSize : 与 acknowledgeMode.AUTO 一起使用时,容器在发送消息之前尝试批量处理该数量的消息
- batchingStrategy : 批量处理策略
- channelTransacted : 是否在事务中(手动或自动)确认所有消息
- concurrency : 每个监听器的并发消费者范围
- concurrentConsumers : 每个监听器最初启动的并发使用者数
- connectionFactory : 对 ConnectionFactory 的引用
- consecutiveActiveTrigger : 最小连续消息数 (小于该数不会发送接收超时)
- consumerBatchEnabled : 启用批处理消息
- consumerStartTimeout : 等待使用者线程启动的时间,以毫秒为单位
- consumerTagStrategy : ConsumerTagStrategy 策略实现
- consumersPerQueue : 每个配置的队列创建的使用者数
- errorHandler : errorHandler 引用
- exclusive : 此容器中的单个使用者是否对队列具有独占访问权
- messagesPerAck : 在 acks 之间要接收的消息数
- maxConcurrentConsumers : 根据需要启动的并发使用者的最大数量
- noLocal : 设置为 true 以禁用从服务器向消费者传递在同一通道连接上发布的消息
- rabbitAdmin : rabbitAdmin 实现
- receiveTimeout : 每条消息等待的最长时间
- shutdownTimeout : 关闭超时时间
- startConsumerMinInterval : 启动消费者最小间隔
- stopConsumerMinInterval : 关闭消费者最小间隔
- transactionManager : 事务管理
监听器的功能
由此我们大概可以知道监听器的常用功能 :
- 监听队列(多个队列)
- 自动启动
- 自动声明功能
- 设置事务特性、事务管理器、事务属性、事务并发量、开启事务、消息回滚
- 设置消费者数量、最大最小数量、批量消费属性等
- 设置 confirm 和 ack module、是否重回队列、errorHandler
- 设置消费者 tag 生成策略、是否独占模式、消费者属性
- 设置具体的转换器、 conversion 等
- 以及其他常见的功能
DirectMessageListenerContainer
- 每个队列的每个使用者使用一个单独的通道
- 并发性由 rabbitClient控制
1 | java复制代码// 配置方式 |
从源码中看看使用方式 :
TODO
三 . 深入知识点
3.1 配置 SSL
通过配置 RabbitConnectionFactoryBean 中 userSSL 来配置 SSL 相关属性 , 同时要注意配置以下属性 :
- private Resource sslPropertiesLocation;
- private String keyStore;
- private String trustStore;
详见官方文档 4.1.2 @ docs.spring.io/spring-amqp…
3.2 配置集群
AddressShuffleMode.RANDOM 表示随机设置连接顺序 , 默认是逐个轮询
- NONE : 不要在打开连接之前或之后更改地址;以固定顺序尝试连接
- RANDOM : 在打开连接之前随机洗牌地址;尝试新顺序的连接
- INORDER : 在打开连接后对地址进行洗牌,将第一个地址移动到最后 (听着怎么这么像切牌..)
1 | java复制代码@Bean |
3.3 配置连接工厂
1 | java复制代码public class MyService { |
3.4 配置 RabbitTemplate 常见功能
3.4.1 重试功能
RetryTemplate 不是 Rabbit 专属 , 他是 org.springframework.retry.support 包中的工具
1 | java复制代码@Bean |
使用 Callback 方式
1 | java复制代码 |
源码一览
1 | java复制代码 |
3.4.2 异常检测配置
官方文档中并不建议这种方式 , 毕竟事务处理是很消耗资源的
1 | java复制代码// 先开始 事务处理 |
3.4.3 设置回调
- RabbitTemplate 只支持一个 ConfirmCallback
1 | java复制代码template.setConfirmCallback(new MySelfConfirmCallback()); |
深入源码
1 | java复制代码public SettableListenableFuture<Confirm> getFuture() { |
3.4.5 设置返回
- 将 CachingConnectionFactory#publisherReturns 设置为 true
- RabbitTemplate 设置 ReturnsCallback
1 | java复制代码template.setReturnsCallback(new SelfReturnsCallback()); |
ReturnedMessage 中包含如下属性 :
- message : 返回的消息本身
- replyCode : 指示返回原因的代码
- replyText : 返回的原因
- exchange : 信息被发送到的交换
- routingKey : 使用的路由键
每个 RabbitTemplate 只支持一个 ReturnsCallback。
3.4.6 独立连接
通过设置 useblisherconnection 属性 , 实现在可能的情况下使用与侦听器容器使用的不同的连接
这种方式可以避免当生产者因为任何原因被阻止时,消费者被阻止
1 | java复制代码template.setUsePublisherConnection(true); |
3.5 Event 事件
- AsyncConsumerStartedEvent : 消费者启动时
- AsyncConsumerRestartedEvent : 失败后重启消费者
- AsyncConsumerTerminatedEvent : 消费者停止
- AsyncConsumerStoppedEvent : 消费者停止 (SimpleMessageListenerContainer 使用)
- ConsumeOkEvent : 当从代理接收到 consumeOk 时
- ListenerContainerIdleEvent :
- MissingQueueEvent : 丢失 Queue
3.6 配置消息转换器
1 | java复制代码 |
3.7 配置 BatchListener
BatchListerner 用于在一个调用中接收整个批处理
1 | java复制代码@Bean |
3.8 消息转换和序列化
1 | java复制代码public RabbitTemplate getRabbitTemplate(){ |
3.9 Rabbit MQ 的事务管理
事务管理通过 SimpleMessageListenerContainer 中 channelTransacted 属性来控制 , 当设置为 True 时 , 框架使用事务通道,并通过提交或回滚(取决于结果)结束所有操作(发送或接收) ,同时有一个异常信号发出回滚
1 | java复制代码 @Bean |
3.10 前后置处理
- RabbitTemplate
- setBeforePublishPostProcessors()
- setAfterReceivePostProcessors()
- SimpleMessageListenerContainer
- setAfterReceivePostProcessors()
1 | java复制代码 public RabbitTemplate getRabbitTemplate1(){ |
3.11 自动恢复
当连接重新建立时,RabbitAdmin 重新声明任何基础结构 bean (队列和其他) , 所以它不依赖于 amqp-client 库现在提供的自动恢复
1 | java复制代码C- com.rabbitmq.client.ConnectionFactory |
3.12 重试方式
前面 RabbitTemplate 中已经提到了其配置重试的方式 , 这里说说其他的重试 :
批量重试
批量重试基于MessageBatchRecoverer 的实现类
3.13 多代理的方式
多代理是指声明多组基础结构(连接工厂、管理员、容器工厂)
1 | java复制代码 |
四 . 类关联关系
- SimpleMessageListenerContainer
- RabbitAdmin
TODO : 待完善
五 . 解决方案
5.1 集群解决方案及效率问题
通信效率问题
RabbitMQ 是通过 TCP 之上建立的虚拟连接-信道(Channel)来传输数据. Channel 的官方释义为 : 共享单个 TCP 连接的轻量级连接.PS : 同时打开多个 TCP 连接是不可取的,因为这样做会消耗系统资源,并且使配置防火墙更加困难
信道概念 :
客户机执行的每个协议操作都发生在通道上。特定信道上的通信完全独立于另一个信道上的通信,因此每个协议方法都带有一个信道 ID (又名信道编号) ,代理和客户机都使用这个 id 来确定该方法用于哪个信道
通道只存在于连接的上下文中,从不独立存在。当连接关闭时,连接上的所有通道也都关闭。
信道的生命周期 :
应用程序在成功打开连接后立即打开通道。
1 | java复制代码ConnectionFactory cf = new ConnectionFactory(); |
就像连接一样,通道意味着长寿命。也就是说,没有必要为每个操作打开一个通道,这样做效率非常低,因为打开一个通道是一个网络往返。
客户端库提供了一种观察和应对通道异常的方法。例如,在 Java 客户机中,有一种方法可以注册错误处理程序并访问通道关闭(闭包)原因。
集群的类型和部署方式
1 . 普通集群模式
- 集成方式 : 多台服务器单独部署节点 , 每个节点保存着 queue 的元数据或者实例 , 当访问一个仅Queue元数据的节点时 , 则会从具有实例的节点拉取数据
- 提高吞吐量
- 非高可用
2 . 镜像集群模式
- 集成方式 : 多台服务器单独部署节点 , 一个节点会向其他节点同步数据 , 所以每个节点持有所有的消息数据
- 高可用
- 性能开销大
可以参考这位的导图 : juejin.cn/editor/draf…
5.2 消息丢失问题
丢失通常有三种类型 :
- 消息发送过程中丢失 , 消费者未获取到消息
- 消费者收取到消息 , 未来得及消费
- 消费者消费消息 ,但是消费未成功完成
为题一 : 消息发送过程中丢失 , 消费者未获取到消息
丢失问题和核心在于确认 , RabbitMQ 是发送方确认 , Channel 被设置为 Confirm 模式后 , 所有消息都会分配一个 ChannelID , 消费完成后 , 会将消费消费成功的id 返回给生产者
内部错误时 , 会返回 nack 消息
1 | java复制代码void confirm(CorrelationData correlationData, boolean ack, String cause); |
发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。
PS : RabbitMQ 提供了事务功能
问题二 : 消费者收取到消息 , 未来得及消费
通常这种原因是因为消息被放在了内存中 ,而来不及消费 , 最好的解决方式就是持久化
- 创建时即将 Queue 设置未持久化
- 发送消息时 , 将消息设置为持久化
问题三 : 消费者消费消息 ,但是消费未成功完成
ACK 由消费者通过业务层面处理
总结 : 保持消息持久化 , 通过业务层面 ACK Confirm 消息 , 基本上可以解决大部分可靠性问题
5.3 重复消费问题
在消息生产时,MQ 内部针对每条生产者发送的消息生成一个 inner-msg-id ,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列
在消息消费时,要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付 ID、订单 ID、帖子 ID 等)作为去重和幂等的依据,避免同一条消息被重复消费。
消息分发:
若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。
5.4 消息顺序问题
当RabbitMQ 但节点多个消费者的时候 , 虽然有 ack 机制 ,但是整个消费不是阻塞的 , 这就导致每个节点的性能不同会导致其消费的效率不同, 这个时候 , 快的节点就很有可能出现超前消费
借这位老哥的梯子看看 @ www.processon.com/view/5c5bf6…
解决方案 :
1 . 多queue分别对应一个 consumer
2 . 单Queue单 Consumer
5.5 消费失败问题
- 保证消息持久化
- 消费失败处理及回调 (ErrorHandler)
5.6 消费限流问题
RabbitMQ提供了qos(服务质量保证)功能:在非自动消息确认的前提下,如果一定数目的消息未被消费前,不会进行消费新的消息
- prefetchSize: 单个消息大小限制,一般为0
- prefetchCount:告诉rabbitmq不要给一个消费者一次推送超过N条消息,直到消费者主动ack
- global: true\false是否将上述设置应用于channel,即上面的设置是通道级别还是消费之级别,一般为fals
AbstractMessageListenerContainer 中提供了以下方法的功能
六 . 性能指标
TODO
总结
感觉文档并没有达到之前预想的效果 , 预期是想通过官方文档 + 源码根源 找到可以定制化的点 , 但是写下来并没有实现这种效果 , 想了想 , 主要是因为Rabbit 体系太大了 , 官方文档很长 , 很难找到突破点 , 整篇文章里面还有很多很多没有讲到 , 源码也不是很深入
后面几天决定就和它死磕了 , 看看是开个单章还是就在本章深入了 , 祝顺利 !
更新日志 :
20210407 : 完善 Transaction 部分 , 完善其他小细节20210408 : 计划完成类关系图
本文转载自: 掘金