开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

跳表(SkipList) 会跳的链表真的非常diao

发表于 2020-12-26

微信搜一搜「bigsai」关注这个有趣的程序员
你的点赞三连肯定对我至关重要!
文章已收录在 我的Github bigsai-algorithm 欢迎star

前言

跳表是面试常问的一种数据结构,它在很多中间件和语言中得到应用,我们熟知的就有Redis跳表。并且在面试的很多场景可能会问到,偶尔还会让你手写试一试(跳表可能会让手写,红黑树是不可能的),这不,给大伙复原一个场景:

image-20201225113330615

但你别慌,遇到蘑菇头这种面试官也别怕,因为你看到这篇文章了(得意😏),不用像熊猫那样窘迫。

对于一个数据结构或算法,人群数量从听过名称、了解基本原理、清楚执行流程、能够手写 呈抖降的趋势。因为很多数据结构与算法其核心原理可能简单,但清楚其执行流程就需要动脑子去思考想明白,但是如果能够把它写出来,那就要自己一步步去设计和实现。可能要花很久才能真正写出来,并且还可能要查阅大量的资料。

而本文在前面进行介绍跳表,后面部分详细介绍跳表的设计和实现,搞懂跳表,这一篇真的就够了。

快速了解跳表

跳跃表(简称跳表)由美国计算机科学家William Pugh发明于1989年。他在论文《Skip lists: a probabilistic alternative to balanced trees》中详细介绍了跳表的数据结构和插入删除等操作。

跳表(SkipList,全称跳跃表)是用于有序元素序列快速搜索查找的一个数据结构,跳表是一个随机化的数据结构,实质就是一种可以进行二分查找的有序链表。跳表在原有的有序链表上面增加了多级索引,通过索引来实现快速查找。跳表不仅能提高搜索性能,同时也可以提高插入和删除操作的性能。它在性能上和红黑树,AVL树不相上下,但是跳表的原理非常简单,实现也比红黑树简单很多。

在这里你可以看到一些关键词:链表(有序链表)、索引、二分查找。想必你的脑海中已经有了一个初略的印象,不过你可能还是不清楚这个”会跳的链表”有多diao,甚至还可能会产生一点疑虑:跟随机化有什么关系?你在下文中很快就能得到答案!

回顾链表,我们知道链表和顺序表(数组)通常都是相爱相杀,成对出现,各有优劣。而链表的优势就是更高效的插入、删除。痛点就是查询很慢很慢!每次查询都是一种O(n)复杂度的操作,链表估计自己都气的想哭了😢。

image-20201224155243423

这是一个带头结点的链表(头结点相当于一个固定的入口,不存储有意义的值),每次查找都需要一个个枚举,相当的慢,我们能不能稍微优化一下,让它稍微跳一跳呢?答案是可以的,我们知道很多算法和数据结构以空间换时间,我们在上面加一层索引,让部分节点在上层能够直接定位到,这样链表的查询时间近乎减少一半,链表自己虽然没有开心起来,但收起了它想哭的脸。

image-20201224160740034

这样,在查询某个节点的时候,首先会从上一层快速定位节点所在的一个范围,如果找到具体范围向下然后查找代价很小,当然在表的结构设计上会增加一个向下的索引(指针)用来查找确定底层节点。平均查找速度平均为O(n/2)。但是当节点数量很大的时候,它依旧很慢很慢。我们都知道二分查找是每次都能折半的去压缩查找范围,要是有序链表也能这么跳起来那就太完美了。没错跳表就能让链表拥有近乎的接近二分查找的效率的一种数据结构,其原理依然是给上面加若干层索引,优化查找速度。

image-20201224175922421

通过上图你可以看到,通过这样的一个数据结构对有序链表进行查找都能近乎二分的性能。就是在上面维护那么多层的索引,首先在最高级索引上查找最后一个小于当前查找元素的位置,然后再跳到次高级索引继续查找,直到跳到最底层为止,这时候以及十分接近要查找的元素的位置了(如果查找元素存在的话)。由于根据索引可以一次跳过多个元素,所以跳查找的查找速度也就变快了。

对于理想的跳表,每向上一层索引节点数量都是下一层的1/2.那么如果n个节点增加的节点数量(1/2+1/4+…)<n。并且层数较低,对查找效果影响不大。但是对于这么一个结构,你可能会疑惑,这样完美的结构真的存在吗?大概率不存在的,因为作为一个链表,少不了增删该查的一些操作。而删除和插入可能会改变整个结构,所以上面的这些都是理想的结构,在插入的时候是否添加上层索引是个概率问题(1/2的概率),在后面会具体讲解。

跳表的增删改查

上面稍微了解了跳表是个啥,那么在这里就给大家谈谈跳表的增删改查过程。在实现本跳表的过程为了便于操作,我们将跳表的头结点(head)的key设为int的最小值(一定满足左小右大方便比较)。

对于每个节点的设置,设置成SkipNode类,为了防止初学者将next向下还是向右搞混,直接设置right,down两个指针。

1
2
3
4
5
6
7
8
9
10
java复制代码class SkipNode<T>
{
int key;
T value;
SkipNode right,down;//右下个方向的指针
public SkipNode (int key,T value) {
this.key=key;
this.value=value;
}
}

跳表的结构和初始化也很重要,其主要参数和初始化方法为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public class SkipList <T> {

SkipNode headNode;//头节点,入口
int highLevel;//当前跳表索引层数
Random random;// 用于投掷硬币
final int MAX_LEVEL = 32;//最大的层

SkipList(){
random=new Random();
headNode=new SkipNode(Integer.MIN_VALUE,null);
highLevel=0;
}
//其他方法
}

查询操作

很多时候链表也可能这样相连仅仅是某个元素或者key作为有序的标准。所以有可能链表内部存在一些value。不过修改和查询其实都是一个操作,找到关键数字(key)。并且查找的流程也很简单,设置一个临时节点team=head。当team不为null其流程大致如下:

(1) 从team节点出发,如果当前节点的key与查询的key相等,那么返回当前节点(如果是修改操作那么一直向下进行修改值即可)。

(2) 如果key不相等,且右侧为null,那么证明只能向下(结果可能出现在下右方向),此时team=team.down

(3) 如果key不相等,且右侧不为null,且右侧节点key小于待查询的key。那么说明同级还可向右,此时team=team.right

(4)(否则的情况)如果key不相等,且右侧不为null,且右侧节点key大于待查询的key 。那么说明如果有结果的话就在这个索引和下个索引之间,此时team=team.down。

最终将按照这个步骤返回正确的节点或者null(说明没查到)。

image-20201224210130178

例如上图查询12节点,首先第一步从head出发发现右侧不为空,且7<12,向右;第二步右侧为null向下;第三步节点7的右侧10<12继续向右;第四步10右侧为null向下;第五步右侧12小于等于向右。第六步起始发现相等返回节点结束。

而这块的代码也非常容易:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public SkipNode search(int key) {
SkipNode team=headNode;
while (team!=null) {
if(team.key==key)
{
return team;
}
else if(team.right==null)//右侧没有了,只能下降
{
team=team.down;
}
else if(team.right.key>key)//需要下降去寻找
{
team=team.down;
}
else //右侧比较小向右
{
team=team.right;
}
}
return null;
}

删除操作

删除操作比起查询稍微复杂一丢丢,但是比插入简单。删除需要改变链表结构所以需要处理好节点之间的联系。对于删除操作你需要谨记以下几点:

(1)删除当前节点和这个节点的前后节点都有关系

(2)删除当前层节点之后,下一层该key的节点也要删除,一直删除到最底层

根据这两点分析一下:如果找到当前节点了,它的前面一个节点怎么查找呢?这个总不能在遍历一遍吧!有的使用四个方向的指针(上下左右)用来找到左侧节点。是可以的,但是这里可以特殊处理一下 ,不直接判断和操作节点,先找到待删除节点的左侧节点。通过这个节点即可完成删除,然后这个节点直接向下去找下一层待删除的左侧节点。设置一个临时节点team=head,当team不为null具体循环流程为:

(1)如果team右侧为null,那么team=team.down(之所以敢直接这么判断是因为左侧有头结点在左侧,不用担心特殊情况)

(2)如果team右侧不 为null,并且右侧的key等于待删除的key,那么先删除节点,再team向下team=team.down为了删除下层节点。

(3)如果team右侧不 为null,并且右侧key小于待删除的key,那么team向右team=team.right。

(4)如果team右侧不 为null,并且右侧key大于待删除的key,那么team向下team=team.down,在下层继续查找删除节点。

image-20201225002518856

例如上图删除10节点,首先team=head从team出发,7<10向右(team=team.right后面省略);第二步右侧为null只能向下;第三部右侧为10在当前层删除10节点然后向下继续查找下一层10节点;第四步8<10向右;第五步右侧为10删除该节点并且team向下。team为null说明删除完毕退出循环。

删除操作实现的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public void delete(int key)//删除不需要考虑层数
{
SkipNode team=headNode;
while (team!=null) {
if (team.right == null) {//右侧没有了,说明这一层找到,没有只能下降
team=team.down;
}
else if(team.right.key==key)//找到节点,右侧即为待删除节点
{
team.right=team.right.right;//删除右侧节点
team=team.down;//向下继续查找删除
}
else if(team.right.key>key)//右侧已经不可能了,向下
{
team=team.down;
}
else { //节点还在右侧
team=team.right;
}
}
}

插入操作

插入操作在实现起来是最麻烦的,需要的考虑的东西最多。回顾查询,不需要动索引;回顾删除,每层索引如果有删除就是了。但是插入不一样了,插入需要考虑是否插入索引,插入几层等问题。由于需要插入删除所以我们肯定无法维护一个完全理想的索引结构,因为它耗费的代价太高。但我们使用随机化的方法去判断是否向上层插入索引。即产生一个[0-1]的随机数如果小于0.5就向上插入索引,插入完毕后再次使用随机数判断是否向上插入索引。运气好这个值可能是多层索引,运气不好只插入最底层(这是100%插入的)。但是索引也不能不限制高度,我们一般会设置索引最高值如果大于这个值就不往上继续添加索引了。

我们一步步剖析该怎么做,其流程为

(1)首先通过上面查找的方式,找到待插入的左节点。插入的话最底层肯定是需要插入的,所以通过链表插入节点(需要考虑是否为末尾节点)

(2)插入完这一层,需要考虑上一层是否插入,首先判断当前索引层级,如果大于最大值那么就停止(比如已经到最高索引层了)。否则设置一个随机数1/2的概率向上插入一层索引(因为理想状态下的就是每2个向上建一个索引节点)。

(3)继续(2)的操作,直到概率退出或者索引层数大于最大索引层。

在具体向上插入的时候,实质上还有非常重要的细节需要考虑。首先如何找到上层的待插入节点 ?

这个各个实现方法可能不同,如果有左、上指向的指针那么可以向左向上找到上层需要插入的节点,但是如果只有右指向和下指向的我们也可以巧妙的借助查询过程中记录下降的节点。因为曾经下降的节点倒序就是需要插入的节点,最底层也不例外(因为没有匹配值会下降为null结束循环)。在这里我使用栈这个数据结构进行存储,当然使用List也可以。下图就是给了一个插入示意图。

image-20201225100031207

其次如果该层是目前的最高层索引,需要继续向上建立索引应该怎么办?

首先跳表最初肯定是没索引的,然后慢慢添加节点才有一层、二层索引,但是如果这个节点添加的索引突破当前最高层,该怎么办呢?

这时候需要注意了,跳表的head需要改变了,新建一个ListNode节点作为新的head,将它的down指向老head,将这个head节点加入栈中(也就是这个节点作为下次后面要插入的节点),就比如上面的9节点如果运气够好在往上建立一层节点,会是这样的。

image-20201225100432978

插入上层的时候注意所有节点要新建(拷贝),除了right的指向down的指向也不能忘记,down指向上一个节点可以用一个临时节点作为前驱节点。如果层数突破当前最高层,头head节点(入口)需要改变。

