盘点 TCC-Transaction Debug 手册

这是我参与更文挑战的第1天,活动详情查看: 更文挑战

总文档 :文章目录

Github : github.com/black-ant

一 . 前言

文章目的 :

  • 基于 Feign 的 TCC-Transaction 案例
  • 梳理 TCC-Transaction 的 主流程及 Debug 方向

TCC-Transaction 背景简介 :

TCC 模型

TCC 模型是完全依赖于业务处理的分布式事务模型 , 他将一个 [大事务] 通过代码逻辑分解为 [多个小事务] , TCC 模型同样是 2PC (两阶段提交) 的实现之一.

TCC 的操作可以分为3个阶段 : Try / Confirm / Cancel

  • Try: 业务的主要处理 , 但仅进行初期操作 (例如订单生成)
    • 尝试执行业务
    • 所有子事务完成所有业务检查(一致性)
    • 锁定资源 , 预留必须业务资源(准隔离性)
  • Confirm: 对 Try 操作的一个补充,逐个执行Try操作指定的Confirm操作
    • 真正执行业务,不作任何业务检查
    • 只使用Try阶段预留的业务资源 , 扣除具体的资源
    • Confirm 操作满足幂等性
  • Cancel: 对Try操作的一个回撤
    • 取消执行业务
    • 释放Try阶段预留的业务资源, 业务上的回退
    • Cancel操作满足幂等性

TCC Module.jpg

TCC 的优缺点

优点:

  • 由业务方自行控制事务的范围
  • 自如的控制数据库粒度处理 , 降低锁冲突
  • 业务设计合理 , 可以大大提高吞吐量
  • 代码配置简单 , 无需太多配置 , 集成方式便利

缺点 :

  • 业务侵入大 , 耦合强 , 迁移及改造成本大
  • 设计难度大
  • 对于回滚的处理困难
  • 为了满足一致性的要求,confirm和cancel接口必须实现幂等

二 . TCC-Transaction 案例

官方提供过一个基于 Dubbo 的处理案例 , 本案例是基于 Feign 进行 RestAPI 调用的 , 其核心原理其实是一致的.

为了后文分析时更加清楚 , 首先看一下案例源码 :

业务模块 :

  • Order : 订单服务
  • Capital : 账户服务
  • RedPacket : 红包服务

订单支付后 , 扣除账户余额和红包 . 当红包余额不足时 , 发起回退

前期配置

TODO : TCC 的灵活配置

2.1 Order 服务

Order 主流程 : 发起整个事务及接口调用逻辑

  • makePayment : 生成 Order 订单 , 调用红包及账户服务扣除余额
  • confirmMakePayment
  • cancelMakePayment : 确定后修改状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
