盘点Sharding-JDBC 读写分离

总文档 :文章目录

Github : github.com/black-ant

一 . 前言

本来也没打算看读写分离的代码 , 但是…他他他又出问题了 , 按照官方文档尝试配置了 读写分离后 , 并没有出现期待的结果

不过幸运的是 , 能跑起来 , 大概率就是读写分离的逻辑没有生效

二 . 源码分析

和上一篇最大的难点就是 , 不知道从哪里开始看起…

这个时候官方文档的作用就出来了 , 它告诉我有一个接口 ShardingSphereAlgorithm , 于是查找其实现类 , 有所发现

Sharding-jdbc-balance.png

可以看到 ,读写分离在 Sharding 中也有着负载均衡的作用 , 对应的接口为 ReplicaLoadBalanceAlgorithm

2.1 入口查找

先看一下接口中需要实现什么

1
2
3
4
5
6
7
8
java复制代码// 接口中提供了一个待实现的方法
public interface ReplicaLoadBalanceAlgorithm extends ShardingSphereAlgorithm {

// name : 查询逻辑数据源名称
// primaryDataSourceName : 主要数据源的名称
// replicaDataSourceNames : 副本数据源的名称
String getDataSource(String name, String primaryDataSourceName, List<String> replicaDataSourceNames);
}

我们随机选择一个 ShardingSphereAlgorithm 逆推一下流程 , 最终发现了类

  • ReplicaQueryDataSourceRouter # route
  • ReplicaQuerySQLRouter # createRouteContext
  • PartialSQLRouteExecutor # route

我们按照这个流程逆推一下 :

Step End : ReplicaQueryDataSourceRouter 最终规则处理

1
2
3
4
5
6
7
8
9
10
java复制代码C55- ReplicaQueryDataSourceRouter
M55_01- route(final SQLStatement sqlStatement)
?- route 规则类 , 干了如下2件事
- 是否为 isPrimaryRoute , 如果是 , 直接获取 getPrimaryDataSourceName
- 如果不是 , 通过规则获取 rule.getLoadBalancer()

// PS : 打个断点发现和预料的一样 , 并没有执行相关的逻辑 , 继续往上推导

C101- KernelProcessor ,这是 route 的核心处理类 , 再看一下重要的流程
M101_01- generateExecutionContext

一路往上 , 发现了核心处理类 KernelProcessor , 到了这里就基本能确定 , 还是会通过ShardingSpherePreparedStatement 处理

2.2 正向流程梳理

知道了主流程 , 我们就可以推断出正向流程 :

  1. C74- ShardingSpherePreparedStatement
  2. C101- KernelProcessor
    • M101_01- generateExecutionContext
  3. C102- SQLRouteEngine
    • route
  4. C103- PartialSQLRouteExecutor
    • route
  5. C60- ReplicaQuerySQLRouter
    • createRouteContext – 终于到了这个核心的地方

Step 1 : ShardingSpherePreparedStatement 创建 ExecutionContext

直接来到主入口进行调试 : ShardingSpherePreparedStatement , 其中有2个主要的方法 executeQuery() / executeUpdate()

上一次说了executeUpdate , 这一次主要走 executeQuery

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码C74- ShardingSpherePreparedStatement
M74_10- executeQuery
1- 创建 createExecutionContext() -> M101_01
2- 通过 getInputGroups() 获取 InputGroup 集合
3- reparedStatementExecutor.executeQuery(inputGroups) 执行 Query
4- 合并结果
5- 构建 ShardingSphereResultSet 返回
M74_11- createExecutionContext
1- createLogicSQL() 构建逻辑 SQL , 即需要查询的 SQL -> PS :
2- 通过 KernelProcessor 构建一个 ExecutionContext
?- 这里就和上文关联起来了 ->

// PS:M74_11 createLogicSQL 创建的 SQL
select blogentity0_.id as id1_0_, blogentity0_.author as author2_0_, blogentity0_.column_id as column_i3_0_, blogentity0_.date as date4_0_, blogentity0_.title as title5_0_, blogentity0_.title_id as title_id6_0_ from t_blog_0 blogentity0_


