因为上篇文章HashMap已经讲解的很详细了,因此此篇文章会简单介绍思路,再学习并发HashMap就简单很多了,上一篇文章中我们最终知道HashMap
是线程不安全的,因此在老版本JDK中提供了HashTable
来实现多线程级别的,改变之处重要有以下几点。
HashTable
的put
,get
,remove
等方法是通过synchronized
来修饰保证其线程安全性的。HashTable
是 不允许key跟value为null的。- 问题是
synchronized
是个关键字级别的==重量锁==,在get数据的时候任何写入操作都不允许。相对来说性能不好。因此目前主要用的ConcurrentHashMap
来保证线程安全性。
ConcurrentHashMap
主要分为JDK<=7跟JDK>=8的两个版本,ConcurrentHashMap
的空间利用率更低一般只有10%~20%,接下来分别介绍。
JDK7
先宏观说下JDK7中的大致组成,ConcurrentHashMap由Segment
数组结构和HashEntry
数组组成。Segment是一种可重入锁,是一种数组和链表的结构,一个Segment中包含一个HashEntry数组,每个HashEntry又是一个链表结构。正是通过Segment==分段锁==,ConcurrentHashMap实现了高效率的并发。缺点是并发程度是有segment数组来决定的,并发度一旦初始化无法扩容。
先绘制个ConcurrentHashMap
的形象直观图。
要想理解currentHashMap
,可以简单的理解为将数据分表分库。 ConcurrentHashMap
是由 Segment
数组 结构和HashEntry
数组 结构组成。
- Segment 是一种可重入锁
ReentrantLock
的子类 ,在ConcurrentHashMap
里扮演锁的角色,HashEntry
则用于存储键值对数据。ConcurrentHashMap
里包含一个Segment
数组来实现锁分离,Segment
的结构和HashMap
类似,一个Segment
里包含一个HashEntry
数组,每个HashEntry
是一个链表结构的元素, 每个Segment
守护者一个HashEntry
数组里的元素,当对HashEntry
数组的数据进行修改时,必须首先获得它对应的Segment
锁。
- 我们先看下segment类:
1 | 复制代码static final class Segment<K,V> extends ReentrantLock implements Serializable { |
可以理解为我们的每个segment
都是实现了Lock
功能的HashMap
。如果我们同时有多个segment
形成了segment
数组那我们就可以实现并发咯。
- 我们看下
currentHashMap
的构造函数,先总结几点。
- segment的数组大小最终一定是2的次幂
- 每一个segment里面包含的table(HashEntry数组)初始化大小也一定是2的次幂
- 这里设置了若干个用于位计算的参数。
- initialCapacity:初始容量大小 ,默认16。
- loadFactor: 扩容因子,默认0.75,当一个Segment存储的元素数量大于initialCapacity* loadFactor时,该Segment会进行一次扩容。
- concurrencyLevel:并发度,默认16。并发度可以理解为程序运行时能够同时更新ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的分段锁个数,即Segment[]的数组长度。如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。
构造函数详解:
1 | 复制代码 //initialCapacity 是我们保存所以KV数据的初始值 |
- hash
不管是我们的get操作还是put操作要需要通过hash来对数据进行定位。
1 | 复制代码 // 整体思想就是通过多次不同方式的位运算来努力将数据均匀的分不到目标table中,都是些扰动函数 |
- get
相对来说比较简单,无非就是通过hash
找到对应的segment
,继续通过hash
找到对应的table
,然后就是遍历这个链表看是否可以找到,并且要注意get
的时候是==没有加锁==的。
1 | 复制代码 public V get(Object key) { |
- put
相同的思路,先找到hash
值对应的segment
位置,然后看该segment
位置是否初始化了(因为segment是==懒加载==模式)。选择性初始化,最终执行put操作。
1 | 复制代码 @SuppressWarnings("unchecked") |
其中put
操作基本思路跟HashMap
几乎一样,只是在开始跟结束进行了加锁的操作tryLock and unlock
,然后JDK7中都是先扩容再添加数据的,并且获得不到锁也会进行==自旋==的tryLock或者lock阻塞排队进行等待(同时获得锁前提前new出新数据)。
1 | 复制代码final V put(K key, int hash, V value, boolean onlyIfAbsent) { |
如果加锁失败了调用scanAndLockForPut
,完成查找或新建节点的工作。当获取到锁后直接将该节点加入链表即可,提升了put操作的性能,这里涉及到==自旋==。大致过程:
- 在我获取不到锁的时候我进行tryLock,准备好new的数据,同时还有一定的次数限制,还要考虑别的已经获得线程的节点修改该头节点。
1 | 复制代码private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { |
- Size
这个size方法比较有趣,他是先无锁的统计下所有的数据量看下前后两次是否数据一样,如果一样则返回数据,如果不一样则要把全部的segment进行加锁,统计,解锁。并且size方法只是返回一个统计性的数字,因此size谨慎使用哦。
1 | 复制代码public int size() { |
- rehash
segment
数组初始化后就不可变了,也就是说并发性不可变,不过segment
里的table
可以扩容为2倍,该方法没有考虑并发,因为执行该方法之前已经获取了锁。其中JDK7中的rehash
思路跟JDK8 中扩容后处理链表的思路一样,个人不过感觉没有8写的精髓好看。
1 | 复制代码// 方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。 |
- CAS操作
在JDK7里在ConcurrentHashMap
中通过原子操作sun.misc.Unsafe
查找元素、替换元素和设置元素。通过这样的硬件级别获得数据可以保证及时是多线程我也每次获得的数据是最新的。这些原子操作起着非常关键的作用,你可以在所有ConcurrentHashMap
的基本功能中看到,随机距离如下:
1 | 复制代码 final void setNext(HashEntry<K,V> n) { |
常见问题
- ConcurrentHashMap实现原理是怎么样的或者ConcurrentHashMap如何在保证高并发下线程安全的同时实现了性能提升?
ConcurrentHashMap
允许多个修改操作并发进行,其关键在于使用了==锁分离==技术。它使用了多个锁来控制对hash表的不同部分进行的修改。内部使用段(Segment
)来表示这些不同的部分,每个段其实就是一个小的HashTable
,只要多个修改操作发生在不同的段上,它们就可以并发进行。
- 在高并发下的情况下如何保证取得的元素是最新的?
用于存储键值对数据的
HashEntry
,在设计上它的成员变量value跟next
都是volatile
类型的,这样就保证别的线程对value值的修改,get方法可以马上看到。
- ConcurrentHashMap的弱一致性体现在迭代器,clear和get方法,原因在于没有加锁。
- 比如迭代器在遍历数据的时候是一个Segment一个Segment去遍历的,如果在遍历完一个Segment时正好有一个线程在刚遍历完的Segment上插入数据,就会体现出不一致性。clear也是一样。
- get方法和containsKey方法都是遍历对应索引位上所有节点,都是不加锁来判断的,如果是修改性质的因为可见性的存在可以直接获得最新值,不过如果是新添加值则无法保持一致性。
JDK8
JDK8相比与JDK7主要区别如下:
- 取消了segment数组,直接用table保存数据,锁的粒度更小,减少并发冲突的概率。采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率,并发控制使用Synchronized和CAS来操作。
- 存储数据时采用了数组+ 链表+红黑树的形式。
- CurrentHashMap重要参数:
private static final int MAXIMUM_CAPACITY = 1 << 30; // 数组的最大值
private static final int DEFAULT_CAPACITY = 16; // 默认数组长度
static final int TREEIFY_THRESHOLD = 8; // 链表转红黑树的一个条件
static final int UNTREEIFY_THRESHOLD = 6; // 红黑树转链表的一个条件
static final int MIN_TREEIFY_CAPACITY = 64; // 链表转红黑树的另一个条件
static final int MOVED = -1; // 表示正在扩容转移
static final int TREEBIN = -2; // 表示已经转换成树
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // 获得hash值的辅助参数
transient volatile Node<K,V>[] table;// 默认没初始化的数组,用来保存元素
private transient volatile Node<K,V>[] nextTable; // 转移的时候用的数组
static final int NCPU = Runtime.getRuntime().availableProcessors();// 获取可用的CPU个数
private transient volatile Node<K,V>[] nextTable; // 连接表,用于哈希表扩容,扩容完成后会被重置为 null
private transient volatile long baseCount;保存着整个哈希表中存储的所有的结点的个数总和,有点类似于 HashMap 的 size 属性。
private transient volatile intsizeCtl
;
负数:表示进行初始化或者扩容,-1:表示正在初始化,-N:表示有 N-1 个线程正在进行扩容
正数:0 表示还没有被初始化,> 0的数:初始化或者是下一次进行扩容的阈值,有点类似HashMap中的threshold
,不过功能更强大。
- 若干重要类
- 构成每个元素的基本类
Node
1 | 复制代码 static class Node<K,V> implements Map.Entry<K,V> { |
- TreeNode继承于Node,用来存储红黑树节点
1 | 复制代码 static final class TreeNode<K,V> extends Node<K,V> { |
- ForwardingNode
在 Node 的子类ForwardingNode
的构造方法中,可以看到此变量的hash = -1 ,类中还存储nextTable
的引用。该初始化方法只在transfer
方法被调用,如果一个类被设置成此种情况并且hash = -1 则说明该节点不需要resize了。
1 | 复制代码static final class ForwardingNode<K,V> extends Node<K,V> { |
- TreeBin
TreeBin从字面含义中可以理解为存储树形结构的容器,而树形结构就是指TreeNode,所以TreeBin就是封装TreeNode的容器,它提供转换黑红树的一些条件和锁的控制.
1 | 复制代码static final class TreeBin<K,V> extends Node<K,V> { |
构造函数
整体的构造情况基本跟HashMap类似,并且为了跟原来的JDK7中的兼容性还可以传入并发度。不过JDK8中并发度已经有table的具体长度来控制了。
- ConcurrentHashMap(): 创建一个带有默认初始容量 (16)、加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射
- ConcurrentHashMap(int):创建一个带有指定初始容量
tableSizeFor
、默认加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射- ConcurrentHashMap(Map<? extends K, ? extends V> m):构造一个与给定映射具有相同映射关系的新映射
- ConcurrentHashMap(int initialCapacity, float loadFactor):创建一个带有指定初始容量、加载因子和默认 concurrencyLevel (1) 的新的空映射
- ConcurrentHashMap(int, float, int):创建一个带有指定初始容量、加载因子和并发级别的新的空映射。
put
假设table已经初始化完成,put操作采用 CAS + synchronized 实现并发插入或更新操作,具体实现如下。
- 做一些边界处理,然后获得hash值。
- 没初始化就初始化,初始化后看下对应的桶是否为空,为空就原子性的尝试插入。
- 如果当前节点正在扩容还要去帮忙扩容,骚操作。
- 用
syn
来加锁当前节点,然后操作几乎跟就跟hashmap一样了。
1 | 复制代码 |
涉及到重要函数initTable
、tabAt
、casTabAt
、helpTransfer
、putTreeVal
、treeifyBin
、addCount
函数。
initTable
只允许一个线程对表进行初始化,如果不巧有其他线程进来了,那么会让其他线程交出 CPU 等待下次系统调度Thread.yield
。这样,保证了表同时只会被一个线程初始化,对于table的大小,会根据sizeCtl
的值进行设置,如果没有设置szieCtl的值,那么默认生成的table大小为16,否则,会根据sizeCtl
的大小设置table大小。
1 | 复制代码// 容器初始化 操作 |
unsafe
在ConcurrentHashMap
中使用了unSafe
方法,通过直接操作内存的方式来保证并发处理的安全性,使用的是==硬件==的安全机制。
1 | 复制代码 // 用来返回节点数组的指定位置的节点的原子操作 |
可以看到获得table[i]数据是通过Unsafe
对象通过反射获取的,取数据直接table[index]不可以么,为什么要这么复杂?在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的副本,虽然table是volatile
修饰的,但不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。
helpTransfer
1 | 复制代码// 可能有多个线程在同时帮忙运行helpTransfer |
- Integer.numberOfLeadingZeros(n)
该方法的作用是返回无符号整型i的最高非零位前面的0的个数,包括符号位在内;
如果i为负数,这个方法将会返回0,符号位为1.
比如说,10的二进制表示为 0000 0000 0000 0000 0000 0000 0000 1010
java的整型长度为32位。那么这个方法返回的就是28
- resizeStamp
主要用来获得标识符,可以简单理解是对当前系统容量大小的一种监控。
1 | 复制代码static final int resizeStamp(int n) { |
addCount
主要就2件事:一是更新 baseCount,二是判断是否需要扩容。
1 | 复制代码private final void addCount(long x, int check) { |
- baseCount添加
ConcurrentHashMap
提供了baseCount
、counterCells
两个辅助变量和一个CounterCell
辅助内部类。sumCount() 就是迭代counterCells
来统计 sum 的过程。 put 操作时,肯定会影响size()
,在put()
方法最后会调用addCount()
方法。整体的思维方法跟LongAdder类似,用的思维就是借鉴的ConcurrentHashMap
。每一个Cell
都用Contended修饰来避免伪共享。
- JDK1.7 和 JDK1.8 对 size 的计算是不一样的。 1.7 中是先不加锁计算三次,如果三次结果不一样在加锁。
- JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。
- JDK 8 推荐使用mappingCount 方法,因为这个方法的返回值是 long 类型,不会因为 size 方法是 int 类型限制最大值。
- 关于扩容
在addCount
第一次扩容时候会有骚操作sc=rs << RESIZE_STAMP_SHIFT) + 2)
其中rs = resizeStamp(n)
。这里需要核心说一点,
如果不是第一次扩容则直接将低16位的数字 +1 即可。
putTreeVal
这个操作几乎跟HashMap
的操作完全一样,核心思想就是一定要决定向左还是向右然后最终尝试放置新数据,然后balance。不同点就是有锁的考虑。
treeifyBin
这里的基本思路跟HashMap
几乎一样,不同点就是先变成TreeNode,然后是单向链表串联。
1 | 复制代码private final void treeifyBin(Node<K,V>[] tab, int index) { |
TreeBin
主要功能就是链表变化为红黑树,这个红黑树用TreeBin
来包装。并且要注意 转成红黑树以后以前链表的结构信息还是有的,最终信息如下:
- TreeBin.first = 链表中第一个节点。
- TreeBin.root = 红黑树中的root节点。
1 | 复制代码TreeBin(TreeNode<K,V> b) { |
tryPresize
当数组长度小于64的时候,扩张数组长度一倍,调用此函数。扩容后容量大小的核对,可能涉及到初始化容器大小。并且扩容的时候又跟2的次幂联系上了!,其中初始化时候传入map会调用putAll方法直接put一个map的话,在putAll方法中没有调用initTable方法去初始化table,而是直接调用了tryPresize方法,所以这里需要做一个是不是需要初始化table的判断。
PS: 默认第一个线程设置 sc = rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1,这个时候说明扩容完毕了。
1 | 复制代码 /** |
transfer
这里代码量比较大主要分文三部分,并且感觉思路很精髓,尤其是其他线程帮着去扩容的骚操作。
- 主要是 单个线程能处理的最少桶结点个数的计算和一些属性的初始化操作。
- 每个线程进来会先领取自己的任务区间
[bound,i]
,然后开始 –i 来遍历自己的任务区间,对每个桶进行处理。如果遇到桶的头结点是空的,那么使用ForwardingNode
标识旧table中该桶已经被处理完成了。如果遇到已经处理完成的桶,直接跳过进行下一个桶的处理。如果是正常的桶,对桶首节点加锁,正常的迁移即可(跟HashMap第三部分一样思路),迁移结束后依然会将原表的该位置标识位已经处理。
该函数中的finish= true
则说明整张表的迁移操作已经全部完成了,我们只需要重置 table
的引用并将 nextTable
赋为空即可。否则,CAS
式的将 sizeCtl
减一,表示当前线程已经完成了任务,退出扩容操作。如果退出成功,那么需要进一步判断当前线程是否就是最后一个在执行扩容的。
1 | 复制代码f ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) |
第一次扩容时在addCount
中有写到(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2
表示当前只有一个线程正在工作,相对应的,如果 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
,说明当前线程就是==最后一个==还在扩容的线程,那么会将 finishing 标识为 true,并在下一次循环中退出扩容方法。
- 几乎跟
HashMap
大致思路类似的遍历链表/红黑树然后扩容操作。
1 | 复制代码private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { |
get
这个就很简单了,获得hash值,然后判断存在与否,遍历链表即可,注意get没有任何锁操作!
1 | 复制代码 public V get(Object key) { |
clear
关于清空也相对简单 ,无非就是遍历桶数组,然后通过CAS来置空。
1 | 复制代码public void clear() { |
end
ConcurrentHashMap是如果来做到并发安全,又是如何做到高效的并发的呢?
- 首先是读操作,读源码发现get方法中根本没有使用同步机制,也没有使用
unsafe
方法,所以读操作是支持并发操作的。 - 写操作
- . 数据扩容函数是
transfer
,该方法的只有addCount
,helpTransfer
和tryPresize
这三个方法来调用。
- addCount是在当对数组进行操作,使得数组中存储的元素个数发生了变化的时候会调用的方法。
helpTransfer
是在当一个线程要对table中元素进行操作的时候,如果检测到节点的·hash·= MOVED 的时候,就会调用helpTransfer
方法,在helpTransfer
中再调用transfer
方法来帮助完成数组的扩容
tryPresize
是在treeIfybin
和putAll
方法中调用,treeIfybin
主要是在put
添加元素完之后,判断该数组节点相关元素是不是已经超过8个的时候,如果超过则会调用这个方法来扩容数组或者把链表转为树。注意putAll
在初始化传入一个大map的时候会调用。·
总结扩容情况发生:
- 在往map中添加元素的时候,在某一个节点的数目已经超过了8个,同时数组的长度又小于64的时候,才会触发数组的扩容。
- 当数组中元素达到了sizeCtl的数量的时候,则会调用transfer方法来进行扩容
- 扩容时候是否可以进行读写。
对于读操作,因为是没有加锁的所以可以的.
对于写操作,JDK8中已经将锁的范围细腻到table[i]
l了,当在进行数组扩容的时候,如果当前节点还没有被处理(也就是说还没有设置为==fwd==节点),那就可以进行设置操作。如果该节点已经被处理了,则当前线程也会==加入==到扩容的操作中去。
- 多个线程又是如何同步处理的
在ConcurrentHashMap
中,同步处理主要是通过Synchronized
和unsafe
的硬件级别原子性 这两种方式来完成的。
- 在取得sizeCtl跟某个位置的Node的时候,使用的都是
unsafe
的方法,来达到并发安全的目的- 当需要在某个位置设置节点的时候,则会通过
Synchronized
的同步机制来锁定该位置的节点。- 在数组扩容的时候,则通过处理的
步长
和fwd
节点来达到并发安全的目的,通过设置hash值为MOVED=-1。- 当把某个位置的节点复制到扩张后的table的时候,也通过
Synchronized
的同步机制来保证线程安全
套路
- 谈谈你理解的 HashMap,讲讲其中的 get put 过程。
- 1.8 做了什么优化?
- 是线程安全的嘛?
- 不安全会导致哪些问题?
- 如何解决?有没有线程安全的并发容器?
- ConcurrentHashMap 是如何实现的? 1.7、1.8 实现有何不同,为什么这么做。
- 1.8中ConcurrentHashMap的sizeCtl作用,大致说下协助扩容跟标志位。
- HashMap 为什么不用跳表替换红黑树呢?
参考
CurrentHashMap之transfer
CurrentHashMap详细
LongAdder原理解析
本文使用 mdnice 排版
本文转载自: 掘金