盘点 Seata Seata Server 启动流程

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 .前言

这一篇文章将会开启 Seata 的源码分析流程 , 源码的整体逻辑会比较长 , 预计分为以下几个部分 :

  • Seata Server 启动流程
  • Seata Client 启动流程
  • Seata 的配置加载
  • Seata 的配置处理
  • Seata 事务的处理
  • Seata ID 的流转
  • Seata TCC 模式
  • Seata XA 模式
  • Seata sega 模式
  • Seata Nacos 及其他的服务管理

二 . Seata Server 源码下载及启动

Seata Server 下载地址 , 通过该地址下载 Server Code 即可 , 整个过程中只有2个文件需要变动

Server 项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
xml复制代码<modules>
<module>all</module>
<module>bom</module>
<module>common</module>
<!-- 动态配置 , 配置中心 -->
<module>config</module>
<!-- 核心模块 -->
<module>core</module>
<!-- 服务注册与发现的相关功能-->
<module>discovery</module>
<module>distribution</module>
<!-- 用于对不同框架的集成-->
<module>integration/dubbo</module>
<module>integration/dubbo-alibaba</module>
<module>integration/sofa-rpc</module>
<module>integration/motan</module>
<module>integration/grpc</module>
<module>integration/http</module>
<!-- Seata 对 RM 的核心实现-->
<module>rm</module>
<module>rm-datasource</module>
<!-- Server 运行启动类 , TC 的核心实现 -->
<module>server</module>
<module>spring</module>
<!-- TCC 模块 -->
<module>tcc</module>
<module>test</module>
<!-- Seata 对 TM 的实现,提供了全局事务管理 -->
<module>tm</module>
<module>metrics</module>
<module>serializer</module>
<!-- Spring Boot 自动配置相关均在里面 -->
<module>seata-spring-boot-starter</module>
<module>compressor</module>
<!-- saga 模块 -->
<module>saga</module>
<module>sqlparser</module>
</modules>

源码的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
java复制代码// Step 1 : 打开 Server 模块 Resources 目录 , 修改 Registry.conf
// PS : 可以看到 , 它支持的发现中心有 file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
registry {
type = "nacos"

nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = ""
password = ""
}
}

config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"

nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
dataId = "seataServer.properties"
}
}


// Step 2 : 添加 / 修改 nacos.conf
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.thread-factory.boss-thread-prefix=NettyBoss
transport.thread-factory.worker-thread-prefix=NettyServerNIOWorker
transport.thread-factory.server-executor-thread-prefix=NettyServerBizHandler
transport.thread-factory.share-boss-worker=false
transport.thread-factory.client-selector-thread-prefix=NettyClientSelector
transport.thread-factory.client-selector-thread-size=1
transport.thread-factory.client-worker-thread-prefix=NettyClientWorkerThread
transport.thread-factory.boss-thread-size=1
transport.thread-factory.worker-thread-size=8
transport.shutdown.wait=3
service.vgroup_mapping.order-service-seata-service-group=default
service.vgroup_mapping.account-service-seata-service-group=default
service.vgroup_mapping.storage-service-seata-service-group=default
service.vgroup_mapping.business-service-seata-service-group=default
service.vgroupMapping.order-service-seata-service-group=default
service.vgroupMapping.account-service-seata-service-group=default
service.vgroupMapping.storage-service-seata-service-group=default
service.vgroupMapping.business-service-seata-service-group=default
service.enableDegrade=false
service.disable=false
service.max.commit.retry.timeout=-1
service.max.rollback.retry.timeout=-1
client.async.commit.buffer.limit=10000
client.lock.retry.internal=10
client.lock.retry.times=30
store.mode=db
store.file.dir=file_store/data
store.file.max-branch-session-size=16384
store.file.max-global-session-size=512
store.file.file-write-buffer-cache-size=16384
store.file.flush-disk-mode=async
store.file.session.reload.read_size=100
store.db.driver-class-name=com.mysql.jdbc.Driver
store.db.datasource=dbcp
store.db.db-type=mysql
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=root
store.db.password=123456
store.db.min-conn=1
store.db.max-conn=3
store.db.global.table=global_table
store.db.branch.table=branch_table
store.db.query-limit=100
store.db.lock-table=lock_table
recovery.committing-retry-period=1000
recovery.asyn-committing-retry-period=1000
recovery.rollbacking-retry-period=1000
recovery.timeout-retry-period=1000
transaction.undo.data.validation=true
transaction.undo.log.serialization=jackson
transaction.undo.log.save.days=7
transaction.undo.log.delete.period=86400000
transaction.undo.log.table=undo_log
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registry-type=compact
metrics.exporter-list=prometheus
metrics.exporter-prometheus-port=9898
client.report.retry.count=5
service.disableGlobalTransaction=false
client.support.spring.datasource.autoproxy=true


