盘点 MQ 消息队列 RabbitMQ 浅入

总文档 :文章目录

Github : github.com/black-ant

前言

本篇文章会尝试着说一说RabbitMQ 稍微深一点的技术点 , 期望能讲清楚 , 讲明白 .

RabbitMQ 介绍可以参考MQ总览 , 这里就不过多的描述了 , 我们围绕其中四种方式来依次聊一下.

一 . 基础使用

1.1 配置详情

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

1
2
3
4
5
6
yaml复制代码spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest

1.2 消费者

1
2
3
4
java复制代码@RabbitListener(bindings = @QueueBinding(value = @Queue("DirectA"),key = "ONE",exchange = @Exchange(name = "DirectExchange", type = ExchangeTypes.DIRECT)))
public void processA(String message) {
logger.info("------> DirectA 发送接收成功 :{}<-------", message);
}

1.3 生产者

1
java复制代码rabbitTemplate.convertAndSend("DirectExchange", "ONE", "发送消息 :" + msg);

image.png

通过图片我们可以看到 , DirectExchange 实际上绑定了三个 Queue , 所以我多个消费者都收到了消息

1.4 四种发送模式

fanout => 发送到该交换机的消息发送到所有绑定的队列

1
2
3
4
5
6
7
8
java复制代码    @RabbitListener(bindings = @QueueBinding(
value = @Queue("FanoutA"),
key = "ONE",
exchange = @Exchange(name = "FanoutExchange", type = ExchangeTypes.FANOUT)
))
public void processA(String message) {
logger.info("------> FanoutA 发送接收成功 :{}<-------", message);
}

direct => 消息路由到 BindingKey 和 RoutingKey 完全相同的队列

1
2
3
4
5
6
7
8
java复制代码    @RabbitListener(bindings = @QueueBinding(
value = @Queue("DirectA"),
key = "ONE",
exchange = @Exchange(name = "DirectExchange", type = ExchangeTypes.DIRECT)
))
public void processA(String message) {
logger.info("------> DirectA 发送接收成功 :{}<-------", message);
}

top => BindingKey 使用 */# 模糊匹配

1
2
3
4
5
6
7
8
java复制代码    @RabbitListener(bindings = @QueueBinding(
value = @Queue("TopicA"),
key = "ONE.*",
exchange = @Exchange(name = "TopicExchange", type = ExchangeTypes.TOPIC)
))
public void processA(String message) {
logger.info("------> TopicA 发送接收成功 :{}<-------", message);
}

header => 根据消息内容种的header 属性进行匹配

1.5 底层用法

1.5.1 创建连接发送消息

1
2
3
4
5
6
7
8
9
10
java复制代码// 以下是一个非Spring发送的完整流程 ,在 SpringBoot 中 , 这些流程被代理了
ConnectionFactory connectionFactory = new CachingConnectionFactory();

AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));

AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");

String foo = (String) template.receiveAndConvert("myqueue");

1.5.2 构建一个Binding

1
2
3
4
5
6
7
8
9
10
11
java复制代码
// Direct
new Binding(someQueue, someDirectExchange, "foo.bar");

// Topic
new Binding(someQueue, someTopicExchange, "foo.*");

// Fanout
new Binding(someQueue, someFanoutExchange);

//

1.5.3 连接工厂

RabbitTemplate 提供了三个连接工厂 :

  • PooledChannelConnectionFactory
    • 常用 ,基于连接池的连接工厂 (commons-pool2)
    • 支持简单的 publisher 确认
  • ThreadChannelConnectionFactory
    • 需要使用作用域操作 , 就可以确保严格的消息顺序
    • 支持简单的 publisher 确认
    • 此工厂确保同一线程上的所有操作使用相同的通道
  • CachingConnectionFactory
    • 可以通过 CacheMode 打开多个连接(共享连接 ,区别于连接池)
    • 可以由相关的发布者确认
    • 支持简单的 publisher 确认

PS : 前面2个需要 spring-rabbit 包

三种连接池提供了不同的功能, 我们可以根据自己的项目来选择和定制

PooledChannelConnectionFactory 的构建方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}

// 我们来看一下源码 :

C- ThreadChannelConnectionFactory
C- ConnectionWrapper
private final ThreadLocal<Channel> channels = new ThreadLocal<>();
private final ThreadLocal<Channel> txChannels = new ThreadLocal<>();
?- 内部主要通过2个ThreadLocal 来存储 Channel , 之所以是2个其中一个是为事务通道准备的


C- PooledChannelConnectionFactory
C- ConnectionWrapper
private final ObjectPool<Channel> channels;
private final ObjectPool<Channel> txChannels;
?- ObjectPool 是 CommonPool 的组件


//

CachingConnectionFactory 创建一个新连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();


// PS : 注意其中的构造器 ,其提供了不限于一种的构造方式
public CachingConnectionFactory(@Nullable String hostname)
public CachingConnectionFactory(int port)
public CachingConnectionFactory(@Nullable String hostNameArg, int port)
public CachingConnectionFactory(URI uri)
public CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory)
private CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory,
boolean isPublisherFactory)
// 以及提供了一个静态构造器
private static com.rabbitmq.client.ConnectionFactory newRabbitConnectionFactory()


// 添加自定义客户端连接属性
?- 访问基础连接工厂 , 设置自定义客户机属性
connectionFactory.getRabbitConnectionFactory().getClientProperties().put("thing1", "thing2");


// 再深入一点 :
1 内部类 CacheMode : 缓存模式 , 包括 CHANNEL , CONNECTION

// CHANNEL 相关属性
- connectionName : 由 ConnectionNameStrategy 生成的连接的名称
- channelCacheSize : 当前配置的允许空闲的最大通道
- localPort : 连接的本地端口
- idleChannelsTx : 当前空闲的事务通道数(缓存)
- idleChannelsNotTx : 当前空闲的非事务性通道的数量(缓存)
- idleChannelsTxHighWater : 已并发空闲的事务通道的最大数量(缓存)
- idleChannelsNotTxHighWater : 非事务性通道的最大数目已并发空闲(缓存)