这部分更多的细节在代码中注释解释了,详细代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
java复制代码public void add(SkipNode node)
{

int key=node.key;
SkipNode findNode=search(key);
if(findNode!=null)//如果存在这个key的节点
{
findNode.value=node.value;
return;
}
Stack<SkipNode>stack=new Stack<SkipNode>();//存储向下的节点,这些节点可能在右侧插入节点
SkipNode team=headNode;//查找待插入的节点 找到最底层的哪个节点。
while (team!=null) {//进行查找操作
if(team.right==null)//右侧没有了,只能下降
{
stack.add(team);//将曾经向下的节点记录一下
team=team.down;
}
else if(team.right.key>key)//需要下降去寻找
{
stack.add(team);//将曾经向下的节点记录一下
team=team.down;
}
else //向右
{
team=team.right;
}
}
int level=1;//当前层数,从第一层添加(第一层必须添加,先添加再判断)
SkipNode downNode=null;//保持前驱节点(即down的指向,初始为null)
while (!stack.isEmpty()) {
//在该层插入node
team=stack.pop();//抛出待插入的左侧节点
SkipNode nodeTeam=new SkipNode(node.key, node.value);//节点需要重新创建
nodeTeam.down=downNode;//处理竖方向
downNode=nodeTeam;//标记新的节点下次使用
if(team.right==null) {//右侧为null 说明插入在末尾
team.right=nodeTeam;
}
//水平方向处理
else {//右侧还有节点,插入在两者之间
nodeTeam.right=team.right;
team.right=nodeTeam;
}
//考虑是否需要向上
if(level>MAX_LEVEL)//已经到达最高级的节点啦
break;
double num=random.nextDouble();//[0-1]随机数
if(num>0.5)//运气不好结束
break;
level++;
if(level>highLevel)//比当前最大高度要高但是依然在允许范围内 需要改变head节点
{
highLevel=level;
//需要创建一个新的节点
SkipNode highHeadNode=new SkipNode(Integer.MIN_VALUE, null);
highHeadNode.down=headNode;
headNode=highHeadNode;//改变head
stack.add(headNode);//下次抛出head
}
}
}

总结

对于上面,跳表完整分析就结束啦,当然,你可能看到不同品种跳表的实现,还有的用数组方式表示上下层的关系这样也可以,但本文只定义right和down两个方向的链表更纯正化的讲解跳表。

对于跳表以及跳表的同类竞争产品:红黑树,为啥Redis的有序集合(zset) 使用跳表呢?因为跳表除了查找插入维护和红黑树有着差不多的效率,它是个链表,能确定范围区间,而区间问题在树上可能就没那么方便查询啦。而JDK中跳跃表ConcurrentSkipListSet和ConcurrentSkipListMap。 有兴趣的也可以查阅一下源码。

对于学习,完整的代码是非常重要的,这里我把完整代码贴出来,需要的自取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
java复制代码import java.util.Random;
import java.util.Stack;
class SkipNode<T>
{
int key;
T value;
SkipNode right,down;//左右上下四个方向的指针
public SkipNode (int key,T value) {
this.key=key;
this.value=value;
}

}
public class SkipList <T> {

SkipNode headNode;//头节点,入口
int highLevel;//层数
Random random;// 用于投掷硬币
final int MAX_LEVEL = 32;//最大的层
SkipList(){
random=new Random();
headNode=new SkipNode(Integer.MIN_VALUE,null);
highLevel=0;
}
public SkipNode search(int key) {
SkipNode team=headNode;
while (team!=null) {
if(team.key==key)
{
return team;
}
else if(team.right==null)//右侧没有了,只能下降
{
team=team.down;
}
else if(team.right.key>key)//需要下降去寻找
{
team=team.down;
}
else //右侧比较小向右
{
team=team.right;
}
}
return null;
}

public void delete(int key)//删除不需要考虑层数
{
SkipNode team=headNode;
while (team!=null) {
if (team.right == null) {//右侧没有了,说明这一层找到,没有只能下降
team=team.down;
}
else if(team.right.key==key)//找到节点,右侧即为待删除节点
{
team.right=team.right.right;//删除右侧节点
team=team.down;//向下继续查找删除
}
else if(team.right.key>key)//右侧已经不可能了,向下
{
team=team.down;
}
else { //节点还在右侧
team=team.right;
}
}
}
public void add(SkipNode node)
{

int key=node.key;
SkipNode findNode=search(key);
if(findNode!=null)//如果存在这个key的节点
{
findNode.value=node.value;
return;
}

Stack<SkipNode>stack=new Stack<SkipNode>();//存储向下的节点,这些节点可能在右侧插入节点
SkipNode team=headNode;//查找待插入的节点 找到最底层的哪个节点。
while (team!=null) {//进行查找操作
if(team.right==null)//右侧没有了,只能下降
{
stack.add(team);//将曾经向下的节点记录一下
team=team.down;
}
else if(team.right.key>key)//需要下降去寻找
{
stack.add(team);//将曾经向下的节点记录一下
team=team.down;
}
else //向右
{
team=team.right;
}
}

int level=1;//当前层数,从第一层添加(第一层必须添加,先添加再判断)
SkipNode downNode=null;//保持前驱节点(即down的指向,初始为null)
while (!stack.isEmpty()) {
//在该层插入node
team=stack.pop();//抛出待插入的左侧节点
SkipNode nodeTeam=new SkipNode(node.key, node.value);//节点需要重新创建
nodeTeam.down=downNode;//处理竖方向
downNode=nodeTeam;//标记新的节点下次使用
if(team.right==null) {//右侧为null 说明插入在末尾
team.right=nodeTeam;
}
//水平方向处理
else {//右侧还有节点,插入在两者之间
nodeTeam.right=team.right;
team.right=nodeTeam;
}
//考虑是否需要向上
if(level>MAX_LEVEL)//已经到达最高级的节点啦
break;
double num=random.nextDouble();//[0-1]随机数
if(num>0.5)//运气不好结束
break;
level++;
if(level>highLevel)//比当前最大高度要高但是依然在允许范围内 需要改变head节点
{
highLevel=level;
//需要创建一个新的节点
SkipNode highHeadNode=new SkipNode(Integer.MIN_VALUE, null);
highHeadNode.down=headNode;
headNode=highHeadNode;//改变head
stack.add(headNode);//下次抛出head
}
}

}
public void printList() {
SkipNode teamNode=headNode;
int index=1;
SkipNode last=teamNode;
while (last.down!=null){
last=last.down;
}
while (teamNode!=null) {
SkipNode enumNode=teamNode.right;
SkipNode enumLast=last.right;
System.out.printf("%-8s","head->");
while (enumLast!=null&&enumNode!=null) {
if(enumLast.key==enumNode.key)
{
System.out.printf("%-5s",enumLast.key+"->");
enumLast=enumLast.right;
enumNode=enumNode.right;
}
else{
enumLast=enumLast.right;
System.out.printf("%-5s","");
}

}
teamNode=teamNode.down;
index++;
System.out.println();
}
}
public static void main(String[] args) {
SkipList<Integer>list=new SkipList<Integer>();
for(int i=1;i<20;i++)
{
list.add(new SkipNode(i,666));
}
list.printList();
list.delete(4);
list.delete(8);
list.printList();
}
}

进行测试一下可以发现跳表还是挺完美的(自夸一下)。

image-20201225105810595

原创不易,bigsai请掘友们帮两件事帮忙一下:

  1. 点赞、在看、分享支持一下, 您的肯定是我创作的源源动力。
  2. 微信搜索「bigsai」,关注我的公众号,不仅免费送你电子书,我还会第一时间在公众号分享知识技术。还可拉你进力扣打卡群一起打卡LeetCode。

咱们下次再见!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Go 并发:一些有趣的现象和要避开的 “坑”

发表于 2020-12-26

若有任何问题或建议,欢迎及时交流和碰撞。我的公众号是 【脑子进煎鱼了】,GitHub 地址:github.com/eddycjy。

大家好,我是煎鱼。

最近在看 Go 并发相关的内容,发现还是有不少细节容易让人迷迷糊糊的,一个不小心就踏入深坑里,且指不定要在上线跑了一些数据后才能发现,那可真是太人崩溃了。

今天来分享几个案例,希望大家在编码时能够避开这几个 “坑”。

案例一

演示代码

第一个案例来自 @鸟窝 大佬在极客时间的分享,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
go复制代码func main() {
count := 0
wg := sync.WaitGroup{}
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < 100000; j++ {
count++
}
}()
}
wg.Wait()

fmt.Println(count)
}

思考一下,最后输出的 count 变量的值是多少?是不是一百万?

输出结果

在上述代码中,我们通过 for-loop 循环起 goroutine 进行自增,并使用了 sync.WaitGroup 来保证所有的 goroutine 都执行完毕才输出最终的结果值。

最终的输出结果如下:

1
2
3
4
5
6
7
8
arduino复制代码// 第一次执行
638853

// 第二次执行
654473

// 第三次执行
786193

输出的结果值不是恒定的,也就是每次输出的都不一样,且基本不会达到想象中的一百万。

分析原因

其原因在于 count++ 并不是一个原子操作,在汇编上就包含了好几个动作,如下:

1
2
3
sql复制代码MOVQ "".count(SB), AX 
LEAQ 1(AX), CX
MOVQ CX, "".count(SB)

因为可能会同时存在多个 goroutine 同时读取到 count 的值为 1212,并各自自增 1,再将其写回。

与此同时也会有其他的 goroutine 可能也在其自增时读到了值,形成了互相覆盖的情况,这是一种并发访问共享数据的错误。

发现问题

这类竞争问题可以通过 Go 语言所提供的的 race 检测(Go race detector)来进行分析和发现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
bash复制代码$ go run -race main.go 
==================
WARNING: DATA RACE
Read at 0x00c0000c6008 by goroutine 13:
main.main.func1()
/Users/eddycjy/go-application/awesomeProject/main.go:28 +0x78

Previous write at 0x00c0000c6008 by goroutine 7:
main.main.func1()
/Users/eddycjy/go-application/awesomeProject/main.go:28 +0x91

Goroutine 13 (running) created at:
main.main()
/Users/eddycjy/go-application/awesomeProject/main.go:25 +0xe4

Goroutine 7 (running) created at:
main.main()
/Users/eddycjy/go-application/awesomeProject/main.go:25 +0xe4
==================
...
489194
Found 3 data race(s)
exit status 66

编译器会通过探测所有的内存访问,监听其内存地址的访问(读或写)。在应用运行时就能够发现对共享变量的访问和操作,进而发现问题并打印出相关的警告信息。

需要注意的一点是,go run -race 是运行时检测,并不是编译时。且 race 存在明确的性能开销,通常是正常程序的十倍,因此不要想不开在生产环境打开这个配置,很容易翻车。

案例二

演示代码

第二个案例来自煎鱼在脑子的分享,代码如下:

1
2
3
4
5
6
7
8
9
10
11
css复制代码func main() {
wg := sync.WaitGroup{}
wg.Add(5)
for i := 0; i < 5; i++ {
go func(i int) {
defer wg.Done()
fmt.Println(i)
}(i)
}
wg.Wait()
}

思考一下,最后输出的结果是什么?值都是 4 吗?输出是稳定有序的吗?

输出结果

在上述代码中,我们通过 for-loop 循环起了多个 goroutine,并将变量 i 作为形参传递给了 goroutine,最后在 goroutine 内输出了变量 i。

最终的输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
arduino复制代码// 第一次输出
0
1
2
4
3

// 第二次输出
4
0
1
2
3

显然,从结果上来看,输出的值都是无序且不稳定的,值更不是 4。这到底是为什么?

分析原因

其原因在于,即使所有的 goroutine 都创建完了,但 goroutine 不一定已经开始运行了。

也就是等到 goroutine 真正去执行输出时,变量 i (值拷贝)可能已经不是创建时的值了。

其整个程序扭转实质上分为了多个阶段,也就是各自运行的时间线并不同,可以其拆分为:

  • 先创建:for-loop 循环创建 goroutine。
  • 再调度:协程goroutine 开始调度执行。
  • 才执行:开始执行 goroutine 内的输出。

同时 goroutine 的调度存在一定的随机性(建议了解一下 GMP 模型),那么其输出的结果就势必是无序且不稳定的。

发现问题

这时候你可能会想,那前面提到的 go run -race 能不能发现这个问题呢。如下:

1
2
3
4
5
6
go复制代码$ go run -race main.go
0
1
2
3
4

没有出现警告,显然是不能的,因为其本质上并不是并发访问共享数据的错误,且会导致程序变成了串行,从而蒙蔽了你的双眼。

案例三

演示代码

第三个案例来自煎鱼在梦里的分享,代码如下:

1
2
3
4
5
6
7
8
9
10
11
go复制代码func main() {
wg := sync.WaitGroup{}
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
fmt.Println(i)
}()
}
wg.Wait()
}

思考一下,最后输出的结果是什么?值都是 4 吗?会像案例二一样乱窜吗?

输出结果

在上述代码中,与案例二大体没有区别,主要是变量 i 没有作为形参传入。

最终的输出结果如下:

1
2
3
4
5
6
arduino复制代码// 第一次输出
5
5
5
5
5

初步从输出的结果上来看都是 5,这时候就会有人迷糊了,为什么不是 4 呢?

不少人会因不是 4 而陷入了迷惑,但千万不要被一两次的输出迷惑了心智,认为铁定就是 5 了。可以再动手多输出几次,如下:

1
2
3
4
5
6
arduino复制代码// 多输出几次
5
3
5
5
5

最终会发现…输出结果存在随机性,输出结果并不是 100% 都是 5,更不用提 4 了。这到底是为什么呢?

分析原因

