开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Spring Boot + Vue前后端分离项目,Maven

发表于 2021-06-28

​这是我参与更文挑战的第 28 天,活动详情查看: 更文挑战

前言

现在各类项目为了降低项目、服务模块间的高度耦合性,提出了“前后端分离”,而前后端分离的项目该如何打包呢?

一般的做法是前端项目打包完,将打包文件手动复制到后端项目工程的src\main\resources\static目录下,再进行后端工程项目打包,这样手动来回复制、多次打包总是让人觉得麻烦。(即使采用Jenkins打包部署,也会存在上面2次打包过程)

为了解决上述问题,我特意查询了Maven build的相关配置及插件,发现解决上述问题,通过Maven自动打包整合其实不难,在此与大家进行分享。

前后端项目结构要求

以Spring Boot + Vue的向后端项目为例说明。

通过Maven构建项目,针对子父项目结构创建前端、后端工程,结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
lua复制代码spring-boot-vue-parent
|---spring-boot # spring boot后端工程
|---src
|---main
|---java
|---...
|---resources
|---static # 存放前端资源的目录
|---pom.xml # spring-boot后端工程的pom.xml文件
|---vue-ui # Vue前端工程
|---...
|---dist # 打包编译时,自动创建的目录,无需手动创建该目录
|---pom.xml # Vue前端工程的pom.xml文件,此文件可不要
pom.xml 父工程的pom.xml文件

上述只罗列了关键的目录结构。

配置pom.xml文件

1、父工程的pom.xml文件

满足Maven 子父项目结构配置要求,配置spring-boot-maven-plugin插件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.xcbeyond.demo</groupId>
<artifactId>demo</artifactId>
<packaging>pom</packaging>
<version>1.0.0</version>

<modules>
<!-- spring boot后端工程,作为子工程 -->
<module>spring-boot</module>
<!-- Vue前端工程,作为子工程 -->
<module>vue-ui</module>
</modules>

<dependencies>
# 配置项目依赖包
……
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.0.7.RELEASE</version>
<executions>
<execution>
<goals>
<goal>repackage</goal><!--可以把依赖的包都打包到生成的Jar包中-->
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

</project>

2、Vue前端工程的pom.xml文件

对应Vue项目而言,pom.xml对它而言是不存在的,也是毫无意义的,此文件可以不要。在此体现出来,只是为了配置子父工程而已,凸显出Vue工程属于父工程的子工程而已,便于IDE导入呈现展示而已。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>demo</artifactId>
<groupId>com.xcbeyond.demo</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<groupId>com.xcbeyond.demo.vue.ui</groupId>
<artifactId>vue-ui</artifactId>

</project>

3、后端工程的pom.xml文件

该pom.xml文件是需要重点关注配置的,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>demo</artifactId>
<groupId>com.xcbeyond.demo</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<groupId>com.xcbeyond.demo.spring.boot</groupId>
<artifactId>spring-boot</artifactId>

<dependencies>
# 配置项目依赖包
……
</dependencies>

<build>
<plugins>
<!-- 插件maven-clean-plugin,用于在编译前,清除之前编译的文件、文件夹等,避免残留之前的内容 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<filesets>
<fileset>
<!-- 前端资源目录,即:存放前端包目录-->
<directory>src/main/resources/static</directory>
</fileset>
<fileset>
<!-- Vue项目打包自动生成的dist目录 -->
<directory>${project.parent.basedir}/vue-ui/dist</directory>
</fileset>
</filesets>
</configuration>
</plugin>

<!--frontend-maven-plugin为项目本地下载/安装Node和NPM,运行npm install命令-->
<plugin>
<groupId>com.github.eirslett</groupId>
<artifactId>frontend-maven-plugin</artifactId>
<version>1.6</version>
<configuration>
<workingDirectory>${project.parent.basedir}/vue-ui</workingDirectory>
</configuration>
<executions>
<execution>
<id>install node and npm</id>
<goals>
<goal>install-node-and-npm</goal>
</goals>
<configuration>
<nodeVersion>v8.12.0</nodeVersion>
<npmVersion>6.4.1</npmVersion>
</configuration>
</execution>
<!-- Install all project dependencies -->
<execution>
<id>npm install</id>
<goals>
<goal>npm</goal>
</goals>
<phase>generate-resources</phase>
<configuration>
<arguments>install</arguments>
</configuration>
</execution>
<!-- Build and minify static files -->
<execution>
<id>npm run build</id>
<goals>
<goal>npm</goal>
</goals>
<configuration>
<arguments>run build</arguments>
</configuration>
</execution>
</executions>
</plugin>

<!--资源插件,主要为了从前端项目里复制打包好的文件到springboot项目-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<id>copy static</id>
<phase>generate-resources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<!-- 复制前端打包文件到这里 -->
<outputDirectory>src/main/resources/static</outputDirectory>
<overwrite>true</overwrite>
<resources>
<resource>
<!-- 从前端打包的目录dist进行指定文件、文件夹内容的复制-->
<directory>${project.parent.basedir}/vue-ui/dist</directory>
<includes>
<!-- 具体根据实际前端代码、及目录结构进行配置-->
<include>css/</include>
<include>fonts/</include>
<include>img/</include>
<include>js/</include>
<include>favicon.ico</include>
<include>index.html</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>

打包部署

上述的pom.xml配置,已经整合了前后端项目的Maven自动打包,打包时,只需关注后端项目(spring-boot子工程)打包即可,就会将前端、后端一起打包到后端成功中。

在子工程后端工程中,执行打包命令

mvn clean package -Dmaven.test.skip=true

, 或采用IDE中相应的Maven直接打包。

至此,只需一次打包,即可完成前后端项目的Maven自动打包了,再也不用担心多次打包、漏打包的情况。

​

​

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java8 stream处理List,Map总结 Java

发表于 2021-06-28

Java 8 Stream

Java 8 API添加了一个新的抽象称为流Stream,可以让你以一种声明的方式处理数据。

Stream 使用一种类似用 SQL 语句从数据库查询数据的直观方式来提供一种对 Java 集合运算和表达的高阶抽象。

Stream API可以极大提高Java程序员的生产力,让程序员写出高效率、干净、简洁的代码。

这种风格将要处理的元素集合看作一种流, 流在管道中传输, 并且可以在管道的节点上进行处理, 比如筛选, 排序,聚合等。

元素流在管道中经过中间操作(intermediate operation)的处理,最后由最终操作(terminal operation)得到前面处理的结果。

什么是 Stream?

Stream(流)是一个来自数据源的元素队列并支持聚合操作

<strong元素队列< strong=””>元素是特定类型的对象,形成一个队列。 Java中的Stream并不会存储元素,而是按需计算。
数据源 流的来源。 可以是集合,数组,I/O channel, 产生器generator 等。
聚合操作 类似SQL语句一样的操作, 比如filter, map, reduce, find, match, sorted等。
和以前的Collection操作不同, Stream操作还有两个基础的特征:

Pipelining: 中间操作都会返回流对象本身。 这样多个操作可以串联成一个管道, 如同流式风格(fluent style)。 这样做可以对操作进行优化, 比如延迟执行(laziness)和短路( short-circuiting)。
内部迭代: 以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。 Stream提供了内部迭代的方式, 通过访问者模式(Visitor)实现。
生成流

在 Java 8 中, 集合接口有两个方法来生成流:

stream() − 为集合创建串行流。

parallelStream() − 为集合创建并行流

下面写一下,我们经常会用到的一些操作案例

一,排序

  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
less复制代码    List     

    1, 对象集合排序
//降序,根据创建时间降序;
List<User> descList = attributeList.stream().sorted(Comparator.comparing(User::getCreateTime, Comparator.nullsLast(Date::compareTo)).reversed())
                  .collect(Collectors.toList());
//升序,根据创建时间升序;
List<User> ascList = attributeList.stream().sorted(Comparator.comparing(User::getCreateTime, Comparator.nullsLast(Date::compareTo)))
                  .collect(Collectors.toList());

2, 数字排序
List<Integer> numbers = Arrays.asList(3, 2, 2, 3, 7, 3, 5);

//升序
List<Integer> ascList = numbers.stream().sorted().collect(Collectors.toList());
结果: [2, 2, 3, 3, 3, 5, 7]

//倒序
List<Integer> descList = numbers.stream().sorted((x, y) -> y - x).collect(Collectors.toList());
结果:[7, 5, 3, 3, 3, 2, 2]

 3, 字符串排序
     List<String> strList = Arrays.asList("a", "ba", "bb", "abc", "cbb", "bba", "cab");

//自然排序
List<String> ascList = strList.stream().sorted().collect(Collectors.toList());
结果:[a, abc, ba, bb, bba, cab, cbb]

//反转,倒序
ascList.sort(Collections.reverseOrder());
结果:[cbb, cab, bba, bb, ba, abc, a]

//直接反转集合
Collections.reverse(strList);
结果:[cab, bba, cbb, abc, bb, ba, a]
  
   Map
     //HashMap是无序的,当我们希望有顺序地去存储key-value时,就需要使用LinkedHashMap了,排序后可以再转成HashMap。
//LinkedHashMap是继承于HashMap,是基于HashMap和双向链表来实现的。
//LinkedHashMap是线程不安全的。
Map<String,String> map = new HashMap<>();
map.put("a","123");
map.put("b","456");
map.put("z","789");
map.put("c","234");

//map根据value正序排序
LinkedHashMap<String, String> linkedMap1 = new LinkedHashMap<>();
map.entrySet().stream().sorted(Comparator.comparing(e -> e.getValue())).forEach(x -> linkedMap1.put(x.getKey(), x.getValue()));
结果:{a=123, c=234, b=456, z=789}

//map根据value倒序排序
LinkedHashMap<String, String> linkedMap2 = new LinkedHashMap<>();
map.entrySet().stream().sorted(Collections.reverseOrder(Map.Entry.comparingByValue())).forEach(x -> linkedMap2.put(x.getKey(), x.getValue()));
结果:{z=789, b=456, c=234, a=123}

//map根据key正序排序
LinkedHashMap<String, String> linkedMap3 = new LinkedHashMap<>();
map.entrySet().stream().sorted(Comparator.comparing(e -> e.getKey())).forEach(x -> linkedMap3.put(x.getKey(), x.getValue()));
结果:{a=123, b=456, c=234, z=789}

//map根据key倒序排序
LinkedHashMap<String, String> linkedMap4 = new LinkedHashMap<>();
map.entrySet().stream().sorted(Collections.reverseOrder(Map.Entry.comparingByKey())).forEach(x -> linkedMap4.put(x.getKey(), x.getValue()));
结果:{z=789, c=234, b=456, a=123}

二,List 转 Map

    

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
scss复制代码        1、指定key-value,value是对象中的某个属性值。
Map<Integer,String> userMap1 = userList.stream().collect(Collectors.toMap(User::getId,User::getName));

2、指定key-value,value是对象本身,User->User 是一个返回本身的lambda表达式
Map<Integer,User> userMap2 = userList.stream().collect(Collectors.toMap(User::getId,User->User));

3、指定key-value,value是对象本身,Function.identity()是简洁写法,也是返回对象本身
Map<Integer,User> userMap3 = userList.stream().collect(Collectors.toMap(User::getId, Function.identity()));

4、指定key-value,value是对象本身,Function.identity()是简洁写法,也是返回对象本身,key 冲突的解决办法,这里选择第二个key覆盖第一个key。
Map<Integer,User> userMap4 = userList.stream().collect(Collectors.toMap(User::getId, Function.identity(),(key1,key2)->key2));

5、将List根据某个属性进行分组,放入Map;然后组装成key-value格式的数据,分组后集合的顺序会被改变,所以事先设置下排序,然后再排序,保证数据顺序不变。
List<GoodsInfoOut> lst = goodsInfoMapper.getGoodsList();
Map<String, List<GoodsInfoOut>> groupMap = lst.stream().collect(Collectors.groupingBy(GoodsInfoOut::getClassificationOperationId));
List<HomeGoodsInfoOut> retList = groupMap.keySet().stream().map(key -> {
HomeGoodsInfoOut mallOut = new HomeGoodsInfoOut();
mallOut.setClassificationOperationId(key);
if(groupMap.get(key)!=null && groupMap.get(key).size()>0) {
mallOut.setClassificationName(groupMap.get(key).get(0).getClassificationName());
mallOut.setClassificationPic(groupMap.get(key).get(0).getClassificationPic());
mallOut.setClassificationSort(groupMap.get(key).get(0).getClassificationSort());
}
mallOut.setGoodsInfoList(groupMap.get(key));
return mallOut;
}).collect(Collectors.toList());
List<HomeGoodsInfoOut> homeGoodsInfoOutList = retList.stream().sorted(Comparator.comparing(HomeGoodsInfoOut::getClassificationSort))
                                 .collect(Collectors.toList());

5、根据用户性别将数据 - 分组
     Map<String, List<UserInfo>> groupMap = userList.stream().collect(Collectors.groupingBy(UserInfo::getSex()));

三,Map 转 List

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
less复制代码        Map<String,String> map1 = new HashMap<>();
map1.put("a","123");
map1.put("b","456");
map1.put("z","789");
map1.put("c","234");

1、默认顺序
List<UserInfo> list0 = map1.entrySet().stream()
                   .map(e -> new UserInfo(e.getValue(), e.getKey()))
                   .collect(Collectors.toList());
结果:[UserInfo(userName=123, mobile=a), UserInfo(userName=456, mobile=b), UserInfo(userName=234, mobile=c), UserInfo(userName=789, mobile=z)]