// CONNECTION 相关属性
- connectionName:<localPort> : 由 ConnectionNameStrategy 生成的连接的名称
- openConnections : 表示到代理的连接的连接对象的数量
- channelCacheSize : 当前配置的允许空闲的最大通道
- connectionCacheSize : 允许空闲的当前配置的最大连接
- idleConnections : 当前空闲的连接数
- idleConnectionsHighWater : 已并发空闲的最大连接数
- idleChannelsTx:<localPort> : 此连接当前空闲的事务通道数(缓存)
- idleChannelsNotTx:<localPort> : 此连接当前空闲的非事务性通道数(缓存)
- idleChannelsTxHighWater:<localPort> : 已并发空闲的事务通道的最大数量(缓存)
- idleChannelsNotTxHighWater:<localPort> : 非事务性通道的最大数目已并发空闲(缓存)

1.5.4 发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;

// 同样 , 在业务中我们可以使用更合理的发送方式
RabbitTemplate template = new RabbitTemplate();
template.setRoutingKey("queue.helloWorld");

// Template 同样可以定制 , 例如
template.setConfirmCallback(...);

// 这里可以定制配置
MessageProperties properties = new MessageProperties();
//... 省略
template.send(new Message("Hello World".getBytes(), properties));




// 同样 , 可以对 Message 进行详细处理
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
public static MessageBuilder withBody(byte[] body)
?- 生成器创建的消息有一个主体,它是对参数的直接引用
- return new MessageBuilder(body);
public static MessageBuilder withClonedBody(byte[] body)
?- Arrays 新建了一个数组
- return new MessageBuilder(Arrays.copyOf(body, body.length));
public static MessageBuilder withBody(byte[] body, int from, int to)
- return new MessageBuilder(Arrays.copyOfRange(body, from, to));
public static MessageBuilder fromMessage(Message message)
- return new MessageBuilder(message);
public static MessageBuilder fromClonedMessage(Message message)
- byte[] body = message.getBody();
- return new MessageBuilder(Arrays.copyOf(body, body.length), message.getMessageProperties());



// MessageProperties 扩展方式
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();

public static MessagePropertiesBuilder newInstance()
?- 使用默认值初始化新的消息属性对象
public static MessagePropertiesBuilder fromProperties(MessageProperties properties)
?- return new MessagePropertiesBuilder(properties) : 通过传入的 properties 创建
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties)
?- 内部进行了 builder.copyProperties(properties);

1.5.5 接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;

Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

1.5.6 开启注解

1
java复制代码@EnableRabbit

二 . 源码分析

2.1 基础接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码
// Exchange 接口表示 AMQP Exchange,即消息生成器发送到的内容。
// 代理虚拟主机中的每个 Exchange 都有唯一的名称以及一些其他属性
public interface Exchange {
String getName();
// direct、 topic、 fanout 和 header
String getExchangeType();
boolean isDurable();
boolean isAutoDelete();
Map<String, Object> getArguments();
}

// Queue 类表示消息使用者从中接收消息的组件。
// 与各种 Exchange 类一样,我们的实现旨在作为这种核心 AMQP 类型的抽象表示
public class Queue {
private final String name;
private volatile boolean durable;
private volatile boolean exclusive;
private volatile boolean autoDelete;
private volatile Map<String, Object> arguments;
public Queue(String name) {
this(name, true, false, false);
}
}

2.2 主要类简述

之前说了非Spring 加载的底层用法 , 主要涉及到ConnectionFactory ,AmqpAdmin ,Queue , 这一栏只要说说前置处理的相关类

2.2.1 连接工厂系列

我们先来看看这三个类 :

ConnectionFactory :

这里的 ConnectionFactory 主要是 CachingConnectionFactory , 该类类似于缓存连接池 , CHANNEL(默认)从所有createConnection()调用中返回相同的连接,并忽略对Connection.close()的调用 ,并且缓存CHANNEL

默认情况下,将只缓存一个通道,并根据需要创建和处理进一步请求的通道.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码// 默认缓存大小
private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;
private static final String DEFAULT_DEFERRED_POOL_PREFIX = "spring-rabbit-deferred-pool-";
// channel 超时时间
private static final int CHANNEL_EXEC_SHUTDOWN_TIMEOUT = 30;


// 核心类
private Channel getChannel(ChannelCachingConnectionProxy connection, boolean transactional) {
Semaphore permits = null; // 发现了一个神奇的工具类 , 可以理解为共享锁
if (this.channelCheckoutTimeout > 0) {
permits = obtainPermits(connection);
}
LinkedList<ChannelProxy> channelList = determineChannelList(connection, transactional);
ChannelProxy channel = null;
if (connection.isOpen()) {
channel = findOpenChannel(channelList, channel);
}
if (channel == null) {
try {

channel = getCachedChannelProxy(connection, channelList, transactional);
}catch (RuntimeException e) {
//Semaphore 的释放 , 这是一个多线程信号量工具
}
return channel;
}


private ChannelProxy findOpenChannel(LinkedList<ChannelProxy> channelList,ChannelProxy channelArg) {
ChannelProxy channel = channelArg;
synchronized (channelList) {
while (!channelList.isEmpty()) {
// 此处训话获取 channelList 中第一个 channel , 并且从集合中移除
channel = channelList.removeFirst();
if (channel.isOpen()) {
break;
} else {
cleanUpClosedChannel(channel);
channel = null;
}
}
}
return channel;
}

// CachingConnectionFactory

2.2.2 框架成员系列

2.2.3 投递成员系列 (Admin)

AmqpAdmin

AmqpAdmin 是一个接口 , 其核心实现类是 RabbitAdmin , 实现了可移植的AMQP管理操作
只要应用程序上下文中存在 RabbitAdmin,就可以自动声明和绑定队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码// 方法 declareExchanges : 
// 声明了一个 exchange , 其配置主要来源于注解或配置
Map<String, Object> arguments = exchange.getArguments();
channel.exchangeDeclare(
exchange.getName(),
DELAYED_MESSAGE_EXCHANGE,
exchange.isDurable(),
exchange.isAutoDelete(),
exchange.isInternal(),
arguments
);

//-----------------
// 方法 : declareQueues 声明了一个 Queue
// 方法 : declareBindings 声明了绑定关系

后续可以看到 , 在开局扫描注解的时候 , 会通过该类完成相关的初始化

RabbitAdmin

RabbitAdmin 是比较底层的类 , 其主要在 这些地方被调用

0001.jpg

RabbitAdmin 最核心的就是多个声明的逻辑 ,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
java复制代码
// 节点一 : 先建立 Channel , 再通过 Callback 方法回调建立 Exchange
M- declareExchange(final Exchange exchange)
// 注意 , 这里 excute 的参数是一个 ChannelCallback , 其目的是为了channle 建立后的回调
this.rabbitTemplate.execute(channel -> {
declareExchanges(channel, exchange);
return null;
});

M- declareExchanges
?- 该方法允许一次性传入多个 Exchange
- 先准备arguments 属性
- 调用 channel.exchangeDeclare 实现声明操作


C- RabbitTemplate
M- execute : RabbitTemplate 通过 retryTemplate 进行重试调用
?- 注意 doExecute , Channel 的建立主要还是在 RabbitTemplate 中完成的
?- 在 doExecute 中再 Callback 之前 RabbitAdmin 里面的操作 , 类似于一个回调
- return this.retryTemplate.execute(
(RetryCallback<T, Exception>) context -> doExecute(action, connectionFactory),
(RecoveryCallback<T>) this.recoveryCallback);

// 节点二 : 删除 Exchange
M- deleteExchange(final String exchangeName) : 通过名字删除 Exchange
- 先判断是否为默认 Exchange , 名称为""
- 再直接通过channel.exchangeDelete 删除


// 节点三 : 建立 Queue , 和Exchange 同理
M- declareQueue(final Queue queue)
- 同理 , 先建立 Channel 后回调
- 调用 declareQueues(channel, queue); 完成Queue 声明
- channel.queueDeclare : 通过 channel 对象完成 Queue 声明

M- declareQueue()
?- 声明一个服务器命名的独占的、autodelete的、非持久的队列
?- 这里是没有传入 Queue 对象的 , 直接创建一个 Chhenl , 然后创建一个 Queue
- this.rabbitTemplate.execute(Channel::queueDeclare)

M- deleteQueue(final String queueName)
?- 同样的 , 先创建一个 Channel , 再删除

M- deleteQueue(final String queueName, final boolean unused, final boolean empty)
?- 更详细的删除方式 , 包括是否使用 , 是否为空来判断是否删除

M- purgeQueue
?- 清除队列,可以选择不等待清除发生


// 节点四 : Binding 处理
M- declareBinding
- 先建立 Channel
- declareBindings(channel, binding);
- channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());
- channel.exchangeBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());



