盘点 Seata Client 端配置流程

这是我参与更文挑战的第13天,活动详情查看: 更文挑战

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

一 . 前言

这一篇来看一下 Seate Client 端的配置文件 , 以及配置的方式和流程 , 先来看一下有哪些配置 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
yml复制代码#====================================Seata Config===============================================
seata:
enabled: true
application-id: business-seata-example
tx-service-group: business-service-seata-service-group # 事务群组(可以每个应用独立取名,也可以使用相同的名字)
client:
rm-report-success-enable: true
rm-table-meta-check-enable: false # 自动刷新缓存中的表结构(默认false)
rm-report-retry-count: 5 # 一阶段结果上报TC重试次数(默认5)
rm-async-commit-buffer-limit: 10000 # 异步提交缓存队列长度(默认10000)
rm:
lock:
lock-retry-internal: 10 # 校验或占用全局锁重试间隔(默认10ms)
lock-retry-times: 30 # 校验或占用全局锁重试次数(默认30)
lock-retry-policy-branch-rollback-on-conflict: true # 分支事务与其它全局回滚事务冲突时锁策略(优先释放本地锁让回滚成功)
tm-commit-retry-count: 3 # 一阶段全局提交结果上报TC重试次数(默认1次,建议大于1)
tm-rollback-retry-count: 3 # 一阶段全局回滚结果上报TC重试次数(默认1次,建议大于1)
undo:
undo-data-validation: true # 二阶段回滚镜像校验(默认true开启)
undo-log-serialization: jackson # undo序列化方式(默认jackson)
undo-log-table: undo_log # 自定义undo表名(默认undo_log)
log:
exceptionRate: 100 # 日志异常输出概率(默认100)
support:
spring:
datasource-autoproxy: true
service:
vgroup-mapping:
my_test_tx_group: default # TC 集群(必须与seata-server保持一致)
enable-degrade: false # 降级开关
disable-global-transaction: false # 禁用全局事务(默认false)
grouplist:
default: 127.0.0.1:8091
transport:
shutdown:
wait: 3
thread-factory:
boss-thread-prefix: NettyBoss
worker-thread-prefix: NettyServerNIOWorker
server-executor-thread-prefix: NettyServerBizHandler
share-boss-worker: false
client-selector-thread-prefix: NettyClientSelector
client-selector-thread-size: 1
client-worker-thread-prefix: NettyClientWorkerThread
type: TCP
server: NIO
heartbeat: true
serialization: seata
compressor: none
enable-client-batch-send-request: true # 客户端事务消息请求是否批量合并发送(默认true)
registry:
file:
name: file.conf
type: nacos
nacos:
server-addr: localhost:8848
namespace:
cluster: default
config:
file:
name: file.conf
type: nacos
nacos:
namespace:
server-addr: localhost:8848

二 . 配置对象

2.1 Seata 顶级对象

该对象对应的 seata.xxx 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码@ConfigurationProperties(prefix = "seata")
@EnableConfigurationProperties(SpringCloudAlibabaConfiguration.class)
public class SeataProperties {
/**
* 是否启用自动配置
*/
private boolean enabled = true;
/**
* application id
*/
private String applicationId;
/**
* 事务服务组
*/
private String txServiceGroup;
/**
* 是否启用数据源bean的自动代理
*/
private boolean enableAutoDataSourceProxy = true;
/**
* 数据源代理模式
*/
private String dataSourceProxyMode = DefaultValues.DEFAULT_DATA_SOURCE_PROXY_MODE;
/**
* 是否使用JDK代理而不是CGLIB代理
*/
private boolean useJdkProxy = false;
/**
* 指定哪个数据源bean不符合自动代理的条件
*/
private String[] excludesForAutoProxying = {};

}

2.2 seata.client 配置对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码 client:
rm-report-success-enable: true
rm-table-meta-check-enable: false # 自动刷新缓存中的表结构(默认false)
rm-report-retry-count: 5 # 一阶段结果上报TC重试次数(默认5)
tm-commit-retry-count: 3 # 一阶段全局提交结果上报TC重试次数(默认1次,建议大于1)
tm-rollback-retry-count: 3 # 一阶段全局回滚结果上报TC重试次数(默认1次,建议大于1)

