开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

图解二叉树、满二叉树、完全二叉树、平衡二叉树

发表于 2021-11-28

这是我参与11月更文挑战的第18天,活动详情查看:2021最后一次更文挑战

欢迎关注公众号OpenCoder,来和我做朋友吧~❤😘😁🐱‍🐉👀

一、什么是树?

树是一种数据结构,它是由n(n≥1)个有限节点组成一个具有层次关系的集合。把它叫做“树”是因为它看起来像一棵倒挂的树,也就是说它是根朝上,而叶朝下的。

每个节点有零个或多个子节点;没有父节点的节点称为根节点;每一个非根节点有且只有一个父节点;除了根节点外,每个子节点可以分为多个不相交的子树。

image-20211119120429089

如上图,许多逻辑可能存在着一对多或者对多的情况。四川省包含了很多个城市,每个城市又包含了很多个区县。

我们可以使用以上结构来保存数据同时描述所属关系,这样的非线性结构被称为树。

树是n个节点的有限集(n>=0)。当n=0时,称为空树。在任意一个非空树中,有如下的特点。

  1. 没有父节点的节点被称为根节点,且一棵树形结构中,有且仅有一个根节点。
  2. 当n>1时,其余节点可分为m(m>0)个互不相交的有限集,每一个集合本身又是一个树,并成为根的子树。

如下图所示,根、子树、叶子节点组成的树形结构。

树的专用术语,如下图:

image-20211119145423662

如上图所示,节点1的上一级节点,被称为1的父节点。从节点1衍生出来的节点被称为1的孩子节点。和节点1同级别的节点被称为1的兄弟节点。树的最大层级数,被称为树的高度。上面图上的树,高度为4。

二、什么是二叉树

二叉树是树形结构中的一种特殊形式。何为二叉?此树形结构中,每个节点最多有两个孩子节点。

二叉树中节点也可能是没有子节点或者一个子节点,但不能超过两个子节点,如下图就是一颗二叉树。

image-20211119150348208

二叉树那个,节点的两个孩子节点,左边的被称为左孩子,右边的被称为右孩子。

满二叉树: 一个二叉树所有的非叶子节点都存在左右两个孩子,并且所有叶子节点都在同一层级上,这样的树被称为满二叉树。

如图:

image-20211119150930382

发现树每个非叶子节点(1、 2、 3)的,都有两个子节点,并且所有的叶子节点(4、5、6、7)都在同一个层级。

完全二叉树: 对于一个有n个节点的二叉树,按照层级顺序来编号,则所有节点的编号为从1到n。如果这个树的所有节点和同样深度的满二叉树的编号为1到n的节点位置相同,则这个二叉树为完全二叉树。

如图所示:

image-20211119151904321

完全二叉树的特点:叶子结点只能出现在最下层和次下层,且最下层的叶子结点集中在树的左部。需要注意的是,满二叉树肯定是完全二叉树,而完全二叉树不一定是满二叉树。

完全二叉树的条件比满二叉树要轻松一些,满二叉树要求所有的叶子节点都是满的,而完全二叉树只需要保证最后一个节点之前的节点齐全即可。

三、二叉树的应用

二叉树表现形式有多种,满二叉树和完全二叉树只是其中的两种。每一种形式都有其具体应用场景。观察所有的形式发现,二叉树最主要的还是应用于查询和顺序保存两个方面。

3.1 二叉树查询

下面以二叉查找树为例,这种树主要是方便我们做查询。

二叉查找树: 在二叉树的基础上增加了以下条件:

3.1.1 如果当前节点的左子树不为空,则左子树上所有的节点的值均小于当前节点的值。

3.1.2 如果当前节点的右子树不为空,则右子树上的所有节点的值均大于当前节点的值。

3.1.3 左右子树也都满足上述两个条件。

如下图:

image-20211119153212044

以上的存储数据的条件,注定了让二叉查找树的查询更加方便。以上图为例,我们要查询 5的节点步骤如下:

1.访问根节点6,发现5<6,我们要找的5在根节点的左孩子树中。

image-20211119153908599

2.访问根节点的左孩子4,发现5>4,我们要找的5在节点4的右孩子中。

image-20211119153957256

3.访问节点4的右孩子,发现5=5,这就是我们要找的节点5。

image-20211119154045340

上述图示中,完整的表示了二叉查找树的查询思路。

如果一个节点分布相对均匀的二叉查找树,假设节点总个数为n,那查询对应节点的时间复杂度就是O(logn),和树的深度是一致的。

这样比较大小决定左右的查询方式,与二分查找算法非常接近。

3.2 二叉树顺序存储

在二叉查找树中,左子树小于父节点,右子树大于父节点,因此保证了二叉树的顺序。所以二叉查找树又被称为二叉排序树。

在新增数据的时候,仍要遵循二叉查找树的基本原则。

下面完成数据顺序存储,下图是准备好的一颗二叉排序树。

image-20211120121515801

假设现在要将 10存入上图的二叉树中,由于10>6,走右边,10>7,继续走右边,10>8继续走右边。所以10会插入到8的右孩子位置,如下图:

image-20211120121728508

假设现在继续将9插入树中,由于9>6,9>7,9>8,且9<10,所以9应该在6/7/8的右边且在10的左边,如下图:

image-20211120122028454

3.3 二叉树的问题

以上的操作看起来很顺利,但是其中却隐藏着问题。假设已有的二叉树是这样的:

image-20211120122255450

试着在二叉树中连续插入5,4,3,2

image-20211120123013367

发现二叉树变成这样一颗”歪脖子树“了。查询的节点时间复杂度退化成了O(N)。这个问题我们怎么解决呢?这个就要涉及到二叉树的平衡了。关于二叉树的平衡,下一章我们再详细的介绍。

四、总结

二叉树是数据结构的一种,本章介绍了满二叉树和完全二叉树。以及二叉查询树的查询操作和新增操作。使用二叉树来存储数据,既可以保证顺序,又可以提高查询效率。不过最后我们发现如果二叉树长歪了,查询效率就会变低,要解决这个问题,我们需要让二叉树自己平衡。关于二叉树的自平衡下章介绍。

欢迎关注公众号OpenCoder,来和我做朋友吧~❤😘😁🐱‍🐉👀

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

归并排序:解决小和、逆序对问题 一、小和问题 二、逆序对问题

发表于 2021-11-28

大家好,我是周一。

在上一篇归并排序中,我们讲了归并排序的基本概念、merge(合并)过程等,今天趁热打铁,我们来说说使用归并排序的一些常见面试题。

一、小和问题

1、题目描述:

在一个数组中,每一个数左边比当前数小的数累加起来,叫做这个数组的小和。求一个给定数组的小和。

2、例子:

数组为:[1,3,4,2,5]

1左边比1小的数:没有

3左边比3小的数:1

4左边比4小的数:1,3

2左边比2小的数:1

5左边比5小的数:1,3,4,2

所以小和为1+(1+3)+1+(1+3+4+2)=16

3、思路:

找每一个数右边比当前数大的个数,(个数 * 当前数) 的累加和就是结果。

这咋和归并排序联系上的呢,仔细想想,在左组和右组merge的时候,会比较数的大小,这时就可以在右组找到比左组当前数大的个数。

4、详细的参考代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
java复制代码/**
 * 小和问题:在一个数组中,每一个数左边比当前数小的数累加起来,叫做这个数组的小和。要求时间复杂度O(N*logN) 
 *
 * @author Java和算法学习:周一
 */
public class SmallSum {

    public static int smallSum(int[] arr) {
        if (arr == null || arr.length < 2) {
            return 0;
        }
        return process(arr, 0, arr.length - 1);
    }

    private static int process(int[] arr, int l, int r) {
        if (l == r) {
            return 0;
        }
        int mid = l + ((r - l) >> 1);
        return process(arr, l, mid) + process(arr, mid + 1, r) + merge(arr, l, mid, r);
    }

    private static int merge(int[] arr, int l, int mid, int r) {
        int[] help = new int[r - l + 1];
        int i = 0;

        int pL = l;
        int pR = mid + 1;
        int res = 0;
        while (pL <= mid && pR <= r) {
            // 当左组的数小于右组的数时, 当前右组的个数*当前数 的累加和 即是小和的结果
            // 仔细和归并排序比较,发现就多了此处的代码。唯一的区别是,
            // 等于的时候拷贝右组的,因为要在右组中找出比左组大的个数,肯定不能先拷贝左组的,不然咋找出个数
            res += arr[pL] < arr[pR] ? (r - pR + 1) * arr[pL] : 0;
            help[i++] = arr[pL] < arr[pR] ? arr[pL++] : arr[pR++];
        }
        while (pL <= mid) {
            help[i++] = arr[pL++];
        }
        while (pR <= r) {
            help[i++] = arr[pR++];
        }

        for (int j = 0; j < help.length; j++) {
            arr[l + j] = help[j];
        }
        return res;
    }

    /**
     * 对数器方法
     */
    public static int comparator(int[] arr) {
        int res = 0;
        for (int i = 1; i < arr.length; i++) {
            for (int j = 0; j < i; j++) {
                res += arr[j] < arr[i] ? arr[j] : 0;
            }
        }
        return res;
    }

    public static void main(String[] args) {
        int maxSize = 100;
        int maxValue = 100;
        int testTimes = 100000;

        boolean isSuccess = true;
        for (int i = 0; i < testTimes; i++) {
            int[] arr1 = generateArray(maxSize, maxValue);
            int[] arr2 = copyArray(arr1);
            if (smallSum(arr1) != comparator(arr2)) {
                printArray(arr1);
                printArray(arr2);
                isSuccess = false;
                break;
            }
        }
        System.out.println(isSuccess ? "Nice" : "Error");
    }

    //------------------------------------------ TEST METHODS ----------------------------------------------

    public static int[] generateArray(int maxSize, int maxValue) {
        int[] arr = new int[(int) ((maxSize + 1) * Math.random())];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = (int) ((maxValue + 1) * Math.random()) - (int) ((maxValue + 1) * Math.random());
        }
        return arr;
    }

    public static int[] copyArray(int[] arr) {
        if (arr == null) {
            return null;
        }
        int[] res = new int[arr.length];
        for (int i = 0; i < arr.length; i++) {
            res[i] = arr[i];
        }
        return res;
    }

    public static void printArray(int[] arr) {
        if (arr == null) {
            return;
        }
        for (int value : arr) {
            System.out.print(value + " ");
        }
        System.out.println();
    }

}

二、逆序对问题

1、题目描述:

设有一个数组 [a1, a2, a3,… an],对于数组中任意两个元素ai,aj,若i<j,ai>aj,则说明ai和aj是一对逆序对。求一个给定数组的逆序对个数。

2、例子:

3 5 2 1 0 4 9

所有逆序对是:(3,2),(3,1),(3,0),(5,2),(5,1),(5,0),(5,4),(2,1),(2,0),(1,0)。逆序对个数为10。

3、思路:

合并的时候,从右往左合并,(此时右组位置 - mid位置) 的累加和 即是逆序对个数。

这又咋和归并排序联系上的呢,仔细想想,在左组和右组merge的时候,会比较数的大小,但是我要找到的是右边更小的,所以可以采用从右往左合并的方式;同时在处理相等的时候,需要先拷贝右组的,这样才能准确找出右组小的个数。

4、详细的参考代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
java复制代码/**
 * 逆序对问题:设有一个数组 [a1, a2, a3,... an],对于数组中任意两个元素ai,aj,若i<j 且 ai>aj,则说明ai和aj是一对逆序对。
 * 求一个给定数组的逆序对个数。
 *
 * @author Java和算法学习:周一
 */
public class ReversePair {

    public static int reversePairNum(int[] arr) {
        if (arr == null || arr.length < 2) {
            return 0;
        }
        return process(arr, 0, arr.length - 1);
    }

    private static int process(int[] arr, int l, int r) {
        if (l == r) {
            return 0;
        }
        int mid = l + ((r - l) >> 1);
        return process(arr, l, mid) + process(arr, mid + 1, r) + merge(arr, l, mid, r);
    }

    private static int merge(int[] arr, int l, int mid, int r) {
        // 辅助数组
        int[] help = new int[r - l + 1];
        // 辅助下标,由于从右往左合并,所以下标为数组最大值
        int i = help.length - 1;

        // 同理,左组第一个数位置为mid
        int pL = mid;
        // 右组第一个数为最后一个
        int pR = r;
        // 逆序对个数
        int num = 0;
        while (pL >= l && pR >= (mid + 1)) {
            // 找到右组第一个比左组小的数,则当前满足要求的逆序对个数为 (pR - (mid + 1) + 1) 即是 (pR - mid)
            num += arr[pL] > arr[pR] ? (pR - mid) : 0;
            // 从右往左拷贝,相等的拷贝右组的
            help[i--] = arr[pL] > arr[pR] ? arr[pL--] : arr[pR--];
        }
        // 左组和右组有且仅有一个未拷贝完,所以以下两个循环只会执行其中一个
        while (pL >= l) {
            help[i--] = arr[pL--];
        }
        while (pR > mid) {
            help[i--] = arr[pR--];
        }

        // 拷贝回原数组
        for (int j = 0; j < help.length; j++) {
            arr[l + j] = help[j];
        }
        return num;
    }