// Step 3 : 启动 io.seata.server.Server 类

三 . Seata 流程分析

3.1 Main 启动流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
java复制代码// Seata 的启动类是 Server # main , 来看一下这个
public static void main(String[] args) throws IOException {

// 获取 port 端口 , 该端口会用于 log 对象 -> 3.2
int port = PortHelper.getPort(args);
System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));

// 准备 log 对象
final Logger logger = LoggerFactory.getLogger(Server.class);
if (ContainerHelper.isRunningInContainer()) {
logger.info("The server is running in container.");
}

// 初始化参数解析器 -> 3.3
ParameterParser parameterParser = new ParameterParser(args);

// 初始化指标 -> 3.4
MetricsManager.get().init();

System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());

// 线程池的构建
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(
NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(),
NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()),
new ThreadPoolExecutor.CallerRunsPolicy());

// 构建 NettyRemotingServer
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

// 构建 nettyRemotingServer 的监听端口
nettyRemotingServer.setListenPort(parameterParser.getPort());
UUIDGenerator.init(parameterParser.getServerNode());

//log store mode : file, db, redis
SessionHolder.init(parameterParser.getStoreMode());

DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
// 核心一 : 初始化 DefaultCoordinator
coordinator.init();
nettyRemotingServer.setHandler(coordinator);

// register ShutdownHook
ShutdownHook.getInstance().addDisposable(coordinator);
ShutdownHook.getInstance().addDisposable(nettyRemotingServer);

//127.0.0.1 and 0.0.0.0 are not valid here.
// 设置 XID 后 , 后续生成全局事务id 是会使用
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(nettyRemotingServer.getListenPort());

try {
// 核心二 : 初始化 nettyRemotingServer
nettyRemotingServer.init();
} catch (Throwable e) {
logger.error("nettyServer init error:{}", e.getMessage(), e);
System.exit(-1);
}

System.exit(0);
}

3.2 Port 获取流程

Port 的获取流程很简单 , 没有什么值得深入的地方 , 核心关注点就是 :

  • 区分容器通过不同的方式处理 , 主要是 Docker 和 K8S
  • cat /proc/1/cgroup 获取类型 (cgroup是linux内核实现、用于控制linux系统资源的组件)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public static int getPort(String[] args) {
// 判断是否在容器中运行
if (ContainerHelper.isRunningInContainer()) {
return ContainerHelper.getPort();
} else if (args != null && args.length >= 2) {
for (int i = 0; i < args.length; ++i) {
// 通过参数 -p 获取 port 参数
if ("-p".equalsIgnoreCase(args[i]) && i < args.length - 1) {
return NumberUtils.toInt(args[i + 1], SERVER_DEFAULT_PORT);
}
}
}

return SERVER_DEFAULT_PORT;
}

注意 , 此处会放在 System 中 , 看注释是用于 logback 获取 System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port))

3.3 ParameterParser 的作用

ParameterParser 用于解析启动参数 , 参数解析器应该始终是要执行的第一行.

此处使用 com.beust.jcommander.JCommander 进行参数的处理 , 使用方式 : seata-server.bat -h 127.0.0.1 -p 8086

Seata 中提供了如下几种参数 :

  • –host, -h : 要注册到注册中心的ip
  • –port, -p : 监听端口 , 默认 0
  • –storeMode, -m : 日志存储方式:”file” , “db”
  • –serverNode, -n : 服务器节点id,如1、2、3。它将根据雪花默认生成
  • –seataEnv, -e : 用于多配置隔离的名称
  • –help