// 一级目录下有如下配置 , 对应的对象为 :
public class RmProperties {
private int asyncCommitBufferLimit = 10000;
private int reportRetryCount = 5;
private boolean tableMetaCheckEnable = false;
private boolean reportSuccessEnable = false;
private boolean sagaBranchRegisterEnable = false;
private String sagaJsonParser = fastjson;
}

以上是 client 一级配置对应的类 , 我们来看一下他的子配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码// 二级目录 : seata.client.log
public class LogProperties {
private int exceptionRate = 100;
}

// 二级目录 : seata.client.undo
public class UndoProperties {
private boolean dataValidation = true;
private String logSerialization = "jackson";
private String logTable = "undo_log";
private boolean onlyCareUpdateColumns = true;
}

// 二级目录 : seata.client.support

2.3 seata.service 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码@Component
@ConfigurationProperties(prefix = "seata.service")
public class ServiceProperties implements InitializingBean {
/**
* vgroup->rgroup
*/
private Map<String, String> vgroupMapping = new HashMap<>();
/**
* group list
*/
private Map<String, String> grouplist = new HashMap<>();
/**
* degrade current not support
*/
private boolean enableDegrade = false;
/**
* disable globalTransaction
*/
private boolean disableGlobalTransaction = false;

}

2.4 seata.transport

我们来看一下配置类和默认参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码@Component
@ConfigurationProperties(prefix = TRANSPORT_PREFIX)
public class TransportProperties {
/**
* tcp, unix-domain-socket
*/
private String type = "TCP";
/**
* NIO, NATIVE
*/
private String server = "NIO";
/**
* enable heartbeat
*/
private boolean heartbeat = true;
/**
* serialization
*/
private String serialization = "seata";
/**
* compressor
*/
private String compressor = "none";

/**
* enable client batch send request
*/
private boolean enableClientBatchSendRequest = true;

}


@Component
@ConfigurationProperties(prefix = "seata.transport.thread-factory")
public class ThreadFactoryProperties {
private String bossThreadPrefix = "NettyBoss";
private String workerThreadPrefix = "NettyServerNIOWorker";
private String serverExecutorThreadPrefix = "NettyServerBizHandler";
private boolean shareBossWorker = false;
private String clientSelectorThreadPrefix = "NettyClientSelector";
private int clientSelectorThreadSize = 1;
private String clientWorkerThreadPrefix = "NettyClientWorkerThread";
}

2.5 seata.config

1
2
3
4
5
6
7
8
9
10
11
java复制代码@Component
@ConfigurationProperties(prefix = CONFIG_PREFIX)
public class ConfigProperties {
/**
* file, nacos, apollo, zk, consul, etcd3, springCloudConfig
*/
private String type = "file";

}

ConfigNacosProperties 等 ConfigXXXProperties

注意 ,此处有多个实现类 , 每一个都对应一个配置类

image.png

2.6 seata.registry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@Component
@ConfigurationProperties(prefix = "seata.registry")
public class RegistryProperties {
/**
* file, nacos, eureka, redis, zk, consul, etcd3, sofa
*/
private String type = "file";
/**
* the load balance
*/
private String loadBalance = DEFAULT_LOAD_BALANCE;
/**
* 负载均衡虚拟节点
*/
private int loadBalanceVirtualNodes = VIRTUAL_NODES_DEFAULT;

}

三 . Client 端的初始化流程

3.1 初始化配置类的流程

来看一下主配置类 SeataAutoConfiguration , 此处会扫描 io.seata.spring.boot.autoconfigure.properties 包下的所有配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
java复制代码// Client 的配置化主要基于 SeataAutoConfiguration 类来完成 Client 配置操作

@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

@Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER)
@ConditionalOnMissingBean(name = {BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})
public SpringApplicationContextProvider springApplicationContextProvider() {
return new SpringApplicationContextProvider();
}

