作者:小先
源代码仓库:github.com/zhshuixian/…
在上一小节《实战 SQL 数据库(MyBatis)》中,主要介绍了 MyBatis 如何连接数据库,实现数据的增删改查等操作。这一小节,将实战 Spring Boot 整合 RocketMQ。消息中间件是现代分布式系统的重要组件,RocketMQ 是一款开源的分布式消息中间件,具有低延迟,高性能、高可用、可伸缩的消息发布与订阅服务,支持万亿级容量。
RocketMQ 是由阿里巴巴团队采用 Java 语言开发,在 2016 年的时候贡献给 Apache 基金会,是 Apache 的顶级项目。
1)RocketMQ 的安装和运行
安装和运行 RocketMQ 需要的先决条件:
- 64 bit 的系统,推荐 Linux/Unix/Mac,Windows 系统也可以运行
- 64 bit 的 JDK 1.8+
- 4G 的空闲磁盘
下载 RocketMQ 4.6.1 ,打开 RocketMQ 官网 rocketmq.apache.org/release_not… ,选择二进制文件:
下载完成后,解压到安装目录,打开终端进入安装目录 ROCKETMQ_HOME,运行如下的命令:
设置 JVM 的最小内存和最大内存
1 | 复制代码# 打开 runbroker.sh 或者 runbroker.cmd(Windows) |
运行 Name Server
1 | 复制代码> nohup sh bin/mqnamesrv & |
运行 Broker
1 | 复制代码> nohup sh bin/mqbroker -n localhost:9876 & |
关闭 Server
1 | 复制代码 > sh bin/mqshutdown broker |
Windows 系统
1 | 复制代码# Windows 系统需要设置环境变量 %ROCKETMQ_HOME% |
2)开始使用
rocketmq-spring-boot-starter 是 Spring Boot 快速与 RocketMQ 集成的启动器(Starter),需要 Spring Boot 2.0 及更高版本。
实战 Spring Boot 整合 RocketMQ,实现写入消息(Producer)和消费消息(Consumer)。
2.1)新建项目和共同配置
这里将新建两个项目,04-rocketmq-producer 和 04-rocketmq-consumer,分别生产信息和消费信息。Spring Boot 选择 2.1.13,依赖选择 Spring Web,其除了项目名称以外,其它配置基本相同。
添加 rocketmq-spring-boot-starter:
1 | 复制代码// Gradle |
1 | 复制代码<!-- Maven --> |
配置 application.properties
1 | 复制代码# 04-rocketmq-producer 不需要设置 spring.main.web-application-type |
在两个项目中新建 User 类:
1 | 复制代码public class User { |
2.2)Producer 实现消息的写入
项目名称 04-rocketmq-producer 。实现从 RESTful API 接收的消息写入 RocketMQ。
新建 ProducerService.class:
1 | 复制代码@Service |
代码解析:
@Value(value = “${boot.rocketmq.topic}”):将 application.properties 文件中定义的 boot.rocketmq.topic 值自动注入到 springTopic 变量。
新建 RESTful API ,ProducerController.class
1 | 复制代码@RestController |
2.2)Consumer 消费信息
项目名称 04-rocketmq-consumer ,实现 RocketMQ 中消息的读取与消费。注意 这个项目不需要启动 Web 容器。
StringConsumer.class 消费 String 类型的消息。
1 | 复制代码@Service |
UserConsumer.class 消费 User 类型的消息
1 | 复制代码@Service |
代码解析:
@RocketMQMessageListener:指定监听的 topic,consumerGroup,selectorExpression等;
topic:消息主题,表示一类的消息,比如上文的 string-topic 、user-topic,topic = “string-topic” 表示值消费 string-topic 主题的消息;
consumerGroup:消费组,同一个消费组一般情况消费相同的消息;
selectorExpression*:选择 tag,selectorExpression=”tagA”,只消费 tag 为 tagA 的消息;默认 “\“,即所有的 tag;
RocketMQListener : 实现 RocketMQListener,我么只需要重写消息处理方法即可;
3)运行项目
启动 RocketMQ,分别启动 04-rocketmq-producer 和 04-rocketmq-consumer。
Producer 运行的 Web 端口是 8080,Consumer 没有启动 Web 容器。
启动 Consumer 可以看到如下日志输出:
1 | 复制代码DefaultRocketMQListenerContainer{consumerGroup='user_consumer', nameServer='192.168.128.10:9876', topic='user-topic', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING} |
打开 Postname,测试 String 类型的消息,访问 http://localhost:8080/producer/string
Producer 日志输出:
1 | 复制代码2020-03-16 23:14:21.681 INFO 16776 --- [nio-8080-exec-2] org.xian.producer.ProducerService : |
Consumer 日志输出:
1 | 复制代码2020-03-16 23:14:21.983 INFO 16092 --- [MessageThread_1] org.xian.consumer.StringConsumer : |
测试 User 类型的消息,访问 http://localhost:8080/producer/user
Producer 日志输出:
1 | 复制代码2020-03-16 23:18:11.548 INFO 16776 --- [nio-8080-exec-5] org.xian.producer.ProducerService : |
Consumer 日志输出:
1 | 复制代码2020-03-16 23:18:11.591 INFO 16092 --- [MessageThread_1] org.xian.consumer.UserConsumer : |
参考和扩展阅读:
- 消息队列扫盲 github.com/Snailclimb/…
- RocketMQ-Spring github.com/apache/rock…
本章节主要介绍了如何单机运行 RocketMQ、 Spring Boot 如何整合 RocketMQ 并实现消息的产生和消费,对于如何集群运行 RocketMQ,RocketMQ 高级使用方法,这里暂不做介绍。下一章节,将开始实战 Spring 的安全框架,主要包括:
- Spring Security
- Spring Security 整合 JJWT 实现 Token 登录和验证
- 整合 Shiro (Token)
- 实现微信登录 (Token)
欢迎关注《编程技术进阶》或者小先的博客。
本文转载自: 掘金