其原因与案例二其实非常接近,理论上理解了案例二也就能解决案例三。

其本质还是创建 goroutine 与真正执行 fmt.Println 并不同步。因此很有可能在你执行 fmt.Println 时,循环 for-loop 已经运行完毕,因此变量 i 的值最终变成了 5。

那么相反,其也有可能没运行完,存在随机性。写个 test case 就能发现明显的不同。

总结

在本文中,我分享了几个近期看到次数最频繁的一些并发上的小 “坑”,希望对你有所帮助。同时你也可以回想一下,在你编写 Go 并发程序有没有也遇到过什么问题?

同时你也可以回想一下,在你编写 Go 并发程序有没有也遇到过什么问题?

欢迎大家一起讨论交流。

我的公众号

分享 Go 语言、微服务架构和奇怪的系统设计,欢迎大家关注我的公众号和我进行交流和沟通。

最好的关系是互相成就,各位的点赞就是煎鱼创作的最大动力,感谢支持。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Go 错误处理:用 panic 取代 err != nil

发表于 2020-12-26

若有任何问题或建议,欢迎及时交流和碰撞。我的公众号是 【脑子进煎鱼了】,GitHub 地址:github.com/eddycjy。

前段时间我分享了文章 《先睹为快,Go2 Error 的挣扎之路》后,和一位朋友进行了一次深度交流,他给我分享了他们项目组对于 Go 错误处理的方式调整。

简单来讲,就是在业务代码中使用 panic 的方式来替代 “永无止境” 的 if err != nil。这就是今天本文的重点内容,我们一起来看看是怎么做,又有什么优缺点。

为什么想替换

在 Go 语言中 if err != nil 写的太多,还要管方法声明各种,嫌麻烦又不方便:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
go复制代码err := foo()
if err != nil {
//do something..
return err
}

err := foo()
if err != nil {
//do something..
return err
}

err := foo()
if err != nil {
//do something..
return err
}

err := foo()
if err != nil {
//do something..
return err
}

上述还是示例代码,比较直面。若是在工程实践,还得各种 package 跳来跳去加 if err != nil,总的来讲比较繁琐,要去关心整体的上下游。更具体的就不赘述了,可以翻看我先前的文章。

怎么替换 err != nil

不想写 if err != nil 的代码,方式之一就是用 panic 来替代他。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
go复制代码func GetFish(db *sql.DB, name string) []string {
rows, err := db.Query("select name from users where `name` = ?", name)
if err != nil {
panic(err)
}
defer rows.Close()

var names []string
for rows.Next() {
var name string
err := rows.Scan(&name)
if err != nil {
panic(err)
}

names = append(names, name)
}

err = rows.Err()
if err != nil {
panic(err)
}

return names
}

在上述业务代码中,我们通过 panic 的方式取代了 return err 的函数返回,自然其所关联的下游业务代码也就不需要编写 if err != nil 的代码:

1
2
3
4
5
6
css复制代码func main() {
fish1 := GetFish(db, "煎鱼")
fish2 := GetFish(db, "咸鱼")
fish3 := GetFish(db, "摸鱼")
...
}

同时在转换为使用 panic 模式的错误机制后,我们必须要在外层增加 recover 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码func AppRecovery() gin.HandlerFunc {
return func(c *gin.Context) {
defer func() {
if err := recover(); err != nil {
if _, ok := err.(AppErr); ok {
// do something...
} else {
panic(err)
}
}
}()
}
}

每次 panic 后根据其抛出的错误进行断言,识别是否定制的 AppErr 错误类型,若是则可以进行一系列的处理动作。否则可继续向上 panic 抛出给顶级的 Recovery 方法进行处理。

这就是一个相对完整的 panic 错误链路处理了。

优缺点

  • 从优点上来讲:
+ 整体代码结构看起来更加的简洁,仅专注于实现逻辑即可。
+ 不需要关注和编写冗杂的 `if err != nil` 的错误处理代码。
  • 从缺点上来讲:
+ 认知负担的增加,需要参加项目的每一个新老同学都清楚该模式,要做一个基本规范或培训。
+ 存在一定的性能开销,每次 `panic` 都存在用户态的上下文切换。
+ 存在一定的风险性,一旦 `panic` 没有 `recover` 住,就会导致事故。
+ Go 官方并不推荐,与 `panic` 本身的定义相违背,也就是 `panic` 与 `error` 的概念混淆。

总结

在今天这篇文章给大家分享了如何使用 panic 的方式来处理 Go 的错误,其必然有利必有有弊,需要做一个权衡了。

你们团队有没有为了 Go 错误处理做过什么新的调整呢?欢迎在留言区交流和分享。

我的公众号

分享 Go 语言、微服务架构和奇怪的系统设计,欢迎大家关注我的公众号和我进行交流和沟通。

最好的关系是互相成就,各位的点赞就是煎鱼创作的最大动力,感谢支持。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

跨平台应用即将消亡!

发表于 2020-12-25

2015 年时,我是一名自由职业的原生 iOS 开发者。我知道 Objective C——这是唯一我睡着都能写的语言。Swift 还在努力解决 ABI 兼容性问题,而我还在观望。

当我决定重新进入就业市场时,每个人都想要 React Native。

我是这个领域的新手。每个工程博客,主要 包括 Airbnb,都在鼓吹“一次编写,随处发布”的优势。我的朋友们建议我转向跨平台,或者立即退休。

如果有人给我提供机会,相信我在 Windows + iOS 领域的工程技能,我可以在工作中学习原生安卓。

但是我犹豫要不要学习 RN。LinkedIn 在 2013 年放弃了其基于 HTML5 的 跨平台 app,这在我的脑海中记忆犹新。尽管 React Native 在执行时是完全原生的,但是它没有像 Objective C 或 Java 那样暴露出原生的感觉。

我的犹豫很快得到了验证:

我读到了相关新闻,RN 的创造者 Facebook——宣布将其 iOS 主页(新闻流)切换到 Kit 组件——一个基于 Objective C++ 创建的框架。他们大量借用了 React 的声明式方案,但是 用 Objective C++ 实现了所有东西 来利用 iOS 架构的真正力量——甚至 Swift 现在还无法提供。

在这里插入图片描述

现在大部分经常使用的应用程序依赖 C++ 或它的一些变种(对于安卓是 JNI,对于 iOS 是 Objective C++)。

快进到 2020 年。

Airbnb 早在 2019 年就 放弃了继续使用 RN。这在工程界引起了足够的轰动,迫使人们重新思考。在 2019 年的同一年,苹果发布了 SwiftUI,一个声明式的 UI 框架,旨在再次吸引新手开发者度过其 Swift 灾难(尽管现在已经稳定了)。

什么敲响了跨平台的丧钟:

跨平台的应用程序有其固有缺陷。随着市场的不断变化,现在市场已经成熟,它们要么改进要么消亡。

苹果正在整合

在开发者收入方面,苹果已经占据了主导地位

这是 公开的秘密,从未变过。尽管安卓的市场份额很大,但在应用程序收入方面,它远远落后于苹果。这不仅仅是因为安卓在世界的低收入地区处于领先地位,还因为安卓的核心业务是授权,而不是硬件制造。

因此,iOS + iDevice 的更新更符合其高付费用户的需求。

苹果的系统在移动体验方面已经非常成熟

尽管非苹果制造商引领了许多智能手机创新。

由于苹果对操作系统和硬件的控制更严格,苹果在提供卓越 + 个性化的体验方面做得更好,而这也是高价值用户在移动设备上的追求。

由于苹果最新致力于成为一个以隐私为中心的软件生态系统,显而易见,苹果将继续在企业移动解决方案领域占据主导地位。

对于开发者来说,苹果和安卓之间的收入分成非常简单:

  • 苹果开发 =>高价值付费用户 =>应用程序销售收入
  • 安卓开发 =>低价值用户 =>广告收入

苹果最近强制应用程序请求用户权限来分享广告数据。这对于那些以广告数据为唯一资产的公司来说是一个直接的打击——特别是 Facebook 和谷歌。

而 Facebook 和谷歌碰巧是 React Native 和 Flutter 的创造者,这应该不是个巧合。

你很容易看到这对于 App 开发者来说意味着什么:进一步搞明白你的理想的用户基础。尽早做出选择,并选择你相应的工具集。

例如,如果你不想做广告,你最好先开发苹果应用。经过验证后,再开发安卓应用。这种比较零碎的方案几乎没有动力来进行跨平台开发。只要团队保持不变,通用代码库需求很容易得到满足。

Apple Silicon 是苹果在应用经济领域的王牌

除非某家颠覆性的硬件公司从黑暗中走出来。

谷歌是一个规模相当大的挑战者,但并不可怕。它在移动领域的主要目的不是销售软件,而是掌握用户数据。如果不在硬件上取胜,正如 它所公开承认的那样,谷歌很难打破这种平衡。安卓的采用直接取决于它许可的硬件制造商。

有了 Apple Silicon,苹果就整合了它的开发者社区。它已经向开发者支付了更多。现在有了 Apple Silicon,iOS 开发者可以轻松迁移到新的苹果 Mac 上。

这一举措使得 开发苹果应用更有利可图,使用它自己的工具:XCode、Playground 等等,使用 Apple Silicon 驱动的 Mac 电脑,因为它可以完全控制它的硬件。有更明确的动机让所有人都投入到一个平台上,至少在最初是这样,而且没有开发者或创业公司会错过这个优势。

跨平台是一种渐进性改进,而不是创新

这是一种渐进性改进,告诉垄断者:

现在,我挑战你进行创新。我不能做一个更好的 IDE 或硬件,但是我可以剥夺你从中获取的一些收益或开发者。

一次又一次,跨平台工具将自己作为解决每个与平台相关的问题的灵丹妙药。

如果没有底层硬件,它们会成为垄断者的挑战者,因为垄断企业有很高的进入成本壁垒。Mac 机器 vs windows 机器的成本是被引用最多的标准。

它们迅速被采用的另一个原因,不是它们有能力创造更好的体验,而是开发者对专有平台的不满。

每个超过千万美元的初创公司都会雇佣一名移动开发人员来维护一个最终依赖于远程 GitHub 贡献者支持的代码库

它们可以是开源的。快速入门项目、Youtube 教程和价值 100 美元的模板充斥在开发人员的信息流中,但几乎很难找到依据:

跨平台 App 可以实现的东西用原生并不容易实现!(5 行代码 vs 3 个类!)。不要看原生提供的定制可能性 vs 5 行代码的底层机制。

它提供了 通用业务逻辑 的独特优势——这是任何初创公司都无法抗拒的。最后,每个 千万美元 的初创公司都会雇佣一名移动开发人员来维护一个最终依赖于 GitHub 贡献者支持的通用代码库。这些公司没有意识到的是,通用业务逻辑必须通过清晰的文档(嗯?那是什么)和简洁的规范(但我们是敏捷开发!)维护。

幸运的是,如果这些初创公司能够超过一个收入点,如果出现开发人员流失,增长策略就会介入,在 app store/play store 上排名退步 + 投资人的压力会促使创始人重新考虑他们最初追求快速但杂乱的解决方案的决定。这也解释了 LinkedIn、Facebook iOS 版、Airbnb 以及其它无数应用后期采用原生方案的原因。

然而,初露头角的创业公司数量从未减少,跨平台开发人员的市场也从未枯竭。反抗精神万岁!但现实是这样的:如果你知道 C++(或 Objective C 及其变种)或者 Java,你的技能将在几十年后还不落伍。如果你用一个奇特的跨平台开源工具集来修饰你的简历,那就要准备好每 3-5 年做一次彻底的调整。

跨平台糟糕透了

它们被开发是为了发布,而不是维护。

在跨平台应用中,困惑不断。

如今的跨平台产品能够提供几乎 100% 的原生代码——这一点毫无疑问。Xamarin、React Native 和 Flutter——都承诺 100% 原生执行。

它们不同于以往运行在浏览器上和基于 HTML5 的 PWA 应用。

但在设计上,它与每一个代码组织原则都相反。它们由 500 多个从完全不同的不受监控的源代码(而不是本地库)下载的包组成,模仿了 Web 开发方法。在移动开发中采用这种方法——其源代码被视为公开的,而不是在由开发人员控制的服务器的边界内,人们可以想象其风险。

跨平台工具很容易被采用,因为它们提供了初学者更容易理解的更高级别的抽象。它们融合了不同底层原生 API 之间的差异。它们采用了最常用的功能,其余的定制工作留给了开发人员。

设想一个假设的 函数 F 在原生 iOS 和安卓中有如下实现:

iOS: function f (int a, int b, int c)

Android: function f (int a, int b, int p, int q, int r)

一个典型的跨平台包会提供如下函数:

function f (int a, int b)

如果你想要充分利用这两个功能,你必须在原生代码(即 Objective C 和 Java)中找出 c、p、q 和 r 的调整。

