Juc并发编程11——深入源码:常用并发容器、阻塞队列使用与原理

Java
321
0
0
2022-12-06
标签   Java并发

前言

本文将介绍常用的并发容器,比较传统容器与并发容器的区别,介绍并发容器的基本原理。是面试常考、工作常用的热门知识点。

深入源码:常用并发容器、阻塞队列使用与原理

  • 前言
  • 1.传统容器安全吗?
  • 2.常用并发容器介绍
  • 2.1 CopyOnWriteArrayList
  • 2.2 ConcurrentHashMap
  • 3.阻塞队列
  • 3.1 阻塞队列的介绍
  • 3.2 ArrayBlockingQueue源码分析
  • 3.3 SynchronousQueue源码分析
  • 3.4 PriorityBlockingQueue介绍
  • 3.5 DelayQueue介绍与源码分析

1.传统容器安全吗?

运行如下代码。

public static void main(String[] args) throws InterruptedException {
    ArrayList<String> arr = new ArrayList<>();
    Runnable r = () -> {
        for (int i = 0; i < 100; i++) {
            arr.add("hello");
        }
    };
    for (int i = 0; i < 100; i++) {
        new Thread(r).start();
    }
    TimeUnit.SECONDS.sleep(1);
    System.out.println(arr.size());

}

其结果如下。

img

按照堆栈信息查看ArrayList第663行。

public boolean add(E e) {
    ensureCapacityInternal(size + 1);  // 确保容量
    elementData[size++] = e; // 这一行报错 
    return true;
}

我们看源码发现,ArrayList是有确保容量的操作的,为何会在多线程环境下报错呢?

试想ArrayList刚好占满的情况,线程1执行完ensureCapacityInternal后,线程2获得了cpu,进行ensureCapacityInternal发现容量足够,没有扩容,并且执行了元素增加操作。此时线程1重新获取到cpu资源,又执行元素增加操作,就出现了树组下标越界了。

总的来说,就是由于ArrayList的扩容与元素增加操作为非原子性操作,导致出现了并发安全的问题。

再来看看HashMap

public class Demo02 {
    public static void main(String[] args) throws InterruptedException {
        HashMap<Integer, String> map = new HashMap<>();
        for (int i = 0; i < 100; i++) {
            int tmp = i;
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    map.put(1000 * tmp + j, "a");
                }
            }).start();
        }
        TimeUnit.SECONDS.sleep(2);
        System.out.println(map.size());

    }
}

其输出如下。不是预期值10000.实际上,实际上,它还可能引起Entry对象出现环状的数据结构,导致死循环。

img

2.常用并发容器介绍

如何才能够解决容器遇到的并发问题呢?我们首先想到的是使用Synchoronized进行加锁的操作。早期的一些容器比如Vector或者Hashtable就是这么做的。不过众所周知,它们的效率实在是太底了。因此现在我们很少使用它们了。

JUC为我们提供了专门用于并发场景的容器。

2.1 CopyOnWriteArrayList

public static void main(String[] args) throws InterruptedException {
    List<String> arr = new CopyOnWriteArrayList<>();
    Runnable r = () -> {
        for (int i = 0; i < 100; i++) {
            arr.add("hello");
        }
    };
    for (int i = 0; i < 100; i++) {
        new Thread(r).start();
    }
    TimeUnit.SECONDS.sleep(1);
    System.out.println(arr.size());

}

其结果如下。

img

看看它add方法的源码吧。

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

原来是使用了ReentrantLock锁呀。可以看到,它实际上是在加锁以后,获取并复制树组进行了操作,最后再一次性setArray。

因为读一般是线程安全的,并且在add时也是先操作复制的树组,再一次性进行setArray数组替换操作,get方法并没有加锁。

public E get(int index) {
    return get(getArray(), index);
}

2.2 ConcurrentHashMap

 public static void main(String[] args) throws InterruptedException {
    Map<Integer, String> map = new ConcurrentHashMap<>();
    for (int i = 0; i < 100; i++) {
        int tmp = i;
        new Thread(() -> {
            for (int j = 0; j < 100; j++) {
                map.put(1000 * tmp + j, "a");
            }
        }).start();
    }
    TimeUnit.SECONDS.sleep(2);
    System.out.println(map.size());

}

