以下为RocketMQ—生产者系列文章索引:
一、Producer介绍
Producer 是 RocketMQ 消息的投递者,负责生产消息。它会与NameServer集群中的其中一个节点(随机)建立长连接(Keep-alive),定期从NameServer读取Topic路由信息,将路由信息保存在本地内存中;它向提供Topic服务的Master Broker建立长连接,且定时向Master Broker发送心跳;它只会向Master Broker发送消息,从Message Queue列表中选择合适的Queue发送消息,实现负载均衡;它支持发送消息类型有多种,例如:普通消息、事物消息、定时消息等;它发送消息的方式支持三种:同步、异步、单向方式等.可简单查看生产端与Master Broker 和NameServer简单交互图:
备注: 生产者还可向broker查询消息等其他功能交互。
二、生产者启动流程:
在了解具体生产启动流程之前,我们先提出出几个问题,带着问题去分析源码:
- 消息生产者启动时具体做了什么?
- 一个应用需要发送多个topic,不同topic需要发送到不同集群的broker,如何处理?
我们可先了解和分析生产者相关的类图关系:
从类图中可以看出,MQProducer有两种实现方式。
一个是 DefaultMQProducer(非事务消息生产者); 一个是 TransactionMQProducer(支持事务消息);
接下来先对接个类核心参数或方法进行简单分析:
##2.1 MqAdmin
MqAdmin:核心方法解析(Mq管理基础接口)
1 | arduino复制代码//创建一个主题 |
2.2 核心方法解析
MQProducer:核心方法解析(生产者基础接口):
1 | java复制代码//启动 |
备注: 其中启动start()和关闭shutdown()表示生产者的启动和关闭、
2.3 clientConfig
clientConfig:核心属性方法解析(客户端配置)
1 | typescript复制代码//nameServer-地址,默认从:系统属性:rocketmq.namesrv.addr 或 环境变量:NAMESRV_ADDR 中获取 |
备注: namesrvAddr表示nameServer地址,可调用setNamesrvAddr方法设置,或者通过环境变量、系统属性设置;buildMQClientId表示设置生产者Id.
三、TransactionMQProducer:(事务消息,后续单独讲解,本章忽略)
(略)
四、DefaultMQProducer 核心属性方法解析:(非事务消息生产者)
1 | java复制代码// 构造器 |
备注: DefaultMQProducer的构造器,send和start等相关的方法,其实都是围绕DefaultMQProducerImpl来转,defaultMQProducerImpl:默认生产者的实现类,其start方法作为生产者启动的核心方法,接下来将核心分析其start方法的实现.
DefaultMQProducerImpl#start
1 | kotlin复制代码/** |
分析如下:
0-服务状态设置:
设置状态值的意义是为了防止重复启动,其枚举类为:ServiceState; 如果初始化状态不等于:CREATE_JUST,则异常跑出
1-检测配置:
1 | csharp复制代码private void checkConfig() throws MQClientException { |
备注:为了检测-producerGroup的合法性
2-并改变生产者的instanceName为进程 ID。
1 | csharp复制代码// 判断producerGroup是否等于CLIENT_INNER_PRODUCER |
备注:instanceName == DEFAULT, 将其改为 启动的 进程ID,目的是为了MQClientInstance的构建
3-创建MQClientlnstance实例
MQClientManager管理MQClientInstance,其内部维护的数据结构为:ConcurrentHashMap,key:clientId,且MQClientManager本身是单例模式,核心方法分析如下: MQClientManager
1 | ini复制代码private static MQClientManager instance = new MQClientManager();//-单列模式 |
备注:
ClientConfig.buildMQClientId 在上面已分析,是为了构建clientId;getAndCreateMQClientInstance此方法的目的就是为了构建或查询MQClientInstance. MQClientInstance:封装了 RocketMQ 网络处理 API,是消息生产者( Producer)、消息消费者 (Consumer)与 NameServer、 Broker打交道的网络通道.
接下来分析多个生产者公用同一个MQClientInstance的优点和缺点:
- 优点:一般来讲,为了减少客户端的使用资源,如果将所有的instanceName和unitName设置为同样的值,就会只创建一个MQClientInstance实例(用于生产者的topic发送消息在同一套broker集群)
- 缺点:如果多个topic复用MQClientInstance会有怎么的结果呢?这种情况会出现在你在一个JVM里启动了多个Producer时,且没有设置instanceName和unitName,那么这两个Producer会公用一个MQClientInstance,发送的消息会路由到同一个集群。
例如,你起了两个Producer,并且配置的NameServer地址不一样,本意是让这两个Producer往不同集群上分配消息,但是由于共用了一个MQClientInstance,这个MQClientInstance是基于先来的Producer配置构建的,第二个Producer和他公用后被认为是同一instance,配置是相同的,消息的路由就是相同的,就没有达到你想要的效果。
4-向MQClientInstance注册生产者。
1 | java复制代码//key:group, value: 生产者 |
备注:DefaultMQProducerImpl实现的接口类为:MQProducerInner
5-添加默认topic信息缓存,此处需要理解topicPublishInfoTable数据结构的意思
1 | arduino复制代码//key:topic value:TopicPublishInfo-路由相关信息,用于消息发送 |
TopicPublishInfo:
分析,熟悉的佩服熟悉的味道,MessageQueue和TopicRouteData在NameServer已分析相当清除,分析如下:
1 | kotlin复制代码public class TopicPublishInfo { |
6-启动-MQClientInstance
1 | kotlin复制代码 MQClientInstance#start |
备注:后续单独讲解:this.startScheduledTask();
7-发送心跳到所有broker
(this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();)
1 | kotlin复制代码 public void sendHeartbeatToAllBrokerWithLock() { |
备注:sendHeartbeatToAllBroker,相对简单,
对返回结果维护了brokerVersionTable(ConcurrentHashMap),你不可错过.因为会有定时任务定时发送心跳至所有broker
小结: 通过7个步骤我们已经了解到生产者的启动流程,大致分为:检测相关配置、注册构建相关类(例如:MQClientInstance相关、netty相关等)、然后启动相关定时任务;简单总结生产者的启动流程,如下:
本文转载自: 掘金