在我的上一个组织中,由于现有的 Flutter 通知包的缺点,开发人员必须在以下方面进行开发:

1.Dart

2.iOS 插件

3.安卓插件

因为 Flutter 开发人员只熟悉 XCode 或 Android Studio,并不精通 Objective C 或 Kotlin API,所以这被认为是一个缺陷。

大部分跨平台爱好者都是大学生,他们在图书馆中创建了他们的第一个软件,不能忘记他们的“初恋”。他们构建跨平台 App 是用来发布,而不是维护。

但当你被迫下载一个包来实现一个简单的功能,例如 设备信息,真正的痛苦就开始了。

大部分跨平台爱好者都是大学生,他们在图书馆中创建了他们的第一个软件,不能忘记他们的“初恋”。

平均而言,虽然一个移动应用开发人员开发一个单一代码库只节省了 20% 时间(而不是 50%,因为需求转化 + 定制化),包管理占用了维护者 70% 多的时间。

一个 React Native 应用程序被一些 非常严重的功能 + 性能相关问题 困扰,这些问题会在开发周期的后期被发现。

相比于原生开发者的 IPA 和 APK,一个典型的 flutter app 的尺寸要大得多。

在我的上一个组织中,我修改了 70 多个文件,来处理 Flutter 的 Equatable 实现的兼容性中断。

这并不痛苦,直到我了解了其背后的原因:早期的哈希算法对于一个对象内属性值的交换产生相同的哈希值!

换句话说,以下对象将在旧的有缺陷的实现中产生相同的哈希值,除非你显式地重写 Equatable 哈希函数,在 1.0 之前,这从不是一个强制要求!

对象 A:

x = 3, y = 4

对象 B:

x = 4, y = 3

想象一下,检查所有 500 多个包是否存在 equality 检查漏洞…😧

我问我的架构师,为什么像我们这样的数据密集型应用程序首先采用了 Flutter。

他的回答非常经典:

“管理人员常说:敏捷的目标之一,就是避免无价值的操作,例如文档。通用代码库就是我们的文档和唯一信源。”

跨平台是一种不依赖的依赖关系

这个问题不是跨平台固有的。但这个问题通过开源共享与跨平台紧紧绑定在了一起。

库所有人具有不可思议的权力

这个问题的存在有两个原因。

首先,GitHub(和它的竞品)是未评级的,但要对国家法律负责。它可以在任何时候通过合法的改组来 摧毁任何东西。

其次,库所有人对于贡献者的工作具有不可思议的权力,无论这个库有多大。

库所有者的暴政 已经在区块链领域臭名昭著,创始贡献者在制定链币发行规则方面占据上风。

所有头部公司(包括 FAAMG)都已经在利用开源,来使他们的 SDK 可以被获取 + 对 bug 修复开放。公司雇佣开发者来维护社区意识,关注开发者的兴趣,并根据大众需求来发布特性。

如果他们不这么做,他们的核心产品就会受损。

对于跨平台供应商则不是这样。事实上,他们对严重的 bugs 或关键的增强请求没有任何反映。因此,错误处理 GitHub 问题不会对他们造成任何后果。

如果 Flutter 有严重的 bugs,谷歌在已经微不足道的 Pixel 销量或庞大的搜索流量方面不会有任何减少。

如果 React Native 缺少一个功能,Facebook 的广告收入不会缩减。

但是,如果 Android/Kotlin 或 iOS/Swift 有严重的漏洞,谷歌和苹果肯定会遭受损失——谷歌在授权收入方面受损,苹果在 iPhone 销量方面受损。开发者会削减 30% 收入。

因此,与那些在午夜发布修补程序的原生平台所有者不同,跨平台开发者对开发人员的请求置若罔闻。

稍好点的回应者会对问题写一个正式的回应,并单方面关闭它们,而不进行后续处理。在没有适当文档的情况下,开发人员的唯一办法就是深入研究包 / 库代码,修复问题,然后发布他们的应用程序。

有时,他们被库开发者诱导回馈,而这也没有任何激励措施,这一切都是基于开源精神。

对于一个被承诺了高效跨平台实现的开发者来说,这是 2 个缺点:花在 bug 上的时间更多 + 特性请求是开源的。

但还有更糟糕的。

雪上加霜的是,他们的 PR 有时候会被库所有者拒绝或忽视,理由是为了保证他们的记录簿干净。

除非高薪的跨平台库所有者平等对待他们的低薪同行创新者,否则情况只会恶化。

随着有经验的开发人员转向更好的平台原生的解决方案,跨平台项目注定会是一个伪装成创新的巨大渐进改动。

结论

跨平台开发将很快意味着对于多种设备的单平台开发

在所有的移动开发厂商中,苹果是唯一一家真正的业务线是硬件的公司。随着他们平台的统一,它向开发者发出了一个明确的信息:你对我们的服务业务至关重要,我们关心你们的增长。

现在预测其长期市场主导地位还为时过早。谷歌有足够的财力来为其原生安卓平台提供燃料,使其对开发者更有吸引力。

虽然单靠苹果无法撼动整个跨平台社区,但它肯定能够迫使其竞争对手采用一种更结构化更易于维护且更不易出现故障的移动开发方案。

跨平台开发领域将很快意味着对多种设备的单平台开发,就像.NET 早期那样。

创业公司最好不要跨平台。公共代码库的诱惑必须被良好的老文档取代,这样才能在开发人员心中培养真正的通用商业逻辑。

你的客户值得原生平台提供的最佳统一体验。

你的开发者?休息(不是双关语) + 尊敬。

作者 | Pen Magnet 来自 前端之颠 译者 | 张健欣

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

选型必看:RabbitMQ 七战 Kafka,差异立现

发表于 2020-12-25

img

作为一个有丰富经验的微服务系统架构师,经常有人问我,“应该选择RabbitMQ还是Kafka?”。基于某些原因, 许多开发者会把这两种技术当做等价的来看待。的确,在一些案例场景下选择RabbitMQ还是Kafka没什么差别,但是这两种技术在底层实现方面是有许多差异的。

不同的场景需要不同的解决方案,选错一个方案能够严重的影响你对软件的设计,开发和维护的能力。

这篇文章会先介绍RabbitMQ和Apache Kafka内部实现的相关概念。紧接着会主要介绍这两种技术的主要不同点以及他们各自的优缺点,最后我们会说明一下怎样选择这两种技术。

一、异步消息模式

异步消息可以作为解耦消息的生产和处理的一种解决方案。提到消息系统,我们通常会想到两种主要的消息模式——消息队列和发布/订阅模式。

1、消息队列

利用消息队列可以解耦生产者和消费者。多个生产者可以向同一个消息队列发送消息;但是,一个消息在被一个消息者处理的时候,这个消息在队列上会被锁住或者被移除并且其他消费者无法处理该消息。也就是说一个具体的消息只能由一个消费者消费。

img

消息队列

需要额外注意的是,如果消费者处理一个消息失败了,消息系统一般会把这个消息放回队列,这样其他消费者可以继续处理。消息队列除了提供解耦功能之外,它还能够对生产者和消费者进行独立的伸缩(scale),以及提供对错误处理的容错能力。

2、发布/订阅

发布/订阅(pub/sub)模式中,单个消息可以被多个订阅者并发的获取和处理。

img

发布/订阅

例如,一个系统中产生的事件可以通过这种模式让发布者通知所有订阅者。在许多队列系统中常常用主题(topics)这个术语指代发布/订阅模式。在RabbitMQ中,主题就是发布/订阅模式的一种具体实现(更准确点说是交换器(exchange)的一种),但是在这篇文章中,我会把主题和发布/订阅当做等价来看待。

一般来说,订阅有两种类型:

1)临时(ephemeral)订阅,这种订阅只有在消费者启动并且运行的时候才存在。一旦消费者退出,相应的订阅以及尚未处理的消息就会丢失。

2)持久(durable)订阅,这种订阅会一直存在,除非主动去删除。消费者退出后,消息系统会继续维护该订阅,并且后续消息可以被继续处理。

二、RabbitMQ

RabbitMQ作为消息中间件的一种实现,常常被当作一种服务总线来使用。RabbitMQ原生就支持上面提到的两种消息模式。其他一些流行的消息中间件的实现有ActiveMQ,ZeroMQ,Azure Service Bus以及Amazon Simple Queue Service(SQS)。这些消息中间件的实现有许多共通的地方,这边文章中提到的许多概念大部分都适用于这些中间件。

1、队列

RabbitMQ支持典型的开箱即用的消息队列。开发者可以定义一个命名队列,然后发布者可以向这个命名队列中发送消息。最后消费者可以通过这个命名队列获取待处理的消息。

2、消息交换器

RabbitMQ使用消息交换器来实现发布/订阅模式。发布者可以把消息发布到消息交换器上而不用知道这些消息都有哪些订阅者。

每一个订阅了交换器的消费者都会创建一个队列;然后消息交换器会把生产的消息放入队列以供消费者消费。消息交换器也可以基于各种路由规则为一些订阅者过滤消息。

img

RabbitMQ消息交换器

需要重点注意的是RabbitMQ支持临时和持久两种订阅类型。消费者可以调用RabbitMQ的API来选择他们想要的订阅类型。

根据RabbitMQ的架构设计,我们也可以创建一种混合方法——订阅者以组队的方式然后在组内以竞争关系作为消费者去处理某个具体队列上的消息,这种由订阅者构成的组我们称为消费者组。按照这种方式,我们实现了发布/订阅模式,同时也能够很好的伸缩(scale-up)订阅者去处理收到的消息。

img

发布/订阅与队列的联合使用

三、Apache Kafka

Apache Kafka不是消息中间件的一种实现。相反,它只是一种分布式流式系统。

不同于基于队列和交换器的RabbitMQ,Kafka的存储层是使用分区事务日志来实现的。Kafka也提供流式API用于实时的流处理以及连接器API用来更容易的和各种数据源集成;当然,这些已经超出了本篇文章的讨论范围。

云厂商为Kafka存储层提供了可选的方案,比如Azure Event Hubsy以及AWS Kinesis Data Streams等。对于Kafka流式处理能力,还有一些特定的云方案和开源方案,不过,话说回来,它们也超出了本篇的范围。

1、主题

Kafka没有实现队列这种东西。相应的,Kafka按照类别存储记录集,并且把这种类别称为主题。

Kafka为每个主题维护一个消息分区日志。每个分区都是由有序的不可变的记录序列组成,并且消息都是连续的被追加在尾部。

当消息到达时,Kafka就会把他们追加到分区尾部。默认情况下,Kafka使用轮询分区器(partitioner)把消息一致的分配到多个分区上。

Kafka可以改变创建消息逻辑流的行为。例如,在一个多租户的应用中,我们可以根据每个消息中的租户ID创建消息流。IoT场景中,我们可以在常数级别下根据生产者的身份信息(identity)将其映射到一个具体的分区上。确保来自相同逻辑流上的消息映射到相同分区上,这就保证了消息能够按照顺序提供给消费者。

img

Kafka生产者

消费者通过维护分区的偏移(或者说索引)来顺序的读出消息,然后消费消息。

单个消费者可以消费多个不同的主题,并且消费者的数量可以伸缩到可获取的最大分区数量。

所以在创建主题的时候,我们要认真的考虑一下在创建的主题上预期的消息吞吐量。消费同一个主题的多个消费者构成的组称为消费者组。通过Kafka提供的API可以处理同一消费者组中多个消费者之间的分区平衡以及消费者当前分区偏移的存储。

img

Kafka消费者

2、Kafka实现的消息模式

Kafka的实现很好地契合发布/订阅模式。

生产者可以向一个具体的主题发送消息,然后多个消费者组可以消费相同的消息。每一个消费者组都可以独立的伸缩去处理相应的负载。由于消费者维护自己的分区偏移,所以他们可以选择持久订阅或者临时订阅,持久订阅在重启之后不会丢失偏移而临时订阅在重启之后会丢失偏移并且每次重启之后都会从分区中最新的记录开始读取。

但是这种实现方案不能完全等价的当做典型的消息队列模式看待。当然,我们可以创建一个主题,这个主题和拥有一个消费者的消费组进行关联,这样我们就模拟出了一个典型的消息队列。不过这会有许多缺点,我们会在第二部分详细讨论。

值得特别注意的是,Kafka是按照预先配置好的时间保留分区中的消息,而不是根据消费者是否消费了这些消息。这种保留机制可以让消费者自由的重读之前的消息。另外,开发者也可以利用Kafka的存储层来实现诸如事件溯源和日志审计功能。

尽管有时候RabbitMQ和Kafka可以当做等价来看,但是他们的实现是非常不同的。所以我们不能把他们当做同种类的工具来看待;一个是消息中间件,另一个是分布式流式系统。

作为解决方案架构师,我们要能够认识到它们之间的差异并且尽可能的考虑在给定场景中使用哪种类型的解决方案。下面会指出这些差异并且提供什么时候使用哪种方案的指导建议。

