Spring Cloud Alibaba 学习 -- 4、整

这是我参与11月更文挑战的第10天,活动详情查看:2021最后一次更文挑战

学习视频(B站):www.bilibili.com/video/BV1Mt…

GitHub 源码地址:github.com/tyronczt/sp…

基本概念:RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

详见:github.com/apache/rock…

一、Linux 安装

1、下载压缩包,rocketmq-all-4.9.0-bin-release.zip

网址:rocketmq.apache.org/dowloading/…

2、解压缩

1
shell复制代码unzip rocketmq-all-4.9.0-bin-release.zip

3、启动 NameServer

1
2
shell复制代码cd bin
nohup ./mqnamesrv &

4、检查是否启动成功

1
2
shell复制代码查看端口启动情况:netstat -an | grep 9876
查看启动日志:tail -f ~/logs/rocketmqlogs/namesrv.log

在这里插入图片描述

5、启动 Broker

启动之前需要编辑配置文件,修改 JVM 内存设置,默认给的内存 4 GB,超过现有虚拟机 JVM ,不然会报错
在这里插入图片描述

1
2
3
4
5
shell复制代码cd bin
vim runserver.sh
vim runbroker.sh
-Xms256m -Xmx256m -Xmn128m
Xms为虚拟机初始化堆大小,Xmx为最大堆大小,Xmn为最小堆大小

在这里插入图片描述

启动 Broker

1
shell复制代码nohup ./mqbroker -n localhost:9876 &

查看启动情况

1
shell复制代码tail -f ~/logs/rocketmqlogs/broker.log

在这里插入图片描述

6、测试 RocketMQ

测试消息发送

1
2
3
shell复制代码cd bin
export NAMESRV_ADDR=localhost:9876 【参考官网:RocketMQ provides multiple ways to achieve this. For simplicity, we use environment variable NAMESRV_ADDR,否则会报错:connect to null failed】
./tools.sh org.apache.rocketmq.example.quickstart.Producer

在这里插入图片描述

测试消息接收

1
2
3
shell复制代码cd bin
export NAMESRV_ADDR=localhost:9876
./tools.sh org.apache.rocketmq.example.quickstart.Consumer

在这里插入图片描述

7、关闭 RocketMQ

1
2
3
shell复制代码cd bin
./mqshutdown broker
./mqshutdown namesrv

在这里插入图片描述

8、报错

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
shell复制代码#关闭RocketMq
./mqshutdown broker
./mqshutdown namesrv

nohup ./mqnamesrv &

cd ../conf/
vim broker.conf

#最后两行进行添加
namesrvAddr=192.168.255.100:9876
brokerIP1=192.168.255.100

cd ..
nohup sh bin/mqbroker -n 192.168.255.100:9876 autoCreateTopicEnable=true -c conf/broker.conf &

参考:rocketmq.apache.org/docs/quick-…

二、控制台安装

1、下载源码,rocketmq-console

网址:github.com/apache/rock…

2、打包

1
shell复制代码mvn clean package -Dmaven.test.skip=true

3、修改配置文件

1
2
3
4
ini复制代码properties
server.port=9877
rocketmq.config.namesrvAddr=192.168.255.100:9876
rocketmq.config.isVIPChannel=false

4、运行

1
java复制代码java -jar target/rocketmq-console-ng-2.0.0.jar

在这里插入图片描述

http://localhost:9877/
在这里插入图片描述

5、报错

rocketmq:connect to 172.17.0.1:10911 failed

这是因为我们的 RocketMQ 安装在 Linux 中,控制台在 windows,Linux 需要开放端口才能访问,开放 10909 和 10911和9876 端口

1
2
3
4
5
cmd复制代码firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --zone=public --add-port=9876/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --reload

三、Java 实现消息发送

在provider项目下

1、引入依赖

1
2
3
4
5
xml复制代码<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>

2、生产消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class Test {
public static void main(String[] args) throws Exception {
//创建消息生产者
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//设置NameServer
producer.setNamesrvAddr("192.168.255.100:9876");
//启动生产者
producer.start();
//构建消息对象
Message message = new Message("myTopic", "myTag", ("Test MQ").getBytes());
//发送消息
SendResult result = producer.send(message, 1000);
System.out.println(result);
//关闭生产者
producer.shutdown();
}
}

3、查看消息

在这里插入图片描述
在这里插入图片描述

4、消费消息

处于阻塞状态,只要有消息发送过来,就会消费信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码@Slf4j
public class ConsumerTest {
public static void main(String[] args) throws MQClientException {
//创建消息消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
//设置NameServer
consumer.setNamesrvAddr("192.168.255.100:9876");
//指定订阅的主题和标签
consumer.subscribe("myTopic", "*");
//回调函数
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
log.info("Message=>{}", list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
}
}

5、查看消息
在这里插入图片描述

四、SpringBoot整合RocketMQ

1、生产者引入依赖

1
2
3
4
5
6
7
8
9
10
xml复制代码<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>

2、配置文件

1
2
3
4
yml复制代码rocketmq:
name-server: 192.168.255.100:9876
producer:
group: myprovider

3、生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Autowired
private RocketMQTemplate rocketMQTemplate;

@GetMapping("/create")
public Order create(){
Order order = new Order(
1,
"张三",
"123123",
"软件园",
new Date()
);
this.rocketMQTemplate.convertAndSend("myTopic",order);
return order;
}

4、消费者引入依赖

1
2
3
4
5
6
7
8
9
10
xml复制代码<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>

5、配置文件

1
2
yaml复制代码rocketmq:
name-server: 192.168.255.100:9876

6、消费者代码

1
2
3
4
5
6
7
8
9
java复制代码@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "myConsumer", topic = "myTopic")
public class SmsService implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("新订单{},发短信", order);
}
}

只要生产者中的topic与消费者中的topic进行对应即可。
在这里插入图片描述

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%