RocketMQ-事务消息(分布式事务)

1、业务场景

在电商场景里面,成功付款后,会发放优惠券。

上面的场景:在电商系统中,会出现,付款成功后,准备发优惠券的时候,服务器宕机了。这个时候会造成用户成功付款,却没收到优惠券的情况。这种情况下,我们很容易想到用事务来保证付款和发优惠券的原子性即可:要么付款和发优惠券同时成功,要么同时失败,是不允许其他一个成功,另一个失败的。

但上面,存在一种情况:付款和发优惠券高度耦合,这样子容易出现:发优惠券一直失败,会导致付款一直失败的场景。

对于这种场景的解决方案:引入消息中间件MQ来解耦。

但是上述这种情景,存在MQ不可用,宕机的情况。会产生付款成功,发优惠券失败的情况。

针对这种情况,需要引入分布式事务。

2、事务消息

分布式事务是一种抽象的概念。

那具体的实现呢?

是有很多种实现的。

在这里,主要介绍:RocketMQ的事务消息。

事务消息的流程图

流程步骤:
  • 1、生产者发送half消息
  • 2、MQ回复ACK确认消息
  • 3、执行本地事务:订单付款。如果订单付款成功,那么就给MQ发送commit消息。如果订单付款失败,就发送rollback消息
  • 4、如果步骤3发送消息失败,这个时候MQ的定时器会检查half消息。MQ回调方法,去检查本地事务的执行情况。如果执行成功,就返回commit消息。如果执行失败,就返回rollback消息。
  • 5、如果MQ收到的是commit消息,此时会把half消息复制到真正的topic中
  • 6、消费者对消息进行消费,下发优惠券

3、如何使用

上面,大概知道了事务消息的流程。

接下来,要知道如何使用。

还是以付款下发优惠券为例。

3.1 发送half消息-MQ回复ACK确认消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码
@Override
public void finishedOrder(String orderNo, String phoneNumber) {

try {
// 退房事务消息,topic:完成订单
Message msg = new Message(orderFinishedTopic, JSON.toJSONString(orderInfo).getBytes(StandardCharsets.UTF_8));

// 发送half消息
TransactionSendResult transactionSendResult = orderFinishedTransactionMqProducer.sendMessageInTransaction(msg, null);

} catch (MQClientException e) {

}

}

3.2 执行本地事务:付款

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

try {
// 修改订单的状态
orderService.payOrder();

// 成功 提交prepare消息
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 执行本地事务失败 回滚prepare消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}

