小知识,大挑战!本文正在参与「程序员必备小知识」创作活动
本文已参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金。
📖前言
1 | 复制代码心态好了,就没那么累了。心情好了,所见皆是明媚风景。 |
“一时解决不了的问题,那就利用这个契机,看清自己的局限性,对自己进行一场拨乱反正。”正如老话所说,一念放下,万般自在。如果你正被烦心事扰乱心神,不妨学会断舍离。断掉胡思乱想,社区垃圾情绪,离开负面能量。心态好了,就没那么累了。心情好了,所见皆是明媚风景。
1 | text复制代码十年生死两茫茫,写程序,到天亮。 |
RabbitMQ
简介
AMQP
,即Advanced Message Queuing Protocol
,高级消息队列协议
,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。RabbitMQ
是一个开源的AMQP
实现,服务器端用Erlang
语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP
等,支持AJAX
。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗
。
要学习 RabbitMQ
最靠谱的地方当然是他的官网:www.rabbitmq.com/,其他地方难免有解读误…
1. 引入依赖
本次需要创建2个子项目,一个 rabbitmq-provider
(生产者),一个 rabbitmq-consumer
(消费者)。
首先创建 rabbitmq-provider
,
pom.xml
里用到的jar依赖:
1 | xml复制代码 <!--rabbitmq--> |
2. 然后 application.yml
:
ps:里面的
虚拟 / 配置项
不是必须的,你们不创建,就不用加这个配置项。
1 | yaml复制代码srping: |
3. direct exchange(直连型交换机)
创建
DirectRabbitConfig.java
(对于队列和交换机持久化以及连接使用设置,在注释里有说明,后面的不同交换机的配置就不做同样说明了):
1 | java复制代码package com.cyj.dream.test.config; |
1. SendMessageController
推送消息
1 | java复制代码package com.cyj.dream.test.controller; |
2. 把生产者项目项目运行,调用下接口:
因为我们目前还没弄消费者
rabbitmq-consumer
,消息没有被消费的,我们可以去rabbitMq
管理页面看看,是否推送成功,这里就不多说了。
3. 接下来,创建 rabbitmq-consumer
项目:
pom.xml
的依赖加入
1 | xml复制代码 <!--rabbitmq--> |
然后是
application.yml:
1 | yaml复制代码spring: |
4. 创建消息接收监听类
DirectReceiver.java
1 | java复制代码package com.cyj.dream.file.listener; |
然后将
rabbitmq-consumer
项目运行起来,可以看到把之前推送的那条消息消费下来了:(继续调用rabbitmq-provider
项目的推送消息接口,你将可以看到消费者即时消费消息:)
等你尝试:直连交换机既然是一对一,那如果咱们配置多台监听绑定到同一个直连交互的同一个队列,会怎么样?将以轮询的方式对消息进行消费,而且不存在重复消费。
4. Topic Exchange
主题交换机。
在生产者项目中创建
TopicRabbitConfig.java
1 | java复制代码package com.cyj.dream.test.config; |
1. 在接口添加方法
1 | java复制代码// ================================= 生产者使用 Topic 话题模式 ================================ |
2. 在消费者项目创建 TopicManReceiver
1 | java复制代码package com.cyj.dream.file.listener; |
3. 再建一个 TopicTotalReceiver
监听 topic.woman
1 | java复制代码package com.cyj.dream.file.listener; |
4. 添加 主题交换机
配置 TopicRabbitConfig
1 | java复制代码package com.cyj.dream.file.config; |
5. 重启生产者和消费者,然后分别调用两个新加的接口,然后看消费者 rabbitmq-consumer
的控制台输出情况:
1 | java复制代码TopicTotalReceiver消费者收到消息 : {createTime=2021-10-22 09:14:45, messageId=54d2732244db48a3af3e579ae8e18465, messageData=message: M A N } |
5. 消息回调
那么接下来我们继续讲讲消息的回调,其实就是消息确认(生产者推送消息成功,消费者接收消息成功)。
在
rabbitmq-provider
项目的application.yml
文件上,加上消息确认的配置项后:ps: 本篇文章使用
springboot
版本为2.3.6.RELEASE
; 如果你们在配置确认回调,测试发现无法触发回调函数,那么存在原因也许是因为版本导致的配置项不起效,可以把publisher-confirm-type: correlated
替换为publisher-confirms: true
1. 配置相关的消息确认回调函数
RabbitConfig.java:
1 | java复制代码package com.cyj.dream.test.config; |
2. 触发条件
到这里,生产者推送消息的消息确认调用回调函数已经完毕。可以看到上面写了两个回调函数,一个叫
ConfirmCallback
,一个叫RetrunCallback
;那么这两种回调函数在什么情况会触发呢?
先从总体的情况分析,推送消息存在四种情况:
- 消息推送到server,但是在server里找不到交换机
- 消息推送到server,找到交换机了,但是没找到队列
- 消息推送到server,交换机和队列啥都没找到
- 消息推送成功
3. 分别测试一下
第一种
1 | java复制代码@GetMapping("/TestMessageAck") |
调用结果如下:(这种情况触发的是 ConfirmCallback
回调函数。)
1 | java复制代码2021-10-22 09:22:33.335 INFO 17412 --- [nectionFactory2] com.cyj.dream.test.config.RabbitConfig : ConfirmCallback: 相关数据:null |
第二种
这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在 DirectRabitConfig
里面新增一个直连交换机,名叫 ‘lonelyDirectExchange’
,但没给它做任何绑定配置操作:
1 | typescript复制代码@Bean |
然后写个测试接口,把消息推送到名为 ‘lonelyDirectExchange’
的交换机上(这个交换机是没有任何队列配置的):
1 | java复制代码@GetMapping("/TestMessageAck2") |
结果如下:(这种情况触发的是 ConfirmCallback
和 RetrunCallback
两个回调函数。两个函数都被调用了;这种情况下,消息是推送成功到服务器了的,所以 ConfirmCallback
对消息确认情况是 true
)
1 | java复制代码2021-10-22 09:26:33.881 INFO 26800 --- [nectionFactory2] com.cyj.dream.test.config.RabbitConfig : ConfirmCallback: 相关数据:null |
第三种
–这种情况触发的是 ConfirmCallback
回调函数。
消息推送到sever,交换机和队列啥都没找到,这种情况其实跟 第一种
很像,第三种
和 第一种
情况回调是一致的,就不做结果说明了。
第四种
–消息推送成功
按照正常调用之前消息推送的接口就行,这种情况触发的是 ConfirmCallback
回调函数。
PS:今天先说到这里吧,明天聊聊 消息确认机制
,最后感谢大家耐心观看完毕,留个点赞收藏是您对我最大的鼓励!
🎉总结:
- 更多参考精彩博文请看这里:《陈永佳的博客》
- 喜欢博主的小伙伴可以加个关注、点个赞哦,持续更新嘿嘿!
本文转载自: 掘金