    /**
     * 对数器用于测试
     */
    public static int comparator(int[] arr) {
        int num = 0;
        for (int i = 0; i < arr.length; i++) {
            for (int j = i + 1; j < arr.length; j++) {
                if (arr[j] < arr[i]) {
                    num++;
                }
            }
        }
        return num;
    }

    public static void main(String[] args) {
        int testTime = 1000000;
        int maxSize = 100;
        int maxValue = 100;

        boolean isSuccess = true;
        for (int i = 0; i < testTime; i++) {
            int[] arr1 = generateRandomArray(maxSize, maxValue);
            int[] arr2 = copyArray(arr1);
            if (reversePairNum(arr1) != comparator(arr2)) {
                printArray(arr1);
                printArray(arr2);
                isSuccess = false;
                break;
            }
        }
        System.out.println(isSuccess ? "Nice" : "Error");
    }

    //--------------------------------------- 辅助测试的方法 ---------------------------------------------

    public static int[] generateRandomArray(int maxSize, int maxValue) {
        int[] arr = new int[(int) ((maxSize + 1) * Math.random())];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = (int) ((maxValue + 1) * Math.random()) - (int) ((maxValue + 1) * Math.random());
        }
        return arr;
    }

    public static int[] copyArray(int[] arr) {
        if (arr == null) {
            return null;
        }

        int[] res = new int[arr.length];
        for (int i = 0; i < arr.length; i++) {
            res[i] = arr[i];
        }
        return res;
    }

    public static void printArray(int[] arr) {
        if (arr == null) {
            return;
        }

        for (int value : arr) {
            System.out.print(value + " ");
        }
        System.out.println();
    }

}

OK,今天就暂时先说利用归并排序解决小和和逆序对问题的题目。

有时候,各种排序算法掌握它本身并不难,难的是你能够充分理解它的过程和精髓,更难的是在真正遇到实际题目的时候,能够想到用这种排序算法来解决它。所以,算法无捷径,唯有多练习,在有足够多的量,积累量变,然后才能迎来质变,那时才能信手拈来,加油。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

flink - join操作&迟到数据处理

发表于 2021-11-28

flink针对迟到数据的策略

  • 丢弃
  • 重新触发计算
    • 1、已经计算窗口是否有销毁?
    • 2、计算逻辑,来一条迟到数据计算一次?
    • 3、计算结果重复往下游发送问题如何解决?
  • 旁路输出

其中,重新计算最为复杂,涉及到窗口销毁以及结果重新发射至下游问题。重新触发计算核心的方法是allowedLateness(), 该方法会设置一个时间参数,代表迟到数据允许迟到的时间,避免窗口持续存在,增加资源的损耗。

window = 10

EVENT timestamp maxTimestamp Watermark(max - 6)
(1, 2) 2 2 -4
(1, 6) 6 6 0
(1, 8) 8 8 2
(1, 3) 3 8 2
(1, 10) 10 10 4
(1, 15) 15 15 9 Case 1: 9 >= 9 触发计算
(1, 7) 7 15 9 Case 2: 迟到数据
(1, 9) 9 15 9 case 3: 迟到数据
(1, 21) 21 21 15
(1, 5) 5 21 15 Case4: 测试延迟时间是以什么标准判断,测试事件时间
(1, 25) 25 25 19
(1, 3) 3 25 19 Case5: 测试水位线

Case 1:

image-20211121192639472

1、watermark >= window。getEnd - 1时触发计算

Case 2:

获取一条迟到数据后,窗口重新计算

image-20211121193253081

case 3:

image-20211121193414934

Case 4:

不是根据事件时间,因为第一个窗口[0, 10),目前最大事件时间是21,差值大于10,按时仍然可以接受迟到数据

image-20211121193834587

Case5:

当前水位线为19,超过第一个窗口结束时间9,差值为10,(1, 3)并未触发重新计算,表示延迟时间基于水位线判断

For example, with an event-time-based windowing strategy that creates non-overlapping (or tumbling) windows every 5 minutes and has an allowed lateness of 1 min, Flink will create a new window for the interval between 12:00 and 12:05 when the first element with a timestamp that falls into this interval arrives, and it will remove it when the watermark passes the 12:06 timestamp.

image-20211121194217171

综上,可以得出结论:

1、窗口计算操作触发标准是watermark >= windowEnd - 1;

2、窗口结束时间戳是第一个不属于该窗口的元素,flink窗口都是左闭右开的

3、延迟时间基于水位线进行判断

4、在延迟时间内窗口不会被销毁

join

  • 是否可以a窗口设置时间 + b窗口设置时间
  • 数据撤回问题 - 针对迟到数据处理逻辑
  • 是否可以多窗口join
+ 不能多窗口合并

Join

flink join操作用于多流合并且并不限制流的数据类型,与window配合使用,因为不管是无界流还是有界流,在数据量比较大的情况下,如果不采用window加以限制,使用join操作,时间和空间上的消耗都是巨大的。flink join操作类似于sql中的inner join,取多个数据流的交集。

1、基本语法
1
2
3
4
5
6
java复制代码source1.join(source2)
.where(Keyselector)
.equalTo(keySelector)
.window()
.process()
.addsink();
2、适用场景
  • 支持Tumbling、Sliding、session窗口
  • 支持processing、event时间语义
  • DataStream以及其子类支持join()操作, WindowedStream无法进行join()操作,表示窗口内无法再进行流join操作
3、事件时间场景下join
  • 合并操作触发时机
  • 如何处理迟到数据,迟到数据处理结束时机
3.1、理论支撑

1、构建join操作图

1
2
3
4
5
6
7
arduino复制代码stream.join(stream2)
.where(tuple -> tuple.f0)
.equalTo(tuple -> tuple.f0)
.window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
.allowedLateness(Time.milliseconds(30))
.apply(new JoinWindowResult())
.addSink(new CommonPrintSink<>("流join"));

2、转换成coGroup操作,实现apply方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码public <T> DataStream<T> apply(
JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
// clean the closure
function = input1.getExecutionEnvironment().clean(function);

// 将join操作转化为coGroup操作
coGroupedWindowedStream =
input1.coGroup(input2)
.where(keySelector1)
.equalTo(keySelector2)
.window(windowAssigner)
.trigger(trigger)
.evictor(evictor)
.allowedLateness(allowedLateness);

return coGroupedWindowedStream.apply(new JoinCoGroupFunction<>(function), resultType);
}


@Override
public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out)
throws Exception {
for (T1 val1 : first) {
for (T2 val2 : second) {
// 调用实现的join方法
out.collect(wrappedFunction.join(val1, val2));
}
}
}

3、将coGroup转化成union操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
java复制代码DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

// we explicitly create the keyed stream to manually pass the key type information in
windowedStream =
new KeyedStream<TaggedUnion<T1, T2>, KEY>(
unionStream, unionKeySelector, keyType)
.window(windowAssigner);

if (trigger != null) {
windowedStream.trigger(trigger);
}
if (evictor != null) {
windowedStream.evictor(evictor);
}
if (allowedLateness != null) {
windowedStream.allowedLateness(allowedLateness);
}

return windowedStream.apply(
new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);


==================================================================
public static class TaggedUnion<T1, T2> {
private final T1 one;
private final T2 two;


===============================================================

private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
extends WrappingFunction<CoGroupFunction<T1, T2, T>>
implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {

private static final long serialVersionUID = 1L;

public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
super(userFunction);
}

@Override
public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out)
throws Exception {

List<T1> oneValues = new ArrayList<>();
List<T2> twoValues = new ArrayList<>();

for (TaggedUnion<T1, T2> val : values) {
if (val.isOne()) {
oneValues.add(val.getOne());
} else {
twoValues.add(val.getTwo());
}
}
wrappedFunction.coGroup(oneValues, twoValues, out);
}
}

由源码可知,join操作本质上是一次union操作。首先将A,B流数据统一成TaggedUnion类型,然后执行union操作,得到DataStream流,后面就和普通的单流窗口操作一致了,包括对迟到数据的处理。唯一的区别是union后的流的watermark来自于上游两个数据源。

statusWaterValue.inputWatermark()

窗口大小10ms

stream 0 stream 1 channelStatus 0 (保证递增) channelStatus 1 newMinWatermark(0,1流最小值) lastOutputWatermark
(1,4) -2 -1000 -1000 -1000
(1,6) -2 -5 -5 -5
(1,7) 1 -5 -5 -5
(1,15) 1 4 1 1
(1,15) 9 4 4 4
(1,29) 9 18 9 9 9>=9(窗口最大时间戳)触发操作
3.2、基于事件时间的两流join

窗口大小 10ms, 默认watermark -100

A Event A timestamp A maxTimestamp A watermark(max - 6) watermark B EVENT B timestamp B maxTimestamp B watermark(max - 11) watermark
(1,4) 4 4 -2 (1,3) 3 3 -8
(1,7) 7 7 2 (1,6) 6 6 -5
(1,6) 6 7 2 (1,12) 12 12 1
(1,15) 15 15 9 (9, 1) -> 1 < 9未触发join操作 (1,15) 15 15 4
(1,30) 30 30 19 (9, 19) -> 9 >= 9触发join操作
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码coGroupedWindowedStream =
input1.coGroup(input2)
.where(keySelector1)
.equalTo(keySelector2)
.window(windowAssigner)
.trigger(trigger)
.evictor(evictor)
.allowedLateness(allowedLateness);

return coGroupedWindowedStream.apply(new JoinCoGroupFunction<>(function), resultType);


void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
3.3、迟到数据处理
1
2
3
4
5
6
7
java复制代码stream.join(stream2)
.where(tuple -> tuple.f0)
.equalTo(tuple -> tuple.f0)
.window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
.allowedLateness(Time.milliseconds(30))
.apply(new JoinWindowResult())
.addSink(new CommonPrintSink<>("流join"));

窗口A: 10ms. 窗口B:10ms 延迟时间 30ms

A Event A timestamp A maxTimestamp A watermark(max - 6) B EVENT B timestamp B maxTimestamp B watermark(max - 11)
(1,4) 4 4 -2 (1,3) 3 3 -8
(1,6) 6 7 2 (1,12) 12 12 1
(1,15) 15 15 9 触发窗口操作(9, 1) -> 1 (1,22) 22 22 11 (9,11) -> 9 触发窗口操作
(1,2) 3 15 9 迟到数据1 (1,7) 7 22 11 迟到数据2
(1,45) 45 45 39 (39,11) -> 11 (1,1) 1 22 11 迟到数据3
(1,50) 50 50 39 (39,39)->39 第一个窗口迟到数据处理结束
(1,8) 8 45 39 未触发迟到数据处理

窗口操作

image-20211125121050781

迟到数据1

image-20211125121019731

迟到数据2

image-20211125121136081

迟到数据3

![image-20211125121538870](/Users/lihui/Library/Application Support/typora-user-images/image-20211125121538870.png)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Golang - 关于 proto 文件的一点小思考

发表于 2021-11-28

前言

ProtoBuf 是什么?

ProtoBuf 是一套接口描述语言(IDL),通俗的讲是一种数据表达方式,也可以称为数据交换格式。

我们常用的数据格式有 JSON 和 XML,为什么使用 ProtoBuf ?是因为它的传输快,为什么传输快?大家可以找下资料。使用 .proto 文件进行描述要序列化的数据结构,然后将写好 .proto 文件使用 protoc 就可以很容易编译成众多计算机语言的接口代码。

gRPC 是什么?

gRPC 是开源的 RPC 框架,已支持主流的计算机语言,可以通过 ProtoBuf 进行定义接口,可以基于 ProtoBuf 进行数据传输。

两者虽然是一家,但是分别解决不同的问题,可以配合使用,也可以分开。

看一下的 gRPC helloworld 的 proto 文件是如何定义的?

helloworld.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ini复制代码syntax = "proto3";

package helloworld;

option go_package = "./;helloworld";

// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

文件中定义了一个 service Greeter 和 rpc SayHello 方法。

入参:string name

出参:string message

这些过于简单,还能不能描述其他信息?

小思考

  1. 定义的 rpc 方法能否也同时支持 HTTP 调用?例如:SayHello 方法,既支持 gRPC 调用,也支持 HTTP 调用,同时支持 protoc 生成代码时,同时也生成 Swagger 接口文档。
  2. 定义的入参能否支持参数验证?例如:name 长度不能大于 20 个字符。
  3. 定义的 service Greeter 服务能否支持拦截器?例如:该服务下的所有方法需要进行登录令牌验证。
  4. 定义的 rpc SayHello 方法能够支持拦截器?例如:当前方法支持开启和关闭是否记录日志。

小结

以上问题还未完全解决,学习 gRPC 感觉有些吃力…

大家有没有可以推荐的学习资源?目前在看 grpc-gateway 。

推荐阅读

  • Go - 使用 sync.WaitGroup 来实现并发操作
  • Go - 使用 sync.Map 解决 map 并发安全问题
  • Go - 基于逃逸分析来提升程序性能
  • Go - 使用 sync.Pool 来减少 GC 压力
  • Go - 使用 options 设计模式

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Linux下信号处理(发送、捕获)

发表于 2021-11-28

