盘点 Seata Seata Server 配置流程

这是我参与更文挑战的第9天,活动详情查看: 更文挑战

总文档 :文章目录

Github : github.com/black-ant

一 .前言

这是 Seata 部分的第二篇 , 主要来说一说 Seata Server 中如何完成配置的处理 , Seata 的启动可以参考 Seata 的启动流程 , 本篇文档主要包含如下内容 :

  • 配置的扫描
  • 配置的加载
  • - 配置的动态加载(下一篇)

二 . 配置的扫描

上一篇我们知道 , 配置主要有2个 , nacos.conf / registry.conf , 这2个文件主要由ConfigurationFactory 加载完成

配置处理的起点 :

先来看一下配置的起点 , 在前面说了 , 在处理 main 的时候 , 处理过 ParameterParser ,一切都是从那里开始 :

  • Step 1 : Server # main 函数中 , 发起过 ParameterParser
  • Step 2 : ParameterParser # init , 发起 ConfigurationFactory 的构建 ,同时传入 Mode 类型
1
2
3
4
5
6
7
8
9
java复制代码private void init(String[] args) {
if (StringUtils.isBlank(storeMode)) {

//.......
storeMode = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_MODE,
SERVER_DEFAULT_STORE_MODE);
}

}

PS : 在读取网络配置之前 , 会优先从本地配置中获取连接信息

配置的读取类 :

配置这里使用 AbstractConfiguration 进行配置的读取 , 此处有以下几种配置类 :

  • FileConfiguration
  • SpringCloudConfiguration
  • ApolloConfiguration
  • NacosConfiguration
  • ConsulConfiguration
  • EtcdConfiguration
  • ZookeeperConfiguration

seata-AbstractConfiguration-system.png

2.1 配置的扫描流程

registry.conf 的读取主要是由 ConfigurationFactory 完成 , 来看一下主要的逻辑 :

2.1.1 : 配置类的初始化

在获取 ConfigurationFactory 实例的时候, 会触发一个静态代码块 , 调用 ConfigurationFactory # load , 主要分为 5 个流程 :

  • Step 1 : 依次获取配置类名称
  • Step 2 : 如果未设置特色配置 , 此处会获得名称 -> registry
  • Step 3 : envValue 可以看成 Spring 中的 Profile , 会获得特定环境的配置
  • Step 4 : 通过获取的参数 , 构建 Configuration 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码
private static final String SYSTEM_PROPERTY_SEATA_CONFIG_NAME = "seata.config.name";
private static final String REGISTRY_CONF_DEFAULT = "registry";


private static void load() {
// Step 1 : 依次获取配置类名称
// 从系统中获取配置文件名称 , 此处通常为 null
String seataConfigName = System.getProperty(SYSTEM_PROPERTY_SEATA_CONFIG_NAME);
if (seataConfigName == null) {
// ENV_SEATA_CONFIG_NAME -> SEATA_CONFIG_NAME
seataConfigName = System.getenv(ENV_SEATA_CONFIG_NAME);
}

// Step 2 : 如果未设置特色配置 , 此处会获得名称 -> registry
if (seataConfigName == null) {
seataConfigName = REGISTRY_CONF_DEFAULT;
}

// Step 3 : envValue 可以看成 Spring 中的 Profile , 会获得特定环境的配置
String envValue = System.getProperty(ENV_PROPERTY_KEY);
if (envValue == null) {
envValue = System.getenv(ENV_SYSTEM_KEY);
}

// Step 4 : 通过获取的参数 , 构建 Configuration 对象 -> 2.2.2
Configuration configuration = (envValue == null) ? new FileConfiguration(seataConfigName,
false) : new FileConfiguration(seataConfigName + "-" + envValue, false);
Configuration extConfiguration = null;

try {
// Step 5 : SPI 扩展 , 通过ExtConfigurationProvider的provide方法 , 替换扩展配置 -> Pro:211001
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
} catch (EnhancedServiceNotFoundException ignore) {

} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
// End : 存在扩展配置,则返回扩展配置实例,否则返回文件配置实例
CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration;
}

// Pro:211001 SPI 的扩展方式
例如使用 seata-spring-boot-starer时 , 会通过 SpringBootConfigurationProvider 进行扩展 ,此时会通过 application.properties/application.yaml中获取参数

