盘点 Seata Server 端接收请求

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 . 前言

突然发现 Server 端接受请求这一块漏掉了 , 这一块有点绕 , 建了几个线程来处理 , 所以有必要补上

Seata 的 Server 和 Client 端主要是通过 Netty 进行通信的

Client 与 Server 通信的主要目的是为了创建事务 GlobalSession 以及 往GlobalSession 中注册 Branch .

二 . Client 段请求

前置补充 : 前文中 说了 Client 的调用流程

1
2
3
4
5
6
java复制代码// C- DefaultTransactionManager Begin 中发起 GlobalSession 
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
// 发起 Netty 请求
// timeout=300000,transactionName=dubbo-gts-seata-example
return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
}

三 . Seata Server 端处理

上一篇说了 seata client 发起了一个 GlobalLockRequest , 这里看一下 Server 端如何处理的 , 之前我们已经知道 , Seata 通过 Netty 实现的前后端交互 , 其主要逻辑如下 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码
// 入口类 : AbstractNettyRemotingServer
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
//.........
}

// 调用流程 :
C- AbstractNettyRemotingServer # channelRead
C- AbstractNettyRemoting # processMessage


// RM 的注册
RegRmProcessor

//
BatchLogHandler
DefaultCoordinator
AbstractNettyRemotingServer

3.1 流程的入口

seata-system-AbstractNettyRemoting.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
java复制代码// Step 1 : Netty 请求的入口
C- AbstractNettyRemotingServer
M- channelRead(final ChannelHandlerContext ctx, Object msg)
- processMessage(ctx, (RpcMessage) msg)


// Step 2 : 处理 Message
C- AbstractNettyRemoting
M- processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage)

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;

// 准备 Pair 对象 , 获取执行类 -> 3.2
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());

if (pair != null) {
if (pair.getSecond() != null) {
try {
// 步骤一 : 执行 ExecutorService ,准备线程
pair.getSecond().execute(() -> {
try {
// 步骤二 :执行 process
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
if (allowDumpStack) {
String name = ManagementFactory.getRuntimeMXBean().getName();
String pid = name.split("@")[0];
int idx = new Random().nextInt(100);
try {
Runtime.getRuntime().exec("jstack " + pid + " >d:/" + idx + ".log");
} catch (IOException exx) {
}
allowDumpStack = false;
}
}
} else {
pair.getFirst().process(ctx, rpcMessage);
}
}
}
}

seata-netty-message-request.jpg

3.2 ExecutorService 执行

ExecutorService 的初始化流程

从上文的结构图中可以看到 , 其最终的抽象类为 AbstractNettyRemoting , 这里主要有2个ExecutorService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码// 该 ExecutorService 主要在 init 后定期执行
ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("timeoutChecker", 1, true));

// 用于 Netty Request 时处理 , 该对象在 Server 初始化时通过构造器传入
ThreadPoolExecutor messageExecutor;

// PS : 这里来回顾一下
C- Server # main
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());

NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

ExecutorService 的使用流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码// 从上文可以看到 , 这里主要使用的是 NamedThreadFactory 创建线程
public Thread newThread(Runnable r) {
String name = prefix + "_" + counter.incrementAndGet();
if (totalSize > 1) {
name += "_" + totalSize;
}
// Group : java.lang.ThreadGroup[name=main,maxpri=10]
// name : ServerHandlerThread_1_18_500
Thread thread = new FastThreadLocalThread(group, r, name);

thread.setDaemon(makeDaemons);
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
return thread;
}

// PS : 为什么用 FastThreadLocalThread 呢 ?
- 因为百度第一个搜索结果为 <惊:FastThreadLocal吞吐量居然是ThreadLocal的3倍!!!>
- 主要原因就是吞吐量快 , 但是其快主要提现在单线程情况下 , 因为其使用数组避免了hash冲突的发生

- 另外 FastThreadLocalThread 是 Netty 内部自己写的线程类

3.3 Processor 处理

3.3.1 Processor 的加载处理

Processor 的加载是在 NettyRemotingServer 中进行 ,简述就是3步 :

Step 1 : Server # main 中开启 nettyRemotingServer init

Step 2 : registerProcessor 注册各种 Processor

Step 3 : 构建 Pair 放入集合 , 用于运行时使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
java复制代码// Step 1 : Server # main 中执行 (PS : Server 是 Seata 启动类 , 可以详见之前的文章)
public static void main(String[] args) throws IOException {

// .... 省略其他逻辑 , 最后对 Netty 进行初始化
try {
nettyRemotingServer.init();
} catch (Throwable e) {
System.exit(-1);
}
System.exit(0);
}


// Step 2 : init 初始化
public void init() {
// registry processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
}
}

// Step 3 : 注册 Processor
private void registerProcessor() {
ServerOnRequestProcessor onRequestProcessor =new ServerOnRequestProcessor(this, getHandler());

super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);

//------
ServerOnResponseProcessor onResponseProcessor =new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);

//------
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);