这是我参与11月更文挑战的第28天,活动详情查看:2021最后一次更文挑战」

一、Linux下信号介绍

Linux下进行应用编程时,信号的处理必不可少。信号可以用于多进程间通信,查看当前系统支持的所有信号: kill -l

1
2
3
4
5
6
7
8
9
10
11
12
13
14
cpp复制代码[wbyq@wbyq linux_c]$ kill -l
1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL 5) SIGTRAP
6) SIGABRT 7) SIGBUS 8) SIGFPE 9) SIGKILL 10) SIGUSR1
11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM
16) SIGSTKFLT 17) SIGCHLD 18) SIGCONT 19) SIGSTOP 20) SIGTSTP
21) SIGTTIN 22) SIGTTOU 23) SIGURG 24) SIGXCPU 25) SIGXFSZ
26) SIGVTALRM 27) SIGPROF 28) SIGWINCH 29) SIGIO 30) SIGPWR
31) SIGSYS 34) SIGRTMIN 35) SIGRTMIN+1 36) SIGRTMIN+2 37) SIGRTMIN+3
38) SIGRTMIN+4 39) SIGRTMIN+5 40) SIGRTMIN+6 41) SIGRTMIN+7 42) SIGRTMIN+8
43) SIGRTMIN+9 44) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12 47) SIGRTMIN+13
48) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14 51) SIGRTMAX-13 52) SIGRTMAX-12
53) SIGRTMAX-11 54) SIGRTMAX-10 55) SIGRTMAX-9 56) SIGRTMAX-8 57) SIGRTMAX-7
58) SIGRTMAX-6 59) SIGRTMAX-5 60) SIGRTMAX-4 61) SIGRTMAX-3 62) SIGRTMAX-2
63) SIGRTMAX-1 64) SIGRTMAX

几个比较常用的信号:

  1. SIGINT 当用户按下了<Ctrl+C>时,用户终端向当前正在运行的进程发出此信号,默认动作为是终止当前进程。
  2. SIGQUIT 快捷键是<Ctrl+>,和SIGINT 一样默认动作结束当前进程.
  3. SIGSEGV 访问非法内存产生的信号,也就是经常遇到的段错误
  4. SIGALRM 定时器超时信号,比如闹钟的时间到达就会产生该信号.
  5. SIGIO 数据可读写信号,一般与驱动程序交互时,设备有数据可读,就会产生该信号,通知进程去读取数据。

Linux下应用层的信号与单片机裸机程序里中断类似,都可以设置处理函数.中断服务函数。
如果进程里不想使用信号的默认处理方式,可以自己捕获信号,然后再做相关处理。

比如: 当用户按下了<Ctrl+C>时,不想立即终止进程,可能还需要做一些善后工作,那么就可以自己捕获SIGINT信号,处理了内存释放、文件关闭、等等一些退出之前的清理工作之后,再退出。

信号的捕获函数如下:

1
2
3
4
5
6
7
8
cpp复制代码#include <signal.h>
typedef void (*sighandler_t)(int); //函数指针类型

sighandler_t signal(int signum, sighandler_t handler);
函数功能: 注册需要捕获的信号。
函数参数:
int signum 要捕获的信号。 Kill -l
sighandler_t handler :捕获到信号后,执行的函数。

二、信号捕获、发送 案例

2.1 捕获Ctrl+C的信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
cpp复制代码#include <stdio.h>
#include <time.h>
#include <unistd.h>
#include <sys/time.h>
#include <signal.h>

/*
信号处理函数
*/
void sighandler_func(int sig)
{
printf("捕获的信号:%d\n",sig);
}

int main(int argc,char **argv)
{
//注册将要捕获的信号
signal(SIGINT,sighandler_func);
while(1)
{

}
return 0;
}

2.2 捕获段错误信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
cpp复制代码#include <stdio.h>
#include <time.h>
#include <unistd.h>
#include <sys/time.h>
#include <signal.h>
#include <stdlib.h>
/*
信号处理函数
*/
void sighandler_func(int sig)
{
printf("捕获的信号:%d\n",sig);
exit(0); //结束进程
}

int main(int argc,char **argv)
{
//注册将要捕获的信号
signal(SIGSEGV,sighandler_func);

int *p;
*p=12345; //给非法内存地址赋值

while(1)
{

}
return 0;
}

2.3 通过kill命令给指定的进程发送信号

语法:

1
2
cpp复制代码kill -s  <信号名称>  <进程的PID号>
kill -<信号名称> <进程的PID号>

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
cpp复制代码[wbyq@wbyq linux_c]$ kill -s SIGINT 18365
[wbyq@wbyq linux_c]$ ps
PID TTY TIME CMD
7481 pts/0 00:00:02 bash
18370 pts/0 00:02:49 a.out
18371 pts/0 00:03:28 a.out
18372 pts/0 00:02:46 a.out
18373 pts/0 00:02:36 a.out
18400 pts/0 00:00:00 ps
[1] 中断 ./a.out
[wbyq@wbyq linux_c]$ kill -s 2 18370
[wbyq@wbyq linux_c]$ ps
PID TTY TIME CMD
7481 pts/0 00:00:02 bash
18371 pts/0 00:03:39 a.out
18372 pts/0 00:02:56 a.out
18373 pts/0 00:02:47 a.out
18401 pts/0 00:00:00 ps
[2] 中断 ./a.out
[wbyq@wbyq linux_c]$

[wbyq@wbyq linux_c]$ kill -2 18371

[wbyq@wbyq linux_c]$ kill -9 18372 //强制杀死指定的信号
[wbyq@wbyq linux_c]$ ps
PID TTY TIME CMD
7481 pts/0 00:00:02 bash
18373 pts/0 00:05:31 a.out
18407 pts/0 00:00:00 ps
[4]- 已杀死 ./a.out

查看当前进程的PID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
cpp复制代码查看后台运行的所有进程:
[wbyq@wbyq linux_c]$ ps //查看当前终端后台进程
PID TTY TIME CMD
7481 pts/0 00:00:02 bash
18365 pts/0 00:00:54 a.out
18370 pts/0 00:00:04 a.out
18371 pts/0 00:00:04 a.out
18372 pts/0 00:00:05 a.out
18373 pts/0 00:00:03 a.out
18374 pts/0 00:00:00 ps

[wbyq@wbyq linux_c]$ ps -aux //查看当前系统所有的进程详细信息
wbyq 18365 65.1 0.0 1844 280 pts/0 R 09:19 1:33 ./a.out
wbyq 18370 35.9 0.0 1844 280 pts/0 R 09:20 0:34 ./a.out
wbyq 18371 34.9 0.0 1844 276 pts/0 R 09:20 0:32 ./a.out
wbyq 18372 44.7 0.0 1844 276 pts/0 R 09:20 0:41 ./a.out
wbyq 18373 34.4 0.0 1844 280 pts/0 R 09:20 0:32 ./a.out
wbyq 18375 5.0 0.0 6532 1048 pts/0 R+ 09:21 0:00 ps -aux

[wbyq@wbyq linux_c]$ top //动态查看当前系统的进程详细信息
程序后台运行的方式:
[wbyq@wbyq linux_c]$ ./a.out &

2.4 通过代码给指定的进程发送信号

函数的用法与命令一样。

1
2
3
cpp复制代码#include <sys/types.h>
#include <signal.h>
int kill(pid_t pid, int sig);

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 锁详解

发表于 2021-11-28

这是我参与11月更文挑战的第23天,活动详情查看:2021最后一次更文挑战。

  1. 公平锁 vs 非公平锁

公平锁:是指多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁。类似排队打饭,先来后到。

非公平锁:是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。

比较

公平锁,就是很公平,在并发环境中,每个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有锁,否则就会加入到等待队列中,以后会按照 FIFO 的规则从队列中取到自己。

非公平锁,比较粗鲁,上来就直接尝试占有锁,如果尝试失败,就再采用类似公平锁的方式。

公平锁的优点是等待锁的线程不会饿死。

非公平锁的优点在于吞吐量比公平锁大。但在高并发的情况下,有可能会造成优先级反转或饥饿现象。

内窥

并发包中 ReentrantLock 的创建可以指定构造函数的 boolean 类型来得到公平锁或非公平锁,默认为非公平锁。

查看 ReentrantLock,可以看到有一个继承自 AbstractQueuedSynchronizer 的内部类 Sync,添加锁和释放锁的大部分操作实际上都是在 Sync 中实现的。它有公平锁 FairSync 和非公平锁 NonfairSync 两个子类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;

public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

abstract void lock();
//......
}

static final class NonfairSync extends Sync {
//......
}

static final class FairSync extends Sync {
//......
}
}

两个构造方法对比,可以看出公平锁和非公平锁的区别

  • 非公平锁在调用 lock() 后,首先就会通过 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了,否则按公平锁的方式去排队,进入到阻塞队列等待唤醒
  • 公平锁在获取同步状态(获取锁)时 tryAcquire() 多了一个限制条件:!hasQueuedPredecessors() ,用来判断当前线程是否位于同步队列中的第一个

Synchronized关键字,也是一种非公平锁。

  1. 乐观锁 VS 悲观锁

乐观锁与悲观锁是一种广义上的概念,体现了看待线程同步的不同角度。在 Java 和数据库中都有此概念对应的实际应用。

  • 悲观锁是一种悲观思想,它总认为自己在使用数据的时候一定有别的线程来修改,所以悲观锁在持有数据的时候总会把资源或数据锁住,这样其他线程想要请求这个资源的时候就会阻塞,直到等到悲观锁把资源释放为止。传统的关系型数据库里边就用到了很多这种锁机制,**比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。**悲观锁的实现往往依靠数据库本身的锁功能实现。

Java 中,synchronized 关键字和 Lock 的实现类都是悲观锁。

  • 而乐观锁认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断之前有没有别的线程更新了这个数据。如果这个数据没有被更新,当前线程将自己修改的数据成功写入。如果数据已经被其他线程更新,则根据不同的实现方式执行不同的操作(例如报错或者自动重试)

乐观锁的实现方案一般来说有两种: 版本号机制 和 CAS实现 。

Java 中 java.util.concurrent.atomic 包下面的原子变量类的递增操作就是通过 CAS 实现了乐观锁。

比较

悲观锁:比较适合写入操作比较频繁的场景,如果出现大量的读取操作,每次读取的时候都会进行加锁,这样会增加大量的锁的开销,降低了系统的吞吐量。

乐观锁:比较适合读取操作比较频繁的场景,如果出现大量的写入操作,数据发生冲突的可能性就会增大,为了保证数据的一致性,应用层需要不断的重新获取数据,这样会增加大量的查询操作,降低了系统的吞吐量。

悲观锁比较适合强一致性的场景,但效率比较低,特别是读的并发低。乐观锁则适用于读多写少,并发冲突少的场景。

乐观锁常见问题:

  • ABA 问题
  • 循环时间长开销大
  • 只能保证一个共享变量的原子操作
  1. 可重入锁(递归锁)

可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象或者class),不会因为之前已经获取过还没释放而阻塞。

也就是说,线程可以进入任何一个它已经拥有的锁同步着的代码块。

可重入锁的最大作用是可一定程度避免死锁,ReentrackLock、Synchronized 就是典型的可重入锁。

1
2
3
4
5
6
7
8
9
10
java复制代码public class Wget {
public synchronized void doSomething() {
System.out.println("方法1执行...");
doOthers();
}

public synchronized void doOthers() {
System.out.println("方法2执行...");
}
}

在上面的代码中,类中的两个方法都是被内置锁 synchronized 修饰的,doSomething() 方法中调用 doOthers() 方法。因为内置锁是可重入的,所以同一个线程在调用 doOthers() 时可以直接获得当前对象的锁,进入doOthers() 进行操作。

如果是一个不可重入锁,那么当前线程在调用 doOthers() 之前需要将执行 doSomething() 时获取当前对象的锁释放掉,实际上该对象锁已被当前线程所持有,且无法释放。所以此时会出现死锁。

自旋锁

自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗 CPU

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public class SpinLockDemo {

AtomicReference<Thread> lock = new AtomicReference<>();

public void myLock(){
Thread thread = Thread.currentThread();
//如果不为空,自旋
while (!lock.compareAndSet(null,thread)){

}
}

public void myUnlock(){
Thread thread = Thread.currentThread();
//解锁后,将锁置为 null
lock.compareAndSet(thread,null);
}
}

优缺点

缺点:

  1. 如果某个线程持有锁的时间过长,就会导致其它等待获取锁的线程进入循环等待,消耗CPU。使用不当会造成CPU 使用率极高。
  2. 上面 Java 实现的自旋锁不是公平的,即无法满足等待时间最长的线程优先获取锁。不公平的锁就会存在“线程饥饿”问题。

优点:

  1. 自旋锁不会使线程状态发生切换,一直处于用户态,即线程一直都是 active 的;不会使线程进入阻塞状态,减少了不必要的上下文切换,执行速度快
  2. 非自旋锁在获取不到锁的时候会进入阻塞状态,从而进入内核态,当获取到锁的时候需要从内核态恢复,需要线程上下文切换。 (线程被阻塞后便进入内核(Linux)调度状态,这个会导致系统在用户态与内核态之间来回切换,严重影响锁的性能)