java复制代码@Service
public class PaymentServiceImpl {

private Logger logger = LoggerFactory.getLogger(this.getClass());

@Autowired
TradeOrderServiceProxy tradeOrderServiceProxy;

@Autowired
OrderRepository orderRepository;


/**
* 此处考虑后应该是要去掉小事务管理的的 (@Transaction) <br>
* 原因 : 如果此处存在事务 , 分布式应用上抛出异常 , 则回导致该事务回滚
* <p>
* 但是!!! 你可能这个时候会想 , 既然出现异常这个类会回滚 , 那不相当于分布式实现了吗 , 为什么还要加个框架处理
* <p>
* 原因 : 此处如果考虑红包的逻辑就对了 , 这个场景实际上为 : 余额足够 ,但是红包不够的情况!!!
*
* @param order
* @param redPacketPayAmount
* @param capitalPayAmount
*/
@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = true)
public void makePayment(Order order, BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) {


logger.info("order try make payment called.time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
logger.info("------> 开始事务管理 : 红包支付金额 [{}] / 账户支付余额 [{}] <-------", redPacketPayAmount, capitalPayAmount);
//check if the order status is DRAFT, if no, means that another call makePayment for the same order happened, ignore this call makePayment.
if (order.getStatus().equals("DRAFT")) {
order.pay(redPacketPayAmount, capitalPayAmount);
try {
orderRepository.updateOrder(order);
} catch (OptimisticLockingFailureException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
}
}

logger.info("------> 订单处理完成 :[{}] <-------", order.getId());
String result = tradeOrderServiceProxy.record(null, buildCapitalTradeOrderDto(order));
logger.info("------> 余额消费完成 :[{}] <-------", result);
String result2 = tradeOrderServiceProxy.record(null, buildRedPacketTradeOrderDto(order));
logger.info("------> 红包消费完成 :[{}] <-------", result2);
}

public void confirmMakePayment(Order order, BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) {

logger.warn("------> [进入 PayConfirm 流程] <-------");
logger.warn("order confirm make payment called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

Order foundOrder = orderRepository.findByMerchantOrderNo(order.getMerchantOrderNo());

//check order status, only if the status equals DRAFT, then confirm order
if (foundOrder != null && foundOrder.getStatus().equals("PAYING")) {
order.confirm();
orderRepository.updateOrder(order);
}
}

public void cancelMakePayment(Order order, BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) {

logger.error("------> [进入 cancel 流程] <-------");
logger.error("order cancel make payment called.time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

Order foundOrder = orderRepository.findByMerchantOrderNo(order.getMerchantOrderNo());
logger.info("------> [cancel 流程处理 Order :{}] <-------", JSONObject.toJSONString(foundOrder));

if (foundOrder != null && foundOrder.getStatus().equals("PAYING")) {
order.cancelPayment();
orderRepository.updateOrder(order);
}
}


private CapitalTradeOrderDto buildCapitalTradeOrderDto(Order order) {

CapitalTradeOrderDto tradeOrderDto = new CapitalTradeOrderDto();
tradeOrderDto.setAmount(order.getCapitalPayAmount());
tradeOrderDto.setMerchantOrderNo(order.getMerchantOrderNo());
tradeOrderDto.setSelfUserId(order.getPayerUserId());
tradeOrderDto.setOppositeUserId(order.getPayeeUserId());
tradeOrderDto.setOrderTitle(String.format("order no:%s", order.getMerchantOrderNo()));

return tradeOrderDto;
}

private RedPacketTradeOrderDto buildRedPacketTradeOrderDto(Order order) {
RedPacketTradeOrderDto tradeOrderDto = new RedPacketTradeOrderDto();
tradeOrderDto.setAmount(order.getRedPacketPayAmount());
tradeOrderDto.setMerchantOrderNo(order.getMerchantOrderNo());
tradeOrderDto.setSelfUserId(order.getPayerUserId());
tradeOrderDto.setOppositeUserId(order.getPayeeUserId());
tradeOrderDto.setOrderTitle(String.format("order no:%s", order.getMerchantOrderNo()));

return tradeOrderDto;
}
}

Capital , Red 远程调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码@Component
public class TradeOrderServiceProxy {

@Autowired
CapitalTradeOrderService capitalTradeOrderService;

@Autowired
RedPacketTradeOrderService redPacketTradeOrderService;

@Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
return capitalTradeOrderService.record(new TransactionEntity<>(transactionContext, tradeOrderDto));
}

@Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, RedPacketTradeOrderDto tradeOrderDto) {
return redPacketTradeOrderService.record(new TransactionEntity<>(transactionContext, tradeOrderDto));
}
}

2.2 Capital 账户处理余额

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
java复制代码@Service
public class CapitalTradeOrderServiceImpl {

private Logger logger = LoggerFactory.getLogger(this.getClass());

@Autowired
CapitalAccountRepository capitalAccountRepository;

@Autowired
TradeOrderRepository tradeOrderRepository;

/**
* Step 1 : @Transactional 保证小事务的执行 , 避免余额反复添加
*
* @param transactionContext
* @param tradeOrderDto
* @return
*/
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)
@Transactional
public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {

logger.info("capital try record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

TradeOrder foundTradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());

//check if trade order has been recorded, if yes, return success directly.
if (foundTradeOrder == null) {

TradeOrder tradeOrder = new TradeOrder(
tradeOrderDto.getSelfUserId(),
tradeOrderDto.getOppositeUserId(),
tradeOrderDto.getMerchantOrderNo(),
tradeOrderDto.getAmount()
);

try {
tradeOrderRepository.insert(tradeOrder);

CapitalAccount transferFromAccount = capitalAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());

transferFromAccount.transferFrom(tradeOrderDto.getAmount());

capitalAccountRepository.save(transferFromAccount);

logger.info("------> 账户余额处理完成 , 现余额 [{}] <-------", JSONObject.toJSONString(transferFromAccount));

} catch (DataIntegrityViolationException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
}
}

