Dubbo整合Seata,教你轻松实现TTC模式分布式事务!

整理了一份Java、JVM、多线程、MySQL、Redis、Kafka、Docker、RocketMQ、Nginx、MQ队列、数据结构、并发编程、并发压测、秒杀架构等技术知识点PDF,如果你有需要的话,可_click here_领取

Seata

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

Seata的事务模式 Seata针对不同的业务场景提供了四种不同的事务模式,具体如下

  • AT模式: AT 模式的一阶段、二阶段提交和回滚(借助undo_log表来实现)均由 Seata 框架自动生成,用户只需编写“业务SQL”,便能轻松接入分布式事务,AT 模式是一种对业务无任何侵入的分布式事务解决方案。
  • TTC模式: 相对于 AT 模式,TCC 模式对业务代码有一定的侵入性,但是 TCC 模式无 AT 模式的全局行锁,TCC 性能会比 AT模式高很多。( 适用于核心系统等对性能有很高要求的场景。)
  • SAGA模式:Sage 是长事务解决方案,事务驱动,使用那种存在流程审核的业务场景,如: 金融行业,需要层层审核。
  • XA模式: XA模式是分布式强一致性的解决方案,但性能低而使用较少。
![](https://gitee.com/songjianzaina/juejin_p9/raw/master/img/b3f56ca13e013a33369b9c4175bf9c8ba7a59e216cf226be189e81d006066a29)

TCC模式

tcc模式主要可以分为三个阶段:

  • Try:做业务检查和资源预留
  • Confirm:确认提交
  • Cancel:业务执行错误需要回滚的状态下执行分支事务的业务取消,预留资源释放

TCC模式下常见的三种异常

1.空回滚

空回滚就是对于一个分布式事务,在没有调用 TCC 资源 Try 方法的情况下(如机器宕机、网络异常),调用了二阶段的 Cancel 方法,Cancel 方法需要识别出这是一个空回滚,然后直接返回成功。

解决方案

需要一张额外的事务控制表,其中有分布式事务 ID 和分支事务 ID,第一阶段 Try 方法里会插入一条记录,表示一阶段执行了。Cancel 接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。

2.幂等

幂等就是对于同一个分布式事务的同一个分支事务,重复去调用该分支事务的第二阶段接口,因此,要求 TCC 的二阶段 Confirm 和 Cancel 接口保证幂等,不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致资损等严重问题。

解决方案

记录每个分支事务的执行状态。在执行前状态,如果已执行,那就不再执行;否则,正常执行。前面在讲空回滚的时候,已经有一张事务控制表了,事务控制表的每条记录关联一个分支事务,那我们完全可以在这张事务控制表上加一个状态字段,用来记录每个分支事务的执行状态。

3.悬挂

悬挂就是对于一个分布式事务,其二阶段 Cancel 接口比 Try 接口先执行。因为允许空回滚的原因,Cancel 接口认为 Try 接口没执行,空回滚直接返回成功,对于 Seata 框架来说,认为分布式事务的二阶段接口已经执行成功,整个分布式事务就结束了。

解决方案

二阶段执行时插入一条事务控制记录,状态为已回滚,这样当一阶段执行时,先读取该记录,如果记录存在,就认为二阶段已经执行;否则二阶段没执行。

Seata Server安装

第一步:下载安装服务端,并解压到指定位置

1
python复制代码unzip  seata-server-1.1.0.zip

seata的目录结构:

  • bin:存放各系统的启动脚本
  • conf:存放seata server启动时所需要配置信息、数据库模式下所需要的建表语句
  • lib:运行seata server所需要的的依赖包

第二步:配置seata

seata的配置文件(conf目录下)

  • file.conf: 该文件用于配置存储方式、透传事务信息的NIO等信息,默认对应registry.conf中file配置方式
  • registry.conf:seata server核心配置文件,可以通过该文件配置服务注册方式、配置读取方式。

注册方式目前支持file、nacos、eureka、redis、zk、consul、etcd3、sofa等方式,默认为file,对应读取file.conf内的注册方式信息。 读取配置信息的方式支持file、nacos、apollo、zk、consul、etcd3等方式,默认为file,对应读取file.conf文件内的配置。

修改registry.conf

  • 注册中心使用Nacos
  • 配置中心使用file进行配置

注册中心配置,用于TC,TM,RM的相互服务发现

1
2
3
4
5
6
7
8
ini复制代码registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
consul {
cluster = "seata"
serverAddr = "127.0.0.1:8848"
}
}