可重入的自旋锁和不可重入的自旋锁

上边写的自旋锁,仔细分析一下可以看出,它是不支持重入的,即当一个线程第一次已经获取到了该锁,在锁释放之前又一次重新获取该锁,第二次就不能成功获取到。由于不满足 CAS,所以第二次获取会进入 while 循环等待,而如果是可重入锁,第二次也是应该能够成功获取到的。

而且,即使第二次能够成功获取,那么当第一次释放锁的时候,第二次获取到的锁也会被释放,而这是不合理的。

为了实现可重入锁,我们需要引入一个计数器,用来记录获取锁的线程数。

自旋锁与互斥锁

  • 自旋锁与互斥锁都是为了实现保护资源共享的机制。
  • 无论是自旋锁还是互斥锁,在任意时刻,都最多只能有一个保持者。
  • 获取互斥锁的线程,如果锁已经被占用,则该线程将进入睡眠状态;获取自旋锁的线程则不会睡眠,而是一直循环等待锁释放。

总结:

  • 自旋锁:线程获取锁的时候,如果锁被其他线程持有,则当前线程将循环等待,直到获取到锁。
  • 自旋锁等待期间,线程的状态不会改变,线程一直是用户态并且是活动的(active)。
  • 自旋锁如果持有锁的时间太长,则会导致其它等待获取锁的线程耗尽CPU。
  • 自旋锁本身无法保证公平性,同时也无法保证可重入性。
  • 基于自旋锁,可以实现具备公平性和可重入性质的锁。

  1. 独占锁(互斥锁/写锁)、共享锁(读锁)

独占锁:指该锁一次只能被一个线程所持有,对 ReentrantLock 和 Synchronized 而言都是独占锁

共享锁:指该锁可被多个线程所持有

对 ReentrantReadWriteLock 其读锁是共享锁,其写锁是独占锁。

读锁的共享锁可保证并发读是非常高效的,读写、写读、写写的过程是互斥的。

无锁 VS 偏向锁 VS 轻量级锁 VS 重量级锁

这四种锁是指锁的状态,专门针对 synchronized 的。在介绍这四种锁状态之前还需要介绍一些额外的知识。

首先为什么 Synchronized 能实现线程同步?

在回答这个问题之前我们需要了解两个重要的概念:“Java对象头”、“Monitor”。

无锁

无锁没有对资源进行锁定,所有的线程都能访问并修改同一个资源,但同时只有一个线程能修改成功。

无锁的特点就是修改操作在循环内进行,线程会不断的尝试修改共享资源。如果没有冲突就修改成功并退出,否则就会继续循环尝试。如果有多个线程修改同一个值,必定会有一个线程能修改成功,而其他修改失败的线程会不断重试直到修改成功。上面我们介绍的CAS原理及应用即是无锁的实现。无锁无法全面代替有锁,但无锁在某些场合下的性能是非常高的。

偏向锁

偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁,降低获取锁的代价。

在大多数情况下,锁总是由同一线程多次获得,不存在多线程竞争,所以出现了偏向锁。其目标就是在只有一个线程执行同步代码块时能够提高性能。

当一个线程访问同步代码块并获取锁时,会在Mark Word里存储锁偏向的线程ID。在线程进入和退出同步块时不再通过CAS操作来加锁和解锁,而是检测Mark Word里是否存储着指向当前线程的偏向锁。引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径,因为轻量级锁的获取及释放依赖多次CAS原子指令,而偏向锁只需要在置换ThreadID的时候依赖一次CAS原子指令即可。

偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态。撤销偏向锁后恢复到无锁(标志位为“01”)或轻量级锁(标志位为“00”)的状态。

偏向锁在JDK 6及以后的JVM里是默认启用的。可以通过JVM参数关闭偏向锁:-XX:-UseBiasedLocking=false,关闭之后程序默认会进入轻量级锁状态。

轻量级锁

是指当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能。

在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝,然后拷贝对象头中的Mark Word复制到锁记录中。

拷贝成功后,虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,并将Lock Record里的owner指针指向对象的Mark Word。

如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位设置为“00”,表示此对象处于轻量级锁定状态。

如果轻量级锁的更新操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行,否则说明多个线程竞争锁。

若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁。

重量级锁

升级为重量级锁时,锁标志的状态值变为“10”,此时Mark Word中存储的是指向重量级锁的指针,此时等待锁的线程都会进入阻塞状态。

综上,偏向锁通过对比Mark Word解决加锁问题,避免执行CAS操作。而轻量级锁是通过用CAS操作和自旋来解决加锁问题,避免线程阻塞和唤醒而影响性能。重量级锁是将除了拥有锁的线程以外的线程都阻塞。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

shiro 学习笔记

发表于 2021-11-28
  1. 权限管理

1.1 什么是权限管理?

权限管理实现对用户访问系统的控制,按照安全规则或者安全策略,可以控制用户只能访问自己被授权的资源

权限管理包括用户身份认证和授权两部分,简称认证授权

1.2 什么是身份认证?

身份认证就是判断一个用户是否为合法用户的处理过程,最常用的方式就是通过核对用户输入和用户名和口令是否与系统中存储的一致,来判断用户身份是否正确

1.3 什么是授权?

授权,即访问控制,控制谁能访问哪些资源,主体进行身份认证后需要分配权限方可访问系统资源,对于某些资源没有权限是无法访问的

  1. 什么是 shiro?

shiro 是一个功能强大且易于使用的 Java 安全框架,能够完成身份认证、授权、加密和会话管理等功能

2.1 shiro 的核心架构

  • Subject

主体,外部应用与 subject 进行交互,subject 记录了当前操作用户,将用户的概念理解为当前操作的主体,可能是一个通过浏览器请求的用户,也可能是一个运行的程序。Subject 在 shiro 中是一个接口,接口中定义了很多认证授相关的方法,外部程序通过 subject 进行认证授,而 subject 是通过 SecurityManager 安全管理器进行认证授权

  • SecurityManager

安全管理器,对全部的 subject 进行安全管理,是 shiro 的核心。通过 SecurityManager 可以完成 subject 的认证、授权等,实质上 SecurityManager 是通过 Authenticator 进行认证,通过 Authorizer 进行授权,通过 SessionManager 进行会话管理。SecurityManager 是一个接口,继承了 Authenticator,Authorizer,SessionManager 这三个接口

  • Authenticator

认证器,对用户身份进行认证,Authenticator 是一个接口,shiro 提供 ModularRealmAuthenticator 实现类,通过 ModularRealmAuthenticator 基本上可以满足大多数需求,也可以自定义认证器

  • Authorizer

授权器,用户通过认证器认证通过,在访问功能时需要通过授权器判断用户是否有此功能的操作权限

  • Realm

领域,相当于 datasource 数据源,securityManager 进行安全认证需要通过 Realm 获取用户权限数据,比如:如果用户身份数据在数据库,那么 realm 就需要从数据库获取用户身份信息

  • SessionManager

会话管理,shiro 框架定义了一套会话管理,它不依赖 web 容器的 session,所以 shiro 可以使用在非 web 应用上,也可以将分布式应用的会话集中在一点管理,此特性可使它实现单点登录

  • SessionDAO

会话 dao,是对 session 会话操作的一套接口,比如要将 session 存储到数据库,可以通过 jdbc 将会话存储到数据库

  • CacheManager

CacheManager 即缓存管理,将用户权限数据存储在缓存,这样可以提高性能

  • Cryptography

密码管理,shiro 提供了一套加解密的组件,方便开发,比如提供常用的散列、加解密等功能

  1. shiro 中的认证

3.1 认证

身份认证,就是判断一个用户是否为合法用户的处理过程。最常用的身份认证方式是系统通过核对用户输入的用户名和口令,看其是否与系统中存储的该用户的用户名和口令一致,来判断用户身份是否正确

3.2 shiro 认证中的关键对象

  • Subject:主体

访问系统的用户,主体可以是用户、程序等,进行认证的都称为主体

  • Principal:身份信息

是主体(subject)进行身份认证的标识,标识必须具有唯一性, 如用户名、手机号、邮箱地址等,一个主体可以有多个身份,但是必须有一个主身份(Primary Principal)

  • credential:凭证信息

是只有主体自己知道的安全信息,如密码、证书等

3.3 认证流程

3.4 认证开发

3.4.1 引入依赖

1
2
3
4
5
xml复制代码<dependency>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-core</artifactId>
<version>1.5.3</version>
</dependency>

3.4.2 创建配置文件

该配置文件用来书写系统中相关的权限数据,主要用于学习使用

1
2
3
ini复制代码[users]
xiaochen=123
zhangsan=456

3.4.3 开发认证代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public class TestAuthenticator {

public static void main(String[] args) {
// 1.创建安全管理器对象
DefaultSecurityManager securityManager = new DefaultSecurityManager();
// 2.给安全管理器设置 realm
securityManager.setRealm(new IniRealm("classpath:shiro.ini"));
// 3.给 SecurityUtils 全局安全工具类设置安全管理器
SecurityUtils.setSecurityManager(securityManager);
// 4.获取认证主体
Subject subject = SecurityUtils.getSubject();
// 5.创建令牌
UsernamePasswordToken token = new UsernamePasswordToken("xiaochen", "123");
// 6.认证
try {
System.out.println("认证状态:" + subject.isAuthenticated());
subject.login(token);
System.out.println("认证状态:" + subject.isAuthenticated());
} catch (Exception e) {
e.printStackTrace();
}
}
}

3.5 自定义 Realm

自定义 Realm 的实现,即是将认证/授权数据来源转为数据库的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public class TestCustomerRealmAuthenticator {

public static void main(String[] args) {
// 1.创建安全管理器对象
DefaultSecurityManager securityManager = new DefaultSecurityManager();
// 2.设置自定义realm
securityManager.setRealm(new CustomerRealm());
// 3.给 SecurityUtils 全局安全工具类设置安全管理器
SecurityUtils.setSecurityManager(securityManager);
// 4.获取认证主体
Subject subject = SecurityUtils.getSubject();
// 5.创建令牌
UsernamePasswordToken token = new UsernamePasswordToken("xiaochen", "123");
// 6.认证
try {
System.out.println("认证状态:" + subject.isAuthenticated());
subject.login(token);
System.out.println("认证状态:" + subject.isAuthenticated());
} catch (Exception e) {
e.printStackTrace();
}
}
}

自定义 Realm 代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码public class CustomerRealm extends AuthorizingRealm {

// 授权
@Override
protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principalCollection) {
return null;
}

// 认证
@Override
protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken authenticationToken) throws AuthenticationException {
// 1. 在token中获取用户名
String principal = (String) authenticationToken.getPrincipal();
// 2. 查询数据库,此处模拟数据库数据
if ("xiaochen".equals(principal)) {
// 参数1:正确的用户名
// 参数2:正确的密码
// 参数3:提供当前realm的名字
SimpleAuthenticationInfo simpleAuthenticationInfo
= new SimpleAuthenticationInfo("xianchen", "123", this.getName());
return simpleAuthenticationInfo;
}
return null;
}
}

3.6 明文加密

实际使用时,我们不可能把用户密码以明文形式显示,需要做加密处理

通常的加密方式是使用 md5 + salt + hash 散列的形式,校验过程:保存盐和散列后的值,在 shiro 完成密码校验

下面是使用 shiro 提供的 api 完成加密代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public class TestShiroMD5 {

public static void main(String[] args) {
// 1. 使用md5加密
// 参数1是明文密码
Md5Hash md5Hash1 = new Md5Hash("123");
// 打印加密后的密文
// 结果:202cb962ac59075b964b07152d234b70
System.out.println(md5Hash1.toHex());
// 2. 使用md5+salt加密
Md5Hash md5Hash2 = new Md5Hash("123", "X0*7ps");
// 8a83592a02263bfe6752b2b5b03a4799
System.out.println(md5Hash2.toHex());
// 3. 使用md5+salt+hash散列加密
Md5Hash md5Hash3 = new Md5Hash("123", "X0*7ps", 1024);
// e4f9bf3e0c58f045e62c23c533fcf633
System.out.println(md5Hash3.toHex());
}
}

自定义 CustomerMd5Realm

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码public class CustomerMd5Realm extends AuthorizingRealm {

// 授权
@Override
protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principalCollection) {
return null;
}

// 认证
@Override
protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken authenticationToken) throws AuthenticationException {
// 1. 在token中获取用户名
String principal = (String) authenticationToken.getPrincipal();
// 2. 查询数据库,此处模拟数据库数据
if ("xiaochen".equals(principal)) {
// 参数1:正确的用户名
// 参数2:正确的密码
// 参数3:提供当前realm的名字
// md5
// return new SimpleAuthenticationInfo(principal, "202cb962ac59075b964b07152d234b70", this.getName());
// md5+salt
/*return new SimpleAuthenticationInfo(principal, "8a83592a02263bfe6752b2b5b03a4799", ByteSource.Util.bytes("X0*7ps"), this.getName());*/
}
return null;
}
}