四、RabbitMQ和Kafka的显著差异

RabbitMQ是一个消息代理,但是Apache Kafka是一个分布式流式系统。好像从语义上就可以看出差异,但是它们内部的一些特性会影响到我们是否能够很好的设计各种用例。

例如,Kafka最适用于数据的流式处理,但是RabbitMQ对流式中的消息就很难保持它们的顺序。

另一方面,RabbitMQ内置重试逻辑和死信(dead-letter)交换器,但是Kafka只是把这些实现逻辑交给用户来处理。

这部分主要强调在不同系统之间它们的主要差异。

1、消息顺序

对于发送到队列或者交换器上的消息,RabbitMQ不保证它们的顺序。尽管消费者按照顺序处理生产者发来的消息看上去很符合逻辑,但是这有很大误导性。

RabbitMQ文档中有关于消息顺序保证的说明:

“发布到一个通道(channel)上的消息,用一个交换器和一个队列以及一个出口通道来传递,那么最终会按照它们发送的顺序接收到。”

——RabbitMQ代理语义(Broker Semantics)

换话句话说,只要我们是单个消费者,那么接收到的消息就是有序的。然而,一旦有多个消费者从同一个队列中读取消息,那么消息的处理顺序就没法保证了。

由于消费者读取消息之后可能会把消息放回(或者重传)到队列中(例如,处理失败的情况),这样就会导致消息的顺序无法保证。

一旦一个消息被重新放回队列,另一个消费者可以继续处理它,即使这个消费者已经处理到了放回消息之后的消息。因此,消费者组处理消息是无序的,如下表所示:

img

使用RabbitMQ丢失消息顺序的例子

当然,我们可以通过限制消费者的并发数等于1来保证RabbitMQ中的消息有序性。更准确点说,限制单个消费者中的线程数为1,因为任何的并行消息处理都会导致无序问题。

不过,随着系统规模增长,单线程消费者模式会严重影响消息处理能力。所以,我们不要轻易的选择这种方案。

另一方面,对于Kafka来说,它在消息处理方面提供了可靠的顺序保证。Kafka能够保证发送到相同主题分区的所有消息都能够按照顺序处理。

在前面说过,默认情况下,Kafka会使用循环分区器(round-robin partitioner)把消息放到相应的分区上。不过,生产者可以给每个消息设置分区键(key)来创建数据逻辑流(比如来自同一个设备的消息,或者属于同一租户的消息)。

所有来自相同流的消息都会被放到相同的分区中,这样消费者组就可以按照顺序处理它们。

但是,我们也应该注意到,在同一个消费者组中,每个分区都是由一个消费者的一个线程来处理。结果就是我们没法伸缩(scale)单个分区的处理能力。

不过,在Kafka中,我们可以伸缩一个主题中的分区数量,这样可以让每个分区分担更少的消息,然后增加更多的消费者来处理额外的分区。

获胜者(Winner):

显而易见,Kafka是获胜者,因为它可以保证按顺序处理消息。RabbitMQ在这块就相对比较弱。

2、消息路由

RabbitMQ可以基于定义的订阅者路由规则路由消息给一个消息交换器上的订阅者。一个主题交换器可以通过一个叫做routing_key的特定头来路由消息。

或者,一个头部(headers)交换器可以基于任意的消息头来路由消息。这两种交换器都能够有效地让消费者设置他们感兴趣的消息类型,因此可以给解决方案架构师提供很好的灵活性。

另一方面,Kafka在处理消息之前是不允许消费者过滤一个主题中的消息。一个订阅的消费者在没有异常情况下会接受一个分区中的所有消息。

作为一个开发者,你可能使用Kafka流式作业(job),它会从主题中读取消息,然后过滤,最后再把过滤的消息推送到另一个消费者可以订阅的主题。但是,这需要更多的工作量和维护,并且还涉及到更多的移动操作。

获胜者:

在消息路由和过滤方面,RabbitMQ提供了更好的支持。

3、消息时序(timing)

在测定发送到一个队列的消息时间方面,RabbitMQ提供了多种能力:

1)消息存活时间(TTL)

发送到RabbitMQ的每条消息都可以关联一个TTL属性。发布者可以直接设置TTL或者根据队列的策略来设置。

系统可以根据设置的TTL来限制消息的有效期。如果消费者在预期时间内没有处理该消息,那么这条消息会自动的从队列上被移除(并且会被移到死信交换器上,同时在这之后的消息都会这样处理)。

TTL对于那些有时效性的命令特别有用,因为一段时间内没有处理的话,这些命令就没有什么意义了。

2)延迟/预定的消息

RabbitMQ可以通过插件的方式来支持延迟或者预定的消息。当这个插件在消息交换器上启用的时候,生产者可以发送消息到RabbitMQ上,然后这个生产者可以延迟RabbitMQ路由这个消息到消费者队列的时间。

这个功能允许开发者调度将来(future)的命令,也就是在那之前不应该被处理的命令。例如,当生产者遇到限流规则时,我们可能会把这些特定的命令延迟到之后的一个时间执行。

Kafka没有提供这些功能。它在消息到达的时候就把它们写入分区中,这样消费者就可以立即获取到消息去处理。

Kafka也没用为消息提供TTL的机制,不过我们可以在应用层实现。

不过,我们必须要记住的一点是Kafka分区是一种追加模式的事务日志。所以,它是不能处理消息时间(或者分区中的位置)。

获胜者:

毫无疑问,RabbitMQ是获胜者,因为这种实现天然的就限制Kafka。

4、消息留存(retention)

当消费者成功消费消息之后,RabbitMQ就会把对应的消息从存储中删除。这种行为没法修改。它几乎是所有消息代理设计的必备部分。

相反,Kafka会给每个主题配置超时时间,只要没有达到超时时间的消息都会保留下来。在消息留存方面,Kafka仅仅把它当做消息日志来看待,并不关心消费者的消费状态。

消费者可以不限次数的消费每条消息,并且他们可以操作分区偏移来“及时”往返的处理这些消息。Kafka会周期的检查分区中消息的留存时间,一旦消息超过设定保留的时长,就会被删除。

Kafka的性能不依赖于存储大小。所以,理论上,它存储消息几乎不会影响性能(只要你的节点有足够多的空间保存这些分区)。

获胜者:

Kafka设计之初就是保存消息的,但是RabbitMQ并不是。所以这块没有可比性,Kafka是获胜者。

5、容错处理

当处理消息,队列和事件时,开发者常常认为消息处理总是成功的。毕竟,生产者把每条消息放入队列或者主题后,即使消费者处理消息失败了,它仅仅需要做的就是重新尝试,直到成功为止。

尽管表面上看这种方法是没错的,但是我们应该对这种处理方式多思考一下。首先我们应该承认,在某些场景下,消息处理会失败。所以,即使在解决方案部分需要人为干预的情况下,我们也要妥善地处理这些情况。

消息处理存在两种可能的故障:

1)瞬时故障——故障产生是由于临时问题导致,比如网络连接,CPU负载,或者服务崩溃。我们可以通过一遍又一遍的尝试来减轻这种故障。

2)持久故障——故障产生是由于永久的问题导致的,并且这种问题不能通过额外的重试来解决。比如常见的原因有软件bug或者无效的消息格式(例如,损坏(poison)的消息)。

作为架构师和开发者,我们应该问问自己:“对于消息处理故障,我们应该重试多少次?每一次重试之间我们应该等多久?我们怎样区分瞬时和持久故障?”

最重要的是:“所有重试都失败后或者遇到一个持久的故障,我们要做什么?”

当然,不同业务领域有不同的回答,消息系统一般会给我们提供工具让我们自己实现解决方案。

RabbitMQ会给我们提供诸如交付重试和死信交换器(DLX)来处理消息处理故障。

DLX的主要思路是根据合适的配置信息自动地把路由失败的消息发送到DLX,并且在交换器上根据规则来进一步的处理,比如异常重试,重试计数以及发送到“人为干预”的队列。

查看下面篇文章,它在RabbitMQ处理重试上提供了额外的可能模式视角。

链接:engineering.nanit.com/rabbitmq-re…

在RabbitMQ中我们需要记住最重要的事情是当一个消费者正在处理或者重试某个消息时(即使是在把它返回队列之前),其他消费者都可以并发的处理这个消息之后的其他消息。

当某个消费者在重试处理某条消息时,作为一个整体的消息处理逻辑不会被阻塞。所以,一个消费者可以同步地去重试处理一条消息,不管花费多长时间都不会影响整个系统的运行。

img

消费者1持续的在重试处理消息1,同时其他消费者可以继续处理其他消息

和RabbitMQ相反,Kafka没有提供这种开箱即用的机制。在Kafka中,需要我们自己在应用层提供和实现消息重试机制。

另外,我们需要注意的是当一个消费者正在同步地处理一个特定的消息时,那么同在这个分区上的其他消息是没法被处理的。

由于消费者不能改变消息的顺序,所以我们不能够拒绝和重试一个特定的消息以及提交一个在这个消息之后的消息。你只要记住,分区仅仅是一个追加模式的日志。

一个应用层解决方案可以把失败的消息提交到一个“重试主题”,并且从那个主题中处理重试;但是这样的话我们就会丢失消息的顺序。

我们可以在Uber.com上找到Uber工程师实现的一个例子。如果消息处理的时延不是关注点,那么对错误有足够监控的Kafka方案可能就足够了。

如果消费者阻塞在重试一个消息上,那么底部分区的消息就不会被处理

获胜者:

RabbitMQ是获胜者,因为它提供了一个解决这个问题的开箱即用的机制。

6、伸缩

有多个基准测试,用于检查RabbitMQ和Kafka的性能。

尽管通用的基准测试对一些特定的情况会有限制,但是Kafka通常被认为比RabbitMQ有更优越的性能。

Kafka使用顺序磁盘I / O来提高性能。

从Kafka使用分区的架构上看,它在横向扩展上会优于RabbitMQ,当然RabbitMQ在纵向扩展上会有更多的优势。

Kafka的大规模部署通常每秒可以处理数十万条消息,甚至每秒百万级别的消息。

过去,Pivotal记录了一个Kafka集群每秒处理一百万条消息的例子;但是,它是在一个有着30个节点集群上做的,并且这些消息负载被优化分散到多个队列和交换器上。

链接:content.pivotal.io/blog/rabbit…

典型的RabbitMQ部署包含3到7个节点的集群,并且这些集群也不需要把负载分散到不同的队列上。这些典型的集群通常可以预期每秒处理几万条消息。

获胜者:

尽管这两个消息平台都可以处理大规模负载,但是Kafka在伸缩方面更优并且能够获得比RabbitMQ更高的吞吐量,因此这局Kafka获胜。

但是,值得注意的是大部分系统都还没有达到这些极限!所以,除非你正在构建下一个非常受欢迎的百万级用户软件系统,否则你不需要太关心伸缩性问题,毕竟这两个消息平台都可以工作的很好。

7、消费者复杂度

RabbitMQ使用的是智能代理和傻瓜式消费者模式。消费者注册到消费者队列,然后RabbitMQ把传进来的消息推送给消费者。RabbitMQ也有拉取(pull)API;不过,一般很少被使用。

RabbitMQ管理消息的分发以及队列上消息的移除(也可能转移到DLX)。消费者不需要考虑这块。

根据RabbitMQ结构的设计,当负载增加的时候,一个队列上的消费者组可以有效的从仅仅一个消费者扩展到多个消费者,并且不需要对系统做任何的改变。

img

RabbitMQ高效的伸缩

相反,Kafka使用的是傻瓜式代理和智能消费者模式。消费者组中的消费者需要协调他们之间的主题分区租约(以便一个具体的分区只由消费者组中一个消费者监听)。

消费者也需要去管理和存储他们分区偏移索引。幸运的是Kafka SDK已经为我们封装了,所以我们不需要自己管理。

另外,当我们有一个低负载时,单个消费者需要处理并且并行的管理多个分区,这在消费者端会消耗更多的资源。

当然,随着负载增加,我们只需要伸缩消费者组使其消费者的数量等于主题中分区的数量。这就需要我们配置Kafka增加额外的分区。

但是,随着负载再次降低,我们不能移除我们之前增加的分区,这需要给消费者增加更多的工作量。尽管这样,但是正如我们上面提到过,Kafka SDK已经帮我们做了这个额外的工作。

img

Kafka分区没法移除,向下伸缩后消费者会做更多的工作

获胜者:

根据设计,RabbitMQ就是为了傻瓜式消费者而构建的。所以这轮RabbitMQ获胜。

## 五、如何选择?

现在我们就如面对百万美元问题一样:“什么时候使用RabbitMQ以及什么时候使用Kafka?”概括上面的差异,我们不难得出下面的结论。