return "success";
}

@Transactional
public void confirmRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {

logger.warn("capital confirm record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
TradeOrder tradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());

//check if the trade order status is DRAFT, if yes, return directly, ensure idempotency.
if (null != tradeOrder && "DRAFT".equals(tradeOrder.getStatus())) {
tradeOrder.confirm();
tradeOrderRepository.update(tradeOrder);

CapitalAccount transferToAccount = capitalAccountRepository.findByUserId(tradeOrderDto.getOppositeUserId());

transferToAccount.transferTo(tradeOrderDto.getAmount());

capitalAccountRepository.save(transferToAccount);
}
}

@Transactional
public void cancelRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {

logger.error("capital cancel record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
TradeOrder tradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());

//check if the trade order status is DRAFT, if yes, return directly, ensure idempotency.
if (null != tradeOrder && "DRAFT".equals(tradeOrder.getStatus())) {
tradeOrder.cancel();
tradeOrderRepository.update(tradeOrder);

CapitalAccount capitalAccount = capitalAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());

capitalAccount.cancelTransfer(tradeOrderDto.getAmount());

capitalAccountRepository.save(capitalAccount);
}
}
}

2.3 RedPacket 红包处理

  • record : 创建红包订单 , 扣除金额
  • confirmRecord : 更新订单状态 , 扣除账户
  • cancelRecord :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
java复制代码@Service
public class RedPacketTradeOrderServiceImpl {

private Logger logger = LoggerFactory.getLogger(this.getClass());

@Autowired
RedPacketAccountRepository redPacketAccountRepository;

@Autowired
TradeOrderRepository tradeOrderRepository;

@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)
@Transactional
public String record(TransactionContext transactionContext, RedPacketTradeOrderDto tradeOrderDto) {

logger.info("red packet try record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

TradeOrder foundTradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());

if (foundTradeOrder == null) {

TradeOrder tradeOrder = new TradeOrder(
tradeOrderDto.getSelfUserId(),
tradeOrderDto.getOppositeUserId(),
tradeOrderDto.getMerchantOrderNo(),
tradeOrderDto.getAmount()
);

try {
tradeOrderRepository.insert(tradeOrder);

RedPacketAccount transferFromAccount = redPacketAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());

transferFromAccount.transferFrom(tradeOrderDto.getAmount());

redPacketAccountRepository.save(transferFromAccount);

logger.info("------> [] <-------");

} catch (DataIntegrityViolationException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
}
}

return "success";
}

@Transactional
public void confirmRecord(TransactionContext transactionContext, RedPacketTradeOrderDto tradeOrderDto) {

logger.warn("red packet confirm record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

TradeOrder tradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());

if (null != tradeOrder && "DRAFT".equals(tradeOrder.getStatus())) {
tradeOrder.confirm();
tradeOrderRepository.update(tradeOrder);

RedPacketAccount transferToAccount = redPacketAccountRepository.findByUserId(tradeOrderDto.getOppositeUserId());

transferToAccount.transferTo(tradeOrderDto.getAmount());

redPacketAccountRepository.save(transferToAccount);
}
}

@Transactional
public void cancelRecord(TransactionContext transactionContext, RedPacketTradeOrderDto tradeOrderDto) {

logger.error("red packet cancel record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

TradeOrder tradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());

if (null != tradeOrder && "DRAFT".equals(tradeOrder.getStatus())) {
tradeOrder.cancel();
tradeOrderRepository.update(tradeOrder);

RedPacketAccount capitalAccount = redPacketAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());

capitalAccount.cancelTransfer(tradeOrderDto.getAmount());

redPacketAccountRepository.save(capitalAccount);
}
}
}

2.4 流程总结

流程源码可以看 @项目源码

三 . 源码分析

