LongAdder源码分析 基本介绍 源码分析

基本介绍

LongAdderAtomicLong都是用于计数器统计的,AtomicLong底层通过CAS操作进行计数,但是在高并发条件下性能比较低。

阿里的开发手册上明确说明:

image.png

LongAdder的继承结构如下:

image.png

1
2
3
java复制代码//LongAdder是Striped64的子类
public class LongAdder extends Striped64 implements Serializable {
}

Striped64类中重要的属性如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
java复制代码abstract class Striped64 extends Number {
/**
* Padded variant of AtomicLong supporting only raw accesses plus CAS.
*
* JVM intrinsics note: It would be possible to use a release-only
* form of CAS here, if it were provided.
*/
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}

/** Number of CPUS, to place bound on table size */
//表示当前计算机CPU数量,什么用? 控制cells数组长度的一个关键条件
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
* Table of cells. When non-null, size is a power of 2.
* cells数组
*/
transient volatile Cell[] cells;

/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
* 没有发生过竞争时,数据会累加到 base上 | 当cells扩容时,需要将数据写到base中
*/
transient volatile long base;

/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
* 初始化cells或者扩容cells都需要获取锁,0 表示无锁状态,1 表示其他线程已经持有锁了
*/
transient volatile int cellsBusy;

/**
* Package-private default constructor
*/
Striped64() {
}

/**
* CASes the base field.
* 通过修改base中的值
*/
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

/**
* CASes the cellsBusy field from 0 to 1 to acquire lock.
* 通过CAS方式获取锁,即将CELLSBUSY改成1,表示获取到了锁
*/
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}

/**
* Returns the probe value for the current thread.
* Duplicated from ThreadLocalRandom because of packaging restrictions.
*
* 获取当前线程的Hash值
*/
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

/**
* Pseudo-randomly advances and records the given probe value for the
* given thread.
* Duplicated from ThreadLocalRandom because of packaging restrictions.
*
* 重置当前线程的Hash值
*/
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
}

image.png

Cell 是 java.util.concurrent.atomic 下 Striped64 的一个内部类,
LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回

sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。

image.png

即内部有一个base变量,一个Cell[]数组。

  • base变量:非竞态条件下,直接累加到该变量上
  • Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中

当计算总值调用sum()方法,sum源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码/**
* 将base的值加上cells数组中所有槽位中的值得到总值
*/
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

image.png

LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。

image.png

源码分析

入口:longAdder.increment()

1
2
3
java复制代码public void increment() {
add(1L);
}

接着查看add方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码public void add(long x) {
//as 表示cells引用
//b 表示获取的base值
//v 表示 期望值
//m 表示 cells 数组的长度
//a 表示当前线程命中的cell单元格
Cell[] as; long b, v; int m; Cell a;

//条件一:true->表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中
// false->表示cells未初始化,当前所有线程应该将数据写到base中

//条件二:false->表示当前线程cas替换数据成功,
// true->表示发生竞争了,可能需要重试 或者 扩容
if ((as = cells) != null || !casBase(b = base, b + x)) {
//什么时候会进来?
//1.true->表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中
//2.true->表示发生竞争了,可能需要重试 或者 扩容



//true -> 未竞争 false->发生竞争
boolean uncontended = true;

//条件一:true->说明 cells 未初始化,也就是多线程写base发生竞争了
// false->说明 cells 已经初始化了,当前线程应该是 找自己的cell 写值

//条件二:getProbe() 获取当前线程的hash值 m表示cells长度-1 cells长度 一定是2的次方数 15= b1111
// true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
// false-> 说明当前线程对应的cell 不为空,说明 下一步想要将x值 添加到cell中。

//条件三:true->表示cas失败,意味着当前线程对应的cell 有竞争
// false->表示cas成功
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
//都有哪些情况会调用?
//1.true->说明 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells]
//2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
//3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]
longAccumulate(x, null, uncontended);
}
}

条件递增,逐步解析,如下:

  • 1.最初无竞争时只更新base;
  • 2.如果更新base失败后,首次新建一个Cell[]数组
  • 3.当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[]扩容

longAccumulate入参说明如下:

image.png

只有三种情况会调用longAccumulate方法

  • 1 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells]
  • 2 当前线程对应下标的cell为空,需要创建 longAccumulate 支持
  • 3 cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]

longAccumulate方法总纲如下:

image.png

上述代码首先给当前线程分配一个hash值,然后进入一个for(;;)自旋,这个自旋分为三个分支:

  • CASE1:Cell[]数组已经初始化
  • CASE2:Cell[]数组未初始化(首次新建)
  • CASE3:Cell[]数组正在初始化中

一开始刚刚要初始化Cell[]数组(首次新建),即未初始化过Cell[]数组,尝试占有锁并首次初始化cells数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码//CASE2:前置条件cells还未初始化 as 为null
//条件一:true 表示当前未加锁
//条件二:cells == as?因为其它线程可能会在你给as赋值之后修改了 cells
//条件三:true 表示获取锁成功 会把cellsBusy = 1,false 表示其它线程正在持有这把锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
//cells == as? 防止其它线程已经初始化了,当前线程再次初始化 导致丢失数据
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}

