目录
- 正文
- longAccumulate方法
- 线程hash值
- 初始化Cell数组
- 对base进行累加
- Cell数组初始化之后
正文
上篇文章 Java并发编程之LongAdder源码(一)中最后写到了有三种情况会执行longAccumulate
方法,下面就根据这三种情况来进行分析
- 当
Cell
数组为null
时,传入的三个参数为1,null,true
- 随机找到
Cell
数组某个索引位置的值为null
时,传入的三个参数为1,null,true
- 对
Cell
数组某个索引位置的值进行累加失败时,传入的三个参数为1,null,false
longAccumulate方法
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
代码较长,我们分段来分析,首先介绍一下各部分的内容
- 第一部分:
for
循环之前的代码,主要是获取线程的hash值,如果是0的话就强制初始化 - 第二部分:
for
循环中第一个if
语句,在Cell
数组中进行累加、扩容 - 第三部分:
for
循环中第一个else if
语句,这部分的作用是创建Cell
数组并初始化 - 第四部分:
for
循环中第二个else if
语句,当Cell
数组竞争激烈时尝试在base
上进行累加
线程hash值
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true; // true表示没有竞争
}
boolean collide = false; // True if last slot nonempty 可以理解为是否需要扩容
这部分的核心代码是getProbe
方法,这个方法的作用就是获取线程的hash
值,方便后面通过位运算定位到Cell
数组中某个位置,如果是0
的话就会进行强制初始化
初始化Cell数组
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 省略...
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
// 省略...
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 获取锁
boolean init = false; // 初始化标志
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2]; // 创建Cell数组
rs[h & 1] = new Cell(x); // 索引1位置创建Cell元素,值为x=1
cells = rs; // cells指向新数组
init = true; // 初始化完成
}
} finally {
cellsBusy = 0; // 释放锁
}
if (init)
break; // 跳出循环
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
第一种情况下Cell
数组为null
,所以会进入第一个else if
语句,并且没有其他线程进行操作,所以cellsBusy==0
,cells==as
也是true
,casCellsBusy()
尝试对cellsBusy
进行cas
操作改成1
也是true
。
首先创建了一个有两个元素的Cell
数组,然后通过线程h & 1
的位运算在索引1
的位置设置一个value
为1
的Cell
,然后重新赋值给cells
,标记初始化成功,修改cellsBusy
为0
表示释放锁,最后跳出循环,初始化操作就完成了。
对base进行累加
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 省略...
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
// 省略...
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 省略...
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
第二个else if
语句的意思是当Cell
数组中所有位置竞争都很激烈时,就尝试在base
上进行累加,可以理解为最后的保障
Cell数组初始化之后
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 省略...
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { // as初始化之后满足条件
if ((a = as[(n - 1) & h]) == null) { // as中某个位置的值为null
if (cellsBusy == 0) { // Try to attach new Cell 是否加锁
Cell r = new Cell(x); // Optimistically create 创建新Cell
if (cellsBusy == 0 && casCellsBusy()) { // 双重检查是否有锁,并尝试加锁
boolean created = false; //
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) { // 重新检查该位置是否为null
rs[j] = r; // 该位置添加Cell元素
created = true; // 新Cell创建成功
}
} finally {
cellsBusy = 0; // 释放锁
}
if (created)
break; // 创建成功,跳出循环
continue; // Slot is now non-empty
}
}
collide = false; // 扩容标志
}
else if (!wasUncontended) // 上面定位到的索引位置的值不为null
wasUncontended = true; // 重新计算hash,重新定位其他索引位置重试
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) // 尝试在该索引位置进行累加
break;
else if (n >= NCPU || cells != as) // 如果数组长度大于等于CPU核心数,就不能在扩容
collide = false; // At max size or stale
else if (!collide) // 数组长度没有达到最大值,修改扩容标志可以扩容
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) { // 尝试加锁
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1]; // 创建一个原来长度2倍的数组
for (int i = 0; i < n; ++i)
rs[i] = as[i]; // 把原来的元素拷贝到新数组中
cells = rs; // cells指向新数组
}
} finally {
cellsBusy = 0; // 释放锁
}
collide = false; // 已经扩容完成,修改标志不用再扩容
continue; // Retry with expanded table
}
h = advanceProbe(h); // 重新获取hash值
}
// 省略...
}
根据代码中的注释分析一遍整体逻辑
- 首先如果找到数组某个位置上的值为
null
,说明可以在这个位置进行操作,就创建一个新的Cell
并初始化值为1
放到这个位置,如果失败了就重新计算hash
值再重试 - 定位到的位置已经有值了,说明线程之间产生了竞争,如果
wasUncontended
是false
就修改为true
,并重新计算hash
重试 - 定位的位置有值并且
wasUncontended
已经是true
,就尝试在该位置进行累加 - 当累加失败时,判断数组容量是否已经达到最大,如果是就不能进行扩容,只能
rehash
并重试 - 如果前面条件都不满足,并且扩容标志
collide
标记为false
的话就修改为true
,表示可以进行扩容,然后rehash
重试 - 首先尝试加锁,成功了就进行扩容操作,每次扩容长度是之前的
2
倍,然后把原来数组内容拷贝到新数组,扩容就完成了。