rabbitMQ--广播模式

这是我参与更文挑战的第6天,活动详情查看: 更文挑战

我们之前的rabbitMQ的模式都是不存在交换机的,直接发送到队列,将下来讲的是订阅模型,一次像多个消费者发消息

image.png
一个生产者发送消息到交换机,交换机发给绑定在自己上边的队列,消费者在从队列拿到消息消费,
X(Exchange):交换机接受生产者发送的消息,另一方面知道如何处理消息,,发给某个队列,还是发给所有的队列,或者是直接舍弃,取决于交换机是如何配置的,交换机只负责发送,而不去存储消息。

交换机分为几类

1
2
3
4
5
js复制代码Publish/Subscribe:广播,将消息交给所有绑定到交换机的队列

Routing:定向,把消息交给符合指定routing key 的队列

Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

订阅模型–Publish/Subscribe

image.png

在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个消费者
  • 2) 每个消费者有自己的queue(队列)
  • 3) 每个队列都要绑定到Exchange(交换机)
  • 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 5) 交换机把消息发送给绑定过的所有队列
  • 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

生产者

生产者声明交换机,不在声明队列,消息发送到交换机,比在发送到队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public class p1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为fanout
channel.exchangeDeclare("Subscribe_exchange", "fanout");
// 消息内容
String message = "Hello_Subscribe";
// 发布消息到Exchange
channel.basicPublish("Subscribe_exchange", "", null, message.getBytes());
System.out.println("生产者发送消息=:'" + message + "'");

channel.close();
connection.close();

}
}

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public class c1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("Subscribe_queue_1", false, false, false, null);

// 绑定队列到交换机
channel.queueBind("Subscribe_queue_1", "Subscribe_exchange", "");

Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("c1消费消息: "+new String(body));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列,自动返回完成
channel.basicConsume("Subscribe_queue_1", false, consumer);
}
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码public class c2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("Subscribe_queue_2", false, false, false, null);

// 绑定队列到交换机
channel.queueBind("Subscribe_queue_2", "Subscribe_exchange", "");

// 定义队列的消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("c2消费消息: "+new String(body));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};

// 监听队列,自动返回完成
channel.basicConsume("Subscribe_queue_2", false, consumer);
}
}

启东消费者,生产者发送一条消息,看输出

image.png

image.png

Routing–有选择的发送消息

订阅模式,在这个中我们可以做到不同的队列接受不同的消息,队列与交换机绑定必须指定,消息发送时也必须指定发送消息的routingKey
image.png

如上图所示生产者生产消息发送到交换机,交换机通过与rontingkley的匹配的队列发送消息。

生产者–分别发送三次不同的消息,匹配不同的routingkey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public class p {
public static void main(String[] args) throws IOException, TimeoutException {

//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为fanout
channel.exchangeDeclare("routing_exchange", "direct");
// 消息内容
//String message = "新增";
//String message = "删除";
String message = "更新";
// 发布消息到Exchange
//channel.basicPublish("routing_exchange", "insert", null, message.getBytes());
//channel.basicPublish("routing_exchange", "delect", null, message.getBytes());
channel.basicPublish("routing_exchange", "update", null, message.getBytes());
System.out.println("生产者发送消息=:开始'" + message + "'");

channel.close();
connection.close();


}
}

消费者insert

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public class insert {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("routing_queue_insert", false, false, false, null);

// 绑定队列到交换机
channel.queueBind("routing_queue_insert", "routing_exchange", "insert");

Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("insert 接收消息 : "+new String(body));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列,自动返回完成
channel.basicConsume("routing_queue_insert", false, consumer);
}
}

消费者delect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public class delect {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("routing_queue_delect", false, false, false, null);

// 绑定队列到交换机
channel.queueBind("routing_queue_delect", "routing_exchange", "delect");

Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("delect 接收消息 : "+new String(body));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列,自动返回完成
channel.basicConsume("routing_queue_delect", false, consumer);
}
}

消费者update

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public class update {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("routing_queue_update", false, false, false, null);

// 绑定队列到交换机
channel.queueBind("routing_queue_update", "routing_exchange", "update");

Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("delect 接收消息 : "+new String(body));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列,自动返回完成
channel.basicConsume("routing_queue_update", false, consumer);
}
}

看控制台输出,可以看到,绑定了不同routingkey的收到不同的消息,每个队列可以有很多个routingkey

image.png

image.png

topic–

不同于Direct的交换机,topic匹配可以通过通配符

image.png

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割

1
2
js复制代码*(星号)可以正好代替一个词。
# (hash) 可以代替零个或多个单词。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public class p {
public static void main(String[] args) throws IOException, TimeoutException {

//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为fanout
channel.exchangeDeclare("topic_exchange", "topic");
// 消息内容
String message = "新增";
//String message = "删除";
//String message = "更新";
// 发布消息到Exchange
channel.basicPublish("topic_exchange", "goods.insert", null, message.getBytes());
//channel.basicPublish("topic_exchange", "goods.delect", null, message.getBytes());
// channel.basicPublish("topic_exchange", "goods.update", null, message.getBytes());
System.out.println("生产者发送消息=:开始'" + message + "'");

channel.close();
connection.close();


}
}

消费者1 只接收insert和delect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public class c1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("topic_queue_1", false, false, false, null);

// 绑定队列到交换机
channel.queueBind("topic_queue_1", "topic_exchange", "goods.insert");
channel.queueBind("topic_queue_1", "topic_exchange", "goods.delect");

Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新增删除 接收消息 : "+new String(body));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列,自动返回完成
channel.basicConsume("topic_queue_1", false, consumer);
}
}

消费者2 只要匹配到 goods.全拿下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public class c2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("topic_queue_2", false, false, false, null);

// 绑定队列到交换机
channel.queueBind("topic_queue_2", "topic_exchange", "goods.*");

Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("商品 接收消息 : "+new String(body));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列,自动返回完成
channel.basicConsume("topic_queue_2", false, consumer);
}
}

运行发送推广三条消息,看控制台输出

image.png

image.png

完美、

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%