@Bean(BEAN_NAME_FAILURE_HANDLER)
@ConditionalOnMissingBean(FailureHandler.class)
public FailureHandler failureHandler() {
return new DefaultFailureHandlerImpl();
}

@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");
}
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
}

/**
* 数据源配置
*/
@Configuration
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
static class SeataDataSourceConfiguration {

/**
* The bean seataDataSourceBeanPostProcessor.
*/
@Bean(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)
@ConditionalOnMissingBean(SeataDataSourceBeanPostProcessor.class)
public SeataDataSourceBeanPostProcessor seataDataSourceBeanPostProcessor(SeataProperties seataProperties) {
return new SeataDataSourceBeanPostProcessor(seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
}

/**
* The bean seataAutoDataSourceProxyCreator.
*/
@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
}
}
}

其中涉及到以下几个对象 :

  • C- SpringApplicationContextProvider
  • C- FailureHandler
  • C- GlobalTransactionScanner
  • C- SeataDataSourceBeanPostProcessor
  • C- SeataAutoDataSourceProxyCreator

3.1.1 SpringApplicationContextProvider

1
2
3
4
5
6
java复制代码public class SpringApplicationContextProvider implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT, applicationContext);
}
}

这里可以看到 , 调用了ObjectHolder存储了 applicationContext

之前分析 Spring 的时候说过 , Aware 可以用于创建后通知 , 此处是往枚举类中设置了一个参数.

第一次看到枚举类这样用的 ,这是不是实现了单例??

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public enum ObjectHolder {

INSTANCE;
private static final int MAP_SIZE = 8;
private static final Map<String, Object> OBJECT_MAP = new ConcurrentHashMap<>(MAP_SIZE);

public Object getObject(String objectKey) {
return OBJECT_MAP.get(objectKey);
}

public <T> T getObject(Class<T> clasz) {
return clasz.cast(OBJECT_MAP.values().stream().filter(clasz::isInstance).findAny().orElseThrow(() -> new ShouldNeverHappenException("Can't find any object of class " + clasz.getName())));
}

public Object setObject(String objectKey, Object object) {
return OBJECT_MAP.putIfAbsent(objectKey, object);
}
}

3.1.2 FailureHandler

FailureHandler 中提供了 以下方法 :

1
2
3
4
5
6
7
8
9
10
11
java复制代码public interface FailureHandler {

// 启动错误
void onBeginFailure(GlobalTransaction tx, Throwable cause);
// 提交异常
void onCommitFailure(GlobalTransaction tx, Throwable cause);
// 回退异常
void onRollbackFailure(GlobalTransaction tx, Throwable originalException);
// 重试操作
void onRollbackRetrying(GlobalTransaction tx, Throwable originalException);
}

默认实现类为 DefaultFailureHandlerImpl , 这里可配置意味着可以自己实现和扩展

3.1.3 GlobalTransactionScanner

该类中涉及几个抽象和接口 :

  • AbstractAutoProxyCreator : 使用AOP代理包装bean的BeanPostProcessor实现
  • ConfigurationChangeListener : 配置修改监听
  • InitializingBean : 初始化调用
  • ApplicationContextAware : Aware 监听处理
  • DisposableBean : 销毁处理

C- AbstractAutoProxyCreator # wrapIfNecessary

AbstractAutoProxyCreator 适用于代理 postProcess , 主要运行的是在 postProcessAfterInitialization 阶段调用 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码    // 此方法用于校验
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
// 检测是否存在 TCC 代理
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
// 创建一个 TccActionInterceptor 拦截器 , 并且添加到监听器
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
} else {
// 获取目标类class
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
//
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

// 判断是否存在 GlobalTransactional 注解 , 不存在直接返回
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}

// 如果存在注解 , 则为其创建一个 GlobalTransactionalInterceptor 用于事务拦截
if (interceptor == null) {
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
}

// 如果不是 AOP 代理 , 则直接放回 , 否者 , 进行 AOP 实际处理
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}

PS : AdvisedSupport 作用