2、根据Key排序
List<UserInfo> list1 = map1.entrySet().stream()
                   .sorted(Comparator.comparing(e -> e.getKey())).map(e -> new UserInfo(e.getKey(), e.getValue()))
                   .collect(Collectors.toList());
结果:[UserInfo(userName=a, mobile=123), UserInfo(userName=b, mobile=456), UserInfo(userName=c, mobile=234), UserInfo(userName=z, mobile=789)]

3、根据Value排序
List<UserInfo> list2 = map1.entrySet().stream()
                  .sorted(Comparator.comparing(Map.Entry::getValue))
                  .map(e -> new UserInfo(e.getKey(), e.getValue()))
                  .collect(Collectors.toList());
结果:[UserInfo(userName=a, mobile=123), UserInfo(userName=c, mobile=234), UserInfo(userName=b, mobile=456), UserInfo(userName=z, mobile=789)]

3、根据Key排序
List<UserInfo> list3 = map1.entrySet().stream()
                  .sorted(Map.Entry.comparingByKey())
                  .map(e -> new UserInfo(e.getKey(), e.getValue()))
                  .collect(Collectors.toList());
结果:[UserInfo(userName=a, mobile=123), UserInfo(userName=b, mobile=456), UserInfo(userName=c, mobile=234), UserInfo(userName=z, mobile=789)]

四,从List中获取某个属性

1
2
3
4
5
6
7
8
scss复制代码    //拿出所有手机号
List<String> mobileList = userList.stream().map(RemindUserOut::getMobile).collect(Collectors.toList());

//拿出所有AppId,并去重
List<String> appIdList = appIdList.stream().map(WechatWebViewDomain::getAppId).collect(Collectors.toList()).stream().distinct().collect(Collectors.toList());

//拿出集合中重复的billNo,【.filter(map->StringUtils.isNotEmpty(map.getBillNo()))】这是过滤掉为空的数据;否则,有空数据会抛异常
List<String> repeatCodeList = resultList.stream().filter(map->StringUtils.isNotEmpty(map.getBillNo())).collect(Collectors.groupingBy(BillUploadIn::getBillNo, Collectors.counting())).entrySet().stream().filter(entry -> entry.getValue() > 1).map(Map.Entry::getKey).collect(Collectors.toList());

五,筛选并根据属性去重

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ini复制代码    List<UserInfo> uList = new ArrayList<>();
UserInfo u1 = new UserInfo(1,"小白","15600000000");
UserInfo u2 = new UserInfo(2,"小黑","15500000000");
uList.add(u1);
uList.add(u2);

//过滤名字是小白的数据
List list1= uList.stream()
.filter(b -> "小白".equals(b.getUserName()))
.collect(Collectors.collectingAndThen(Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(b -> b.getId()))), ArrayList::new));
结果:list1===[UserInfo(id=1, userName=小白, mobile=15600000000)]

//根据ID去重
List list2= uList.stream()
.collect(Collectors.collectingAndThen(Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(b -> b.getId()))), ArrayList::new));
结果:list2===[UserInfo(id=1, userName=小白, mobile=15600000000), UserInfo(id=2, userName=小黑, mobile=15500000000)]

六,计算;和,最大,最小,平均值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
scss复制代码    List<UserInfo> uList = new ArrayList<>();
UserInfo user1 = new UserInfo(1,"小白","15600000000",10,new BigDecimal(10));
UserInfo user2 = new UserInfo(2,"小黑","15500000000",15,new BigDecimal(20));
UserInfo user3 = new UserInfo(2,"小彩","15500000000",88,new BigDecimal(99));
uList.add(user1);
uList.add(user2);
uList.add(user3);

//和
Double d1 = uList.stream().mapToDouble(UserInfo::getNum).sum();
结果:113.0
//最大
Double d2 = uList.stream().mapToDouble(UserInfo::getNum).max().getAsDouble();
结果:88.0
//最小
Double d3 = uList.stream().mapToDouble(UserInfo::getNum).min().getAsDouble();
结果:10.0
//平均值
Double d4 = uList.stream().mapToDouble(UserInfo::getNum).average().getAsDouble();
结果:37.666666666666664

//除了统计double类型,还有int和long,bigDecimal需要用到reduce求和
DecimalFormat df = new DecimalFormat("0.00");//保留两位小数点
//和
BigDecimal add = uList.stream().map(UserInfo::getPrice).reduce(BigDecimal.ZERO, BigDecimal::add);
System.out.println(df.format(add));
结果:129.00
//最大
Optional<UserInfo> max = uList.stream().max((u1, u2) -> u1.getNum().compareTo(u2.getNum()));
System.out.println(df.format(max.get().getPrice()));
结果:99.00
//最小
Optional<UserInfo> min = uList.stream().min((u1, u2) -> u1.getNum().compareTo(u2.getNum()));
System.out.println(df.format(min.get().getPrice()));
结果:10.00

//求和,还有mapToInt、mapToLong、flatMapToDouble、flatMapToInt、flatMapToLong
list.stream().mapToDouble(UserInfo::getNum).sum();
//最大
list.stream().mapToDouble(UserInfo::getNum).max();
//最小
list.stream().mapToDouble(UserInfo::getNum).min();
//平均值
list.stream().mapToDouble(UserInfo::getNum).average();

未完,待续…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

