「这是我参与11月更文挑战的第26天,活动详情查看:2021最后一次更文挑战」
API使用步骤
在使用MQ原生API的时候,生产者和消费者的使用分为下面的几个步骤
生产者
- 创建生产者(Producer),需要指定生产者名
- 指定NameServer地址,没有指定的话,则从操作系统中读取NAMESRV_ADDR 配置项地址
- 启动生产者(Producer)
- 构建消息体(Message),其中包含:Topic,Tag及消息体内容
- 发送消息体
- 关闭生产者
1 | java复制代码//1.创建生产者 |
消费者
- 创建消费者(Consumer),并指定消费者组名
- 指定NameServer地址,没有指定的话,则从操作系统中读取NAMESRV_ADDR 配置项地址
- 启动消费者
- 监听Topic,设定回调函数,接受消息后并处理消息
- 启动消费者(Consumer)
1 | java复制代码//1.创建消费者 |
API使用样例
生产者
基础消息
单向发送
生产者将消息发送给MQ后,无需关心发送MQ的情况,就直接执行后面的操作逻辑。
- 代码实现
1 | java复制代码producer.sendOneway(message); |
同步发送
生产者将消息发送给MQ后,等待MQ返回结果后,在执行后面的流程操作,这种发送方式可以知道是否发送MQ成功,能够知道发送结果,如果发送失败,生产者可以重试发送或者走异常逻辑。
- 代码实现
1 | java复制代码SendResult result = producer.send(message); |
通过SendResult可以拿到发送的结果信息
异步发送
生产者将消息发送给MQ后,不需要等待MQ返回结果,而是去执行后面的逻辑,生产者会提供一个接口供MQ调用,MQ将消息处理成功后通过指定的接口回调给生产者。
- 代码实现
1 | java复制代码producer.send(message, new SendCallback() { |
顺序消息
在采用默认方式发送消息的时候,客户端会采用轮询的方式将消息发送到不同的MessageQueue中,消费者拉取消息的时候是跟MessageQueue绑定在一起的,一个MessageQueue对应的消费者只会有一个客户端,所以在发送消息的时候,将需要局部有序的消息发送到同一个MessageQueue上去,这样的话消费者在消费时候拉取到的消息就是局部有序。
而消费者在拿取消息的时候需要从多个MessageQueue中拿取消息,这个时候生产者发生消息的时候是保证每个队列有序的,但是多个队列之间的数据是无序的,如果需要保证局部有序,就需要消费者拿取数据的时候按队列一个个取数据,取完一个队列数据后在去取下一个队列的数据,这个的话就可以保证消费者拿取的消息局部有序。这时候消费者就需要注册MessageListenerOrderly监听器,这样在拉取数据的时候就会保证一个一个队列去取数据,从而保证消息的局部有序,MessageListenerConcurrently这个监听器每次从多个队列中拿去一批消息给客户端,无法保证消息有序。
RocketMQ的顺序消息只能够保证局部有序,如果需要保证消息的全局有效性,只能够将消息队列个数设置为1个,这样的话就会保证全局有序,但是性能低下,只有一个实例能够处理消息。
- 代码实现
实现MessageQueueSelector接口,自定义MessageQueue选择的逻辑
1 | java复制代码for (int i = 0; i < 10; i++) { |
广播消息
消息者在广播模式下,所有的消费者都可以消费到这个消息,集群模式下,同一个组的消费者,每一条消息只能够被同一个组下的某个消费者消费,而广播模式则是将消息发送给每一个消费者去消费消息。广播消息播跟生产者没有关系,只跟消费者的消费模式有关系
- 代码实现
将消费者的消息模式设为广播模式即可,其他代码一样,默认集群模式
1 | java复制代码consumer.setMessageModel(MessageModel.BROADCASTING); |
延迟消息
MQ在收到消息后不会将消息立即发送给消费者,而是延迟指定的时间才将消息发送给消费者,从而达到消息延迟的作用。在开源的版本中,延迟消息只支持18个消息的延迟级别
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- 代码实现
生产者发送消息的时候告诉MQ延迟级别
1 | java复制代码 message.setDelayTimeLevel(2); |
批量消息
将多条消息合并成一个消息体发送给MQ,这样的话可以减少网络带宽,提高吞吐量。发送批量消息的限制比较多:
- 一个批量消息的大小不能大于1MB
- 批量消息的Topic必须一致
- 延迟消息,事务消息不支持批量消息
官方提供的API中,如果消息大于最大值,可以采用:ListSplitter 将消息体拆分成批量消息后在发送
代码实现:
1 | java复制代码List<Message> batchList = new ArrayList<>(); |
过滤消息
过滤消息最常用的是:采用Tag来区分消息,消费者可以只拉取自己需要的消息,RocketMQ官方提供了类似sql语法一样的来过滤消息,通过MessageSelector.bySql()来实现消息的过滤,采用SQL过滤只支持推模式,拉模式不支持SQL过滤,过滤消息的语法按照SQL92标准执行,支持下面的语法:
- 数值比较:>,>=,<,<=,BETWEEN等
- 字符比较: <>,IN 等
- 逻辑运算:AND,OR,NOT等
代码实现
消费者设定过滤规则
1 | java复制代码// tag 过滤消息 |
事务消息
事务消息可以保证本地事务与消息发送两个操作的原子性,要么全部成功,要么全部失败。事务消息不支持延迟消息和批量消息,通过两阶段提交的思路来设计,事务消息只跟发送者有关系,跟消费者没有任何关系,对于消费者是无感的。实现机制原理图参考下图:
- 代码实现
1 | java复制代码TransactionMQProducer producer = new TransactionMQProducer(); |
消费者
拉模式
消费者主动从Broker上拉取消息,需要消费者端维护拉取消息的点位
- 代码实现
实现类:DefaultLitePullConsumer 由这个类来实现,很多客户端需要维护的信息,官方API已经帮忙完成
1 | java复制代码DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("CONSUMER_GROUP"); |
推模式
Broker收到消息后,主动将消息推送给消费者,消费者只需要在第一次接入的时候设置拉取消息的点位,后续拉取消息不需要维护点位,由MQ来维护推消息的点位
ConsumeFromWhere枚举值:
- CONSUME_FROM_LAST_OFFSET:从上次消费点开始消费
- CONSUME_FROM_FIRST_OFFSET:从队列的最开始消费
- CONSUME_FROM_TIMESTAMP:指定时间戳位置开启拉取
- 代码实现
实现类:DefaultMQPushConsumer
1 | java复制代码consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); |
本文转载自: 掘金