配置中心配置,用于读取TC的相关配置

1
2
3
4
5
6
7
ini复制代码config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
file {
name = "file.conf"
}
}

修改file.conf,配置中心配置,用于读取TC的相关配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
ini复制代码store {
## store mode: file?.b
mode = "file"

## file store property
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 16384
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 16384
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}

## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata"
registry.confuser = "mysql"
password = "mysql"
minConn = 1
maxConn = 10
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
}
}

第三步:创建seata数据库、以及所需的三张表

  • global_table: 存储全局事务session数据的表
  • branch_table:存储分支事务session数据的表
  • lockTable:存储分布式锁数据的表

– ——————————– The script used when storeMode is ‘db’ ——————————–
– the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS global_table
(
xid VARCHAR(128) NOT NULL,
transaction_id BIGINT,
status TINYINT NOT NULL,
application_id VARCHAR(32),
transaction_service_group VARCHAR(32),
transaction_name VARCHAR(128),
timeout INT,
begin_time BIGINT,
application_data VARCHAR(2000),
gmt_create DATETIME,
gmt_modified DATETIME,
PRIMARY KEY (xid),
KEY idx_gmt_modified_status (gmt_modified, status),
KEY idx_transaction_id (transaction_id)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
– the table to store BranchSession data
CREATE TABLE IF NOT EXISTS branch_table
(
branch_id BIGINT NOT NULL,
xid VARCHAR(128) NOT NULL,
transaction_id BIGINT,
resource_group_id VARCHAR(32),
resource_id VARCHAR(256),
branch_type VARCHAR(8),
status TINYINT,
client_id VARCHAR(64),
application_data VARCHAR(2000),
gmt_create DATETIME,
gmt_modified DATETIME,
PRIMARY KEY (branch_id),
KEY idx_xid (xid)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
– the table to store lock data
CREATE TABLE IF NOT EXISTS lock_table
(
row_key VARCHAR(128) NOT NULL,
xid VARCHAR(96),
transaction_id BIGINT,
branch_id BIGINT NOT NULL,
resource_id VARCHAR(256),
table_name VARCHAR(32),
pk VARCHAR(36),
gmt_create DATETIME,
gmt_modified DATETIME,
PRIMARY KEY (row_key),
KEY idx_branch_id (branch_id)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

第四步:启动seata server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
sql复制代码sh seata-server.sh -p 8091 -h 0.0.0.0 -m file 

Options:

--host, -h
The host to bind.
Default: 0.0.0.0

--port, -p
The port to listen.
Default: 8091

--storeMode, -m log store mode : file、db
Default: file

--help

补充:

  • 外网访问:如果需要外网访问 需要将0.0.0.0转成外网IP
  • 后台启动: nohup sh seata-server.sh -p 8091 -h 127.0.0.1 -m file > catalina.out 2>&1 &

在nacos中看到seata的注册信息

Dubbo整合Seata实现AT模式

引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
xml复制代码<!--seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>2.1.0.RELEASE</version>
<exclusions>
<exclusion>
<artifactId>seata-all</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.1.0</version>
</dependency>

application.properties 配置文件

1
2
3
4
5
6
7
8
ini复制代码#mysql
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/dubbodemo
spring.datasource.username=mysql
spring.datasource.password=mysql

spring.cloud.alibaba.seata.tx-service-group=springcloud-alibaba-producer-test

resoures目录下新建 file.conf ,配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
ini复制代码transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = false
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThreadPrefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
# service configuration, only used in client side
service {
#transaction service group mapping
vgroupMapping.springcloud-alibaba-producer-test = "seata"
seata.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
#client transaction configuration, only used in client side
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
sqlParserType = druid
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}

resoures目录下新建registry.conf 配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
ini复制代码registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"

nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
cluster = "seata"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}

config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"

nacos {
serverAddr = "localhost"
namespace = ""
group = "SEATA_GROUP"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}

DataSourceProxyConfig数据源加载

  • SEATA是基于数据源拦截来实现的分布式事务, 需要排除掉SpringBoot默认自动注入DataSourceAutoConfigurationBean,自定义配置数据源。

在启动类上添加:@SpringBootApplication(exclude = DataSourceAutoConfiguration.class) ,并添加以下配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码@Configuration
public class DataSourceProxyConfig {

@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource(){
return new DruidDataSource();
}

@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}

@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
.getResources("classpath*:/mapper/*Mapper.xml"));
sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
return sqlSessionFactoryBean.getObject();
}