M- removeBinding(final Binding binding)
- channel.queueUnbind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());
- channel.exchangeUnbind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());


M- QueueInformation getQueueInfo(String queueName)
- DeclareOk declareOk = channel.queueDeclarePassive(queueName);
- return new QueueInformation(declareOk.getQueue(), declareOk.getMessageCount(),declareOk.getConsumerCount());

2.2.4 event 事件系列

Rabbit 中主要有以下事件 :

  • AsyncConsumerStartedEvent : 消费者启动时
  • AsyncConsumerRestartedEvent : 失败后重启消费者
  • AsyncConsumerTerminatedEvent : 消费者停止
  • AsyncConsumerStoppedEvent : 消费者停止 (SimpleMessageListenerContainer 使用)
  • ConsumeOkEvent : 当从代理接收到 consumeOk 时
  • ListenerContainerIdleEvent :
  • MissingQueueEvent : 丢失 Queue

2.2.5 Transaction 事务处理

两种方式的不同主要在于一个是通过提供一个外部事务 (通过设置 channelTransacted + 一个TransactionManager) , 一个是内部逻辑 (通过设置 channelTransacted + 事务声明 (例如 @Transaction)).

方式一 :

在 RabbitTemplate 和 SimpleMessageListenerContainer 中,都有一个标志 channelTransacted,如果该标志为真,则告诉框架使用事务通道,并通过提交或回滚(取决于结果)结束所有操作(发送或接收) ,同时有一个异常信号发出回滚。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码// 这种方式的体现主要在 Consumer 中, 以BlockingQueueConsumer 为例

// Step 1 : 创建 Consumer 的时候传入 isChannelTransacted
consumer = new BlockingQueueConsumer(getConnectionFactory(), getMessagePropertiesConverter(),
this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount,
isDefaultRequeueRejected(), getConsumerArguments(), isNoLocal(), isExclusive(), queues);

// Step 2 : 最终在相关方法中调用
M- rollbackOnExceptionIfNecessary
if (this.transactional) -> RabbitUtils.rollbackIfNecessary(this.channel);


// 事务的声明 :
1 例如可以使用Spring 事务模型

// 事务的原理 :
TODO

方式二 :

提供一个外部事务,其中包含一个 Spring 的 PlatformTransactionManager 实现,作为正在进行的操作的上下文。如果在框架发送或接收消息时已经有一个事务在进行中,并且 channelTransacted 标志为 true,则消息传递事务的提交或回滚将推迟到当前事务结束时。如果 channelTransacted 标志为 false,则不会对消息传递操作应用事务语义(它是自动记录的)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(rabbitTransactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}

// 深入一下用法 :
C- SimpleMessageListenerContainer
M- doReceiveAndExecute
?- 其中在一个 Catch 中能看到对 transactionManager 的调用
... catch (Exception ex) {
- 判断是否有transactionManager
- 判断是否开启了Callback
RabbitResourceHolder resourceHolder = (RabbitResourceHolder)TransactionSynchronizationManager.getResource(getConnectionFactory());
if (resourceHolder != null) {
consumer.clearDeliveryTags();
} else {
consumer.rollbackOnExceptionIfNecessary(ex);
}

2.3 RabbitListener 注解详细分析

实际上整个过程中 ,我们并没有主动去建立一个 Queue 或者 Exchange , 那么注解为我们做了什么?

我们先看下注解中提供了哪些方法 :

  • id :
  • containerFactory : container 工厂类
  • queues : 声明 queues
  • queuesToDeclare : 如果在应用程序上下文中有一个RabbitAdmin,队列将在broker上使用默认绑定
  • exclusive :
  • priority :
  • admin : AmqpAdmin 类
  • bindings : 绑定的体系
  • group :
  • returnExceptions : 将异常返回给发送方, 异常包装在 RemoteInvocationResult 对象中
  • errorHandler : 异常处理 Handler
  • concurrency : 为该侦听器设置侦听器容器的并发性
  • autoStartup : 是否在 ApplicationContext 启动时启动
  • executor : 为这个监听器的容器设置任务执行器bean的名称;覆盖容器工厂上设置的任何执行程序
  • **ackMode :**AcknowledgeMode
  • replyPostProcessor :

自动持久化

1
2
3
4
5
java复制代码 @RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
key = "orderRoutingKey")
)

声明并绑定了一个匿名(排他的、自动删除的)队列