PS : 另外 , Seata 中通过 Maven Plugin 做了第二层配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
xml复制代码<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>2.3.0</version>
<configuration>
<to>
<image>docker.io/seataio/seata-server</image>
<tags>
${image.tags}
</tags>
<!-- ..... -->
</to>
<container>
<appRoot>/seata-server</appRoot>
<workingDirectory>/seata-server</workingDirectory>
<mainClass>io.seata.server.Server</mainClass>
<ports>
<port>8091</port>
</ports>
<jvmFlags>
<jvmFlag>-Djava.security.egd=file:/dev/./urandom</jvmFlag>
<jvmFlag>-server</jvmFlag>
<jvmFlag>-Xss512k</jvmFlag>
<jvmFlag>-XX:+UnlockExperimentalVMOptions</jvmFlag>
<!-- ..... -->
</jvmFlags>
<!-- ..... -->
</container>
<!-- ..... -->
</configuration>
</plugin>

可以看到 , 此处配置了 Port 及 JVM 等参数

3.4 MetricsManager.get().init()

** MetricsManager 的作用** :

  1. metrics 是事务协调者
  2. MetricsManager 通过懒汉单例方式生成
  3. 此处通过 metrics.enabled 判断是否开启 metrics

总结一下 : 该工具用于快速详尽的获取到TC、TM(规划中)和RM(规划中)中事务的活动状态以及时延等重要统计信息

当状态有变化,EvenBus会把事件推送给MetricsSubscriber,MetricsSubscriber中调用Registry把度量数据写入。Exporter再定期把度量数据拉出来,发给外部监控系统。


注册方式 :

  1. 从配置中心读取配置,看是否需要初始化metric ( ConfigurationFactory.getInstance().getBoolean/metrics.enabled)
  2. 通过RegistryFactory.getInstance() 初始化 Registry对象
  3. 然后使用 ExporterFactory.getInstanceList() 设置Registry对象
  4. 最后用 EventBusManager注册一个metrics的订阅

成员功能 :
Registry : 定义了getCounter、getSummary、getTimer等接口

Exporter : 发布器,把度量数据同步给对应的监控系统

EventBusManager : metrics的数据来源就是通过订阅EventBus获取的

具体流程后续继续分析 >>>>

3.5 ThreadPoolExecutor 的创建

构建一个线程池 , 此处的参数主要是 NettyServerConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码// 线程池默认属性
// 1 . WORKER_THREAD_SIZE 在 NettyBaseConfig 中进行配置
private int serverSelectorThreads = WORKER_THREAD_SIZE ;
private int serverSocketSendBufSize = 153600;
private int serverSocketResvBufSize = 153600;
private int serverWorkerThreads = WORKER_THREAD_SIZE ;
private int soBackLogSize = 1024 ;
private int writeBufferHighWaterMark = 67108864 ;
private int writeBufferLowWaterMark = 1048576 ;
private static final int DEFAULT_LISTEN_PORT = 8091;
private static final int RPC_REQUEST_TIMEOUT = 30 * 1000;
private int serverChannelMaxIdleTimeSeconds = 30 ;
private static final String EPOLL_WORKER_THREAD_PREFIX = "NettyServerEPollWorker";
private static int minServerPoolSize = 50;
private static int maxServerPoolSize = 500;
private static int maxTaskQueueSize = 20000;
private static int keepAliveTime = 500;

3.6 NettyRemotingServer 的构建

NettyRemotingServer 的作用 :

该类用于实现Netty服务器端功能,接受数据包,在服务器端处理后发送给客户端 , 同时它还用于启动netty,监听服务器端口,接收TM、RM的请求,它还为处理不同的请求创建不同的处理器

可以看到 , 这里是把线程池传入 NettyRemotingServer

1
2
3
4
5
6
7
8
java复制代码NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
super(messageExecutor);
// Rpc server bootstrap.
serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
serverBootstrap.setChannelHandlers(new ServerHandler());
}

3.7 NettyRemotingServer 的配置

主要做了这些事情 :

  • 设置端口
  • 初始化 Session 方案
  • 构建 DefaultCoordinator , 并且初始化
1
2
3
4
5
6
7
8
9
java复制代码nettyRemotingServer.setListenPort(parameterParser.getPort()); -> 8091

UUIDGenerator.init(parameterParser.getServerNode());
//log store mode : file, db, redis
SessionHolder.init(parameterParser.getStoreMode());

DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);

UUIDGenerator 的作用 的作用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码
init : IdWorker idWorker = new IdWorker(serverNode);

Step 1 : 立即初始化时间戳和序列
private void initTimestampAndSequence() {
long timestamp = getNewestTimestamp();
long timestampWithSequence = timestamp << sequenceBits;
this.timestampAndSequence = new AtomicLong(timestampWithSequence);
}