3.1 TCC 成员梳理

  • 事务 : ( Transaction )
  • 事务ID对象 : ( TransactionXid )
  • 事务状态对象 : ( TransactionStatus )
  • 事务类型对象 : ( TransactionType )

  • 参与者 : ( org.mengyun.tcctransaction.Participant )
  • 事务管理器 : TransactionManager
  • 事务恢复配置器 : RecoverConfig
  • 事务恢复处理器 : TransactionRecovery
  • 默认事务恢复配置实现 : DefaultRecoverConfig
  • 事务恢复定时任务 : RecoverScheduledJob
  • 事务恢复处理器 : TransactionRecovery
  • 事务恢复处理器 : TransactionRecovery

3.2 TCC 流程快查

事务的主要流程如下所示 :

  • 发起根事务 : MethodType.ROOT / begin / registerTransaction
  • 传播发起分支事务 : MethodType.PROVIDER / try / propagationNewBegin
  • 传播获取分支事务 : MethodType.PROVIDER / confirm / cancel / propagationExistBegin
  • 提交事务 : commit / confirm / cancel /
  • 回滚事务 : rollback / confirm / cancel /
  • 添加事务 : enlistParticipant / try
  • 事务拦截器 : @Compensable / @Aspect / @Transactional

3.3 流程一 : 切面的处理

TCC 的注解基于 @Aspect + @Compensable 实现切面的处理 , 核心的处理类为 CompensableTransactionAspect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码@Aspect
public abstract class CompensableTransactionAspect {

private CompensableTransactionInterceptor compensableTransactionInterceptor;

public void setCompensableTransactionInterceptor(CompensableTransactionInterceptor compensableTransactionInterceptor) {
this.compensableTransactionInterceptor = compensableTransactionInterceptor;
}

@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void compensableService() {

}

// 可以看到 , 这里使用的环绕切面
@Around("compensableService()")
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {

return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
}

public abstract int getOrder();
}

处理拦截器

当切面拦截后 , 此处会使用拦截器进行真正的具体逻辑 :

  • org.mengyun.tcctransaction.interceptor.CompensableTransactionInterceptor,可补偿事务拦截器。
  • org.mengyun.tcctransaction.interceptor.ResourceCoordinatorInterceptor,资源协调者拦截器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
java复制代码C05- CompensableTransactionInterceptor
M05_01- interceptCompensableMethod
M05_02- rootMethodProceed

/**
*
*
**/
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {

// 获取对应得 Method 对象
Method method = CompensableMethodUtils.getCompensableMethod(pjp);

Compensable compensable = method.getAnnotation(Compensable.class);
Propagation propagation = compensable.propagation();

// 获取事务容器
TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());

// 是否异步处理
boolean asyncConfirm = compensable.asyncConfirm();
boolean asyncCancel = compensable.asyncCancel();

// 是否开启事务
boolean isTransactionActive = transactionManager.isTransactionActive();

if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {
throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName());
}

//
MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);

switch (methodType) {
case ROOT:
return rootMethodProceed(pjp, asyncConfirm, asyncCancel);
case PROVIDER:
return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel);
default:
return pjp.proceed();
}
}


// 此处会通过事务的类型选择不同的方法进行处理 :
private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable {

// 属性准备
Object returnValue = null;
Transaction transaction = null;

try {
// 开启事务 , 获取事务对象 -> M05_02_01
transaction = transactionManager.begin();
try {
// 执行 proceed 处理方法
returnValue = pjp.proceed();
} catch (Throwable tryingException) {

if (!isDelayCancelException(tryingException)) {
transactionManager.rollback(asyncCancel);
}
throw tryingException;
}
// 事件管理器提交 commit
transactionManager.commit(asyncConfirm);
} finally {
transactionManager.cleanAfterCompletion(transaction);
}
return returnValue;
}

Pro 1 : Propagation 是什么 ?

Propagation 提供了多种传播方式 , 来定义具体的传播类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public enum Propagation {
REQUIRED(0),
SUPPORTS(1),
MANDATORY(2),
REQUIRES_NEW(3);
//.............
}


// @Transactional(propagation=Propagation.REQUIRED)
- 如果有事务, 那么加入事务, 没有的话新建一个(默认情况下)

// @Transactional(propagation=Propagation.REQUIRES_NEW)
- 不管是否存在事务,都创建一个新的事务,原来的挂起,新的执行完毕,继续执行老的事务

// @Transactional(propagation=Propagation.MANDATORY)
- 必须在一个已有的事务中执行,否则抛出异常

