你的项目是时候集成RocketMQ了

1. RocketMQ介绍

rocketmq是阿里巴巴开源的一款分布式的消息中间件,他源于jms规范但是不遵守jms规范。对于分布式只一点,如果你了用过其他mq并且了解过rocketmq,就知道rocketmq天生就是分布式的,可以说是broker、provider、consumer等各种分布式

2. RocketMQ优点

  • 去除对zk的依赖
  • 支持异步和同步两种方式刷磁盘
  • 单机支持的队列或者topic数量是5w
  • 支持消息重试
  • 支持严格按照一定的顺序发送消息
  • 支持定时发送消息
  • 支持根据消息ID来进行查询
  • 支持根据某个时间点进行消息的回溯
  • 支持对消息服务端的过滤
  • 消费并行度:顺序消费 取决于queue数量,乱序消费 取决于consumer数量

3. RocketMQ发送消息和消费消息

(1) 创建父工程

pom.xml如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
java复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.james</groupId>
<artifactId>rocketmq-demo</artifactId>
<version>1.0-SNAPSHOT</version>

<packaging>pom</packaging>
<modules>
<module>MQProducer-demo</module>
<module>MQConsume-demo</module>
</modules>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.james</groupId>
<artifactId>common_utils</artifactId>
<version>0.0.3-SNAPSHOT</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>

</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<!--该配置必须 -->
<fork>true</fork>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>

</build>

</project>
(2) 创建消息生产者

新建工程MQProducer-demo

pom.xml如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>rocketmq-demo</artifactId>
<groupId>com.james</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<packaging>jar</packaging>

<artifactId>MQProducer-demo</artifactId>

<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.james</groupId>
<artifactId>common_utils</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>

修改application.properties文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
properties复制代码server.port=8082

spring.application.name=producer-demo

spring.redis.host=localhost
spring.redis.port=6379
spring.redis.password=
spring.redis.database=10

swagger.enable=true

spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8


rocketmq.name-server=localhost:9876
rocketmq.producer.group=2021-11

# 是否开启自动配置
rocketmq.producer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
rocketmq.producer.groupName=${spring.application.name}
# mq的nameserver地址
rocketmq.producer.namesrvAddr=localhost:9876
# 消息最大长度 默认 1024 * 4 (4M)
rocketmq.producer.maxMessageSize = 4096
# 发送消息超时时间,默认 3000
rocketmq.producer.sendMsgTimeOut=3000
# 发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2

新建消息生产者配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
java复制代码package com.james.mq.producer.config;

import lombok.Data;
import lombok.ToString;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author james
* @version 1.0
* @description: 消息生产者配置
* @date 2021/10/30 11:43
*/
@Data
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.producer")
public class MQProducerConfigure {


public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfigure.class);

private String groupName;
private String namesrvAddr;
// 消息最大值
private Integer maxMessageSize;
// 消息发送超时时间
private Integer sendMsgTimeOut;
// 失败重试次数
private Integer retryTimesWhenSendFailed;

/**
* mq 生成者配置
* @return
* @throws MQClientException
*/
@Bean
@ConditionalOnProperty(prefix = "rocketmq.producer", value = "isOnOff", havingValue = "on")
public DefaultMQProducer defaultProducer() throws MQClientException {
LOGGER.info("defaultProducer 正在创建---------------------------------------");
DefaultMQProducer producer = new DefaultMQProducer(groupName);
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
producer.setMaxMessageSize(maxMessageSize);
producer.setSendMsgTimeout(sendMsgTimeOut);
producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);
producer.start();
LOGGER.info("rocketmq producer server 开启成功----------------------------------");
return producer;
}
}

新建消息生产者接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
java复制代码package com.james.mq.producer.controller;

import com.james.common.result.Result;
import com.james.common.utils.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* @author james
* @version 1.0
* @description: 消息生产者
* @date 2021/10/30 11:12
*/
@RestController
@RequestMapping("/api")
public class ProducerController {

public static final Logger LOGGER = LoggerFactory.getLogger(ProducerController.class);

@Autowired
DefaultMQProducer defaultMQProducer;

/**
* 发送简单的MQ消息
* @param msg
* @return
*/
@GetMapping("/send")
public Result send(String msg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
if (StringUtils.isEmpty(msg)) {
return Result.OK();
}
LOGGER.info("发送MQ消息内容:" + msg);
Message sendMsg = new Message("HelloTopic", "HelloTag", msg.getBytes());
// 默认3秒超时
SendResult sendResult = defaultMQProducer.send(sendMsg);
LOGGER.info("消息发送响应:" + sendResult.toString());
return Result.OK(sendResult);
}

}

