A Walkthrough of Seata: Transaction Sessions on the Server Side

First, a pointer to my earlier articles. Likes, bookmarks, and shares are welcome.

Article collection : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

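1. Introduction

The previous article covered how sessions are managed; this one looks at how they are persisted through SQL. For the underlying concepts, see the previous article: "A Walkthrough of Seata: Session Initialization for Server-Side Transactions".

Session handling operates on two tables, global_table and branch_table. Taking them in turn: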

Pro 1 : The role of global_table

global_table persists global transactions. The table name can be configured through store.db.global.table.

CREATE TABLE `global_table` (
    `xid` varchar(128) NOT NULL,
    `transaction_id` bigint(20) DEFAULT NULL,
    `status` tinyint(4) NOT NULL,
    `application_id` varchar(32) DEFAULT NULL,
    `transaction_service_group` varchar(32) DEFAULT NULL,
    `transaction_name` varchar(128) DEFAULT NULL,
    `timeout` int(11) DEFAULT NULL,
    `begin_time` bigint(20) DEFAULT NULL,
    `application_data` varchar(2000) DEFAULT NULL,
    `gmt_create` datetime DEFAULT NULL,
    `gmt_modified` datetime DEFAULT NULL,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Pro 2 : The role of branch_table

branch_table records branch transactions. The table name can be configured through store.db.branch.table.

CREATE TABLE `branch_table` (
    `branch_id` bigint(20) NOT NULL,
    `xid` varchar(128) NOT NULL,
    `transaction_id` bigint(20) DEFAULT NULL,
    `resource_group_id` varchar(32) DEFAULT NULL,
    `resource_id` varchar(256) DEFAULT NULL,
    `branch_type` varchar(8) DEFAULT NULL,
    `status` tinyint(4) DEFAULT NULL,
    `client_id` varchar(64) DEFAULT NULL,
    `application_data` varchar(2000) DEFAULT NULL,
    `gmt_create` datetime(6) DEFAULT NULL,
    `gmt_modified` datetime(6) DEFAULT NULL,
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

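global_table and branch_table relate to each other as the global (parent) transaction and its branch transactions. When a business operation runs, one row is created in global_table, and every database unit of work (an RM transaction) creates one row in branch_table, linked back to the global row by xid.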
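To make the one-to-many relationship concrete, here is a minimal in-memory sketch. It does not use Seata at all: the class GlobalBranchRelationSketch and its methods are hypothetical names chosen for illustration, and the two maps merely stand in for the two tables.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not Seata code: two maps stand in for the two tables.
final class GlobalBranchRelationSketch {
    // global_table stand-in: xid -> status code (1 is used here as a placeholder)
    private final Map<String, Integer> globalTable = new LinkedHashMap<>();
    // branch_table stand-in: branch_id -> xid of the owning global transaction
    private final Map<Long, String> branchTable = new LinkedHashMap<>();

    // One row per business operation (global transaction).
    void beginGlobal(String xid) {
        globalTable.put(xid, 1);
    }

    // One row per database unit of work (RM transaction); it must reference a known xid.
    void registerBranch(long branchId, String xid) {
        if (!globalTable.containsKey(xid)) {
            throw new IllegalStateException("unknown xid: " + xid);
        }
        branchTable.put(branchId, xid);
    }

    // Similar in spirit to: select branch_id from branch_table where xid = ?
    List<Long> branchesOf(String xid) {
        List<Long> result = new ArrayList<>();
        for (Map.Entry<Long, String> entry : branchTable.entrySet()) {
            if (entry.getValue().equals(xid)) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    // Small demo: one global transaction with two branches.
    static List<Long> demo() {
        GlobalBranchRelationSketch db = new GlobalBranchRelationSketch();
        db.beginGlobal("192.168.181.2:8091:1");
        db.registerBranch(11L, "192.168.181.2:8091:1");
        db.registerBranch(12L, "192.168.181.2:8091:1");
        return db.branchesOf("192.168.181.2:8091:1");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The idx_xid index on branch_table serves exactly this kind of lookup by xid.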

2. Session Data Flow

Let's trace where the data goes, starting with the overall processing logic.

C1- Netty Request : the request is issued
C2- AbstractTCInboundHandler : handles the request and dispatches to a handler
C3- GlobalSession : begins the transaction
C4- AbstractSessionManager : manages the session at the abstract level
C5- DataBaseTransactionStoreManager : performs the actual session management
C6- LogStoreDataBaseDAO : performs the persistence

2.1 Entry Point for Session Data

The previous article showed that session persistence starts at DataBaseTransactionStoreManager # writeSession.

// A second look: each if branch handles one operation type, and the types fall into two groups, Global and Branch
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
        return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
        return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
        return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    // BRANCH operations start here
    } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
        return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
        return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
        return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else {
        throw new StoreException("Unknown LogOperation:" + logOperation.name());
    }
}

// The method dispatches on the operation type through a chain of ifs; next, the overall call flow
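The if-chain above maps each operation type to exactly one store call. As a sketch of the same idea in isolation, the dispatch can also be written as a lookup table keyed by the enum. This is an alternative formulation for illustration only, not how Seata implements it: WriteSessionDispatchSketch, its nested LogOperation enum, and the string "session" argument are all hypothetical stand-ins.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: a lookup table replaces the if/else chain. Not Seata code.
final class WriteSessionDispatchSketch {
    // Mirrors the six operation names that writeSession distinguishes.
    enum LogOperation {
        GLOBAL_ADD, GLOBAL_UPDATE, GLOBAL_REMOVE, BRANCH_ADD, BRANCH_UPDATE, BRANCH_REMOVE
    }

    private final Map<LogOperation, Function<String, Boolean>> handlers =
            new EnumMap<>(LogOperation.class);

    // Records what a real store would have executed.
    final StringBuilder log = new StringBuilder();

    WriteSessionDispatchSketch() {
        // Each handler stands in for one logStore call.
        handlers.put(LogOperation.GLOBAL_ADD, s -> record("insert global " + s));
        handlers.put(LogOperation.GLOBAL_UPDATE, s -> record("update global " + s));
        handlers.put(LogOperation.GLOBAL_REMOVE, s -> record("delete global " + s));
        handlers.put(LogOperation.BRANCH_ADD, s -> record("insert branch " + s));
        handlers.put(LogOperation.BRANCH_UPDATE, s -> record("update branch " + s));
        handlers.put(LogOperation.BRANCH_REMOVE, s -> record("delete branch " + s));
    }

    private boolean record(String line) {
        log.append(line).append('\n');
        return true;
    }

    // Same contract as the original: true on success, exception for an unknown operation.
    boolean writeSession(LogOperation operation, String session) {
        Function<String, Boolean> handler = handlers.get(operation);
        if (handler == null) {
            throw new IllegalArgumentException("Unknown LogOperation:" + operation);
        }
        return handler.apply(session);
    }

    public static void main(String[] args) {
        WriteSessionDispatchSketch store = new WriteSessionDispatchSketch();
        store.writeSession(LogOperation.GLOBAL_ADD, "xid-1");
        store.writeSession(LogOperation.BRANCH_ADD, "branch-1");
        System.out.print(store.log);
    }
}
```

Either form keeps the key property of the original: the caller only names the operation, and the store manager decides which table and statement are involved.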

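2.2 Managing Session Data

Sessions are managed by AbstractSessionManager together with its concrete subclasses. AbstractSessionManager implements the SessionManager interface, which exposes the following methods for working with sessions: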

public interface SessionManager extends SessionLifecycleListener, Disposable {

    // Global part
    void addGlobalSession(GlobalSession session) throws TransactionException;
    GlobalSession findGlobalSession(String xid);
    GlobalSession findGlobalSession(String xid, boolean withBranchSessions);
    void updateGlobalSessionStatus(GlobalSession session, GlobalStatus status) throws TransactionException;
    void removeGlobalSession(GlobalSession session) throws TransactionException;

    // Branch part
    void addBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException;
    void updateBranchSessionStatus(BranchSession session, BranchStatus status) throws TransactionException;
    void removeBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException;

    Collection<GlobalSession> allSessions();
    // Used by the scheduled tasks
    List<GlobalSession> findGlobalSessions(SessionCondition condition);
    // Runs the callback while holding the session lock
    <T> T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable<T> lockCallable)
        throws TransactionException;

    // Lock for scheduled tasks
    default boolean scheduledLock(String key) {
        return true;
    }

    default boolean unScheduledLock(String key) {
        return true;
    }
}


// Seata ships several SessionManager implementations (class bodies omitted)
@LoadLevel(name = "db", scope = Scope.PROTOTYPE)
public class DataBaseSessionManager extends AbstractSessionManager implements Initialize

@LoadLevel(name = "file", scope = Scope.PROTOTYPE)
public class FileSessionManager extends AbstractSessionManager implements Reloadable

@LoadLevel(name = "redis", scope = Scope.PROTOTYPE)
public class RedisSessionManager extends AbstractSessionManager implements Initialize
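The @LoadLevel names above ("db", "file", "redis") suggest that an implementation is picked by name, and the PROTOTYPE scope suggests that each lookup yields a fresh instance. The sketch below illustrates only that idea with a plain map of factories. It deliberately does not reproduce Seata's own loading mechanism; SessionManagerRegistrySketch, NamedSessionManager, and load are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for name-based lookup; Seata's real loader is not reproduced here.
final class SessionManagerRegistrySketch {
    // Reduced to a single method for the purposes of the sketch.
    interface SessionManager {
        String storeMode();
    }

    static final class NamedSessionManager implements SessionManager {
        private final String mode;

        NamedSessionManager(String mode) {
            this.mode = mode;
        }

        @Override
        public String storeMode() {
            return mode;
        }
    }

    // name -> factory; a factory (rather than an instance) models prototype scope.
    private static final Map<String, Supplier<SessionManager>> REGISTRY = new LinkedHashMap<>();

    static {
        REGISTRY.put("db", () -> new NamedSessionManager("db"));
        REGISTRY.put("file", () -> new NamedSessionManager("file"));
        REGISTRY.put("redis", () -> new NamedSessionManager("redis"));
    }

    // Returns a new manager for the given store mode, or fails for an unknown name.
    static SessionManager load(String name) {
        Supplier<SessionManager> factory = REGISTRY.get(name);
        if (factory == null) {
            throw new IllegalArgumentException("no SessionManager registered for: " + name);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        System.out.println(load("db").storeMode());
    }
}
```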

2.3 Creating a Session

The first time a business operation runs, the addGlobalSession flow is triggered. The session itself is created through the SessionManager.

  • C- AbstractTCInboundHandler # handle
  • C- DefaultCoordinator # doGlobalBegin
  • C- DefaultCore # begin
  • C- GlobalSession # begin
  • C- AbstractSessionManager # onBegin
  • C- DataBaseSessionManager # addGlobalSession
// Preliminary step: onRequest
AbstractResultMessage onRequest(AbstractMessage request, RpcContext context)

// Step 1: begin the global transaction -> business-service-seata-service-group
// C- DefaultCoordinator
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
    throws TransactionException {
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
        request.getTransactionName(), request.getTimeout()));
}

// Step 2: DefaultCore
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // Create the GlobalSession
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
        timeout);
    MDC.put(RootContext.MDC_KEY_XID, session.getXid());
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

    session.begin();

    // Publish a GlobalTransactionEvent
    eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));

    return session.getXid();
}


// Step 3: add the GlobalSession, C- DataBaseSessionManager
public void addGlobalSession(GlobalSession session) throws TransactionException {
    if (StringUtils.isBlank(taskName)) {
        // No taskName: insert the row
        boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
        if (!ret) {
            throw new StoreException("addGlobalSession failed.");
        }
    } else {
        // With a taskName: update the existing row
        boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
        if (!ret) {
            throw new StoreException("addGlobalSession failed.");
        }
    }
}

// Step 4: build the GlobalTransactionDO through convertGlobalTransactionDO
return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
// The converted object observed in this run:
{
    "applicationId": "business-seata-example",
    "beginTime": 1626253527508,
    "status": 1,
    "timeout": 300000,
    "transactionId": 4386660905323928286,
    "transactionName": "dubbo-gts-seata-example",
    "transactionServiceGroup": "business-service-seata-service-group",
    "xid": "192.168.181.2:8091:4386660905323928286"
}


// Step 5: execute the insert. The literals below are the values observed in this run,
// shown in place of the globalTransactionDO getters.
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        conn = logStoreDataSource.getConnection();
        conn.setAutoCommit(true);
        ps = conn.prepareStatement(sql);
        ps.setString(1, "192.168.181.2:8091:4386660905323928286");   // xid
        ps.setLong(2, 4386660905323928286L);                          // transaction_id
        ps.setInt(3, 1);                                              // status
        ps.setString(4, "business-seata-example");                    // application_id
        ps.setString(5, "business-service-seata-service-group");      // transaction_service_group
        // The transaction name is truncated to the column size before it is bound
        String transactionName = globalTransactionDO.getTransactionName();
        transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0,
            transactionNameColumnSize) : transactionName;
        ps.setString(6, "dubbo-gts-seata-example");                   // transaction_name
        ps.setInt(7, 300000);                                         // timeout
        ps.setLong(8, 1626253527508L);                                // begin_time
        ps.setString(9, null);                                        // application_data
        return ps.executeUpdate() > 0;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(ps, conn);
    }
}
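Two details in the insert are easy to miss: the xid in the sample row has the shape ip:port:transactionId, and the transaction name is cut to the column size before it is bound. The sketch below isolates both. The xid shape is inferred from the sample values only, and XidSketch with its three methods is a hypothetical helper, not Seata's own utility.

```java
// Hypothetical helper illustrating the sample values; not Seata's own XID utility.
final class XidSketch {
    // Builds an xid in the ip:port:transactionId shape seen in the sample row.
    static String buildXid(String ip, int port, long transactionId) {
        return ip + ":" + port + ":" + transactionId;
    }

    // Recovers the transaction id, i.e. the part after the last colon.
    static long transactionIdOf(String xid) {
        return Long.parseLong(xid.substring(xid.lastIndexOf(':') + 1));
    }

    // Same truncation rule as the insert method: keep at most columnSize characters.
    static String fitColumn(String name, int columnSize) {
        return name.length() > columnSize ? name.substring(0, columnSize) : name;
    }

    public static void main(String[] args) {
        String xid = buildXid("192.168.181.2", 8091, 4386660905323928286L);
        System.out.println(xid);
        System.out.println(transactionIdOf(xid));
        System.out.println(fitColumn("dubbo-gts-seata-example", 128));
    }
}
```

Truncating on the server side keeps an overly long transaction name from failing the insert against the varchar(128) column, at the cost of storing a shortened name.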

2.4 Using Session Data

With management covered, let's see how a session is read and used over the course of the whole flow.

// Session reads are still coordinated through AbstractSessionManager. There are several read paths:

// Path 1: RetryRollback
1. DefaultCoordinator # init
2. DefaultCoordinator # handleRetryRollbacking : handles the rollback-retry logic
3. DataBaseSessionManager # allSessions : handles SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME
4. DataBaseSessionManager # findGlobalSessions

// The following tasks go through the same flow:
1. retryCommitting
2. timeoutCheck
3. asyncCommitting


// Path 2: the business flow of a session

This is the path taken under normal processing. The following subsections look at the role the session plays across the business logic.

2.4.1 Reading a Session

DataBaseSessionManager initiates the read by delegating to transactionStoreManager.readSession. Three methods are available:

  • GlobalSession findGlobalSession(String xid)
  • GlobalSession findGlobalSession(String xid, boolean withBranchSessions)
  • List<GlobalSession> findGlobalSessions(SessionCondition condition) : mainly used by the scheduled tasks
// Step 1: the findGlobalSession logic
public GlobalSession findGlobalSession(String xid, boolean withBranchSessions) {
    // Essentially a call to readSession; the condition-based variant calls
    // transactionStoreManager.readSession(condition) instead
    return transactionStoreManager.readSession(xid, withBranchSessions);
}


// Step 2: the readSession operation
public GlobalSession readSession(String xid, boolean withBranchSessions) {
    // Query the GlobalTransactionDO
    GlobalTransactionDO globalTransactionDO = logStore.queryGlobalTransactionDO(xid);
    if (globalTransactionDO == null) {
        return null;
    }
    //branch transactions
    List<BranchTransactionDO> branchTransactionDOs = null;
    //reduce rpc with db when branchRegister and getGlobalStatus
    if (withBranchSessions) {
        branchTransactionDOs = logStore.queryBranchTransactionDO(globalTransactionDO.getXid());
    }
    return getGlobalSession(globalTransactionDO, branchTransactionDOs);
}

// Step 2-1: a concrete query, here queryGlobalTransactionDO by status (abridged)
// The LogStore implementation in use is LogStoreDataBaseDAO
public List<GlobalTransactionDO> queryGlobalTransactionDO(int[] statuses, int limit) {
    // ... declarations omitted ...
    conn = logStoreDataSource.getConnection();
    // Auto-commit each statement
    conn.setAutoCommit(true);

    String paramsPlaceHolder = org.apache.commons.lang.StringUtils.repeat("?", ",", statuses.length);
    // select xid, transaction_id, status, application_id, transaction_service_group,
    // transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified
    // from global_table where status in (?,?,?,?) order by gmt_modified limit ?
    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getQueryGlobalTransactionSQLByStatus(globalTable, paramsPlaceHolder);
    // Prepare the statement
    ps = conn.prepareStatement(sql);
    // ... parameter binding and result mapping omitted ...
}

// Step 2-2: convert the GlobalTransactionDO into a GlobalSession
private GlobalSession getGlobalSession(GlobalTransactionDO globalTransactionDO,
    List<BranchTransactionDO> branchTransactionDOs) {
    GlobalSession globalSession = SessionConverter.convertGlobalSession(globalTransactionDO);
    //branch transactions
    if (CollectionUtils.isNotEmpty(branchTransactionDOs)) {
        // Attach each branch transaction (BranchTransactionDO)
        for (BranchTransactionDO branchTransactionDO : branchTransactionDOs) {
            globalSession.add(SessionConverter.convertBranchSession(branchTransactionDO));
        }
    }
    return globalSession;
}

// That completes a simple session lookup. Two related variants query by SessionCondition and by statuses

public List<GlobalSession> readSession(GlobalStatus[] statuses) {
    int[] states = new int[statuses.length];
    for (int i = 0; i < statuses.length; i++) {
        states[i] = statuses[i].getCode();
    }
    // Query every GlobalTransactionDO in one of the given statuses
    List<GlobalTransactionDO> globalTransactionDOs = logStore.queryGlobalTransactionDO(states, logQueryLimit);
    if (CollectionUtils.isEmpty(globalTransactionDOs)) {
        return null;
    }
    List<String> xids = globalTransactionDOs.stream().map(GlobalTransactionDO::getXid).collect(Collectors.toList());
    // Fetch all branch transactions for those xids
    List<BranchTransactionDO> branchTransactionDOs = logStore.queryBranchTransactionDO(xids);
    Map<String, List<BranchTransactionDO>> branchTransactionDOsMap = branchTransactionDOs.stream()
        .collect(Collectors.groupingBy(BranchTransactionDO::getXid, LinkedHashMap::new, Collectors.toList()));
    // Return the list of global sessions
    return globalTransactionDOs.stream().map(globalTransactionDO ->
        getGlobalSession(globalTransactionDO, branchTransactionDOsMap.get(globalTransactionDO.getXid())))
        .collect(Collectors.toList());
}

public List<GlobalSession> readSession(SessionCondition sessionCondition) {
    // Scheduled tasks supply either an xid or a transactionId
    if (StringUtils.isNotBlank(sessionCondition.getXid())) {
        GlobalSession globalSession = readSession(sessionCondition.getXid());
        if (globalSession != null) {
            List<GlobalSession> globalSessions = new ArrayList<>();
            globalSessions.add(globalSession);
            return globalSessions;
        }
    } else if (sessionCondition.getTransactionId() != null) {
        GlobalSession globalSession = readSession(sessionCondition.getTransactionId());
        if (globalSession != null) {
            List<GlobalSession> globalSessions = new ArrayList<>();
            globalSessions.add(globalSession);
            return globalSessions;
        }
    } else if (CollectionUtils.isNotEmpty(sessionCondition.getStatuses())) {
        return readSession(sessionCondition.getStatuses());
    }
    return null;
}
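The status-based readSession avoids one branch query per global transaction: it fetches all branches for the matching xids in a single call and then groups them by xid. The following standalone sketch shows that grouping step with the same three-argument Collectors.groupingBy. BranchGroupingSketch and BranchRow are hypothetical stand-ins for the Seata types.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of the grouping step; BranchRow stands in for BranchTransactionDO.
final class BranchGroupingSketch {
    static final class BranchRow {
        private final String xid;
        private final Long branchId;

        BranchRow(String xid, Long branchId) {
            this.xid = xid;
            this.branchId = branchId;
        }

        String getXid() {
            return xid;
        }

        Long getBranchId() {
            return branchId;
        }
    }

    // Groups branch ids by xid; LinkedHashMap keeps the order in which xids first appear.
    static Map<String, List<Long>> groupByXid(List<BranchRow> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                BranchRow::getXid,
                LinkedHashMap::new,
                Collectors.mapping(BranchRow::getBranchId, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<BranchRow> rows = Arrays.asList(
                new BranchRow("x1", 1L), new BranchRow("x2", 2L), new BranchRow("x1", 3L));
        Map<String, List<Long>> grouped = groupByXid(rows);
        System.out.println(grouped);
        // A global transaction without branches has no entry in the map.
        System.out.println(grouped.get("x3"));
    }
}
```

The missing-key case matters: a lookup for an xid without branches yields null, which is why getGlobalSession guards the list with CollectionUtils.isNotEmpty before iterating.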

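2.4.2 Adding to a Session

// After the session is created, each BranchSession is later added to the GlobalSession -> see 3.1.3, Main Registration Logic

2.5 Destroying Session Data

The main flow is as follows (the rollback path is listed; the commit path in the code below ends through SessionHelper # endCommitted in the same way):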

  • C- SessionHelper # endRollbacked
  • C- GlobalSession # end()
  • C- GlobalSession # onEnd()
  • C- DataBaseSessionManager # removeGlobalSession
// DataBaseSessionManager

// Step 1: entry point for destruction, DefaultCore # doGlobalCommit
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // start committing event
    eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
        globalSession.getBeginTime(), null, globalSession.getStatus()));

    // ........ commit logic omitted
    //If success and there is no branch, end the global transaction.
    if (success && globalSession.getBranchSessions().isEmpty()) {
        SessionHelper.endCommitted(globalSession);

        // committed event
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
            globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));
    }
    return success;
}


// Step 2: finish the commit
public static void endCommitted(GlobalSession globalSession) throws TransactionException {
    globalSession.changeStatus(GlobalStatus.Committed);
    globalSession.end();
}

// Step 3: the closing stage of the lifecycle
public void end() throws TransactionException {
    // Clean locks first
    clean();

    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
        lifecycleListener.onEnd(this);
    }
}


// Step 4: remove the global transaction
public void removeGlobalSession(GlobalSession session) throws TransactionException {
    boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_REMOVE, session);
    if (!ret) {
        throw new StoreException("removeGlobalSession failed.");
    }
}

3. branch_table Flow

3.1 Creation Flow for branch_table

3.1.1 Registration Entry Point

The key class here is AbstractTCInboundHandler. It matters a great deal: all Branch logic passes through it. It has one subclass, DefaultCoordinator.

// C- AbstractTCInboundHandler # handle

public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
    BranchRegisterResponse response = new BranchRegisterResponse();
    // Note: an AbstractCallback is created here
    exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
        @Override
        public void execute(BranchRegisterRequest request, BranchRegisterResponse response)
            throws TransactionException {
            try {
                // Register the branch
                doBranchRegister(request, response, rpcContext);
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore, String
                    .format("branch register request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
            }
        }
    }, request, response);
    return response;
}

3.1.2 doBranchRegister: Main Registration Flow

Each transactional unit of work registers one Branch.

// C- DefaultCoordinator # doBranchRegister
protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
    RpcContext rpcContext) throws TransactionException {
    MDC.put(RootContext.MDC_KEY_XID, request.getXid());
    response.setBranchId(
        core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
            request.getXid(), request.getApplicationData(), request.getLockKey()));
}

3.1.3 Main Registration Logic

public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
    String applicationData, String lockKeys) throws TransactionException {
    GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
    return SessionHolder.lockAndExecute(globalSession, () -> {
        globalSessionStatusCheck(globalSession);
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // Build the BranchSession
        BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
            applicationData, lockKeys, clientId);
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
        branchSessionLock(globalSession, branchSession);
        try {
            // Add the BranchSession to the global session
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
            // Release the branch lock if the add fails
            branchSessionUnlock(branchSession);
            throw new BranchTransactionException(/* ........ arguments omitted */);
        }
        return branchSession.getBranchId();
    });
}

3.1.4 Adding a Branch

// GlobalSession: notify the listeners first, then add the branch in memory
public void addBranch(BranchSession branchSession) throws TransactionException {
    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
        lifecycleListener.onAddBranch(this, branchSession);
    }
    branchSession.setStatus(BranchStatus.Registered);
    add(branchSession);
}


// DataBaseSessionManager
public void addBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException {
    if (StringUtils.isNotBlank(taskName)) {
        return;
    }
    // The core step again: call writeSession
    boolean ret = transactionStoreManager.writeSession(LogOperation.BRANCH_ADD, session);
    if (!ret) {
        throw new StoreException("addBranchSession failed.");
    }
}

// The writeSession logic is omitted here; it is the same dispatch shown in 2.1

Here is the data that gets stored:

// C- LogStoreDataBaseDAO
// The literals are the values observed in this run, shown in place of the branchTransactionDO getters
public boolean insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) {
    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertBranchTransactionSQL(branchTable);
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        conn = logStoreDataSource.getConnection();
        conn.setAutoCommit(true);
        ps = conn.prepareStatement(sql);
        ps.setString(1, "192.168.181.2:8091:7214921829625028621");
        ps.setLong(2, 7214921829625028621L);
        ps.setLong(3, 7214921829625028637L);
        ps.setString(4, null);
        ps.setString(5, "jdbc:mysql://127.0.0.1:3306/seata");
        ps.setString(6, "AT");
        ps.setInt(7, 0);
        ps.setString(8, "account-seata-example:192.168.181.2:49676");
        ps.setString(9, null);
        return ps.executeUpdate() > 0;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(ps, conn);
    }
}

5. Supplementary Notes

5.1 Deleting a BranchSession

In the asyncCommit logic, each BranchSession is deleted once it has been committed. The main flow is as follows:

// Step 1: asyncCommit drives the global commit
protected void handleAsyncCommitting() {
    SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {
        asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        core.doGlobalCommit(asyncCommittingSession, true);
    });
}


// Step 2: commit the global transaction (abridged)
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // start committing event
    eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
        globalSession.getBeginTime(), null, globalSession.getStatus()));

    if (globalSession.isSaga()) {
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
        // Omitted here: the opening of the per-branch loop (a callback that returns
        // CONTINUE or a boolean for each branchSession) and the opening of its try block
                BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

                switch (branchStatus) {
                    case PhaseTwo_Committed:
                        // The branch is committed, so remove it
                        globalSession.removeBranch(branchSession);
                        return CONTINUE;
                    case PhaseTwo_CommitFailed_Unretryable:
                        if (globalSession.canBeCommittedAsync()) {
                            return CONTINUE;
                        } else {
                            SessionHelper.endCommitFailed(globalSession);
                            return false;
                        }
                    default:
                        if (!retrying) {
                            globalSession.queueToRetryCommit();
                            return false;
                        }
                        if (globalSession.canBeCommittedAsync()) {
                            return CONTINUE;
                        } else {
                            return false;
                        }
                }
            } catch (Exception ex) {
                // Exception handling omitted
            }
            return CONTINUE;
        });

    }
    //If success and there is no branch, end the global transaction.
    if (success && globalSession.getBranchSessions().isEmpty()) {
        SessionHelper.endCommitted(globalSession);

        // committed event
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
            globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));

    }
    return success;
}


// Step 3: remove the branch session
public void removeBranch(BranchSession branchSession) throws TransactionException {
    // do not unlock if global status in (Committing, CommitRetrying, AsyncCommitting),
    // because it's already unlocked in 'DefaultCore.commit()'
    if (status != Committing && status != CommitRetrying && status != AsyncCommitting) {
        if (!branchSession.unlock()) {
            throw new TransactionException("Unlock branch lock failed, xid = " + this.xid + ", branchId = " + branchSession.getBranchId());
        }
    }
    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
        lifecycleListener.onRemoveBranch(this, branchSession);
    }
    remove(branchSession);
}


// Step 4: DataBaseTransactionStoreManager # writeSession handles the Branch deletion
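The commit loop boils down to this: commit each branch, remove the ones that succeed, and end the global transaction only when no branch is left. The sketch below models just that skeleton under simplifying assumptions: it stops at the first branch that does not commit and ignores the asynchronous and unretryable cases. CommitLoopSketch, Outcome, and commitAll are hypothetical names, and branches are plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical, heavily simplified model of the per-branch commit loop. Not Seata code.
final class CommitLoopSketch {
    // Simplified outcome of a phase-two commit for one branch.
    enum Outcome { COMMITTED, FAILED }

    // Commits branches in order. Committed branches are removed from the list;
    // the first failure stops the loop and leaves the rest in place for a retry.
    // Returns true only if every branch committed, i.e. the transaction can end.
    static boolean commitAll(List<String> branches, Function<String, Outcome> branchCommit) {
        Iterator<String> it = branches.iterator();
        while (it.hasNext()) {
            String branch = it.next();
            if (branchCommit.apply(branch) == Outcome.COMMITTED) {
                it.remove();
            } else {
                return false;
            }
        }
        return branches.isEmpty();
    }

    public static void main(String[] args) {
        List<String> branches = new ArrayList<>(Arrays.asList("b1", "b2", "b3"));
        // First pass: b2 fails, so b2 and b3 remain.
        boolean done = commitAll(branches, id -> id.equals("b2") ? Outcome.FAILED : Outcome.COMMITTED);
        System.out.println(done + " " + branches);
        // Retry pass: everything commits and the list drains.
        done = commitAll(branches, id -> Outcome.COMMITTED);
        System.out.println(done + " " + branches);
    }
}
```

Removing each branch as soon as it commits is what makes a retry cheap: a later pass only sees the branches that are still outstanding.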

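5.2 Comparison with the Rollback Flow

As with the Global session, the asynchronous path behaves differently here.

TODO : to be completed later

5.3 A Brief Look at the DefaultCoordinator Class

What does the DefaultCoordinator class contain?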

// Fields
C- DefaultCoordinator
    F- int TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS = 5000; // maximum wait when shutting down the scheduled tasks
    F- long COMMITTING_RETRY_PERIOD // commit retry period, default 1000L
    F- long ASYNC_COMMITTING_RETRY_PERIOD // async commit retry period
    F- long ROLLBACKING_RETRY_PERIOD // rollback retry period
    F- long TIMEOUT_RETRY_PERIOD // timeout check period
    F- long UNDO_LOG_DELETE_PERIOD // undo_log deletion period
    F- long UNDO_LOG_DELAY_DELETE_PERIOD = 3 * 60 * 1000 // delay before undo_log deletion starts
    F- int ALWAYS_RETRY_BOUNDARY = 0 // always retry
    F- Duration MAX_COMMIT_RETRY_TIMEOUT // maximum commit retry timeout
    F- Duration MAX_ROLLBACK_RETRY_TIMEOUT // maximum rollback retry timeout
    F- boolean ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE // whether to unlock after the rollback retry timeout
    F- ScheduledThreadPoolExecutor retryRollbacking
    F- ScheduledThreadPoolExecutor retryCommitting
    F- ScheduledThreadPoolExecutor asyncCommitting
    F- ScheduledThreadPoolExecutor timeoutCheck
    F- ScheduledThreadPoolExecutor undoLogDelete
    F- RemotingServer remotingServer
    F- DefaultCore core
    F- EventBus eventBus

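5.4 What Is GlobalSession?

Once a global transaction is created, a global session is created for it. GlobalSession implements the SessionLifecycle interface, which provides begin, changeStatus, changeBranchStatus, addBranch, removeBranch, and other methods for operating on the session and its branch sessions.

  • GlobalSessionLock : holds a ReentrantLock internally and relies on its lock and unlock mechanism
  • BranchSession : a branch session; it manages branch data, is scheduled and managed by the GlobalSession, and its lock and unlock methods are implemented by the LockManager
  • DefaultLockManager : the default LockManager implementation; it obtains the lockKey of the branch session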
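The lock-then-run pattern behind GlobalSessionLock and lockAndExecute can be sketched with nothing but java.util.concurrent. This is a generic illustration of guarding a callback with a ReentrantLock, not Seata's implementation; LockAndExecuteSketch and its methods are hypothetical names, and details such as lock timeouts are left out.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical sketch of "run a callback while holding the session lock". Not Seata code.
final class LockAndExecuteSketch {
    private final ReentrantLock lock = new ReentrantLock();

    // Acquires the lock, runs the action, and always releases the lock afterwards.
    <T> T lockAndExecute(Supplier<T> action) {
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock();
        }
    }

    boolean heldByCurrentThread() {
        return lock.isHeldByCurrentThread();
    }

    // Returns true if the lock was held inside the callback and released after it.
    static boolean demo() {
        LockAndExecuteSketch session = new LockAndExecuteSketch();
        Supplier<Boolean> probe = session::heldByCurrentThread;
        boolean heldInside = session.lockAndExecute(probe);
        return heldInside && !session.heldByCurrentThread();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Releasing in a finally block is the important part: branch registration runs inside such a callback, and an exception there must not leave the session locked.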

On the role of lifecycleListeners
private Set<SessionLifecycleListener> lifecycleListeners = new HashSet<>()

Each of the following methods loops over the listeners and invokes the corresponding callback:

  • begin() -> lifecycleListener.onBegin(this)
  • changeStatus(GlobalStatus status) -> lifecycleListener.onStatusChange(this, status)
  • changeBranchStatus(BranchSession branchSession, BranchStatus status) -> lifecycleListener.onBranchStatusChange(this, branchSession, status)
  • close() -> lifecycleListener.onClose(this)
  • end() -> lifecycleListener.onEnd(this)
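The listener fan-out is what connects an in-memory GlobalSession to its store: the session never writes to the database itself, it only notifies whoever is registered. Here is a minimal sketch of that pattern with two of the callbacks. LifecycleFanOutSketch, Listener, Session, and RecordingListener are hypothetical names, and the recorded strings merely stand in for store operations.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of lifecycle callbacks fanned out to registered listeners. Not Seata code.
final class LifecycleFanOutSketch {
    interface Listener {
        void onBegin(String xid);

        void onEnd(String xid);
    }

    static final class Session {
        private final String xid;
        // A set, so registering the same listener twice has no extra effect.
        private final Set<Listener> listeners = new LinkedHashSet<>();

        Session(String xid) {
            this.xid = xid;
        }

        void addListener(Listener listener) {
            listeners.add(listener);
        }

        void begin() {
            for (Listener listener : listeners) {
                listener.onBegin(xid);
            }
        }

        void end() {
            for (Listener listener : listeners) {
                listener.onEnd(xid);
            }
        }
    }

    // Records the calls it receives, standing in for a store-backed session manager.
    static final class RecordingListener implements Listener {
        final List<String> calls = new ArrayList<>();

        @Override
        public void onBegin(String xid) {
            calls.add("insert " + xid);
        }

        @Override
        public void onEnd(String xid) {
            calls.add("delete " + xid);
        }
    }

    public static void main(String[] args) {
        RecordingListener store = new RecordingListener();
        Session session = new Session("xid-1");
        session.addListener(store);
        session.addListener(store); // duplicate registration is ignored by the set
        session.begin();
        session.end();
        System.out.println(store.calls);
    }
}
```

Holding the listeners in a Set fits the way the code above registers them: addSessionLifecycleListener(SessionHolder.getRootSessionManager()) is called in begin, in branchRegister, and again in handleAsyncCommitting, and a repeated registration should not trigger repeated writes.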

The SessionLifecycleListener module

Seata-SessionLifecycleListener.png

As the diagram shows, there are three main implementations: DataBaseSessionManager, FileSessionManager, and RedisSessionManager.

Each one is paired with its own store manager:

FileSessionManager -> FileTransactionStoreManager

RedisSessionManager -> RedisTransactionStoreManager

DataBaseSessionManager -> DataBaseTransactionStoreManager

5.5 Main Logic Flow Diagram

分布式事务-gloablSession.jpg

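Summary

The material here is fairly simple. The main goal was to fill in the overall picture of Seata, and most of it is now mapped out.

Once undo-log has been covered as well, the walkthrough of the whole flow will be more or less complete.

With that, the pure source-code analysis of Seata can pause for a while, and the series will gradually move on to practical usage. Stay tuned.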

Reposted from: Juejin