Step 2 : 初始化 workid
private void initWorkerId(Long workerId) {
if (workerId == null) {
// 使用最低10位可用MAC作为workerId
workerId = generateWorkerId();
}
if (workerId > maxWorkerId || workerId < 0) {
String message = String.format("worker Id can't be greater than %d or less than 0", maxWorkerId);
throw new IllegalArgumentException(message);
}
this.workerId = workerId << (timestampBits + sequenceBits);
}

SessionHolder 的处理

  • Step 1 : StoreMode storeMode = StoreMode.get(mode); 生成 Mode 类型对象
  • Step 2 : 通过 storeMode 判断具体的处理类型 (DB , FILE . REDIS)

这里主要碰到了2个对象 :

  • SessionHolder : Session 处理器 , 用于初始化及处理
  • GlobalSession : GlobalSession 是 seata协调器DefaultCoordinator管理维护的重要部件

Session 主要处理那些数据 :

  • String ROOT_SESSION_MANAGER_NAME = “root.data”;
  • String ASYNC_COMMITTING_SESSION_MANAGER_NAME = “async.commit.data”;
  • String RETRY_COMMITTING_SESSION_MANAGER_NAME = “retry.commit.data”;
  • String RETRY_ROLLBACKING_SESSION_MANAGER_NAME = “retry.rollback.data”;

SessionHolder 中会为这四个数据创建 Object ,存入 Session

3.8 构建和初始化 DefaultCoordinator

seata-DefaultCoordinator-system.png

作用 : DefaultCoordinator 为TC协调器 (事务控制协调器)

结构 :

  • 继承AbstractTCInboundHandler接口(为TC接受到RM和TM的request请求数据)

  • 实现TransactionMessageHandler接口(处理RPC消息)

  • 实现ResourceManagerInbound接口 (处理发送至RM的branchCommit,branchRollback请求)

整体结构如下图所示 :

seata-server-coordinator.jpg

DefaultCoordinator init 流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
java复制代码    public void init() {
retryRollbacking.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.retryRollbackingLock();
if (lock) {
try {
handleRetryRollbacking();
} catch (Exception e) {
LOGGER.info("Exception retry rollbacking ... ", e);
} finally {
SessionHolder.unRetryRollbackingLock();
}
}
}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

retryCommitting.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.retryCommittingLock();
if (lock) {
try {
handleRetryCommitting();
} catch (Exception e) {
LOGGER.info("Exception retry committing ... ", e);
} finally {
SessionHolder.unRetryCommittingLock();
}
}
}, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

asyncCommitting.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.asyncCommittingLock();
if (lock) {
try {
handleAsyncCommitting();
} catch (Exception e) {
LOGGER.info("Exception async committing ... ", e);
} finally {
SessionHolder.unAsyncCommittingLock();
}
}
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

timeoutCheck.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.txTimeoutCheckLock();
if (lock) {
try {
timeoutCheck();
} catch (Exception e) {
LOGGER.info("Exception timeout checking ... ", e);
} finally {
SessionHolder.unTxTimeoutCheckLock();
}
}
}, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

undoLogDelete.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.undoLogDeleteLock();
if (lock) {
try {
undoLogDelete();
} catch (Exception e) {
LOGGER.info("Exception undoLog deleting ... ", e);
} finally {
SessionHolder.unUndoLogDeleteLock();
}
}
}, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}

3.9 ShutdownHook 的作用

ShutdownHook 主要用于销毁流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码ShutdownHook.getInstance().addDisposable(coordinator);
ShutdownHook.getInstance().addDisposable(nettyRemotingServer);

public class ShutdownHook extends Thread {

private static final ShutdownHook SHUTDOWN_HOOK = new ShutdownHook("ShutdownHook");
private final PriorityQueue<DisposablePriorityWrapper> disposables = new PriorityQueue<>();
private final AtomicBoolean destroyed = new AtomicBoolean(false);

// 默认10。值越低优先级越高
private static final int DEFAULT_PRIORITY = 10;


// 主要方法 :
addDisposable() : 添加实例
destroyAll() : 销毁所有的实例
- Disposable disposable = disposables.poll();
- disposable.destroy();

}


// 调用的逻辑
GlobalTransactionScanner#distory
DefaultSagaTransactionalTemplate#distory

3.10 XID 的作用