1
2
3
4
5
6
java复制代码// 这里 Queue 没有设置任何东西
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "auto.exch"),
key = "invoiceRoutingKey")
)

注解获取 Header

1
2
java复制代码@RabbitListener(queues = "myQueue")
public void processOrder(Order order, @Header("order_type") String orderType)

监听多个 Queue

1
2
3
4
java复制代码@RabbitListener(queues = { "queue1", "queue2" } )

// 另外还可以使用 SPEL 表达式
@RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" )

SendTo 发送额外消息

  • @SendTo : 会将接收到的消息发送到指定的路由目的地,所有订阅该消息的用户都能收到,属于广播。
  • @SendToUser : 消息目的地有UserDestinationMessageHandler来处理,会将消息路由到发送者对应的目的地。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@SendTo("#{environment['my.send.to']}")


// 使用Bean 的方式
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}

@Bean
public String spelReplyTo() {
return "test.sendTo.reply.spel";
}

// SPEL
@SendTo("!{'some.reply.queue.with.' + result.queueName}")

// Spring Enviroment
@SendTo("#{environment['my.send.to']}")

定义回复类型

1
2
java复制代码@RabbitListener(queues = "q1", messageConverter = "delegating",
replyContentType = "application/json")

核心类一 : RabbitListenerAnnotationBeanPostProcessor

其中会根据注释的参数 ,通过创建的AMQP消息监听器容器调用RabbitListenerContainerFactory , 同时自动检测容器中的任何RabbitListenerConfigurer实例,允许自定义注册表,默认的容器工厂或对端点注册的细粒度控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码// RabbitListenerAnnotationBeanPostProcessor


// Step Start : postProcessAfterInitialization
// postProcessAfterInitialization 来自于 BeanPostProcessor , 他会在自定义初始化之前调用
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
// 获取 ServletWebServerFactoryConfiguration
Class<?> targetClass = AopUtils.getTargetClass(bean);
// 获取每个类的 RabbitListener Metadata
final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
for (ListenerMethod lm : metadata.listenerMethods) {
for (RabbitListener rabbitListener : lm.annotations) {
// 如果存在注解 , 则处理
processAmqpListener(rabbitListener, lm.method, bean, beanName);
}
}
if (metadata.handlerMethods.length > 0) {
processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
}
return bean;
}



// Step : findListenerAnnotations 扫描所有的注解
private RabbitListenerAnnotationBeanPostProcessor.TypeMetadata buildMetadata(Class<?> targetClass) {
// 此处会对每个类都检测是否存在 @RabbitListener
Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<RabbitListenerAnnotationBeanPostProcessor.ListenerMethod> methods = new ArrayList<>();
final List<Method> multiMethods = new ArrayList<>();
ReflectionUtils.doWithMethods(targetClass, method -> {
Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
if (listenerAnnotations.size() > 0) {
// 如果存在注解 , 则添加到集合中
methods.add(new RabbitListenerAnnotationBeanPostProcessor.ListenerMethod(method,
listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
}
if (hasClassLevelListeners) {
RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
if (rabbitHandler != null) {
multiMethods.add(method);
}
}
}, ReflectionUtils.USER_DECLARED_METHODS);
if (methods.isEmpty() && multiMethods.isEmpty()) {
return RabbitListenerAnnotationBeanPostProcessor.TypeMetadata.EMPTY;
}
// 构建一个 TypeMetadata
return new RabbitListenerAnnotationBeanPostProcessor.TypeMetadata(
methods.toArray(new RabbitListenerAnnotationBeanPostProcessor.ListenerMethod[methods.size()]),
multiMethods.toArray(new Method[multiMethods.size()]),
classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
}

上面已经看到 , 最终会在postProcessAfterInitialization 方法中调用 processAmqpListener 来处理注解 ,我们来看看 processAmqpListener 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
java复制代码// processAmqpListener
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
// 如果是代理 , 返回最终代理的方法
Method methodToUse = checkProxy(method, bean);
// 构建了一个 MethodRabbitListenerEndpoint , 该单元用于处理这个端点的传入消息
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
endpoint.setMethod(methodToUse);
// Listener 注解流程
processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}


// 我们继续深入 , 很长的一段 ,我们省略掉其中无意义的一部分 :
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
Object target, String beanName) {

// endpoint 注入属性
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(rabbitListener));
// 这里仅仅只是对 QueueName 进行校验和解析
endpoint.setQueueNames(resolveQueues(rabbitListener));
// 处理监听器
endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
endpoint.setBeanFactory(this.beanFactory);
//异常处理
endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));

Object errorHandler = resolveExpression(rabbitListener.errorHandler());
if (errorHandler instanceof RabbitListenerErrorHandler) {
endpoint.setErrorHandler((RabbitListenerErrorHandler) errorHandler);
}
else if (errorHandler instanceof String) {
String errorHandlerBeanName = (String) errorHandler;
if (StringUtils.hasText(errorHandlerBeanName)) {
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
}
}
else {
throw new IllegalStateException("error handler mut be a bean name or RabbitListenerErrorHandler, not a "
+ errorHandler.getClass().toString());
}
String group = rabbitListener.group();
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
if (resolvedGroup instanceof String) {
endpoint.setGroup((String) resolvedGroup);
}
}
String autoStartup = rabbitListener.autoStartup();
if (StringUtils.hasText(autoStartup)) {
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
}

endpoint.setExclusive(rabbitListener.exclusive());
String priority = resolve(rabbitListener.priority());
if (StringUtils.hasText(priority)) {
endpoint.setPriority(Integer.valueOf(priority));
}

resolveExecutor(endpoint, rabbitListener, target, beanName);
resolveAdmin(endpoint, rabbitListener, target);
// ACK 模式我们回头再看看
resolveAckMode(endpoint, rabbitListener);
resolvePostProcessor(endpoint, rabbitListener, target, beanName);
RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName);

this.registrar.registerEndpoint(endpoint, factory);
}

// 注册 Endpoint
public void registerEndpoint(RabbitListenerEndpoint endpoint,
@Nullable RabbitListenerContainerFactory<?> factory) {
AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
if (this.startImmediately) {
this.endpointRegistry.registerListenerContainer(descriptor.endpoint, // NOSONAR never null
resolveContainerFactory(descriptor), true);
}else {
this.endpointDescriptors.add(descriptor);
}
}
}

看看RabbitTemplate 里面是怎么操作的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
java复制代码
// Step 1 : RabbitListenerEndpointRegistry Listener 启动
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
startIfNecessary(listenerContainer);
}
}

