ConcurrentHashMap的扩容方法transfer

主要细节问题:

  1. 什么时候触发扩容?扩容阈值是多少?
  2. 扩容时的线程安全怎么做的?
  3. 其他线程怎么感知到扩容状态,从而一起进行扩容?
  4. 多个线程一起扩容时,怎么拆分任务,是不是任务粒度越小越好?
  5. ConcurrentHashMap.get(key)方法是没有加锁的,怎么保证在这个扩容过程中,其他线程的get(key)方法能获取到正确的值,不出现线程安全问题?

魔鬼在细节里,一起看下源码,然后回答下上面的细节问题,先看下触发扩容的代码,在往map中put新数据后会调用这个addCount(long x, int check)方法,计算当前map的容量,当容量达到扩容阈值时会触发扩容逻辑。

触发扩容源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
java复制代码private final void addCount(long x, int check) {
// 借用Longadder的设计思路来统计map的当前容量,减少锁竞争,详细见下面的分析
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// s是当前的容量,sizeCtl是扩容阈值,当当前容量大于扩容阈值时,触发扩容,在扩容逻辑被触发前sizeCtl是正数,表示的下次触发扩容的阈值,
// 而在进入扩容逻辑后,sizeCtl会变成负数,并且sizeCtl是32位的int类型,高16位是扩容的邮戳,低16位是同时进行扩容时的线程数,在某个线程进入扩容时会修改sizeCtl值,在下面的代码里能看到这个修改逻辑
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 生成当前扩容流程邮戳,n是当前数组的长度,当n相同时,邮戳肯定也是一样的,计算逻辑详见下面分析1
int rs = resizeStamp(n);
// sc小于0 标识已经在扩容中,因为在触发扩容时会通过CAS修改sc这个值
// sc值是int类型,32位,低16位记录了当前进行扩容的线程数,高16位就是邮戳,这个逻辑见下面分析2
if (sc < 0) {
// (sc >>> RESIZE_STAMP_SHIFT) != rs 说明不是同一个扩容流程,放弃扩容
// sc == rs + 1 || sc == rs + MAX_RESIZERS 这个条件是个bug,应该写成(sc >>> RESIZE_STAMP_SHIFT) == rs + 1 || (sc >>> RESIZE_STAMP_SHIFT) == rs + MAX_RESIZERS
// 在jdk12中已被修复,oracle官网修复链接是 https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427
// (nt = nextTable) == null 有线程去扩容时,必然会生成nextTable,所以这里不需要处理
// transferIndex <= 0 transferIndex是标记当前还未迁移的桶的下标,如果小于等于0,则表示已经迁移完,不需要做处理
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 在进入扩容流程后会将sizeCtl值+1,sizeCtl的低16位表示当前并发扩容的线程数
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// sc大于0 则表示没有进入扩容逻辑,则通过CAS将sizeCtl值修改成 (rs << RESIZE_STAMP_SHIFT) + 2
// 这里的rs就是上面生成的扩容邮戳,这里会将rs向左位移16位,这样低16位用来记录并发扩容的线程数,高16位用来表示扩容邮戳,至于为什么要+2,我也没有理解...
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
  • 容量计算逻辑:
    在计算map容量的逻辑里借用了LongAdder的思想,如果ConcurrentHashMap里用一个size来记录当前容量的话,那么在并发put时,所有的线程都回去竞争修改这个变量,竞争会非常激烈,性能低下,那么LongAdder的思路是降低锁的粒度,我维护一个Long的数组,多个线程并发修改时,选取数组中没有被占用的Long进行加减,最后计算结果时我将数组内的数字加起来近就行了,这样就提升了数倍的吞吐,减少锁竞争的改了,所以这里也是一样维护了一个CounterCell数组,CounterCell类里就是一个long属性,当调用size()方法获取当前容量时,只需要将这个数组里的所有值加起来就行了,CounterCell代码如下:
1
2
3
4
java复制代码static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
  • 扩容邮戳计算逻辑:
1
2
3
4
5
6
java复制代码int rs = resizeStamp(n); 


static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

Integer.numberOfLeadingZeros(n)是计算n的二进制下高位连续0的个数,比如n转成二进制后是0000 0000 0000 1000 1111 0101 0110 1101,那么得到的结果就是12,因为高位是12个连续的0,而(1 << (RESIZE_STAMP_BITS - 1))RESIZE_STAMP_BITS的值是16,所以最后的结果转成二进制是1000 0000 0001 1100

这里有三个关键点:

  1. sizeCtl在没有触发扩容时,是用来表示扩容阈值的,这时候sizeCtl是个正数,当map内数据数量达到这个阈值时,会触发扩容逻辑
  2. 当某个线程触发扩容时,会通过CAS修改sizeCtl值,修改的逻辑是将上面生成的扩容邮戳向左位移16位,然后+2,这时候由于符号位是1(因为邮戳的算法决定了把邮戳向左位移16位后,符号位是1),所以sizeCtl一定是个负数,也正是由于是cas操作,所以只会有一个线程cas成功并开启扩容流程,不会有多个扩容流程被开启。
  3. 当sizeCtl为负数时,说明在扩容中,这时候其他线程可以一起扩容,需要先通过cas将sizeCtl+1,这样可以通过sizeCtl的低16位来判断有多少并发线程在一起做扩容,从而判断哪个线程最后完成扩容,然后做收尾工作,这个收尾工作包括将当前对象的table指向新表,将sizeCtl重新设置成表示扩容阈值的正数等

下面看下扩容源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
java复制代码// tab是旧表,nextTab是一个两倍容量的空的新表
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// stide是步长的意思,会将旧表按照这个步长进行分段,在并发扩容时,每个线程只负责自己段内数据的转移
int n = tab.length, stride;
// NCPU是当前系统的内核数,如果内核数只有一个,那么就不需要进行并发扩容,因为扩容是个纯计算密集型的逻辑,只有一个核心的时候反而得不偿失,因为无法真正的并发,反而会额外付出线程上下文切换的开销
// 这里步长最小是16,也就是说每个线程最少要负责16个桶的数据迁移,这个值设置的太小会导致并发线程数增多,从而导致线程间的竞争变大,这个竞争是只下面的一些CAS逻辑,比如对transferIndex、sizeCtl变量的cas操作
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// 如果新表没有初始化,则新建一个双倍容量的新表
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// 设置【开始数据转移】的桶的下标,从尾部的桶开始往头部逐个处理,将transferIndex设置为老表的length(比最后一个桶的下标大1,所以后面的代码会-1)
transferIndex = n;
}
int nextn = nextTab.length;
// 生成用于表示【扩容中】状态的节点
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 线程每次根据步长从数组上截取一段桶,如果线程处理完自己截取的一段内的桶后,还有未处理的数据,则需要重新从数组上截取一段来处理
// true则标识当前线程需要继续在老表的数组上截取新的一段桶来处理数据(可能没有线程来帮忙,就只能自己一个人干完了)
boolean advance = true;
// 标记是否已结束扩容,做收尾工作
boolean finishing = false; // to ensure sweep before committing nextTab
// i是当前线程需要转移的桶的下标,bound是当前线程负责的桶的最小下标
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 这个while的逻辑就是为了检查当前线程负责的段内的桶是否都处理完毕,如果处理完毕,则看下老表的数据里是否还有未处理的桶,如果有未处理的桶,则再次截取一段来处理
while (advance) {
int nextIndex, nextBound;
// 如果下一个桶的下标(--i是下一个需要操作的桶的下标)还在自己负责的段内,就不需要截取新段了,就继续处理下一个桶的数据
// 如果已经结束,则不需要继续截取新的段
if (--i >= bound || finishing)
advance = false;
// transferIndex用来表示这个下标及其后面的桶都已经被其他线程处理了,新的线程需要从transferIndex往前截取自己需要负责的桶,如果transferIndex小于等于0说明桶都已经转移完毕,不需要再处理了
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 以nextIndex(在上面已经赋值为transferIndex)为起始位置,往数组头部方向截取相应步长的段来转移数据,通过cas将transferIndex设置到新的下标
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// cas成功后,设置当前线程负责的下标边界(比如负责下标32到48的桶,那么这个bound就是32)
bound = nextBound;
// cas成功后,设置当前线程开始处理的桶的下标(比如负责下标32到48的桶,那么这个i就是48)
// transferIndex默认是从tab.length开始取值,所以要减1来表示正确的下标
i = nextIndex - 1;
// cas成功则表示当前线程已经成功截取了自己需要负责的一段数据了,不需要再往前截取了
advance = false;
}
}
// i是需要转移的桶的下标,n是老表的容量
// i<0说明旧表中的桶都已经转移完毕
// i>=n|| i + n >= nextn 不是很明白这个判断条件,正常情况下,i作为开始转移的桶的下标肯定会小于老表的容量的,因为转移的是老表内的桶
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 判断是否已经完成扩容,已完成扩容则做收尾逻辑
if (finishing) {
// 完成扩容后,将引用设置为null
nextTable = null;
// 将table引用指向新表,这里的table是个volatile变量,所以这个赋值操作对其他线程是可见的
table = nextTab;
// 设置新的扩容阈值,将阈值设置为新容量的3/4
// 这里的n是老表的容量,因为是双倍扩容,所以新表容量是2n,下面计算的结果是2n-0.5n = 1.5n,也就是新表容量的3/4
sizeCtl = (n << 1) - (n >>> 1);
// 返回结果,扩容结束
return;
}
// 在扩容开始时,会将sizeCtl设置成一个负数,每次有新的线程并发扩容时,会将sizeCtl+1,而当有线程处理完扩容逻辑后,再减1,以此来判断是否是最后一个线程
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// cas成功,则判断当前线程是不是最后一个完成扩容的线程,由最后一个完成扩容逻辑的线程将finishing和advance设为true,重新循环到上面的if(finishing)里的收尾逻辑
// 这里减2是因为在执行扩容的入口处,第一个触发扩容的线程会负责将sc加2,至于为什么第一个扩容的线程要加2,而不是加1,这个没理解
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果是空桶,则在老表对应的下标出放一个ForwardingNode,在有别的线程往这个空桶写数据时会感知到扩容过程,一起来扩容
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// ForwardingNode节点的hash值就是MOVED(在ForwardingNode的构造方法里会设置hash值为MOVED),说明已经有线程处理了这个桶内的数据
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 在桶上加锁,防止对同一个桶数据并发操作
synchronized (f) {
// 有点双重校验锁的味道,防止获得锁后,该桶内数据被别的线程插入了新的数据,因为这个f是在未加锁之前获取的node对象,在这期间,可能这个下标处插入了新数据
// 比如有别的线程调用了put方法往这个桶内链表插入新节点,这时候这个桶的node就变成了新插入的数据node(put操作会生成新的node,并将新node的next引用指向原node)
// 如果不做这层校验,会导致新加入到桶内的数据没有被处理,导致数据丢失
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// fh>=0表示是正常的链表
if (fh >= 0) {
// 这里需要注意的是在put操作里是通过hash&(n-1)来选取下标位置,表容量n都是2的幂,所以这里hash&n的结果只有两个要么是n要么是0
// 值为0时的节点在新表的i下标出,而值为n的节点需要迁移到新表的i+n下标下,因为是双倍扩容
// 所以老表下标为i的桶内的数据在迁移rehash时,一半仍然在下标为i的桶内,另一半在i+n的桶内,不会出现第三种情况
int runBit = fh & n;
Node<K,V> lastRun = f;
// 这个循环的目的有两个
// 1、遍历出整个链表尾部不需要改变next指针的最长链,这样可以将这个链整个搬到新桶内,不用再逐个遍历了
// 2、由于是将老的完整节点链条搬到新桶内,所以也就不需要创建新的node节点,减少迁移过程中的gc压力
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 逐个遍历节点,0表示仍然放到下标为i的桶内的链表
// 这里每次都是生成新的node对象而不修改原node对象的next指针,这也是get()方法不用加锁的关键所在
// 但是会带来gc压力,所以才有上面的那次遍历,希望减少对象的创建
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
// 否则就是放到下标为i+n的桶内的链表
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 设置新表的下标为i的桶内数据
setTabAt(nextTab, i, ln);
// 设置新表的下标为i+n的桶内数据
setTabAt(nextTab, i + n, hn);
// 将老表下标为i的桶内放上ForwardingNode对象,用来标识当前处于扩容过程
setTabAt(tab, i, fwd);
// 处理完后,将这个字段设置为true,以便走到上面的while(advance)里检查当前线程负责的数据是否处理完成,并且查看是否需要截取新段
advance = true;
}
// 红黑树结构的迁移,逻辑与链表差不多,也是将整棵树拆成两颗,一棵树放到下标为i的桶内,一棵放到下标i+n的桶内
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

看到这里可以回答上面的问题:

  1. 什么时候触发扩容?扩容阈值是多少?

在每次往map中put数据时会重新计算map中的size,如果达到扩容阈值就会触发扩容逻辑,扩容因子是0.75,即:当容量达到总容量的3/4时,触发扩容
2. 扩容时的线程安全怎么做的?
这个要细化下线程安全的场景,分为下面几种:

* 扩容线程与扩容线程间的并发场景
扩容线程和扩容线程间在进行任务分配时,是从数组尾部往头部以桶为单位截取,并且用来标记已分配区域的指针`transferIndex`是`volatile`修饰的,所以线程间是可见的,通过cas来修改`transferIndex`值,保证线程间没有重复的桶
![在这里插入图片描述](https://gitee.com/songjianzaina/juejin_p16/raw/master/img/8c2b1842d408c8e05732f9bd96044e1aed7738a224ebb74e4b794bea811185b6)
* 扩容线程与写线程的并发场景
这里要分两种情况:
一、在触发扩容流程时,需要通过CAS将sizeCtl从正数改成负数,并且+2,这样只会有一个线程cas成功,避免其他的写线程也触发扩容流程。
二、怎么避免其他写线程往处于扩容中、扩容完毕的桶里写数据,因为扩容线程是遍历桶内的链表或者B树来rehash,如果往已经遍历的链表或者B树中插入新数据,扩容线程是无法感知到的,会导致新表中没有这些数据,这个要结合`put(k,v)`方法来说,对于空桶来说,不管是put操作还是扩容操作,都是通过cas操作来往空桶中添加数据,所以在出现并发往空桶写时,只会有一个线程成功,而不管是put的线程失败还是扩容的线程失败时,都会重新获取里面的值,再重新触发对应的put或者扩容逻辑,避免并发问题,`put(k,v)`代码截图如下:
![在这里插入图片描述](https://gitee.com/songjianzaina/juejin_p16/raw/master/img/81f8aecf596a898c4d192402ab3e3b8b483299612bc5e2e32cf31e8497cb4960)
对于有数据的桶,put操作和扩容操作都是通过`synchronized`在桶上加锁来避免并发写问题。
* 扩容线程与读线程之间的并发场景
这个在第5个问题里解答
  1. 其他线程怎么感知到扩容状态,从而一起进行扩容?
    在对某个桶进行扩容时候,在完成扩容后会生成一个ForwardingNode放在老表的对应下标的位置下,当有其他线程修改这个桶内数据时,如果发现这个类型的节点,就会一起进行扩容,put(k,v)代码截图如下:
    在这里插入图片描述
  2. 多个线程一起扩容时,怎么拆分任务,是不是任务粒度越小越好?
    扩容任务是一个纯计算逻辑的任务,所以会根据机器内核数来决定,如果只有一个核,则由一个线程处理完就行了,这时候引入多线程扩容反而会引入上线文切换开销,同时源码里设置的每个线程负责的桶的数量最少是16,因为粒度太小的话,会导致线程的cas操作竞争太多,比如对transferIndex、sizeCtl变量的cas操作
  3. ConcurrentHashMap.get(key)方法是没有加锁的,怎么保证在这个扩容过程中,其他线程的get(key)方法能获取到正确的值,不出现线程安全问题?
    线程安全场景是,get(key)在获取下标后,数组可能已经扩容,数据被rehash,这时候通过老的下标可能会取不到值,这里是用读写分离的思路解决,这个读写分离在迁移桶内数据过程中、迁移桶内数据完毕、整个扩容完成 三个阶段都能体现出来:
* 在转移桶内数据时,不移动桶内数据并且不修改桶内数据的next指针,而是new一个新的node对象放到新表中,这样不会导致读取数据的线程在遍历链表时候因为next引用被更改而查询不到数据;
* 在桶内数据迁移完后,在原table的桶内放一个`ForwardingNode`节点,通过这个节点的`find(k)`方法能获取到对应的数据;
* 在整个库容完成后,将新表引用赋值给`volatile`的变量table,这样更新引用的动作对其他线程可见;从而保证在这三个过程中都能读取到正确的值。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%