其运行结果如下。

img

因为多个线程的操作可能会争抢同一把锁,在之前介绍LongAdder时,提到过,既然线程间都竞争同一把锁,我不防多搞几把锁,进行压力分散。在jdk7及之前,ConcurrentHashMap采用了类似的策略,它将数据进行分段,每一个小段加一把锁,这样当一个线程获取锁时,仅仅锁了一小段,不会影响其它段的数据被其它线程访问。

img

在jdk8以后,ConcurrentHashMap采用CAS配合锁机制进行实现。

我们先来回顾下HashMap的实现原理吧。

img

Hash表本质上是一个用于存放后续节点头节点的数组,数组中的每一个元素都是一个头节点(或者说是一个链表)。当我们添加一个数据时,会先通过Hash算法计算得到它的Hash值,对应找到它的数组下标,在该位置的链表后插入数据。当链表的长度到达8时,会将链表自动转换为红黑树,这会使其查询时可以使用二分法进行查询,效率大幅度的提升。

因为ConncurrentHashMap的源码比较复杂,我们从简单的地方入手。其构造方法的结构与HashMap大体相似:维护一个哈希表,哈希表中存放的要么是链表要么是红黑树。这里就不去看了,先看看put方法吧。

public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException(); // 赋值不能为空,基本操作 
    int hash = spread(key.hashCode()); // 计算键的哈希值,用于锁定在哈希表中的位置 
    int binCount = 0; // 用于记录链表长度 
    for (Node<K,V>[] tab = table;;) { // 无限循环,CAS自旋锁;table:哈希表
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0) //1.如果哈希表为空
            tab = initTable(); // 初始化哈希表,之后进入下一轮循环 
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 2.如果该位置为空,说明哈希表中该位置还没有头节点(注意这里f指向了头节点所在的位置) 
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null))) // 进行CAS操作插入节点作为头节点 
                break;                   // CAS成功则直接break跳出put方法,否则说明有其它线程在操作,跳出当前循环,进入下一轮循环
        }
        else if ((fh = f.hash) == MOVED) //3.头节点的哈希值为-1,说明正在进行扩容
            tab = helpTransfer(tab, f); //帮助其进行数据迁移,完事后进入下一轮循环 
        else { //4.除了以上特殊情况以外的正常插入情况 
            V oldVal = null;
            synchronized (f) { //将头节点进行加锁,防止同一时间其它线程也在操作哈希表中该位置的链表或者红黑树 
                if (tabAt(tab, i) == f) { 
                    if (fh >= 0) { // 4.1 如果头节点的哈希值>=0,说明是链表,针对链表进行操作
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // 4.2 如果f是TreeBin,说明是红黑树
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 根据链表长度决定链表是否要进化成为红黑树 
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i); //如果当前哈希表的长度小于64,不会进化成红黑树,优先考虑扩容,这是因为其长度约长,对于并发场景处理支持越好(结合下面的图片理解) 
                if (oldVal != null)
                    return oldVal;
                break;
            } 
        }
    }  // for end
    addCount(1L, binCount);
    return null;
}

下图总结了put插入节点的过程:对hash表中的某个节点进行单独加锁。这样是不是也可以在保证线程安全的情况下降低由于竞争所带来的性能损耗呀。因此ConcurrentHashMao在进行扩容时的机制是,如果当前哈希表的长度小于64,不会进化成红黑树,优先考虑扩容,这是因为其长度约长,对于并发场景处理支持越好(结合下面的图片理解)

img