测试

生产者控制台发送消息:

截屏2021-10-31 00.20.55

(3) 创建消息消费者

pom.xml同消息生成者模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>rocketmq-demo</artifactId>
<groupId>com.james</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>


<artifactId>MQConsume-demo</artifactId>

<packaging>jar</packaging>

<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.james</groupId>
<artifactId>common_utils</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

</project>

修改配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
properties复制代码spring.application.name=consumers-demo
server.port=8801

spring.redis.host=localhost
spring.redis.port=6379
spring.redis.password=
spring.redis.database=10

swagger.enable=true

spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8

# 是否开启自动配置
rocketmq.consumer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
rocketmq.consumer.groupName=${spring.application.name}
# mq的nameserver地址
rocketmq.consumer.namesrvAddr=127.0.0.1:9876
# 消费者订阅的主题topic和tags(*标识订阅该主题下所有的tags),格式: topic~tag1||tag2||tags3;
rocketmq.consumer.topics=TestTopic~TestTag;TestTopic~HelloTag;HelloTopic~HelloTag;MyTopic~*
# 消费者线程数据量
rocketmq.consumer.consumeThreadMin=5
rocketmq.consumer.consumeThreadMax=32
# 设置一次消费信心的条数,默认1
rocketmq.consumer.consumeMessageBatchMaxSize=1

mq消费者配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
java复制代码package com.james.mq.consume.config;

import lombok.Data;
import lombok.ToString;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author james
* @version 1.0
* @description: mq消费者配置
* @date 2021/10/30 17:34
*/
@Data
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.consumer")
public class MQConsumerConfigure {
public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfigure.class);

private String groupName;
private String namesrvAddr;
private String topics;
// 消费者线程数据量
private Integer consumeThreadMin;
private Integer consumeThreadMax;
private Integer consumeMessageBatchMaxSize;

@Autowired
private MQConsumeMsgListenerProcessor consumeMsgListenerProcessor;
/**
* mq 消费者配置
* @return
* @throws MQClientException
*/
@Bean
@ConditionalOnProperty(prefix = "rocketmq.consumer", value = "isOnOff", havingValue = "on")
public DefaultMQPushConsumer defaultConsumer() throws MQClientException {
LOGGER.info("defaultConsumer 正在创建---------------------------------------");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
// 设置监听
consumer.registerMessageListener(consumeMsgListenerProcessor);

/**
* 设置consumer第一次启动是从队列头部开始还是队列尾部开始
* 如果不是第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
/**
* 设置消费模型,集群还是广播,默认为集群
*/
// consumer.setMessageModel(MessageModel.CLUSTERING);

try {
// 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,
String[] topicArr = topics.split(";");
for (String topic : topicArr) {
String[] tagArr = topic.split("~");
consumer.subscribe(tagArr[0],"*");
}
consumer.start();
LOGGER.info("consumer 创建成功 groupName={}, topics={}, namesrvAddr={}",groupName,topics,namesrvAddr);
} catch (MQClientException e) {
LOGGER.error("consumer 创建失败!");
}
return consumer;
}
}

这个只是初始化操作,实际对消费者对消息处理放在 consumer.registerMessageListener(consumeMsgListenerProcessor); 这个监听类里面了,实际接收消息,处理消息都放在监听类里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码package com.james.mq.consume.config;

import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.List;

/**
* @author james
* @version 1.0
* @description: 消费者监听
* @date 2021/10/30 17:35
*/
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);


/**
* 默认msg里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
* 不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
*
* @param msgList
* @param consumeConcurrentlyContext
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (CollectionUtils.isEmpty(msgList)) {
LOGGER.info("MQ接收消息为空,直接返回成功");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt messageExt = msgList.get(0);
LOGGER.info("MQ接收到的消息为:" + messageExt.toString());
try {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8");

LOGGER.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
} catch (Exception e) {
LOGGER.error("获取MQ消息内容异常{}", e);
}
// TODO 处理业务逻辑
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

}

启动测试

截屏2021-10-31 00.47.01

如图,收到生产者消息

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%