2021全网最全Activiti7教程04(Activiti

发表于 2021-06-28

  本篇重点给大家介绍下Activiti中的高级内容,比如流程实例,个人任务,流程变量,组任务及网关。

全网最详细Activiti系列文章,强烈建议收藏加关注哦!

Activiti进阶篇

1.流程实例

1.1 什么是流程实例

  流程实例(ProcessInstance)代表流程定义的执行实例,一个流程实例包括了所有的运行节点,我们可以利用这个对象来了解当前流程实例的进度等信息
  例如:用户或者程序安装流程定义的内容发起了一个流程,这个就是一个流程实例
在这里插入图片描述

1.2 业务管理

  流程定义部署在Activiti后,我们就可以在系统中通过Activiti去管理流程的执行,但是如果我们要将我们的流程实例和业务数据关联,这时我们需要使用到Activiti中预留的BusinessKey(业务标识)来关联
在这里插入图片描述
实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码    /**
* 启动流程实例,添加businessKey
*/
@Test
public void test01(){
// 1.获取ProcessEngine对象
ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
// 2.获取RuntimeService对象
RuntimeService runtimeService = processEngine.getRuntimeService();
// 3.启动流程实例
ProcessInstance instance = runtimeService
.startProcessInstanceByKey("evection", "1001");
// 4.输出processInstance相关属性
System.out.println("businessKey = "+instance.getBusinessKey());
}

在这里插入图片描述

1.3 流程实例的挂起和激活

  在实际场景中可能由于流程变更需要将当前运行的流程暂停而不是删除,流程暂停后将不能继续执行。

1.3.1 全部流程挂起

  操作流程的定义为挂起状态,该流程定义下边所有的流程实例全部暂停。

流程定义为挂起状态,该流程定义将不允许启动新的流程实例,同时该流程定义下的所有的流程实例都将全部挂起暂停执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码    /**
* 全部流程挂起实例与激活
*/
@Test
public void test02(){
// 1.获取ProcessEngine对象
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
// 2.获取RepositoryService对象
RepositoryService repositoryService = engine.getRepositoryService();
// 3.查询流程定义的对象
ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery()
.processDefinitionKey("evection")
.singleResult();
// 4.获取当前流程定义的状态
boolean suspended = processDefinition.isSuspended();
String id = processDefinition.getId();
// 5.如果挂起就激活,如果激活就挂起
if(suspended){
// 表示当前定义的流程状态是 挂起的
repositoryService.activateProcessDefinitionById(
id // 流程定义的id
,true // 是否激活
,null // 激活时间
);
System.out.println("流程定义:" + id + ",已激活");
}else{
// 非挂起状态,激活状态 那么需要挂起流程定义
repositoryService.suspendProcessDefinitionById(
id // 流程id
,true // 是否挂起
,null // 挂起时间
);
System.out.println("流程定义:" + id + ",已挂起");
}
}

  挂起流程定义后,对于的实例对象中的状态会修改为2
在这里插入图片描述

  然后再去操作对于的流程实例会抛异常信息
在这里插入图片描述
  我们再将挂起的流程转变为激活状态,对于的状态值会从2更新为1
在这里插入图片描述

  然后就是业务流程可以正常处理了

1.3.2 单个实例挂起

  操作流程实例对象,针对单个流程执行挂起操作,某个流程实例挂起则此流程不再继续执行,当前流程定义的其他流程实例是不受干扰的。完成该流程实例的当前任务会抛异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码    /**
* 单个流程实例挂起与激活
*/
@Test
public void test03(){
// 1.获取ProcessEngine对象
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
// 2.获取RuntimeService
RuntimeService runtimeService = engine.getRuntimeService();
// 3.获取流程实例对象
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery()
.processInstanceId("25001")
.singleResult();
// 4.获取相关的状态操作
boolean suspended = processInstance.isSuspended();
String id = processInstance.getId();
if(suspended){
// 挂起--》激活
runtimeService.activateProcessInstanceById(id);
System.out.println("流程定义:" + id + ",已激活");
}else{
// 激活--》挂起
runtimeService.suspendProcessInstanceById(id);
System.out.println("流程定义:" + id + ",已挂起");
}

}

  然后我们可以在数据库中查看到状态的更新
在这里插入图片描述

  1. 个人任务

2.1 分配任务责任人

2.1.1 固定分配

  在进行业务流程建模的时候指定固定的任务负责人:在这里插入图片描述
  在Properties视图中,填写Assiginee项为任务负责人

2.1.2 表达式分配

  在Activiti中支持使用UEL表达式,UEL表达式是Java EE6 规范的一部分, UEL(Unified Expression Language) 即 统一表达式语音, Activiti支持两种UEL表达式: UEL-value 和UEL-method

UEL-value

在这里插入图片描述

  在assignee中使用流程变量处理
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
  然后我们可以来操作

  首先我们需要将定义的流程部署到Activiti数据库中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码    /**
* 先将新定义的流程部署到Activiti中数据库中
*/
@Test
public void test01(){
// 1.获取ProcessEngine对象
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
// 2.获取RepositoryService进行部署操作
RepositoryService service = engine.getRepositoryService();
// 3.使用RepositoryService进行部署操作
Deployment deploy = service.createDeployment()
.addClasspathResource("bpmn/evection-uel.bpmn") // 添加bpmn资源
.addClasspathResource("bpmn/evection-uel.png") // 添加png资源
.name("出差申请流程-UEL")
.deploy();// 部署流程
// 4.输出流程部署的信息
System.out.println("流程部署的id:" + deploy.getId());
System.out.println("流程部署的名称:" + deploy.getName());
}

  部署成功后我们需要启动一个新的流程实例,然后在流程实例创建的其实关联UEL表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码    /**
* 创建一个流程实例
* 给流程定义中的 UEL表达式赋值
*/
@Test
public void test02(){
// 获取流程引擎
ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
// 获取RuntimeService对象
RuntimeService runtimeService = processEngine.getRuntimeService();
// 设置 assignee 的取值,
Map<String,Object> map = new HashMap<>();
map.put("assignee0","张三");
map.put("assignee1","李四");
map.put("assignee2","王五");
map.put("assignee3","赵财务");
// 创建流程实例
runtimeService.startProcessInstanceByKey("evection-uel",map);
}

  启动成功后我们在 act_ru_variable中可以看到UEL表达式对应的赋值信息
在这里插入图片描述

  UEL-method
在这里插入图片描述

  userBean 是 spring 容器中的一个 bean,表示调用该 bean 的 getUserId()方法。

UEL-method 与 UEL-value 结合

再比如:
${ldapService.findManagerForEmployee(emp)}
ldapService 是 spring 容器的一个 bean,findManagerForEmployee 是该 bean 的一个方法,emp 是 activiti
流程变量, emp 作为参数传到 ldapService.findManagerForEmployee 方法中。

其它

  表达式支持解析基础类型、 bean、 list、 array 和 map,也可作为条件判断。
  如下:
  ${order.price > 100 && order.price < 250}

2.1.3 监听器分配

  可以使用监听器来完成很多Activiti的流程业务。我们在此处使用监听器来完成负责人的指定,那么我们在流程设计的时候就不需要指定assignee
Event选项
在这里插入图片描述

1
2
3
4
txt复制代码create:任务创建后触发
assignment:任务分配后触发
Delete:任务完成后触发
All:所有事件都触发

自定义的监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码import org.activiti.engine.delegate.DelegateTask;
import org.activiti.engine.delegate.TaskListener;

public class MyTaskListener implements TaskListener {
@Override
public void notify(DelegateTask delegateTask) {
if("创建请假单".equals(delegateTask.getName())
&& "create".equals(delegateTask.getEventName())){
// 指定任务的负责人
delegateTask.setAssignee("张三-Listener");
}

}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码/**
* 先将新定义的流程部署到Activiti中数据库中
*/
@Test
public void test01(){
// 1.获取ProcessEngine对象
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
// 2.获取RepositoryService进行部署操作
RepositoryService service = engine.getRepositoryService();
// 3.使用RepositoryService进行部署操作
Deployment deploy = service.createDeployment()
.addClasspathResource("bpmn/evection-listener.bpmn") // 添加bpmn资源
.addClasspathResource("bpmn/evection-listener.png") // 添加png资源
.name("出差申请流程-UEL")
.deploy();// 部署流程
// 4.输出流程部署的信息
System.out.println("流程部署的id:" + deploy.getId());
System.out.println("流程部署的名称:" + deploy.getName());
}

/**
* 创建一个流程实例
* 给流程定义中的 UEL表达式赋值
*/
@Test
public void test02(){
// 获取流程引擎
ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
// 获取RuntimeService对象
RuntimeService runtimeService = processEngine.getRuntimeService();

// 创建流程实例
runtimeService.startProcessInstanceByKey("evection-listener");
}

在这里插入图片描述
在这里插入图片描述

2.2 查询任务

查询任务负责人的待办任务

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码// 查询当前个人待执行的任务
@Test
public void findPersonalTaskList() {
// 流程定义key
String processDefinitionKey = "myEvection1";
// 任务负责人
String assignee = "张三";
// 获取TaskService
TaskService taskService = processEngine.getTaskService();
List<Task> taskList = taskService.createTaskQuery()
.processDefinitionKey(processDefinitionKey)
.includeProcessVariables()
.taskAssignee(assignee)
.list();
for (Task task : taskList) {
System.out.println("----------------------------");
System.out.println("流程实例id: " + task.getProcessInstanceId());
System.out.println("任务id: " + task.getId());
System.out.println("任务负责人: " + task.getAssignee());
System.out.println("任务名称: " + task.getName());
}
}

关联 businessKey

需求:
  在 activiti 实际应用时,查询待办任务可能要显示出业务系统的一些相关信息。

比如:查询待审批出差任务列表需要将出差单的日期、 出差天数等信息显示出来。

  出差天数等信息在业务系统中存在,而并没有在 activiti 数据库中存在,所以是无法通过 activiti 的 api 查询到出差天数等信息。
实现:
  在查询待办任务时,通过 businessKey(业务标识 )关联查询业务系统的出差单表,查询出出差天数等信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码@Test
public void findProcessInstance(){
// 获取processEngine
ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
// 获取TaskService
TaskService taskService = processEngine.getTaskService();
// 获取RuntimeService
RuntimeService runtimeService = processEngine.getRuntimeService();
// 查询流程定义的对象
Task task = taskService.createTaskQuery()
.processDefinitionKey("myEvection1")
.taskAssignee("张三")
.singleResult();
// 使用task对象获取实例id
String processInstanceId = task.getProcessInstanceId();
// 使用实例id,获取流程实例对象
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
// 使用processInstance,得到 businessKey
String businessKey = processInstance.getBusinessKey();

System.out.println("businessKey=="+businessKey);

}

2.3 办理任务

  注意:在实际应用中,完成任务前需要校验任务的负责人是否具有该任务的办理权限 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码/**
* 完成任务,判断当前用户是否有权限
*/
@Test
public void completTask() {
//任务id
String taskId = "15005";
// 任务负责人
String assingee = "张三";
//获取processEngine
ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
// 创建TaskService
TaskService taskService = processEngine.getTaskService();
// 完成任务前,需要校验该负责人可以完成当前任务
// 校验方法:
// 根据任务id和任务负责人查询当前任务,如果查到该用户有权限,就完成
Task task = taskService.createTaskQuery()
.taskId(taskId)
.taskAssignee(assingee)
.singleResult();
if(task != null){
taskService.complete(taskId);
System.out.println("完成任务");
}
}

3.流程变量

3.1、什么是流程变量

  流程变量在 activiti 中是一个非常重要的角色,流程运转有时需要靠流程变量,业务系统和 activiti结合时少不了流程变量,流程变量就是 activiti 在管理工作流时根据管理需要而设置的变量。比如:在出差申请流程流转时如果出差天数大于 3 天则由总经理审核,否则由人事直接审核, 出差天数就可以设置为流程变量,在流程流转时使用。

注意:虽然流程变量中可以存储业务数据可以通过activiti的api查询流程变量从而实现 查询业务数据,但是不建议这样使用,因为业务数据查询由业务系统负责,activiti设置流程变量是为了流程执行需要而创建。

3.2、流程变量类型

  如果将 pojo 存储到流程变量中,必须实现序列化接口 serializable,为了防止由于新增字段无法反序列化,需要生成 serialVersionUID。
在这里插入图片描述

3.3、流程变量作用域

  流程变量的作用域可以是一个流程实例(processInstance),或一个任务(task),或一个执行实例
(execution)

3.3.1、globa变量

  流程变量的默认作用域是流程实例。当一个流程变量的作用域为流程实例时,可以称为 global 变量

注意:

如: Global变量:userId(变量名)、zhangsan(变量值)

  global 变量中变量名不允许重复,设置相同名称的变量,后设置的值会覆盖前设置的变量值。

3.3.2、local变量

  任务和执行实例仅仅是针对一个任务和一个执行实例范围,范围没有流程实例大, 称为 local 变量。Local 变量由于在不同的任务或不同的执行实例中,作用域互不影响,变量名可以相同没有影响。Local 变量名也可以和 global 变量名相同,没有影响。

3.4、流程变量的使用方法

3.4.1、在属性上使用UEL表达式

  可以在 assignee 处设置 UEL 表达式,表达式的值为任务的负责人,比如: ${assignee}, assignee 就是一个流程变量名称。

Activiti获取UEL表达式的值,即流程变量assignee的值 ,将assignee的值作为任务的负责人进行任务分配

3.4.2、在连线上使用UEL表达式

  可以在连线上设置UEL表达式,决定流程走向。比如:${price<10000} 。price就是一个流程变量名称,uel表达式结果类型为布尔类型。如果UEL表达式是true,要决定 流程执行走向。

3.5 流程变量使用

3.5.1 需求

​   员工创建出差申请单,由部门经理审核,部门经理申请通过后3天以下由财务直接申批,3天以上先由总经理审批,总经理审批通过后再由财务审批。
在这里插入图片描述

3.5.2 流程定义

  先通过UEL-value来设置负责人在这里插入图片描述
  然后在分支线上来设置条件

在这里插入图片描述
  那么还可以通过对象参数命名,比如 evection.num:

在这里插入图片描述

  另一根线对应的设置
在这里插入图片描述
在这里插入图片描述

  然后可以将相关的资源文件拷贝到项目中,

3.5.3 使用Global变量

  接下来使用Global变量控制流程

3.5.3.1 POJO创建

  首先创建POJO对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码/**
* 出差申请的POJO对象
*/
@Data
public class Evection {

private long id;

private String evectionName;


/**
* 出差的天数
*/
private double num;

private Date beginDate;

private Date endDate;

private String destination;

private String reson;
}
3.5.3.2 流程的部署
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码    /**
* 部署流程
*/
@Test
public void test01(){
// 1.获取ProcessEngine对象
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
// 2.获取RepositoryService进行部署操作
RepositoryService service = engine.getRepositoryService();
// 3.使用RepositoryService进行部署操作
Deployment deploy = service.createDeployment()
.addClasspathResource("bpmn/evection-variable.bpmn") // 添加bpmn资源
.addClasspathResource("bpmn/evection-variable.png") // 添加png资源
.name("出差申请流程-流程变量")
.deploy();// 部署流程
// 4.输出流程部署的信息
System.out.println("流程部署的id:" + deploy.getId());
System.out.println("流程部署的名称:" + deploy.getName());
}
3.5.3.3 设置流程变量
a.启动时设置流程变量

  在启动流程时设置流程变量,变量的作用域是整个流程实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码    /**
* 启动流程实例,设置流程变量
*/
@Test
public void test02(){
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
RuntimeService runtimeService = engine.getRuntimeService();
// 流程定义key
String key = "evection-variable";
// 创建变量集合
Map<String,Object> variables = new HashMap<>();
// 创建出差对象 POJO
Evection evection = new Evection();
// 设置出差天数
evection.setNum(4d);
// 定义流程变量到集合中
variables.put("evection",evection);
// 设置assignee的取值
variables.put("assignee0","张三1");
variables.put("assignee1","李四1");
variables.put("assignee2","王五1");
variables.put("assignee3","赵财务1");
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(key, variables);
// 输出信息
System.out.println("获取流程实例名称:"+processInstance.getName());
System.out.println("流程定义ID:" + processInstance.getProcessDefinitionId());
}

  完成任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码   /**
* 完成任务
*/
@Test
public void test03(){
String key = "evection-variable";
String assignee = "李四1";
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
TaskService taskService = engine.getTaskService();
Task task = taskService.createTaskQuery()
.processDefinitionKey(key)
.taskAssignee(assignee)
.singleResult();
if(task != null){
taskService.complete(task.getId());
System.out.println("任务执行完成...");
}
}

  通过startProcessInstanceByKey方法设置流程变量的作用域是一个流程实例,流程变量使用Map存储,同一个流程实例map中的key相同,后者会覆盖前者

b.任务办理时设置

  在完成任务时设置流程变量,该流程变量只有在该任务完成后其它结点才可使用该变量,它的作用域是整个流程实例,如果设置的流程变量的key在流程实例中已存在相同的名字则后设置的变量替换前边设置的变量。

  这里需要在创建出差单任务完成时设置流程变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
java复制代码    /**
* 启动流程实例,设置流程变量
*/
@Test
public void test02(){
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
RuntimeService runtimeService = engine.getRuntimeService();
// 流程定义key
String key = "evection-variable";
// 创建变量集合
Map<String,Object> variables = new HashMap<>();

// 设置assignee的取值
variables.put("assignee0","张三1");
variables.put("assignee1","李四1");
variables.put("assignee2","王五1");
variables.put("assignee3","赵财务1");
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(key, variables);
// 输出信息
System.out.println("获取流程实例名称:"+processInstance.getName());
System.out.println("流程定义ID:" + processInstance.getProcessDefinitionId());
}

/**
* 完成任务
*/
@Test
public void test03(){
String key = "evection-variable";
String assignee = "李四1";
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
TaskService taskService = engine.getTaskService();
Task task = taskService.createTaskQuery()
.processDefinitionKey(key)
.taskAssignee(assignee)
.singleResult();

Map<String,Object> variables = new HashMap<>();
// 创建出差对象 POJO
Evection evection = new Evection();
// 设置出差天数
evection.setNum(4d);
// 定义流程变量到集合中
variables.put("evection",evection);

if(task != null){
taskService.complete(task.getId(),variables);
System.out.println("任务执行完成...");
}
}

说明:
通过当前任务设置流程变量,需要指定当前任务id,如果当前执行的任务id不存在则抛出异常。
任务办理时也是通过map<key,value>设置流程变量,一次可以设置多个变量。

c.当前流程实例设置

  通过流程实例id设置全局变量,该流程实例必须未执行完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码    @Test
public void setGlobalVariableByExecutionId(){
// 当前流程实例执行 id,通常设置为当前执行的流程实例
String executionId="2601";
// 获取processEngine
ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
// 获取RuntimeService
RuntimeService runtimeService = processEngine.getRuntimeService();
// 创建出差pojo对象
Evection evection = new Evection();
// 设置天数
evection.setNum(3d);
// 通过流程实例 id设置流程变量
runtimeService.setVariable(executionId, "evection", evection);
// 一次设置多个值
// runtimeService.setVariables(executionId, variables)
}

注意:
  executionId必须当前未结束 流程实例的执行id,通常此id设置流程实例 的id。也可以通runtimeService.getVariable()获取流程变量。

d.当前任务设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Test
public void setGlobalVariableByTaskId(){

//当前待办任务id
String taskId="1404";
// 获取processEngine
ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
TaskService taskService = processEngine.getTaskService();
Evection evection = new Evection();
evection.setNum(3);
//通过任务设置流程变量
taskService.setVariable(taskId, "evection", evection);
//一次设置多个值
//taskService.setVariables(taskId, variables)
}

注意:
  任务id必须是当前待办任务id,act_ru_task中存在。如果该任务已结束,会报错也可以通过taskService.getVariable()获取流程变量。

3.5.4 设置local流程变量

3.5.4.1、任务办理时设置

  任务办理时设置local流程变量,当前运行的流程实例只能在该任务结束前使用,任务结束该变量无法在当前流程实例使用,可以通过查询历史任务查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码/*
*处理任务时设置local流程变量
*/
@Test
public void completTask() {
//任务id
String taskId = "1404";
// 获取processEngine
ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
TaskService taskService = processEngine.getTaskService();
// 定义流程变量
Map<String, Object> variables = new HashMap<String, Object>();
Evection evection = new Evection ();
evection.setNum(3d);
// 定义流程变量
Map<String, Object> variables = new HashMap<String, Object>();
// 变量名是holiday,变量值是holiday对象
variables.put("evection", evection);
// 设置local变量,作用域为该任务
taskService.setVariablesLocal(taskId, variables);
// 完成任务
taskService.complete(taskId);
}

​ 说明:
  设置作用域为任务的local变量,每个任务可以设置同名的变量,互不影响。

3.5.4.2、通过当前任务设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Test
public void setLocalVariableByTaskId(){
// 当前待办任务id
String taskId="1404";
// 获取processEngine
ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
TaskService taskService = processEngine.getTaskService();
Evection evection = new Evection ();
evection.setNum(3d);
// 通过任务设置流程变量
taskService.setVariableLocal(taskId, "evection", evection);
// 一次设置多个值
//taskService.setVariablesLocal(taskId, variables)
}

注意:
  任务id必须是当前待办任务id,act_ru_task中存在。

3.5.4.3、 Local变量测试1

  如果上边例子中设置global变量改为设置local变量是否可行?为什么?
  Local变量在任务结束后无法在当前流程实例执行中使用,如果后续的流程执行需要用到此变量则会报错。

3.5.4.4、 Local变量测试2

  在部门经理审核、总经理审核、财务审核时设置local变量,可通过historyService查询每个历史任务时将流程变量的值也查询出来。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码// 创建历史任务查询对象
HistoricTaskInstanceQuery historicTaskInstanceQuery = historyService.createHistoricTaskInstanceQuery();
// 查询结果包括 local变量
historicTaskInstanceQuery.includeTaskLocalVariables();
for (HistoricTaskInstance historicTaskInstance : list) {
System.out.println("==============================");
System.out.println("任务id:" + historicTaskInstance.getId());
System.out.println("任务名称:" + historicTaskInstance.getName());
System.out.println("任务负责人:" + historicTaskInstance.getAssignee());
System.out.println("任务local变量:"+ historicTaskInstance.getTaskLocalVariables());

}

  注意:查询历史流程变量,特别是查询pojo变量需要经过反序列化,不推荐使用。

4.组任务

4.1、需求

  在流程定义中在任务结点的 assignee 固定设置任务负责人,在流程定义时将参与者固定设置在.bpmn 文件中,如果临时任务负责人变更则需要修改流程定义,系统可扩展性差。
  针对这种情况可以给任务设置多个候选人,可以从候选人中选择参与者来完成任务。

4.2、设置任务候选人

  在流程图中任务节点的配置中设置 candidate-users(候选人),多个候选人之间用逗号分开。
在这里插入图片描述

查看bpmn文件

1
xml复制代码<userTask activiti:candidateUsers="lisi,wangwu" activiti:exclusive="true" id="_3" name="经理审批"/>

  我们可以看到部门经理的审核人已经设置为 lisi,wangwu 这样的一组候选人,可以使用activiti:candiateUsers=”用户 1,用户 2,用户 3”的这种方式来实现设置一组候选人

4.3、组任务

4.3.1、组任务办理流程

a、查询组任务

  指定候选人,查询该候选人当前的待办任务。候选人不能立即办理任务。

b、拾取(claim)任务

  该组任务的所有候选人都能拾取。将候选人的组任务,变成个人任务。原来候选人就变成了该任务的负责人。如果拾取后不想办理该任务?需要将已经拾取的个人任务归还到组里边,将个人任务变成了组任务。

c、查询个人任务

  查询方式同个人任务部分,根据assignee查询用户负责的个人任务。

d、办理个人任务

4.3.2、 查询组任务

   根据候选人查询组任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码    /**
* 查询组任务
*/
@Test
public void test03(){
String key = "evection1";
String candidateUser = "lisi";
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
TaskService taskService = engine.getTaskService();
List<Task> list = taskService.createTaskQuery()
.processDefinitionKey(key)
.taskCandidateUser(candidateUser)
.list();
for (Task task : list) {
System.out.println("流程实例Id:" + task.getProcessInstanceId());
System.out.println("任务ID:" + task.getId());
System.out.println("负责人:" + task.getAssignee());
System.out.println("任务名称:" + task.getName());
}
}

4.3.3 、 拾取组任务

   候选人员拾取组任务后该任务变为自己的个人任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码    /**
* 候选人 拾取任务
*/
@Test
public void test04(){
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
TaskService taskService = engine.getTaskService();
String taskId = "72505";
// 候选人
String userId = "lisi";
// 拾取任务
Task task = taskService.createTaskQuery()
.taskId(taskId)
.taskCandidateUser(userId) // 根据候选人查询
.singleResult();
if(task != null){
// 可以拾取任务
taskService.claim(taskId,userId);
System.out.println("拾取成功");
}
}

4.3.4、 查询个人待办任务

  查询方式同个人任务查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码    @Test
public void test03(){
String key = "evection1";
String candidateUser = "lisi";
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
TaskService taskService = engine.getTaskService();
List<Task> list = taskService.createTaskQuery()
.processDefinitionKey(key)
//.taskCandidateUser(candidateUser)
//.taskCandidateOrAssigned(candidateUser)
.taskAssignee(candidateUser)
.list();
for (Task task : list) {
System.out.println("流程实例Id:" + task.getProcessInstanceId());
System.out.println("任务ID:" + task.getId());
System.out.println("负责人:" + task.getAssignee());
System.out.println("任务名称:" + task.getName());
}
}

4.3.5、 办理个人任务

  同个人任务办理

1
2
3
4
5
6
7
8
9
10
11
java复制代码    /**
* 完成个人任务
*/
@Test
public void test05(){
String taskId = "72505";
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
TaskService taskService = engine.getTaskService();
taskService.complete(taskId);
System.out.println("完成任务:" + taskId);
}

4.3.6、 归还组任务

   如果个人不想办理该组任务,可以归还组任务,归还后该用户不再是该任务的负责人

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码   /**
* 归还任务
*/
@Test
public void test06(){
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
TaskService taskService = engine.getTaskService();
String taskId = "75002";
String userId= "zhangsan";
Task task = taskService.createTaskQuery()
.taskId(taskId)
.taskAssignee(userId)
.singleResult();
if(task != null){
// 如果设置为null,归还组任务,任务没有负责人
taskService.setAssignee(taskId,null);
}
}

4,3,7 任务交接

  任务负责人将任务交给其他负责人来处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码    /**
* 任务交接
*/
@Test
public void test07(){
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
TaskService taskService = engine.getTaskService();
String taskId = "75002";
String userId= "zhangsan";
Task task = taskService.createTaskQuery()
.taskId(taskId)
.taskAssignee(userId)
.singleResult();
if(task != null){
// 设置该任务的新的负责人
taskService.setAssignee(taskId,"赵六");
}
}

4.3.8、 数据库表操作

  查询当前任务执行表

1
sql复制代码SELECT * FROM act_ru_task

  任务执行表,记录当前执行的任务,由于该任务当前是组任务,所有assignee为空,当拾取任务后该字段就是拾取用户的id,查询任务参与者

1
sql复制代码SELECT * FROM act_ru_identitylink

  任务参与者,记录当前参考任务用户或组,当前任务如果设置了候选人,会向该表插入候选人记录,有几个候选就插入几个与act_ru_identitylink对应的还有一张历史表act_hi_identitylink,向act_ru_identitylink插入记录的同时也会向历史表插入记录。任务完成

5.网关

  网关用来控制流程的流向

5.1 排他网关ExclusiveGateway

5.1.1 什么是排他网关:

  排他网关,用来在流程中实现决策。 当流程执行到这个网关,所有分支都会判断条件是否为true,如果为true则执行该分支,

注意:排他网关只会选择一个为true的分支执行。如果有两个分支条件都为true,排他网关会选择id值较小的一条分支去执行。

为什么要用排他网关?

  不用排他网关也可以实现分支,如:在连线的condition条件上设置分支条件。在连线设置condition条件的缺点:如果条件都不满足,流程就结束了(是异常结束)。如果 使用排他网关决定分支的走向,如下:
在这里插入图片描述

  如果从网关出去的线所有条件都不满足则系统抛出异常。

1
2
java复制代码org.activiti.engine.ActivitiException: No outgoing sequence flow of the exclusive gateway 'exclusivegateway1' could be selected for continuing the process
at org.activiti.engine.impl.bpmn.behavior.ExclusiveGatewayActivityBehavior.leave(ExclusiveGatewayActivityBehavior.java:85)

5.1.2 流程定义

  排他网关图标,红框内:
在这里插入图片描述

5.1.3 测试

  在部门经理审核后,走排他网关,从排他网关出来的分支有两条,一条是判断出差天数是否大于3天,另一条是判断出差天数是否小于等于3天。设置分支条件时,如果所有分支条件都不是true,报错:

1
2
3
java复制代码org.activiti.engine.ActivitiException: No outgoing sequence flow of the exclusive gateway 'exclusivegateway1' could be selected for continuing the process

at org.activiti.engine.impl.bpmn.behavior.ExclusiveGatewayActivityBehavior.leave(ExclusiveGatewayActivityBehavior.java:85)

5.2 并行网关ParallelGateway

5.2.1 什么是并行网关

  并行网关允许将流程分成多条分支,也可以把多条分支汇聚到一起,并行网关的功能是基于进入和外出顺序流的:

l fork分支:

  并行后的所有外出顺序流,为每个顺序流都创建一个并发分支。

l join汇聚:

  所有到达并行网关,在此等待的进入分支, 直到所有进入顺序流的分支都到达以后, 流程就会通过汇聚网关。
注意,如果同一个并行网关有多个进入和多个外出顺序流, 它就同时具有分支和汇聚功能。 这时,网关会先汇聚所有进入的顺序流,然后再切分成多个并行分支。

与其他网关的主要区别是,并行网关不会解析条件。 即使顺序流中定义了条件,也会被忽略。

例子:在这里插入图片描述
说明:

  技术经理和项目经理是两个execution分支,在act_ru_execution表有两条记录分别是技术经理和项目经理,act_ru_execution还有一条记录表示该流程实例。待技术经理和项目经理任务全部完成,在汇聚点汇聚,通过parallelGateway并行网关。并行网关在业务应用中常用于会签任务,会签任务即多个参与者共同办理的任务。

5.2.2 流程定义

  并行网关图标,红框内:
在这里插入图片描述

5.2.3 测试

  当执行到并行网关数据库跟踪如下:当前任务表:SELECT * FROM act_ru_task

在这里插入图片描述

上图中:有两个任务当前执行。查询流程实例执行表:SELECT * FROM act_ru_execution
在这里插入图片描述
上图中,说明当前流程实例有多个分支(两个)在运行。

对并行任务的执行:
并行任务执行不分前后,由任务的负责人去执行即可。
执行技术经理任务后,查询当前任务表 SELECT * FROM act_ru_task

在这里插入图片描述
已完成的技术经理任务在当前任务表act_ru_task_已被删除。
在流程实例执行表:SELECT * FROM act_ru_execution有中多个分支存在且有并行网关的汇聚结点。
在这里插入图片描述

有并行网关的汇聚结点:说明有一个分支已经到汇聚,等待其它的分支到达。
当所有分支任务都完成,都到达汇聚结点后:
流程实例执行表:SELECT * FROM act_ru_execution,执行流程实例已经变为总经理审批,说明流程执行已经通过并行网关
在这里插入图片描述
总结:所有分支到达汇聚结点,并行网关执行完成。

5.3 包含网关InclusiveGateway

5.3.1 什么是包含网关

  包含网关可以看做是排他网关和并行网关的结合体。 和排他网关一样,你可以在外出顺序流上定义条件,包含网关会解析它们。 但是主要的区别是包含网关可以选择多于一条顺序流,这和并行网关一样。
  包含网关的功能是基于进入和外出顺序流的:

l 分支:

  所有外出顺序流的条件都会被解析,结果为true的顺序流会以并行方式继续执行, 会为每个顺序流创建一个分支。

l 汇聚:

  所有并行分支到达包含网关,会进入等待状态, 直到每个包含流程token的进入顺序流的分支都到达。 这是与并行网关的最大不同。换句话说,包含网关只会等待被选中执行了的进入顺序流。 在汇聚之后,流程会穿过包含网关继续执行。

5.3.2 流程定义:

  出差申请大于等于3天需要由项目经理审批,小于3天由技术经理审批,出差申请必须经过人事经理审批。
包含网关图标,红框内:
在这里插入图片描述
定义流程:
在这里插入图片描述
注意:通过包含网关的每个分支的连线上设置condition条件。

5.3.3 测试

  如果包含网关设置的条件中,流程变量不存在,报错;

1
java复制代码org.activiti.engine.ActivitiException: Unknown property used in expression: ${evection.num>=3}

  需要在流程启动时设置流程变量evection.num。

1)、当流程执行到第一个包含网关后,会根据条件判断,当前要走哪几个分支:

  流程实例执行表:SELECT * FROM act_ru_execution
在这里插入图片描述

第一条记录:包含网关分支。

后两条记录代表两个要执行的分支:
ACT_ID = “_13” 代表 项目经理神品
ACT_ID = “_5” 代表 人事经理审批
当前任务表:ACT_RU_TASK

在这里插入图片描述

上图中,项目经理审批、人事经理审批 都是当前的任务,在并行执行。
如果有一个分支执行先走到汇聚结点的分支,要等待其它执行分支走到汇聚。

2)、先执行项目经理审批,然后查询当前任务表:ACT_RU_TASK
在这里插入图片描述
当前任务还有人事经理审批需要处理。
流程实例执行表:SELECT * FROM act_ru_execution
在这里插入图片描述
  发现人事经理的分支还存在,而项目经理分支已经走到ACT_ID = _18的节点。而ACT_ID=__18就是第二个包含网关这时,因为有2个分支要执行,包含网关会等所有分支走到汇聚才能执行完成。