接着来看看get方法吧。

 public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode()); //计算哈希值 
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) { 
            // 1.如果头节点就是我们要找的,直接返回 
            if ((eh = e.hash) == h) { 
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // 2.哈希值为负数,说明要么在扩容,要么就是红黑树 
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
           // 3.链表的情形,进行遍历查找 
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        // 4.没找到只能返回null了 
        return null;
    }

3.阻塞队列

3.1 阻塞队列的介绍

除了常用的并发容器以外,juc还提供了各种阻塞队列,适用于不同的工作场景。先看看接口BlockingQueue

public interface BlockingQueue<E> extends Queue<E> {
    
    boolean add(E e);

   //入队,如果成功则返回true,否则如果队列已满,插入失败返回false(非阻塞) 
    boolean offer(E e);

    // 入队(阻塞,直到队列有容量可以插入元素为止) 
    void put(E e) throws InterruptedException;

   //入队,阻塞,直到入队成功、超时或者中断为止 
    boolean offer(E e, long timeout, TimeUnit unit) 
        throws InterruptedException;

   // 出队 ,阻塞,如果队列为空阻塞线程直到能够出队为止  
    E take() throws InterruptedException;

  //出队,阻塞,直到入队成功、超时或者中断为止
    E poll(long timeout, TimeUnit unit) 
        throws InterruptedException;

   //在理想情况下(无内存或者资源限制),可以不阻塞的入队的容量,如无限制则返回Integer.MAX_VALUE 
    int remainingCapacity();

    
    boolean remove(Object o);

    public boolean contains(Object o);

   // 一次性从BlockingQueue获取所有可用的对象 
    int drainTo(Collection<? super E> c);

   
    int drainTo(Collection<? super E> c, int maxElements);
}

阻塞队列会阻塞线程,不废话,上图。

img

还记得消费者与生产者模式吗?利用阻塞队列可以很轻松的进行实现。

img

下面我们来实战下。假设有2个厨师,3个顾客,一个厨师炒一个菜的时间是3s,一个顾客吃掉一个菜的时间是4s。窗口一次只能够放一个菜。

public class Demo03 {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
        Runnable supplier = () -> {
            while (true) {
                String name = Thread.currentThread().getName();
                System.out.println(currentTimeInFormat() + "Thread " + name + " to prepare food..." );
                try {
                    TimeUnit.SECONDS.sleep(3);
                    queue.put(new Object());
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
                System.out.println(currentTimeInFormat()  + "Thread " + name + "prepare  food finished..");
            }
         
        };

        Runnable comsumer = () -> {
            while (true) {
                String name = Thread.currentThread().getName();
                System.out.println(currentTimeInFormat() + "Thread " + name + " wait ..." );
                try {
                    queue.take();
                    System.out.println(currentTimeInFormat() + "Thread " + name + " eat food..." );
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
                System.out.println(currentTimeInFormat() + "Thread " + name + "eat  food finished..");
            }
        };

        for (int i = 0; i < 2; i++) {
            new Thread(supplier, "s[" + i + "]").start();
        }

        for (int i = 0; i < 3; i++) {
            new Thread(comsumer, "c[" + i + "]").start();
        }
        TimeUnit.SECONDS.sleep(1);
    }

    public static String currentTimeInFormat() {
        SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
        return "[" + format.format( new Date()) + "]" ;
    }
}

输出结果如下哟。

img

常见的阻塞队列有: ArrayBlockingQueue:带缓冲的阻塞队列(即添加容量限制) SynchonousBlockingQueue:无缓冲的阻塞队列(相当于容量为0的ArrayBlockingQueue) LinkedBlockingQueue:无界的阻塞队列,基于链表实现,可以限制容量,可以实现阻塞

3.2 ArrayBlockingQueue源码分析

先看看它的构造方法。

 final ReentrantLock lock;

 private final Condition notEmpty;
  
 private final Condition notFull;

 public ArrayBlockingQueue(int capacity) {
      this(capacity, false);
  }

 public ArrayBlockingQueue(int capacity, boolean fair) {
      if (capacity <= 0)
          throw new IllegalArgumentException();
      this.items = new Object[capacity];
      lock = new ReentrantLock(fair); // 采用指定的公平/非公平锁实现并发安全
      notEmpty = lock.newCondition();  //两个condition,之后用于线程入队和出队的阻塞控制
      notFull =  lock.newCondition(); 
 }

接下来看看put方法与offer方法是如何实现的。

 public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock; //加锁 
    lock.lock();
    try {
        // 非阻塞,直接判断队列是不是满的,如果满返回false 
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock; //加锁 
    lock.lockInterruptibly(); //lock锁可以响应中断 
    try {
        while (count == items.length)
            notFull.await(); //如果队列满,将当前线程挂起,直到其它线程出队时将线程唤醒
        enqueue(e); // 被唤醒后入队
    } finally {
        lock.unlock();
    }
}

我们看到入队操作如果队列满,会使用notFull的await方法挂起当前线程,直到deQueue时调用signal方法唤醒.

private E dequeue() {
    // assert lock.getHoldCount() == 1; 
    // assert items[takeIndex] != null; 
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal(); //notFull的signal方法唤醒正在等待的线程 
    return x;
}

现在我们来看看出队操作。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue(); //如果队列为空则返回null,否则出队
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await(); //如果为空,挂起线程 
        return dequeue();
    } finally {
        lock.unlock();
    }
}