之前看到 , 前面对 XID 进行了设置 , 加入了 Address 和 Port , 它实际上会在后面设置全局ID的时候使用 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(nettyRemotingServer.getListenPort());


// XID 是一个全局的 事务ID 对象
public class XID {
private static int port;
private static String ipAddress;

}

public static String generateXID(long tranId) {
return ipAddress + ":" + port + ":" + tranId;
}

public static long getTransactionId(String xid) {
if (xid == null) {
return -1;
}

int idx = xid.lastIndexOf(":");
return Long.parseLong(xid.substring(idx + 1));
}

3.11 nettyRemotingServer.init() 初始化流程

Step Start : init 入口

1
2
3
4
5
6
7
java复制代码public void init() {
// registry processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
}
}

Step 2 : registerProcessor 处理流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
java复制代码
private void registerProcessor() {
// 1. 注册表上的请求消息处理器
ServerOnRequestProcessor onRequestProcessor =
new ServerOnRequestProcessor(this, getHandler());
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
// 2. 注册表上的响应消息处理器
ServerOnResponseProcessor onResponseProcessor =
new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
// 3. 注册表信息处理器
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
// 4. 注册表信息处理器
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
// 5. 注册表心跳消息处理器
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}


// PS : 这里来看一下 Registry 到底注册了什么
protected final HashMap<Integer, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
this.processorTable.put(messageType, pair);
}

// PS : processorTable 的使用
C- AbstractNettyRemoting
M- processMessage : Rpc消息处理

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {

Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
// 从 processorTable 中取出 Pair 对象
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
if (pair.getSecond() != null) {
try {
// ExecutorService 处理
pair.getSecond().execute(() -> {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
//.......
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
//..............
}
}
}
}
}

init 处理操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
java复制代码
// Init 入口类
public void init() {
// Step 1 : 调用父类 init 方法
super.init();
// Step 2 : 调用 serverBootstrap 启动流程
serverBootstrap.start();
}

// Step 1 : 调用父类 init 方法
ConcurrentHashMap<Integer, MessageFuture> futures = new ConcurrentHashMap<>();
public void init() {
// 延迟 3 秒执行 , 3秒执行一次
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//
for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
if (entry.getValue().isTimeout()) {
futures.remove(entry.getKey());
entry.getValue().setResultMessage(null);
}
}

nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}


// Step 2 : 调用 serverBootstrap 启动流程
public void start() {
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
.channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark()))
.localAddress(new InetSocketAddress(listenPort))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
.addLast(new ProtocolV1Decoder())
.addLast(new ProtocolV1Encoder());
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}

}
});

try {
ChannelFuture future = this.serverBootstrap.bind(listenPort).sync();
LOGGER.info("Server started, listen port: {}", listenPort);
RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
initialized.set(true);
future.channel().closeFuture().sync();
} catch (Exception exx) {
throw new RuntimeException(exx);
}

}

// 这里涉及到 NettyServerBootstrap , 该对象用于构建 RPC Server , 其中 会调用 ServerBootstrap 的处理流程

PS : 后续是 Netty ServerBootstrap 的处理 , 考虑以后说 Netty 的时候单独说说

这里简单说一下就是 : 

  • group : 为父(接受方)和子(客户端)设置EventLoopGroup。这些EventLoopGroup的用于处理ServerChannel和Channel的所有事件和IO。
  • channel : 用于从中创建Channel实例的类。
  • ChannelOption : 允许指定一个ChannelOption,用于创建Channel实例。
  • childOption : 允许指定一个 ChannelOption,用于创建Channel实例(在接受方接受了 Channel之后)。

3.12 其他

System 处理

初始化异常 : System.exit(-1)

初始化成功 : System.exit(0)

终止当前运行的Java虚拟机。参数作为一种状态代码;按照约定,非零状态码表示异常终止

调用System.exit(n)有效地等价于调用:Runtime.getRuntime ().exit(n)

总结

因为是一篇为了入门的文章 , 所以文章相对简单 , 也没有涉及太多的流程 .

主要是为了完善整个体系 , 并且开启 Seata 的流程.

核心步骤 :

1 . MetricsManager.get().init() :初始化指标

2 . new ThreadPoolExecutor : 构建线程池

3 . new NettyRemotingServer : 构建 Netty Server

4 . SessionHolder.init

5 . 构建全局事务 ID

6 . nettyRemotingServer.init()

更新日志

  • V20210805 : 添加 总结

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%