// 问题一 : getListenerContainers 的来源 >>
RabbitListenerEndpointRegistry --> Map<String, MessageListenerContainer>
// 注意 , 这里就和上文的扫描匹配上了 , 由 registerEndpoint 调用
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
boolean startImmediately) {

String id = endpoint.getId();
synchronized (this.listenerContainers) {
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
List<MessageListenerContainer> containerGroup;
if (this.applicationContext.containsBean(endpoint.getGroup())) {
containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
}else {
containerGroup = new ArrayList<MessageListenerContainer>();
this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
}
containerGroup.add(container);
}
if (this.contextRefreshed) {
container.lazyLoad();
}
if (startImmediately) {
startIfNecessary(container);
}
}
}

// 扫描时 , 会将 MessageListenerContainer 放入 Map , 用于后续使用
// Step 2 : RabbitListenerEndpointRegistry Listener 运行
private void startIfNecessary(MessageListenerContainer listenerContainer) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
// 主要是 SimpleMessageListenerContainer
// 继承了 AbstractMessageListenerContainer , start 在该类中
listenerContainer.start();
}
}

// Step 3 : start 方法

configureAdminIfNeeded();
// 校验 Queue 情况 ,主要是调用 RabbitAdmin , 校验失败的会 Channel shutdown
// Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'HeaderExchange' in vhost '/': received 'direct' but current is 'headers', class-id=40, method-id=10)
checkMismatchedQueues();
- 1 判断Exchange 情况并且输出日志 : !isDurable / isAutoDelete / isInfoEnabled
- 2 判断 Queue 情况并且输出日志 : !isDurable / isAutoDelete / isExclusive / isInfoEnabled(开启日志)
// 注意 , 这里就是之前苦苦寻找声明的地方 , 也可以理解为这里就开始连接了
- 3 declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
- channel.exchangeDeclare(exchange.getName(), DELAYED_MESSAGE_EXCHANGE, exchange.isDurable(),exchange.isAutoDelete(), exchange.isInternal(), arguments);
- 4 declareQueues(channel, queues.toArray(new Queue[queues.size()]));
- channel.queueDeclare(queue.getName(), queue.isDurable(),queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
- 5 declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
- channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());
// 最后调用了一下子类的 doStart , 用于后续扩展
doStart();

以上大概可以看到 , 注解是怎么把消息注册进去的 , 下面我们看看消息的发送和接受过程 :

2.4 发送消息

通常我们发送消息会使用如下代码 :

1
java复制代码rabbitTemplate.convertAndSend("DirectExchange", "ONE", "发送消息 :" + msg);

我们看看这个里面做了什么 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码
// 中间有一个 Boolean 型 ,我们用伪代码表示
Boolean item1 = RabbitTemplate.this.returnCallback != null|| (correlationData != null && StringUtils.hasText(correlationData.getId()))
Boolean item2 = RabbitTemplate.this.mandatoryExpression.getValue(RabbitTemplate.this.evaluationContext, message, Boolean.class)
Boolean mandatory = item1 && item2 ;


@Override
public void send(final String exchange, final String routingKey,
final Message message, @Nullable final CorrelationData correlationData)
throws AmqpException {
execute(channel -> {
// 核心方法 , 发送消息
doSend(channel, exchange, routingKey, message,mandatory,correlationData);
return null;
}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
}

// doSend 中我们同样去掉无关的
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message, // NOSONAR complexity
boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {

// 这里准备了 Exchange 参数 和 routingKey 参数
String exch = exchangeArg;
String rKey = routingKeyArg;

// 中间一大段都是对 MessageProperties 进行处理 , MessageProperties 存在 messageToUse 中

sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
if (isChannelLocallyTransacted(channel)) {
RabbitUtils.commitIfNecessary(channel);
}
}

protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,Message message) throws IOException {

BasicProperties convertedMessageProperties = this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding);
// 这里就是常规的 RabbitMQ Client 操作了 , 到这里 发送就结束了
// com.rabbitmq.client.impl.ChannelN
channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());

// basicPublish 核心如下 , channle 的逻辑我们可以未来再深入
AMQCommand command = new AMQCommand((new Builder()).exchange(exchange).routingKey(routingKey).mandatory(mandatory).immediate(immediate).build(), props, body);
this.transmit(command);


}

2.5 接收消息

接收消息肯定是通过代理类来做的 ,我们找一下具体的代理类 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码
// Step 1: SimpleMessageListenerContainer#run
// 其中有这样一段代码
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
mainLoop();
}

// Step 2 : 持续监听 ,往里面追溯 , 可以发现有一段这样的代码
M- doReceiveAndExecute
try {
executeListener(channel, message);
}

-> doExecuteListener
-> invokeListener(channel, message);
-> this.proxy.invokeListener(channel, data) // 到这里实际上还是看不到实际的处理类

// 一路debug , 最终会到这个方法中
protected void actualInvokeListener(Channel channel, Object data) {
Object listener = getMessageListener();
if (listener instanceof ChannelAwareMessageListener) {
doInvokeListener((ChannelAwareMessageListener) listener, channel, data);
}else if (listener instanceof MessageListener) {
boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted();
if (bindChannel) {
RabbitResourceHolder resourceHolder = new RabbitResourceHolder(channel, false);
resourceHolder.setSynchronizedWithTransaction(true);
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
resourceHolder);
}
try {
doInvokeListener((MessageListener) listener, data);
}
finally {
if (bindChannel) {
// unbind if we bound
TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
}
}
}
else if (listener != null) {
throw new FatalListenerExecutionException("Only MessageListener and SessionAwareMessageListener supported: "
+ listener);
}
else {
throw new FatalListenerExecutionException("No message listener specified - see property 'messageListener'");
}
}

2.6 消息监听