校验流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码public class TestCustomerMd5RealmAuthenticator {

public static void main(String[] args) {
// 1.创建安全管理器对象
DefaultSecurityManager securityManager = new DefaultSecurityManager();
// 2.设置自定义realm
CustomerMd5Realm realm = new CustomerMd5Realm();
HashedCredentialsMatcher hashedCredentialsMatcher = new HashedCredentialsMatcher();
// 使用MD5加密
hashedCredentialsMatcher.setHashAlgorithmName("md5");
// 散列次数
hashedCredentialsMatcher.setHashIterations(1024);
realm.setCredentialsMatcher(hashedCredentialsMatcher);
securityManager.setRealm(realm);
// 3.给 SecurityUtils 全局安全工具类设置安全管理器
SecurityUtils.setSecurityManager(securityManager);
// 4.获取认证主体
Subject subject = SecurityUtils.getSubject();
// 5.创建令牌
UsernamePasswordToken token = new UsernamePasswordToken("xiaochen", "123");
// 6.认证
try {
System.out.println("认证状态:" + subject.isAuthenticated());
subject.login(token);
System.out.println("认证状态:" + subject.isAuthenticated());
} catch (Exception e) {
e.printStackTrace();
}
}
}
  1. shiro 中的授权

4.1 授权

授权,即访问控制,控制谁能访问哪些资源。主体进行身份认证后需要分配权限方可访问系统的资源,对于某些资源没有权限是无法访问的

4.2 关键对象

授权可简单理解为 who 对 what(which) 进行 How 操作:

  • Who,即主体(Subject),主体需要访问系统中的资源
  • What,即资源(Resource),如系统菜单、页面、按钮、类方法、系统商品信息等。资源包括资源类型和资源实例,比如商品信息为资源类型,类型为 t01 的商品为资源实例,编号为 001 的商品信息也属于资源实例
  • How,权限/许可(Permission),规定了主体对资源的操作许可,权限离开资源没有意义,如用户查询权限、用户添加权限、某个类方法的调用权限、编号为 001 用户的修改权限等,通过权限可知道主体对哪些资源都有哪些操作许可

4.3 授权方式

基于角色的访问控制,以角色为中心进行访问控制

1
2
3
java复制代码if(subject.hasRole("admin")){
//操作什么资源
}

基于资源的访问控制,以资源为中心进行访问控制

1
2
3
4
5
6
java复制代码if(subject.isPermission("user:update:01")){ //资源实例
//对01用户进行修改
}
if(subject.isPermission("user:update:*")){ //资源类型
//对01用户进行修改
}

4.4 权限字符串

权限字符串的规则是:资源标识符:操作:资源实例标识符,意思是对哪个资源的哪个实例具有什么操作,: 是资源/操作/实例的分割符,权限字符串也可以使用 * 通配符

例子:

  • 用户创建权限:user:create,或 user:create:*
  • 用户修改实例 001 的权限:user:update:001
  • 用户实例 001 的所有权限:user:*:001

4.5 授权编程实现

在之前 md5 加密的基础上,实现授权操作

自定义 CustomerMd5Realm

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public class CustomerMd5Realm extends AuthorizingRealm {

// 授权
@Override
protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principalCollection) {
// 获取身份信息
String primaryPrincipal = (String) principalCollection.getPrimaryPrincipal();
// 根据身份信息(用户名)从数据库获取当前用户的角色以及权限信息
SimpleAuthorizationInfo simpleAuthorizationInfo = new SimpleAuthorizationInfo();
// 将数据库中查询的角色信息赋值给权限对象
simpleAuthorizationInfo.addRole("admin");
simpleAuthorizationInfo.addRole("user");
// 将数据库中查询的权限信息赋值给权限对象
simpleAuthorizationInfo.addStringPermission("user:*:01");
simpleAuthorizationInfo.addStringPermission("product:create:02");
return simpleAuthorizationInfo;
}

...
}

授权逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public class TestCustomerMd5RealmAuthenticator {

public static void main(String[] args) {

...

// 7. 认证用户进行授权
if (subject.isAuthenticated()) {
// 7.1 基于角色权限控制
boolean hasRole = subject.hasRole("admin");
System.out.println("角色校验:" + hasRole);
// 7.2 基于多角色权限控制
boolean hasAllRoles = subject.hasAllRoles(Arrays.asList("admin", "user"));
System.out.println("多角色校验:" + hasAllRoles);
// 7.3 是否具有其中一个角色
boolean[] booleans = subject.hasRoles(Arrays.asList("admin", "user", "super"));
for (boolean aBoolean : booleans) {
System.out.println(aBoolean);
}
// 7.4 基于权限字符串的访问控制
boolean permitted = subject.isPermitted("user:*:01");
System.out.println("资源权限校验:" + permitted);
// 7.5 分布具有哪些资源权限
boolean[] permitted1 = subject.isPermitted("user:*:01", "order:*:10");
for (boolean b : permitted1) {
System.out.println(b);
}
// 7.6 同时具有哪些资源权限
boolean permittedAll = subject.isPermittedAll("user:*:01", "product:*");
System.out.println("多资源权限校验:" + permittedAll);
}
}
}
  1. shiro 整合 SpringBoot

shiro 整合 SpringBoot 的思路如图所示:

引入 shiro 整合 SpringBoot 依赖

1
2
3
4
5
xml复制代码<dependency>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-spring-boot-starter</artifactId>
<version>1.5.3</version>
</dependency>

5.1 认证

配置 shiro

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码@Configuration
public class ShiroConfig {

// 1.创建shiroFilter,负责拦截所有请求
@Bean
public ShiroFilterFactoryBean getShiroFactoryBean(DefaultWebSecurityManager defaultWebSecurityManager) {
ShiroFilterFactoryBean shiroFilterFactoryBean = new ShiroFilterFactoryBean();
// 给filter设置安全管理器
shiroFilterFactoryBean.setSecurityManager(defaultWebSecurityManager);
// 配置系统受限资源
HashMap<String, String> map = new HashMap<>();
map.put("/user/login", "anon"); // anon设置为公共资源
map.put("/user/register", "anon");
map.put("/register.jsp", "anon");
map.put("/**", "authc"); // authc表示请求这个资源需要认证和授权
// 默认认证界面路径
shiroFilterFactoryBean.setLoginUrl("/login.jsp");

shiroFilterFactoryBean.setFilterChainDefinitionMap(map);
return shiroFilterFactoryBean;
}

// 2.创建安全管理器
@Bean
public DefaultWebSecurityManager getDefaultWebSecurityManager(@Qualifier("realm") Realm realm) {
DefaultWebSecurityManager defaultWebSecurityManager = new DefaultWebSecurityManager();
// 给安全管理器设置Realm
defaultWebSecurityManager.setRealm(realm);
return defaultWebSecurityManager;
}

// 3.创建自定义realm
@Bean(name = "realm")
public Realm getRealm() {
CustomerRealm customerRealm = new CustomerRealm();
// 修改凭证校验匹配器
HashedCredentialsMatcher credentialsMatcher = new HashedCredentialsMatcher();
// 设置加密算法为md5
credentialsMatcher.setHashAlgorithmName("MD5");
// 设置散列次数
credentialsMatcher.setHashIterations(1024);
customerRealm.setCredentialsMatcher(credentialsMatcher);
return customerRealm;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public class CustomerRealm extends AuthorizingRealm {

@Override
protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principalCollection) {
return null;
}

@Override
protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken authenticationToken) throws AuthenticationException {
String principal = (String) authenticationToken.getPrincipal();
UserSev userSev = (UserSev) ApplicationContextUtils.getBean("userSev");
User user = userSev.findByUsername(principal);
if (user != null) {
return new SimpleAuthenticationInfo(user.getUsername(), user.getPassword(),
ByteSource.Util.bytes(user.getSalt()), this.getName());
}
return null;
}
}

shiro 提供了多个默认的过滤器,我们可以用这些过滤器来控制指定 url 的权限

配置缩写 对应的过滤器 功能
anon AnonymousFilter 指定url可以匿名访问
authc FormAuthenticationFilter 指定url需要form表单登录,默认会从请求中获取username、password,rememberMe等参数并尝试登录,如果登录不了就会跳转到loginUrl配置的路径。我们也可以用这个过滤器做默认的登录逻辑,但是一般都是我们自己在控制器写登录逻辑的,自己写的话出错返回的信息都可以定制嘛。
authcBasic BasicHttpAuthenticationFilter 指定url需要basic登录
logout LogoutFilter 登出过滤器,配置指定url就可以实现退出功能,非常方便
noSessionCreation NoSessionCreationFilter 禁止创建会话
perms PermissionsAuthorizationFilter 需要指定权限才能访问
port PortFilter 需要指定端口才能访问
rest HttpMethodPermissionFilter 将http请求方法转化成相应的动词来构造一个权限字符串,这个感觉意义不大,有兴趣自己看源码的注释
roles RolesAuthorizationFilter 需要指定角色才能访问
ssl SslFilter 需要https请求才能访问
user UserFilter 需要已登录或“记住我”的用户才能访问