// @Transactional(propagation=Propagation.SUPPORTS)
- 如果其他bean调用这个方法,在其他bean中声明事务,那就用事务.如果其他bean没有声明事务,那就不用事务

Pro 2 : MethodType 是什么

MethodType 表示方法对应的事务类型 :

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码

public enum MethodType {
ROOT,
CONSUMER,
PROVIDER,
NORMAL;
}

- MethodType.ROOT : 根事务 , 也可以理解为事务发起者
- MethodType.CONSUMER : 消费参与者
- MethodType.PROVIDER : 生产参与者
- MethodType.NORMAL :

M05_02_01 Transaction 实体类解析

tcc-trans-transMangerBean.jpg

3.4 事务的执行和通知

3.4.1 事务的commit

之前 C05- CompensableTransactionInterceptor # M05_02- rootMethodProceed 中通过 TransactionManager 执行 commit 操作

先看一下 TransactionManager 的结构 :

1
2
3
4
5
6
7
8
9
java复制代码C09- TransactionManager
F09_01- ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();
M09_01- begin()
M09_02- propagationNewBegin(TransactionContext transactionContext)
M09_03- propagationExistBegin(TransactionContext transactionContext)
M09_04- commit(boolean asyncCommit)
M09_05- rollback(boolean asyncRollback)
M09_06- commitTransaction(Transaction transaction)
M09_07- rollbackTransaction(Transaction transaction)

Step 1 : TransactionManager 进行管理

TransactionManager 进行 Transaction 的配合和通过线程发起事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码
public void commit(boolean asyncCommit) {
// 从 ThreadLocal 中获取 Transaction
final Transaction transaction = getCurrentTransaction();
// commit 后修改状态
transaction.changeStatus(TransactionStatus.CONFIRMING);
// 更新 Transaction 状态
transactionRepository.update(transaction);

if (asyncCommit) {
try {
Long statTime = System.currentTimeMillis();

// 此处主要为 ThreadPoolExecutor , 通过线程池提交 -> M09_06
executorService.submit(new Runnable() {
@Override
public void run() {
commitTransaction(transaction);
}
});
} catch (Throwable commitException) {
throw new ConfirmingException(commitException);
}
} else {
commitTransaction(transaction);
}
}

Step 2 :提交事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
java复制代码// M09_06 此处提交事务
private void commitTransaction(Transaction transaction) {
try {
// 提交多个事务
transaction.commit();
// 删除事务
transactionRepository.delete(transaction);
} catch (Throwable commitException) {
logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);
throw new ConfirmingException(commitException);
}
}

// C15- Transaction -> PS:C15_01
public void commit() {
for (Participant participant : participants) {
participant.commit();
}
}

public void commit() {
terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
}


public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {
try {
Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();
Method method = null;
// 代理执行对象的 Method
method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());
FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs());
return method.invoke(target, invocationContext.getArgs());
} catch (Exception e) {
throw new SystemException(e);
}
}
return null;
}


// public final void com.tcc.demo.order.service.PaymentServiceImpl$$EnhancerBySpringCGLIB$$6a17cffa.confirmMakePayment(com.tcc.demo.order.model.Order,java.math.BigDecimal,java.math.BigDecimal)



// TransactionRepository 是用于事务管理的持久化操作
C10- TransactionRepository -> PS:C10_01


// ExecutorService 执行 Service
C12- ExecutorService -> PS:C11_01

PS:C10_01 TransactionRepository 家族体系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码public interface TransactionRepository {
int create(Transaction transaction);
int update(Transaction transaction);
int delete(Transaction transaction);
Transaction findByXid(TransactionXid xid);
List<Transaction> findAllUnmodifiedSince(Date date);
}


// 这里看一下 Transaction 的数据库结构

