Distributed Transaction Failure in a TX-LCN Cluster and How to Fix It

Preface

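Recently, after a newly developed service went live, users reported that data updates were not taking effect, yet the same operations worked in local testing. The main difference between the two environments is that locally the service ran as a single instance, while in production it runs as a cluster. To test this, I started a second instance locally, and the operation then failed most of the time: the transaction was not committed. Because the framework is no longer maintained, the only option was to step through it in debug mode. After two days of investigation I found the cause: the TM (transaction manager) locates participants by module name, and the framework derives the module name from the application name, so every instance of a clustered service registers with the TM under the same module name. The TM therefore cannot identify the specific participant, and the transaction fails to commit.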

1. Versions

[Figure: framework version information]

2. Flow analysis

Using the TC proxy control flowchart below, this section analyzes at the source-code level why distributed transactions fail in a cluster.

[Figure: flowchart of TC proxy control handling]

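As the figure shows, committing the transaction is ultimately controlled by the TM, which notifies each participant to commit or roll back. In a cluster, however, the participant that actually holds the branch transaction most likely never receives the TM's message, so the natural starting point is how the TM sends its notifications.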

2.1 Source analysis of the TM's NotifyGroup

How the TM notifies the TCs

private void notifyTransaction(DTXContext dtxContext, int transactionState) throws TransactionException {
    List<TransactionUnit> transactionUnits = dtxContext.transactionUnits(); // @@1
    log.debug("group[{}]'s transaction units: {}", dtxContext.getGroupId(), transactionUnits);
    for (TransactionUnit transUnit : transactionUnits) {
        NotifyUnitParams notifyUnitParams = new NotifyUnitParams();
        notifyUnitParams.setGroupId(dtxContext.getGroupId());
        notifyUnitParams.setUnitId(transUnit.getUnitId());
        notifyUnitParams.setUnitType(transUnit.getUnitType());
        notifyUnitParams.setState(transactionState);
        txLogger.txTrace(dtxContext.getGroupId(), notifyUnitParams.getUnitId(), "notify {}'s unit: {}",
                transUnit.getModId(), transUnit.getUnitId());
        try {
            List<String> modChannelKeys = rpcClient.remoteKeys(transUnit.getModId()); // @@2
            if (modChannelKeys.isEmpty()) {
                // record exception
                throw new RpcException("offline mod.");
            }
            MessageDto respMsg =
                    rpcClient.request(modChannelKeys.get(0), MessageCreator.notifyUnit(notifyUnitParams)); // @@3
            if (!MessageUtils.statusOk(respMsg)) {
                // The participant reported a failed commit/rollback
                List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
                rpcExceptionHandler.handleNotifyUnitBusinessException(params, respMsg.loadBean(Throwable.class));
            }
        } catch (RpcException e) {
            // Communication failed while sending the commit/rollback
            List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
            rpcExceptionHandler.handleNotifyUnitMessageException(params, e);
        } finally {
            txLogger.txTrace(dtxContext.getGroupId(), notifyUnitParams.getUnitId(), "notify unit over");
        }
    }
}
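  • @@1 Get all participants (transaction units) of this transaction group
  • @@2 Get the connections that match the participant's identifier (modId)
  • @@3 Send the message over the first matching connection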

@@2 and @@3 raise a question: in a cluster, do identical modIds prevent the TM from finding the right TC? With that in mind, here is the logic behind @@2:

// RpcClient implementation (NettyRpcClient)
public List<String> remoteKeys(String moduleName) {
    return SocketManager.getInstance().remoteKeys(moduleName); // @@1
}

// SocketManager
public List<String> remoteKeys(String moduleName) {
    List<String> allKeys = new ArrayList<>();
    for (Channel channel : channels) {
        if (moduleName.equals(getModuleName(channel))) { // @@2
            allKeys.add(channel.remoteAddress().toString());
        }
    }
    return allKeys;
}

// SocketManager
public String getModuleName(Channel channel) { // @@3
    String key = channel.remoteAddress().toString();
    return getModuleName(key);
}

// SocketManager
public String getModuleName(String remoteKey) { // @@4
    AppInfo appInfo = appNames.get(remoteKey);
    return appInfo == null ? null : appInfo.getAppName();
}
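  • @@1 Get the TC addresses for the given moduleName
  • @@2 Iterate over every connection established with the TM and keep those whose module name matches
  • @@3 Resolve a channel to its module name
  • @@4 Resolve a remote address to its module name; appNames is a ConcurrentHashMap from remote key to AppInfo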
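The lookup above can be reduced to a small model. The sketch below is not TX-LCN code: `AppInfoModel` and its methods are hypothetical stand-ins for `SocketManager.appNames` and `getModuleName`, and the addresses and labels are made up. It shows that each connection's AppInfo already carries a per-instance labelName, yet the original lookup only returns the shared appName, so two instances of one service become indistinguishable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of SocketManager's appNames map; not the TX-LCN API.
class AppInfoModel {

    static final class AppInfo {
        final String appName;   // spring.application.name, shared by every instance
        final String labelName; // per-instance label sent at registration

        AppInfo(String appName, String labelName) {
            this.appName = appName;
            this.labelName = labelName;
        }
    }

    // remoteKey -> AppInfo, like SocketManager.appNames
    private final Map<String, AppInfo> appNames = new ConcurrentHashMap<>();

    void bind(String remoteKey, String appName, String labelName) {
        appNames.put(remoteKey, new AppInfo(appName, labelName));
    }

    /** Mirrors the original getModuleName(String): returns the shared appName. */
    String getModuleName(String remoteKey) {
        AppInfo info = appNames.get(remoteKey);
        return info == null ? null : info.appName;
    }

    /** The per-instance label is stored too, but the original lookup never reads it. */
    String getLabelName(String remoteKey) {
        AppInfo info = appNames.get(remoteKey);
        return info == null ? null : info.labelName;
    }
}
```

With two instances bound under `order-service`, `getModuleName` returns the same value for both remote keys while `getLabelName` still tells them apart; that stored label is the information the fix in section 4 relies on.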

So transUnit.getModId() and AppInfo are the two things to investigate. Following their call chains led to where each of them is initialized.

AppInfo initialization

// TM side (reached from InitClientService): bind the connection's app info
public void bindModuleName(String remoteKey, String appName, String labelName) throws RpcException {
    AppInfo appInfo = new AppInfo();
    appInfo.setAppName(appName);
    appInfo.setLabelName(labelName);
    appInfo.setCreateTime(new Date());
    if (containsLabelName(labelName)) {
        throw new RpcException("labelName:" + labelName + " has exist.");
    }
    appNames.put(remoteKey, appInfo);
}

// appName = applicationName, resolved on the participant (TC) like this:
String appName = environment.getProperty("spring.application.name");
this.applicationName = StringUtils.hasText(appName) ? appName : "application";

// labelName = modIdProvider.getModId, whose default implementation is:
public static String modId(ConfigurableEnvironment environment, ServerProperties serverProperties) {
    String applicationName = environment.getProperty("spring.application.name");
    applicationName = StringUtils.hasText(applicationName) ? applicationName : "application";
    return applicationName + ":" + serverPort(serverProperties);
}
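One consequence of the default labelName format (applicationName:port) is worth spelling out, as a reading of the code above rather than something tested here: two instances on one machine get different ports and therefore different labels, which fits the local two-instance test registering without error, but two instances on different hosts with the same port would produce the same label, and the second bindModuleName call would be rejected by containsLabelName. A minimal model, where `LabelRegistry` is a hypothetical name and an IllegalStateException stands in for TX-LCN's checked RpcException:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of bindModuleName's duplicate-label check; not TX-LCN code.
class LabelRegistry {

    // remoteKey -> labelName
    private final Map<String, String> labelByKey = new ConcurrentHashMap<>();

    /** Default label format produced by modId(): applicationName + ":" + port. */
    static String defaultLabel(String applicationName, int port) {
        return applicationName + ":" + port;
    }

    /** Mirrors bindModuleName: refuse a labelName that is already registered. */
    void bind(String remoteKey, String labelName) {
        if (labelByKey.containsValue(labelName)) {
            throw new IllegalStateException("labelName:" + labelName + " has exist.");
        }
        labelByKey.put(remoteKey, labelName);
    }
}
```

Adding the IP to the label, as the fix in section 4.1 does, removes this collision for instances that share a port across hosts.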

TransactionUnit initialization

// JoinGroupExecuteService
transactionManager.join(dtxContext, joinGroupParams.getUnitId(), joinGroupParams.getUnitType(),
        rpcClient.getAppName(transactionCmd.getRemoteKey()), joinGroupParams.getTransactionState());

// NettyRpcClient
public String getAppName(String remoteKey) {
    return SocketManager.getInstance().getModuleName(remoteKey);
}

// SocketManager
public String getModuleName(String remoteKey) {
    AppInfo appInfo = appNames.get(remoteKey);
    return appInfo == null ? null : appInfo.getAppName();
}

// SimpleTransactionManager
public void join(DTXContext dtxContext, String unitId, String unitType, String modId, int userState) throws TransactionException {
    // On manual rollback, set the transaction state to rollback (0)
    if (userState == 0) {
        dtxContext.resetTransactionState(0);
    }
    TransactionUnit transactionUnit = new TransactionUnit();
    transactionUnit.setModId(modId);
    transactionUnit.setUnitId(unitId);
    transactionUnit.setUnitType(unitType);
    dtxContext.join(transactionUnit);
}

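The code shows that the modId of a TransactionUnit is ultimately the AppName, and the AppName comes from environment.getProperty("spring.application.name"). In a cluster, all instances of the same service share the same spring.application.name, so their TransactionUnits carry identical modIds.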

3. Conclusion

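In a cluster, instances share the same application name, so when the TM sends a notification it cannot find the exact participant, and the transaction fails to commit. There are two ways to address this. The first is to give each instance its own application name, renaming the service every time an instance is started; this is tedious and impractical, so it was rejected. The second is to modify the source and change the lookup logic so that a TransactionUnit's modId is the labelName, where labelName is made up of service name + IP + port. This requires overriding the relevant methods.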

4. Solution

There are two ways to modify the source:

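  • Download the source, modify it directly, and package the result yourself
  • Create a class with the same package and name in the project, relying on the project's own compiled classes taking precedence over the copy in the dependency jar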

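This solution uses the second approach. The first couples the project tightly to a patched build of the framework and makes upgrading the framework version harder.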
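With the second approach it is easy to end up running the framework's original class without noticing, for example if the copy sits in the wrong package. One way to check, sketched below with a hypothetical helper name, is to ask the JVM where a class was loaded from via `Class.getProtectionDomain().getCodeSource()`. The code source can be null (for example for JDK bootstrap classes), which the helper reports explicitly instead of throwing.

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of TX-LCN.
final class ClassOrigin {

    private ClassOrigin() {
    }

    /** Returns the location the class was loaded from, or "bootstrap/unknown" if none is reported. */
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null) {
            return "bootstrap/unknown";
        }
        URL location = source.getLocation();
        return location == null ? "bootstrap/unknown" : location.toString();
    }
}
```

Logging `ClassOrigin.locationOf(com.codingapi.txlcn.txmsg.netty.bean.SocketManager.class)` once at startup should print the project's own classes directory or jar, rather than the TX-LCN jar, if the override took effect.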

4.1 Overriding the modId method

Put this class in the project's common module.

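import java.net.InetAddress;
import java.util.Objects;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.util.StringUtils;
// ModIdProvider comes from the TX-LCN dependency; import it from the package used by your TX-LCN version.

@Configuration
public class LcnConfig {

    @Bean
    @ConditionalOnMissingBean
    @Primary
    public ModIdProvider modIdProvider(ConfigurableEnvironment environment,
                                       @Autowired(required = false) ServerProperties serverProperties) {
        return () -> modId(environment, serverProperties);
    }

    // labelName = service name + ":" + IP + ":" + port, unique per instance
    private String modId(ConfigurableEnvironment environment, ServerProperties serverProperties) {
        String applicationName = environment.getProperty("spring.application.name");
        applicationName = StringUtils.hasText(applicationName) ? applicationName : "application";
        return applicationName + ":" + serverAddress(serverProperties);
    }

    /**
     * Returns IP:PORT for this instance.
     * If the value is inaccurate, set server.address / server.port explicitly in the configuration file.
     *
     * @param serverProperties serverProperties, may be null
     * @return IP:PORT
     */
    private String serverAddress(ServerProperties serverProperties) {
        if (Objects.isNull(serverProperties)) {
            return "127.0.0.1:8080";
        }
        // getAddress() is null unless server.address is set; the original code threw an NPE here.
        // Without server.address, instances on different hosts sharing a port get the same label.
        InetAddress address = serverProperties.getAddress();
        Integer port = serverProperties.getPort();
        String host = Objects.isNull(address) ? "127.0.0.1" : address.getHostAddress();
        return host + ":" + (Objects.isNull(port) ? 8080 : port);
    }
}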

4.2 Overriding the SocketManager class

Note: create this file in the project's common package, with the same package path and class name as the original.

/*
 * Copyright 2017-2019 CodingApi .
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.codingapi.txlcn.txmsg.netty.bean;


import com.codingapi.txlcn.txmsg.RpcConfig;
import com.codingapi.txlcn.txmsg.dto.AppInfo;
import com.codingapi.txlcn.txmsg.dto.MessageDto;
import com.codingapi.txlcn.txmsg.dto.RpcCmd;
import com.codingapi.txlcn.txmsg.dto.RpcResponseState;
import com.codingapi.txlcn.txmsg.exception.RpcException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.*;

/**
 * Created by lorne on 2017/6/30.
 */
@Slf4j
public class SocketManager {

    private Map<String, AppInfo> appNames;

    private ScheduledExecutorService executorService;

    private ChannelGroup channels;

    // volatile makes the double-checked locking in getInstance() safe
    private static volatile SocketManager manager = null;

    private long attrDelayTime = 1000 * 60;

    private SocketManager() {
        channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        appNames = new ConcurrentHashMap<>();
        executorService = Executors.newSingleThreadScheduledExecutor();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            executorService.shutdown();
            try {
                executorService.awaitTermination(10, TimeUnit.MINUTES);
            } catch (InterruptedException ignored) {
            }
        }));
    }


    public static SocketManager getInstance() {
        if (manager == null) {
            synchronized (SocketManager.class) {
                if (manager == null) {
                    manager = new SocketManager();
                }
            }
        }
        return manager;
    }


    public void addChannel(Channel channel) {
        channels.add(channel);
    }

    public void removeChannel(Channel channel) {
        channels.remove(channel);
        String key = channel.remoteAddress().toString();

        // No delay configured: expire immediately
        if (attrDelayTime < 0) {
            appNames.remove(key);
            return;
        }

        // Delay configured: remove the entry once it elapses
        try {
            executorService.schedule(() -> {
                appNames.remove(key);
            }, attrDelayTime, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException ignored) {
            // caused down server.
        }
    }


    private Channel getChannel(String key) throws RpcException {
        for (Channel channel : channels) {
            String val = channel.remoteAddress().toString();
            if (key.equals(val)) {
                return channel;
            }
        }
        throw new RpcException("channel not online.");
    }


    public RpcResponseState send(String key, RpcCmd cmd) throws RpcException {
        Channel channel = getChannel(key);
        ChannelFuture future = channel.writeAndFlush(cmd).syncUninterruptibly();
        return future.isSuccess() ? RpcResponseState.success : RpcResponseState.fail;
    }

    public MessageDto request(String key, RpcCmd cmd, long timeout) throws RpcException {
        NettyRpcCmd nettyRpcCmd = (NettyRpcCmd) cmd;
        log.debug("get channel, key:{}", key);
        Channel channel = getChannel(key);
        channel.writeAndFlush(nettyRpcCmd);
        log.debug("await response");
        if (timeout < 1) {
            nettyRpcCmd.await();
        } else {
            nettyRpcCmd.await(timeout);
        }
        MessageDto res = cmd.loadResult();
        log.debug("response is: {}", res);
        nettyRpcCmd.loadRpcContent().clear();
        return res;
    }

    public MessageDto request(String key, RpcCmd cmd) throws RpcException {
        return request(key, cmd, -1);
    }


    public List<String> loadAllRemoteKey() {
        List<String> allKeys = new ArrayList<>();
        for (Channel channel : channels) {
            allKeys.add(channel.remoteAddress().toString());
        }
        return allKeys;
    }

    public ChannelGroup getChannels() {
        return channels;
    }

    public int currentSize() {
        return channels.size();
    }


    public boolean noConnect(SocketAddress socketAddress) {
        for (Channel channel : channels) {
            if (channel.remoteAddress().toString().equals(socketAddress.toString())) {
                return false;
            }
        }
        return true;
    }

    /**
     * Get the remote keys of a module.
     *
     * @param moduleName module name
     * @return remoteKeys
     */
    public List<String> remoteKeys(String moduleName) {
        List<String> allKeys = new ArrayList<>();
        for (Channel channel : channels) {
            if (moduleName.equals(getModuleName(channel))) {
                allKeys.add(channel.remoteAddress().toString());
            }
        }
        return allKeys;
    }


    /**
     * Bind connection data.
     *
     * @param remoteKey remote identifier
     * @param appName   module name
     * @param labelName TC label name
     */
    public void bindModuleName(String remoteKey, String appName, String labelName) throws RpcException {
        AppInfo appInfo = new AppInfo();
        appInfo.setAppName(appName);
        appInfo.setLabelName(labelName);
        appInfo.setCreateTime(new Date());
        if (containsLabelName(labelName)) {
            throw new RpcException("labelName:" + labelName + " has exist.");
        }
        appNames.put(remoteKey, appInfo);
    }

    public boolean containsLabelName(String moduleName) {
        Set<String> keys = appNames.keySet();
        for (String key : keys) {
            AppInfo appInfo = appNames.get(key);
            if (moduleName.equals(appInfo.getLabelName())) {
                return true;
            }
        }
        return false;
    }

    public void setRpcConfig(RpcConfig rpcConfig) {
        attrDelayTime = rpcConfig.getAttrDelayTime();
    }

    /**
     * Get the module name.
     *
     * @param channel channel
     * @return module name
     */
    public String getModuleName(Channel channel) {
        String key = channel.remoteAddress().toString();
        return getModuleName(key);
    }

    /**
     * Get the module name.
     *
     * @param remoteKey unique remote identifier
     * @return module name
     */
    public String getModuleName(String remoteKey) {
        AppInfo appInfo = appNames.get(remoteKey);
        // Changed: return the per-instance labelName instead of the shared appName
        return appInfo == null ? null : appInfo.getLabelName();
    }

    public List<AppInfo> appInfos() {
        return new ArrayList<>(appNames.values());
    }
}
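Changing the return value of getModuleName(String) is enough because the same method feeds both sides: join records modId = getModuleName(joining key), and notifyTransaction looks that modId up again via remoteKeys(modId).get(0). The end-to-end model below is a sketch, not the framework's code; `RoutingModel`, its flag, and the sample addresses are hypothetical, and the labels use the service:IP:port format produced by LcnConfig in section 4.1.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of join + notify routing; not TX-LCN's API.
class RoutingModel {

    private static final class Info {
        final String appName;
        final String labelName;

        Info(String appName, String labelName) {
            this.appName = appName;
            this.labelName = labelName;
        }
    }

    // remoteKey -> Info; insertion order stands in for the channel iteration order
    private final Map<String, Info> appNames = new LinkedHashMap<>();
    // false = original getModuleName (appName), true = patched (labelName)
    private final boolean useLabelName;

    RoutingModel(boolean useLabelName) {
        this.useLabelName = useLabelName;
    }

    void bind(String remoteKey, String appName, String labelName) {
        appNames.put(remoteKey, new Info(appName, labelName));
    }

    String getModuleName(String remoteKey) {
        Info info = appNames.get(remoteKey);
        if (info == null) {
            return null;
        }
        return useLabelName ? info.labelName : info.appName;
    }

    /** join(): the transaction unit stores modId = getModuleName(joining key). */
    String join(String joiningKey) {
        return getModuleName(joiningKey);
    }

    /** notifyTransaction(): collect remoteKeys(modId), then the first key wins. */
    String notifyTarget(String modId) {
        List<String> keys = new ArrayList<>();
        for (String key : appNames.keySet()) {
            if (modId.equals(getModuleName(key))) {
                keys.add(key);
            }
        }
        if (keys.isEmpty()) {
            throw new IllegalStateException("offline mod.");
        }
        return keys.get(0);
    }
}
```

With the flag off, the notification for a unit joined from `/10.0.0.2:50002` goes to `/10.0.0.1:50001`, the first matching connection; with it on, only the joining instance matches. In the real SocketManager the iteration order of the Netty ChannelGroup is not controlled by this code, so which instance comes first can vary between runs.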

5. Summary

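My writing skills are limited, so if anything in this article is wrong, please point it out. Thanks! Next I plan to analyze the tx-lcn framework further to understand its internals and deepen my understanding of distributed transactions.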

This article is reprinted from: Juejin
