上篇文章谈到BlockingQueue的使用场景,并重点分析了ArrayBlockingQueue的实现原理,了解到ArrayBlockingQueue底层是基于数组实现的阻塞队列。
但是BlockingQueue的实现类中,有一种阻塞队列比较特殊,就是 Synchronous Queue(同步移交队列),队列长度为0。
作用就是一个 线程 往队列放数据的时候,必须等待另一个线程从队列中取走数据。同样,从队列中取数据的时候,必须等待另一个线程往队列中放数据。
这样特殊的队列,有什么应用场景呢?
1. SynchronousQueue用法
先看一个SynchronousQueue的简单用例:
/** | |
* @author 一灯架构 | |
* @apiNote SynchronousQueue示例 | |
**/public class SynchronousQueueDemo { | |
public static void main(String[] args) throws Interrupted Exception { | |
//. 创建SynchronousQueue队列 | |
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>(); | |
//. 启动一个线程,往队列中放3个元素 | |
new Thread(() -> { | |
try { | |
System.out.println(Thread.currentThread().getName() + " 入队列"); | |
synchronousQueue.put(); | |
Thread.sleep(); | |
System.out.println(Thread.currentThread().getName() + " 入队列"); | |
synchronousQueue.put(); | |
Thread.sleep(); | |
System.out.println(Thread.currentThread().getName() + " 入队列"); | |
synchronousQueue.put(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}).start(); | |
//. 等待1000毫秒 | |
Thread.sleep(L); | |
//. 再启动一个线程,从队列中取出3个元素 | |
new Thread(() -> { | |
try { | |
System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take()); | |
Thread.sleep(); | |
System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take()); | |
Thread.sleep(); | |
System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take()); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}).start(); | |
} | |
} |
输出结果:
Thread- 入队列 1 | |
Thread- 出队列 1 | |
Thread- 入队列 2 | |
Thread- 出队列 2 | |
Thread- 入队列 3 | |
Thread- 出队列 3 |
从输出结果中可以看到,第一个线程Thread-0往队列放入一个元素1后,就被阻塞了。直到第二个线程Thread-1从队列中取走元素1后,Thread-0才能继续放入第二个元素2。
由于SynchronousQueue是BlockingQueue的实现类,所以也实现类BlockingQueue中几组抽象方法:
为了满足不同的使用场景,BlockingQueue设计了很多的放数据和取数据的方法。
操作 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞一段时间 |
放数据 | add | offer | put | offer(e, time, unit) |
取数据 | remove | poll | take | poll(time, unit) |
查看数据(不删除) | element() | peek() | 不支持 | 不支持 |
这几组方法的不同之处就是:
- 当队列满了,再往队列中放数据,add方法抛异常,offer方法返回false,put方法会一直阻塞(直到有其他线程从队列中取走数据),offer(e, time, unit)方法阻塞指定时间然后返回false。
- 当队列是空,再从队列中取数据,remove方法抛异常,poll方法返回null,take方法会一直阻塞(直到有其他线程往队列中放数据),poll(time, unit)方法阻塞指定时间然后返回null。
- 当队列是空,再去队列中查看数据(并不删除数据),element方法抛异常,peek方法返回null。
工作中使用最多的就是offer、poll阻塞指定时间的方法。
2. SynchronousQueue应用场景
SynchronousQueue的特点:
队列长度是0,一个线程往队列放数据,必须等待另一个线程取走数据。同样,一个线程从队列中取数据,必须等待另一个线程往队列中放数据。
这种特殊的实现逻辑有什么应用场景呢?
我的理解就是, 如果你希望你的任务需要被快速处理 ,就可以使用这种队列。
Java 线程池中的 newCachedThreadPool (带缓存的线程池)底层就是使用SynchronousQueue实现的。
public static ExecutorService newCachedThreadPool() { | |
return new ThreadPoolExecutor(, Integer .MAX_VALUE, | |
L, TimeUnit.SECONDS, | |
new SynchronousQueue<Runnable>()); | |
} |
newCachedThreadPool 线程池的核心线程数是0,最大线程数是Integer的最大值,线程存活时间是60秒。
如果你使用 newCachedThreadPool 线程池,你提交的任务会被更快速的处理,因为你每次提交任务,都会有一个空闲的线程等着处理任务。如果没有空闲的线程,也会立即创建一个线程处理你的任务。
你想想,这处理效率,杠杠滴!
当然也有弊端,如果你提交了太多的任务,导致创建了大量的线程,这些线程都在竞争 CPU 时间片,等待CPU调度,处理任务速度也会变慢,所以在使用过程中也要综合考虑。
3. SynchronousQueue源码解析
3.1 SynchronousQueue类属性
public class SynchronousQueue<E> extends abstract Queue<E> implements BlockingQueue<E> { | |
// 转换器,取数据和放数据的核心逻辑都在这个类里面 | |
private transient volatile Transferer<E> transferer; | |
// 默认的 构造方法 (使用非公平队列) | |
public SynchronousQueue() { | |
this(false); | |
} | |
// 有参构造方法,可以指定是否使用公平队列 | |
public SynchronousQueue(boolean fair) { | |
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); | |
} | |
// 转换器实现类 | |
abstract static class Transferer<E> { | |
abstract E transfer(E e, boolean timed, long nanos); | |
} | |
// 基于栈实现的非公平队列 | |
static final class TransferStack<E> extends Transferer<E> { | |
} | |
// 基于队列实现的公平队列 | |
static final class TransferQueue<E> extends Transferer<E> { | |
} | |
} |
可以看到SynchronousQueue默认的无参构造方法,内部使用的是基于栈实现的非公平队列,当然也可以调用有参构造方法,传参是true,使用基于队列实现的公平队列。
// 使用非公平队列(基于栈实现) | |
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>(); | |
// 使用公平队列(基于队列实现) | |
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>(true); |
本次就常用的栈实现来剖析SynchronousQueue的底层实现原理。
3.2 栈底层结构
栈结构,是非公平的,遵循先进后出。
使用个case测试一下:
/** | |
* @author 一灯架构 | |
* @apiNote SynchronousQueue示例 | |
**/public class SynchronousQueueDemo { | |
public static void main(String[] args) throws InterruptedException { | |
//. 创建SynchronousQueue队列 | |
SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>(); | |
//. 启动一个线程,往队列中放1个元素 | |
new Thread(() -> { | |
try { | |
System.out.println(Thread.currentThread().getName() + " 入队列"); | |
synchronousQueue.put(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}).start(); | |
//. 等待1000毫秒 | |
Thread.sleep(L); | |
//. 启动一个线程,往队列中放1个元素 | |
new Thread(() -> { | |
try { | |
System.out.println(Thread.currentThread().getName() + " 入队列"); | |
synchronousQueue.put(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}).start(); | |
//. 等待1000毫秒 | |
Thread.sleep(L); | |
//. 再启动一个线程,从队列中取出1个元素 | |
new Thread(() -> { | |
try { | |
System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take()); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}).start(); | |
//. 等待1000毫秒 | |
Thread.sleep(L); | |
//. 再启动一个线程,从队列中取出1个元素 | |
new Thread(() -> { | |
try { | |
System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take()); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}).start(); | |
} | |
} |
输出结果:
Thread- 入队列 0 | |
Thread- 入队列 1 | |
Thread- 出队列 1 | |
Thread- 出队列 0 |
从输出结果中可以看出,符合栈结构先进后出的顺序。
3.3 栈节点源码
栈中的数据都是由一个个的节点组成的,先看一下节点类的源码:
// 节点 | |
static final class SNode { | |
// 节点值(取数据的时候,该字段为null) | |
Object item; | |
// 存取数据的线程 | |
volatile Thread waiter; | |
// 节点模式 | |
int mode; | |
// 匹配到的节点 | |
volatile SNode match; | |
// 后继节点 | |
volatile SNode next; | |
} |
- item
- 节点值,只在存数据的时候用。取数据的时候,这个值是null。
- waiter
- 存取数据的线程,如果没有对应的接收线程,这个线程会被阻塞。
- mode
- 节点模式,共有3种类型:
- 类型值类型描述类型的作用 0REQUEST表示取数据 1DATA表示存数据 2FULFILLING表示正在等待执行(比如取数据的线程,等待其他线程放数据)
3.4 put/take流程
放数据和取数据的逻辑,在底层复用的是同一个方法,以put/take方法为例,另外两个放数据的方法,add和offer方法底层实现是一样的。
先看一下数据流转的过程,方便理解源码。
还是以上面的case为例:
- Thread0先往SynchronousQueue队列中放入元素0
- Thread1再往SynchronousQueue队列放入元素1
- Thread2从SynchronousQueue队列中取出一个元素
第一步:Thread0先往SynchronousQueue队列中放入元素0
把本次操作组装成SNode压入栈顶,item是元素0,waiter是当前线程Thread0,mode是1表示放入数据。
第二步: Thread 1再往SynchronousQueue队列放入元素1
把本次操作组装成SNode压入栈顶,item是元素1,waiter是当前线程Thread1,mode是1表示放入数据,next是SNode0。
第三步:Thread2从SynchronousQueue队列中取出一个元素
这次的操作比较复杂,也是先把本次的操作包装成SNode压入栈顶。
item是null(取数据的时候,这个字段没有值),waiter是null(当前线程Thread2正在操作,所以不用赋值了),mode是2表示正在操作(即将跟后继节点进行匹配),next是SNode1。
然后,Thread2开始把栈顶的两个节点进行匹配,匹配成功后,就把SNode2赋值给SNode1的match属性,唤醒SNode1中的Thread1线程,然后弹出SNode2节点和SNode1节点。
3.5 put/take源码实现
看完 了put/take流程,再来看源码就简单多了。
先看一下put方法源码:
// 放数据 | |
public void put(E e) throws InterruptedException { | |
// 不允许放null元素 | |
if (e == null) | |
throw new NullPointerException(); | |
// 调用转换器实现类,放元素 | |
if (transferer.transfer(e, false,) == null) { | |
// 如果放数据失败,就中断当前线程,并抛出异常 | |
Thread.interrupted(); | |
throw new InterruptedException(); | |
} | |
} |
核心逻辑都在transfer方法中,代码很长,理清逻辑后,也很容易理解。
// 取数据和放数据操作,共用一个方法 | |
E transfer(E e, boolean timed, long nanos) { | |
SNode s = null; | |
// e为空,说明是取数据,否则是放数据 | |
int mode = (e == null) ? REQUEST : DATA; | |
for (; ; ) { | |
SNode h = head; | |
//. 如果栈顶节点为空,或者栈顶节点类型跟本次操作相同(都是取数据,或者都是放数据) | |
if (h == null || h.mode == mode) { | |
//. 判断节点是否已经超时 | |
if (timed && nanos <=) { | |
//. 如果栈顶节点已经被取消,就删除栈顶节点 | |
if (h != null && h.isCancelled()) | |
casHead(h, h.next); | |
else | |
return null; | |
//. 把本次操作包装成SNode,压入栈顶 | |
} else if (casHead(h, s = snode(s, e, h, mode))) { | |
//. 挂起当前线程,等待被唤醒 | |
SNode m = awaitFulfill(s, timed, nanos); | |
//. 如果这个节点已经被取消,就删除这个节点 | |
if (m == s) { | |
clean(s); | |
return null; | |
} | |
//. 把s.next设置成head | |
if ((h = head) != null && h.next == s) | |
casHead(h, s.next); | |
return (E) ((mode == REQUEST) ? m.item : s.item); | |
} | |
//. 如果栈顶节点类型跟本次操作不同,并且不是FULFILLING类型 | |
} else if (!isFulfilling(h.mode)) { | |
//. 再次判断如果栈顶节点已经被取消,就删除栈顶节点 | |
if (h.isCancelled()) | |
casHead(h, h.next); | |
//. 把本次操作包装成SNode(类型是FULFILLING),压入栈顶 | |
else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) { | |
//. 使用死循环,直到匹配到对应的节点 | |
for (; ; ) { | |
//. 遍历下个节点 | |
SNode m = s.next; | |
//. 如果节点是null,表示遍历到末尾,设置栈顶节点是null,结束。 | |
if (m == null) { | |
casHead(s, null); | |
s = null; | |
break; | |
} | |
SNode mn = m.next; | |
//. 如果栈顶的后继节点跟栈顶节点匹配成功,就删除这两个节点,结束。 | |
if (m.tryMatch(s)) { | |
casHead(s, mn); | |
return (E) ((mode == REQUEST) ? m.item : s.item); | |
} else | |
//. 如果没有匹配成功,就删除栈顶的后继节点,继续匹配 | |
s.casNext(m, mn); | |
} | |
} | |
} else { | |
//. 如果栈顶节点类型跟本次操作不同,并且是FULFILLING类型, | |
// 就再执行一遍上面第步for循环中的逻辑(很少概率出现) | |
SNode m = h.next; | |
if (m == null) | |
casHead(h, null); | |
else { | |
SNode mn = m.next; | |
if (m.tryMatch(h)) | |
casHead(h, mn); | |
else | |
h.casNext(m, mn); | |
} | |
} | |
} | |
} |
transfer方法逻辑也很简单,就是判断本次操作类型是否跟栈顶节点相同,如果相同,就把本次操作压入栈顶。否则就跟栈顶节点匹配,唤醒栈顶节点线程,弹出栈顶节点。
transfer方法中调用了awaitFulfill方法, 作用是 挂起当前线程。
// 等待被唤醒 | |
SNode awaitFulfill(SNode s, boolean timed, long nanos) { | |
//. 计算超时时间 | |
final long deadline = timed ? System.nanoTime() + nanos :L; | |
Thread w = Thread.currentThread(); | |
//. 计算自旋次数 | |
int spins = (shouldSpin(s) ? | |
(timed ? maxTimedSpins : maxUntimedSpins) :); | |
for (;;) { | |
if (w.isInterrupted()) | |
s.tryCancel(); | |
//. 如果已经匹配到其他节点,直接返回 | |
SNode m = s.match; | |
if (m != null) | |
return m; | |
if (timed) { | |
//. 超时时间递减 | |
nanos = deadline - System.nanoTime(); | |
if (nanos <=L) { | |
s.tryCancel(); | |
continue; | |
} | |
} | |
//. 自旋次数减一 | |
if (spins >) | |
spins = shouldSpin(s) ? (spins-) : 0; | |
else if (s.waiter == null) | |
s.waiter = w; | |
//. 开始挂起当前线程 | |
else if (!timed) | |
LockSupport.park(this); | |
else if (nanos > spinForTimeoutThreshold) | |
LockSupport.parkNanos(this, nanos); | |
} | |
} |
awaitFulfill方法的逻辑也很简单,就是挂起当前线程。
take方法底层使用的也是transfer方法:
// 取数据 | |
public E take() throws InterruptedException { | |
// // 调用转换器实现类,取数据 | |
E e = transferer.transfer(null, false,); | |
if (e != null) | |
return e; | |
// 没取到,就中断当前线程 | |
Thread.interrupted(); | |
throw new InterruptedException(); | |
} |
4. 总结
- SynchronousQueue是一种特殊的阻塞队列,队列长度是0,一个线程往队列放数据,必须等待另一个线程取走数据。同样,一个线程从队列中取数据,必须等待另一个线程往队列中放数据。
- SynchronousQueue底层是基于栈和队列两种数据结构实现的。
- Java线程池中的 newCachedThreadPool (带缓存的线程池)底层就是使用SynchronousQueue实现的。
- 如果希望你的任务需要被快速处理,可以使用SynchronousQueue队列。