首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜
文章合集 : 🎁 juejin.cn/post/694164…
Github : 👉 github.com/black-ant
CASE 备份 : 👉 gitee.com/antblack/ca…
一 . 前言
突然发现 Server 端接受请求这一块漏掉了 , 这一块有点绕 , 建了几个线程来处理 , 所以有必要补上
Seata 的 Server 和 Client 端主要是通过 Netty 进行通信的
Client 与 Server 通信的主要目的是为了创建事务 GlobalSession 以及 往GlobalSession 中注册 Branch .
二 . Client 段请求
前置补充 : 前文中 说了 Client 的调用流程
1 | java复制代码// C- DefaultTransactionManager Begin 中发起 GlobalSession |
三 . Seata Server 端处理
上一篇说了 seata client 发起了一个 GlobalLockRequest , 这里看一下 Server 端如何处理的 , 之前我们已经知道 , Seata 通过 Netty 实现的前后端交互 , 其主要逻辑如下 :
1 | java复制代码 |
3.1 流程的入口
1 | java复制代码// Step 1 : Netty 请求的入口 |
3.2 ExecutorService 执行
ExecutorService 的初始化流程
从上文的结构图中可以看到 , 其最终的抽象类为 AbstractNettyRemoting , 这里主要有2个ExecutorService
1 | java复制代码// 该 ExecutorService 主要在 init 后定期执行 |
ExecutorService 的使用流程
1 | java复制代码// 从上文可以看到 , 这里主要使用的是 NamedThreadFactory 创建线程 |
3.3 Processor 处理
3.3.1 Processor 的加载处理
Processor 的加载是在 NettyRemotingServer 中进行 ,简述就是3步 :
Step 1 : Server # main 中开启 nettyRemotingServer init
Step 2 : registerProcessor 注册各种 Processor
Step 3 : 构建 Pair 放入集合 , 用于运行时使用
1 | java复制代码// Step 1 : Server # main 中执行 (PS : Server 是 Seata 启动类 , 可以详见之前的文章) |
3.2 对请求进行处理
1 | java复制代码// Step 1 : 进入Process 处理 (ServerOnRequestProcessor) |
3.3 Handler 分别处理 Message
可以看到 , 其中有多个 MessageHandler 进行处理
1 | java复制代码// 其核心处理类为 TransactionMessageHandler : 在上层处理接收到的RPC消息 |
3.4 开始处理
可以看到 , 3.3 中就正式的开始了 doGlobalBegin 的处理 , 也就和 Session 那个章节连接起来了
1 | java复制代码public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler { |
四 . 补充
4.1 . Seata RM 端注册
1 | java复制代码// Server 端注册的主要对象为 : RegRmProcessor |
RegTmProcessor 同理 , 这里就不看了 , 以后有兴趣可以把这个channel 和版本控制专门看一下
4.2 ServerHeartbeatProcessor 心跳检测
1 | java复制代码public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { |
总结
终于把这个环节补齐了 , 整个 Seata 就通了 , 后面不准备开新文 ,准备把文章好好的优化下 ,补充细节 !!!
总结一下就是在 Server # main 中初始化了 ThreadPoolExecutor 和 NettyRemotingServer , 在AbstractNettyRemotingServer 中对 NettyRequest 进行处理 .
本文转载自: 掘金