@Bean public GlobalTransactionScanner globalTransactionScanner(){ return new GlobalTransactionScanner("account-gts-seata-example", "account-service-seata-service-group"); }

}

创建业务表

1
2
3
4
5
6
7
8
9
sql复制代码# 业务数据表
CREATE TABLE `sys_user` (
`id` varchar(36) NOT NULL,
`name` varchar(100) NOT NULL,
`msg` varchar(500) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `sys_user` (`id`, `name`, `msg`) VALUES ('1', '小王', '初始化数据');

TTC事务接口

1
2
3
4
5
6
7
8
9
less复制代码public interface IUserTccService {

@TwoPhaseBusinessAction(name = "IUserTccService",commitMethod = "commit",rollbackMethod = "rollback")
boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "userPojo") UserPojo userPojo);

boolean commit(BusinessActionContext actionContext);

boolean rollback(BusinessActionContext actionContext);
}

业务接口

1
2
3
arduino复制代码public interface IUserService {
public String ceshi(String input);
}

TTC事务实现类

  • 初步操作 Try:完成所有业务检查,预留必须的业务资源,本身也有数据操作。(支付场景:冻结预扣款30元)
  • 确认操作 Confirm:真正执行的业务逻辑,不做任何业务检查,只使用 Try 阶段预留的业务资源。因此,只要 Try操作成功,Confirm 必须能成功。另外,Confirm 操作需满足幂等性,保证一笔分布式事务能且只能成功一次。(支付场景:扣除预付款)
  • 取消操作 Cancel:释放 Try 阶段预留的业务资源,来回滚Try的数据操作。同样的,Cancel操作也需要满足幂等性。(支付场景:释放预付款)
    @Service
    public class UserTccServiceImpl implements IUserTccService {
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
typescript复制代码@Autowired
UserPojoMapper userPojoMapper;

@Override
public boolean prepare(BusinessActionContext actionContext, UserPojo userPojo) {
System.out.println("actionContext获取Xid commit>>> "+ RootContext.getXID());
int storage =userPojoMapper.updateByPrimaryKey(userPojo);
if (storage > 0){
return true;
}
return false;
}

@Override
public boolean commit(BusinessActionContext actionContext) {
System.out.println("actionContext获取Xid commit>>> "+actionContext.getXid());
return true;
}

@Override
public boolean rollback(BusinessActionContext actionContext) {
System.out.println("actionContext获取Xid rollback>>> "+actionContext.getXid());
UserPojo userPojo = JSONObject.toJavaObject((JSONObject)actionContext.getActionContext("userPojo"),UserPojo.class);
userPojo.setName("姓名被回滾了");
int storage = userPojoMapper.updateByPrimaryKey(userPojo);
if (storage > 0){
return true;
}
return false;
}

}

业务实现类

  • 没有涉及RPC调用的话只是一个分支事务,并不会触发rollback,并且分支事务共享同一个全局事务ID(即XID)。分支事务本身也具有原子性,可以确保本地事务的原子性(可以理解为prepare、commit方法虽然执行了,但是事务还是没执行,会在RM和TC协调下顺序执行 )
  • TC(事务协调者):会使得全部try执行成功后,才开始执行confirm,如果存在一个try执行失败(或者网络问题RPC调用失败),会开启回滚分布式事务,这一点满足了分布式事务的原子性。
  • GlobalTransactionScanner 会同时启动 RM 和 TM client。
    @Service
    public class UserServiceImpl implements IUserService{
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
typescript复制代码@Autowired
UserPojoMapper userPojoMapper;

@Autowired
UserTccServiceImpl userTccService;

private static Logger logger = LogManager.getLogger(LogManager.ROOT_LOGGER_NAME);

@Override
@GlobalTransactional
public String ceshi(String input) {
logger.info("全局XID:{}", RootContext.getXID());
UserPojo userPojo = userPojoMapper.selectByPrimaryKey("1");
// userPojo.setId("11111111");
userPojo.setName("正常提交");
if (userTccService.prepare(null,userPojo)){
return "Hello World,"+input+"! ,I am "+ userPojo.getName();
}
return "失敗";
}

}

调用服务,进行验证

日志打印:分布式事务信息

访问127.0.0.1:8081/hello 页面

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%