//------
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);

//------
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);

}

// PS : 可以看到 , 这里大概有以下几种处理器 :
- ServerOnRequestProcessor : 处理 RM/TM 客户端请求信息
- ServerOnResponseProcessor : 处理 RM/TM 客户端返回信息
- RegRmProcessor : RM 注册处理器
- RegTmProcessor : TM 注册处理器
- ServerHeartbeatProcessor : 处理心跳信息

// PS : 这里还能看到 , 其中 传入了一个 messageExecutor (ThreadPoolExecutor) , 该对象用于后续创建线程


// Step End : 可以看到 , 这里就已经构建了 Pair , 用于后方使用
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
this.processorTable.put(messageType, pair);
}

// PS : 这里往父抽象类中设置了集合 (AbstractNettyRemoting)
protected final HashMap<Integer, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);

3.2 对请求进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码// Step 1 : 进入Process 处理 (ServerOnRequestProcessor)
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
onRegRmMessage(ctx, rpcMessage);
}

// Step 2 : 处理 Request
private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
Object message = rpcMessage.getBody();
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
// 省略 Log 操作
if (!(message instanceof AbstractMessage)) {
return;
}
if (message instanceof MergedWarpMessage) {
AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage) message).msgs.size()];
// 如果是多请求
for (int i = 0; i < results.length; i++) {
final AbstractMessage subMessage = ((MergedWarpMessage) message).msgs.get(i);
// 3.3 调用 Handler 对 Message 进行处理
results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);
}
MergeResultMessage resultMessage = new MergeResultMessage();
resultMessage.setMsgs(results);
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
} else {
// 如果是单独的请求
final AbstractMessage msg = (AbstractMessage) message;
// 3.3 调用 Handler 对 Message 进行处理
AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
}
}

3.3 Handler 分别处理 Message

可以看到 , 其中有多个 MessageHandler 进行处理

seata-message-handler.jpg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码// 其核心处理类为 TransactionMessageHandler : 在上层处理接收到的RPC消息
I- TransactionMessageHandler
M- AbstractResultMessage onRequest(AbstractMessage request, RpcContext context)
M- void onResponse(AbstractResultMessage response, RpcContext context)

C- DefaultCoordinator
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
if (!(request instanceof AbstractTransactionRequestToTC)) {
throw new IllegalArgumentException();
}
AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
transactionRequest.setTCInboundHandler(this);

return transactionRequest.handle(context);
}


C- AbstractTCInboundHandler
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
GlobalBeginResponse response = new GlobalBeginResponse();
exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
@Override
public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
try {
// 发起 Gouble 处理
doGlobalBegin(request, response, rpcContext);
} catch (StoreException e) {
throw new TransactionException(TransactionExceptionCode.FailedStore,
String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()),
e);
}
}
}, request, response);
return response;
}

3.4 开始处理

可以看到 , 3.3 中就正式的开始了 doGlobalBegin 的处理 , 也就和 Session 那个章节连接起来了

1
java复制代码public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {

四 . 补充

4.1 . Seata RM 端注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码// Server 端注册的主要对象为 : RegRmProcessor

@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
onRegRmMessage(ctx, rpcMessage);
}

// 发起 Server 注册
private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
// RegisterRMRequest{resourceIds='null', applicationId='business-seata-example', transactionServiceGroup='business-service-seata-service-group'}
RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody();
String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
boolean isSuccess = false;
String errorInfo = StringUtils.EMPTY;
try {
if (null == checkAuthHandler || checkAuthHandler.regResourceManagerCheckAuth(message)) {
// ChannelManager 注册当前 channel
ChannelManager.registerRMChannel(message, ctx.channel());
// 控制 Channel 版本
Version.putChannelVersion(ctx.channel(), message.getVersion());
isSuccess = true;
}
} catch (Exception exx) {
isSuccess = false;
errorInfo = exx.getMessage();
}
// 构建返回结果
RegisterRMResponse response = new RegisterRMResponse(isSuccess);
if (StringUtils.isNotEmpty(errorInfo)) {
response.setMsg(errorInfo);
}
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
}

RegTmProcessor 同理 , 这里就不看了 , 以后有兴趣可以把这个channel 和版本控制专门看一下

4.2 ServerHeartbeatProcessor 心跳检测

1
2
3
4
5
6
7
8
9
10
11
java复制代码public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
try {
// 好像啥事没做 , 就打了个 log
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), HeartbeatMessage.PONG);
} catch (Throwable throwable) {
LOGGER.error("send response error: {}", throwable.getMessage(), throwable);
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("received PING from {}", ctx.channel().remoteAddress());
}
}

总结

终于把这个环节补齐了 , 整个 Seata 就通了 , 后面不准备开新文 ,准备把文章好好的优化下 ,补充细节 !!!

总结一下就是在 Server # main 中初始化了 ThreadPoolExecutorNettyRemotingServer , 在AbstractNettyRemotingServer 中对 NettyRequest 进行处理 .

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%