3)、执行人事经理审批
然后查询当前任务表:ACT_RU_TASK
在这里插入图片描述

  当前任务表已经不是人事经理审批了,说明人事经理审批已经完成。 流程实例执行表:SELECT * FROM act_ru_execution
在这里插入图片描述

包含网关执行完成,分支和汇聚就从act_ru_execution删除。

小结:在分支时,需要判断条件,符合条件的分支,将会执行,符合条件的分支最终才进行汇聚。

5.4 事件网关EventGateway

  事件网关允许根据事件判断流向。网关的每个外出顺序流都要连接到一个中间捕获事件。 当流程到达一个基于事件网关,网关会进入等待状态:会暂停执行。与此同时,会为每个外出顺序流创建相对的事件订阅。

  事件网关的外出顺序流和普通顺序流不同,这些顺序流不会真的”执行”, 相反它们让流程引擎去决定执行到事件网关的流程需要订阅哪些事件。 要考虑以下条件:

  1. 事件网关必须有两条或以上外出顺序流;
  2. 事件网关后,只能使用intermediateCatchEvent类型(activiti不支持基于事件网关后连接ReceiveTask)
  3. 连接到事件网关的中间捕获事件必须只有一个入口顺序流。

5.4.1流程定义

事件网关图标,红框内

在这里插入图片描述