优先选择RabbitMQ的条件:

  • 高级灵活的路由规则;
  • 消息时序控制(控制消息过期或者消息延迟);
  • 高级的容错处理能力,在消费者更有可能处理消息不成功的情景中(瞬时或者持久);
  • 更简单的消费者实现。

优先选择Kafka的条件:

  • 严格的消息顺序;
  • 延长消息留存时间,包括过去消息重放的可能;
  • 传统解决方案无法满足的高伸缩能力。

大部分情况下这两个消息平台都可以满足我们的要求。但是,它取决于我们的架构师,他们会选择最合适的工具。当做决策的时候,我们需要考虑上面着重强调的功能性差异和非功能性限制。

这些限制如下:

  • 当前开发者对这两个消息平台的了解;
  • 托管云解决方案的可用性(如果适用);
  • 每种解决方案的运营成本;
  • 适用于我们目标栈的SDK的可用性。

当开发复杂的软件系统时,我们可能被诱导使用同一个消息平台去实现所有必须的消息用例。但是,从我的经验看,通常同时使用这两个消息平台能够带来更多的好处。

例如,在一个事件驱动的架构系统中,我们可以使用RabbitMQ在服务之间发送命令,并且使用Kafka实现业务事件通知。

原因是事件通知常常用于事件溯源,批量操作(ETL风格),或者审计目的,因此Kafka的消息留存能力就显得很有价值。

相反,命令一般需要在消费者端做额外处理,并且处理可以失败,所以需要高级的容错处理能力。

这里,RabbitMQ在功能上有很多闪光点。以后我可能会写一篇详细的文章来介绍,但是你必须记住–你的里程(mileage)可能会变化,因为适合性取决于你的特定需求。

六、总结思想

写这篇文章是由于我观察到许多开发者把这RabbitMQ和Kafka作为等价来看待。我希望通过这篇文章的帮助能够让你获得对这两种技术实现的深刻理解以及它们之间的技术差异。

反过来通过它们之间的差异来影响这两个平台去给用例提供更好的服务。这两个消息平台都很棒,并且都能够给多个用例提供很好的服务。

但是,作为解决方案架构师,取决于我们对每一个用例需求的理解,以及优化,然后选择最合适的解决方案。

文章已经收录GitHub:github.com/JavaFamily

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

推荐5款最好用的内网穿透工具

发表于 2020-12-25

公众号开发过程中,频繁修改线上代码,开发过程比较繁琐,同时一旦出错将会影响到线上的其他正常业务,因此搭建一个微信公众号开发的本地环境能达到不影响线上业务的同时调试代码。这里推荐几款非常好用的内网穿透工具,如有其它推荐,欢迎补充

  1. frp

frp 是一个高性能的反向代理应用,可以帮助您轻松地进行内网穿透,对外网提供服务,支持 tcp, http, https 等协议类型,并且 web 服务支持根据域名进行路由转发。

项目地址:

1
bash复制代码https://github.com/fatedier/frp

使用步骤:

a. 部署frps 到云服务器上

b. 在本地服务器上运行frpc

  1. ngrok

ngrok 是一个反向代理,通过在公共的端点和本地运行的 Web 服务器之间建立一个安全的通道。

官网地址:

1
bash复制代码https://ngrok.com

使用步骤:

a. 进入ngrok官网,注册一个账号,并下载客户端

b. 在本地服务器上,运行ngrok

1
bash复制代码./ngrok authoken 授权码

c. 添加映射,将HTTP映射转发到本地80端口

1
bash复制代码./ngrok http 80
  1. 蜻蜓映射

蜻蜓映射是一款完全免费的端口映射工具,集成了动态域名解析和内网穿透。 支持多种类型的应用程序服务(办公OA、ERP、视频监控、微信开发等)。蜻蜓映射可以轻松的实现外网访问内网服务器,而无需运营商分配的公网IP地址,也无需动态域名。

官网地址

1
bash复制代码https://flynat.51miaole.com

使用步骤:

a. 下载客户端到目标主机,并运行。

b. 添加映射,设置映射协议和本地端口

c. 复制访问地址,在浏览器测试访问

  1. natapp

基于ngrok的内网映射工具,免费版本提供http,tcp,udp全隧道穿透、随机域名/随机TCP,UDP端口、不定时强制更换域名/端口和自定义本地端口。

官网地址:

1
bash复制代码https://natapp.cn

使用步骤:

a. 进入官网,下载客户端到目标主机,并运行

b. 购买隧道并设置隧道协议和端口

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Netty时间轮调度算法原理分析,再不了解你就out啦

发表于 2020-12-25

一、时间轮介绍

之前公司内部搭建的延迟队列服务有用到时间轮,但是一直没有了解过它的实现原理。

最近有个和支付宝对接的项目,支付宝接口有流量控制,一定的时间内只允许 N 次接口调用,针对一些业务我们需要频繁调用支付宝开放平台接口,如果不对请求做限制,很容易触发流控告警。

为了避免这个问题,我们按照一定延迟规则将任务加载进时间轮内,通过时间轮的调度来实现接口异步调用。

很多开源框架都实现了时间轮算法,这里以 Netty 为例,看下 Netty 中时间轮是怎么实现的。

1.1 快速入门

下面是一个 API 使用例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码public class WheelTimerSamples {

private static final HashedWheelTimerInstance INSTANCE = HashedWheelTimerInstance.INSTANCE;

public static void main(String[] args) throws IOException {

INSTANCE.getWheelTimer().newTimeout(new PrintTimerTask(), 3, TimeUnit.SECONDS);
System.in.read();
}


static class PrintTimerTask implements TimerTask {
@Override
public void run(Timeout timeout) {
System.out.println("Hello world");
}
}

enum HashedWheelTimerInstance {
INSTANCE;
private final HashedWheelTimer wheelTimer;

HashedWheelTimerInstance() {
wheelTimer = new HashedWheelTimer(r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((t1, e) -> System.out.println(t1.getName() + e.getMessage()));
t.setName("-HashedTimerWheelInstance-");
return t;
}, 100, TimeUnit.MILLISECONDS, 64);
}

public HashedWheelTimer getWheelTimer() {
return wheelTimer;
}
}
}

上面的例子中我们自定义了一个 HashedWheelTimer,然后自定义了一个 TimerTask,将一个任务加载进时间轮,3s 后执行这个任务,怎么样是不是很简单。

在定义时间轮时建议按照业务类型进行区分,将时间轮定义为多个单例对象。

PS:因为时间轮是异步执行的,在任务执行之前 JVM 不能退出,所以 System.in.read(); 这一行代码不能删除。

1.2 原理图解

二、原理分析

2.1 时间轮状态

时间轮有以下三种状态:

  • WORKER_STATE_INIT:初始化状态,此时时间轮内的工作线程还没有开启
  • WORKER_STATE_STARTED:运行状态,时间轮内的工作线程已经开启
  • WORKER_STATE_SHUTDOWN:终止状态,时间轮停止工作

状态转换如下,转换原理会在下面讲到:

2.2 构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
java复制代码    public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {

if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}

// 初始化时间轮数组,时间轮大小为大于等于 ticksPerWheel 的第一个 2 的幂,和 HashMap 类似
wheel = createWheel(ticksPerWheel);
// 取模用,用来定位数组中的槽
mask = wheel.length - 1;

// 为了保证精度,时间轮内的时间单位为纳秒
long duration = unit.toNanos(tickDuration);

// 时间轮内的时钟拨动频率不宜太大也不宜太小
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}

if (duration < MILLISECOND_NANOS) {
logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",
tickDuration, MILLISECOND_NANOS);
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}

// 创建工作线程
workerThread = threadFactory.newThread(worker);


// 非守护线程且 leakDetection 为 true 时检测内存是否泄漏
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

// 初始化最大等待任务数
this.maxPendingTimeouts = maxPendingTimeouts;

// 如果创建的时间轮实例大于 64,打印日志,并且这个日志只会打印一次
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}

构造函数中的参数相当重要,当自定义时间轮时,我们应该根据业务的范围设置合理的参数:

  • threadFactory:创建时间轮任务线程的工厂,通过这个工厂可以给我们的线程自定义一些属性(线程名、异常处理等)
  • tickDuration:时钟多长时间拨动一次,值越小,时间轮精度越高
  • unit:tickDuration 的单位
  • ticksPerWheel:时间轮数组大小
  • leakDetection:是否检测内存泄漏
  • maxPendingTimeouts:时间轮内最大等待的任务数

时间轮的时钟拨动时长应该根据业务设置恰当的值,如果设置的过大,可能导致任务触发时间不准确。如果设置的过小,时间轮转动频繁,任务少的情况下加载不到任务,属于一直空转的状态,会占用 CPU 线程资源。

为了防止时间轮占用过多的 CPU 资源,当创建的时间轮对象大于 64 时会以日志的方式提示。

构造函数中只是初始化了轮线程,并没有开启,当第一次往时间轮内添加任务时,线程才会开启。

2.3 往时间轮内添加任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码    @Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}

// 等待的任务数 +1
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

// 如果时间轮内等待的任务数大于最大值,任务会被抛弃
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}

// 开启时间轮内的线程
start();

// 计算当前添加任务的执行时间
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

// Guard against overflow.
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// 将任务加入队列
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}

任务会先保存在队列中,当时间轮的时钟拨动时才会判断是否将队列中的任务加载进时间轮。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码    public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
// 这里存在并发,通过 CAS 操作保证最终只有一个线程能开启时间轮的工作线程
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}

while (startTime == 0) {
try {
// startTimeInitialized 是一个 CountDownLatch,目的是为了保证工作线程的 startTime 属性初始化
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}

这里通过 CAS 加锁的方式保证线程安全,避免多次开启。

工作线程开启后,start() 方法会被阻塞,等工作线程的 startTime 属性初始化完成后才被唤醒。为什么只有等 startTime 初始化后才能继续执行呢?因为上面的 newTimeout 方法在线程开启后,需要计算当前添加进来任务的执行时间,而这个执行时间是根据 startTime 计算的。

2.4 时间轮调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码        @Override
public void run() {
// 初始化 startTime.
startTime = System.nanoTime();
if (startTime == 0) {
startTime = 1;
}

// 用来唤醒被阻塞的 HashedWheelTimer#start() 方法,保证 startTime 初始化
startTimeInitialized.countDown();

do {
// 时钟拨动
final long deadline = waitForNextTick();
if (deadline > 0) {
int idx = (int) (tick & mask);
// 处理过期的任务
processCancelledTasks();
HashedWheelBucket bucket =
wheel[idx];
// 将任务加载进时间轮
transferTimeoutsToBuckets();
// 执行当前时间轮槽内的任务
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

// 时间轮关闭,将还未执行的任务以列表的形式保存到 unprocessedTimeouts 集合中,在 stop 方法中返回出去
// 还未执行的任务可能会在两个地方,一:时间轮数组内,二:队列中
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
// 处理过期的任务
processCancelledTasks();
}

时间轮每拨动一次 tick 就会 +1,根据这个值与(时间轮数组长度 - 1)进行 & 运算,可以定位时间轮数组内的槽。因为 tick 值一直在增加,所以时间轮数组看起来就像一个不断循环的圆。

  • 先初始化 startTime 值,因为后面任务执行的时间是根据 startTime 计算的
  • 时钟拨动,如果时间未到,则 sleep 一会儿
  • 处理过期的任务
  • 将任务加载进时间轮
  • 执行当前时钟对应时间轮内的任务
  • 时间轮关闭,将所有未执行的任务封装到 unprocessedTimeouts 集合中,在 stop 方法中返回出去
  • 处理过期的任务

上面简单罗列了下 run 方法的大概执行步骤,下面是具体方法的分析。

2.5 时钟拨动

如果时间轮设置的 tickDuration 为 100ms 拨动一次,当时钟拨动一次后,应该计算下一次时钟拨动的时间,如果还没到就 sleep 一会儿,等到拨动时间再醒来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码        private long waitForNextTick() {
// 计算时钟下次拨动的相对时间
long deadline = tickDuration * (tick + 1);

for (;;) {
// 获取当前时间的相对时间
final long currentTime = System.nanoTime() - startTime;
// 计算距离时钟下次拨动的时间
// 这里之所以加 999999 后再除 10000000, 是为了保证足够的 sleep 时间
// 例如:当 deadline - currentTime = 2000002 的时候,如果不加 999999,则只睡了 2ms
// 而 2ms 其实是未到达 deadline 时间点的,所以为了使上述情况能 sleep 足够的时间,加上 999999 后,会多睡 1ms
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

// <=0 说明可以拨动时钟了
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}


// 这里是为了兼容 Windows 平台,因为 Windows 平台的调度最小单位为 10ms,如果不是 10ms 的倍数,可能会引起 sleep 时间不准确
// See https://github.com/Netty/Netty/issues/356
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}

try {
// sleep 到下次时钟拨动
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}

如果时间不到就 sleep 等待一会儿,为了使任务时钟准确,可以从上面的代码中看出 Netty 做了一些优化,比如 sleepTimeMs 的计算,Windows 平台的处理等。

2.6 将任务从队列加载进时间轮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码        private void transferTimeoutsToBuckets() {

// 一次最多只处理队列中的 100000 个任务
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
// 过滤已经取消的任务
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
continue;
}
// 计算当前任务到执行还需要经过几次时钟拨动
// 假设时间轮数组大小是 10,calculated 为 12,需要时间轮转动一圈加两次时钟拨动后后才能执行这个任务,因此还需要计算一下圈数
long calculated = timeout.deadline / tickDuration;
// 计算当前任务到执行还需要经过几圈时钟拨动
timeout.remainingRounds = (calculated - tick) / wheel.length;
// 有的任务可能在队列里很长时间,时间过期了也没有被调度,将这种情况的任务放在当前轮次内执行
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
// 计算任务在时间轮数组中的槽
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
// 将任务放到时间轮的数组中,多个任务可能定位时间轮的同一个槽,这些任务通过以链表的形式链接
bucket.addTimeout(timeout);
}
}

void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
// 任务构成双向链表
timeout.bucket = this;
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}