同样的,take方法中的notEmpty在队列为空时被挂起,直到enque方法进行唤醒。

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1; 
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

3.3 SynchronousQueue源码分析

SynchronousQueue没有容量,出队操作与入队操作必须成对的出现。如何理解呢,请看示例代码。

public class Demo04 {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        queue.put("abc");
    }
}

以上代码不会终止退出,这是因为线程被阻塞了,也就是说,上面代码的put操作相当于ArrayBlockingQueue在队列满了时进行put操作,会导致线程阻塞。只有其它线程调用take代码才能正常的结束。

public class Demo04 {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        new Thread(()-> {
            try {
                queue.take();
            } catch (InterruptedException exception) {
                exception.printStackTrace();
            }
        }).start();
        queue.put("abc");
    }
}

我们看看它内部是如何实现的。其内部有一个Transferer抽象类,里面有一个抽象方法transferputtake方法会调用transfer方法.

abstract static class Transferer<E> {
    // 可以是put方法也可以是take方法 
    //Params: e 如果非空,则代表生产者 
    //如果为 null,则代表消费者。 
    // Returns:如果不为空说明是生产者返回了结果,如果为空,说明是等待超时或者中断 
    abstract E transfer(E e, boolean timed, long nanos);
}

实际上,当线程1调用put方法后就会调用transfer方法处于等待状态,直到线程2调用take方法,transfer在生产者、消费者线程中交接的数据(线程1put的数据线程2take拿走),也就是说,数据并不会像ArrayBlockingQueue一样存储到队列中,而是直接在线程中进行交接。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted(); // 如果返回值为空,说明是中断或者超时,抛出中断异常 
        throw new InterruptedException();
    }
}

SynchronousQueue有两个不同的实现,公平模式和非公平模式,这里我们来看看公平模式下的实现。

static final class TransferQueue<E> extends Transferer<E> {
	//头节点只作为头节点,不存储真实的数据,后续节点才是线程等待的节点 
    transient volatile QNode head;
    /** Tail of queue */ 
    transient volatile QNode tail;
    /** Node class for TransferQueue. */ 
    static final class QNode {
        volatile QNode next;          // next node in queue 
        volatile Object item;         // 存储的元素 
        volatile Thread waiter;       // 等待的线程封装为一个节点,控制park/unpark 
        final boolean isData;         // 区分是生产者还是消费者,如果是true说明是生产者
        ...
			}
			...
}