2.2.2 : FileConfiguration 的构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码
public FileConfiguration(String name, boolean allowDynamicRefresh) {
// Step 1 :获取 registry.conf 的 File 对象
File file = getConfigFile(name);
if (file == null) {
targetFilePath = null;
} else {
targetFilePath = file.getPath();
// Step 2 : 加载 FileConfig , 这个 对象 比我们想的更大 -> PS222001
fileConfig = FileConfigFactory.load(file, name);
}

if (targetFilePath == null) {
fileConfig = FileConfigFactory.load();
this.allowDynamicRefresh = false;
} else {
targetFileLastModified = new File(targetFilePath).lastModified();
this.allowDynamicRefresh = allowDynamicRefresh;
}

this.name = name;

// 顺带构建了一个连接池 , 该连接池会用于后面发起 ConfigOperateRunnable
configOperateExecutor = new ThreadPoolExecutor(CORE_CONFIG_OPERATE_THREAD, MAX_CONFIG_OPERATE_THREAD,
Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
new NamedThreadFactory("configOperate", MAX_CONFIG_OPERATE_THREAD));
}

Step 1 : getConfigFile 的获取流程

1
2
3
4
5
6
7
8
9
10
java复制代码private File getConfigFile(String name) {
// 是否为 file: 开头
boolean filePathCustom = name.startsWith(SYS_FILE_RESOURCE_PREFIX);
// 获取 File 路径 , 此处是 registry
String filePath = filePathCustom ? name.substring(SYS_FILE_RESOURCE_PREFIX.length()) : name;
String decodedPath = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name());

// 此处获得最终的 File : D:\java\study\seata_code\server\target\classes\registry.conf
File targetFile = getFileFromFileSystem(decodedPath);
}

Step 2 : FileConfigFactory.load(file, name) 加载流程

1
2
3
4
5
java复制代码public static FileConfig load(File targetFile, String name) {
String fileName = targetFile.getName();
String configType = getConfigType(fileName);
return loadService(configType, new Class[]{File.class, String.class}, new Object[]{targetFile, name});
}

Step 3 : 通过 EnhancedServiceLoader 进行加载

1
2
3
4
java复制代码private static FileConfig loadService(String name, Class[] argsType, Object[] args) {
FileConfig fileConfig = EnhancedServiceLoader.load(FileConfig.class, name, argsType, args);
return fileConfig;
}

Step 4 : EnhancedServiceLoader # load 流程

1
2
3
4
5
6
7
8
9
java复制代码 private S loadExtension(String activateName, ClassLoader loader, Class[] argTypes,
Object[] args) {
// 获取扩展配置类型 , 此处主要是
loadAllExtensionClass(loader);
ExtensionDefinition cachedExtensionDefinition = getCachedExtensionDefinition(activateName);
return getExtensionInstance(cachedExtensionDefinition, loader, argTypes, args);
}

// PS : 最终会通过 constructor.newInstance(args) 将 File 转换为 SimpleFileConfig

PS222001 : FileConfig 对象详情

该对象为一个接口 , 包括2个实现类 YamlFileConfig , SimpleFileConfig , 看下图可以知道 ,除了Nacos 配置 , 还有 Java 等等很多其他的配置

image.png

2.2.3 : 配置的加载

当成进行了 ConfigurationFactory.getInstance() 初始化后 , 会执行 getConfig 获取参数 , 最终调用 getLatestConfig :

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码
public String getLatestConfig(String dataId, String defaultValue, long timeoutMills) {

String value = getConfigFromSysPro(dataId);
if (value != null) {
return value;
}
// 构建一个 ConfigFuture 用于获取参数 : {"dataId":"config.type","operation":"GET","timeout":true}
ConfigFuture configFuture = new ConfigFuture(dataId, defaultValue, ConfigOperation.GET, timeoutMills);
configOperateExecutor.submit(new ConfigOperateRunnable(configFuture));
Object getValue = configFuture.get();
return getValue == null ? null : String.valueOf(getValue);
}