intermediateCatchEvent:
在这里插入图片描述

intermediateCatchEvent支持的事件类型:
Message Event: 消息事件
Singal Event: 信号事件
Timer Event: 定时事件
在这里插入图片描述

使用事件网关定义流程:
在这里插入图片描述

~好了进阶篇的内容就给大家介绍到这儿了,如果感觉有帮助欢迎点赞关注加收藏哦 V_V

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

vivo商城促销系统架构设计与实践-概览篇 一、前言 二、系

发表于 2021-06-28

一、前言

随着商城业务渠道不断扩展,促销玩法不断增多,原商城v2.0架构已经无法满足不断增加的活动玩法,需要进行促销系统的独立建设,与商城解耦,提供纯粹的商城营销活动玩法支撑能力。

我们将分系列来介绍vivo商城促销系统建设的过程中遇到的问题和解决方案,分享架构设计经验。

二、系统框架

2.1 业务梳理

在介绍业务架构前我们先简单了解下vivo商城促销系统业务能力建设历程,对现促销能力进行梳理回顾。在商城v2.0中促销功能存在以下问题:

1. 促销模型不够抽象,维护混乱,没有独立的活动库存;

2. 混乱的活动共融互斥关系管理,缺乏统一的促销计价能力。

商城核心交易链路中商详页、购物车、下单这三块关于计价逻辑是分开独立维护的,没有统一,如下图所示。显然随着促销优惠的增加或者玩法的变动,商城侧业务重复开发量会显著加大。

(图2-1. 促销计价统一前)

3. 促销性能无法满足活动量级,往往会影响商城主站的性能。

因与商城系统耦合,无法提供针对性的性能优化,造成系统无法支撑越来越频繁的大流量场景下大促活动。

基于这些痛点问题,我们一期完成促销系统的独立,与商城解耦,搭建出促销系统核心能力:

优惠活动管理

对所有优惠活动抽象出统一的优惠模型和配置管理界面,提供活动编辑、修改、查询及数据统计等功能。并独立出统一的活动库存管理,便于活动资源的统一把控。

促销计价

基于高度灵活、抽象化的计价引擎能力,通过定义分层计价的促销计价模型,制定统一的优惠叠加规则与计价流程,实现vivo商城促销计价能力的建设。推动完成vivo商城所有核心链路接入促销计价,实现全链路优惠价格计算的统一,如下图:

(图2-2. 促销计价统一后)

随着一期促销系统核心能力的完成,极大的满足了业务需要,各类优惠玩法随之增多。但伴随而来的就是各种运营痛点:

  • 维护的促销活动无法提前点检,检查活动效果是否符合预期;
  • 随着优惠玩法的增多,一个商品所能享受的优惠越来越多,配置也越来越复杂,极易配置错误造成线上事故;

为此我们开始促销系统二期的能力建设,着重解决以上运营痛点:

  • 提供时光穿越功能,实现用户能够“穿越”至未来某个时间点,从而实现促销活动的提前点检;
  • 提供价格监控功能,结合「商城营销价格能力矩阵」规划的能力,通过事前/事中/事后多维度监控措施,来“降低出错概率,出错能及时止损”。

2.2 促销与优惠券

促销的主要目的就是向用户传递商品的各种优惠信息,提供优惠利益,吸引用户购买,从而起到促活拉新、提高销量的目的。从这种角度来看,优惠券也属于促销的一部分。

但因一些原因vivo商城促销系统独立过程中,并没有与促销系统放一块:

  • 首先,优惠券系统在商城v2.0时就已独立,已经对接很多上游业务,已经是成熟的中台系统;
  • 再者,就是优惠券也有相较与其它促销优惠的业务特殊性,如有发券、领券能力。

在考虑设计改造成本就未将优惠券包括在促销系统能力范畴,但优惠券毕竟也是商品价格优惠的一部分,因此促销计价需要依赖优惠券系统提供券优惠的能力。

2.3 业务架构&流程

至此我们也就梳理出整个促销系统的大概能力矩阵,整体架构设计如下:

(图2-3. 促销系统架构)

而随着促销系统独立,整个商城购物流程与促销系统的关系如下:

(图2-4. 最新商城购物流程)

三、技术挑战

作为中台能力系统,促销系统面临的技术挑战包括以下几方面:

  • 面对复杂多变的促销玩法、优惠叠加规则,如何让系统具备可扩展性,满足日益多变的优惠需求,提升开发与运营效率。
  • 面对新品发布、双11大为客户等大流量场景,如何满足高并发场景下的高性能要求。
  • 面对来自上游业务方的不可信调用,以及下游依赖方的不可靠服务等复杂系统环境,如何提升系统整体的稳定性,保障系统的高可用。

我们结合自身业务特点,梳理出一些技术解决方案。

3.1 可扩展性

扩展性提升主要体现在两块:

  • 优惠模型的定义,对所有优惠活动抽象出统一的优惠模型和配置管理界面;
  • 促销计价引擎的建立,计价模型的统一。

相关的详细设计内容,会有后续文章进行说明。

3.2 高并发/高性能

缓存

缓存几乎就是解决性能问题的“银弹”,在促销系统中也大量使用缓存进行性能提升,包括使用redis缓存与本地缓存。而使用缓存就需要关注数据一致性问题,redis缓存还好解决,但本地缓存不就好处理了。因此本地缓存的使用要看业务场景,尽量是数据不经常变更且业务上能接受一定不一致的场景。

批量化

促销系统的业务场景属于典型的读多写少场景,而读的过程中对性能影响最大的就是IO操作,包括db、redis以及第三方远程调用。而对这些IO操作进行批量化改造,以空间换时间,减少IO交互次数也是性能优化的一大方案。

精简化/异步化

简化功能实现,将非核心任务进行异步化改造。如活动编辑后的缓存处理、资源预占后的消息同步、拼团状态流转的消息通知等等。

冷热分离

对于读多写少场景对性能影响最大的除了IO操作,还有就是数据量,在促销系统中也存在一些用户态数据,如优惠资源预占记录、用户拼团信息等。这些数据都具备时间属性,存在热尾效应,大部分情况下需要的都是最近的数据。针对这类场景对数据进行冷热分离是最佳选择。

3.3 系统稳定性

限流降级

基于公司的限流组件,对非核心的服务功能进行流量限制与服务降级,高并发场景下全力保障整体系统的核心服务

幂等性

所有接口均具备幂等性,避免业务方的网络超时重试造成的系统异常

熔断

使用Hystrix组件对外部系统的调用添加熔断保护,防止外部系统的故障造成整个促销系统的服务崩溃

监控和告警

通过配置日志平台的错误日志报警、调用链的服务分析告警,再加上公司各中间件和基础组件的监控告警功能,让我们能够第一时间发现系统异常

四、踩过的坑

4.1 Redis SCAN命令使用

在Redis缓存数据清除的处理过程中,存在部分缓存key是通过模糊匹配的方式进行查找并清除操作,底层依赖Redis SCAN命令。

SCAN命令是一个基于游标的迭代器,每次被调用之后都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为 SCAN 命令的游标参数, 以此来延续之前的迭代过程。

对于使用KEYS命令,SCAN命令并不是一次性返回所有匹配结果,减少命令操作对Redis系统的阻塞风险。但并不是说SCAN命令就可以随便用,其实在大数据量场景下SCAN存在与KEYS命令一样的风险问题,极易造成Redis负载升高,响应变慢,进而影响整个系统的稳定性。

(图4-1 Redis负载升高)

(图4-2 Redis响应出现尖刺)

而解决方案就是:

  • 优化Redis key设计,减少不必要的缓存key;
  • 移除SCAN命令使用,通过精确匹配查找进行清除操作。

4.2 热点key问题

在促销系统中普遍使用redis缓存进行性能提升,缓存数据很多都是SKU商品维度。在新品发布、特定类型手机大促等业务场景下极容易产生热点Key问题。

热点Key具有聚集效应,会导致Redis集群内节点负载出现不均衡,进而造成整个系统不稳定。该问题是普通的机器扩容无法解决的。如下图某次线上摸排压测时redis负载情况:

常用的解决方案有两种:

  • 散列方案:对Redis Key进行散列,平均分散到RedisCluster Nodes中,解决热点Key的聚集效应。
  • 多级缓存方案:对热点Key增加使用本地缓存,最大限度加速访问性能,降低Redis节点负载。

我们是采用多级缓存方案,参照优秀的开源热点缓存框架,定制化扩展出一整套热点解决方案,支持热点探测 、本地缓存 、集群广播以及热点预热功能,做到准实时热点探测并将热点Key通知实例集群进行本地缓存,极大限度避免大量重复调用冲击分布式缓存,提升系统运行效率。

五、总结

本篇属于vivo商城促销系统概览介绍篇,简单回顾了vivo商城促销系统业务能力建设历程及系统架构,并分享遇到的技术问题与解决方案。后续我们会对促销系统的核心功能模块(优惠活动管理、促销计价、价格监控和时光穿越)的设计实践进行逐个分享,敬请期待。

作者:vivo互联网官方商城开发团

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

15分钟全面掌握zookeeper-zab协议

发表于 2021-06-28

ZAB协议介绍

基本概念

ZAB(ZooKeeper Atomic Broadcast)ZooKeeper原子消息广播协议。ZAB 协议是为分布式协调服务 ZooKeeper 专门设计的一种支持崩溃恢复的原子广播协议。

在ZooKeeper中,主要依赖ZAB协议来实现分布式数据一致性,基于该协议,ZooKeeper实现了一种主备模式的系统架构来保持集群中各副本之间数据的一致性。

具体的,ZooKeeper使用一个单一的主进程来接收并处理客户端的所有事务请求,并采用ZAB的原子广播协议,将服务器数据的状态变更以事务Proposal的形式广播到所有的副本进程上去。

考虑到主进程在任何时候都有可能出现崩溃退出或重启现象,因此,ZAB协议还需要做到在当前主进程出现上述异常情况的时候,依旧能够正常工作,也就是崩溃恢复。

ZAB大体流程:

  • 所有事务请求必须由一个全局唯一的服务器来协调处理,这样的服务器被称为 Leader服务器,
  • 余下的其他服务器则成为 Follower 服务器。
  • Leader 服务器负责将一个客户端事务请求转换成一个事务 Proposal(提议),并将该 Proposal 分发给集群中所有的Follower 服务器。
  • Leader 服务器需要等待所有 Follower 服务器的反馈,等待超过半数的 Follower 服务器进行了正确的反馈
  • Leader 就会再次向所有的 Follower服务器分发Commit消息,要求其将前一个Proposal进行提交。

关键词:主从集群模式、分布式事务、ZAB、二阶段提交

ZAB协议包括两种基本的模式,分别是消息广播和崩溃恢复

消息广播

上面流程已经介绍了消息广播模式的流程,类似于一个二阶段提交过程。

image-20210628124137915.png

此处 ZAB 协议中涉及的二阶段提交过程则与其略有不同。在 ZAB 协议的二阶段提交过程中,移除了中断逻辑,所有的 Follower 服务器要么正常反馈 Leader 提出的事务 Proposal,要么就抛弃Leader服务器。同时,ZAB协议将二阶段提交中的中断逻辑移除意味着我们可以在过半的 Follower 服务器已经反馈 Ack 之后就开始提交事务 Proposal 了,而不需要等待集群中所有的Follower服务器都反馈响应。一句话过半集群响应即可继续执行事务请求。

举个例子来说,一个由3台机器组成的ZAB服务,通常由1个Leader、2个Follower服务器组成。某一个时刻,假如其中一个 Follower服务器挂了,仍旧会将Commit广播,整个 ZAB 集群是不会中断服务,这是因为Leader服务器依然能够获得过半机器(包括Leader自己)的支持。

在整个消息广播过程中,Leader服务器会为每个事务请求生成对应的Proposal来进行广播,并且在广播事务Proposal之前,Leader服务器会首先为这个事务Proposal分配一个全局单调递增的唯一ID,我们称之为事务ID(即ZXID);因此可以利用递增特性保证消息处理的先后顺序,这也是ZK特性之一FIFO。

顺便提一句,如果集群中的其他机器接收到客户端的事务请求,那么这些非Leader服务器会首先将这个事务请求转发给Leader服务器。

具体实现

具体的,在消息广播过程中,Leader服务器会为每一个Follower服务器都各自分配一个单独的队列,然后将需要广播的事务 Proposal 依次放入这些队列中去,并且根据 FIFO策略进行消息发送。每一个Follower服务器在接收到这个事务Proposal之后,都会首先将其以事务日志的形式写入到本地磁盘中去,并且在成功写入后反馈给Leader服务器一个Ack响应。当Leader服务器接收到超过半数Follower的Ack响应后,就会广播一个Commit消息给所有的Follower服务器以通知其进行事务提交,同时Leader自身也会完成对事务的提交,而每一个Follower服务器在接收到Commit消息后,也会完成对事务的提交。

崩溃恢复

当整个服务框架在启动过程中,或是当Leader服务器出现网络中断、崩溃退出与重启等异常情况时,ZAB 协议就会进入恢复模式并选举产生新的 Leader 服务器。当选举产生了新的Leader服务器,同时集群中已经有过半的机器与该Leader服务器完成了状态同步之后,ZAB协议就会退出恢复模式。其中,所谓的状态同步是指数据同步,用来保证集群中存在过半的机器能够和Leader服务器的数据状态保持一致。

基本特性:ZAB协议规定了如果一个事务Proposal在一台机器上被处理成功,那么应该在所有的机器上都被处理成功。这是如何做到的呢?

