RabbitMQ(3) 集成 Springboot 项目

这是我参与11月更文挑战的第2天,活动详情查看:2021最后一次更文挑战

前言

前两篇文章分别介绍了 RabbitMQ 的搭建与基础使用,哪些基础都是学习 MQ 的必要知识,而想要在项目中用到,则需要集成到我们的 Spring 项目中,本文只介绍 Springboot 如何集成 RabbitMQ,通过这个中间件给其他的微服务发送消息。

一、生产者服务搭建

1.1、建立springboot父子工程。

其中的xiaolei-server 是rabbitmq-producer 的父项目,在其下添加一个 rabbitmq的生产者服务。这样代码比较有结构。而middleware-rabbitmq 是后期对rabbitmq 封装的 SDK,里面封装一些方法和配置,后期需要,这里可以不用管他。

image-20211029104829081

1.2、配置 application.yml 文件

1
2
3
4
5
6
7
8
9
10
11
yaml复制代码# 服务端口
server:
port: 8200
# 配置rabbitmq服务
spring:
rabbitmq:
username: test
password: test
virtual-host: test
host: 192.168.81.102
port: 5672

1.3、在项目中导入依赖

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.4、定义生产者,发送消息

生产者,我们继续发送上一文中讲的影片的案例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码@RestController
@RequestMapping("/firm")
public class SendFirmMsgController {

@Autowired
private RabbitTemplate rabbitTemplate;
// 1、定义交换机
private String exchangeName = "exchange_firm";
// 2、定义路由key
private String routeKey1 = "爱国.吴京";
private String routeKey2 = "爱国.沈腾";
private String routeKey3 = "动作.吴京";
private String routeKey4 = "喜剧.沈腾";

@PostMapping("/send")
public void sendMsg(){

for (int i = 1; i <=40; i++) {
// @params1: 交换机exchange
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
if(i%4==0){
rabbitTemplate.convertAndSend(exchangeName,routeKey1,("爱国.吴京,说第"+i+"遍。").getBytes());
}else if(i%4 ==1){
rabbitTemplate.convertAndSend(exchangeName,routeKey2,("爱国.沈腾,说第"+i+"遍。").getBytes());
}else if(i%4 ==2){
rabbitTemplate.convertAndSend(exchangeName,routeKey3,("动作.吴京,说第"+i+"遍。").getBytes());
}else if(i%4 ==3){
rabbitTemplate.convertAndSend(exchangeName,routeKey4,("喜剧.沈腾,说第"+i+"遍。").getBytes());
}
System.out.println("发送第"+i);
}
}
}

1.5 初始化队列和交换机关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
typescript复制代码package com.xiaolei.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 主题消费类型配置
* @Author xiaolei
* @Date 2021/10/29 11:03
**/
@Configuration
public class TopicRabbitConfig {

/**
* 给队列取名字
* @return
*/
@Bean
public Queue firstQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);

//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue ("queue1",true);
}
@Bean
public Queue SecondQueue() {
return new Queue ("queue2",true);
}

@Bean
public Queue ThreeQueue() {
return new Queue ("queue3",true);
}

@Bean
public Queue FourQueue() {
return new Queue ("queue4",true);
}

/**
* 给交换机取名
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("exchange_firm",true,false);
}

@Bean
public Binding bindingTopic1(){
return BindingBuilder.bind(firstQueue()).to(topicExchange()).with("*.吴京");
}

@Bean
public Binding bindingTopic2(){
return BindingBuilder.bind(SecondQueue()).to(topicExchange()).with("爱国.吴京");
}

@Bean
public Binding bindingTopic3(){
return BindingBuilder.bind(ThreeQueue()).to(topicExchange()).with("爱国.*");
}
@Bean
public Binding bindingTopic4(){
return BindingBuilder.bind(FourQueue()).to(topicExchange()).with("#.沈腾");
}
}

1.6 调用 postman 接口测试

发现此时在rabbitmq 上存在了新的 交换机和topci信息

image-20211029120043364

二、消费者服务搭建

image-20211029121221570

2.1 导入依赖

这过程我们在父项目中已经导入了,所以这里可以省略。

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 application.yml 配置

1
2
3
4
5
6
7
8
9
10
11
yaml复制代码# 服务端口
server:
port: 8201
# 配置rabbitmq服务
spring:
rabbitmq:
username: test
password: test
virtual-host: test
host: 192.168.81.102
port: 5672

2.3 定义四个消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
less复制代码@RestController
public class MsgController {

@RabbitListener(bindings = @QueueBinding(
// 指定队列名字
value = @Queue(value = "queue1",autoDelete = "false"),
// 指定交换机的名字
exchange = @Exchange(value = "exchange_firm",type = ExchangeTypes.TOPIC)
))
@RabbitHandler
public void consumrmsg1(String msg){
System.out.println("吴京粉丝-------------->" + msg);
}

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue2",autoDelete = "false"),exchange = @Exchange(value = "exchange_firm",type = ExchangeTypes.TOPIC)))
@RabbitHandler
public void consumrmsg2(String msg){
System.out.println("爱国吴京粉丝-------------->" + msg);
}

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue3",autoDelete = "false"),exchange = @Exchange(value = "exchange_firm",type = ExchangeTypes.TOPIC)))
@RabbitHandler
public void consumrmsg3(String msg){
System.out.println("爱国粉丝-------------->" + msg);
}

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue4",autoDelete = "false"),exchange = @Exchange(value = "exchange_firm",type = ExchangeTypes.TOPIC)))
@RabbitHandler
public void consumrmsg4(String msg){
System.out.println("沈腾粉丝-------------->" + msg);
}
}

打印效果如下:

image-20211029120459138

2.4 细节讲解

消费者类中通过 @RabbitListener 和 @RabbitHandler 注解将一个方法定义为消息监听的方法。

image-20211029120838050

其他几种的类型都差不多,我们只要自己来配置对应的类就好了。目前基础配置已经完成。

三、小结

本文介绍了 RabbitMq 中与 Springboot 的集成,这是属于初级应用,后面的文章中,我们将再考虑其他的问题,研究 RabbitMq的相关特性。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%