ConfigOperateRunnable 的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public void run() {
if (configFuture != null) {
if (configFuture.isTimeout()) {
setFailResult(configFuture);
return;
}
try {
if (allowDynamicRefresh) {
long tempLastModified = new File(targetFilePath).lastModified();
if (tempLastModified > targetFileLastModified) {
FileConfig tempConfig = FileConfigFactory.load(new File(targetFilePath), name);
if (tempConfig != null) {
fileConfig = tempConfig;
targetFileLastModified = tempLastModified;
}
}
}
if (configFuture.getOperation() == ConfigOperation.GET) {
String result = fileConfig.getString(configFuture.getDataId());
configFuture.setResult(result);
} else if (configFuture.getOperation() == ConfigOperation.PUT) {
configFuture.setResult(Boolean.TRUE);
} else if (configFuture.getOperation() == ConfigOperation.PUTIFABSENT) {
configFuture.setResult(Boolean.TRUE);
} else if (configFuture.getOperation() == ConfigOperation.REMOVE) {
configFuture.setResult(Boolean.TRUE);
}
} catch (Exception e) {
setFailResult(configFuture);
}
}
}

三 . 配置的加载

在上一个步骤中 , 是对 ConfigurationFactory 的创建 , 在创建过程中 , 对配置文件进行了扫描处理 , 后面会通过 buildConfiguration 来 创建 Configuration 对象 :

3.1 配置的加载入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
java复制代码// 此处以 Nacos 配置为例 : 
private static Configuration buildConfiguration() {
// 从 File 中获取 数据类型 -> 上文中获取
String configTypeName = CURRENT_FILE_INSTANCE.getConfig(
ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
+ ConfigurationKeys.FILE_ROOT_TYPE);

if (StringUtils.isBlank(configTypeName)) {
throw new NotSupportYetException("config type can not be null");
}
// 获取 Config 类型 -> Nacos
ConfigType configType = ConfigType.getType(configTypeName);

Configuration extConfiguration = null;
Configuration configuration;
// 对 File 进行专门的处理
if (ConfigType.File == configType) {
String pathDataId = String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR,
ConfigurationKeys.FILE_ROOT_CONFIG, FILE_TYPE, NAME_KEY);
String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId);
configuration = new FileConfiguration(name);
try {
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
} catch (EnhancedServiceNotFoundException ignore) {

} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
} else {
// 3.2 -> 通过 ConfigurationProvider 处理
configuration = EnhancedServiceLoader
.load(ConfigurationProvider.class, Objects.requireNonNull(configType).name()).provide();
}


try {
Configuration configurationCache;
if (null != extConfiguration) {
configurationCache = ConfigurationCache.getInstance().proxy(extConfiguration);
} else {
// 3.3 ConfigurationCache 代理
configurationCache = ConfigurationCache.getInstance().proxy(configuration);
}
if (null != configurationCache) {
extConfiguration = configurationCache;
}
} catch (EnhancedServiceNotFoundException ignore) {

} catch (Exception e) {
LOGGER.error("failed to load configurationCacheProvider:{}", e.getMessage(), e);
}
return null == extConfiguration ? configuration : extConfiguration;
}


// Step 4. NacosConfigurationProvider 的处理
@LoadLevel(name = "Nacos", order = 1)
public class NacosConfigurationProvider implements ConfigurationProvider {
@Override
public Configuration provide() {
return NacosConfiguration.getInstance();
}
}

知识点一 : ConfigurationProvider

3.2 NacosConfiguration 配置

我们以一个 NacosConfiguration 来看整个的处理 , 其实都是类型的 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码// NacosConfiguration 成员变量
private static volatile ConfigService configService; // 注意 volatile