// M74_10 源代码
public ResultSet executeQuery() throws SQLException {
ResultSet result;
try {
clearPrevious();
// 核心处理 , 此处已经把需要处理的 SQL 放入容器中
executionContext = createExecutionContext();
List<QueryResult> queryResults;
if (ExecutorConstant.MANAGED_RESOURCE) {
Collection<InputGroup<StatementExecuteUnit>> inputGroups = getInputGroups();
cacheStatements(inputGroups);
reply();
queryResults = preparedStatementExecutor.executeQuery(inputGroups);
} else {
queryResults = rawExecutor.executeQuery(getRawInputGroups(), new RawSQLExecutorCallback());
}
MergedResult mergedResult = mergeQuery(queryResults);
result = new ShardingSphereResultSet(statements.stream().map(this::getResultSet).collect(Collectors.toList()), mergedResult, this, executionContext);
} finally {
clearBatch();
}
currentResultSet = result;
return result;
}

这是一个核心的锚点 , 找到了这个点 , 后面的就好分析了

PS : 后续整体流程和之前看的一样 , 主要是以下流程

Step 2 : KernelProcessor 构建 ExecutionContext 流程主逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码
C101- KernelProcessor ,这是 route 的核心处理类 , 再看一下重要的流程
M101_01- generateExecutionContext
1- 获取 rules 集合
?- 此处解决了第一个配置问题 , 详见-> PS:M101_01_01
?- 配置改好后 , 可以看到 rule 已经正常了 -> PS:M101_01_02


public ExecutionContext generateExecutionContext(final LogicSQL logicSQL, final ShardingSphereSchema schema, final ConfigurationProperties props) {
Collection<ShardingSphereRule> rules = schema.getRules();
// RouteEngine 生成 , 其中包好了 rules 对象其相关属性
SQLRouteEngine sqlRouteEngine = new SQLRouteEngine(rules, props);
// PS:M101_01_10
SQLStatementContext<?> sqlStatementContext = logicSQL.getSqlStatementContext();

// 核心流程
RouteContext routeContext = sqlRouteEngine.route(logicSQL, schema);
SQLRewriteEntry rewriteEntry = new SQLRewriteEntry(schema.getMetaData().getSchemaMetaData().getConfiguredSchemaMetaData(), props, rules);
SQLRewriteResult rewriteResult = rewriteEntry.rewrite(logicSQL.getSql(), logicSQL.getParameters(), sqlStatementContext, routeContext);
Collection<ExecutionUnit> executionUnits = ExecutionContextBuilder.build(schema.getMetaData(), rewriteResult, sqlStatementContext);
return new ExecutionContext(sqlStatementContext, executionUnits, routeContext);
}

PS:M101_01_10 相关属性一览
image.png

Step 3 : PartialSQLRouteExecutor 通过 rules 生成 result

注意 , 这里会根据是否已经创建了 RouteUnits 选择是否创建还是添加

处理流程 , 下面2个 问题解决完成后 , 配置正常 , 此时可以看到 rule 存在