image.png

image.png

如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell[] rs = new Cell[2]表示数组的长度为2,rs[h & 1] = new Cell(x) 表示创建一个新的Cell元素,value是x值,默认为1。h & 1类似于我们之前HashMap常用到的计算散列桶index的算法,通常都是hash & (table.len - 1)。同hashmap一个意思

兜底的else模块,即多个线程尝试CAS修改失败的线程会走到这个分支,如下:

image.png
该分支实现直接操作base基数,将值累加到base上,也即其它线程正在初始化,多个线程正在更新base的值。

当cell已经初始化了时,流程代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
java复制代码//CASE1: 表示cells已经初始化了,当前线程应该将数据写入到对应的cell中
if ((as = cells) != null && (n = as.length) > 0) {
//2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
//3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]

//CASE1.1:true->表示当前线程对应的下标位置的cell为null,需要创建new Cell
if ((a = as[(n - 1) & h]) == null) {

//true->表示当前锁 未被占用 false->表示锁被占用
if (cellsBusy == 0) { // Try to attach new Cell

//拿当前的x创建Cell
Cell r = new Cell(x); // Optimistically create

//条件一:true->表示当前锁 未被占用 false->表示锁被占用
//条件二:true->表示当前线程获取锁成功 false->当前线程获取锁失败..
if (cellsBusy == 0 && casCellsBusy()) {
//是否创建成功 标记
boolean created = false;
try { // Recheck under lock
//rs 表示当前cells 引用
//m 表示cells长度
//j 表示当前线程命中的下标
Cell[] rs; int m, j;

//条件一 条件二 恒成立
//rs[j = (m - 1) & h] == null 为了防止其它线程初始化过 该位置,然后当前线程再次初始化该位置
//导致丢失数据
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}

//扩容意向 强制改为了false
collide = false;
}
// CASE1.2:
// wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//CASE 1.3:当前线程rehash过hash值,然后新命中的cell不为空
//true -> 写成功,退出循环
//false -> 表示rehash之后命中的新的cell 也有竞争 重试1次 再重试1次
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))

break;
//CASE 1.4:
//条件一:n >= NCPU true->扩容意向 改为false,表示不扩容了 false-> 说明cells数组还可以扩容
//条件二:cells != as true->其它线程已经扩容过了,当前线程rehash之后重试即可
else if (n >= NCPU || cells != as)
//扩容意向 改为false,表示不扩容了
collide = false; // At max size or stale
//CASE 1.5:
//!collide = true 设置扩容意向 为true 但是不一定真的发生扩容
else if (!collide)
collide = true;
//CASE 1.6:真正扩容的逻辑
//条件一:cellsBusy == 0 true->表示当前无锁状态,当前线程可以去竞争这把锁
//条件二:casCellsBusy true->表示当前线程 获取锁 成功,可以执行扩容逻辑
// false->表示当前时刻有其它线程在做扩容相关的操作。
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//cells == as
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
//释放锁
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}

//重置当前线程Hash值,即rehash
h = advanceProbe(h);
}

当cell已经初始化了

  • 1.如果当前线程对应的hash槽位为null时,通过cas创建cell,并将cell赋值,将cell存入到cells数组中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
