基本介绍
LongAdder
跟AtomicLong
都是用于计数器统计的,AtomicLong
底层通过CAS
操作进行计数,但是在高并发条件下性能比较低。
阿里的开发手册上明确说明:
LongAdder
的继承结构如下:
1 | java复制代码//LongAdder是Striped64的子类 |
Striped64
类中重要的属性如下:
1 | java复制代码abstract class Striped64 extends Number { |
Cell 是 java.util.concurrent.atomic 下 Striped64 的一个内部类,
LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。
即内部有一个base变量,一个Cell[]数组。
- base变量:非竞态条件下,直接累加到该变量上
- Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中
当计算总值调用sum()方法,sum源码如下:
1 | java复制代码/** |
LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。
源码分析
入口:longAdder.increment()
1 | java复制代码public void increment() { |
接着查看add方法如下:
1 | java复制代码public void add(long x) { |
条件递增,逐步解析,如下:
- 1.最初无竞争时只更新base;
- 2.如果更新base失败后,首次新建一个Cell[]数组
- 3.当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[]扩容
longAccumulate
入参说明如下:
只有三种情况会调用longAccumulate
方法
- 1 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells]
- 2 当前线程对应下标的cell为空,需要创建 longAccumulate 支持
- 3 cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]
longAccumulate
方法总纲如下:
上述代码首先给当前线程分配一个hash值,然后进入一个for(;;)自旋,这个自旋分为三个分支:
- CASE1:Cell[]数组已经初始化
- CASE2:Cell[]数组未初始化(首次新建)
- CASE3:Cell[]数组正在初始化中
一开始刚刚要初始化Cell[]数组(首次新建),即未初始化过Cell[]数组,尝试占有锁并首次初始化cells数组。
1 | java复制代码//CASE2:前置条件cells还未初始化 as 为null |
如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell[] rs = new Cell[2]表示数组的长度为2,rs[h & 1] = new Cell(x) 表示创建一个新的Cell元素,value是x值,默认为1。h & 1类似于我们之前HashMap常用到的计算散列桶index的算法,通常都是hash & (table.len - 1)。同hashmap一个意思
兜底的else模块,即多个线程尝试CAS修改失败的线程会走到这个分支,如下:
该分支实现直接操作base基数,将值累加到base上,也即其它线程正在初始化,多个线程正在更新base的值。
当cell已经初始化了时,流程代码如下:
1 | java复制代码//CASE1: 表示cells已经初始化了,当前线程应该将数据写入到对应的cell中 |
当cell已经初始化了
- 1.如果当前线程对应的hash槽位为null时,通过cas创建cell,并将cell赋值,将cell存入到cells数组中
1 | java复制代码//CASE1.1:true->表示当前线程对应的下标位置的cell为null,需要创建new Cell |
- 2.如果当前线程对应的cells数组中的槽位不为空null,并且已经尝试过cas操作修改值失败,即存在竞争。将
wasUncontended
改为true,接着调用最下面的h = advanceProbe(h);
重置当前线程Hash值,
1 | java复制代码// wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false |
- 3.重置当前线程Hash值后,接着再次判断对应的cells数组中的槽位是否为空,如果为空,则将值存入到对应的槽位,如果不为空,则通过cas操作尝试能不能修改槽位的值。如果修改成功,则执行结束
1 | java复制代码else if (a.cas(v = a.value, ((fn == null) ? v + x : |
- 4.如果步骤3修改失败的话,就会将扩容意向
collide
的值置为true
1 | java复制代码else if (!collide) |
- 5.接着下次还是修改槽位的值不成功的话,最后会执行扩容操作。
1 | java复制代码else if (cellsBusy == 0 && casCellsBusy()) { |
完整代码如下:
1 | java复制代码//都有哪些情况会调用? |
总体步骤如下:
小总结
AtomicLong 原理:CAS+自旋
场景:
- 低并发下的全局计算
- AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题。
缺陷: 高并发后性能急剧下降,AtomicLong的自旋会成为瓶颈,N个线程CAS操作修改线程的值,每次只有一个成功过,其它N - 1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。
LongAdder 原理:
- CAS+Base+Cell数组分散
- 空间换时间并分散了热点数据
场景:高并发下的全局计算
缺陷:sum求和后还有计算线程修改结果的话,最后结果不够准确
本文转载自: 掘金