模拟认证、注册和退出过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码@Controller
@RequestMapping("user")
public class UserCon {

@Autowired
private UserSev userSev;

@RequestMapping("logout")
public String logout(String username, String password) {
// 获取主体对象
Subject subject = SecurityUtils.getSubject();
subject.logout();
return "redirect:/login.jsp";
}

@RequestMapping("login")
public String login(String username, String password) {
// 获取主体对象
Subject subject = SecurityUtils.getSubject();
try {
subject.login(new UsernamePasswordToken(username, password));
return "redirect:/index.jsp";
} catch (UnknownAccountException e) {
System.out.println("用户名错误");
} catch (IncorrectCredentialsException e) {
System.out.println("密码错误");
}
return "redirect:/index.jsp";
}

@RequestMapping("register")
public String register(User user) {
try {
userSev.register(user);
} catch (Exception e) {
return "redirect:/register.jsp";
}
return "redirect:/login.jsp";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@Service
public class UserSev {

@Autowired
private UserDao userDao;

public void register(User user) {
// 处理业务调用dao
// 明文密码进行 md5 + salt + hash散列
String salt = SaltUtils.getSalt();
user.setSalt(salt);
Md5Hash md5Hash = new Md5Hash(user.getPassword(), salt, 1024);
user.setPassword(md5Hash.toHex());
userDao.save(user);
}

public User findByUsername(String username) {
return userDao.findByUserName(username);
}
}

5.2 授权

第一种方式,通过页面标签授权

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
jsp复制代码<%@taglib prefix="shiro" uri="http://shiro.apache.org/tags" %>
<shiro:hasAnyRoles name="user,admin">
<li><a href="">用户管理</a>
<ul>
<shiro:hasPermission name="user:add:*">
<li><a href="">添加</a></li>
</shiro:hasPermission>
<shiro:hasPermission name="user:delete:*">
<li><a href="">删除</a></li>
</shiro:hasPermission>
<shiro:hasPermission name="user:update:*">
<li><a href="">修改</a></li>
</shiro:hasPermission>
<shiro:hasPermission name="user:find:*">
<li><a href="">查询</a></li>
</shiro:hasPermission>
</ul>
</li>
</shiro:hasAnyRoles>
<shiro:hasRole name="admin">
<li><a href="">商品管理</a></li>
<li><a href="">订单管理</a></li>
<li><a href="">物流管理</a></li>
</shiro:hasRole>

第二种方式,代码方式授权

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@RequestMapping("save")
public String save(){
System.out.println("进入方法");
//获取主体对象
Subject subject = SecurityUtils.getSubject();
//代码方式
if (subject.hasRole("admin")) {
System.out.println("保存订单!");
}else{
System.out.println("无权访问!");
}
//基于权限字符串
//....
return "redirect:/index.jsp";
}

第二种方式,注解方式授权

1
2
3
4
5
6
7
java复制代码@RequiresRoles(value={"admin","user"})//用来判断角色  同时具有 admin user
@RequiresPermissions("user:update:01") //用来判断权限字符串
@RequestMapping("save")
public String save(){
System.out.println("进入方法");
return "redirect:/index.jsp";
}

这里只是做个演示,实际开发中,我们需要对授权数据持久化。需要三张表:用户表、角色表和权限表,用户表和角色表之间,角色表和权限表之间都是多对多的关系,需要建立一张关系表

修改自定义 Realm

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public class CustomerRealm extends AuthorizingRealm {
@Override
protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) {
//获取身份信息
String primaryPrincipal = (String) principals.getPrimaryPrincipal();
System.out.println("调用授权验证: "+primaryPrincipal);
//根据主身份信息获取角色 和 权限信息
UserService userService = (UserService) ApplicationContextUtils.getBean("userService");
User user = userService.findRolesByUserName(primaryPrincipal);
//授权角色信息
if(!CollectionUtils.isEmpty(user.getRoles())){
SimpleAuthorizationInfo simpleAuthorizationInfo = new SimpleAuthorizationInfo();
user.getRoles().forEach(role->{
simpleAuthorizationInfo.addRole(role.getName());
//权限信息
List<Perms> perms = userService.findPermsByRoleId(role.getId());
if(!CollectionUtils.isEmpty(perms)){
perms.forEach(perm->{
simpleAuthorizationInfo.addStringPermission(perm.getName());
});
}
});
return simpleAuthorizationInfo;
}
return null;
}
}

5.3 shiro 缓存

使用缓存可以减轻 DB 的访问压力,从而提高系统的查询效率

5.3.1 整合 Ehcache

引入 ehcache 依赖

1
2
3
4
5
6
xml复制代码<!--引入shiro和ehcache-->
<dependency>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-ehcache</artifactId>
<version>1.5.3</version>
</dependency>

开启缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@Bean
public Realm getRealm(){
CustomerRealm customerRealm = new CustomerRealm();
//修改凭证校验匹配器
HashedCredentialsMatcher credentialsMatcher = new HashedCredentialsMatcher();
//设置加密算法为md5
credentialsMatcher.setHashAlgorithmName("MD5");
//设置散列次数
credentialsMatcher.setHashIterations(1024);
customerRealm.setCredentialsMatcher(credentialsMatcher);

//开启缓存管理
customerRealm.setCacheManager(new EhCacheManager());
customerRealm.setCachingEnabled(true);//开启全局缓存
customerRealm.setAuthenticationCachingEnabled(true);//认证认证缓存
customerRealm.setAuthenticationCacheName("authenticationCache");
customerRealm.setAuthorizationCachingEnabled(true);//开启授权缓存
customerRealm.setAuthorizationCacheName("authorizationCache");
return customerRealm;
}

5.3.2 整合 Redis

引入 redis 依赖

1
2
3
4
5
xml复制代码<!--redis整合springboot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

配置 redis 连接,启动 redis 服务

1
2
3
properties复制代码spring.redis.port=6379
spring.redis.host=localhost
spring.redis.database=0

ehcache 提供了 EhCacheManager,而 EhCacheManager 实现了 CacheManager 接口,因此我们可以自定义一个 RedisCacheManager

1
2
3
4
5
6
7
java复制代码public class RedisCacheManager implements CacheManager {
@Override
public <K, V> Cache<K, V> getCache(String cacheName) throws CacheException {
System.out.println("缓存名称: "+cacheName);
return new RedisCache<K,V>(cacheName);
}
}

再自定义 Cache 接口实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
java复制代码public class RedisCache<K,V> implements Cache<K,V> {

private String cacheName;

public RedisCache() {
}

public RedisCache(String cacheName) {
this.cacheName = cacheName;
}

@Override
public V get(K k) throws CacheException {
System.out.println("获取缓存:"+ k);
return (V) getRedisTemplate().opsForHash().get(this.cacheName,k.toString());
}

@Override
public V put(K k, V v) throws CacheException {
System.out.println("设置缓存key: "+k+" value:"+v);
getRedisTemplate().opsForHash().put(this.cacheName,k.toString(),v);
return null;
}

@Override
public V remove(K k) throws CacheException {
return (V) getRedisTemplate().opsForHash().delete(this.cacheName,k.toString());
}

@Override
public v remove(k k) throws CacheException {
return (v) getRedisTemplate().opsForHash().delete(this.cacheName,k.toString());
}

@Override
public void clear() throws CacheException {
getRedisTemplate().delete(this.cacheName);
}

@Override
public int size() {
return getRedisTemplate().opsForHash().size(this.cacheName).intValue();
}

@Override
public Set<k> keys() {
return getRedisTemplate().opsForHash().keys(this.cacheName);
}

@Override
public Collection<v> values() {
return getRedisTemplate().opsForHash().values(this.cacheName);
}

private RedisTemplate getRedisTemplate(){
RedisTemplate redisTemplate = (RedisTemplate) ApplicationContextUtils.getBean("redisTemplate");
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
return redisTemplate;
}


// 封装获取 redisTemplate
private RedisTemplate getRedisTemplate(){
RedisTemplate redisTemplate = (RedisTemplate) ApplicationContextUtils.getBean("redisTemplate");
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
return redisTemplate;
}
}

我们还需要对 salt 作序列化实现,由于 shiro 中提供的 SimpleByteSource 实现没有实现序列化,所以我们需要自定义 salt 序列化实现

1
2
3
4
5
6
java复制代码 // 自定义 salt 实现,实现序列化接口
public class MyByteSource extends SimpleByteSource implements Serializable {
public MyByteSource(String string) {
super(string);
}
}

在 realm 中使用自定义 salt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
java复制代码public class MyByteSource implements ByteSource, Serializable {

private byte[] bytes;
private String cachedHex;
private String cachedBase64;

//加入无参数构造方法实现序列化和反序列化
public MyByteSource(){

}

public MyByteSource(byte[] bytes) {
this.bytes = bytes;
}

public MyByteSource(char[] chars) {
this.bytes = CodecSupport.toBytes(chars);
}

public MyByteSource(String string) {
this.bytes = CodecSupport.toBytes(string);
}

public MyByteSource(ByteSource source) {
this.bytes = source.getBytes();
}

public MyByteSource(File file) {
this.bytes = (new MyByteSource.BytesHelper()).getBytes(file);
}

public MyByteSource(InputStream stream) {
this.bytes = (new MyByteSource.BytesHelper()).getBytes(stream);
}

public static boolean isCompatible(Object o) {
return o instanceof byte[] || o instanceof char[] || o instanceof String || o instanceof ByteSource || o instanceof File || o instanceof InputStream;
}

public byte[] getBytes() {
return this.bytes;
}

public boolean isEmpty() {
return this.bytes == null || this.bytes.length == 0;
}

public String toHex() {
if (this.cachedHex == null) {
this.cachedHex = Hex.encodeToString(this.getBytes());
}

return this.cachedHex;
}

public String toBase64() {
if (this.cachedBase64 == null) {
this.cachedBase64 = Base64.encodeToString(this.getBytes());
}

return this.cachedBase64;
}

public String toString() {
return this.toBase64();
}

public int hashCode() {
return this.bytes != null && this.bytes.length != 0 ? Arrays.hashCode(this.bytes) : 0;
}

public boolean equals(Object o) {
if (o == this) {
return true;
} else if (o instanceof ByteSource) {
ByteSource bs = (ByteSource)o;
return Arrays.equals(this.getBytes(), bs.getBytes());
} else {
return false;
}
}

private static final class BytesHelper extends CodecSupport {
private BytesHelper() {
}

public byte[] getBytes(File file) {
return this.toBytes(file);
}

public byte[] getBytes(InputStream stream) {
return this.toBytes(stream);
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【新手慎入】秘境探索之一个NET 对象从内存分配到内存回收

发表于 2021-11-28

这是我参与11月更文挑战的第7天,活动详情查看:2021最后一次更文挑战」。

  • 📢欢迎点赞 :👍 收藏 ⭐留言 📝 如有错误敬请指正,赐人玫瑰,手留余香!
  • 📢本文作者:由webmote 原创,首发于 【掘金】
  • 📢作者格言: 生活在于折腾,当你不折腾生活时,生活就开始折腾你,让我们一起加油!💪💪💪

前方高能预警,新手慎入!不听劝阻者,轻则郁闷堆积,重则生死看淡,对编程失去了念想,对生活失去了幻想!好了,心理强大到NB的可以忽略前方若干警示。为了探索.NET对象的内存分配和回收销毁,您可能需要准备一些调试的基本知识,比如上篇的<利用SOS扩展库进入高阶.NET6程序的调试>.以下例子来自.net 6技术支持。

  1. 我们的第一个对象

我们的第一个对象,不是你初中暗恋的古灵精怪的小女孩,更不是你高中的神秘御姐范的初恋女友,她是地地道道的Object。

不信,我Show给你看。

1
2
3
4
5
6
7
csharp复制代码public static int Main()
{
MaoniType o = new MaoniType(128, 256);
Console.ReadLine();
// 其它乱七八糟的代码
return 0;
}

掀开她神秘的盖头,她也只不是千千万万普通对象中的一员,非要说她有什么不同的话,那可能就是你想驯服她,并且你花费了你的宝贵时间,在她身上。

1
2
3
4
5
6
7
8
9
10
11
csharp复制代码public class MaoniType
{
public MaoniType(int a, int b)
{
A = a;
B = b;
}

public int A { get; set; }
public int B { get; set; }
}
  1. 正确的打开她

美丽总是隐藏在朦胧之中,隔纱看美人,越看越迷人。

不过我们需要的不是肤浅的撩骚,让我们利用高级窥探工具,更加深入到灵魂的探索她。

当然,最最简单的探索工具,就是Windbg + SoS 扩展了。

至于工具的使用,不是重点,在这里就略过了,如果你还不会的话,那么就移步<利用SOS扩展库进入高阶.NET6程序的调试>瞧瞧,那里已经给你备好了下酒好菜。

闲话少叙,让我们直接打开工具,键入神秘指令,来个一指入魂吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
shell复制代码0:007> .load C:\Users\webmote.dotnet\sos\sos.dll
0:007> !dumpheap -stat
Statistics:
MT Count TotalSize Class Name
00007ffc77c37598 1 24 System.IO.SyncTextReader
00007ffc77c33478 1 24 System.Threading.Tasks.Task+<>c
00007ffc77c1ca70 1 24 System.IO.Stream+NullStream
00007ffc77c13798 1 24 ConsoleApp6.MaoniType
...
[omitted]

00007ffc77bd7f48 28 1160 System.SByte[]
00007ffc77bd8410 4 3596 System.Int32[]
00007ffc77c1d3c8 3 4178 System.Byte[]
00007ffc77b2b578 8 18216 System.Object[]
00007ffc77c33898 3 33356 System.Char[]
00007ffc77bdd698 82 35610 System.String
Total 208 objects

没错,找到 ConsoleApp6.MaoniType 这个类名,这就是你心心念的 对象 No 1.

  1. 深入内存

既然已经被你定位到了,那么就让我们继续深入吧, 现在只需要点她的牌牌就可以了。

1
2
3
4
5
6
7
8
shell复制代码0:007> !DumpHeap /d -mt 00007ffc77c13798
Address MT Size
000002470000c0c8 00007ffc77c13798 24

Statistics:
MT Count TotalSize Class Name
00007ffc77c13798 1 24 ConsoleApp6.MaoniType
Total 1 objects

现在,有了她第一手的资讯:

1
2
3
4
5
shell复制代码姓名: Maoni/莫妮
尺寸: 24
起点:c0c8 [000002470000c0c8]
个数:1个
表索引:[00007ffc77c13798]
  1. 继续深入——内存布局调查

让我们来看看GC地址空间的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
shell复制代码0:007> !eeheap -gc
Number of GC Heaps: 1
generation 0 starts at 0x0000024700001030
generation 1 starts at 0x0000024700001018
generation 2 starts at 0x0000024700001000
ephemeral segment allocation context: none
segment begin allocated committed allocated size committed size
0000024700000000 0000024700001000 00000247000173C8 0000024700022000 0x163c8(91080) 0x21000(135168)
Large object heap starts at 0x0000024710001000
segment begin allocated committed allocated size committed size
0000024710000000 0000024710001000 0000024710001018 0000024710002000 0x18(24) 0x1000(4096)
Pinned object heap starts at 0x0000024718001000
0000024718000000 0000024718001000 0000024718005420 0000024718012000 0x4420(17440) 0x11000(69632)
Total Allocated Size: Size: 0x1a800 (108544) bytes.
Total Committed Size: Size: 0x22000 (139264) bytes.
------------------------------
GC Allocated Heap Size: Size: 0x1a800 (108544) bytes.
GC Committed Heap Size: Size: 0x22000 (139264) bytes.

你应该没忘记我们对象的地址吧?

莫妮的地址是 000002470000c0c8,而新生段的分配信息我们也可以清晰的看到。

当然有关段,估计仍然需要一大章节才能说明白吧,这里仅仅简单介绍下。

它是GC从操作系统采集内存的一个单位,实际内存申请和分配以及释放以segment(段)为单位;

例如: workstation GC模式segment大小为16M,server GC模式segment大小为64M。

Gen 0和Gen 1 heap总是位于同一个段中,叫做ephemeral segment(新生段),
Gen 2 heap由0个或多个segments组成,LOH由1个或多个segments组成

.NET程序启动时CLR为heap创建2个segment,一个作为ephemeral segment,另一个用于LOH。

Full GC后完全空闲的segments将被释放掉,内存返回给操作系统

再次深入前,让我们来点小甜点,放松一下,看看四周的风景。

4.1 我们怎么用DRAM

不管怎么分配,我们都需要涉及到物理内存。

当然,我们并不支持使用物理内存!

我们使用虚拟内存(VM),这块有操作系统的哦VMM(虚拟内存管理器)提供。

操作系统引入了虚拟内存概念,使得我们能够:

  • 每个进程都认为它有自己的内存空间,就好象国家的廉租房制度一样,让每个人都体验到家的温馨。
  • 你可以请求更多的内存,甚至超过了物理内存大小,而管理器只会占用真正使用的物理内存;
  • 重要的是,不需要VM分配为连续的了,实现了即抛即用。

VM的实现也很有意思,由操作系统提供页的支持:

  • 由MMU(内存管理单元)实现
  • 内存被分割为页(一般是4K)
  • 虚拟内存到物理内存由页映射使用页表进行管理
  • 无法映射到物理内存,会导致页失败错误
  • 操作系统控制页映射转换

有很多技术实现更快的转换,比如页表缓存、TLB(Translation Lookaside Buffer)技术等。

image.png

4.2 物理页是怎么组织的?

image.png

  1. 当计算机启动后,Windows操作系统把来自DRAM的物理页整理为一个列表;
  2. 当有进程需要物理页分配时,它转变为WS(Working Set)的一部分
  3. 当一个物理页从WS移除后,它通过软件页故障或硬页故障返回到列表
  4. 硬页故障是非常耗时的,因此我们需要避免它
  5. 为了避免硬页故障,我们不能增加大于物理内存的堆栈(可以观察物理内存负载信息)

4.3 GC怎么从VM采集内存

  • 保留内存

由于需要分页的原因,因此我们可以请求稍后可能使用的范围地址,它被称作保留内存(VirtualAlloc 使用 MEM_RESERVE)。当然保留内存不能保存任何数据。

  • 提交内存

当我们需要在页存储上存储数据时,我们告诉操作系统,这叫提交内存。(VirtualAlloc使用MEM_COMMIT),提交操作成功后,保证你不会得到OOM异常。

保留内存操作是非常快的,当然你仍然需要增加一次用户态<–> 内核态的操作;提交内存也是非常快的…. 当然,知道你真正的保存数据。 而恰恰这个时候,有可能引起分配页故障,导致OOM。

  • 保存数据
    一切都oK了,我们呢就可以轻松保存数据了。
  1. 再次深入内存布局调查

让我们回到从前,一如第一次初见。

5.1 初见

image.png

假设上图就是我们的段(segment)内存的保留内存(Reserve memory)区域,那么你想到了什么?

是的,首先她是一个空荡荡的巨大空间!

当然,这里面也没有任何东西。

5.2 相识

现在,我们想要在段内存中保存一些东东,该怎么办呢?

image.png

是的,我们得混个脸熟!

好了,首先我们需要保存段的头信息,那让我们先提交个申请(通常是64K)。

有了第一次后,我们对这个操作流程应该熟门熟路了,所以,谁也抵挡不住我们前进的脚步。

image.png

再次提交存储对象的空间请求(通常是64K),当然,GC通常不会仅仅为一个对象申请内存.

5.3 行动

它通常先申请一个分配上下文,当然这个时候并没有对象被构造。

image.png

然后动用物理内存页,保存数据,查看存储信息如下:

1
2
3
shell复制代码0:007> dq 000002470000c0c8-8 l3
00000247`0000c0c0 00000000`00000000 00007ffc`77c13798
00000247`0000c0d0 00000100`00000080

其内部大致的流程如下(精简版):

image.png

注意:缓存是非常快的,以下是来自Intel的数据。

  • L1 缓存: 4 cpu周期
  • L2 缓存: 12 cpu周期
  • L3 缓存: 44 cpu周期

DRAM的读取大约 60ns ~ 100ns之间。

5.4 小结下

经过前面不断的深入探索,对象的内存分布已经在你面前完全展开。那么,让我们再总结下。

image.png

GC的分配如下:

image.png

  1. 清扫战场

经过上面让人目眩神秘的命令和图片,你学废了吗?

最后,让我们打扫下战场,看看GC这位小宝贝。

image.png

6.1 GC怎么决定收集

如下代码,让我们看看它能有多智能?

1
2
3
4
5
6
7
8
9
csharp复制代码public static int Main()
{
MaoniType o = new MaoniType(128, 256);
GCHandle h = GCHandle.Alloc(o, GCHandleType.Weak);
GC.Collect();
Console.WriteLine("Collect called, h.Target is {0}",
(h.Target == null) ? "collected" : "not collected");
return 0;
}

发生了什么? 输出是:

Output - Collect called, h.Target is not collected

是的,你没有看错,GC.Collect()收集整个堆栈,这意味着GC不能决定对象的生命周期。
如果一个对象还活着,那么GC会被告知,在这个例子中,JIT(User Roots)告诉GC,对象还活着。因此GC无法回收对象。

6.2 开始收集

好了,让我们来个真正的回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
csharp复制代码[MethodImpl(MethodImplOptions.NoInlining)]
public static void TestLifeTime()
{
MaoniType o = new MaoniType(128, 256);
h = GCHandle.Alloc(o, GCHandleType.Weak);
}

public static int Main()
{
TestLifeTime();
GC.Collect();
Console.WriteLine("Collect called, h.Target is {0}",
(h.Target == null) ? "collected" : "not collected");

return 0;
}

输出结果:

Output: Collect called, h.Target is collected

再次观察GC:

image.png

是的,GC摧毁了对象,内存回收了。

image.png

  1. 小结

经过本次的多次深入刨析,你对你的对象是不是更加了解了?

👓都看到这了,还在乎点个赞吗?

👓都点赞了,还在乎一个收藏吗?

👓都收藏了,还在乎一个评论吗?

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

字节面试官让我讲讲最小生成树,我忍不住笑了 一、引言 二、K

发表于 2021-11-28

你好,我是小黄,一名独角兽企业的Java开发工程师。
感谢茫茫人海中我们能够相遇,
俗话说:当你的才华和能力,不足以支撑你的梦想的时候,请静下心来学习,
希望优秀的你可以和我一起学习,一起努力,实现属于自己的梦想。

一、引言

鸽了一周的我又回来了……..

小黄去过生日了~

带你们看下生日蛋糕

在这里插入图片描述

大家有没有在生活中遇到这种事情

你们县城需要在几个小区之间进行修路,由于政府资金紧张,不可能所有的小区之间都进行修路,而是利用最少的资金修一条可以连接所有小区的路

如同下图所示:
在这里插入图片描述
当然,上述只是一个抽象化的例子,而我们实际生活中,每个小区间的距离也是不一样的,我们怎么使用最小的资金去连接所有的小区呢?

这就牵扯到我们今天的老大哥们:Kruskal 算法和 Prim 算法

这两种算法分别从边和点产生最小生成树,保证了资金的最小性

本篇文章,我们一起走近 Kruskal 算法,探究一下该算法是怎么通过边来确定最小生成树的

当然,有人可能有疑惑,为什么不一次性把 Prim 算法也讲了,结尾我会告诉答案的

二、Kruskal 算法是什么

克鲁斯卡尔(Kruskal)算法是求连通网的最小生成树的一种方法。与普里姆(Prim)算法不同,它的时间复杂度为 O(eloge)(e为网中的边数),所以,适合于求边稀疏的最小生成树 。

因为我们的 Kruskal 算法是以边为单位,所以求一些边稀疏的最小生成树,时间复杂度比较小

我们以下面的小区为例,通过 Kruskal 算法会给我们一条连接所有小区的最短路径
在这里插入图片描述

三、Kruskal 算法本质

对于 Kruskal 算法来说,整体使用了 贪心 + 并查集 的思路

有不熟悉并查集的童鞋可以看一下这篇:三分钟带你学会并查集【含状态压缩】
在这里插入图片描述

简单来说,我们需要将所有的边放入一个堆中,按照边的大小进行排序,如下所示:1、2、3、6、7、10、12

  • 我们把第一个边 1 取出,将 C小区 和 D小区 合并,目前集合:{C、D}
    在这里插入图片描述
  • 我们把第二个边 2 取出,将 A小区 和 E小区 合并,目前集合:{C、D},{A、E}
    在这里插入图片描述
  • 我们把第三个边 3 取出,将 A小区 和 B小区 合并,目前集合:{C、D},{A、B、E}
    在这里插入图片描述
  • 将第四个边 6 取出,将 A小区 和 D小区 合并,目前集合:{A、B、E、C、D}
    在这里插入图片描述
  • 将第五个边 7 取出,将 B小区 和 E小区 合并,由于 {A、B、E、C、D} 在一个集合,不进行合并,跳过该边
  • 将第六个边 10 取出,将 B小区 和 C小区 合并,由于 {A、B、E、C、D} 在一个集合,不进行合并,跳过该边
  • 依次类推…….

最终我们会得到一个路径,这也就是我们的最小生成树

由图得知,我们最小的资金需要:12

四、Kruskal 算法实现

对于 Kruskal 算法,我们需要实现两部分

  • 并查集
  • 贪心

1、并查集

这里简单的放下并查集的两个关键步骤,其余源码可以关注 爱敲代码的小黄 公众号,回复:算法源码 即可获得算法源码

合并:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码	// 合并
public void union(Node node1, Node node2) {
// 找到两个节点的父节点
Node node1Parent = getParentNode(node1);
Node node2Parent = getParentNode(node2);
// 看看是不是一个父亲
if (node1Parent != node2Parent) {
// node1、node2父亲的节点数量
int size1 = size.get(node1Parent);
int size2 = size.get(node2Parent);
// 谁的节点多,少的就挂在多的下面,进行合并
if (size1 >= size2) {
parent.put(node1Parent, node2Parent);
size.put(node1Parent, size1 + size2);
size.remove(node2Parent);
} else {
parent.put(node2Parent, node1Parent);
size.put(node2Parent, size1 + size2);
size.remove(node2Parent);
}
}
}

查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码	public boolean isSame(Node node1, Node node2) {
return getParentNode(node1) == getParentNode(node2);
}

public Node getParentNode(Node node) {
// 为了路径压缩
Stack<Node> stack = new Stack<>();
while (parent.get(node) != node) {
stack.add(node);
node = parent.get(node);
}
while (!stack.isEmpty()) {
parent.put(stack.pop(), node);
}
return node;
}

2、Kruskal 算法

并查集的初始化:

1
2
3
4
5
6
7
8
java复制代码	// 赋予初始值
public void makeSets(Collection<Node> list) {
for (Node node : list) {
// 初始时,每个节点的父节点均是自己,集合的数量为1
parent.put(node, node);
size.put(node, 1);
}
}

比较器(按照边的权重排序):

1
2
3
4
5
6
7
java复制代码 	public static class EdgeComparator implements Comparator<Edge> {

@Override
public int compare(Edge o1, Edge o2) {
return o1.weight - o2.weight;
}
}

Kruskal 算法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码	public static Set<Edge> kruskalMST(Graph graph) {
Union union = new Union();
// 初始化并查集
union.makeSets(graph.nodes.values());
// 建堆,按照边的权重进行排序
PriorityQueue<Edge> priorityQueue = new PriorityQueue<>(new EdgeComparator());
// 放入边
for (Edge edge : graph.edges) {
priorityQueue.add(edge);
}
Set<Edge> edges = new HashSet<>();
// 从最小的开始
while (!priorityQueue.isEmpty()) {
Edge edge = priorityQueue.poll();
// 看一下是否是一个集合的
if (!union.isSame(edge.from, edge.to)) {
// 可以选取这条边,合并这两个点
edges.add(edge);
union.union(edge.from, edge.to);
}
}
return edges;
}

以上图的描述均使用图的形象化描述:图的形象化描述

五、总结

通过以上的描述,我们可以解决我们开头说的那个问题:你们县城需要在几个小区之间进行修路,由于政府资金紧张,不可能所有的小区之间都进行修路,而是利用最少的资金修一条可以连接所有小区的路

同时,对于 Kruskal 的代码也需要多写几遍,博主写这篇博客的时候,又忘记了怎么写(逃

  • 1584. 连接所有点的最小费用【模板题目】

在这里插入图片描述

对源代码有兴趣的小伙伴,可以关注 爱敲代码的小黄 公众号,回复:算法源码 即可获得算法源码

回答一下一开始的问题:有人可能有疑惑,为什么不一次性把 **Prim 算法也讲了**

答:下期讲 Prim,可以水一期(逃

本期的内容就到这里,下期会讲述最小生成树 Prim 算法

我是一名独角兽企业的Java开发工程师,希望可以点个关注呀,有问题可以留言或者私信加我微信:hls1793929520,我们下期再见!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

LongAdder源码分析 基本介绍 源码分析

发表于 2021-11-28

基本介绍

LongAdder跟AtomicLong都是用于计数器统计的,AtomicLong底层通过CAS操作进行计数,但是在高并发条件下性能比较低。

阿里的开发手册上明确说明:

image.png

LongAdder的继承结构如下:

image.png

1
2
3
java复制代码//LongAdder是Striped64的子类
public class LongAdder extends Striped64 implements Serializable {
}

Striped64类中重要的属性如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
java复制代码abstract class Striped64 extends Number {
/**
* Padded variant of AtomicLong supporting only raw accesses plus CAS.
*
* JVM intrinsics note: It would be possible to use a release-only
* form of CAS here, if it were provided.
*/
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}

/** Number of CPUS, to place bound on table size */
//表示当前计算机CPU数量,什么用? 控制cells数组长度的一个关键条件
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
* Table of cells. When non-null, size is a power of 2.
* cells数组
*/
transient volatile Cell[] cells;

/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
* 没有发生过竞争时,数据会累加到 base上 | 当cells扩容时,需要将数据写到base中
*/
transient volatile long base;

/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
* 初始化cells或者扩容cells都需要获取锁,0 表示无锁状态,1 表示其他线程已经持有锁了
*/
transient volatile int cellsBusy;

/**
* Package-private default constructor
*/
Striped64() {
}

/**
* CASes the base field.
* 通过修改base中的值
*/
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

/**
* CASes the cellsBusy field from 0 to 1 to acquire lock.
* 通过CAS方式获取锁,即将CELLSBUSY改成1,表示获取到了锁
*/
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}

/**
* Returns the probe value for the current thread.
* Duplicated from ThreadLocalRandom because of packaging restrictions.
*
* 获取当前线程的Hash值
*/
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

/**
* Pseudo-randomly advances and records the given probe value for the
* given thread.
* Duplicated from ThreadLocalRandom because of packaging restrictions.
*
* 重置当前线程的Hash值
*/
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
}

image.png

Cell 是 java.util.concurrent.atomic 下 Striped64 的一个内部类,
LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。

image.png

即内部有一个base变量,一个Cell[]数组。

  • base变量:非竞态条件下,直接累加到该变量上
  • Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中

当计算总值调用sum()方法,sum源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码/**
* 将base的值加上cells数组中所有槽位中的值得到总值
*/
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

image.png

LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。

image.png

源码分析

入口:longAdder.increment()

1
2
3
java复制代码public void increment() {
add(1L);
}

接着查看add方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码public void add(long x) {
//as 表示cells引用
//b 表示获取的base值
//v 表示 期望值
//m 表示 cells 数组的长度
//a 表示当前线程命中的cell单元格
Cell[] as; long b, v; int m; Cell a;

//条件一:true->表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中
// false->表示cells未初始化,当前所有线程应该将数据写到base中

//条件二:false->表示当前线程cas替换数据成功,
// true->表示发生竞争了,可能需要重试 或者 扩容
if ((as = cells) != null || !casBase(b = base, b + x)) {
//什么时候会进来?
//1.true->表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中
//2.true->表示发生竞争了,可能需要重试 或者 扩容



//true -> 未竞争 false->发生竞争
boolean uncontended = true;

//条件一:true->说明 cells 未初始化,也就是多线程写base发生竞争了
// false->说明 cells 已经初始化了,当前线程应该是 找自己的cell 写值

//条件二:getProbe() 获取当前线程的hash值 m表示cells长度-1 cells长度 一定是2的次方数 15= b1111
// true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
// false-> 说明当前线程对应的cell 不为空,说明 下一步想要将x值 添加到cell中。

//条件三:true->表示cas失败,意味着当前线程对应的cell 有竞争
// false->表示cas成功
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
//都有哪些情况会调用?
//1.true->说明 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells]
//2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
//3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]
longAccumulate(x, null, uncontended);
}
}

条件递增,逐步解析,如下:

  • 1.最初无竞争时只更新base;
  • 2.如果更新base失败后,首次新建一个Cell[]数组
  • 3.当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[]扩容

longAccumulate入参说明如下:

image.png

只有三种情况会调用longAccumulate方法

  • 1 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells]
  • 2 当前线程对应下标的cell为空,需要创建 longAccumulate 支持
  • 3 cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]

longAccumulate方法总纲如下:

image.png

上述代码首先给当前线程分配一个hash值,然后进入一个for(;;)自旋,这个自旋分为三个分支:

  • CASE1:Cell[]数组已经初始化
  • CASE2:Cell[]数组未初始化(首次新建)
  • CASE3:Cell[]数组正在初始化中

一开始刚刚要初始化Cell[]数组(首次新建),即未初始化过Cell[]数组,尝试占有锁并首次初始化cells数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码//CASE2:前置条件cells还未初始化 as 为null
//条件一:true 表示当前未加锁
//条件二:cells == as?因为其它线程可能会在你给as赋值之后修改了 cells
//条件三:true 表示获取锁成功 会把cellsBusy = 1,false 表示其它线程正在持有这把锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
//cells == as? 防止其它线程已经初始化了,当前线程再次初始化 导致丢失数据
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}

image.png

image.png

如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell[] rs = new Cell[2]表示数组的长度为2,rs[h & 1] = new Cell(x) 表示创建一个新的Cell元素,value是x值,默认为1。h & 1类似于我们之前HashMap常用到的计算散列桶index的算法,通常都是hash & (table.len - 1)。同hashmap一个意思

兜底的else模块,即多个线程尝试CAS修改失败的线程会走到这个分支,如下:

image.png
该分支实现直接操作base基数,将值累加到base上,也即其它线程正在初始化,多个线程正在更新base的值。

当cell已经初始化了时,流程代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
java复制代码//CASE1: 表示cells已经初始化了,当前线程应该将数据写入到对应的cell中
if ((as = cells) != null && (n = as.length) > 0) {
//2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
//3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]

//CASE1.1:true->表示当前线程对应的下标位置的cell为null,需要创建new Cell
if ((a = as[(n - 1) & h]) == null) {

//true->表示当前锁 未被占用 false->表示锁被占用
if (cellsBusy == 0) { // Try to attach new Cell

//拿当前的x创建Cell
Cell r = new Cell(x); // Optimistically create

//条件一:true->表示当前锁 未被占用 false->表示锁被占用
//条件二:true->表示当前线程获取锁成功 false->当前线程获取锁失败..
if (cellsBusy == 0 && casCellsBusy()) {
//是否创建成功 标记
boolean created = false;
try { // Recheck under lock
//rs 表示当前cells 引用
//m 表示cells长度
//j 表示当前线程命中的下标
Cell[] rs; int m, j;

//条件一 条件二 恒成立
//rs[j = (m - 1) & h] == null 为了防止其它线程初始化过 该位置,然后当前线程再次初始化该位置
//导致丢失数据
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}

//扩容意向 强制改为了false
collide = false;
}
// CASE1.2:
// wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//CASE 1.3:当前线程rehash过hash值,然后新命中的cell不为空
//true -> 写成功,退出循环
//false -> 表示rehash之后命中的新的cell 也有竞争 重试1次 再重试1次
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))

break;
//CASE 1.4:
//条件一:n >= NCPU true->扩容意向 改为false,表示不扩容了 false-> 说明cells数组还可以扩容
//条件二:cells != as true->其它线程已经扩容过了,当前线程rehash之后重试即可
else if (n >= NCPU || cells != as)
//扩容意向 改为false,表示不扩容了
collide = false; // At max size or stale
//CASE 1.5:
//!collide = true 设置扩容意向 为true 但是不一定真的发生扩容
else if (!collide)
collide = true;
//CASE 1.6:真正扩容的逻辑
//条件一:cellsBusy == 0 true->表示当前无锁状态,当前线程可以去竞争这把锁
//条件二:casCellsBusy true->表示当前线程 获取锁 成功,可以执行扩容逻辑
// false->表示当前时刻有其它线程在做扩容相关的操作。
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//cells == as
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
//释放锁
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}

//重置当前线程Hash值,即rehash
h = advanceProbe(h);
}

当cell已经初始化了

  • 1.如果当前线程对应的hash槽位为null时,通过cas创建cell,并将cell赋值,将cell存入到cells数组中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
java复制代码//CASE1.1:true->表示当前线程对应的下标位置的cell为null,需要创建new Cell
if ((a = as[(n - 1) & h]) == null) {

//true->表示当前锁 未被占用 false->表示锁被占用
if (cellsBusy == 0) { // Try to attach new Cell

//拿当前的x创建Cell
Cell r = new Cell(x); // Optimistically create

//条件一:true->表示当前锁 未被占用 false->表示锁被占用
//条件二:true->表示当前线程获取锁成功 false->当前线程获取锁失败..
if (cellsBusy == 0 && casCellsBusy()) {
//是否创建成功 标记
boolean created = false;
try { // Recheck under lock
//rs 表示当前cells 引用
//m 表示cells长度
//j 表示当前线程命中的下标
Cell[] rs; int m, j;

//条件一 条件二 恒成立
//rs[j = (m - 1) & h] == null 为了防止其它线程初始化过 该位置,然后当前线程再次初始化该位置
//导致丢失数据
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}

//扩容意向 强制改为了false
collide = false;
}
  • 2.如果当前线程对应的cells数组中的槽位不为空null,并且已经尝试过cas操作修改值失败,即存在竞争。将wasUncontended改为true,接着调用最下面的h = advanceProbe(h);重置当前线程Hash值,
1
2
3
4
5
6
java复制代码// wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash

//重置当前线程Hash值,即rehash
h = advanceProbe(h);
  • 3.重置当前线程Hash值后,接着再次判断对应的cells数组中的槽位是否为空,如果为空,则将值存入到对应的槽位,如果不为空,则通过cas操作尝试能不能修改槽位的值。如果修改成功,则执行结束
1
2
3
java复制代码else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
  • 4.如果步骤3修改失败的话,就会将扩容意向collide的值置为true
1
2
java复制代码else if (!collide)
collide = true;
  • 5.接着下次还是修改槽位的值不成功的话,最后会执行扩容操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码else if (cellsBusy == 0 && casCellsBusy()) {
try {
//cells == as
if (cells == as) { // Expand table unless stale
//扩容为2倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
//将之前cells数组中的值复制到扩容之后的数组中
rs[i] = as[i];
cells = rs;
}
} finally {
//释放锁
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
java复制代码//都有哪些情况会调用?
//1.true->说明 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells]
//2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
//3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]

// wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
//h 表示线程hash值
int h;
//条件成立:说明当前线程 还未分配hash值
if ((h = getProbe()) == 0) {
//给当前线程分配hash值
ThreadLocalRandom.current(); // force initialization
//取出当前线程的hash值 赋值给h
h = getProbe();
//为什么? 因为默认情况下 当前线程 肯定是写入到了 cells[0] 位置。 不把它当做一次真正的竞争
wasUncontended = true;
}

//表示扩容意向 false 一定不会扩容,true 可能会扩容。
boolean collide = false; // True if last slot nonempty

//自旋
for (;;) {
//as 表示cells引用
//a 表示当前线程命中的cell
//n 表示cells数组长度
//v 表示 期望值
Cell[] as; Cell a; int n; long v;

//CASE1: 表示cells已经初始化了,当前线程应该将数据写入到对应的cell中
if ((as = cells) != null && (n = as.length) > 0) {
//2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
//3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]

//CASE1.1:true->表示当前线程对应的下标位置的cell为null,需要创建new Cell
if ((a = as[(n - 1) & h]) == null) {

//true->表示当前锁 未被占用 false->表示锁被占用
if (cellsBusy == 0) { // Try to attach new Cell

//拿当前的x创建Cell
Cell r = new Cell(x); // Optimistically create

//条件一:true->表示当前锁 未被占用 false->表示锁被占用
//条件二:true->表示当前线程获取锁成功 false->当前线程获取锁失败..
if (cellsBusy == 0 && casCellsBusy()) {
//是否创建成功 标记
boolean created = false;
try { // Recheck under lock
//rs 表示当前cells 引用
//m 表示cells长度
//j 表示当前线程命中的下标
Cell[] rs; int m, j;

//条件一 条件二 恒成立
//rs[j = (m - 1) & h] == null 为了防止其它线程初始化过 该位置,然后当前线程再次初始化该位置
//导致丢失数据
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}

//扩容意向 强制改为了false
collide = false;
}
// CASE1.2:
// wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//CASE 1.3:当前线程rehash过hash值,然后新命中的cell不为空
//true -> 写成功,退出循环
//false -> 表示rehash之后命中的新的cell 也有竞争 重试1次 再重试1次
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))

break;
//CASE 1.4:
//条件一:n >= NCPU true->扩容意向 改为false,表示不扩容了 false-> 说明cells数组还可以扩容
//条件二:cells != as true->其它线程已经扩容过了,当前线程rehash之后重试即可
else if (n >= NCPU || cells != as)
//扩容意向 改为false,表示不扩容了
collide = false; // At max size or stale
//CASE 1.5:
//!collide = true 设置扩容意向 为true 但是不一定真的发生扩容
else if (!collide)
collide = true;
//CASE 1.6:真正扩容的逻辑
//条件一:cellsBusy == 0 true->表示当前无锁状态,当前线程可以去竞争这把锁
//条件二:casCellsBusy true->表示当前线程 获取锁 成功,可以执行扩容逻辑
// false->表示当前时刻有其它线程在做扩容相关的操作。
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//cells == as
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
//释放锁
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}

//重置当前线程Hash值,即rehash
h = advanceProbe(h);
}
//CASE2:前置条件cells还未初始化 as 为null
//条件一:true 表示当前未加锁
//条件二:cells == as?因为其它线程可能会在你给as赋值之后修改了 cells
//条件三:true 表示获取锁成功 会把cellsBusy = 1,false 表示其它线程正在持有这把锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
//cells == as? 防止其它线程已经初始化了,当前线程再次初始化 导致丢失数据
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//CASE3:
//1.当前cellsBusy加锁状态,表示其它线程正在初始化cells,所以当前线程将值累加到base
//2.cells被其它线程初始化后,当前线程需要将数据累加到base
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

总体步骤如下:

image.png

小总结

AtomicLong 原理:CAS+自旋

场景:

  • 低并发下的全局计算
  • AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题。

缺陷: 高并发后性能急剧下降,AtomicLong的自旋会成为瓶颈,N个线程CAS操作修改线程的值,每次只有一个成功过,其它N - 1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。

LongAdder 原理:

  • CAS+Base+Cell数组分散
  • 空间换时间并分散了热点数据

场景:高并发下的全局计算

缺陷:sum求和后还有计算线程修改结果的话,最后结果不够准确

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…140141142…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%