Spring StateMachine 状态机引擎在项目中的

背景

每次用到的时候新创建一个状态机,太奢侈了,官方文档里面也提到过这点。

而且创建出来的实例,其状态也跟当前订单的不符;spring statemachine暂时不支持每次创建时指定当前状态,所以对状态机引擎实例的持久化,就成了必须要考虑的问题。(不过在后续版本有直接指定状态的方式,这个后面会写)

扩展一下

这里扩展说明一下,状态机引擎的持久化一直是比较容易引起讨论的,因为很多场景并不希望再多存储一些中间非业务数据,之前在淘宝工作时,淘宝的订单系统tradeplatform自己实现了一套workflowEngine,其实说白了也就是一套状态机引擎,所有的配置都放在xml中,每次每个环节的请求过来,都会重新创建一个状态机引擎实例,并根据当前的订单状态来设置引擎实例的状态。

workflowEngine没有做持久化,私下里猜测下这样实现的原因:1、淘系数据量太大,一天几千万笔订单,额外的信息存储就要耗费很多存储资源;2、完全自主开发的状态机引擎,可定制化比较强,根据自己的业务需要可以按自己的需要处理。

而反过来,spring statemachine并不支持随意指定初始状态,每次创建都是固定的初始化状态,其实也只是有好处的,标准版流程,而且可以保证安全,每个节点都是按照事先定义好的流程跑下来,而不是随意指定。所以,状态机引擎实例的持久化,我们这次的主题,那就继续聊下去吧。

持久化

spring statemachine 本身支持了内存、redis及db的持久化,内存持久化就不说了,看源码实现就是放在了hashmap里,平时也没谁项目中可以这么奢侈,啥啥都放在内存中,而且一旦重启…..😓。下面详细说下利用redis进行的持久化操作。

依赖引入

spring statemachine 本身是提供了一个redis存储的组件的,在1.2.10.RELEASE版本中,这个组件需要通过依赖引入,同时需要引入的还有序列化的组件kyro、data-common:

gradle引入依赖 (build.gradle 或者 libraries.gradle,由自己项目的gradle组织方式来定):

1
2
3
4
复制代码compile 'org.springframework.statemachine:spring-statemachine-core:1.2.10.RELEASE'
compile 'org.springframework.statemachine:spring-statemachine-data-common:1.2.10.RELEASE'
compile 'org.springframework.statemachine:spring-statemachine-kyro:1.2.10.RELEASE'
compile 'org.springframework.statemachine:spring-statemachine-redis:1.2.10.RELEASE'

当然如果是maven的话,一样的,pom.xml如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码<dependencies>
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-core</artifactId>
<version>1.2.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-data-common</artifactId>
<version>1.2.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-kyro</artifactId>
<version>1.2.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-redis</artifactId>
<version>1.2.10.RELEASE</version>
</dependency>
</dependencies>
先把持久化的调用轨迹说明下

spring-statemachine-持久化.png

说明:

spring statemachine持久化时,采用了三层结构设计,persister —>persist —>repository。

  • 其中persister中封装了write和restore两个方法,分别用于持久化写及反序列化读出。
  • persist只是一层皮,主要还是调用repository中的实际实现;但是在这里,由于redis存储不保证百分百数据安全,所以我实现了一个自定义的persist,其中封装了数据写入db、从db中读取的逻辑。
  • repository中做了两件事儿
    • 序列化/反序列化数据,将引擎实例与二进制数组互相转换
    • 读、写redis
详细的实现
Persister
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.StateMachinePersist;
import org.springframework.statemachine.persist.StateMachinePersister;
import org.springframework.statemachine.redis.RedisStateMachinePersister;