// NacosConfiguration 构造函数
private NacosConfiguration() {
if (configService == null) {
try {
// Step 1 : 构建 ConfigService
configService = NacosFactory.createConfigService(getConfigProperties());
// Step 2 : 初始化 Seata Config
initSeataConfig();
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
}

Step 1 : NacosFactory.createConfigService(getConfigProperties()); -> PS:32001

这里 getConfigProperties() 会获得一个 Properties 对象 ,其中为 Nacos 的配置属性 , 主要流程如下 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码private static Properties getConfigProperties() {
// Step 1 : 准备 Properties 对象
Properties properties = new Properties();

// Step 2 : 判断 System 是否存在 endpoint 和 serverAddr 属性
if (System.getProperty(ENDPOINT_KEY) != null) {
properties.setProperty(ENDPOINT_KEY, System.getProperty(ENDPOINT_KEY));
properties.put(ACCESS_KEY, Objects.toString(System.getProperty(ACCESS_KEY), ""));
properties.put(SECRET_KEY, Objects.toString(System.getProperty(SECRET_KEY), ""));
} else if (System.getProperty(PRO_SERVER_ADDR_KEY) != null) {
properties.setProperty(PRO_SERVER_ADDR_KEY, System.getProperty(PRO_SERVER_ADDR_KEY));
} else {
// Step 3 : 获取 Nacos Address 路径
String address = FILE_CONFIG.getConfig(getNacosAddrFileKey());
if (address != null) {
properties.setProperty(PRO_SERVER_ADDR_KEY, address);
}
}

if (System.getProperty(PRO_NAMESPACE_KEY) != null) {
properties.setProperty(PRO_NAMESPACE_KEY, System.getProperty(PRO_NAMESPACE_KEY));
} else {
// Step 4 : 设置 namespace
String namespace = FILE_CONFIG.getConfig(getNacosNameSpaceFileKey());
if (namespace == null) {
namespace = DEFAULT_NAMESPACE;
}
properties.setProperty(PRO_NAMESPACE_KEY, namespace);
}
String userName = StringUtils.isNotBlank(System.getProperty(USER_NAME))
? System.getProperty(USER_NAME)
: FILE_CONFIG.getConfig(getNacosUserName());
if (StringUtils.isNotBlank(userName)) {
String password = StringUtils.isNotBlank(System.getProperty(PASSWORD))
? System.getProperty(PASSWORD)
: FILE_CONFIG.getConfig(getNacosPassword());
if (StringUtils.isNotBlank(password)) {
properties.setProperty(USER_NAME, userName);
properties.setProperty(PASSWORD, password);
}
}

// 通常继承配置拿到的是 {"namespace":"","serverAddr":"127.0.0.1:8848"}
return properties;
}

PS: 此处并不是从 Nacos 获取配置!

Step 2 : initSeataConfig 运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码
private static void initSeataConfig() {
try {
// nacosDataId -> seataServer.properties
String nacosDataId = getNacosDataId();

// Step 1 : 此处才是从 NacosConfigService 中获取 Nacos 配置 (getNacosGroup -> SEATA_GROUP)
String config = configService.getConfig(nacosDataId, getNacosGroup(), DEFAULT_CONFIG_TIMEOUT);
if (StringUtils.isNotBlank(config)) {
try (Reader reader
= new InputStreamReader(new ByteArrayInputStream(config.getBytes()), StandardCharsets.UTF_8)) {
// 流处理 , 加载到 Properties 中
seataConfig.load(reader);
}
NacosListener nacosListener = new NacosListener(nacosDataId, null);
// 添加 NacosListener , 该 Listernr 用于动态通知
configService.addListener(nacosDataId, getNacosGroup(), nacosListener);
}
} catch (NacosException | IOException e) {
LOGGER.error("init config properties error", e);
}
}

附加一 : NacosConfigService

该类是 com.alibaba.nacos.client.config 的工具类 , 后续我们来分析配置的动态处理

3.3 ConfigurationCache

此处有点意思 , 还能这么缓存 ,通过把 get 方法代理 ,实现了配置的缓存操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码private static final ConcurrentHashMap<String, ObjectWrapper> CONFIG_CACHE = new ConcurrentHashMap<>();


public Configuration proxy(Configuration originalConfiguration) {
return (Configuration)Enhancer.create(Configuration.class,
(MethodInterceptor)(proxy, method, args, methodProxy) -> {
// 判断是 get 方法且不是 LatestConfig
if (method.getName().startsWith(METHOD_PREFIX)
&& !method.getName().equalsIgnoreCase(METHOD_LATEST_CONFIG)) {
String rawDataId = (String)args[0];
ObjectWrapper wrapper = CONFIG_CACHE.get(rawDataId);
// 获取参数名
String type = method.getName().substring(METHOD_PREFIX.length());
if (!ObjectWrapper.supportType(type)) {
type = null;
}
if (null == wrapper) {
Object result = method.invoke(originalConfiguration, args);
// ObjectWrapper -> 包装器 , 数据只在非空时存在于缓存中
if (result != null) {
wrapper = new ObjectWrapper(result, type);
CONFIG_CACHE.put(rawDataId, wrapper);
}
}
return wrapper == null ? null : wrapper.convertData(type);
}
// 没有缓存则走原方法
return method.invoke(originalConfiguration, args);
});
}

总结

这篇文章相对上篇 , 稍稍深入了一下.

附录

查找资料的时候 ,发现了一个非常好的图片 , 在完善这篇文章的时候 , 拜读了很久 , 强烈建议大家看一下原文档 , 说的比我清楚多了 @ booogu.top/2021/02/28/…

seata_config_initialization.png

PS : 脑子说我也画的出来 , 手说你会画个P

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%