RabbitMQ 中提供给了2个消息监听器 : SimpleMessageListenerContainer (SMLC)和 DirectMessageListenerContainer (DMLC) , 他们具有如下属性 :

  • ackTimeout : 设置 messagesPerAck后 ,此值作为ack 替代方案 , 会与上次 ack 之后的时间 , 确保真实性
  • acknowledgeMode : 有三种类型 NONE , MANUAL , AUTO
    • NONE : 没有发送任何 ack
    • MANUAL : 监听器必须通过调用 Channel.basicAck ()来确认所有消息
    • AUTO : 容器自动确认消息,除非 MessageListener 抛出异常
  • adviceChain :应 用于监听器执行的 AOP 通知数组
  • afterReceivePostProcessors : 在调用监听器之前调用的 MessagePostProcessor 实例数组
  • alwaysRequeueWithTxManagerRollback : true 表示在配置事务管理器时始终在回滚时重新查询消息
  • autoDeclare : 设置为 true (默认值)时,如果容器检测到在启动过程中至少有一个队列丢失(可能是因为它是自动删除队列或过期队列) ,则使用 RabbitAdmin 重新声明所有 AMQP 对象(队列、交换器、绑定) ,但如果队列因任何原因丢失,则重新声明将继续进行。
  • autoStartup : 指示容器应该在 ApplicationContext 启动时启动 , 默认true
  • batchSize : 与 acknowledgeMode.AUTO 一起使用时,容器在发送消息之前尝试批量处理该数量的消息
  • batchingStrategy : 批量处理策略
  • channelTransacted : 是否在事务中(手动或自动)确认所有消息
  • concurrency : 每个监听器的并发消费者范围
  • concurrentConsumers : 每个监听器最初启动的并发使用者数
  • connectionFactory : 对 ConnectionFactory 的引用
  • consecutiveActiveTrigger : 最小连续消息数 (小于该数不会发送接收超时)
  • consumerBatchEnabled : 启用批处理消息
  • consumerStartTimeout : 等待使用者线程启动的时间,以毫秒为单位
  • consumerTagStrategy : ConsumerTagStrategy 策略实现
  • consumersPerQueue : 每个配置的队列创建的使用者数
  • errorHandler : errorHandler 引用
  • exclusive : 此容器中的单个使用者是否对队列具有独占访问权
  • messagesPerAck : 在 acks 之间要接收的消息数
  • maxConcurrentConsumers : 根据需要启动的并发使用者的最大数量
  • noLocal : 设置为 true 以禁用从服务器向消费者传递在同一通道连接上发布的消息
  • rabbitAdmin : rabbitAdmin 实现
  • receiveTimeout : 每条消息等待的最长时间
  • shutdownTimeout : 关闭超时时间
  • startConsumerMinInterval : 启动消费者最小间隔
  • stopConsumerMinInterval : 关闭消费者最小间隔
  • transactionManager : 事务管理

监听器的功能
由此我们大概可以知道监听器的常用功能 :

  • 监听队列(多个队列)
  • 自动启动
  • 自动声明功能
  • 设置事务特性、事务管理器、事务属性、事务并发量、开启事务、消息回滚
  • 设置消费者数量、最大最小数量、批量消费属性等
  • 设置 confirm 和 ack module、是否重回队列、errorHandler
  • 设置消费者 tag 生成策略、是否独占模式、消费者属性
  • 设置具体的转换器、 conversion 等
  • 以及其他常见的功能

DirectMessageListenerContainer

  • 每个队列的每个使用者使用一个单独的通道
  • 并发性由 rabbitClient控制
1
2
3
4
5
6
7
8
9
10
11
java复制代码// 配置方式
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("queueName1","queueName2");
container.setMessageListener((MessageListener) message -> {
//....
});
return container;
}

从源码中看看使用方式 :
TODO

三 . 深入知识点

3.1 配置 SSL

通过配置 RabbitConnectionFactoryBean 中 userSSL 来配置 SSL 相关属性 , 同时要注意配置以下属性 :

  • private Resource sslPropertiesLocation;
  • private String keyStore;
  • private String trustStore;

详见官方文档 4.1.2 @ docs.spring.io/spring-amqp…

3.2 配置集群

AddressShuffleMode.RANDOM 表示随机设置连接顺序 , 默认是逐个轮询

  • NONE : 不要在打开连接之前或之后更改地址;以固定顺序尝试连接
  • RANDOM : 在打开连接之前随机洗牌地址;尝试新顺序的连接
  • INORDER : 在打开连接后对地址进行洗牌,将第一个地址移动到最后 (听着怎么这么像切牌..)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.RANDOM);
return ccf;
}


// Step 1 : 这个 Address 最终会被设置到 address 属性 同时内部 publisherConnectionFactory 也会尝试设置
Address[] addressArray = Address.parseAddresses(addresses);
this.addresses = new LinkedList<>(Arrays.asList(addressArray));
this.publisherConnectionFactory.setAddresses(addresses);

// Step 2 : 模式判断
private synchronized com.rabbitmq.client.Connection connectAddresses(String connectionName)
throws IOException, TimeoutException {

List<Address> addressesToConnect = new ArrayList<>(this.addresses);

// RANDOM 模式处理
if (addressesToConnect.size() > 1 && AddressShuffleMode.RANDOM.equals(this.addressShuffleMode)) {
Collections.shuffle(addressesToConnect);
}

com.rabbitmq.client.Connection connection = this.rabbitConnectionFactory.newConnection(this.executorService,
addressesToConnect, connectionName);
// INORDER 模式
if (addressesToConnect.size() > 1 && AddressShuffleMode.INORDER.equals(this.addressShuffleMode)) {
this.addresses.add(this.addresses.remove(0));
}
return connection;
}


// Step 3 : 在 com.rabbitmq.client.ConnectionFactory 中 newConnection

3.3 配置连接工厂

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public class MyService {

@Autowired
private RabbitTemplate rabbitTemplate;

public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}

}

3.4 配置 RabbitTemplate 常见功能

3.4.1 重试功能

RetryTemplate 不是 Rabbit 专属 , 他是 org.springframework.retry.support 包中的工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@Bean
public RabbitTemplate rabbitTemplate() {

RabbitTemplate template = new RabbitTemplate(connectionFactory());

// 使用 RetryTemplate , 这个类是 org.springframework.retry.support 的类
RetryTemplate retryTemplate = new RetryTemplate();

// 回退功能 , 归属于类 org.springframework.retry.backoff
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);

// 重试 + 回退
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);

return template;
}

使用 Callback 方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码
retryTemplate.execute(
new RetryCallback<Object, Exception>() {

@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}

}, new RecoveryCallback<Object>() {

@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}

源码一览

1
java复制代码

3.4.2 异常检测配置

官方文档中并不建议这种方式 , 毕竟事务处理是很消耗资源的

1
2
3
4
5
java复制代码// 先开始 事务处理
template.setChannelTransacted(true);

// 再在事务的方法中检测异常
txCommit()