@Configuration
public class BizOrderRedisStateMachinePersisterConfig {

@Autowired
private StateMachinePersist bizOrderRedisStateMachineContextPersist;

@Bean(name = "bizOrderRedisStateMachinePersister",autowire = Autowire.BY_TYPE)
public StateMachinePersister<BizOrderStatusEnum, BizOrderStatusChangeEventEnum,String> bizOrderRedisStateMachinePersister() {
return new RedisStateMachinePersister<>(bizOrderRedisStateMachineContextPersist);
}

}

这里采用官方samples中初始化的方式,通过@Bean注解来创建一个RedisStateMachinePersister实例,注意其中传递进去的Persist为自定义的bizOrderRedisStateMachineContextPersist

Persist
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
复制代码import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.MessageHeaders;
import org.springframework.statemachine.StateMachineContext;
import org.springframework.statemachine.StateMachinePersist;
import org.springframework.statemachine.kryo.MessageHeadersSerializer;
import org.springframework.statemachine.kryo.StateMachineContextSerializer;
import org.springframework.statemachine.kryo.UUIDSerializer;
import org.springframework.statemachine.redis.RedisStateMachineContextRepository;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.Base64;
import java.util.UUID;

@Component("bizOrderRedisStateMachineContextPersist")
public class BizOrderRedisStateMachineContextPersist implements StateMachinePersist<BizOrderStatusEnum, BizOrderStatusChangeEventEnum, String> {

@Autowired
@Qualifier("redisStateMachineContextRepository")
private RedisStateMachineContextRepository<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> redisStateMachineContextRepository;

@Autowired
private BizOrderStateMachineContextRepository bizOrderStateMachineContextRepository;

// 加入存储到DB的数据repository, biz_order_state_machine_context表结构:
// bizOrderId
// contextStr
// curStatus
// updateTime

/**
* Write a {@link StateMachineContext} into a persistent store
* with a context object {@code T}.
*
* @param context the context
* @param contextObj the context ojb
* @throws Exception the exception
*/
@Override
@Transactional
public void write(StateMachineContext<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> context, String contextObj) throws Exception {

redisStateMachineContextRepository.save(context, contextObj);
// save to db
BizOrderStateMachineContext queryResult = bizOrderStateMachineContextRepository.selectByOrderId(contextObj);

if (null == queryResult) {
BizOrderStateMachineContext bosmContext = new BizOrderStateMachineContext(contextObj,
context.getState().getStatus(), serialize(context));
bizOrderStateMachineContextRepository.insertSelective(bosmContext);
} else {
queryResult.setCurOrderStatus(context.getState().getStatus());
queryResult.setContext(serialize(context));
bizOrderStateMachineContextRepository.updateByPrimaryKeySelective(queryResult);
}
}

/**
* Read a {@link StateMachineContext} from a persistent store
* with a context object {@code T}.
*
* @param contextObj the context ojb
* @return the state machine context
* @throws Exception the exception
*/
@Override
public StateMachineContext<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> read(String contextObj) throws Exception {

StateMachineContext<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> context = redisStateMachineContextRepository.getContext(contextObj);
//redis 访缓存击穿
if (null != context && BizOrderConstants.STATE_MACHINE_CONTEXT_ISNULL.equalsIgnoreCase(context.getId())) {
return null;
}
//redis 为空走db
if (null == context) {
BizOrderStateMachineContext boSMContext = bizOrderStateMachineContextRepository.selectByOrderId(contextObj);
if (null != boSMContext) {
context = deserialize(boSMContext.getContext());
redisStateMachineContextRepository.save(context, contextObj);
} else {
context = new StateMachineContextIsNull();
redisStateMachineContextRepository.save(context, contextObj);
}
}
return context;
}

private String serialize(StateMachineContext<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> context) throws UnsupportedEncodingException {
Kryo kryo = kryoThreadLocal.get();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Output output = new Output(out);
kryo.writeObject(output, context);
output.close();
return Base64.getEncoder().encodeToString(out.toByteArray());
}

@SuppressWarnings("unchecked")
private StateMachineContext<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> deserialize(String data) throws UnsupportedEncodingException {
if (StringUtils.isEmpty(data)) {
return null;
}
Kryo kryo = kryoThreadLocal.get();
ByteArrayInputStream in = new ByteArrayInputStream(Base64.getDecoder().decode(data));
Input input = new Input(in);
return kryo.readObject(input, StateMachineContext.class);
}

private static final ThreadLocal<Kryo> kryoThreadLocal = new ThreadLocal<Kryo>() {

@SuppressWarnings("rawtypes")
@Override
protected Kryo initialValue() {
Kryo kryo = new Kryo();
kryo.addDefaultSerializer(StateMachineContext.class, new StateMachineContextSerializer());
kryo.addDefaultSerializer(MessageHeaders.class, new MessageHeadersSerializer());
kryo.addDefaultSerializer(UUID.class, new UUIDSerializer());
return kryo;
}
};
}