完成Leader选举之后,在正式开始工作(即接收客户端的事务请求,然后提出新的提案)之前,Leader服务器会首先确认事务日志中的所有Proposal是否都已经被集群中过半的机器提交了,即是否完成数据同步。接下来看看数据同步过程

数据同步

Leader服务器会为每一个Follower服务器都准备一个队列,并将那些没有被各Follower服务器同步的事务以Proposal消息的形式逐个发送给Follower服务器,并在每一个Proposal消息后面紧接着再发送一个Commit消息。等到Follower服务器将所有其尚未同步的事务 Proposal 都从 Leader 服务器上同步过来并成功应用到本地数据库中后,Leader服务器就会将该Follower服务器加入到真正的可用Follower列表中。如何判断Follower服务器事务是否需要同步呢?这就依靠ZXID的特性。

ZAB协议的事务编号ZXID,ZXID是一个64位的数字,低32位代表事务编号,单调递增每次加一;高32位则代表了Leader周期epoch的编号,每当选举产生一个新的Leader服务器,就会从这个Leader服务器上取出其本地日志中最大事务Proposal的ZXID,并从该ZXID中解析出对应的epoch值,然后再对其进行加1操作,然后将低32位从0开始重新计数。再利用高32对即将成为的Follower服务器的低32比较,就可以同步事务一致性。

对ZAB探讨完后,数据恢复中会有Leader选举过程,紧接着继续详细说明Zookeeper选举

Zookeeper选举

​ 选举主要分为两个阶段:服务器启动、服务器运行期间的Leader选举。

服务器启动期间选举

  1. 每个Server发出一个投票

由于是初始情况,因此对于Server1和Server2来说,都会将自己作为Leader服务器来进行投票,每次投票包含的最基本的元素包括:所推举的服务器的 myid 和ZXID,我们以(myid,ZXID)的形式来表示。因为是初始化阶段,因此无论是Server1还是Server2,都会投给自己,即Server1的投票为(1,0),Server2的投票为(2,0),然后各自将这个投票发给集群中其他所有机器。
2. 接收来自各个服务器的投票

每个服务器都会接收来自其他服务器的投票。集群中的每个服务器在接收到投票后,首先会判断该投票的有效性,包括检查是否是本轮投票、是否来自LOOKING状态的服务器。
3. 处理投票

在接收到来自其他服务器的投票后,针对每一个投票,服务器都需要将别人的投票和自己的投票进行PK,PK的规则如下。

* 优先检查ZXID。ZXID比较大的服务器优先作为Leader。
* 如果ZXID相同的话,那么就比较myid。myid比较大的服务器作为Leader服务器。
  1. 统计投票

​ 每次投票后,服务器都会统计所有投票,判断是否已经有过半的机器接收到相同的投票信息。“过半”就是指大于集群机器数量的一半,即大于或等于(n/2+1)

  1. 改变服务器状态
    一旦确定了 Leader,每个服务器就会更新自己的状态:如果是 Follower,那么就变更为FOLLOWING,如果是Leader,那么就变更为LEADING。

服务器运行期间的Leader选举

当Leader所在的机器挂了,那么整个集群将暂时无法对外服务,而是进入新一轮的Leader选举。服务器运行期间的Leader选举和启动时期的Leader选举基本过程是一致的。

  1. 变更状态。

当 Leader 挂了之后,余下的非 Observer 服务器都会将自己的服务器状态变更为LOOKING,然后开始进入Leader选举流程。
2. 每个Server会发出一个投票
在这个过程中,需要生成投票信息(myid,ZXID)。因为是运行期间,因此每个服务器上的ZXID可能不同,我们假定Server1的ZXID为123,而Server3的ZXID为 122。在第一轮投票中,Server1 和 Server3 都会投自己,即分别产生投票(1,123)和(3,122),然后各自将这个投票发给集群中所有机器。
3. 以下步骤跟启动时期的Leader选举过程是一致的。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

《springcloud超级入门教程》Spring Clou

发表于 2021-06-28

​
这是我参与更文挑战的第9天,活动详情查看:更文挑战

[为什么要使用学习springcloud以及他的优势]

Spring cloud是一系列框架的有序集合。它利用 Spring Boot 的开发便利性,巧妙地简化了分布式系统基础设施的开发,如服务注册、服务发现、配置中心、消息总线、负载均衡、断路器、数据监控等,这些都可以用 Spring Boot 的开发风格做到一键启动和部署。

通俗地讲,Spring Cloud 就是用于构建微服务开发和治理的框架集合(并不是具体的一个框架),主要贡献来自 Netflix OSS。
Spring Cloud 模块介绍
Spring Cloud 模块的相关介绍如下:

Eureka:服务注册中心,用于服务管理。
Ribbon:基于客户端的负载均衡组件。
Hystrix:容错框架,能够防止服务的雪崩效应。
Feign:Web 服务客户端,能够简化 HTTP 接口的调用。
Zuul:API 网关,提供路由转发、请求过滤等功能。
Config:分布式配置管理。
Sleuth:服务跟踪。
Stream:构建消息驱动的微服务应用程序的框架。
Bus:消息代理的集群消息总线。

除了上述模块,还有 Cli、Task等。教程中只介绍一些常用的模块。

Spring Cloud 是一个非常好的框架集合,它包含的功能模块非常多,不可能一一讲解到,凡是在教程中出现的模块都是真实开发中用得到的。

Spring Cloud 版本介绍
相信大家跟笔者一样,在第一次访问 Spring Cloud 官网时一定会有一个疑惑那就是版本太多了,到底哪个是稳定版本?哪个才是自己需要的版本?接下来就给大家简单介绍一下版本的问题。

访问官网 projects.spring.io/spring-clou… 可以看到网页右侧的版本列表,如图 1 所示。
Spring Cloud版本

图 1 Spring Cloud版本

从图 1 中可以看到 Spring Cloud 不是像别的项目那样,版本号采用 1.1、1.2、1.3 这种的格式。因为 Spring Cloud 是一个拥有诸多子项目的大型综合项目,可以说是对微服务架构解决方案的综合套件组件,其中包含的各个子项目都独立进行着内容的迭代与更新,各自维护着自己的发布版本号。

至于怎么选择适合自己的版本,笔者认为,大家可以在接触的时候直接选最新的稳定版本。新版本中的 Bug 肯定要少,并且更稳定。

本教程的案例都是基于 Finchley SR2 进行讲解的。不同的版本有不同的功能,对应的每个子模块的版本也不一样,那么如何知道每个大版本下面具体的子模块是什么版本呢?

答案就在官网的首页上面,在页面的最下方有一个表格(见表 1 ),通过这个表格我们可以清楚地知道 Finchley SR2 对应的 Spring Boot 版本是 2.0.6.RELEASE,Spring-Cloud-Bus 是 2.0.0.RELEASE。

表 1 Spring Cloud版本列表
Component Edgware.SR5 Finchley.SR2 Finchley.BUILD-SNAPSHOT
spring-cloud-aws 1.2.3RELEASE 2.0.1.RELEASE 2.0.1.BUILD-SNAPSHOT
spring-cloud-bus 1.3.3.RELEASE 2.0.0.RELEASE 2.0.1.BUILD-SNAPSHOT
spring-cloud-cli 1.4.1.RELEASE 2.0.0.RELEASE 2.0.1.BUILD-SNAPSHOT
spring-cloud-commons 1.3.5.RELEASE 2.0.2.RELEASE 2.0.2.BUILD-SNAPSHOT
spring-cloud-contract 1.2.6.RELEASE 2.0.2.RELEASE 2.0.2.BUILD-SNAPSHOT
spring-cloud-config 1.4.5.RELEASE 2.0.2.RELEASE 2.0.2.BUILD-SNAPSHOT
spring-cloud-netflix 1.4.6.RELEASE 2.0.2.RELEASE 2.0.2.BUILD-SNAPSHOT
spring-cloud-security 1.2.3.RELEASE 2.0.1.RELEASE 2.0.1.BUILD-SNAPSHOT
spring-cloud-cloudfoundry 1.1.2.RELEASE 2.0.1.RELEASE 2.0.1.BUILD-SNAPSHOT
spring-cloud-consul 1.3.5.RELEASE 2.0.1.RELEASE 2.0.2.BUILD-SNAPSHOT
spring-cloud-sleuth 1.3.5.RELEASE 2.0.2.RELEASE 2.0.2.BUILD-SNAPSHOT
spring-cloud-stream Ditmars.SR4 Elmhurst.SRI Elmhurst. BUILD-SNAPSHOT
spring-cloud-zookeeper 1.2.2.RELEASE 2.0.0.RELEASE 2.0.1.BUILD-SNAPSHOT
spring-boot 1.5.16.RELEASE 2.0.6.RELEASE 2.0.7.BUILD-SNAPSHOT
spring-cloud-task 1.2.3.RELEASE 2.0.0.RELEASE 2.0.1.BUILD-SNAPSHOT
spring-cloud-vault 1.1.2.RELEASE 2.0.2.RELEASE 2.0.2.BUILD-SNAPSHOT
spring-cloud-gateway 1.0.2.RELEASE 2.0.2.RELEASE 2.0.2.BUILD-SNAPSHOT

上一篇 Spring Cloud和Dubbo的区别及各自的优缺点

下一篇详细介绍Spring Cloud开发环境的准备和Lombok安装步骤

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

nginx 403 forbidden的解决

发表于 2021-06-28

nginx访问时报403

image.png

于是查看nginx日志,路径为/var/log/nginx/error.log。打开日志发现报错Permission denied,详细报错如下:

image.png

由于启动用户和nginx工作用户不一致所致

1.1查看nginx的启动用户,发现是nobody,而为是用root启动的

命令:ps aux | grep “nginx: worker process” | awk ‘{print $1}’

image.png

1.2将nginx.config的user改为和启动用户一致,

命令:vi conf/nginx.conf

image.png

1.3 权限问题,如果nginx没有web目录的操作权限,也会出现403错误。

解决办法:修改web目录的读写权限,或者是把nginx的启动用户改成目录的所属用户,重启Nginx即可解决

  1. chmod -R 777 /data
  2. chmod -R 777 /data/www/

image.png

1.4 setenforce 0
设置SELinux 成为permissive模式 临时关闭selinux

将SELINUX=enforcing 修改为 SELINUX=disabled ,永久关闭selinux。

1
2
3
ini复制代码vi /etc/selinux/config
#SELINUX=enforcing
SELINUX=disabled

重启生效 reboot

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Mysql 8 和mysql 57 的区别

发表于 2021-06-28
  1. NoSql存储
    Mysql从5.7 版本提供了NoSQL的存储功能,在8.0中这部分得到一些修改,不过这个在实际中用的极少

image.png

2.隐藏索引
隐藏索引的特性对于性能调试非常有用,在8.0 中,索引可以被隐藏和显示,当一个索引隐藏时,他不会被查询优化器所使用

image.png

也就是说可以隐藏一个索引,然后观察对数据库的影响.如果性能下降,就说明这个索引是有效的,于是将其”恢复显示”即可;如果数据库性能看不出变化,说明这个索引是多于的,可以删掉了

隐藏一个索引的语法

1
sql复制代码ALTER TABLE t ALTER INDEX i INVISIBLE;

恢复显示该索引的语法是:

1
sql复制代码ALTER TABLE t ALTER INDEX i VISIBLE;

当一个索引被隐藏时,我们可以从show index命令的输出汇总看出,该索引visible属性值为No

**注意:**当索引被隐藏时,他的内容仍然是和正常索引一样实时更新的,这个特性本身是专门为了优化调试而使用的,如果你长期隐藏一个索引,那还不如干掉,因为索引的存在会影响数据的插入\更新和删除功能

3.设置持久化
MySQL 的设置可以在运行时通过 SET GLOBAL 命令来更改,但是这种更改只会临时生效,到下次启动时数据库又会从配置文件中读取。
MySQL 8 新增了 SET PERSIST 命令,例如:

1
sql复制代码SET PERSIST max_connections = 500;

MySQL 会将该命令的配置保存到数据目录下的 mysqld-auto.cnf 文件中,下次启动时会读取该文件,用其中的配置来覆盖缺省的配置文件。

4.UTF-8 编码
从 MySQL 8 开始,数据库的缺省编码将改为 utf8mb4,这个编码包含了所有 emoji 字符。多少年来我们使用 MySQL 都要在编码方面小心翼翼,生怕忘了将缺省的 latin 改掉而出现乱码问题。从此以后就不用担心了。

5.通用表表达式(Common Table Expressions)
复杂的查询会使用嵌入式表,例如:

1
2
3
sql复制代码SELECT t1.*, t2.* FROM
(SELECT col1 FROM table1) t1,
(SELECT col2 FROM table2) t2;

而有了 CTE,我们可以这样写:

1
2
3
4
5
sql复制代码WITH
t1 AS (SELECT col1 FROM table1),
t2 AS (SELECT col2 FROM table2)
SELECT t1.*, t2.*
FROM t1, t2;

这样看上去层次和区域都更加分明,改起来也更清晰的知道要改哪一部分。
这个特性在很多报表场景是很有用的,也是mysql优化的一个很重要特性。

1.235窗口函数(Window Functions)
MySQL 被吐槽最多的特性之一就是缺少 rank() 函数,当需要在查询当中实现排名时,必须手写 @ 变量。但是从 8.0 开始,MySQL 新增了一个叫窗口函数的概念,它可以用来实现若干新的查询方式。
窗口函数有点像是 SUM()、COUNT() 那样的集合函数,但它并不会将多行查询结果合并为一行,而是将结果放回多行当中。也就是说,窗口函数是不需要 GROUP BY 的。