3.4.3 设置回调

  • RabbitTemplate 只支持一个 ConfirmCallback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码template.setConfirmCallback(new MySelfConfirmCallback());

// ConfirmCallback 实现类
public class MySelfConfirmCallback implements RabbitTemplate.ConfirmCallback {

private Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("------> CorrelationData is :{} <-------", correlationData);
}
}

// CorrelationData 是客户机在发送原始消息时提供的对象
// ack : ture(ack) / false (nack)

// CorrelationData 提供了Future 接口用户异步获取数据
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
cd1.getFuture().get(10, TimeUnit.SECONDS).isAck();

深入源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public SettableListenableFuture<Confirm> getFuture() {
return this.future;
}

C- SettableListenableFuture
- private final SettableTask<T> settableTask = new SettableTask<>()
?- 其中包含一个 SettableTask 对象 ,该对象中有个 Thread

private static class SettableTask<T> extends ListenableFutureTask<T> {
@Nullable
private volatile Thread completingThread;
}



C- Confirm
- private final boolean ack;
- private final String reason;

3.4.5 设置返回

  • 将 CachingConnectionFactory#publisherReturns 设置为 true
  • RabbitTemplate 设置 ReturnsCallback
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码template.setReturnsCallback(new SelfReturnsCallback());

// 设置 ReturnCallBack
public class SelfReturnsCallback implements RabbitTemplate.ReturnsCallback {

private Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public void returnedMessage(ReturnedMessage returned) {

}
}

ReturnedMessage 中包含如下属性 :

  • message : 返回的消息本身
  • replyCode : 指示返回原因的代码
  • replyText : 返回的原因
  • exchange : 信息被发送到的交换
  • routingKey : 使用的路由键

每个 RabbitTemplate 只支持一个 ReturnsCallback。

3.4.6 独立连接

通过设置 useblisherconnection 属性 , 实现在可能的情况下使用与侦听器容器使用的不同的连接

这种方式可以避免当生产者因为任何原因被阻止时,消费者被阻止

1
2
3
4
5
6
7
8
9
10
java复制代码template.setUsePublisherConnection(true);

// 连接工厂为此维护第二个内部连接工厂
// 默认情况下,它与主工厂的类型相同,但是如果您希望使用不同的工厂类型发布,可以设置明确的类型。

// 源码一览 :
M- doSendAndReceiveWithDirect
if (this.usePublisherConnection && connectionFactory.getPublisherConnectionFactory() != null) {
connectionFactory = connectionFactory.getPublisherConnectionFactory();
}

3.5 Event 事件

  • AsyncConsumerStartedEvent : 消费者启动时
  • AsyncConsumerRestartedEvent : 失败后重启消费者
  • AsyncConsumerTerminatedEvent : 消费者停止
  • AsyncConsumerStoppedEvent : 消费者停止 (SimpleMessageListenerContainer 使用)
  • ConsumeOkEvent : 当从代理接收到 consumeOk 时
  • ListenerContainerIdleEvent :
  • MissingQueueEvent : 丢失 Queue

3.6 配置消息转换器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码
// 通过注解 , 这是Bean 名
@RabbitListener(..., messageConverter = "jsonConverter")

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
factory.setMessageConverter(new Jackson2JsonMessageConverter());
...
return factory;
}

// PS :
@Bean
public DefaultConversionService myConversionService() {
DefaultConversionService conv = new DefaultConversionService();
conv.addConverter(mySpecialConverter());
return conv;
}

3.7 配置 BatchListener

BatchListerner 用于在一个调用中接收整个批处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}

// 接收方式
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
this.amqpMessagesReceived = amqpMessages;
this.batch1Latch.countDown();
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
this.messagingMessagesReceived = messages;
this.batch2Latch.countDown();
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
this.batch3Strings = strings;
this.batch3Latch.countDown();
}

3.8 消息转换和序列化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public RabbitTemplate getRabbitTemplate(){
RabbitTemplate template = new RabbitTemplate();
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}


@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
jsonConverter.setClassMapper(classMapper());
return jsonConverter;
}

@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("thing1", Thing1.class);
idClassMapping.put("thing2", Thing2.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}

3.9 Rabbit MQ 的事务管理

事务管理通过 SimpleMessageListenerContainer 中 channelTransacted 属性来控制 , 当设置为 True 时 , 框架使用事务通道,并通过提交或回滚(取决于结果)结束所有操作(发送或接收) ,同时有一个异常信号发出回滚

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码    @Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}

// transactionManager 主要是 RabbitTransactionManager, 其实现类 AbstractPlatformTransactionManager

3.10 前后置处理

  • RabbitTemplate
    • setBeforePublishPostProcessors()
    • setAfterReceivePostProcessors()
  • SimpleMessageListenerContainer
    • setAfterReceivePostProcessors()
1
2
3
4
5
6
java复制代码    public RabbitTemplate getRabbitTemplate1(){
RabbitTemplate template = new RabbitTemplate();
template.setAfterReceivePostProcessors();
template.setBeforePublishPostProcessors();
return template;
}

3.11 自动恢复

当连接重新建立时,RabbitAdmin 重新声明任何基础结构 bean (队列和其他) , 所以它不依赖于 amqp-client 库现在提供的自动恢复

1
2
java复制代码C- com.rabbitmq.client.ConnectionFactory
M- setAutomaticRecoveryEnabled : client 的自动恢复开关

3.12 重试方式

前面 RabbitTemplate 中已经提到了其配置重试的方式 , 这里说说其他的重试 :

批量重试

批量重试基于MessageBatchRecoverer 的实现类

3.13 多代理的方式

多代理是指声明多组基础结构(连接工厂、管理员、容器工厂)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
java复制代码
// Node 1 : 设置多工厂
@Bean
CachingConnectionFactory cf1() {
return new CachingConnectionFactory("localhost");
}

@Bean
CachingConnectionFactory cf2() {
return new CachingConnectionFactory("otherHost");
}

@Bean
SimpleRoutingConnectionFactory rcf(CachingConnectionFactory cf1,CachingConnectionFactory cf2) {
SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
rcf.setDefaultTargetConnectionFactory(cf1);
rcf.setTargetConnectionFactories(Map.of("one", cf1, "two", cf2));
return rcf;
}


// Node 2 : 设置多 Admin

@Bean("factory1-admin")
RabbitAdmin admin1(CachingConnectionFactory cf1) {
return new RabbitAdmin(cf1);
}