在上面也提到过,任务刚加进来不会立即到时间轮中去,而是暂时保存到一个队列中,当时间轮时钟拨动时,会将任务从队列中加载进时间轮内。

时间轮每次最大处理 100000 个任务,因为任务的执行时间是用户自定义的,所以需要计算任务到执行需要经过多少次时钟拨动,并计算时间轮拨动的圈数。接着将任务加载进时间轮对应的槽内,可能有多个任务经过 hash 计算后定位到同一个槽,这些任务会以双向链表的结构保存,有点类似 HashMap 处理碰撞的情况。

2.7 执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码        public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;

while (timeout != null) {
HashedWheelTimeout next = timeout.next;
// 任务执行的圈数 > 0,表示任务还需要经过 remainingRounds 圈时钟循环才能执行
if (timeout.remainingRounds <= 0) {
// 从链表中移除当前任务,并返回链表中下一个任务
next = remove(timeout);
if (timeout.deadline <= deadline) {
// 执行任务
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
// 过滤取消的任务
next = remove(timeout);
} else {
// 圈数 -1
timeout.remainingRounds --;
}
timeout = next;
}
}

public void expire() {
// 任务状态校验
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}

try {
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}

时间轮槽内的任务以链表形式存储,这些任务执行的时间可能会不一样,有的在当前时钟执行,有的在下一圈或者下两圈对应的时钟执行。当任务在当前时钟执行时,需要将这个任务从链表中删除,重新维护链表关系。

2.8 终止时间轮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码    @Override
public Set<Timeout> stop() {
// 终止时间轮的线程不能是时间轮的工作线程
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from " +
TimerTask.class.getSimpleName());
}
// 将时间轮的状态修改为 WORKER_STATE_SHUTDOWN,这里有两种情况
// 一:时间轮是 WORKER_STATE_INIT 状态,表明时间轮从创建到终止一直没有任务进来
// 二:时间轮是 WORKER_STATE_STARTED 状态,多个线程尝试终止时间轮,只有一个操作成功
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// 代码走到这里,时间轮只能是两种状态中的一个,WORKER_STATE_INIT 和 WORKER_STATE_SHUTDOWN
// 为 WORKER_STATE_INIT 表示时间轮没有任务,因此不用返回未处理的任务,但是需要将时间轮实例 -1
// 为 WORKER_STATE_SHUTDOWN 表示是 CAS 操作失败,什么都不用做,因为 CAS 成功的线程会处理
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
// 时间轮实例对象 -1
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
// CAS 操作失败,或者时间轮没有处理过任务,返回空的任务列表
return Collections.emptySet();
}

try {
boolean interrupted = false;
while (workerThread.isAlive()) {
// 中断时间轮工作线程
workerThread.interrupt();
try {
// 终止时间轮的线程等待时间轮工作线程 100ms,这个过程主要是为了时间轮工作线程处理未执行的任务
workerThread.join(100);
} catch (InterruptedException ignored) {
interrupted = true;
}
}

if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
// 返回未处理的任务
return worker.unprocessedTimeouts();
}

当终止时间轮时,时间轮状态有两种情况:

  • WORKER_STATE_INIT:时间轮初始化,前面我们说过,当初始化时间轮对象时并不会立即开启时间轮工作线程,而是第一次添加任务时才开启,为 WORKER_STATE_INIT 表示时间轮没有处理过任务
  • WORKER_STATE_STARTED:时间轮在工作,这里也有两种情况,存在并发与不存在并发,如果多个线程都尝试终止时间轮,肯定只能有一个成功

时间轮停止运行后会将未执行的任务返回出去,至于怎么处理这些任务,由业务方自己定义,这个流程和线程池的 shutdownNow 方法是类似的。

如果时间轮在运行,怎么才能获取到未执行的任务呢,答案就在上面的 run() 方法中,如果时间轮处于非运行状态,会把时间轮数组与队列中未执行且未取消的任务保存到 unprocessedTimeouts 集合中。而终止时间轮成功的线程只需要等待一会儿即可,这个等待是通过 workerThread.join(100); 实现的。

取消时间轮内的任务相对比较简单,这里就不概述了,想要了解的自行查看即可。

上面就是时间轮运行的基本原理了。

三、总结

这里以问答的形式进行总结,大家也可以看下这些问题,自己能不能很好的回答出来?

3.1 时间轮是不是在初始化完成后就启动了?

不是,初始化完成时间轮的状态是 WORKER_STATE_INIT,此时时间轮内的工作线程还没有运行,只有第一次往时间轮内添加任务时,才会开启时间轮内的工作线程。时间轮线程开启后会初始化 startTime,任务的执行时间会根据这个字段计算,而且时间轮中时间的概念是相对的。

3.2 如果时间轮内还有任务未执行,服务重启了怎么办?

时间轮内的任务都在内存中,服务重启数据肯定都丢了,所以当服务重启时需要业务方自己做兼容处理。

3.3 如何自定义合适的时间轮参数?

自定义时间轮时有两个比较重要的参数需要我们注意:

  • tickDuration:时钟拨动频率,假设一个任务在 10s 后执行,tickDuration 设置为 3min 那肯定是不行的,tickDuration 值越小,任务触发的精度越高,但是没有任务时,工作线程会一直自旋尝试从队列中拿任务,比较消耗 CPU 资源
  • ticksPerWheel:时间轮数组大小,假设当时间轮时钟拨动时,有 10000 个任务处理,但是我们定义时间轮数组的大小为 8,这时平均一个时间轮槽内有 1250 个任务,如果这 1250 个任务都在当前时钟执行,任务执行是同步的,由于每个任务执行都会消耗时间,可能会导致后面的任务触发时间不准确。反之如果数组长度设置的过大,任务比较少的情况下,时间轮数组很多槽都是空的

所以当使用自定义时间轮时,一定要评估自己的业务后再设置参数。

3.4 Netty 的时间轮有什么缺陷?

Netty 中的时间轮是通过单线程实现的,如果在执行任务的过程中出现阻塞,会影响后面任务执行。除此之外,Netty 中的时间轮并不适合创建延迟时间跨度很大的任务,比如往时间轮内丢成百上千个任务并设置 10 天后执行,这样可能会导致链表过长 round 值很大,而且这些任务在执行之前会一直占用内存。

3.5 时间轮要设置成单例的吗?

强烈建议按照业务模块区分,每个模块都创建一个单例的时间轮对象。在上面的代码中我们看到了,当时间轮对象大于 64 时会以日志的形式提示。如果时间轮是非单例对象,那时间轮算法完全就失去了作用。

3.6 时间轮与 ScheduledExecutorService 的区别?

ScheduledExecutorService 中的任务维护了一个堆,当有大量任务时,需要调整堆结构导致性能下降,而时间轮通过时钟调度,可以不受任务量的限制。

当任务量比较少时时间轮会一直自旋空转拨动时钟,相比 ScheduledExecutorService 会占用一定 CPU 资源。

参考

netty源码解读之时间轮算法实现-HashedWheelTimer

HashedWheelTimer 使用及源码分析创建

定时器的几种实现方式

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

迷茫了,我们到底该不该用lombok?

发表于 2020-12-24

前言

最近上网查资料发现很多人对lombok褒贬不一,引起了我的兴趣,因为我们项目中也在大量使用lombok,大家不同的观点让我也困惑了几天,今天结合我实际的项目经验,说说我的个人建议。

随便搜搜就找到了这几篇文章:

这些人建议使用 lombok,觉得它是一个神器,可以大大提高编码效率,并且让代码更优雅。

在搜索的过程中,有些文章却又不推荐使用:

这些人觉得它有一些坑,容易给项目埋下隐患,我们到底该听谁的呢?

为什么建议使用lombok?

1.传统javabean

在没使用lombok之前,我们一般是这样定义javabean的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
java复制代码public class User {

private Long id;
private String name;
private Integer age;
private String address;

public User() {

}

public User(Long id, String name, Integer age, String address) {
this.id = id;
this.name = name;
this.age = age;
this.address = address;
}

public Long getId() {
return id;
}

public String getName() {
return name;
}

public Integer getAge() {
return age;
}

public String getAddress() {
return address;
}


public void setId(Long id) {
this.id = id;
}

public void setName(String name) {
this.name = name;
}

public void setAge(Integer age) {
this.age = age;
}

public void setAddress(String address) {
this.address = address;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
User user = (User) o;
return Objects.equals(id, user.id) &&
Objects.equals(name, user.name) &&
Objects.equals(age, user.age) &&
Objects.equals(address, user.address);
}

@Override
public int hashCode() {
return Objects.hash(id, name, age, address);
}

@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
", address='" + address + '\'' +
'}';
}
}

该User类中包含了:成员变量、getter/setter方法、构造方法、equals、hashCode方法。

咋一看,代码还是挺多的。而且还有个问题,如果User类中的代码修改了,比如:age字段改成字符串类型,或者name字段名称修改了,是不是需要同步修改相关的成员变量、getter/setter方法、构造方法、equals、hashCode方法全都修改一遍?

也许有些朋友会说:现在的idea非常智能,可以把修改一次性搞定。

没错,但是有更优雅的处理方法。

2.lombok的使用

第一步,引入jar包

1
2
3
4
5
6
java复制代码  <dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
<scope>provided</scope>
</dependency>

第二步,在idea中安装插件


注意:如果不按照插件idea中就无法编译使用lombok注解的代码。

第三步,在代码中使用lombok注解

上面的User类代码可以改成这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@ToString
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class User {

private Long id;
private String name;
private Integer age;
private String address;
}

so good,代码可以优化到如此简单。User类的主体只用定义成员变量,其他的方法全都交给注解来完成。

如果修改了成员变量名称或者类型,怎么办呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@ToString
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class User {

private Long id;
private String userName;
private String age;
private String address;
}

你只用一心一意修改成员变量即可,其他的根本不用操心,简直太爽了。

更让人兴奋的是,还能进一步优化:

1
2
3
4
5
6
7
8
9
10
java复制代码@NoArgsConstructor
@AllArgsConstructor
@Data
public class User {

private Long id;
private String userName;
private String age;
private String address;
}

@Data相当于@Getter、@Setter、@ToString、@EqualsAndHashCode、@RequiredArgsConstructor的集合。

lombok注解整理如下:


图片来源占小狼

从上面看出使用lombok给人最大的感受是代码量显著减少了,能够有效的提升开发效率,而代码看起来更优雅,确实是一个不可多得的神器。

lombok工作原理

java程序的解析分为:运行时解析 和 编译时解析。

通常我们通过反射获取类、方法、注解和成员变量就是运行时解析。但是这种方式效率其实不高,要在程序运行起来才能解析。

这时候编译时解析就体现出它的价值了。

编译时解析又分为:注解处理器(Annotation Processing Tool)和
JSR 269 插入式注解处理器(Pluggable Annotation Processing API)

第一种处理器它最早是在 JDK 1.5 与注解(Annotation) 一起引入的,它是一个命令行工具,能够提供构建时基于源代码对程序结构的读取功能,能够通过运行注解处理器来生成新的中间文件,进而影响编译过程。

不过在JDK 1.8以后,第一种处理器被淘汰了,取而代之的是第二种处理器,我们一起看看它的处理流程:

Lombok的底层具体实现流程如下:

  1. javac对源代码进行分析,生成了一棵抽象语法树(AST)
  2. 编译过程中调用实现了“JSR 269 API”的Lombok程序
  3. 此时Lombok就对第一步骤得到的AST进行处理,找到@Data注解所在类对应的语法树(AST),然后修改该语法树(AST),增加getter和setter方法定义的相应树节点
  4. javac使用修改后的抽象语法树(AST)生成字节码文件,即给class增加新的节点(代码块)