说明:

  1. 如果只是持久化到redis中,那么BizOrderStateMachineContextRepository相关的所有内容均可删除。不过由于redis无法承诺百分百的数据安全,所以我这里做了两层持久化,redis+db
  2. 存入redis中的数据默认采用kryo来序列化及反序列化,RedisStateMachineContextRepository中实现了对应代码。但是spring statemachine默认的db存储比较复杂,需要创建多张表,参加下图:

jpa-table.png

这里需要额外创建5张表,分别存储ActionGuardStateStateMachineTransition,比较复杂。

  1. 所以这里创建了一张表bizorderstatemachinecontext,结构很简单:bizOrderId,contextStr,curStatus,updateTime,其中关键是contextStr,用于存储与redis中相同的内容
Repository

有两个repository,一个是spring statemachine提供的redisRepo,另一个则是项目中基于mybatis的repo,先是db-repo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
复制代码   import org.apache.ibatis.annotations.Param;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Repository;

import java.util.List;

@Repository
public interface BizOrderStateMachineContextRepository {

int deleteByPrimaryKey(Long id);

BizOrderStateMachineContext selectByOrderId(String bizOrderId);

int updateByPrimaryKey(BizOrderStateMachineContext BizOrderStateMachineContext);

int updateByPrimaryKeySelective(BizOrderStateMachineContext BizOrderStateMachineContext);

int insertSelective(BizOrderStateMachineContext BizOrderStateMachineContext);

int selectCount(BizOrderStateMachineContext BizOrderStateMachineContext);

List<BizOrderStateMachineContext> selectPage(@Param("BizOrderStateMachineContext") BizOrderStateMachineContext BizOrderStateMachineContext, @Param("pageable") Pageable pageable);

}

然后是redisRepo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码   import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.statemachine.redis.RedisStateMachineContextRepository;

@Configuration
public class BizOrderRedisStateMachineRepositoryConfig {

/**
* 接入asgard后,redis的connectionFactory可以通过serviceName + InnerConnectionFactory来注入
*/
@Autowired
private RedisConnectionFactory finOrderRedisInnerConnectionFactory;

@Bean(name = "redisStateMachineContextRepository", autowire = Autowire.BY_TYPE)
public RedisStateMachineContextRepository<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> redisStateMachineContextRepository() {

return new RedisStateMachineContextRepository<>(finOrderRedisInnerConnectionFactory);
}
}
使用方式
1
2
3
4
5
6
7
8
9
10
复制代码    @Autowired
@Qualifier("bizOrderRedisStateMachinePersister")
private StateMachinePersister<BizOrderStatusEnum,BizOrderStatusChangeEventEnum,String> bizOrderRedisStateMachinePersister;

......
bizOrderRedisStateMachinePersister.persist(stateMachine, request.getBizCode());
......
StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine
= bizOrderRedisStateMachinePersister.restore(srcStateMachine,statusRequest.getBizCode());
......

支持,关于spring statemachine的持久化就交代完了,下面就是最关键的,怎么利用状态机来串联业务,下一节将会详细描述。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%