CREATE TABLE `TCC_TRANSACTION` (
`TRANSACTION_ID` int(11) NOT NULL AUTO_INCREMENT COMMENT '事务 ID',
`DOMAIN` varchar(100) DEFAULT NULL COMMENT '域名',
`GLOBAL_TX_ID` varbinary(32) NOT NULL COMMENT '全局事务ID',
`BRANCH_QUALIFIER` varbinary(32) NOT NULL,
`CONTENT` varbinary(8000) DEFAULT NULL COMMENT '序列化 Transaction 事务内容',
`STATUS` int(11) DEFAULT NULL COMMENT '事务状态',
`TRANSACTION_TYPE` int(11) DEFAULT NULL COMMENT '事务类型',
`RETRIED_COUNT` int(11) DEFAULT NULL COMMENT '重试次数',
`CREATE_TIME` datetime DEFAULT NULL COMMENT '创建时间',
`LAST_UPDATE_TIME` datetime DEFAULT NULL COMMENT '最后更新时间',
`VERSION` int(11) DEFAULT NULL COMMENT '乐观锁版本',
`IS_DELETE` int(12) DEFAULT NULL COMMENT '是否删除',
PRIMARY KEY (`TRANSACTION_ID`),
UNIQUE KEY `UX_TX_BQ` (`GLOBAL_TX_ID`,`BRANCH_QUALIFIER`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


// 每一个小事务都有个自己的 TCC_TRANSACTION 类 , 类名通常为 TCC_TRANSACTION_xxx

TransactionRepository_system.png

PS:C15_01 参数

tcc-participants.jpg

Pro 1 : Xid 的对象

1
2
3
4
5
6
7
8
9
10
java复制代码public interface Xid {
int MAXGTRIDSIZE = 64;
int MAXBQUALSIZE = 64;

int getFormatId();

byte[] getGlobalTransactionId();

byte[] getBranchQualifier();
}

3.4.2 事务的通知

事务的还有一个核心逻辑就是通知其他的应用执行相关的逻辑 , 那么事务是怎么相互告知的呢 ? 我们从实际应用出发 :

问题 :

疑点一 : 当 captial try 逻辑完成后 , 实际上已经返回了 , 并不会拿到对应的通知

现象 :

现象一 : 当 try 再次调用时 , 是通过 restAPI 接口进行网络调用 , 所以应该是外部调用实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码// 找了相关的代码找到了这个类 : 

// C15- Transaction -> PS:C15_01
public void commit() {
for (Participant participant : participants) {
participant.commit();
}
}

public class TradeOrderServiceProxy {

@Autowired
CapitalTradeOrderService capitalTradeOrderService;

@Autowired
RedPacketTradeOrderService redPacketTradeOrderService;

@Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
return capitalTradeOrderService.record(new TransactionEntity<>(transactionContext, tradeOrderDto));
}

//.............
}

如果我们跟着相关的代码 , 会发现确实在出现rollbakc 时 , 会调用该参与者

所以我们能得出如下的结论 :

  1. 当出现异常后 , 会通过 Participant 调用相同的接口一次
  2. 调用对应接口后 , 会因为拦截器的原因 , 通过 Transaction 状态 , 调用对应的所属流程 (例如异常就是 rollback)

3.5 事务的回退

事务的回退时 , 会先调用起本身的 cancel 方法 ,其次会调用依赖微服务的原方法

PS : 确实是原方法 , 但是由于代理 , 会进入 CompensableTransactionAspect 切面

通过判断 TransactionContext 中的 status 决定执行什么方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext, boolean asyncConfirm, boolean asyncCancel) throws Throwable {

Transaction transaction = null;
try {

switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
case TRYING:
transaction = transactionManager.propagationNewBegin(transactionContext);
return pjp.proceed();
case CONFIRMING:
try {
transaction = transactionManager.propagationExistBegin(transactionContext);
transactionManager.commit(asyncConfirm);
} catch (NoExistedTransactionException excepton) {
//the transaction has been commit,ignore it.
}
break;
case CANCELLING:
//
try {
transaction = transactionManager.propagationExistBegin(transactionContext);
transactionManager.rollback(asyncCancel);
} catch (NoExistedTransactionException exception) {
//the transaction has been rollback,ignore it.
}
break;
}

} finally {
transactionManager.cleanAfterCompletion(transaction);
}

Method method = ((MethodSignature) (pjp.getSignature())).getMethod();

return ReflectionUtils.getNullValue(method.getReturnType());
}

public void rollback() {
for (Participant participant : participants) {
participant.rollback();
}
}


// 回退的发起
public void rollback() {
terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);
}



