微服务SpringCloud项目(十二):初步整合rabbi

小知识,大挑战!本文正在参与「程序员必备小知识」创作活动

本文已参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金。

📖前言

1
复制代码心态好了,就没那么累了。心情好了,所见皆是明媚风景。

“一时解决不了的问题,那就利用这个契机,看清自己的局限性,对自己进行一场拨乱反正。”正如老话所说,一念放下,万般自在。如果你正被烦心事扰乱心神,不妨学会断舍离。断掉胡思乱想,社区垃圾情绪,离开负面能量。心态好了,就没那么累了。心情好了,所见皆是明媚风景。

1
2
3
4
5
6
7
8
9
10
11
text复制代码十年生死两茫茫,写程序,到天亮。

千行代码,Bug何处藏。

纵使上线又怎样,朝令改,夕断肠。

领导每天新想法,天天改,日日忙。

相顾无言,惟有泪千行。

每晚灯火阑珊处,夜难寐,再写两行!

RabbitMQ 简介

AMQP,即 Advanced Message Queuing Protocol高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX 。用于在 分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗

要学习 RabbitMQ 最靠谱的地方当然是他的官网:www.rabbitmq.com/,其他地方难免有解读误…

1. 引入依赖


本次需要创建2个子项目,一个 rabbitmq-provider (生产者),一个 rabbitmq-consumer(消费者)。

首先创建 rabbitmq-provider

pom.xml 里用到的jar依赖:

1
2
3
4
5
6
7
8
9
xml复制代码        <!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

2. 然后 application.yml


ps:里面的 虚拟 / 配置项 不是必须的,你们不创建,就不用加这个配置项。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
yaml复制代码srping: 
# rabbitmq消息队列配置
rabbitmq:
password: 账号
username: 密码
port: 5672
addresses: 地址
#开启发送失败返回
publisher-returns: true
#配置确认回调
publisher-confirm-type: correlated
listener:
simple:
#指定最小的消费者数量.
concurrency: 5
#指定最大的消费者数量.
max-concurrency: 10
#开启ack
acknowledge-mode: auto
# 最多一次消费多少条数据 -限流
prefetch: 1
#开启ack
direct:
acknowledge-mode: auto
#支持消息的确认与返回
template:
mandatory: true

3. direct exchange(直连型交换机)


创建 DirectRabbitConfig.java(对于队列和交换机持久化以及连接使用设置,在注释里有说明,后面的不同交换机的配置就不做同样说明了):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
java复制代码package com.cyj.dream.test.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Description: 直连型交换机
* 1. 接着我们先使用下direct exchange(直连型交换机),创建DirectRabbitConfig.java(对于队列和交换机持久化以及连接使用设置,
* 在注释里有说明,后面的不同交换机的配置就不做同样说明了):
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.test.config
* @Author: ChenYongJia
* @CreateTime: 2021-10-18
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@Configuration
public class DirectRabbitConfig {

/**
* 队列 起名:TestDirectQueue
*
* @return
*/
@Bean
public Queue TestDirectQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);

//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("TestDirectQueue", true);
}

/**
* Direct交换机 起名:TestDirectExchange
*
* @return
*/
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange", true, false);
}

/**
* 绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
*
* @return
*/
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}


@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}

}

1. SendMessageController 推送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码package com.cyj.dream.test.controller;

import com.cyj.dream.core.util.user.UUIDUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

/**
* @Description: 消息推送接口
* 1. 再写个简单的接口进行消息推送(根据需求也可以改为定时任务等等,具体看需求),SendMessageController.java:
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.test.controller
* @Author: ChenYongJia
* @CreateTime: 2021-10-18
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@RestController
public class SendMessageController {

/**
* 使用RabbitTemplate,这提供了接收/发送等等方法
*/
@Autowired
RabbitTemplate rabbitTemplate;

@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUIDUtils.getUUIDNoHave());
String messageData = "这是一条测试消息, 你好啊骚年!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}

}

2. 把生产者项目项目运行,调用下接口:

1634864786.jpg

因为我们目前还没弄消费者 rabbitmq-consumer,消息没有被消费的,我们可以去 rabbitMq 管理页面看看,是否推送成功,这里就不多说了。

3. 接下来,创建 rabbitmq-consumer 项目:

pom.xml 的依赖加入

1
2
3
4
5
6
7
8
9
xml复制代码        <!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

然后是 application.yml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
yaml复制代码spring: 
# rabbitmq消息队列配置
rabbitmq:
password: 账号
username: 密码
port: 5672
addresses: 地址
#虚拟host 可以不设置,使用server默认host
#virtual-host: JCcccHost
#开启发送失败返回
publisher-returns: true
#配置确认回调
publisher-confirm-type: correlated
listener:
simple:
#指定最小的消费者数量.
concurrency: 5
#指定最大的消费者数量.
max-concurrency: 10
#开启ack
acknowledge-mode: auto
# 最多一次消费多少条数据 -限流
prefetch: 1
#开启ack
direct:
acknowledge-mode: auto
#支持消息的确认与返回
template:
mandatory: true

4. 创建消息接收监听类

