0.RabbitMQ介绍,应用场景
RabbitMQ是一种较为流行的一种消息中间件技术,它与Spring可以良好地整合。
那么RabbitMQ是用来做什么的,又或者说,我们在什么地方需要用到消息中间件。以下说明为消息中间件的应用场景:
0.0 异步处理
场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种,1.串行的方式,2.并行的方式。
串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。
并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。
除了以上两种方式,还可以使用消息队列异步去操作:
消息队列:假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行方式使用时间100ms。虽然并行已经提高了处理时间,但是短信和邮件对于正常使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回。引入消息队列之后,把发送邮件,短信不是必须的业务逻辑异步处理。
0.1应用解耦
场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。
但是这种做法有一个缺点:当库存系统出现故障时,订单就会失败。订单系统和库存系统高耦合。引入消息队列之后:
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:订阅下单的消息,获取下单消息,进行库存操作。就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。
0.2 流量削峰
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
1.可以控制活动人数,超过设置的数量的订单直接丢弃
2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面。
2.秒杀业务根据消息队列中的请求信息,再做后续处理。
1.Rabbitmq安装-以rpm的方式
1.1 安装Erlang环境
1 | ruby复制代码curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash |
安装erlang:
1 | 复制代码yum install -y erlang |
检查erlang版本:
1 | 复制代码erl |
在安装Rabbitmq之前需要注意的是,一定要保证erlang的版本和rabbitmq的版本是对应的。
我这里安装的erlang版本是:
因为我安装的erlang版本是24的,所以接下来我安装RabbitMQ的版本至少也要3.8.16
1.2 安装RabbitMQ
1 | arduino复制代码rpm --import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey |
下载rpm包:
1 | ruby复制代码wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.25/rabbitmq-server-3.8.25-1.el7.noarch.rpm |
这里下载可能会很慢,也可以在windows下载rpm包之后再上传到服务器上。
安装:
1 | vbscript复制代码rpm -ivh rabbitmq-server-3.8.25-1.el7.noarch.rpm |
然后启动管理平台插件,如果需要访问RabbitMQ的web管理页面就需要操作这一步:
1 | bash复制代码rabbitmq-plugins enable rabbitmq_management |
启动RabbitMQ:
1 | sql复制代码systemctl start rabbitmq-server |
然后可以访问控制台地址:
http:你的服务器ip:15672
如果访问不了,需要检查你的RabbitMQ是否成功启动并且相关端口(15672)是否开放。
使用默认的账号进行登录:guest/guest。
这时候会发现登录不进去,并且会有一个提示”User can only log in via localhost“
出现这个信息表示RabbitMQ web管理页面只允许以localhost的方式进行访问,那么我们在外网如何进行web管理后台的访问呢?
需要进入到/etc/rabbitmq/目录下:
1 | bash复制代码cd /etc/rabbitmq |
1 | arduino复制代码vi rabbitmq.config |
在rabbitmq.config配置文件里面加上(最后的那个小点别忘了,不然会重启不成功):
1 | css复制代码[{rabbit, [{loopback_users, []}]}]. |
然后重启:
1 | vbscript复制代码systemctl restart rabbitmq-server |
2.AMQP协议
2.1 什么是AMQP
AMQP(Advanced Message Queuing Protocol)高级消息队列协议,是一种网络协议。用于客户端应用和消息中间件之间的通信。
2.2 AMQP模型简介
上图是AMQP模型,主要的工作过程就是:消息由Publisher(发布者或者称为提供者),发送到交换机,交换机将收到的消息根据路由规则转发到不同的队列中去。最后RabbitMQ会将队列中的消息推送给订阅了此队列的消费者,或者消费者主动的从队列中获取消息。
从安全角度考虑,网络是不可靠的。所以消费者在消费消息的时候,可能会因为某种原因处理失败。基于这个原因,RabbitMQ提供了一种消息确认机制(message acknowledgements):当一个消息从队列投递给消费者之后,要么会自动进行消息确认,要么就需要手动进行消息确认。消费者确认之后,队列就会将此条消息进行删除。
消息确认有两种方式:
1.自动确认,当消息被发送到消费者之后,自动删除。
2.待应用发送一个确认回执后再删除消息。确认回执可以是在收到消息后立马发送,也可以将消息存储之后再进行回执发送,也可以在处理完该消息之后,进行回执发送。
在某些情况,例如有一个消息没有被成功路由时。消息或许会被返回给发布者并被丢弃。
2.3 交换机和交换机类型
AMQP提供了四种交换机:
2.3.1 默认交换机
默认交换机(default exchange)是RabbitMQ预先声明好的没有名字的交换机。每个新建队列,如果没有指明特定的交换机名称,那么它将会自动绑定到默认交换机上,绑定的路由键名称就是队列的名称。
我看到有些说法,就是说,通道可以把消息直接发送到交换机,或者直接发送到队列。我觉得这样说是不准确的,”直接发送到队列“这种情况应该是发送到默认的交换机,然后由默认交换机根据路由key来路由到对应的与之绑定的队列中。
2.3.2 直连交换机
直连交换机(direct exchange)是根据消息携带的路由键将消息转发给对应队列。直连交换机默认的模式是平均分配,如果一个直连交换机绑定了多个路由键一样的队列,那么直连交换机则会将消息平均的分配给所绑定的符合条件的队列。
2.3.3 扇形交换机
扇形交换机(fanout exchange)将消息路由给绑定到它审核的所有队列,而不理会绑定的路由键。如果有多个队列绑定到扇形交换机上,当有消息发送给扇形交换机的时候,交换机会将消息的拷贝分别发送给绑定到扇形交换机上面的所有队列。
2.3.4 主题交换机
主题交换机(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。
2.4 队列
2.4.1 队列持久化
持久化队列(Durable queues)会被存储在磁盘上,当RabbitMQ重启的时候,持久化队列依旧存在。没有被持久化的队列称为暂存队列(Transient queues)。
需要注意的是,队列的持久化,不意味着队列中的消息持久化。如果消息没有被持久化,那么在RabbitMQ重启之后,虽然持久化队列存在,但是消息不会被保留。
2.5 通道
消息是通过通道被发送到交换机和队列上的。是消息传输的介质。
2.6 虚拟主机
RabbitMQ提供虚拟主机来隔离多个环境。当建立连接的时候,需要指定使用哪一个虚拟主机。
3.Rabbitmq web管理页面大致了解
4.五种模型的demo实现
4.0 相关依赖
1 | xml复制代码 <dependencies> |
4.1 Hello World
这个消息的要素就三个,一个生产者,一个消费者,和一个队列。这个模型很容易的就让人觉得,生产者把消息直接发送到队列中,其实不是的。RabbitMQ中存在一个默认交换机,如果没有显式地把队列和一个具体名称的交换机绑定的话,默认就会把队列和默认交换机进行绑定,绑定的路由key就是队列的名称。所以这个模型的消息发送实质上还是消费者发送到交换机,然后交换机再把消息路由到队列,最后由消费者获取队列消息,或者队列向订阅了的消费者推送消息。
在操作之前需要创建一个虚拟主机和对应的测试用户:
消息生产者Provider生产消息:
1 | java复制代码/** |
执行完毕之后,就可以看到队列中已经存在一条消息了:
消费者Consumer消费消息:
1 | java复制代码/** |
执行完毕之后:
可以看到队列中的消息已经被消费了。
4.2 Work queues
可以看到工作队列(work queues)模型与第一个Helloworld模型类似,只不过工作队列模型的消费者由一个变成多个。
那这个工作队列(work queues)模型是在什么背景下使用的?在一个消费者一个生产者的模型下,如果生产者生产消息的速度远大于消费者消费消息的速度,那么消息就不断地累积在队列中,会阻塞队列。
使用工作队列模型,在原有的基础上,增加多个消费者,多个消费者共同消费同一个队列里面的消息,并且使用这个模型的前提下,并不会导致消息被重复消费。
创建消息生产者生产消息:
1 | java复制代码/** |
创建消费者1Consumer1:
1 | java复制代码/** |
创建消费者2Consumer2:
1 | java复制代码/** |
先运行两个消费者,然后再运行生产者,然后可以看到控制台输出:
可以看到消息被消费了。这里要说明一下,虽然在消费者2中的回调方法加了线程休眠,但是目前应该是不生效的,因为这里开启了消息自动确认,就是说,当消费者拿到消息之后,消息就会被自动确认,队列中对应的消息就会被删除,即使是回执方法没有执行完毕,队列也当做消费者已经消费了消息。
而且在控制台中可以看到,工作队列模型(work queues),默认的消息分配策略是轮询的,也可以说是平均的,队列会将消息平均地分配到每一个消费者上。
那么这个平均分配会带来什么问题,假设某一个消费者,处理消息的速度远慢于消息队列分发消息的速度,消费者此时是开启消息自动确认机制,消费者收到消息,但是并没有马上处理完成,如果程序在某个地方执行错误,导致程序崩溃,那么剩下没有被执行的消息将会被丢失。而消息队列中,在收到消息确认之后就会删除相关的消息。
那么如何来解决这个问题:1.消费者不开启消息自动确认,采用手动确认方式。就是消费者要在接收消息,处理消息完毕之后,再进行手动确认。
1 | java复制代码channel.basicQos(1);//每一次只能消费一个消息 |
经过修改之后,就会发现,处理速度 越快的消费者,它所收到的消息就越多。我们的工作队列(work queues)从原来的”平均分配”,变成了如今的”多劳多得”。
4.3 Fanout
fanout模型,也叫做广播模型。在这个模型下,消费者可以有多个,并且每个消费者有自己的队列。生产者把消息发送到交换机,由交换机来决定要发给哪个队列,交换机把消息发送给绑定过的所有队列。队列的消费者都能拿到消息。实现一条消息被多个消费者消费。
生产者:
1 | java复制代码/** |
消费者1:
1 | java复制代码public class Consumer1 { |
消费者2:
1 | java复制代码public class Consumer2 { |
消费者3:
1 | java复制代码public class Consumer3 { |
运行效果:
创建的交换机:
4.4 Routing-Direct
生产者:
1 | java复制代码public class Provider { |
消费者1:
1 | java复制代码/** |
消费者2:
1 | arduino复制代码/** |
运行结果:
direct交换机:
因为在消费者1中队列和交换机绑定的路由key为error,然而生产者发送消息时指定的路由key为info,所以消费者1自然就收不到消息。
4.5 Routing-Topic
Topic类型的交换机exchange与Direct类型的交换机相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型的交换机Exchange可以让队列在绑定RoutingKey的时候使用通配符。
Topic模型的通配符一般是由一个或者多个单词组成,多个单词之间以“.”分割。例如:item.insert.
1 | csharp复制代码// 通配符 |
生产者:
1 | java复制代码/** |
消费者1:
1 | java复制代码/** |
消费者2:
1 | java复制代码/** |
运行结果:
生产者发送消息设置的路由key为:”save.user.delete.qingyuan”
消费者1的临时队列和交换机绑定的路由key为:”.user.“
消费者2的临时队列和交换机绑定的路由key为:”*.user.#”
消费者1只能匹配user后面只带有一个词的,而消费者2可以匹配user后面带有多个词的。
5.SpringBoot中使用RabbitMQ
5.0 配置
1 | ini复制代码server.port=9090 |
5.1 HelloWorld
生产者:
1 | less复制代码/** |
消费者:
1 | less复制代码@Component |
运行SpringBoot启动类:
5.2 SpringBoot版Work Queues
生产者:
1 | less复制代码@RunWith(SpringRunner.class) |
消费者:
1 | less复制代码@Component |
同样的,在默认情况下,work queues模式是平均地将消息分配的消费者。
5.3 Fanout模式
fanout啊 topic啊其实说的就是消费端,队列和交换机的绑定关系。
生产者:
1 | less复制代码@RunWith(SpringRunner.class) |
消费者:
1 | less复制代码@Component |
5.4 Direct
生产者:
1 | less复制代码@RunWith(SpringRunner.class) |
消费者:
1 | less复制代码@Component |
5.5 Topic
生产者:
1 | less复制代码@RunWith(SpringRunner.class) |
消费者:
1 | less复制代码@Component |
运行结果:
6.参考资料
1.参考视频
本文转载自: 掘金