零、文章前言
- SpringBoot-RabbitMQ高级篇系列开始更新,本系列主要为SpringBoot整合RabbitMQ,实现高可用、可靠传递等
- 核心有四大交换器、死信队列、可靠传递、异常消费处理
- 系列共计三篇文章,本系列核心主讲整合和企业级内容,需要先具备SpringBoot和RabbitMQ基础知识
- 文章源码放到了网盘,没有放git仓库,需要的自行下载,脚本等信息在common下面
- 个人水平有限,有错误的地方欢迎指正
链接: pan.baidu.com/s/1lpZC6fr8… 提取码: qtvi
一、环境搭建
- 采用maven多module模式,共计创建三个子module
- common:通用实体信息
- rabbitmq-publisher:消息发布者,基于SpringBoot
- rabbitmq-subscriber:消息订阅者,基于SpringBoot
- 在消息发布者和订阅者两个项目中加入rabbitmq maven依赖
1 | xml复制代码<dependency> |
- 在两个项目中加入rabbitmq的配置信息
1 | yaml复制代码spring: |
- 上述三步完成后,rabbitmq的基础环境搭建完成
- rabbitmq配置属性类
- org.springframework.boot.autoconfigure.amqp.RabbitProperties
二、四大交换器
2.1 direct - 直连交换器
2.1.1 消息发送者
- 在消息发布者中新建配置类,声明交换器信息
- 只用声明交换器,队列和交换器绑定是订阅者操作
- 不同的类型提供不同的交换器
- 如果只声明交换器并不会创建交换器,而是绑定时或者发送消息时才创建
1 | java复制代码import org.springframework.amqp.core.DirectExchange; |
- 发送消息时,使用的是RabbitTemplate,为SpringBoot提供的RabbitMQ消息发送器
- org.springframework.amqp.rabbit.core.RabbitTemplate
- 发送消息示例
1 | java复制代码import org.springframework.amqp.AmqpException; |
2.2.2 消息接收者
- 接收者需要配置以下内容
- 交换器:直接new对应的交换器类型
- 队列:只有Queue类型,通过名称区分
- 交换器和队列的绑定:通过BindingBuilder.bind(队列).to(交换器).with(路由键);
- 只声明交换器和队列绑定,并不会马上创建,而是在发送消息或者监听队列时才会创建
1 | java复制代码import org.springframework.amqp.core.Binding; |
- 监听队列
- 监听的队列必须存在,否则将会报错
- 监听的队列消费完成会自动确认消息
- 如果多个队列同时监听一个队列,则消息会轮训地由不同方法处理
- 可以在参数中指定接收类型,消息将会自动转为对应类型
- 也可以指定Message参数获取对应消息信息
- org.springframework.amqp.core.Message
- 获取消息属性:message.getMessageProperties()
- 获取消息内容:message.getBody()
1 | java复制代码import org.springframework.amqp.rabbit.annotation.RabbitListener; |
2.1.3 消息发布订阅
- 先启动订阅者,可以看到队列声明
2. 启动发布者,然后发布消息
1 | java复制代码import org.springframework.amqp.AmqpException; |
- 订阅者会轮流收到信息
1 | java复制代码receiver01 message = direct |
2.2 topic - 主题交换器
2.2.1 消息发送者
- 声明topic交换器
1 | java复制代码import org.springframework.amqp.core.TopicExchange; |
- 声明controller
1 | java复制代码@RequestMapping("/topic") |
2.2.2 消息接收者
- 声明交换器、三个队列、队列的绑定
- *:匹配一个串
- #:匹配一个或者多个串
1 | java复制代码import org.springframework.amqp.core.Binding; |
- 监听队列
1 | java复制代码import org.springframework.amqp.rabbit.annotation.RabbitListener; |
2.2.3 消息发布订阅
- 发布者发送消息
- 订阅者收到消息
- 全匹配和模糊匹配
- 全匹配无论是哪个都会被匹配上
1 | java复制代码blogJavaListener message = hello |
2.3 fanout - 广播交换器
2.3.1 消息发送者
- 声明fanout交换器
1 | java复制代码import org.springframework.amqp.core.FanoutExchange; |
- 声明controller
1 | java复制代码@RequestMapping("/fanout") |
2.32 消息接收者
- 创建交换器、路由键、绑定
- 不需要使用路由键
1 | java复制代码import org.springframework.amqp.core.*; |
- 监听队列
1 | java复制代码import org.springframework.amqp.rabbit.annotation.RabbitListener; |
2.3.3 消息发布订阅
- 发布者发送消息
- 订阅者收到消息
1 | java复制代码radioListener message = fanout |
2.4 headers - 头交换器
2.4.1 消息发送者
- headers模式通过头匹配,会忽略路由键
- 发送者需要创建队列
1 | java复制代码import org.springframework.amqp.core.HeadersExchange; |
- 创建controller发送消息
- MessageProperties和Message包是:org.springframework.amqp.core
- 需要创建MessageProperties对象用于设置头信息
- Message用于存储消息和消息属性信息
1 | java复制代码@RequestMapping("/headers") |
2.4.2 消息接收者
- 接收者和上面三种一样,同样需要声明交换器、队列、绑定
- 在队列绑定时需要使用不同规则
- BindingBuilder.bind(headersQueue01).to(headersExchange).whereAll(key).match()
- 所有字段属性和值全部匹配
- BindingBuilder.bind(headersQueue02).to(headersExchange).whereAny(key).match()
- 任意字段属性和值全部匹配
- BindingBuilder.bind(headersQueue03).to(headersExchange).whereAll(“name”, “token”).exist()
- 指定所有属性字段存在
- BindingBuilder.bind(headersQueue03).to(headersExchange).whereAny(“name”, “token”).exist()
- 指定任意属性存在
- BindingBuilder.bind(headersQueue01).to(headersExchange).whereAll(key).match()
- headerMap中存放的属性就是发送者中封装的属性,属性完全匹配则正确路由到此处
1 | java复制代码import org.springframework.amqp.core.Binding; |
- 队列监听
1 | java复制代码import org.springframework.amqp.rabbit.annotation.RabbitListener; |
2.4.3 消息发布订阅
- 发送消息
- 接收消息
1 | java复制代码headers01Listener message = headers |
本文转载自: 掘金