@Bean("factory2-admin")
RabbitAdmin admin2(CachingConnectionFactory cf2) {
return new RabbitAdmin(cf2);
}

@Bean
public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}

@Bean
public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) {
MultiRabbitListenerAnnotationBeanPostProcessor postProcessor
= new MultiRabbitListenerAnnotationBeanPostProcessor();
postProcessor.setEndpointRegistry(registry);
postProcessor.setContainerFactoryBeanName("defaultContainerFactory");
return postProcessor;
}


// Node 3 : 多 ContainFacotory

@Bean
public SimpleRabbitListenerContainerFactory factory1(CachingConnectionFactory cf1) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf1);
return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory factory2(CachingConnectionFactory cf2) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf2);
return factory;
}

// Node Other

@Bean
RabbitTemplate template(RoutingConnectionFactory rcf) {
return new RabbitTemplate(rcf);
}

@Bean
ConnectionFactoryContextWrapper wrapper(SimpleRoutingConnectionFactory rcf) {
return new ConnectionFactoryContextWrapper(rcf);
}

四 . 类关联关系

  • SimpleMessageListenerContainer
  • RabbitAdmin

TODO : 待完善

五 . 解决方案

5.1 集群解决方案及效率问题

通信效率问题
RabbitMQ 是通过 TCP 之上建立的虚拟连接-信道(Channel)来传输数据. Channel 的官方释义为 : 共享单个 TCP 连接的轻量级连接.

PS : 同时打开多个 TCP 连接是不可取的,因为这样做会消耗系统资源,并且使配置防火墙更加困难

信道概念 :
客户机执行的每个协议操作都发生在通道上。特定信道上的通信完全独立于另一个信道上的通信,因此每个协议方法都带有一个信道 ID (又名信道编号) ,代理和客户机都使用这个 id 来确定该方法用于哪个信道

通道只存在于连接的上下文中,从不独立存在。当连接关闭时,连接上的所有通道也都关闭。

信道的生命周期 :
应用程序在成功打开连接后立即打开通道。

1
2
3
4
java复制代码ConnectionFactory cf = new ConnectionFactory();
Connection conn = cf.createConnection();
Channel ch = conn.createChannel();
ch.close();

就像连接一样,通道意味着长寿命。也就是说,没有必要为每个操作打开一个通道,这样做效率非常低,因为打开一个通道是一个网络往返。

客户端库提供了一种观察和应对通道异常的方法。例如,在 Java 客户机中,有一种方法可以注册错误处理程序并访问通道关闭(闭包)原因。

集群的类型和部署方式
1 . 普通集群模式

  • 集成方式 : 多台服务器单独部署节点 , 每个节点保存着 queue 的元数据或者实例 , 当访问一个仅Queue元数据的节点时 , 则会从具有实例的节点拉取数据
  • 提高吞吐量
  • 非高可用

2 . 镜像集群模式

  • 集成方式 : 多台服务器单独部署节点 , 一个节点会向其他节点同步数据 , 所以每个节点持有所有的消息数据
  • 高可用
  • 性能开销大

可以参考这位的导图 : juejin.cn/editor/draf…

image.png

image.png

5.2 消息丢失问题

丢失通常有三种类型 :

  1. 消息发送过程中丢失 , 消费者未获取到消息
  2. 消费者收取到消息 , 未来得及消费
  3. 消费者消费消息 ,但是消费未成功完成

为题一 : 消息发送过程中丢失 , 消费者未获取到消息

丢失问题和核心在于确认 , RabbitMQ 是发送方确认 , Channel 被设置为 Confirm 模式后 , 所有消息都会分配一个 ChannelID , 消费完成后 , 会将消费消费成功的id 返回给生产者

内部错误时 , 会返回 nack 消息

1
2
3
java复制代码void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData : 客户机在发送原始消息时提供的对象
ack : false -> nack (not ack )

发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。
PS : RabbitMQ 提供了事务功能

问题二 : 消费者收取到消息 , 未来得及消费

通常这种原因是因为消息被放在了内存中 ,而来不及消费 , 最好的解决方式就是持久化

  1. 创建时即将 Queue 设置未持久化
  2. 发送消息时 , 将消息设置为持久化

问题三 : 消费者消费消息 ,但是消费未成功完成

ACK 由消费者通过业务层面处理

总结 : 保持消息持久化 , 通过业务层面 ACK Confirm 消息 , 基本上可以解决大部分可靠性问题

5.3 重复消费问题

在消息生产时,MQ 内部针对每条生产者发送的消息生成一个 inner-msg-id作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列

在消息消费时,要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付 ID、订单 ID、帖子 ID 等)作为去重和幂等的依据,避免同一条消息被重复消费。

消息分发:

若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。

5.4 消息顺序问题

当RabbitMQ 但节点多个消费者的时候 , 虽然有 ack 机制 ,但是整个消费不是阻塞的 , 这就导致每个节点的性能不同会导致其消费的效率不同, 这个时候 , 快的节点就很有可能出现超前消费

借这位老哥的梯子看看 @ www.processon.com/view/5c5bf6…

image.png

解决方案 :

1 . 多queue分别对应一个 consumer

2 . 单Queue单 Consumer

5.5 消费失败问题

  1. 保证消息持久化
  2. 消费失败处理及回调 (ErrorHandler)

5.6 消费限流问题

RabbitMQ提供了qos(服务质量保证)功能:在非自动消息确认的前提下,如果一定数目的消息未被消费前,不会进行消费新的消息

  • prefetchSize: 单个消息大小限制,一般为0
  • prefetchCount:告诉rabbitmq不要给一个消费者一次推送超过N条消息,直到消费者主动ack
  • global: true\false是否将上述设置应用于channel,即上面的设置是通道级别还是消费之级别,一般为fals

AbstractMessageListenerContainer 中提供了以下方法的功能

六 . 性能指标

TODO

总结

感觉文档并没有达到之前预想的效果 , 预期是想通过官方文档 + 源码根源 找到可以定制化的点 , 但是写下来并没有实现这种效果 , 想了想 , 主要是因为Rabbit 体系太大了 , 官方文档很长 , 很难找到突破点 , 整篇文章里面还有很多很多没有讲到 , 源码也不是很深入

后面几天决定就和它死磕了 , 看看是开个单章还是就在本章深入了 , 祝顺利 !

更新日志 :
20210407 : 完善 Transaction 部分 , 完善其他小细节

20210408 : 计划完成类关系图

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%