为什么建议不用lombok?

即使lombok是一个神器,但是却有很多人不建议使用,这又是为什么呢?

1.强制要求队友安装idea插件

这点确实比较恶心,因为如果使用lombok注解编写代码,就要求参与开发的所有人都必须安装idea的lombok插件,否则代码编译出错。

2.代码可读性变差

使用lombok注解之后,最后生成的代码你其实是看不到的,你能看到的是代码被修改之前的样子。如果要想查看某个getter或setter方法的引用过程,是非常困难的。

3.升级JDK对功能有影响

有人把JDK从Java 8升级到Java 11时,我发现Lombok不能正常工作了。

4.有一些坑

  1. 使用@Data时会默认使用@EqualsAndHashCode(callSuper=false),这时候生成的equals()方法只会比较子类的属性,不会考虑从父类继承的属性,无论父类属性访问权限是否开放。
  2. 使用@Builder时要加上@AllArgsConstructor,否则可能会报错。

5.不便于调试

我们平时大部分人都喜欢用debug调试定位问题,但是使用lombok生成的代码不太好调试。

6.上下游系统强依赖

如果上游系统中提供的fegin client使用了lombok,那么下游系统必须也使用lombok,否则会报错,上下游系统构成了强依赖。

我们该如何选择?

lombok有利有弊,我们该如何选择呢?

个人建议要结合项目的实际情况做最合理的选择。

  1. 如果你参与的是一个新项目,上下游系统都是新的,这时候建议使用lombok,因为它可以显著提升开发效率。
  2. 如果你参与的是一个老项目,并且以前没有使用过lombok,建议你后面也不要使用,因为代码改造成本较高。如果以前使用过lombok,建议你后面也使用,因为代码改造成本较高。
  3. 其实只要引入jar包可能都有:强制要求队友安装idea插件、升级JDK对功能有影响、有一些坑 和 上下游系统强依赖 这几个问题,只要制定好规范,多总结使用经验这些问题不大。
  4. 代码的可读性变差 和 不便于调试 这两个问题,我认为也不大,因为lombok一般被使用在javabean上,该类的逻辑相对来说比较简单,很多代码一眼就能看明白,即使不调试问题原因也能猜测7、8分。

最后说一句(求关注,别白嫖我)

如果这篇文章对您有所帮助,或者有所启发的话,帮忙扫描下发二维码关注一下,您的支持是我坚持写作最大的动力。

求一键三连:点赞、转发、在看。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

排序算法-插入排序

发表于 2020-12-24

简介

插入排序,一般也被称为直接插入排序。对于少量元素的排序,它是一个有效的算法 。插入排序是一种最简单的排序方法,它的基本思想是将一个记录插入到已经排好序的有序表中,从而一个形成一个有序表。在其实现过程使用双层循环,外层循环除第一个元素之外的所有元素,内层循环对当前元素前面有序表进行待插入位置查找,并进行移动 。

时间复杂度

O(n^2)

思路分析

排序规则:从小到大

在这里插入图片描述

  1. 有两个表,一张表A中存已排序好的(有序),默认放数组第一个元素。另一张表B中放未排序的元素
  2. 每次从B表中取第一个元素放入A表合适的位置,B中所有数据取出来后即排序完成。

具体可参考代码注释。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码public class InsertSort {
public static void main(String[] args) {
int[] arr = {9, 2, 5, 3, 1};
insertSort(arr);
System.out.println(Arrays.toString(arr));
}

public static void insertSort(int[] arr) {
// 排序规则:从小到大
// 1.有两个表,一张表A中存已排序好的(有序),默认放数组第一个元素。另一张表B中放未排序的元素
// 2.每次从B表中取第一个元素放入A表合适的位置,B中所有数据取出来后即排序完成。

for (int k = 1; k < arr.length; k++) {
// A表的索引 (while循环执行之前是A表中最后一个元素的索引)
int aIndex = k - 1;
// B表中取得数据
int bVal = arr[k];

while (
// 比较完A表中所有数据则退出循环
aIndex > -1
// 比较B表中取得元素和A表中元素比较
// 当B表中的元素比A表中的元素小时,则让当前A表中元素后移一位,前面一位预留给bVal
&& bVal < arr[aIndex]
) {
// 这里其实就是一个移动元素的过程,把当前元素移动到后一个位置,因为当前这个位置要预留给B中取出的元素
// 下列例子是模拟交换过程,没有按实际排序走,用于理解交换逻辑。
// int[] arr = {9, 2, 5, 3, 1};
// 假设 aIndex = 2
// arr[3] = arr[2] => {9, 2, 5, 5, 1}
// aIndex--;
// 下一次循环 aIndex = 1
// arr[2] = arr[1] => {9, 2, 2, 5, 1}
// aIndex--;
// 下一次循环 aIndex = 0
// arr[1] = arr[0] => {9, 9, 2, 5, 1}
arr[aIndex + 1] = arr[aIndex];
aIndex--;
}
// 如果当前索引变化了,意味着有序列表中有比bVal大的值,才赋值,
// 否则意味着A表中没有比bVal大的值,保持不动即可,也就是A表中的最后一个位置。(优化作用)
if (aIndex + 1 != k) {
arr[aIndex + 1] = bVal;
}
}
}
}

运行输出:

[1, 2, 3, 5, 9]

总结

在这里插入图片描述

该算法的核心其实就是拿B表中的第一个元素和A表中一个个比较,从A表中最后一个元素开始比较找到合适的位置放入,B表中的元素取完则排序完成。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

数据库SQL:GROUPBY多分组+取MAX最大(MIN最小

发表于 2020-12-24

点击上方“罗晓胜”,马上关注,您的支持对我帮助很大

上期文章

  • 从0到1学数据库:数据库基础
  • 从0到1学数据库:简单查询语句

/ 前言 /

如何在一条sql中,对数据表中的数据进行分组,同时求每组最大(小)值。

/ 正文 /

测试案例

求每个班级中的年龄最大的学生

1
go复制代码SET NAMES utf8mb4;SET FOREIGN_KEY_CHECKS = 0; -- ------------------------------ Table structure for student-- ----------------------------DROP TABLE IF EXISTS `student`;CREATE TABLE `student` (`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT '' COMMENT '姓名',`age` int(3) NULL DEFAULT 0 COMMENT '年龄',`c_class` int(4) NULL DEFAULT 0 COMMENT '班级',PRIMARY KEY (`id`) USING BTREE) ENGINE = MyISAM AUTO_INCREMENT = 7 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic; -- ------------------------------ Records of student-- ----------------------------INSERT INTO `student` VALUES (1, '张三', 22, 1);INSERT INTO `student` VALUES (2, '李四', 26, 1);INSERT INTO `student` VALUES (3, '王五', 20, 2);INSERT INTO `student` VALUES (4, '赵六', 20, 2);INSERT INTO `student` VALUES (5, '孙七', 22, 3);INSERT INTO `student` VALUES (6, '李八', 28, 3);INSERT INTO `student` VALUES (7, '阿九', 28, 3);

案例分析

有两种方式都可以找到结果

  • 第一种实现方式
+ 先排序,然后外面包一层查询+分组(这种有个问题,就是分组必须取第一条数据,**可是分组默认并不一定取第一条数据**)
  • 第二种实现方式
+ 先查询所有的最大(小)值,然后外面包一层查询匹配这些值+分组(这种也存在问题,一是性能问题,还有同组存在多个最大值相同也有问题)

错误写法——使用MAX函数

1
sql复制代码SELECT *,MAX(age) FROM student;SELECT *,MAX(age) FROM student GROUP BY `c_class`;

错误分析:MAX并不会取到最大值所在行,分组并不会取最大值所在行

错误写法——使用排序+分组

1
vbnet复制代码SELECT  *FROM  ( SELECT * FROM student order by age desc,c_class asc) AS bGROUP BY  `c_class`;

排序结果

)

SQL结果

)

错误分析:很明显结果是错的,这里的分组并不会取排序的第一条结果,如果恰巧是的,你打乱数据试试

第一种方式正确写法——使用limit

1
vbnet复制代码SELECT  *FROM  ( SELECT * FROM student order by age desc,c_class asc limit 99999999) AS bGROUP BY  `c_class`;

分析:limit 99999999是必须要加的,如果不加的话,数据不会先进行排序,通过 explain 查看执行计划,可以看到没有 limit 的时候,少了一个 DERIVED(得到) 操作。

使用limit之后

第一种方式正确写法——使用having

1
sql复制代码SELECT  *FROM  ( SELECT * FROM student having 1 order by age desc,c_class asc) AS bGROUP BY  `c_class`;

分析:通过 explain 查看执行计划,可以看到 使用having 1,也会使用 DERIVED(得到) 操作。

子查询展开/派生类合并——(这也是为什么不加limit只查询到一张表,加limit才走两张表)

不加limit的时候,执行日志显示只有一个表的处理,不对呀,应该是两张表,先从from查询出一张表然后再从这张表筛选出一张新表,总共两张表才对。原来,这是mysql的版本在捣鬼,

在mysql5.6中,如果是这样写确实会出现两张表的处理,执行日志显示出现了一个主表一个Derived表,Derived为派生表,也就是说,from里面查询出的是派生表,也可以理解为临时表,先将查询到的记录放到这个临时表,然后再从这个临时表进行分组,分组后的结果放入一张新表,就产生了正确结果。

**那么为什么切换了版本后就好了呢?**其实mysql5.7针对于5.6版本做了一个优化,针对mysql本身的优化器增加了一个控制优化器的参数叫 derived_merge ,什么意思呢,“派生类合并”。

ok,既然已经了解了很多,原来是派生类合并在作怪。

官方手册:

优化器可以使用两种策略(也适用于视图引用)处理派生类引用:

1.将派生类合并到外部查询块中

2.将派生类实现为内部临时表

1
sql复制代码SELECT * FROM  ( SELECT * FROM student order by age desc,c_class asc) AS b 等价于SELECT * FROM student order by age desc,c_class asc;

同时由于这个机制,子查询中的里面的 order by 应该会跟外部块一起执行,也就是说 order by 会跑到外面来(说的形象一点哈),那么为什么结果的排序依旧是乱的:

官方使用文档:

如果这些条件都为真,则优化器将派生类或视图引用中的ORDER BY子句传播到外部查询块。

1.外部查询未分组或聚合

2.外部查询未指定DISTINCT,HAVING或ORDER BY

3.外部查询将此派生表或视图引用作为FROM子句中的唯一源

否则,优化器将忽略ORDER BY子句

上面的sql里的外部块由于使用到了分组,那么优化器会忽略掉 order by 子句

使合并派生类失效

其实也有多种办法不需要修改 derived_merge 参数而使合并派生类失效,具体做法可参考官方使用手册

官方文档:

可以通过在子查询中使用任何阻止合并的构造来禁用合并,尽管这些构造对实现的影响并不明确。防止合并的构造对于派生表和视图引用是相同的:

1.聚合函数( SUM() , MIN() , MAX() , COUNT()等)

2.DISTINCT

3.GROUP BY

4.HAVING

5.LIMIT

6.UNION或UNION ALL

7.选择列表中的子查询

8.分配给用户变量

9.仅引用文字值(在这种情况下,没有基础表)

第二种方式写法分析

错误写法示例

1
sql复制代码SELECT  *FROM studentWHERE age in  (select max(age) Max_age FROM student GROUP BY `c_class`)

分析:数据量如果很大会存在效率问题,max会扫描全表,in也会扫描全表,如果存在同组多个最大值相同也有问题,同组可能会得出多个结果

所以,只能在最外层再添加一层GROUP BY

1
vbnet复制代码 SELECT    *FROM studentWHERE age in    (select max(age) Max_age FROM student GROUP BY `c_class`)GROUP BY    `c_class`;

是不是到这里感觉的结果是对的,但是你以为这样就正确了吗?

这里的分组与第一种写法会面临同样的问题,分组并不会取第一条数据

例如:如果我要取同组最大值最新一条数据,就会报错

你可以试试重新排序再分组,看能不能得到对应的结果(很遗憾,我试了,结果并不能)

1
sql复制代码select * from (    SELECT     *     FROM student     WHERE age in         (select max(age) Max_age FROM student GROUP BY `c_class`)     order by age desc,id desc,c_class asc    ) as bGROUP BY     `c_class`;

但是使用having或者limit可以实现分组取最新一条数据

1
sql复制代码 SELECT    * FROM    ( SELECT * FROM student having 1 order by age desc,id desc) AS bGROUP BY    `c_class`;

/ 总结 /

版本只是变化之一,当发现结果不对时,要多使用执行计划分析SQL。

多分组和获取最大值最小值推荐使用having或者limit实现多分组取最大(小)值

往期推荐:

如何入门做软件开发

为什么我不推荐入行程序员

做全栈开发很难吗

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…748749750…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%