DirectReceiver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码package com.cyj.dream.file.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
* @Description: 消息接收监听类,DirectReceiver.java:
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.file.listener
* @Author: ChenYongJia
* @CreateTime: 2021-10-19
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@Slf4j
@Component
@RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
public class DirectReceiver {

@RabbitHandler
public void process(Map testMessage) {
log.info("DirectReceiver消费者收到消息 : {}", testMessage.toString());
}

}

然后将 rabbitmq-consumer 项目运行起来,可以看到把之前推送的那条消息消费下来了:(继续调用 rabbitmq-provider 项目的推送消息接口,你将可以看到消费者即时消费消息:)

1634864787(1).jpg

等你尝试:直连交换机既然是一对一,那如果咱们配置多台监听绑定到同一个直连交互的同一个队列,会怎么样?将以轮询的方式对消息进行消费,而且不存在重复消费。

4. Topic Exchange 主题交换机。


在生产者项目中创建TopicRabbitConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
java复制代码package com.cyj.dream.test.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Description: 通配符(话题)模式
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.file.config
* @Author: ChenYongJia
* @CreateTime: 2021-10-19
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@Configuration
public class TopicRabbitConfig {

/**
* 绑定键
*/
public final static String man = "topic.man";

public final static String woman = "topic.woman";

@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}

@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}

@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}


/**
* 将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
*
* 这样只要是消息携带的路由键是topic.man,才会分发到该队列
* @return
*/
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}

/**
* 将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
* 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
* @return
*/
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}

}

1. 在接口添加方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码// ================================= 生产者使用 Topic 话题模式 ================================

@GetMapping("/sendTopicMessage1")
public String sendTopicMessage1() {
String messageId = String.valueOf(UUIDUtils.getUUIDNoHave());
String messageData = "message: M A N ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageId", messageId);
manMap.put("messageData", messageData);
manMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
return "ok";
}

@GetMapping("/sendTopicMessage2")
public String sendTopicMessage2() {
String messageId = String.valueOf(UUIDUtils.getUUIDNoHave());
String messageData = "message: woman is all ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> womanMap = new HashMap<>();
womanMap.put("messageId", messageId);
womanMap.put("messageData", messageData);
womanMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
return "ok";
}

2. 在消费者项目创建 TopicManReceiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码package com.cyj.dream.file.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
* @Description: Topic话题模式消费者
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.file.listener
* @Author: ChenYongJia
* @CreateTime: 2021-10-19
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@Slf4j
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {

@RabbitHandler
public void process(Map testMessage) {
log.info("TopicManReceiver消费者收到消息 : " + testMessage.toString());
}

}

3. 再建一个 TopicTotalReceiver 监听 topic.woman

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码package com.cyj.dream.file.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
* @Description: Topic话题模式总数监听器
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.file.listener
* @Author: ChenYongJia
* @CreateTime: 2021-10-19
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@Slf4j
@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {

@RabbitHandler
public void process(Map testMessage) {
log.info("TopicTotalReceiver消费者收到消息 : " + testMessage.toString());
}

}

4. 添加 主题交换机 配置 TopicRabbitConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
java复制代码package com.cyj.dream.file.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Description: 通配符(话题)模式--消费者一定要加这个配置吗? 不需要的其实
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.file.config
* @Author: ChenYongJia
* @CreateTime: 2021-10-19
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@Configuration
public class TopicRabbitConfig {

/**
* 绑定键
*/
public final static String man = "topic.man";

public final static String woman = "topic.woman";

@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}

@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}

@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}


/**
* 将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
*
* 这样只要是消息携带的路由键是topic.man,才会分发到该队列
* @return
*/
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}

/**
* 将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
* 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
* @return
*/
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}

}

5. 重启生产者和消费者,然后分别调用两个新加的接口,然后看消费者 rabbitmq-consumer 的控制台输出情况:

1634865296(1).jpg

1
2
3
java复制代码TopicTotalReceiver消费者收到消息  : {createTime=2021-10-22 09:14:45, messageId=54d2732244db48a3af3e579ae8e18465, messageData=message: M A N }
TopicManReceiver消费者收到消息 : {createTime=2021-10-22 09:14:45, messageId=54d2732244db48a3af3e579ae8e18465, messageData=message: M A N }
TopicTotalReceiver消费者收到消息 : {createTime=2021-10-22 09:15:11, messageId=0020ad91b22d49d28b49e5bc4d7b3550, messageData=message: woman is all }

5. 消息回调


那么接下来我们继续讲讲消息的回调,其实就是消息确认(生产者推送消息成功,消费者接收消息成功)。

rabbitmq-provider 项目的 application.yml 文件上,加上消息确认的配置项后:

ps: 本篇文章使用 springboot 版本为 2.3.6.RELEASE ; 如果你们在配置确认回调,测试发现无法触发回调函数,那么存在原因也许是因为版本导致的配置项不起效,可以把 publisher-confirm-type: correlated 替换为 publisher-confirms: true

1. 配置相关的消息确认回调函数