简单点说 , 就是为这个类做了代理 . 使用AOP代理包装每个合格bean的BeanPostProcessor实现,在调用bean本身之前将委托给指定的拦截器。

ConfigurationChangeListener

1
2
3
4
5
6
7
8
9
10
java复制代码@Override
public void onChangeEvent(ConfigurationChangeEvent event) {
if (ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION.equals(event.getDataId())) {
disableGlobalTransaction = Boolean.parseBoolean(event.getNewValue().trim());
if (!disableGlobalTransaction && initialized.compareAndSet(false, true)) {
// 可以看到 , 再修改之后会重新初始化一次客户端
initClient();
}
}
}

InitializingBean

1
2
3
4
5
6
7
8
9
10
11
java复制代码public void afterPropertiesSet() {
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)this);
if (disableGlobalTransaction) {
return;
}
if (initialized.compareAndSet(false, true)) {
// 同样 , 初始化的时候调用了 initClient
initClient();
}
}

这里看到此处初始化了 TM 及 RM Client 端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码private void initClient() {

if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//init TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);

//init RM
RMClient.init(applicationId, txServiceGroup);

// 注册销毁钩子
registerSpringShutdownHook();

}

ApplicationContextAware

此处通过 Aware 完成通知设置 ApplicationContext

1
2
3
4
java复制代码public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
this.setBeanFactory(applicationContext);
}

DisposableBean

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public void destroy() {
ShutdownHook.getInstance().destroyAll();
}

private void registerSpringShutdownHook() {
if (applicationContext instanceof ConfigurableApplicationContext) {
((ConfigurableApplicationContext) applicationContext).registerShutdownHook();
ShutdownHook.removeRuntimeShutdownHook();
}
ShutdownHook.getInstance().addDisposable(TmNettyRemotingClient.getInstance(applicationId, txServiceGroup));
ShutdownHook.getInstance().addDisposable(RmNettyRemotingClient.getInstance(applicationId, txServiceGroup));
}

3.1.4 SeataDataSourceBeanPostProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {

private static final Logger LOGGER = LoggerFactory.getLogger(SeataDataSourceBeanPostProcessor.class);

private final List<String> excludes;
private final BranchType dataSourceProxyMode;

public SeataDataSourceBeanPostProcessor(String[] excludes, String dataSourceProxyMode) {
this.excludes = Arrays.asList(excludes);
this.dataSourceProxyMode = BranchType.XA.name().equalsIgnoreCase(dataSourceProxyMode) ? BranchType.XA : BranchType.AT;
}

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) {
return bean;
}

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DataSource) {
//当不在排除时,放置和初始化代理
if (!excludes.contains(bean.getClass().getName())) {
//只放置和初始化代理,不返回代理
DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
}

//如果为SeataDataSourceProxy,则返回原始数据源
if (bean instanceof SeataDataSourceProxy) {
return ((SeataDataSourceProxy) bean).getTargetDataSource();
}
}
return bean;
}
}


SeataDataSourceProxy 是一个接口 , 它提供了多个方法 :
public interface SeataDataSourceProxy extends DataSource {

// 获取目标数据源。
DataSource getTargetDataSource();

// 得到分支类型。
BranchType getBranchType();
}

而该代理类 , 有如下几个实现 :

seata-system-SeataDataSourceProxy.png

SeataAutoDataSourceProxyCreator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
private final List<String> excludes;
private final Advisor advisor;

public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes, String dataSourceProxyMode) {
this.excludes = Arrays.asList(excludes);
this.advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice(dataSourceProxyMode));
setProxyTargetClass(!useJdkProxy);
}

@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
return new Object[]{advisor};
}

@Override
protected boolean shouldSkip(Class<?> beanClass, String beanName) {
return !DataSource.class.isAssignableFrom(beanClass) ||
SeataProxy.class.isAssignableFrom(beanClass) ||
excludes.contains(beanClass.getName());
}
}

总结

其实整篇文章最重要的一个操作就是构建了一个 globalTransactionalInterceptor , 后续主流程中我们会用上这个拦截器

注意 , 每个标注了注解的方法都会通过拦截器进行处理.

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%