3.3 MQ定时器回调查询half消息状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {

try {
//查询订单状态
Integer orderStatus = orderService.getOrderStatus();
if (Objects.equals(orderStatus, OrderStatusEnum.FINISHED.getStatus())) { //返回commit消息
return LocalTransactionState.COMMIT_MESSAGE;
} else {
//返回rollback消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
// 查询订单状态失败
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}

3.4 消费者进行消费,下发优惠券

1
2
3
4
5
6
7
8
9
10
java复制代码 @Bean(value = "orderFinishedConsumer")
public DefaultMQPushConsumer finishedConsumer(@Qualifier(value = "orderFinishedMessageListener") OrderFinishedMessageListener orderFinishedMessageListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(orderFinishedConsumerGroup);
consumer.setNamesrvAddr(namesrvAddress);
//topic:完成订单
consumer.subscribe(orderFinishedTopic, "*");
consumer.setMessageListener(orderFinishedMessageListener);
consumer.start();
return consumer;
}
监听器:OrderFinishedMessageListener
1
2
3
4
5
6
7
8
java复制代码@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//下发优惠券
couponService.distributeCoupon();

}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

4、知其然知其所以然

你看完上面,已经知道如何使用事务消息。

接下来,你需要了解其底层原理:看看源码(面试常问)

step1:首先看发送half消息的代码:

step2:进入代码里面:

step3:其实就是默认调用了DefaultMQProducer#sendMessageInTransaction。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
java复制代码public TransactionSendResult sendMessageInTransaction(final Message msg,
...省略一堆代码

SendResult sendResult = null;
// 给待发送消息添加属性,表名是一个事务消息,即半消息,这里设置为true。(这个属性后面会用到)
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
//发送消息--重点0
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
//消息发送成功
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {

localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//执行本地事务,executeLocalTransaction需要子类去具体实现
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}

if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}

try {
// 最后,给broker发送提交或者回滚事务的RPC请求
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
// 组装结果返回
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}

上面的DefaultMQProducerImpl#sendMessageInTransaction方法主要流程:

  • 简单的数据校验
  • 给消息添加属性,表明这个事务消息
  • 发送消息,且返回消息的结果–重点0
  • 根据消息不同结果,进行不同的处理
  • 如果消息发送成功,那么就执行本地事务(付款),返回本地事务的结果–重点1
  • 最后,根据本地事务的结果,给broker发送Commit或rollback的消息–重点2

上面我们简述了一个大概的流程。未涉及到太多细节,是对一个整体流程的了解。

接下来,我们深入了解一些细节:

我们先研究一下重点0:sendResult = this.send(msg);
我们点进去会发现,send的底层其实就是调用了DefaultMQProducerImpl#sendKernelImpl方法。

step4:接着到SendMessageProcessor#sendMessage

step5:事务消息,继续进入TransactionalMessageServiceImpl#prepareMessage–>TransactionalMessageBridge#putHalfMessage–>TransactionalMessageBridge#parseHalfMessageInner

step6:接着,我们坐着研究一下重点1,即transactionListener.executeLocalTransaction(msg, arg);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public interface TransactionListener {
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

你会发现,这是一个接口,有2个方法,一个是执行本地事务executeLocalTransaction。另一个是检查本地事务checkLocalTransaction。这两个方法需要实现类去实现。

比如:执行本地事务:付款

step7:接着我们来看重点2:this.endTransaction(sendResult, localTransactionState, localException);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public void endTransaction(
// 省略一堆代码
//事务id
String transactionId = sendResult.getTransactionId();
// broker地址
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
// 根据事务消息和本地事务的执行结果,发送不同的结果给broker
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
//发送给broker
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}

到这个时候,我们已经把消息从生产者发送到了broker里面。

那接下来,我们就需要了解broker是如何处理事务消息的。

step8: 事务消息如何回查

直接看代码注解即可

TransactionalMessageCheckService#onWaitEnd

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Override
protected void onWaitEnd() {
//timeout是从broker配置文件中获取transactionTimeOut值,代表事务的过期时间,(一个消息的存储时间 + timeout) > 系统当前时间,才会对该消息执行事务状态会查
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
//checkMax是从broker配置文件中获取transactionCheckMax值,代表事务的最大检测次数,如果超过检测次数,消息会默认为丢弃,即rollback消息
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
//回查:核心点org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl.check
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

step9:进入check方法:TransactionalMessageServiceImpl#check。

直接看注解即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
java复制代码@Override
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
//RMQ_SYS_TRANS_HALF_TOPIC主题
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
//获取RMQ_SYS_TRANS_HALF_TOPIC主题下的所有队列
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
//数据校验
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
//遍历队列
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
//根据队列获取对应topic:RMQ_SYS_TRANS_OP_HALF_TOPIC下的opQueue
//RMQ_SYS_TRANS_HALF_TOPIC:prepare消息的主题,事务消息首先先进入到该主题。
//RMQ_SYS_TRANS_OP_HALF_TOPIC:当消息服务器收到事务消息的提交或回滚请求后,会将消息存储在该主题下
MessageQueue opQueue = getOpQueue(messageQueue);
//messageQueue队列的偏移量
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
//opQueue队列的偏移量
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);

log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
//如果其中一个队列的偏移量小于0,就跳过
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
halfOffset, opOffset);
continue;
}
//doneOpOffset和removeMap主要的目的是避免重复调用事务回查接口
List<Long> doneOpOffset = new ArrayList<>();
HashMap<Long, Long> removeMap = new HashMap<>();
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
if (null == pullResult) {
log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
messageQueue, halfOffset, opOffset);
continue;
}
// single thread
//空消息的次数
int getMessageNullCount = 1;
//RMQ_SYS_TRANS_HALF_TOPIC#queueId的最新偏移量
long newOffset = halfOffset;
//RMQ_SYS_TRANS_HALF_TOPIC的偏移量
long i = halfOffset;
while (true) {
//限制每次最多处理的时间是60s
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
//removeMap包含当前信息,则跳过,处理下一条信息
//removeMap的信息填充是在上面的fillOpRemoveMap
//fillOpRemoveMap具体逻辑是:具体实现逻辑是从RMQ_SYS_TRANS_OP_HALF_TOPIC主题中拉取32条,
//如果拉取的消息队列偏移量大于等于RMQ_SYS_TRANS_HALF_TOPIC#queueId当前的处理进度时
//会添加到removeMap中,表示已处理过
if (removeMap.containsKey(i)) {
log.info("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
} else {
//根据消息队列偏移量i从RMQ_SYS_TRANS_HALF_TOPIC队列中获取消息
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
//如果消息为空
if (msgExt == null) {
//则根据允许重复次数进行操作,默认重试一次 MAX_RETRY_COUNT_WHEN_HALF_NULL=1
//如果超过重试次数,直接跳出while循环,结束该消息队列的事务状态回查
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
//如果是由于没有新的消息而返回为空(拉取状态为:PullStatus.NO_NEW_MSG),则结束该消息队列的事务状态回查。
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
i, messageQueue, getMessageNullCount, getResult.getPullResult());
//其他原因,则将偏移量i设置为: getResult.getPullResult().getNextBeginOffset(),重新拉取
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}
//判断该消息是否需要discard(吞没,丢弃,不处理)、或skip(跳过)
//needDiscard 依据:如果该消息回查的次数超过允许的最大回查次数,
// 则该消息将被丢弃,即事务消息提交失败,不能被消费者消费,其做法,
// 主要是每回查一次,在消息属性TRANSACTION_CHECK_TIMES中增1,默认最大回查次数为5次。

//needSkip依据:如果事务消息超过文件的过期时间,
// 默认72小时(具体请查看RocketMQ过期文件相关内容),则跳过该消息。
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
//消息的存储时间大于开始时间,中断while循环
if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp()));
break;
}
//该消息已存储的时间=系统当前时间-消息存储的时间戳
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
//checkImmunityTime:检测事务的时间
//transactionTimeout:事务消息的超时时间
long checkImmunityTime = transactionTimeout;
//用户设定的checkImmunityTimeStr
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) {
//checkImmunityTime=Long.valueOf(checkImmunityTimeStr)
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
//最近进度=当前消息进度+1
newOffset = i + 1;
i++;
continue;
}
}
} else {//如果当前时间小于事务超时时间,则结束while循环
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
//是否需要回查,判断依据如下:
//消息已存储的时间大于事务超时时间
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);