可以看到,公平模式下SynchronousQueue的实现是TransferQueue,使用Qnode类来保存等待的线程。接下来看看里面transfer方法是如何实现的。

 E transfer(E e, boolean timed, long nanos) { //里面没有加锁,肯定会存在多个线程的竞争 
    QNode s = null; // constructed/reused as needed 
    boolean isData = (e != null);

    for (;;) { //自旋锁,盲猜一波cas操作 
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // 头节点或者尾节点为空,说明还没有对头尾节点进行初始化(可以看看构造器方法,会赋初始值) 
            continue;                       // 自旋

        if (h == t || t.isData == isData) { //头、尾指针指向相同节点(说明指向的是头节点,还没有存放真实的线程节点数据) 或者 尾节点角色(生产者|消费者)与当前节点的角色相同,需要进行入队操作(比如连续调用了两次put方法,两个线程节点都是生产者角色,那就需要入队等待,即队列中要么全部存放生产者,要么全部存放消费者) 
            QNode tn = t.next;
            if (t != tail)                  // 如果t不是队尾了,说明其它线程将其修改过了(前面的方法没有修改过t引用的对象) 
                continue;                   //跳出当前循环,继续自旋 
            if (tn != null) {               // 如果其它线程修改了tn,使其不指向队尾下一个元素了(也就是队列中插入了其它的元素)
                advanceTail(t, tn);        //队尾指针后移:CAS将队尾元素设置为tn 
                continue;
            }
            if (timed && nanos <= 0)        // 超时返回null 
                return null;
            if (s == null)
                s = new QNode(e, isData);   // 构造当前节点, 
            if (!t.casNext(null, s))        // cas将当前节点置为队尾节点的下一个节点 
                continue;                  //失败说明其它线程正在操作,进入下一轮自旋操作

            advanceTail(t, s);              // 之前的操作都没有问题,将新的队尾元素设置为当前节点,此时节点真正入队成功,进入等待队列 
            Object x = awaitFulfill(s, e, timed, nanos);  //开始等待生产者和消费者进行交接,该操作会先进行几次自旋操作,如果自旋完成没有等待交接,就会将线程挂起 
            if (x == s) {                   // 如果返回的x是当前节点s,说明等待状态被取消
                clean(t, s);
                return null;              // 返回null即可
            }

            if (!s.isOffList()) {           // 交接完成,如果s还没有离开队列。将当前节点移出队列
                advanceHead(t, s);          // 将s设置为新的头节点(头节点不存储数据,不表示该线程在等待状态) 
                if (x != null)              // clear
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e; //假如当前节点是消费者,直接将从生产者那里获取的元素返回即可;如果是生产者,返回传递的元素

        } else {                            // 正好是一个生产者一个消费者的情况 
            QNode m = h.next;               // 获取头节点的下一个节点(第一个出一等待状态的节点) 
            if (t != tail || m == null || h != head)
                continue;                   // 被其它线程修改,跳过本轮自旋

            Object x = m.item;
            if (isData == (x != null) ||    //判断操作的类型,如果是同一个类型的操作,说明有异常
                x == m ||                   // 当前操作被取消
                !m.casItem(x, e)) {         // CAS
                advanceHead(h, m);          // dequeue and retry 
                continue;
            }

            advanceHead(h, m);              // 成功完成交接
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

上面的流程可以总结为下图。

img

TransferQueue没有使用锁,因此从性能角度来看,比ArrayBlockingQueue性能会更好。但是它没有容量。LinkedBlocking无界且具有缓存容量,但内部也是通过锁机制进行实现的,性能并不是特别好。LinkedTransferQueue同时有以上数据结构的优点。

public class Demo05 {
    public static void main(String[] args) {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        queue.put("abc");    // 可以交接则直接交接,否则就进入等待队列 
       queue.put("aba");
        queue.forEach(System.out::println);
    }
}

3.4 PriorityBlockingQueue介绍

PriorityBlockingQueue是一个支持优先级的阻塞队列。

public class Demo06 {
    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(10, Integer::compare); //设置初始容量为10(可以扩容),比较规则使用升序
        queue.add(3);
        queue.add(2);
        queue.add(9);
        System.out.println(queue);
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());
    }
}

其输出结果如下。

img

如果使用take方法依然可以实现阻塞。

public class Demo06 {
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(10, Integer::compare);
        queue.take();
        queue.add(3);
        queue.add(2);
        queue.add(9);
        System.out.println(queue);
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());
    }
}

上面程序没有输出哟。

3.5 DelayQueue介绍与源码分析

DelayQueue可以设置延迟时间,在到达延迟时间之后才能将队列中的元素取出。而且同样支持优先级。看下它的定义吧。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {...}   

可以发现,DelayQueue存储的元素类型必须是 Delayed的实现类。看看Delayed这个接口。

public interface Delayed extends Comparable<Delayed> {
		// 返回剩余时间,正数等待,负数或者0等待结束 
    long getDelay(TimeUnit unit);
}

好了,那我们来使用下DelayQueue吧。