// 注意 :
1- Participant -> Order Cancel
2- Participant -> Capital Rest API
3- Participant -> Capital Cancel

3.6 事务的异步处理

TCC-Transaction 的处理默认是同步的 , 可以通过注解来配置异步处理

@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = false, asyncCancel = false)

这里来看一下2者的区别 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码    public void rollback(boolean asyncRollback) {

final Transaction transaction = getCurrentTransaction();
transaction.changeStatus(TransactionStatus.CANCELLING);

transactionRepository.update(transaction);

if (asyncRollback) {

try {
// 最大的区别就是此处调用线程池构建了一个新线程
executorService.submit(new Runnable() {
@Override
public void run() {
rollbackTransaction(transaction);
}
});
} catch (Throwable rollbackException) {
throw new CancellingException(rollbackException);
}
} else {

rollbackTransaction(transaction);
}
}

3.7 事务的恢复

事务的恢复和事务的通知并不是一个概念 , 当事务的初期执行出现异常后 , 事务在后续会通过定时任务的方式 , 完成事务的继续执行操作

  • org.mengyun.tcctransaction.recover.RecoverConfig,事务恢复配置接口
  • org.mengyun.tcctransaction.spring.recover.DefaultRecoverConfig,默认事务恢复配置实现
  • org.mengyun.tcctransaction.spring.recover.RecoverScheduledJob,事务恢复定时任务,基于 Quartz 实现调度,不断不断不断执行事务恢复

总结

正常处理流程 , 执行 Confirm

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码A- 事务发起者 (订单服务)
B- 事务消费对象B (账户服务)
C- 事务消费对象C (红包服务)

// Step 1 : 事务发起者处理
A1- 调用发起方法
A2- TCC CompensableTransactionAspect 切面拦截
A3- TCC CompensableTransactionInterceptor # rootMethodProceed 准备事务容器 , 大流程处理
A4- CompensableTransactionInterceptor # rootMethodProceed 开启事务及事务管理 (begin , process , commit)
A5- 进入真正的方法执行业务逻辑

// Step 2 : 账户对象处理
B1- 调用发起方法
B2- TCC CompensableTransactionAspect 切面拦截
B3- TCC CompensableTransactionInterceptor # rootMethodProceed 准备事务容器 , 大流程处理
B4- CompensableTransactionInterceptor # providerMethodProceed 执行消费处理
B5- 进入真正的方法执行业务逻辑

// PS : 主要对比 A4 - B4 的区别

// Step 3 : 账户处理完成后 , 订单服务继续处理
A6- transactionManager.commit(asyncConfirm) : 提交 commit
A7- Transaction.Participant # commit() 提交代理方法
A8- 执行 Order Confirm 方法


// Step 4 : 账户服务Confirm 确定
B6- TransactionAspectSupport # invokeWithinTransaction
B7- 执行 账户 Confirm 方法

回退处理逻辑 , 执行 Cancel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码// PS : 前几步都是一样的 , 基于代理的方式


// Step 1: A4 rootMethodProceed 处理过程中出现异常 , 由 catch 继续处理
A4- rootMethodProceed : 执行账户主方法
A5- 出现异常 , catch 处理 , 调用 rollback
A6- TransactionManager # rollback 执行 Rollback 主流程

// Step 2 : order 模块调用 cancel 方法


// Step 3 : 账户对象处理回退
B5- 再次原样调用主方法
B6- 事务容器状态为回退 , 执行回退逻辑

@Compensable 的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码// 整个流程中共有以下几个地方需要标注 @Compensable , 我们来单独看看其中的关联


// Step 1 : Order 中 try 方法标注
@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = true)

// Step 2 : Order 中调用远程接口时标注
@Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
return capitalTradeOrderService.record(new TransactionEntity<>(transactionContext, tradeOrderDto));
}

@Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, RedPacketTradeOrderDto tradeOrderDto) {
return redPacketTradeOrderService.record(new TransactionEntity<>(transactionContext, tradeOrderDto));
}




// Step 3 : capital 中远程接口的 try 方法
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)

// Step 4 : red 接口中 try 方法
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)


// PS : 官方原文中 , 此处是没有添加对应方法的 , 不清楚是否是因为 Dubbo 的 Feign 的机制问题

感谢和参考

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%