if (isNeedCheck) {
if (!putBackHalfMsgQueue(msgExt, i)) {//11
continue;
}
//重点:进行事务回查(异步)
listener.resolveHalfMsg(msgExt);
} else {
//加载已处理的消息进行筛选
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
messageQueue, pullResult);
continue;
}
}
newOffset = i + 1;
i++;
}
//保存half消息队列的回查进度
if (newOffset != halfOffset) {
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
//保存处理队列opQueue的处理今夕
if (newOpOffset != opOffset) {
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
}
} catch (Throwable e) {
log.error("Check error", e);
}

}

step10:继续深入研究一下:resolveHalfMsg

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
//针对每个待反查的half消息,进行回查本地事务结果
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}

step11:继续追进sendCheckMessage(msgExt)方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码/**
* 发送回查消息
* @param msgExt
* @throws Exception
*/
public void sendCheckMessage(MessageExt msgExt) throws Exception {
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
//原主题
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
//原队列id
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));

msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
if (channel != null) {
//回调查询本地事务状态
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}

到这里,基本上把事务消息的流程和实现细节走了一遍。

还有什么问题的话,在留言区或者私信大头菜

5、问题:分布式事务还有其他实现

上面的事务消息是分布式事务的一种实现。

事务消息被称为二段提交。

问题:分布式事务,还有哪些具体的实现方式?

欢迎留言

6、后续文章

  • RocketMQ-入门(已更新)
  • RocketMQ-架构和角色(已更新)
  • RocketMQ-消息发送(已更新)
  • RocketMQ-消费信息
  • RocketMQ-消费者的广播模式和集群模式(已更新)
  • RocketMQ-顺序消息(已更新)
  • RocketMQ-延迟消息(已更新)
  • RocketMQ-批量消息
  • RocketMQ-过滤消息
  • RocketMQ-事务消息(已更新)
  • RocketMQ-消息存储
  • RocketMQ-高可用
  • RocketMQ-高性能
  • RocketMQ-主从复制
  • RocketMQ-刷盘机制
  • RocketMQ-幂等性
  • RocketMQ-消息重试
  • RocketMQ-死信队列

欢迎各位入(guan)股(zhu),后续文章干货多多。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%