public class Demo07 {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<MyDelayed> queue = new DelayQueue<>();
        queue.add(new MyDelayed(1, "Second Level", 2));
        queue.add(new MyDelayed(3, "First Level", 1));
        System.out.println(queue.take());
        System.out.println(queue.take());
    }

    private static class MyDelayed implements Delayed{
        private final long delay;
        private final long startTime;
        private final String data;
        private  final int priority;

        private MyDelayed(long delay, String data, int priority) {
           this.priority = priority;
           this.delay = TimeUnit.SECONDS.toMillis(delay);
           this.data = data;
           this.startTime = System.currentTimeMillis();
        }

        @Override 
        public long getDelay(TimeUnit unit) {
            return unit.convert(delay - (System.currentTimeMillis() - startTime), TimeUnit.NANOSECONDS);
        }

        @Override 
        public int compareTo(Delayed o) {
            if(o instanceof  MyDelayed) {
                return (priority - ((MyDelayed) o).priority);
            }
            return 0;
        }

        @Override 
        public String toString() {
            return data;
        }
    }
}
public class Demo07 {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<MyDelayed> queue = new DelayQueue<>();
        queue.add(new MyDelayed(1, "Second Level", 2));
        queue.add(new MyDelayed(3, "First Level", 1));
        System.out.println(queue.take());
        System.out.println(queue.take());
    }

    private static class MyDelayed implements Delayed{
        private final long delay;
        private final long startTime;
        private final String data;
        private  final int priority;

        private MyDelayed(long delay, String data, int priority) {
           this.priority = priority;
           this.delay = TimeUnit.SECONDS.toMillis(delay);
           this.data = data;
           this.startTime = System.currentTimeMillis();
        }

        @Override 
        public long getDelay(TimeUnit unit) {
            return unit.convert(delay - (System.currentTimeMillis() - startTime), TimeUnit.NANOSECONDS);
        }

        @Override 
        public int compareTo(Delayed o) { // priority值越小,优先级越高 
            if(o instanceof  MyDelayed) {
                return (priority - ((MyDelayed) o).priority);
            }
            return 0;
        }

        @Override 
        public String toString() {
            return data;
        }
    }
}

上面代码的效果应该是:阻塞3s钟后输出First Level,后输出Second Level(即使它的阻塞时间更短)。

img

如果把它们的优先级设置为相同。

public class Demo07 {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<MyDelayed> queue = new DelayQueue<>();
        queue.add(new MyDelayed(1, "Second Level", 1));
        queue.add(new MyDelayed(3, "First Level", 1));
        System.out.println(queue.take());
        System.out.println(queue.take());
    }

    private static class MyDelayed implements Delayed{
        private final long delay;
        private final long startTime;
        private final String data;
        private  final int priority;

        private MyDelayed(long delay, String data, int priority) {
           this.priority = priority;
           this.delay = TimeUnit.SECONDS.toMillis(delay);
           this.data = data;
           this.startTime = System.currentTimeMillis();
        }

        @Override 
        public long getDelay(TimeUnit unit) {
            return unit.convert(delay - (System.currentTimeMillis() - startTime), TimeUnit.NANOSECONDS);
        }

        @Override 
        public int compareTo(Delayed o) {
            if(o instanceof  MyDelayed) {
                return (priority - ((MyDelayed) o).priority);
            }
            return 0;
        }

        @Override 
        public String toString() {
            return data;
        }
    }
}

上面代码的输出结果是阻塞1s后输出后输出Second Level,再阻塞2s(总共时长为3s)钟后输出First Level。

img

接下来我们一起来读下它的源码吧。从add开始吧。

private final PriorityQueue<E> q = new PriorityQueue<E>();

public boolean add(E e) {
    return offer(e);
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e); // q是优先接队列 
        if (q.peek() == e) { //如果队列中只有刚入队的元素,进行一次唤醒操作(之前可能有其它线程调用了take在等待了)
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

很简单。看看take方法吧。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek(); //获取队首元素 
            if (first == null)  //队列是空,进入等待
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS); 
                if (delay <= 0)  //等待时间结束,可以出队了 
                    return q.poll();
                first = null; //帮助gc 
                if (leader != null) //如果leader不为null,说明有其它线程在等待
                    available.await(); //进入永久等待状态 
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread; //没有线程等待,将当前线程设置为leader 
                    try {
                        available.awaitNanos(delay); //设置为带截止时间的等待状态,等待时间到则释放锁
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal(); //当前take结束,把之前等待的线程唤醒 
        lock.unlock();
    }
}

至此为止,我们有关并发容器的讲解正式就大功告成嵝,下一篇文章我们将讲解线程池。