1、业务场景
在电商场景里面,成功付款后,会发放优惠券。
上面的场景:在电商系统中,会出现,付款成功后,准备发优惠券的时候,服务器宕机了。这个时候会造成用户成功付款,却没收到优惠券的情况。这种情况下,我们很容易想到用事务来保证付款和发优惠券的原子性即可:要么付款和发优惠券同时成功,要么同时失败,是不允许其他一个成功,另一个失败的。
但上面,存在一种情况:付款和发优惠券高度耦合,这样子容易出现:发优惠券一直失败,会导致付款一直失败的场景。
对于这种场景的解决方案:引入消息中间件MQ来解耦。
但是上述这种情景,存在MQ不可用,宕机的情况。会产生付款成功,发优惠券失败的情况。
针对这种情况,需要引入分布式事务。
2、事务消息
分布式事务是一种抽象的概念。
那具体的实现呢?
是有很多种实现的。
在这里,主要介绍:RocketMQ的事务消息。
事务消息的流程图
流程步骤:
- 1、生产者发送half消息
- 2、MQ回复ACK确认消息
- 3、执行本地事务:订单付款。如果订单付款成功,那么就给MQ发送commit消息。如果订单付款失败,就发送rollback消息
- 4、如果步骤3发送消息失败,这个时候MQ的定时器会检查half消息。MQ回调方法,去检查本地事务的执行情况。如果执行成功,就返回commit消息。如果执行失败,就返回rollback消息。
- 5、如果MQ收到的是commit消息,此时会把half消息复制到真正的topic中
- 6、消费者对消息进行消费,下发优惠券
3、如何使用
上面,大概知道了事务消息的流程。
接下来,要知道如何使用。
还是以付款下发优惠券为例。
3.1 发送half消息-MQ回复ACK确认消息
1 | java复制代码 |
3.2 执行本地事务:付款
1 | java复制代码@Override |
3.3 MQ定时器回调查询half消息状态
1 | java复制代码@Override |
3.4 消费者进行消费,下发优惠券
1 | java复制代码 @Bean(value = "orderFinishedConsumer") |
监听器:OrderFinishedMessageListener
1 | java复制代码@Override |
4、知其然知其所以然
你看完上面,已经知道如何使用事务消息。
接下来,你需要了解其底层原理:看看源码(面试常问)
step1:首先看发送half消息的代码:
step2:进入代码里面:
step3:其实就是默认调用了DefaultMQProducer#sendMessageInTransaction。
1 | java复制代码public TransactionSendResult sendMessageInTransaction(final Message msg, |
上面的DefaultMQProducerImpl#sendMessageInTransaction方法主要流程:
- 简单的数据校验
- 给消息添加属性,表明这个事务消息
- 发送消息,且返回消息的结果–重点0
- 根据消息不同结果,进行不同的处理
- 如果消息发送成功,那么就执行本地事务(付款),返回本地事务的结果–重点1
- 最后,根据本地事务的结果,给broker发送Commit或rollback的消息–重点2
上面我们简述了一个大概的流程。未涉及到太多细节,是对一个整体流程的了解。
接下来,我们深入了解一些细节:
我们先研究一下重点0:sendResult = this.send(msg);
我们点进去会发现,send的底层其实就是调用了DefaultMQProducerImpl#sendKernelImpl方法。
step4:接着到SendMessageProcessor#sendMessage
step5:事务消息,继续进入TransactionalMessageServiceImpl#prepareMessage–>TransactionalMessageBridge#putHalfMessage–>TransactionalMessageBridge#parseHalfMessageInner
step6:接着,我们坐着研究一下重点1,即transactionListener.executeLocalTransaction(msg, arg);
1 | java复制代码public interface TransactionListener { |
你会发现,这是一个接口,有2个方法,一个是执行本地事务executeLocalTransaction。另一个是检查本地事务checkLocalTransaction。这两个方法需要实现类去实现。
比如:执行本地事务:付款
step7:接着我们来看重点2:this.endTransaction(sendResult, localTransactionState, localException);
1 | java复制代码public void endTransaction( |
到这个时候,我们已经把消息从生产者发送到了broker里面。
那接下来,我们就需要了解broker是如何处理事务消息的。
step8: 事务消息如何回查
直接看代码注解即可
TransactionalMessageCheckService#onWaitEnd
1 | java复制代码@Override |
step9:进入check方法:TransactionalMessageServiceImpl#check。
直接看注解即可
1 | java复制代码@Override |
step10:继续深入研究一下:resolveHalfMsg
1 | java复制代码public void resolveHalfMsg(final MessageExt msgExt) { |
step11:继续追进sendCheckMessage(msgExt)方法
1 | java复制代码/** |
到这里,基本上把事务消息的流程和实现细节走了一遍。
还有什么问题的话,在留言区或者私信大头菜
5、问题:分布式事务还有其他实现
上面的事务消息是分布式事务的一种实现。
事务消息被称为二段提交。
问题:分布式事务,还有哪些具体的实现方式?
欢迎留言
6、后续文章
- RocketMQ-入门(已更新)
- RocketMQ-架构和角色(已更新)
- RocketMQ-消息发送(已更新)
- RocketMQ-消费信息
- RocketMQ-消费者的广播模式和集群模式(已更新)
- RocketMQ-顺序消息(已更新)
- RocketMQ-延迟消息(已更新)
- RocketMQ-批量消息
- RocketMQ-过滤消息
- RocketMQ-事务消息(已更新)
- RocketMQ-消息存储
- RocketMQ-高可用
- RocketMQ-高性能
- RocketMQ-主从复制
- RocketMQ-刷盘机制
- RocketMQ-幂等性
- RocketMQ-消息重试
- RocketMQ-死信队列
…
欢迎各位入(guan)股(zhu),后续文章干货多多。
本文转载自: 掘金