RabbitConfig.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码package com.cyj.dream.test.config;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Description: 配置相关的消息确认回调函数
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.rabbitmq.config
* @Author: ChenYongJia
* @CreateTime: 2021-10-19
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@Slf4j
@Configuration
public class RabbitConfig {

@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("ConfirmCallback: " + "相关数据:" + correlationData);
log.info("ConfirmCallback: " + "确认情况:" + ack);
log.info("ConfirmCallback: " + "原因:" + cause);
}
});

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("ReturnCallback: " + "消息:" + message);
log.info("ReturnCallback: " + "回应码:" + replyCode);
log.info("ReturnCallback: " + "回应信息:" + replyText);
log.info("ReturnCallback: " + "交换机:" + exchange);
log.info("ReturnCallback: " + "路由键:" + routingKey);
}
});

return rabbitTemplate;
}

}

2. 触发条件

到这里,生产者推送消息的消息确认调用回调函数已经完毕。可以看到上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;那么这两种回调函数在什么情况会触发呢?

先从总体的情况分析,推送消息存在四种情况:

  1. 消息推送到server,但是在server里找不到交换机
  2. 消息推送到server,找到交换机了,但是没找到队列
  3. 消息推送到server,交换机和队列啥都没找到
  4. 消息推送成功

3. 分别测试一下

第一种
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@GetMapping("/TestMessageAck")
public String TestMessageAck() {
String messageId = String.valueOf(UUIDUtils.getUUIDNoHave());
String messageData = "message: non-existent-exchange test message ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);
return "ok";
}

调用结果如下:(这种情况触发的是 ConfirmCallback 回调函数。)

1
2
3
java复制代码2021-10-22 09:22:33.335  INFO 17412 --- [nectionFactory2] com.cyj.dream.test.config.RabbitConfig   : ConfirmCallback:     相关数据:null
2021-10-22 09:22:33.335 INFO 17412 --- [nectionFactory2] com.cyj.dream.test.config.RabbitConfig : ConfirmCallback: 确认情况:false
2021-10-22 09:22:33.335 INFO 17412 --- [nectionFactory2] com.cyj.dream.test.config.RabbitConfig : ConfirmCallback: 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)
第二种

这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在 DirectRabitConfig 里面新增一个直连交换机,名叫 ‘lonelyDirectExchange’ ,但没给它做任何绑定配置操作:

1
2
3
4
typescript复制代码@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}

然后写个测试接口,把消息推送到名为 ‘lonelyDirectExchange’ 的交换机上(这个交换机是没有任何队列配置的):

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@GetMapping("/TestMessageAck2")
public String TestMessageAck2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: lonelyDirectExchange test message ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);
return "ok";
}

结果如下:(这种情况触发的是 ConfirmCallbackRetrunCallback 两个回调函数。两个函数都被调用了;这种情况下,消息是推送成功到服务器了的,所以 ConfirmCallback 对消息确认情况是 true

1
2
3
4
5
6
7
8
java复制代码2021-10-22 09:26:33.881  INFO 26800 --- [nectionFactory2] com.cyj.dream.test.config.RabbitConfig   : ConfirmCallback:     相关数据:null
2021-10-22 09:26:33.881 INFO 26800 --- [nectionFactory2] com.cyj.dream.test.config.RabbitConfig : ConfirmCallback: 确认情况:true
2021-10-22 09:26:33.881 INFO 26800 --- [nectionFactory2] com.cyj.dream.test.config.RabbitConfig : ConfirmCallback: 原因:null
2021-10-22 09:26:33.882 INFO 26800 --- [nectionFactory1] com.cyj.dream.test.config.RabbitConfig : ReturnCallback: 消息:(Body:'{createTime=2021-10-22 09:26:33, messageId=660c817afa474d8f899b9b7ace6d1386, messageData=message: lonelyDirectExchange test message }' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2021-10-22 09:26:33.882 INFO 26800 --- [nectionFactory1] com.cyj.dream.test.config.RabbitConfig : ReturnCallback: 回应码:312
2021-10-22 09:26:33.882 INFO 26800 --- [nectionFactory1] com.cyj.dream.test.config.RabbitConfig : ReturnCallback: 回应信息:NO_ROUTE
2021-10-22 09:26:33.882 INFO 26800 --- [nectionFactory1] com.cyj.dream.test.config.RabbitConfig : ReturnCallback: 交换机:lonelyDirectExchange
2021-10-22 09:26:33.882 INFO 26800 --- [nectionFactory1] com.cyj.dream.test.config.RabbitConfig : ReturnCallback: 路由键:TestDirectRouting
第三种–这种情况触发的是 ConfirmCallback 回调函数。

消息推送到sever,交换机和队列啥都没找到,这种情况其实跟 第一种 很像,第三种第一种 情况回调是一致的,就不做结果说明了。

第四种–消息推送成功

按照正常调用之前消息推送的接口就行,这种情况触发的是 ConfirmCallback 回调函数。


PS:今天先说到这里吧,明天聊聊 消息确认机制,最后感谢大家耐心观看完毕,留个点赞收藏是您对我最大的鼓励!


🎉总结:

  • 更多参考精彩博文请看这里:《陈永佳的博客》
  • 喜欢博主的小伙伴可以加个关注、点个赞哦,持续更新嘿嘿!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%