假设我们有一张 “班级学生人数” 表:

如果要对班级人数从小到大进行排名,可以这样利用窗口函数:

说明:在这里创建了名为 w 的 window,规定它对 stu_count 字段进行排序,然后在 select 子句中对 w 执行 rank() 方法,将结果输出为 rank 字段。
这个特性也是Oracle11g有的一个新特性,在优化也是起着很重要的作用。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

关于三次握手和四次挥手,面试官想听到怎样的回答?

发表于 2021-06-28

秋招面腾讯阿里字节,面试官都给我抛了这个百问不厌的问题,幸好我提前准备过,完美应付,下面是我根据个人经验总结出来的,按照我说的,基本稳。

三次握手

当面试官问你为什么需要有三次握手、三次握手的作用、讲讲三次三次握手的时候,我想很多人会这样回答:

首先很多人会先讲下握手的过程:

1、第一次握手:客户端给服务器发送一个 SYN 报文。

2、第二次握手:服务器收到 SYN 报文之后,会应答一个 SYN+ACK 报文。

3、第三次握手:客户端收到 SYN+ACK 报文之后,会回应一个 ACK 报文。

4、服务器收到 ACK 报文之后,三次握手建立完成。

作用是为了确认双方的接收与发送能力是否正常。

这里我顺便解释一下为啥只有三次握手才能确认双方的接受与发送能力是否正常,而两次却不可以:

第一次握手:客户端发送网络包,服务端收到了。这样服务端就能得出结论:客户端的发送能力、服务端的接收能力是正常的。

第二次握手:服务端发包,客户端收到了。这样客户端就能得出结论:服务端的接收、发送能力,客户端的接收、发送能力是正常的。不过此时服务器并不能确认客户端的接收能力是否正常。

第三次握手:客户端发包,服务端收到了。这样服务端就能得出结论:客户端的接收、发送能力正常,服务器自己的发送、接收能力也正常。

因此,需要三次握手才能确认双方的接收与发送能力是否正常。

这样回答其实也是可以的,但我觉得,这个过程的我们应该要描述的更详细一点,因为三次握手的过程中,双方是由很多状态的改变的,而这些状态,也是面试官可能会问的点。所以我觉得在回答三次握手的时候,我们应该要描述的详细一点,而且描述的详细一点意味着可以扯久一点。加分的描述我觉得应该是这样:

刚开始客户端处于 closed 的状态,服务端处于 listen 状态。然后

1、第一次握手:客户端给服务端发一个 SYN 报文,并指明客户端的初始化序列号 SN(c)。此时客户端处于 SYN_Send 状态。

2、第二次握手:服务器收到客户端的 SYN 报文之后,会以自己的 SYN 报文作为应答,并且也是指定了自己的初始化序列号 ISN(s),同时会把客户端的 ISN + 1 作为 ACK 的值,表示自己已经收到了客户端的 SYN,此时服务器处于 \SYN_REVD** 的状态。

3、第三次握手:客户端收到 SYN 报文之后,会发送一个 ACK 报文,当然,也是一样把服务器的 ISN + 1 作为 ACK 的值,表示已经收到了服务端的 SYN 报文,此时客户端处于 establised 状态。

4、服务器收到 ACK 报文之后,也处于 establised 状态,此时,双方以建立起了链接。

三次握手的作用

三次握手的作用也是有好多的,多记住几个,保证不亏。例如:

1、确认双方的接受能力、发送能力是否正常。

2、指定自己的初始化序列号,为后面的可靠传送做准备。

单单这样还不足以应付三次握手,面试官可能还会问一些其他的问题,例如:

1、(ISN)是固定的吗

三次握手的一个重要功能是客户端和服务端交换ISN(Initial Sequence Number), 以便让对方知道接下来接收数据的时候如何按序列号组装数据。

如果ISN是固定的,攻击者很容易猜出后续的确认号,因此 ISN 是动态生成的。

2、什么是半连接队列

服务器第一次收到客户端的 SYN 之后,就会处于 SYN_RCVD 状态,此时双方还没有完全建立其连接,服务器会把此种状态下请求连接放在一个队列里,我们把这种队列称之为半连接队列。当然还有一个全连接队列,就是已经完成三次握手,建立起连接的就会放在全连接队列中。如果队列满了就有可能会出现丢包现象。

这里在补充一点关于SYN-ACK 重传次数的问题: 服务器发送完SYN-ACK包,如果未收到客户确认包,服务器进行首次重传,等待一段时间仍未收到客户确认包,进行第二次重传,如果重传次数超 过系统规定的最大重传次数,系统将该连接信息从半连接队列中删除。注意,每次重传等待的时间不一定相同,一般会是指数增长,例如间隔时间为 1s, 2s, 4s, 8s, ….

3、三次握手过程中可以携带数据吗

很多人可能会认为三次握手都不能携带数据,其实第三次握手的时候,是可以携带数据的。也就是说,第一次、第二次握手不可以携带数据,而第三次握手是可以携带数据的。

为什么这样呢?大家可以想一个问题,假如第一次握手可以携带数据的话,如果有人要恶意攻击服务器,那他每次都在第一次握手中的 SYN 报文中放入大量的数据,因为攻击者根本就不理服务器的接收、发送能力是否正常,然后疯狂着重复发 SYN 报文的话,这会让服务器花费很多时间、内存空间来接收这些报文。也就是说,第一次握手可以放数据的话,其中一个简单的原因就是会让服务器更加容易受到攻击了。

而对于第三次的话,此时客户端已经处于 established 状态,也就是说,对于客户端来说,他已经建立起连接了,并且也已经知道服务器的接收、发送能力是正常的了,所以能携带数据页没啥毛病。

四次挥手

由于在面试中,三次握手是被问的最频繁的面试题,所以本次我们从面试的角度来讲解三次握手

四次挥手也一样,千万不要对方一个 FIN 报文,我方一个 ACK 报文,再我方一个 FIN 报文,我方一个 ACK 报文。然后结束,最好是说的详细一点,例如想下面这样就差不多了,要把每个阶段的状态记好,我上次面试就被问了几个了,呵呵。我答错了,还以为自己答对了,当时还解释的头头是道,呵呵。

刚开始双方都处于 establised 状态,假如是客户端先发起关闭请求,则:

1、第一次挥手:客户端发送一个 FIN 报文,报文中会指定一个序列号。此时客户端处于FIN_WAIT1状态。

2、第二次握手:服务端收到 FIN 之后,会发送 ACK 报文,且把客户端的序列号值 + 1 作为 ACK 报文的序列号值,表明已经收到客户端的报文了,此时服务端处于 CLOSE_WAIT状态。

3、第三次挥手:如果服务端也想断开连接了,和客户端的第一次挥手一样,发给 FIN 报文,且指定一个序列号。此时服务端处于 LAST_ACK 的状态。

4、第四次挥手:客户端收到 FIN 之后,一样发送一个 ACK 报文作为应答,且把服务端的序列号值 + 1 作为自己 ACK 报文的序列号值,此时客户端处于 TIME_WAIT 状态。需要过一阵子以确保服务端收到自己的 ACK 报文之后才会进入 CLOSED 状态

5、服务端收到 ACK 报文之后,就处于关闭连接了,处于 CLOSED 状态。

这里特别需要主要的就是****TIME_WAIT****这个状态了,这个是面试的高频考点,就是要理解,为什么客户端发送 ACK 之后不直接关闭,而是要等一阵子才关闭。这其中的原因就是,要确保服务器是否已经收到了我们的 ACK 报文,如果没有收到的话,服务器会重新发 FIN 报文给客户端,客户端再次收到 ACK 报文之后,就知道之前的 ACK 报文丢失了,然后再次发送 ACK 报文。

至于 TIME_WAIT 持续的时间至少是一个报文的来回时间。一般会设置一个计时,如果过了这个计时没有再次收到 FIN 报文,则代表对方成功就是 ACK 报文,此时处于 CLOSED 状态。

这里我给出每个状态所包含的含义,有兴趣的可以看看。

LISTEN - 侦听来自远方TCP端口的连接请求;

SYN-SENT -在发送连接请求后等待匹配的连接请求;

SYN-RECEIVED - 在收到和发送一个连接请求后等待对连接请求的确认;

ESTABLISHED- 代表一个打开的连接,数据可以传送给用户;

FIN-WAIT-1 - 等待远程TCP的连接中断请求,或先前的连接中断请求的确认;

FIN-WAIT-2 - 从远程TCP等待连接中断请求;

CLOSE-WAIT - 等待从本地用户发来的连接中断请求;

CLOSING -等待远程TCP对连接中断的确认;

LAST-ACK - 等待原来发向远程TCP的连接中断请求的确认;

TIME-WAIT -等待足够的时间以确保远程TCP接收到连接中断请求的确认;

CLOSED - 没有任何连接状态;

最后,在放在三次握手与四次挥手的图

另外,计算机网网络和操作系统被问的概率还是很高的,推荐大家看这份笔记,通俗易懂,看完基本就稳了

图解操作系统、网络、计算机组成 PDF 下载!

这里也有一些写的不错的文章,给大家找来了

1. 计算机网络五层模型入门

2. 通信双方如何保证消息不丢失?

3. 集线器、交换机与路由器有什么区别?

4. 什么是 TCP 拥塞控制?

5. 什么是 TCP 流量控制

6. 什么是 TCP 三次握手?

7. 什么是 TCP 四次挥手?

8. 什么是 HTTP?

9. 什么是 HTTPS?

10. 什么是 SSL/TLS 协议?

11. 什么是 DNS?

12. 什么是 DHCP ?

13. 什么是广播路由算法?

14. 什么是数字签名?

15. 什么是 SQL 注入攻击?

16. 什么是 XSS 攻击?

喜欢看视频的,也可以看我整理的这个计算机基础视频
计算机基础三门课视频

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

深入解读 Flink SQL 113

发表于 2021-06-28

本文由社区志愿者陈政羽整理,Apache Flink 社区在 5 月份发布了 1.13 版本,带来了很多新的变化。文章整理自徐榜江(雪尽) 5 月 22 日在北京的 Flink Meetup 分享的《深入解读 Flink SQL 1.13》,内容包括:

  1. Flink SQL 1.13 概览
  2. 核心 feature 解读
  3. 重要改进解读
  4. Flink SQL 1.14 未来规划
  5. 总结

一、Flink SQL 1.13 概览

img

Flink 1.13 是一个社区大版本,解决的 issue 在 1000 个以上,通过上图我们可以看到,解决的问题大部分是关于 Table/SQL 模块,一共 400 多个 issue 占了总体的 37% 左右。这些 issue 主要围绕了 5 个 FLIP 展开,在本文中我们也会根据这 5 个方面进行介绍,它们分别是:

img

下面我们对这些 FLIP 进行详细解读。

二、 核心 feature 解读

1. FLIP-145:支持 Window TVF

社区的小伙伴应该了解,在腾讯、阿里巴巴、字节跳动等公司的内部分支已经开发了这个功能的基础版本。这次 Flink 社区也在 Flink 1.13 推出了 TVF 的相关支持和优化。下面将从 Window TVF 语法、近实时累计计算场景、 Window 性能优化、多维数据分析,来分析这个新功能。

img

1.1 Window TVF 语法

在 1.13 版本前,window 的实现是通过一个特殊的 SqlGroupedWindowFunction:

1
2
3
4
5
6
7
sql复制代码SELECT 
TUMBLE_START(bidtime,INTERVAL '10' MINUTE),
TUMBLE_END(bidtime,INTERVAL '10' MINUTE),
TUMBLE_ROWTIME(bidtime,INTERVAL '10' MINUTE),
SUM(price)
FROM MyTable
GROUP BY TUMBLE(bidtime,INTERVAL '10' MINUTE)

在 1.13 版本中,我们对它进行了 Table-Valued Function 的语法标准化:

1
2
3
sql复制代码SELECT WINDOW_start,WINDOW_end,WINDOW_time,SUM(price) 
FROM Table(TUMBLE(Table myTable,DESCRIPTOR(biztime),INTERVAL '10' MINUTE))
GROUP BY WINDOW_start,WINDOW_end

通过对比两种语法,我们可以发现:TVF 语法更加灵活,不需要必须跟在 GROUP BY 关键字后面,同时 Window TVF 基于关系代数,使得其更加标准。在只需要划分窗口场景时,可以只用 TVF,无需用 GROUP BY 做聚合,这使得 TVF 扩展性和表达能力更强,支持自定义 TVF(例如实现 TOP-N 的 TVF)。

image-20210619135838869

上图中的示例就是利用 TVF 做的滚动窗口的划分,只需要把数据划分到窗口,无需聚合;如果后续需要聚合,再进行 GROP BY 即可。同时,对于熟悉批 SQL 的用户来说,这种操作是非常自然的,我们不再需要像 1.13 版本之前那样必须要用特殊的 SqlGroupedWindowFunction 将窗口划分和聚合绑定在一起。

目前 Window TVF 支持 tumble window,hop window,新增了 cumulate window;session window 预计在 1.14 版本也会支持。

1.2 Cumulate Window

image-20210619141938374

Cumulate window 就是累计窗口,简单来说,以上图里面时间轴上的一个区间为窗口步长。

  • 第一个 window 统计的是一个区间的数据;
  • 第二个 window 统计的是第一区间和第二个区间的数据;
  • 第三个 window 统计的是第一区间,第二个区间和第三个区间的数据。

累积计算在业务场景中非常常见,如累积 UV 场景。在 UV 大盘曲线中:我们每隔 10 分钟统计一次当天累积用户 UV。