1
2
3
4
5
6
java复制代码// 我们根据上面2个流程着重分析其中2步
C103- PartialSQLRouteExecutor
M103_01- route
FOR- 遍历所有的 rule , 根据是否生成了 RouteUnits 分别调用
- entry.getValue().createRouteContext(logicSQL, schema, entry.getKey(), props) -> M104_01
- entry.getValue().decorateRouteContext(result, logicSQL, schema, entry.getKey(), props -> M104_02

PS:M101_01_02 rules 列表

Sharding-jdbc-rules-list.jpg

Step 4 : ReplicaQuerySQLRouter / rules 处理主逻辑

这个逻辑中有 2个重要的方法

  • createRouteContext : 创建 RouteContext
  • decorateRouteContext : 补充 RouteContext
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
java复制代码C60- ReplicaQuerySQLRouter 
M60_01- createRouteContext
- 获取首个 rule , 构建 ReplicaQueryDataSourceRouter
- 调用 route 获取 需要执行的 datasourceName
- 最终会调用对应的 ReplicaLoadBalanceAlgorithm
-> PS:M60_01_01
M60_02- decorateRouteContext


// 以轮询为例
C61- RoundRobinReplicaLoadBalanceAlgorithm
F61_01- ConcurrentHashMap<String, AtomicInteger> COUNTS
M61_01- getDataSource
?- 轮询主要通过属性 F61_01 完成
?- 简单点说就是每次 For 中 , 这个COUNTS 都会 + 1 , 到了和总数一样的时候 , 执行 CAS 设置为 0
?- 算法部分先不深入太多 , 后续专门整理



// M104_01 createRouteContext 源代码
public RouteContext createRouteContext(final LogicSQL logicSQL, final ShardingSphereSchema schema, final ReplicaQueryRule rule, final ConfigurationProperties props) {
RouteContext result = new RouteContext();
String dataSourceName = new ReplicaQueryDataSourceRouter(rule.getSingleDataSourceRule()).route(logicSQL.getSqlStatementContext().getSqlStatement());
result.getRouteUnits().add(new RouteUnit(new RouteMapper(DefaultSchema.LOGIC_NAME, dataSourceName), Collections.emptyList()));
return result;
}

// M104_02 decorateRouteContext 源代码
public void decorateRouteContext(final RouteContext routeContext,
final LogicSQL logicSQL, final ShardingSphereSchema schema, final ReplicaQueryRule rule, final ConfigurationProperties props) {
Collection<RouteUnit> toBeRemoved = new LinkedList<>();
Collection<RouteUnit> toBeAdded = new LinkedList<>();
for (RouteUnit each : routeContext.getRouteUnits()) {
String dataSourceName = each.getDataSourceMapper().getLogicName();
Optional<ReplicaQueryDataSourceRule> dataSourceRule = rule.findDataSourceRule(dataSourceName);
if (dataSourceRule.isPresent() && dataSourceRule.get().getName().equalsIgnoreCase(each.getDataSourceMapper().getActualName())) {
toBeRemoved.add(each);
String actualDataSourceName = new ReplicaQueryDataSourceRouter(dataSourceRule.get()).route(logicSQL.getSqlStatementContext().getSqlStatement());
toBeAdded.add(new RouteUnit(new RouteMapper(each.getDataSourceMapper().getLogicName(), actualDataSourceName), each.getTableMappers()));
}
}
routeContext.getRouteUnits().removeAll(toBeRemoved);
routeContext.getRouteUnits().addAll(toBeAdded);
}

// PS:M60_01_01 获取负载均衡的 datasource
rule.getLoadBalancer().getDataSource(rule.getName(), rule.getPrimaryDataSourceName(), rule.getReplicaDataSourceNames());


// M61_01 源代码
public String getDataSource(final String name, final String primaryDataSourceName, final List<String> replicaDataSourceNames) {
AtomicInteger count = COUNTS.containsKey(name) ? COUNTS.get(name) : new AtomicInteger(0);
COUNTS.putIfAbsent(name, count);
count.compareAndSet(replicaDataSourceNames.size(), 0);
return replicaDataSourceNames.get(Math.abs(count.getAndIncrement()) % replicaDataSourceNames.size());
}

Step 补充环节 : Rule 的处理流程

rules 的处理流程主要是以下几步 :

  • SQLRouteEngine : route 处理引擎 , 开始处理 route
  • PartialSQLRouteExecutor : 选择哪种核心方式
  • ReplicaQuerySQLRouter : 核心的处理方式
  • ReplicaQueryDataSourceRouter : 调用算法获取Source 名称
  • RoundRobinReplicaLoadBalanceAlgorithm : 算法核心

rules 的构建流程主要是以下几步 :

  • SchemaContextsBuilder : 构建 ContextsBuilder
  • ShardingSphereRulesBuilder : 通过 rules 配置执行 build 主逻辑
  • ReplicaQueryRuleBuilder : 拿到相关配置 , 构建主要的 Rule
  • ReplicaQueryRule : 被构建的对象

PS:M101_01_01 rules 集合

第一次运行的时候发现 rule 为空 , 明白找到了第一个问题 , 配置实际上是缺失的 , 看一下rule 的继承体系

ShardingSphereRule.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
java复制代码// 很明显 , 看到了一个 ReplicaQueryRule , 进去看看

C56- ReplicaQueryRule
?- 其中有2个主要的构造方法 , 通过 Configuration 进行的配置
MC56_01- ReplicaQueryRule(final ReplicaQueryRuleConfiguration config)
?- 调用 ReplicaQueryRuleBuilder 进行构建 -> M57_01
MC56_02- ReplicaQueryRule(final AlgorithmProvidedReplicaQueryRuleConfiguration config)
M01- getDataSourceMapper()


// 前置条件 , 配置 Rule
public final class ReplicaQueryRule implements DataSourceRoutedRule, StatusContainedRule {

static {
ShardingSphereServiceLoader.register(ReplicaLoadBalanceAlgorithm.class);
}

private final Map<String, ReplicaLoadBalanceAlgorithm> loadBalancers = new LinkedHashMap<>();

private final Map<String, ReplicaQueryDataSourceRule> dataSourceRules;

public ReplicaQueryRule(final ReplicaQueryRuleConfiguration config) {
Preconditions.checkArgument(!config.getDataSources().isEmpty(), "Replica query data source rules can not be empty.");
config.getLoadBalancers().forEach((key, value) -> loadBalancers.put(key, ShardingSphereAlgorithmFactory.createAlgorithm(value, ReplicaLoadBalanceAlgorithm.class)));
dataSourceRules = new HashMap<>(config.getDataSources().size(), 1);
for (ReplicaQueryDataSourceRuleConfiguration each : config.getDataSources()) {
// TODO check if can not find load balancer should throw exception.
ReplicaLoadBalanceAlgorithm loadBalanceAlgorithm = Strings.isNullOrEmpty(each.getLoadBalancerName()) || !loadBalancers.containsKey(each.getLoadBalancerName())
? TypedSPIRegistry.getRegisteredService(ReplicaLoadBalanceAlgorithm.class) : loadBalancers.get(each.getLoadBalancerName());
dataSourceRules.put(each.getName(), new ReplicaQueryDataSourceRule(each, loadBalanceAlgorithm));
}
}

public ReplicaQueryRule(final AlgorithmProvidedReplicaQueryRuleConfiguration config) {
Preconditions.checkArgument(!config.getDataSources().isEmpty(), "Replica query data source rules can not be empty.");
loadBalancers.putAll(config.getLoadBalanceAlgorithms());
dataSourceRules = new HashMap<>(config.getDataSources().size(), 1);
for (ReplicaQueryDataSourceRuleConfiguration each : config.getDataSources()) {
// TODO check if can not find load balancer should throw exception.
ReplicaLoadBalanceAlgorithm loadBalanceAlgorithm = Strings.isNullOrEmpty(each.getLoadBalancerName()) || !loadBalancers.containsKey(each.getLoadBalancerName())
? TypedSPIRegistry.getRegisteredService(ReplicaLoadBalanceAlgorithm.class) : loadBalancers.get(each.getLoadBalancerName());
dataSourceRules.put(each.getName(), new ReplicaQueryDataSourceRule(each, loadBalanceAlgorithm));
}
}

//...........

}


C57- ReplicaQueryRuleBuilder
M57_01- build(final ReplicaQueryRuleConfiguration ruleConfig, final Collection<String> dataSourceNames)
- new ReplicaQueryRule(ruleConfig) 构建 -> MC56_01

// 看到这里大概知道构建的位置了 , 上文看到过的类 C51- ShardingSphereRulesBuilder
C51- ShardingSphereRulesBuilder
M51_01- build(final Collection<RuleConfiguration> ruleConfigurations, final Collection<String> dataSourceNames)
?- 该方法会通过 RuleConfiguration 集合构建 Rule 对象
?- 再往上就不需要看了 , 最终还是会回到构建 ShardingSphereDataSource , 只需要根据配置类猜测配置方式即可 -> PS:M51_01_01

Step 附录 : 读写分离的相关配置分析方式

配置类还是之前的几个

  • SpringBootConfiguration
  • YamlReplicaQueryRuleSpringBootConfiguration : spring.shardingsphere.rules
    • YamlReplicaQueryRuleConfiguration : replicaQuery
      • Map<String, YamlReplicaQueryDataSourceRuleConfiguration> : dataSources
      • Map<String, YamlShardingSphereAlgorithmConfiguration> : loadBalancers

我算是明白了 , 也不知道是不是找的文档不对 ,指望官方文档还是不行的 , 最后还是要靠自己

这一小节就是如果通过 Sharding 的 配置类 Bean , 反推出相关的配置.

PS:M51_01_01 配置类的推测

1
2
3
4
5
6
7
8
9
10
11
java复制代码// 点开可以看到 , 其中只有2个属性
public final class ReplicaQueryRuleConfiguration implements RuleConfiguration {
private final Collection<ReplicaQueryDataSourceRuleConfiguration> dataSources;
private final Map<String, ShardingSphereAlgorithmConfiguration> loadBalancers;
}


// 这里再来说明一下 , 如何逆推配置 , Sharding 的配置解析方式是很常见的方式 , 按照类型和方法名去反写一个就行了 , 比如这个配置
spring.shardingsphere.rules.sharding.tables.t_blog.key-generate-strategy.column=id

- sharding 来源 : YamlShardingRuleSpringBootConfiguration

sharding-config.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码我们按照这个方法 , 结合官方文档 , 逆推一下 , 可以找到如下几个类 : 

- YamlReplicaQueryRuleSpringBootConfiguration : spring.shardingsphere.rules
- YamlReplicaQueryRuleConfiguration : replicaQuery
- Map<String, YamlReplicaQueryDataSourceRuleConfiguration> : dataSources
- Map<String, YamlShardingSphereAlgorithmConfiguration> : loadBalancers

public final class YamlReplicaQueryDataSourceRuleConfiguration implements YamlConfiguration {
private String name;
private String primaryDataSourceName;
private List<String> replicaDataSourceNames = new ArrayList<>();
private String loadBalancerName;
private Properties props = new Properties();
}

public final class YamlShardingSphereAlgorithmConfiguration implements YamlConfiguration {
private String type;
private Properties props = new Properties();
}

// 逆推得到读写分离的配置信息 :
spring.shardingsphere.rules.replica-query.data-sources.ds_0.primary-data-source-name=primary_ds_0
spring.shardingsphere.rules.replica-query.data-sources.ds_0.replica-data-source-names=primary_ds_0_replica_0, primary_ds_0_replica_1
spring.shardingsphere.rules.replica-query.data-sources.ds_1.primary-data-source-name=primary_ds_1
spring.shardingsphere.rules.replica-query.data-sources.ds_1.replica-data-source-names=primary_ds_1_replica_0, primary_ds_1_replica_1

// 小技巧 : 拿着 spring.shardingsphere.rules.replica-query 去网上搜 , 可能会有意想不到的惊喜

最终配置结果 :
spring.shardingsphere.rules.replica-query.data-sources.ds_0.name=rq-ds0
spring.shardingsphere.rules.replica-query.data-sources.ds_0.primary-data-source-name=ds0
spring.shardingsphere.rules.replica-query.data-sources.ds_0.replica-data-source-names=ds0,ds1,ds2
# Load balance 算法
spring.shardingsphere.rules.replica-query.load-balancers.rq-ds0.type=round_robin
spring.shardingsphere.rules.replica-query.load-balancers.rq-ds0.props.null=

// 请求地址 , 可以看到分别从 ds0,ds1,ds2 获取信息 , 插入数据只会插入 ds0
// PS : 读写分离一般配合数据库主从同步做的

这不看源码你叫我怎么猜的出来??????????????

image-20210508162525556.png

三 . 配置

这里把完整的配置贴出来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
properties复制代码server.port=8085
##Jpa配置
spring.jpa.database=mysql
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=none
# 配置真实数据源 ds0,ds1,ds2
spring.shardingsphere.datasource.names=ds0,ds1,ds2
spring.shardingsphere.datasource.common.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.common.driver-class-name=com.mysql.jdbc.Driver
# 配置第 1 个数据源
spring.shardingsphere.datasource.ds0.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds0.jdbc-url=jdbc:mysql://127.0.0.1:3306/database0?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=UTC
spring.shardingsphere.datasource.ds0.username=root
spring.shardingsphere.datasource.ds0.password=123456
# 配置第 2 个数据源
spring.shardingsphere.datasource.ds1.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds1.jdbc-url=jdbc:mysql://127.0.0.1:3306/database1?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=UTC
spring.shardingsphere.datasource.ds1.username=root
spring.shardingsphere.datasource.ds1.password=123456
# 配置从库 3
spring.shardingsphere.datasource.ds2.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds2.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds2.jdbc-url=jdbc:mysql://127.0.0.1:3306/database2?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=UTC
spring.shardingsphere.datasource.ds2.username=root
spring.shardingsphere.datasource.ds2.password=123456
# Sharding 读写分离配置
# 是否显示 SQL
spring.shardingsphere.props.sql.show=true
# 是否 Bean 覆盖
spring.main.allowBeanDefinitionOverriding=true
# 官网提供的配置 , 这是哪里的?????????????????????
#spring.shardingsphere.rules.readwrite-splitting.data-sources.dataSource.primary-data-source-name= ds0
#spring.shardingsphere.rules.readwrite-splitting.data-sources.dataSource.replica-data-source-names= ds1,ds2
#spring.shardingsphere.rules.readwrite-splitting.data-sources.dataSource.load-balancer-name= round_robin_type
# 实际配置信息
spring.shardingsphere.rules.replica-query.data-sources.ds_0.name=rq-ds0
spring.shardingsphere.rules.replica-query.data-sources.ds_0.primary-data-source-name=ds0
spring.shardingsphere.rules.replica-query.data-sources.ds_0.replica-data-source-names=ds0,ds1,ds2
# Load balance 算法
spring.shardingsphere.rules.replica-query.load-balancers.rq-ds0.type=round_robin
spring.shardingsphere.rules.replica-query.load-balancers.rq-ds0.props.null=

总结

读写分离其实很简单 , 主要是不知道新版的配置方式 , 希望这篇文档对以后排错能有所帮助

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%