目录
- 概览
- 使用方法
- 1. add | remove | element
- 2. offer | poll | peek
- 3. put | take
- 4. offer | poll (timeout)
- 源码解析
- 说明
- 队列容器
- 关键成员变量
- 初始化
- put方法
- 总结
概览
1. 基于链表的可选有界阻塞队列。根据FIFO的出入队顺序,从队列头部检索和获取元素,在队列尾部插入新元素。
2. 当作为有界阻塞队列,在队列空间不足时,put方法将会一直阻塞直到有多余空间才会执行插入元素操作,take方法则相反,只到队列内元素不为空时,才将队列元素逐个取出。
3. 队列容量不指定时,默认为Integer.MAX_VALUE,此时可以看作无界队列。
4. 使用非公平锁进行并发控制。所有方法都是线程安全的。
使用方法
下面的文章给出了阻塞队列的四种基本用法:
了解BlockingQueue 大体框架和实现思路
LinkedBlockingQueue实现了BlockingQueue类。
在BlockingQueue中,方法被分为如下四类:
- Throws exception:操作未实现时(正常流程下的执行)抛出异常
- Special value:根据操作的实际情况,返回特定值,例如null、false(这些失败可能是线程中断、队列为空引起的)
- Blocks:阻塞当前线程,直到当前线程可以成功执行
- Times out:尝试指定时间后,放弃执行
| Throws exception | Special value | Blocks | Times out |
新增 | add(E e) | offer(E e) | put(E e) | offer(E e, long timeout, TimeUnit unit) |
删除 | remove() | poll() | take() | poll(long timeout, TimeUnit unit) |
查询 | element() | peek() |
|
|
1. add | remove | element
这三个方法在BlockingQueue的定义中,都会在操作未实现时,抛出异常。
- add(E e):在队尾添加元素e,add内部调用offer方法实现。因此,元素e为空时,抛出NullPointerException异常;插入失败时,抛出IllegalStateException异常。
- remove:删除队首元素,内部调用poll方法。队首无数据时,抛出NoSuchElementException异常。
- element:检索队首元素。队首无数据时,抛出NoSuchElementException异常。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
blockingQue.add(1);
blockingQue.remove(1);
blockingQue.remove(); // NoSuchElementException
blockingQue.element(); // NoSuchElementException
2. offer | poll | peek
根据操作的实际情况,返回特定值,例如null、false(这些失败可能是线程中断、队列为空引起的)
- offer(E e):在队尾添加元素e,元素e为空时,抛出NullPointerException异常;插入失败时返回false。
- poll:删除队首元素。删除失败时返回false。
- peek:检索队首元素。队首无数据时,返回null。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
blockingQue.offer(1);
blockingQue.poll();
Integer peek = blockingQue.peek(); // 返回null
3. put | take
阻塞当前线程,直到当前线程可以成功执行。
- put(E e):在队尾添加元素e,元素e为空时,抛出NullPointerException异常。当队列满时,阻塞put线程,等待队列被消费后,队列容量不满时,该阻塞线程继续尝试在队尾插入元素。该方法在阻塞时可以被中断,并抛出InterruptedException异常。
- take:删除并获取队首元素。队首元素不为空时返回。队首元素为空,阻塞take线程,等待队列不为空时,再次尝试消费队首元素。该方法在阻塞时可以被中断,并抛出InterruptedException异常。
注意:阻塞时,不会解除锁占用。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
try {
blockingQue.put(1);
blockingQue.take();
} catch (InterruptedException e) {
// 线程被中断
e.printStackTrace();
}
4. offer | poll (timeout)
尝试指定时间后,放弃执行
- offer(E e, long timeout, TimeUnit unit):在队尾添加元素e,元素e为空时,抛出NullPointerException异常;当队列容量满时,线程休眠一定时间后再次查看队列容量,当该休眠时间大于等于timeout后,此时队列还满则返回false。不满时,尝试入队。需要注意的是,由于伪唤醒机制的存在,线程可能在timeout这个时间段内的任意一点被唤醒,如果队列容易不满,则会直接执行入队操作。阻塞时,当前线程被中断抛出InterruptedException异常。
- poll(long timeout, TimeUnit unit):删除队首元素。poll与offer对应的,当队列为空的时候,线程休眠一定时间。休眠时,当前线程被中断抛出InterruptedException异常。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
try {
blockingQue.offer(1, 100, TimeUnit.MILLISECONDS);
blockingQue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
当然除了上述的阻塞队列的基本操作外,LinkedBlockingQueue还具有集合Collection的性质。因此集合中的通用方法也可以使用。
源码解析
说明
本次源码分析主要按照下面几个步骤进行:
1. 保存队列数据的容器以及出入队方法
2. 主要成员变量以及作用
3. 主要方法分析
队列容器
结构图
仅有数据item和后继next的单向节点,结构简单。
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
next三种情况
A 普通节点的真实后继
B 真正的队首节点,item=null(队首节点恒为head.next)
C 队尾节点,next=null
- item: null -> first -> …… -> last
- next: first -> second -> ……-> null
入队操作
// 入队
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node; // last.next = node; last = node;
}
步骤1
步骤2
出队操作
// 出队
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
// 形成引用链闭环,JVM根据可达性分析时,GC root的引用链与该对象之间不可达,进行GC
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
步骤1
步骤2
关键成员变量
队列入队和出队锁分离,都使用了非公平锁。
这里的count属性需要注意下,这里使用了原子类保证操作的原子性。后面的入队和出队,将会频繁使用它。
/** 容量, 初始化不设置时默认为Integer.MAX_VALUE*/
private final int capacity;
/** 当前队列内的元素数量 */
private final AtomicInteger count = new AtomicInteger();
/**
* 队首
* 不变量: head.item == null
*/
transient Node<E> head;
/**
* 队尾
* 不变量: last.next == null
*/
private transient Node<E> last;
/** 出队操作公用锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 用于出队操作的阻塞和唤醒 出队的话,只需要考虑队列是否为空 */
private final Condition notEmpty = takeLock.newCondition();
/** 入队操作公用锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 用于入队操作的阻塞和唤醒 入队只需要考虑队列空间是否足够*/
private final Condition notFull = putLock.newCondition();
初始化
三个构造函数
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 自定义容量
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
// 初始化时,批量添加集合中的元素
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
put方法
put方法几个关注点
- 释放锁的时机
- 执行入队操作
- 唤醒生产者的时机
- 唤醒消费者的时机
这几点是整个阻塞操作的核心,可以在下面的分析中仔细观察。
注:由于阻塞队列就是基于生产者-消费者模型的,因此,下文中都把调用put方法的线程称为生产者,调用take方法的线程称为消费者。
总体分析
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1; // -1代表操作异常
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 如果线程没有被标记为中断,则获取锁
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
// 这里是线程在执行put操作时唯一一个执行过程中释放锁的地方
notFull.await(); // 容量已满,等待被消费后唤醒
}
// 添加元素,更新容量
enqueue(node);
c = count.getAndIncrement();
// 队列容量有余时,在这里再次唤醒一个其他的生产者线程(或者说消费者消费速度大于生产)
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
// 唤醒一个消费者
if (c == 0)
signalNotEmpty();
}
count属性并发问题
这里需要重点关注count,由于有两把锁,count可以同时被putLock、takeLock操作,那么这里是否会产生并发问题。
分析如下:
A. 只有putLock或takeLock一把锁操作:就是单线程操作,没影响,不产生并发问题。
其他所有put操作都处于await的状态或者竞争锁状态,其他线程也因为获取不到锁而无法执行,只有等该节点添加完成释放锁,其他线程才有机会继续执行。
while (count.get() == capacity) {
notFull.await(); // 容量已满,等待被消费后唤醒
}
B. putLock和takeLock同时操作:我们假设两个线程一个获取到putLock,一个获取到了takeLock(同时最多也只有两个线程操作count)。
// put
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
// take
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
由于count是原子类那么count的所有读写操作必然是一个串联的操作,而非并行操作,因此也不存在并发问题,如下图(顺序可能不同):
唤醒消费者
代码的最后一段,会有唤醒一个消费者的操作。
// 唤醒一个等待中的消费者
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try { notEmpty.signal();
} finally { takeLock.unlock();
}
}
刚开始看到的时候很疑惑,为什么是c == 0才唤醒。如果生产者入队成功,那么c应该为如下值:
c = count.getAndIncrement();
后面看了一下count.getAndIncrement()方法定义才发现自己记混了,count.getAndIncrement()是一个原子操作,且返回值的是操作前的值。
ok,现在没问题了。
count >= 0,也就是说,只有在生产者入队前队列为空,入队成功之后才会唤醒一个消费者消费。
take方法
take方法与put方法大致相似,只是与put做相反操作。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try { // 队列元素为空,停止消费,让出锁并等待被唤醒
while (count.get() == 0) {
notEmpty.await();
}
// 移除队首元素,并更新容量
x = dequeue();
c = count.getAndDecrement();
// 生产速度大于消费速度,唤醒一个其他消费者进行消费
if (c > 1)
notEmpty.signal();
} finally { takeLock.unlock();
}
// 消费之前队列已满,消费后唤醒一个生产者
if (c == capacity) signalNotFull();
return x;
}
// 唤醒生产者
/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try { notFull.signal();
} finally { putLock.unlock();
}
}
总结
总体上看LinkedBlockingQueue类不难,整个生产-消费的流程实现也比较简单。源码已经把该介绍的东西都讲得很明白了,我这属于依葫芦画瓢顺着源码注释写出来的。这么一写,自己这个类的印象就很深刻了。