img

在 1.13 版本之前,当需要做这种计算时,我们一般的 SQL 写法如下:

1
2
3
4
5
6
7
8
9
10
sql复制代码INSERT INTO cumulative_UV
SELECT date_str,MAX(time_str),COUNT(DISTINCT user_id) as UV
FROM (
SELECT
DATE_FORMAT(ts,'yyyy-MM-dd') as date_str,
SUBSTR(DATE_FORMAT(ts,'HH:mm'),1,4) || '0' as time_str,
user_id
FROM user_behavior
)
GROUP BY date_str

先将每条记录所属的时间窗口字段拼接好,然后再对所有记录按照拼接好的时间窗口字段,通过 GROUP BY 做聚合,从而达到近似累积计算的效果。

  • 1.13 版本前的写法有很多缺点,首先这个聚合操作是每条记录都会计算一次。其次,在追逆数据的时候,消费堆积的数据时,UV 大盘的曲线就会跳变。
  • 在 1.13 版本支持了 TVF 写法,基于 cumulate window,我们可以修改为下面的写法,将每条数据按照 Event Time 精确地分到每个 Window 里面, 每个窗口的计算通过 watermark 触发,即使在追数据场景中也不会跳变。
1
2
3
4
5
6
sql复制代码INSERT INTO cumulative_UV
SELECT WINDOW_end,COUNT(DISTINCT user_id) as UV
FROM Table(
CUMULATE(Table user_behavior,DESCRIPTOR(ts),INTERVAL '10' MINUTES,INTERVAL '1' DAY))
)
GROUP BY WINDOW_start,WINDOW_end

UV 大盘曲线效果如下图所示:

img

1.3 Window 性能优化

Flink 1.13 社区开发者们对 Window TVF 进行了一系列的性能优化,包括:

  • **内存优化:**通过内存预分配,缓存 window 的数据,通过 window watermark 触发计算,通过申请一些内存 buffer 避免高频的访问 state;
  • **切片优化:**将 window 切片,尽可能复用已计算结果,如 hop window,cumulate window。计算过的分片数据无需再次计算,只需对切片的计算结果进行复用;
  • **算子优化:**window 算子支持 local-global 优化;同时支持 count(distinct) 自动解热点优化;
  • **迟到数据:**支持将迟到数据计算到后续分片,保证数据准确性。

img

基于这些优化,我们通过开源 Benchmark (Nexmark) 进行性能测试。结果显示 window 的普适性能有 2x 提升,且在 count(distinct) 场景会有更好的性能提升。

image-20210620115925303

1.4 多维数据分析

语法的标准化带来了更多的灵活性和扩展性,用户可以直接在 window 窗口函数上进行多维分析。如下图所示,可以直接进行 GROUPING SETS、ROLLUP、CUBE 的分析计算。如果是在 1.13 之前的版本,我们可能需要对这些分组进行单独的 SQL 聚合,再对聚合结果做 union 操作才能达到类似的效果。而现在,类似这种多维分析的场景,可以直接在 window TVF 上支持。

image-20210620181805583

支持 Window Top-N

除了多维分析,Window TVF 也支持 Top-N 语法,使得在 Window 上取 Top-N 的写法更加简单。

image-20210620182022617

2. FLIP-162:时区和时间函数

2.1 时区问题分析

大家在使用 Flink SQL 时反馈了很多时区相关的问题,造成时区问题的原因可以归纳为 3 个:

  • PROCTIME() 函数应该考虑时区,但未考虑时区;
  • CURRENT_TIMESTAMP/CURRENT_TIME/CURRENT_DATE/NOW() 函数未考虑时区;
  • Flink 的时间属性,只支持定义在 TIMESTAMP 这种数据类型上面,这个类型是无时区的,TIMESTAMP 类型不考虑时区,但用户希望是本地时区的时间。

img

针对 TIMESTAMP 类型没有考虑时区的问题,我们提议通过TIMESTAMP_LTZ类型支持 (TIMESTAMP_LTZ 是 timestamp with local time zone 的缩写)。可以通过下面的表格来进行和 TIMESTAMP 的对比:

img

TIMESTAMP_LTZ 区别于之前我们使用的 TIMESTAMP,它表示绝对时间的含义。通过对比我们可以发现:

  • 如果我们配置使用 TIMESTAMP,它可以是字符串类型的。用户不管是从英国还是中国时区来观察,这个值都是一样的;
  • 但是对于 TIMSTAMP_TLZ 来说,它的来源就是一个 Long 值,表示从时间原点流逝过的时间。同一时刻,从时间原点流逝的时间在所有时区都是相同的,所以这个 Long 值是绝对时间的概念。当我们在不同的时区去观察这个值,我们会用本地的时区去解释成 “年-月-日-时-分-秒” 的可读格式,这就是 TIMSTAMP_TLZ 类型,TIMESTAMP_LTZ 类型也更加符合用户在不同时区下的使用习惯。

下面的例子展示了 TIMESTAMP 和 TIMESTAMP_LTZ 两个类型的区别。

img

2.2 时间函数纠正

订正 PROCTIME() 函数

image-20210619172919191

当我们有了 TIMESTAMP_LTZ 这个类型的时候,我们对 PROCTIME() 类型做了纠正:在 1.13 版本之前,它总是返回 UTC 的 TIMESTAMP;而现在,我们把返回类型变为了 TIMESTAMP_LTZ。PROCTIME 除了表示函数之外,也可以表示时间属性的标记。

订正 CURRENT_TIMESTAMP/CURRENT_TIME/CURRENT_DATE/NOW() 函数

这些函数在不同时区下出来的值是会发生变化的。例如在英国 UTC 时区时候是凌晨 2 点;但是如果你设置了时区是 UTC+8,时间就是在早上的 10 点。不同时区的实际时间会发生变化,效果如下图:

image-20210619173221897

解决 processing time Window 时区问题

大家都知道 proctime 可以表示一个时间属性,对 proctime 的 window 操作:

  • 在 1.13 版本之前,如果我们需要做按天的 window 操作,你需要手动解决时区问题,去做一些 8 小时的偏移然后再减回去;
  • 在 FLIP-162 中我们解决了这个问题,现在用户使用的时候十分简单,只需要声明 proctime 属性,因为 PROCTIME() 函数的返回值是TIMESTAMP_LTZ,所以结果是会考虑本地的时区。下图的例子显示了在不同的时区下,proctime 属性的 window 的聚合是按照本地时区进行的。

img

订正 Streaming 和 Batch 模式下函数取值方式

时间函数其实在流和批上面的表现形式会有所区别,这次修正主要是让其更加符合用户实际的使用习惯。例如以下函数:

  • 在流模式中是 per-record 计算,即每条数据都计算一次;
  • 在 Batch 模式是 query-start 计算,即在作业开始前计算一次。例如我们常用的一些 Batch 计算引擎,如 Hive 也是在每一个批开始前计算一次。

img

2.3 时间类型使用

在 1.13 版本也支持了在 TIMESTAMP 列上定义 Event time,也就是说Event time 现在既支持定义在 TIMESTAMP 列上,也支持定义在 TIMESTAMP_ LTZ 列上。那么作为用户,具体什么场景用什么类型呢?

  • 当作业的上游源数据包含了字符串的时间(如:2021-4-15 14:00:00)这样的场景,直接声明为 TIMESTAMP 然后把 Event time 定义在上面即可,窗口在计算的时候会基于时间字符串进行切分,最终会计算出符合你实际想要的预想结果;

img

  • 当上游数据源的打点时间属于 long 值,表示的是一个绝对时间的含义。在 1.13 版本你可以把 Event time 定义在 TIMESTAMP_LTZ 上面。此时定义在 TIMESTAMP_LTZ 类型上的各种 WINDOW 聚合,都能够自动的解决 8 小时的时区偏移问题,无需按照之前的 SQL 写法额外做时区的修改和订正。

img

小提示:Flink SQL 中关于时间函数,时区支持的这些提升,是版本不兼容的。用户在进行版本更新的时候需要留意作业逻辑中是否包含此类函数,避免升级后业务受到影响。

2.4 夏令时支持

img

在 Flink 1.13 以前,对于国外夏令时时区的用户,做窗口相关的计算操作是十分困难的一件事,因为存在夏令时和冬令时切换的跳变。

Flink 1.13 通过支持在 TIMESTAMP_LTZ 列上定义时间属性,同时 Flink SQL 在 WINDOW 处理时巧妙地结合 TIMESTAMP 和 TIMESTAMP_LTZ 类型,优雅地支持了夏令时。这对国外夏令时时区用户,以及有海外业务场景的公司比较有用。

三、重要改进解读

1. FLIP-152:提升 Hive 语法兼容性

FLIP-152 主要是做了 Hive 语法的兼容性增强,支持了 Hive 的一些常用 DML 和 DQL 语法,包括:

img

通过 Hive dialect 支持 Hive 常用语法。Hive 有很多的内置函数,Hive dialect 需要配合 HiveCatalog 和 Hive Module 一起使用,Hive Module 提供了 Hive 所有内置函数,加载后可以直接访问。

img

与此同时,我们还可以通过 Hive dialect 创建/删除 Catalog 函数以及一些自定义的函数,这样使得 Flink SQL 与 Hive 的兼容性得到了极大的提升,让熟悉 Hive 的用户使用起来会更加方便。

img

2. FLIP-163:改进 SQL Client

在 1.13 版本之前,大家觉得 Flink SQL Client 就是周边的一个小工具。但是,FLIP-163 在 1.13 版本进行了重要改进:

img

  1. 通过 -i 的参数,提前把 DDL 一次性加载初始化,方便初始化表的多个 DDL 语句,不需要多次执行命令创建表,替代了之前用 yaml 文件方式创建表;
  2. 支持 -f 参数,其中 SQL 文件支持 DML(insert into)语句;
  3. 支持更多实用的配置:

img

* 通过 **SET SQL-client.verbose = true** , 开启 verbose,通过开启 verbose 打印整个信息,相对以前只输出一句话更加容易追踪错误信息;
* 通过 **SET execution.runtime-mode=streaming / batch** 支持设置批/流作业模式;
* 通过 **SET pipline.name=my\_Flink\_job** 设置作业名称;
* 通过 **SET execution.savepoint.path=/tmp/Flink-savepoints/savepoint-bb0dab** 设置作业 savepoint 路径;
* 对于有依赖的多个作业,通过 **SET Table.dml-sync=true** 去选择是否异步执行,例如离线作业,作业 a 跑完才能跑作业 b ,通过设置为 true 实现执行有依赖关系的 pipeline 调度。
  1. 同时支持 STATEMENT SET语法:

img

有可能我们的一个查询不止写到一个 sink 里面,而是需要输出到多个 sink,比如一个 sink 写到 jdbc,一个 sink 写到 HBase。

* 在 1.13 版本之前需要启动 2 个 query 去完成这个作业;
* 在 1.13 版本,我们可以把这些放到一个 statement 里面,以一个作业的方式去执行,能够实现节点的复用,节约资源。

3. FLIP-136:增强 DataStream 和 Table 的转换

虽然 Flink SQL 大大降低了我们使用实时计算的一些使用门槛,但 Table/SQL 这种高级封装也屏蔽了一些底层实现,如 timer,state 等。不少高级用户希望能够直接操作 DataStream 获得更多的灵活性,这就需要在 Table 和 DataStream 之间进行转换。FLIP-136 增强了 Table 和 DataStream 间的转换,使得用户在两者之间的转换更加容易。

  • 支持 DataStream 和 Table 转换时传递 EVENT TIME 和 WATERMARK;
1
2
3
4
5
6
7
java复制代码Table Table = TableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByMetadata("rowtime","TIMESTMP(3)")
.watermark("rowtime","SOURCE_WATERMARK()")
.build());
)
  • 支持 Changelog 数据流在 Table 和 DataStream 间相互转换。
1
2
3
4
5
6
java复制代码//DATASTREAM 转 Table
StreamTableEnvironment.fromChangelogStream(DataStream<ROW>): Table
StreamTableEnvironment.fromChangelogStream(DataStream<ROW>,Schema): Table
//Table 转 DATASTREAM
StreamTableEnvironment.toChangelogStream(Table): DataStream<ROW>
StreamTableEnvironment.toChangelogStream(Table,Schema): DataStream<ROW>

四、Flink SQL 1.14 未来规划

1.14 版本主要有以下几点规划:

  • 删除 Legacy Planner:从 Flink 1.9 开始,在阿里贡献了 Blink-Planner 之后,很多一些新的 Feature 已经基于此 Blink Planner 进行开发,以前旧的 Legacy Planner 会彻底删除;
  • 完善 Window TVF:支持 session window,支持 window TVF 的 allow -lateness 等;
  • 提升 Schema Handling:全链路的 Schema 处理能力以及关键校验的提升;
  • 增强 Flink CDC 支持:增强对上游 CDC 系统的集成能力,Flink SQL 内更多的算子支持 CDC 数据流。

五、总结

本文详细解读了 Flink SQL 1.13 的核心功能和重要改进。

  • 支持 Window TVF;
  • 系统地解决时区和时间函数问题;
  • 提升 Hive 和 Flink 的兼容性;
  • 改进 SQL Client;
  • 增强 DataStream 和 Table 的转换。

同时还分享了社区关于 Flink SQL 1.14 的未来规划,相信看完文章的同学可以对 Flink SQL 在这个版本中的变化有更多的了解,在实践过程中大家可以多多关注这些新的改动和变化,感受它们所带来的业务层面上的便捷。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…628629630…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%