首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜
文章合集 : 🎁 juejin.cn/post/694164…
Github : 👉 github.com/black-ant
CASE 备份 : 👉 gitee.com/antblack/ca…
一 .前言
这一篇文章将会开启 Seata 的源码分析流程 , 源码的整体逻辑会比较长 , 预计分为以下几个部分 :
- Seata Server 启动流程
- Seata Client 启动流程
- Seata 的配置加载
- Seata 的配置处理
- Seata 事务的处理
- Seata ID 的流转
- Seata TCC 模式
- Seata XA 模式
- Seata sega 模式
- Seata Nacos 及其他的服务管理
二 . Seata Server 源码下载及启动
Seata Server 下载地址 , 通过该地址下载 Server Code 即可 , 整个过程中只有2个文件需要变动
Server 项目结构
1 | xml复制代码<modules> |
源码的配置
1 | java复制代码// Step 1 : 打开 Server 模块 Resources 目录 , 修改 Registry.conf |
三 . Seata 流程分析
3.1 Main 启动流程
1 | java复制代码// Seata 的启动类是 Server # main , 来看一下这个 |
3.2 Port 获取流程
Port 的获取流程很简单 , 没有什么值得深入的地方 , 核心关注点就是 :
- 区分容器通过不同的方式处理 , 主要是 Docker 和 K8S
- cat /proc/1/cgroup 获取类型 (cgroup是linux内核实现、用于控制linux系统资源的组件)
1 | java复制代码public static int getPort(String[] args) { |
注意 , 此处会放在 System 中 , 看注释是用于 logback 获取 System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port))
3.3 ParameterParser 的作用
ParameterParser 用于解析启动参数 , 参数解析器应该始终是要执行的第一行.
此处使用 com.beust.jcommander.JCommander 进行参数的处理 , 使用方式 : seata-server.bat -h 127.0.0.1 -p 8086
Seata 中提供了如下几种参数 :
- –host, -h : 要注册到注册中心的ip
- –port, -p : 监听端口 , 默认 0
- –storeMode, -m : 日志存储方式:”file” , “db”
- –serverNode, -n : 服务器节点id,如1、2、3。它将根据雪花默认生成
- –seataEnv, -e : 用于多配置隔离的名称
- –help
PS : 另外 , Seata 中通过 Maven Plugin 做了第二层配置
1 | xml复制代码<plugin> |
可以看到 , 此处配置了 Port 及 JVM 等参数
3.4 MetricsManager.get().init()
** MetricsManager 的作用** :
- metrics 是事务协调者
- MetricsManager 通过懒汉单例方式生成
- 此处通过 metrics.enabled 判断是否开启 metrics
总结一下 : 该工具用于快速详尽的获取到TC、TM(规划中)和RM(规划中)中事务的活动状态以及时延等重要统计信息
当状态有变化,EvenBus会把事件推送给MetricsSubscriber,MetricsSubscriber中调用Registry把度量数据写入。Exporter再定期把度量数据拉出来,发给外部监控系统。
注册方式 :
- 从配置中心读取配置,看是否需要初始化metric ( ConfigurationFactory.getInstance().getBoolean/metrics.enabled)
- 通过RegistryFactory.getInstance() 初始化 Registry对象
- 然后使用 ExporterFactory.getInstanceList() 设置Registry对象
- 最后用 EventBusManager注册一个metrics的订阅
成员功能 :
Registry : 定义了getCounter、getSummary、getTimer等接口Exporter : 发布器,把度量数据同步给对应的监控系统
EventBusManager : metrics的数据来源就是通过订阅EventBus获取的
具体流程后续继续分析 >>>>
3.5 ThreadPoolExecutor 的创建
构建一个线程池 , 此处的参数主要是 NettyServerConfig
1 | java复制代码// 线程池默认属性 |
3.6 NettyRemotingServer 的构建
NettyRemotingServer 的作用 :
该类用于实现Netty服务器端功能,接受数据包,在服务器端处理后发送给客户端 , 同时它还用于启动netty,监听服务器端口,接收TM、RM的请求,它还为处理不同的请求创建不同的处理器
可以看到 , 这里是把线程池传入 NettyRemotingServer
1 | java复制代码NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads); |
3.7 NettyRemotingServer 的配置
主要做了这些事情 :
- 设置端口
- 初始化 Session 方案
- 构建 DefaultCoordinator , 并且初始化
1 | java复制代码nettyRemotingServer.setListenPort(parameterParser.getPort()); -> 8091 |
UUIDGenerator 的作用 的作用
1 | java复制代码 |
SessionHolder 的处理
- Step 1 : StoreMode storeMode = StoreMode.get(mode); 生成 Mode 类型对象
- Step 2 : 通过 storeMode 判断具体的处理类型 (DB , FILE . REDIS)
这里主要碰到了2个对象 :
- SessionHolder : Session 处理器 , 用于初始化及处理
- GlobalSession : GlobalSession 是 seata协调器DefaultCoordinator管理维护的重要部件
Session 主要处理那些数据 :
- String ROOT_SESSION_MANAGER_NAME = “root.data”;
- String ASYNC_COMMITTING_SESSION_MANAGER_NAME = “async.commit.data”;
- String RETRY_COMMITTING_SESSION_MANAGER_NAME = “retry.commit.data”;
- String RETRY_ROLLBACKING_SESSION_MANAGER_NAME = “retry.rollback.data”;
SessionHolder 中会为这四个数据创建 Object ,存入 Session
3.8 构建和初始化 DefaultCoordinator
作用 : DefaultCoordinator 为TC协调器 (事务控制协调器)
结构 :
继承AbstractTCInboundHandler接口(为TC接受到RM和TM的request请求数据)
实现TransactionMessageHandler接口(处理RPC消息)
实现ResourceManagerInbound接口 (处理发送至RM的branchCommit,branchRollback请求)
整体结构如下图所示 :
DefaultCoordinator init 流程
1 | java复制代码 public void init() { |
3.9 ShutdownHook 的作用
ShutdownHook 主要用于销毁流程
1 | java复制代码ShutdownHook.getInstance().addDisposable(coordinator); |
3.10 XID 的作用
之前看到 , 前面对 XID 进行了设置 , 加入了 Address 和 Port , 它实际上会在后面设置全局ID的时候使用 :
1 | java复制代码if (NetUtil.isValidIp(parameterParser.getHost(), false)) { |
3.11 nettyRemotingServer.init() 初始化流程
Step Start : init 入口
1 | java复制代码public void init() { |
Step 2 : registerProcessor 处理流程
1 | java复制代码 |
init 处理操作
1 | java复制代码 |
PS : 后续是 Netty ServerBootstrap 的处理 , 考虑以后说 Netty 的时候单独说说
这里简单说一下就是 :
- group : 为父(接受方)和子(客户端)设置EventLoopGroup。这些EventLoopGroup的用于处理ServerChannel和Channel的所有事件和IO。
- channel : 用于从中创建Channel实例的类。
- ChannelOption : 允许指定一个ChannelOption,用于创建Channel实例。
- childOption : 允许指定一个 ChannelOption,用于创建Channel实例(在接受方接受了 Channel之后)。
3.12 其他
System 处理
初始化异常 : System.exit(-1)
初始化成功 : System.exit(0)
终止当前运行的Java虚拟机。参数作为一种状态代码;按照约定,非零状态码表示异常终止。
调用System.exit(n)有效地等价于调用:Runtime.getRuntime ().exit(n)
总结
因为是一篇为了入门的文章 , 所以文章相对简单 , 也没有涉及太多的流程 .
主要是为了完善整个体系 , 并且开启 Seata 的流程.
核心步骤 :
1 . MetricsManager.get().init() :初始化指标
2 . new ThreadPoolExecutor : 构建线程池
3 . new NettyRemotingServer : 构建 Netty Server
4 . SessionHolder.init
5 . 构建全局事务 ID
6 . nettyRemotingServer.init()
更新日志
- V20210805 : 添加 总结
本文转载自: 掘金