java复制代码//CASE1.1:true->表示当前线程对应的下标位置的cell为null,需要创建new Cell
if ((a = as[(n - 1) & h]) == null) {

//true->表示当前锁 未被占用 false->表示锁被占用
if (cellsBusy == 0) { // Try to attach new Cell

//拿当前的x创建Cell
Cell r = new Cell(x); // Optimistically create

//条件一:true->表示当前锁 未被占用 false->表示锁被占用
//条件二:true->表示当前线程获取锁成功 false->当前线程获取锁失败..
if (cellsBusy == 0 && casCellsBusy()) {
//是否创建成功 标记
boolean created = false;
try { // Recheck under lock
//rs 表示当前cells 引用
//m 表示cells长度
//j 表示当前线程命中的下标
Cell[] rs; int m, j;

//条件一 条件二 恒成立
//rs[j = (m - 1) & h] == null 为了防止其它线程初始化过 该位置,然后当前线程再次初始化该位置
//导致丢失数据
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}

//扩容意向 强制改为了false
collide = false;
}
  • 2.如果当前线程对应的cells数组中的槽位不为空null,并且已经尝试过cas操作修改值失败,即存在竞争。将wasUncontended改为true,接着调用最下面的h = advanceProbe(h);重置当前线程Hash值,
1
2
3
4
5
6
java复制代码// wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash

//重置当前线程Hash值,即rehash
h = advanceProbe(h);
  • 3.重置当前线程Hash值后,接着再次判断对应的cells数组中的槽位是否为空,如果为空,则将值存入到对应的槽位,如果不为空,则通过cas操作尝试能不能修改槽位的值。如果修改成功,则执行结束
1
2
3
java复制代码else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
  • 4.如果步骤3修改失败的话,就会将扩容意向collide的值置为true
1
2
java复制代码else if (!collide)
collide = true;
  • 5.接着下次还是修改槽位的值不成功的话,最后会执行扩容操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码else if (cellsBusy == 0 && casCellsBusy()) {
try {
//cells == as
if (cells == as) { // Expand table unless stale
//扩容为2倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
//将之前cells数组中的值复制到扩容之后的数组中
rs[i] = as[i];
cells = rs;
}
} finally {
//释放锁
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
java复制代码//都有哪些情况会调用?
//1.true->说明 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells]
//2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
//3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]

// wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
//h 表示线程hash值
int h;
//条件成立:说明当前线程 还未分配hash值
if ((h = getProbe()) == 0) {
//给当前线程分配hash值
ThreadLocalRandom.current(); // force initialization
//取出当前线程的hash值 赋值给h
h = getProbe();
//为什么? 因为默认情况下 当前线程 肯定是写入到了 cells[0] 位置。 不把它当做一次真正的竞争
wasUncontended = true;
}

//表示扩容意向 false 一定不会扩容,true 可能会扩容。
boolean collide = false; // True if last slot nonempty

//自旋
for (;;) {
//as 表示cells引用
//a 表示当前线程命中的cell
//n 表示cells数组长度
//v 表示 期望值
Cell[] as; Cell a; int n; long v;

//CASE1: 表示cells已经初始化了,当前线程应该将数据写入到对应的cell中
if ((as = cells) != null && (n = as.length) > 0) {
//2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
//3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]

//CASE1.1:true->表示当前线程对应的下标位置的cell为null,需要创建new Cell
if ((a = as[(n - 1) & h]) == null) {

//true->表示当前锁 未被占用 false->表示锁被占用
if (cellsBusy == 0) { // Try to attach new Cell

//拿当前的x创建Cell
Cell r = new Cell(x); // Optimistically create

//条件一:true->表示当前锁 未被占用 false->表示锁被占用
//条件二:true->表示当前线程获取锁成功 false->当前线程获取锁失败..
if (cellsBusy == 0 && casCellsBusy()) {
//是否创建成功 标记
boolean created = false;
try { // Recheck under lock
//rs 表示当前cells 引用
//m 表示cells长度
//j 表示当前线程命中的下标
Cell[] rs; int m, j;

//条件一 条件二 恒成立
//rs[j = (m - 1) & h] == null 为了防止其它线程初始化过 该位置,然后当前线程再次初始化该位置
//导致丢失数据
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}

//扩容意向 强制改为了false
collide = false;
}
// CASE1.2:
// wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//CASE 1.3:当前线程rehash过hash值,然后新命中的cell不为空
//true -> 写成功,退出循环
//false -> 表示rehash之后命中的新的cell 也有竞争 重试1次 再重试1次
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))

break;
//CASE 1.4:
//条件一:n >= NCPU true->扩容意向 改为false,表示不扩容了 false-> 说明cells数组还可以扩容
//条件二:cells != as true->其它线程已经扩容过了,当前线程rehash之后重试即可
else if (n >= NCPU || cells != as)
//扩容意向 改为false,表示不扩容了
collide = false; // At max size or stale
//CASE 1.5:
//!collide = true 设置扩容意向 为true 但是不一定真的发生扩容
else if (!collide)
collide = true;
//CASE 1.6:真正扩容的逻辑
//条件一:cellsBusy == 0 true->表示当前无锁状态,当前线程可以去竞争这把锁
//条件二:casCellsBusy true->表示当前线程 获取锁 成功,可以执行扩容逻辑
// false->表示当前时刻有其它线程在做扩容相关的操作。
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//cells == as
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
//释放锁
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}

//重置当前线程Hash值,即rehash
h = advanceProbe(h);
}
//CASE2:前置条件cells还未初始化 as 为null
//条件一:true 表示当前未加锁
//条件二:cells == as?因为其它线程可能会在你给as赋值之后修改了 cells
//条件三:true 表示获取锁成功 会把cellsBusy = 1,false 表示其它线程正在持有这把锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
//cells == as? 防止其它线程已经初始化了,当前线程再次初始化 导致丢失数据
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//CASE3:
//1.当前cellsBusy加锁状态,表示其它线程正在初始化cells,所以当前线程将值累加到base
//2.cells被其它线程初始化后,当前线程需要将数据累加到base
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

总体步骤如下:

image.png

小总结

AtomicLong 原理:CAS+自旋

场景:

  • 低并发下的全局计算
  • AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题。

缺陷: 高并发后性能急剧下降,AtomicLong的自旋会成为瓶颈,N个线程CAS操作修改线程的值,每次只有一个成功过,其它N - 1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。

LongAdder 原理:

  • CAS+Base+Cell数组分散
  • 空间换时间并分散了热点数据

场景:高并发下的全局计算

缺陷:sum求和后还有计算线程修改结果的话,最后结果不够准确

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%