盘点 Seata Client 端 AT 事务请求Ser

这是我参与更文挑战的第15天,活动详情查看: 更文挑战

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

一 . 前言

前面一篇说了 AT 模式中到 Template 的所有流程 , 这一篇来看一下后面的 begiunTransaction 做了什么

二 . 流程梳理

流程分为几个节点 :

  • 属性部分 : TransactionInfo + GlobalLockConfig
  • 事务部分 : beginTransaction + commitTransaction
  • 逻辑部分 : execute + TransactionalExecutor

2.1 TransactionInfo 详情

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码> PS:M52_02_01 TransactionInfo 对象包含了什么?


```java
C53- TransactionInfo
F53_01- int timeOut : 超时时间
F53_02- String name : 事务名
F53_03- Set<RollbackRule> rollbackRules : 回退规则
F53_04- Propagation propagation
F53_05- int lockRetryInternal : 重试间隔
F53_06- int lockRetryTimes : 重试次数
M53_01- rollbackOn :
M53_02- getPropagation()

public final class TransactionInfo implements Serializable {

public boolean rollbackOn(Throwable ex) {

RollbackRule winner = null;
int deepest = Integer.MAX_VALUE;

if (CollectionUtils.isNotEmpty(rollbackRules)) {
winner = NoRollbackRule.DEFAULT_NO_ROLLBACK_RULE;
for (RollbackRule rule : this.rollbackRules) {
int depth = rule.getDepth(ex);
if (depth >= 0 && depth < deepest) {
deepest = depth;
winner = rule;
}
}
}

return !(winner instanceof NoRollbackRule);
}

public Propagation getPropagation() {
if (this.propagation != null) {
return this.propagation;
}
//default propagation
return Propagation.REQUIRED;
}

}

Propagation 的作用 ?

Propagation 是一个枚举 , 表示的是事务传播的模式 , 包括如下几种 :

  • REQUIRED : 如果事务存在,则使用当前事务执行,否则使用新事务执行
  • REQUIRES_NEW : 如果事务存在,将暂停它,然后使用新事务执行业务。
  • NOT_SUPPORTED : 如果事务存在,则挂起它,然后执行没有事务的业务
  • SUPPORTS : 如果事务不存在,则不执行全局事务,否则执行当前事务的业务
  • NEVER : 如果事务存在,抛出异常,否则执行没有事务的业务
  • MANDATORY: 如果事务不存在,抛出异常,否则执行与当前事务相关的业务

2.2 GlobalLockConfig 详情

对象属性 :

1
2
3
4
5
6
7
8
java复制代码// 再次回顾一下之前看过的对象
public class GlobalLockConfig {

// 锁定重试间隔
private int lockRetryInternal;
// 锁定重试次数
private int lockRetryTimes;
}

逻辑处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码
// 再来看一下配置的方式
private GlobalLockConfig replaceGlobalLockConfig(TransactionInfo info) {

GlobalLockConfig myConfig = new GlobalLockConfig();
myConfig.setLockRetryInternal(info.getLockRetryInternal());
myConfig.setLockRetryTimes(info.getLockRetryTimes());
// 主要看一下这个里面做了什么
return GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
}


public class GlobalLockConfigHolder {

// 关键一 : 线程存储
private static ThreadLocal<GlobalLockConfig> holder = new ThreadLocal<>();

// 关键二 : 这里有个有趣的地方 , 可以看到取得 Previous , 同时设置 Current
public static GlobalLockConfig setAndReturnPrevious(GlobalLockConfig config) {
GlobalLockConfig previous = holder.get();
holder.set(config);
return previous;
}

}

[Pro] : 为什么关键二中 , 获取得是之前的 Config

获取前一个 GlobalLockConfig 主要是用于回退

1
2
3
4
5
6
7
8
java复制代码GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
//.......事务
} finally {
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}

那么问题来了 ,我都已经到了下一个操作了 ,再获取前一个全局锁是为什么 ?

大概想了一下 , 主要应该是这样的流程 , 当第一个事务获取全局锁时 , 其他本地事务如果要执行 ,必须获取全局锁 , 那么 , 下一个事务应该去关注上一个全局锁配置.

因为上一个全局锁未处理完的情况下 , 下一个事务实际上是拿不到一个全局锁的.

1
2
3
4
5
6
7
java复制代码private void resumeGlobalLockConfig(GlobalLockConfig config) {
if (config != null) {
GlobalLockConfigHolder.setAndReturnPrevious(config);
} else {
GlobalLockConfigHolder.remove();
}
}

PS : 不知道这里理解得对不对 ,因为这里ThreadLocal 获取到的是当前线程的配置 , 即一个线程内我的全局锁唯一吗?

TODO : 后文看全局锁的时候再来回顾一下

2.3 beginTransaction 开启事务

上文看完了配置信息 , 这里来看一下事务的启动

1
2
3
4
5
6
7
8
9
10
java复制代码// 其中可以看到 , 主要是3步走 >>>
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeBegin();
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);
}
}

2.3.1 triggerBeforeBegin()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
java复制代码// 先来看一下 trigger 主要逻辑
M51_03- triggerBeforeBegin
FOR- 循环所有的 TransactionHook : getCurrentHooks
- hook.beforeBegin()

M51_04- triggerAfterBegin
FOR- 循环所有的 TransactionHook : getCurrentHooks
- hook.afterBegin()

// 2个的核心都是调用 TransactionHook的对应方法 , 这里带来了2个问题 :
- TransactionHook 是什么 ?
- TransactionHook 的管理 ?

// [Pro1] : TransactionHook 是什么 ?
TransactionHook 是一个接口 , 它允许通过插槽的方式对流程进行附加操作 , 它的主要实现类为 TransactionHookAdapter

public interface TransactionHook {

/**
* before tx begin
*/
void beforeBegin();

/**
* after tx begin
*/
void afterBegin();

/**
* before tx commit
*/
void beforeCommit();

/**
* after tx commit
*/
void afterCommit();

/**
* before tx rollback
*/
void beforeRollback();

/**
* after tx rollback
*/
void afterRollback();

/**
* after tx all Completed
*/
void afterCompletion();
}

// 这里大概看了一下 , 应该是可以手动配置 Hook 的 , 后面来详细看一下, 案例 :
public void testTransactionCommitHook() throws Throwable {
TransactionHook transactionHook = Mockito.mock(TransactionHook.class);

TransactionHookManager.registerHook(transactionHook);
TransactionalTemplate template = new TransactionalTemplate();
template.execute(transactionalExecutor);
}



// [Pro2] :TransactionHook 的管理 ?
private List<TransactionHook> getCurrentHooks() {
// 通过 TransactionHookManager 对 TransactionHook 进行管理
return TransactionHookManager.getHooks();
}

public final class TransactionHookManager {
// 同样的 , 其内部也是通过一个 ThreadLocal 进行管理
private static final ThreadLocal<List<TransactionHook>> LOCAL_HOOKS = new ThreadLocal<>();
}

2.3.2 DefaultGlobalTransaction # begin 处理

继续来看三步中的核心步骤 : tx.begin(txInfo.getTimeOut(), txInfo.getName())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码C52- DefaultGlobalTransaction        
M52_01- begin(int timeout, String name)
- RootContext.getXID() : 获取 currentXid
- transactionManager.begin(null, null, name, timeout) : transactionManager 开始管理
- GlobalStatus.Begin : 修改装填
- RootContext.bind(xid) : 绑定事务 ID

public void begin(int timeout, String name) throws TransactionException {
if (role != GlobalTransactionRole.Launcher) {
assertXIDNotNull();
return;
}
assertXIDNull();
// Step 1 : 获取当前事务 ID
String currentXid = RootContext.getXID();
if (currentXid != null) {
throw new IllegalStateException("Global transaction already exists," +
" can't begin a new global transaction, currentXid = " + currentXid);
}
// Step 2 : 调用
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
}

PS : RootContext 是什么 ?

RootContext 是根上下文 ,它会当当前 XID 进行管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码C- RootContext
F- ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();
F- BranchType DEFAULT_BRANCH_TYPE;
M- bind(@Nonnull String xid)
- CONTEXT_HOLDER.put(KEY_XID, xid)
?- 此处的 CONTEXT_HOLDER 为 FastThreadLocalContextCore


C- FastThreadLocalContextCore
private FastThreadLocal<Map<String, Object>> fastThreadLocal = new FastThreadLocal<Map<String, Object>>() {
@Override
protected Map<String, Object> initialValue() {
return new HashMap<>();
}
};

seata_ContextCore_system.png

2.3.3 TransactionManager 详情

seata_transactionManager.jpg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码// 这里终于使用 TransactionManager 进行相关的管理了
C53- DefaultTransactionManager
M53_01- begin(String applicationId, String transactionServiceGroup, String name, int timeout)
1- 构架一个新的 GlobalBeginRequest
2- setTransactionName + setTimeout
3- 调用 syncCall 开启事务 , 同时用 GlobalBeginResponse 接收


public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
// Step 1 : 构建 Request
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
// Step 2 : 发起请求
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
// Step 3 : 获取Response
return response.getXid();
}

GlobalBeginRequest 与 GlobalBeginResponse 详情

image.png

2.3.4 远程调用

远程调用分为2步 :

  • Step 1 : syncCall 发起远程调用主逻辑
  • Step 2 : sendSyncRequest(Object msg) 正式的调用

Step 1 : syncCall 发起远程调用主逻辑

1
2
3
4
5
6
7
8
java复制代码private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
// 通过 TmNettyRemotingClient 发起 Netty 远程调用
return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
} catch (TimeoutException toe) {
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
}
}

Step 2 : sendSyncRequest(Object msg) 正式的调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
java复制代码 public Object sendSyncRequest(Object msg) throws TimeoutException {
String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
int timeoutMillis = NettyClientConfig.getRpcRequestTimeout();
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

// 批量发送消息 , 将消息放入basketMap
if (NettyClientConfig.isEnableClientBatchSendRequest()) {

// 发送批处理消息是同步请求,需要创建messageFuture并将其放入futures
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);

// 把信息放入 basketMap
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
basket.offer(rpcMessage);
if (!isSending) {
synchronized (mergeLock) {
mergeLock.notifyAll();
}
}

try {
// 消息发送获取
return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception exx) {
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
} else {
throw new RuntimeException(exx);
}
}

} else {
Channel channel = clientChannelManager.acquireChannel(serverAddress);
return super.sendSync(channel, rpcMessage, timeoutMillis);
}

}

2.4 commitTransaction 提交事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeCommit();
tx.commit();
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
}


// 去掉log 后 , 可以看到其中的核心代码就是 transactionManager.commit(xid)
public void commit() throws TransactionException {
if (role == GlobalTransactionRole.Participant) {
return;
}
assertXIDNotNull();
int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
try {
while (retry > 0) {
try {
status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
retry--;
if (retry == 0) {
throw new TransactionException("Failed to report global commit", ex);
}
}
}
} finally {
if (xid.equals(RootContext.getXID())) {
suspend();
}
}
}


@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
// 同样的 , commit 也是发起 syncCall
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}

2.5 execute 处理

具体业务的处理只有一句 rs = business.execute() , 来看一下着其中的所有逻辑 :

从上一篇文章我们知道 , business 是再 Inter 中构建的一个 TransactionalExecutor 匿名对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码 C52- TransactionalExecutor
M52_01- execute
M52_02- getTransactionInfo : 获取 TransactionInfo 对象 -> PS:M52_02_01

// Step 1 : 发起远程调用
C- GlobalTransactionalInterceptor
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
// 核心语句 , 方法代理
return methodInvocation.proceed();
}
});


// Step 2 :AOP 拦截
这里可以看到 , 实际上这里和AOP逻辑是一致的 , 最终通过 CglibAopProxy 中实现了方法的代理

methodInvocation 详情
image.png

总结

这一篇暂时不说 rollback 流程 , 仅仅说了正常的事务处理流程 , 下一篇来说说rollback 已经 Server 端的处理

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%