Juc并发编程包
一、介绍
关于Java如何创建线程,大家都可以马上能想到有两种方法,无非不就是继承 Thread
类和实现 Runnable
接口嘛,顶多再加上个实现 Callable
接口。而且 synchronized
解决并发问题,如果学艺不精,锁住的对象是哪个都不知道,实在是不友好。
所以,我们在企业开发中基本不这样使用线程。在线程的启动上,我们常使用线程池。对于线程池的使用,可以看我另一篇博客,讲到了线程池的使用。
本文将讲解,线程池所在的包 java.util.concurrent
,在这个包下,还有什么值得关注的类和方法。
附上java8在线文档,边看边学
二、线程安全集合
在使用的集合中,ArrayList
或者是 HashMap
都是平常我们接触比较多的。但很遗憾,这两个集合类,他们在多线程的情况下,并不是安全的。如果需要使用线程安全的集合,将要有特殊的方法和类。
我们先来演示一下,在多线程情况下,此类集合发生的问题。
1)不安全集合示例
package com.banmoon.collection; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.TreeSet; | |
/** | |
* 线程不安全的集合 | |
*/ | |
public class Demo1 { | |
public static void main(String[] args) { | |
testArrayList(); | |
// testHashMap(); | |
// testSet(); | |
} | |
public static void testArrayList(){ | |
ArrayList<String> list = new ArrayList<>(); | |
for (int i = 1; i <= 10; i++) { | |
new Thread(() -> { | |
list.add(Thread.currentThread().getName()); | |
System.out.println(list); | |
}, "线程" + i).start(); | |
} | |
} | |
public static void testHashMap(){ | |
HashMap<Integer, String> map = new HashMap<>(); | |
for (int i = 0; i < 10; i++) { | |
int temp = i; | |
new Thread(() -> { | |
map.put(temp, Thread.currentThread().getName()); | |
System.out.println(map); | |
}, "线程" + i).start(); | |
} | |
} | |
public static void testSet(){ | |
TreeSet<String> set = new TreeSet<>(); | |
for (int i = 1; i <= 10; i++) { | |
new Thread(() -> { | |
set.add(Thread.currentThread().getName()); | |
System.out.println(set); | |
}, "线程" + i).start(); | |
} | |
} | |
} |
分别执行 testArrayList
、testHashMap
和 testSet
,执行结果如下
执行结果有时候会出现 java.util.ConcurrentModificationException
异常,字面意思就是并发修改异常,也就说明了原本喜欢用的 ArrayList
和 HashMap
是线程不安全的。
为何线程不安全,主要还是关键字 synchronized
,可以查看上述集合的添加方法,并没有添加这个关键字。所以,我们在多线程的时候,要避免使用以上这些不安全的集合类。
2)Collections
工具类之sync方法
上述讲的集合类都是线程不安全的,但是有办法使他们转换成线程安全的集合类。只需要 Collections
工具类使用对应的方法进行转换即可。
方法的实现就是将集合作为参数构造出了另一个线程安全的集合类。转换的方法还是比较多的,简单讲前三个就好。
package com.banmoon.collection; | |
import java.util.*; | |
/** | |
* 线程安全集合 | |
*/ | |
public class Demo2 { | |
public static void main(String[] args) { | |
testArrayList(); | |
// testHashMap(); | |
// testSet(); | |
} | |
public static void testArrayList(){ | |
List<String> list = Collections.synchronizedList(new ArrayList<>()); | |
for (int i = 1; i <= 10; i++) { | |
new Thread(() -> { | |
list.add(Thread.currentThread().getName()); | |
System.out.println(list); | |
}, "线程" + i).start(); | |
} | |
} | |
public static void testHashMap(){ | |
Map<Integer, String> map = Collections.synchronizedMap(new HashMap<>()); | |
for (int i = 0; i < 10; i++) { | |
int temp = i; | |
new Thread(() -> { | |
map.put(temp, Thread.currentThread().getName()); | |
System.out.println(map); | |
}, "线程" + i).start(); | |
} | |
} | |
public static void testSet(){ | |
Set<String> set = Collections.synchronizedSet(new TreeSet<>()); | |
for (int i = 1; i <= 10; i++) { | |
new Thread(() -> { | |
set.add(Thread.currentThread().getName()); | |
System.out.println(set); | |
}, "线程" + i).start(); | |
} | |
} | |
} |
就不贴执行结果了,自己可以试试,保证不出并发修改异常
3)juc下的安全集合
不对啊,上面的集合都在 java.util
下面,怎么没有juc什么事呢?别急,要来了,juc包下的线程安全集合
package com.banmoon.collection; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.CopyOnWriteArrayList; | |
import java.util.concurrent.CopyOnWriteArraySet; | |
/** | |
* juc并发包下的集合 | |
*/ | |
public class Demo3 { | |
public static void main(String[] args) { | |
testArrayList(); | |
// testHashMap(); | |
// testSet(); | |
} | |
public static void testArrayList(){ | |
CopyOnWriteArrayList<Object> list = new CopyOnWriteArrayList<>(); | |
for (int i = 1; i <= 10; i++) { | |
new Thread(() -> { | |
list.add(Thread.currentThread().getName()); | |
System.out.println(list); | |
}, "线程" + i).start(); | |
} | |
} | |
public static void testHashMap(){ | |
ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>(); | |
for (int i = 0; i < 10; i++) { | |
int temp = i; | |
new Thread(() -> { | |
map.put(temp, Thread.currentThread().getName()); | |
System.out.println(map); | |
}, "线程" + i).start(); | |
} | |
} | |
public static void testSet(){ | |
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>(); | |
for (int i = 1; i <= 10; i++) { | |
new Thread(() -> { | |
set.add(Thread.currentThread().getName()); | |
System.out.println(set); | |
}, "线程" + i).start(); | |
} | |
} | |
} |
4)Vector类
关于有人提到一个叫 Vector
的类,也是线程安全的。的确是这样,线程是安全的,我们查看他的 add
方法
public class Vector<E> | |
extends AbstractList<E> | |
implements List<E>, RandomAccess, Cloneable, java.io.Serializable | |
{ | |
public synchronized boolean add(E e) { | |
modCount++; | |
ensureCapacityHelper(elementCount + 1); | |
elementData[elementCount++] = e; | |
return true; | |
} | |
} |
比较简单,主要使用了 synchronized
关键字,来保证了线程安全。
没什么问题,但更推荐使用新的 lock
锁写的 CopyOnWriteArrayList
等类。
至于什么是 lock
,它的优势在哪里,可以继续查看下一章
三、Lock锁
由文档可知,Lock是个接口。使用得从它的几个实现类中入手
1)简单使用
使用 ReentrantLock
类,来完成线程安全的取票操作
package com.banmoon.collection; | |
import java.util.concurrent.locks.ReentrantLock; | |
/** | |
* ReentrantLock | |
* 1、创建公共Lock锁 | |
* 2、使用lock方法进行上锁 | |
* 3、在finally代码块中释放锁 | |
*/ | |
public class Demo4 { | |
public static void main(String[] args) { | |
TicketServer ticketServer = new TicketServer(); | |
new Thread(ticketServer, "A").start(); | |
new Thread(ticketServer, "B").start(); | |
new Thread(ticketServer, "C").start(); | |
} | |
} | |
class TicketServer implements Runnable{ | |
private int ticketNum = 10; | |
// 创建锁ReentrantLock lock = new ReentrantLock(); | |
void run() { | |
String name = Thread.currentThread().getName(); | |
while (true){ | |
// 上锁 | |
lock.lock(); | |
try { | |
if(ticketNum<=0) | |
return; | |
System.out.println(name + ":取到了第" + ticketNum + "张票"); | |
// 模拟网络延迟 | |
Thread.sleep(200); | |
ticketNum--; | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
}finally { | |
// 最后始终都要释放锁 | |
lock.unlock(); | |
} | |
} | |
} | |
} |
执行结果,与使用 synchronized
关键字无异
lock锁的基本操作
- 创建公共Lock锁,注意所有的线程访问到的
lock
都是同一个 - 使用lock方法进行上锁
- 在finally代码块中释放锁,必须要手动释放
2)生产者消费者模式
在使用 synchronized
关键词时,线程之间使用 wait
方法进行阻塞释放锁,以及使用 notify
方法进行唤醒阻塞线程。
而在 Lock
锁中,需要创建锁的状态监视器,也就是 Condition
。在 Condition
中,也有相对应的方法,他们则是 await
方法和 signal
方法。
使用 Condition
类来实现生产者消费者模式
package com.banmoon.collection; | |
import java.util.concurrent.locks.Condition; | |
import java.util.concurrent.locks.ReentrantLock; | |
/** | |
* 简单的生产者消费者模式 | |
*/ | |
public class Demo5 { | |
public static void main(String[] args) { | |
MyDemo5 myDemo4 = new MyDemo5(); | |
new Thread(() -> { | |
for(int i = 0; i < 10; i++) myDemo4.increment(); | |
}, "线程A").start(); | |
new Thread(() -> { | |
for(int i = 0; i < 10; i++) myDemo4.decrement(); | |
}, "线程B").start(); | |
new Thread(() -> { | |
for(int i = 0; i < 10; i++) myDemo4.increment(); | |
}, "线程C").start(); | |
new Thread(() -> { | |
for(int i = 0; i < 10; i++) myDemo4.decrement(); | |
}, "线程D").start(); | |
} | |
} | |
class MyDemo5{ | |
private int number = 0; | |
private ReentrantLock lock = new ReentrantLock(); | |
/** | |
* 通过lock创建出监视状态对象 | |
*/ | |
Condition condition = lock.newCondition(); | |
public void increment(){ | |
lock.lock(); | |
try { | |
while (number==1) | |
condition.await(); | |
number++; | |
System.out.println(Thread.currentThread().getName() + ":" + number); | |
condition.signalAll(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
public void decrement(){ | |
lock.lock(); | |
try { | |
while (number==0) | |
condition.await(); | |
number--; | |
System.out.println(Thread.currentThread().getName() + ":" + number); | |
condition.signalAll(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
} |
执行结果,和 synchronized
关键字无异
3)精准唤醒
Condition
类和传统的 wait
方法、notify
方法不同,它可以实现精准的唤醒。
比如上面的生产消费模式,让A线程生产完成后,让B线程进行消费;B线程消费完成后,让C进行生产。
这一点在 Object
的 notify
方法是做不到的,notify
方法唤醒的虽然只有一条线程,但这是cpu进行调度的,人为并不可控制。
package com.banmoon.collection; | |
import java.util.concurrent.locks.Condition; | |
import java.util.concurrent.locks.Lock; | |
import java.util.concurrent.locks.ReentrantLock; | |
/** | |
* 使用多个Condition实现精准唤醒 | |
*/ | |
public class Demo6 { | |
public static void main(String[] args) { | |
MyDemo6 myDemo4 = new MyDemo6(); | |
new Thread(() -> { | |
for(int i = 0; i < 10; i++) myDemo4.increment(); | |
}, "线程A").start(); | |
new Thread(() -> { | |
for(int i = 0; i < 10; i++) myDemo4.decrement(); | |
}, "线程B").start(); | |
new Thread(() -> { | |
for(int i = 0; i < 10; i++) myDemo4.increment(); | |
}, "线程C").start(); | |
new Thread(() -> { | |
for(int i = 0; i < 10; i++) myDemo4.decrement(); | |
}, "线程D").start(); | |
} | |
} | |
class MyDemo6 { | |
private int number = 0; | |
private Lock lock = new ReentrantLock(); | |
private Condition condition1 = lock.newCondition(); | |
private Condition condition2 = lock.newCondition(); | |
private Condition condition3 = lock.newCondition(); | |
private Condition condition4 = lock.newCondition(); | |
public void increment(){ | |
lock.lock(); | |
try { | |
while (number==1) | |
await(); | |
number++; | |
System.out.println(Thread.currentThread().getName() + ":" + number); | |
signal(); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
public void decrement(){ | |
lock.lock(); | |
try { | |
while (number==0) | |
await(); | |
number--; | |
System.out.println(Thread.currentThread().getName() + ":" + number); | |
signal(); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
/** | |
* 判断当前线程,并触发等待 | |
*/public void await(){ | |
try { | |
String name = Thread.currentThread().getName(); | |
switch (name){ | |
case "线程A": condition1.await(); break; | |
case "线程B": condition2.await(); break; | |
case "线程C": condition3.await(); break; | |
case "线程D": condition4.await(); break; | |
} | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
/** | |
* 判断当前线程,并触发唤醒 | |
*/public void signal(){ | |
String name = Thread.currentThread().getName(); | |
switch (name){ | |
case "线程A": condition2.signal(); break; | |
case "线程B": condition3.signal(); break; | |
case "线程C": condition4.signal(); break; | |
case "线程D": condition1.signal(); break; | |
} | |
} | |
} |
执行结果,发现了确实按照了我们的顺序来进行唤醒
根据锁创建出监视状态对象,用来判断线程的等待和唤醒
简单来说,就是用 Condition
来做一个标记,我们需要做的就是,判断当前的条件,是否使用 Condition
进行等待或者唤醒。
上述代码会出现BUG,但我不说,哈哈哈哈哈。 大家可以把17行的线程B
和23行的线程D
位置互换,你会发现问题的。 可以试着解决一下这个BUG,可以更好理解Condition
的精准唤醒。
4)读写锁
上述使用的都是 ReentrantLock
类,这次讲讲另外的两个。处于 ReentrantReadWriteLock
类下的两个静态内部类,ReadLock
和 WriteLock
为什么会有读写锁呢,它解决了同步带来的效率低下问题。
在变量的使用上,我们无非就是读和写,只有多线程写入才会造成线程安全的问题,而多线程读永远不会修改变量值,也就不会造成线程的安全问题了。
正因为如此,读写锁出现了,它可以限制只能单线程写入,但可以允许多线程读取变量,由此来保证效率的最大化使用
package com.banmoon.collection; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.locks.ReentrantLock; | |
import java.util.concurrent.locks.ReentrantReadWriteLock; | |
/** | |
* 读写锁 | |
*/ | |
public class Demo7 { | |
public static void main(String[] args) { | |
MyDemo7 demo7 = new MyDemo7(); | |
for (int i = 0; i < 10; i++) { | |
new Thread(() -> { | |
demo7.add(Thread.currentThread().getName()); | |
}, "写线程" + i).start(); | |
new Thread(() -> { | |
demo7.toString(true); | |
}, "读线程" + i).start(); | |
} | |
} | |
} | |
class MyDemo7{ | |
private List<String> list = new ArrayList<>(); | |
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); | |
// private ReentrantLock lock = new ReentrantLock(); | |
public void add(String str){ | |
lock.writeLock().lock(); | |
try { | |
System.out.println(Thread.currentThread().getName() + ":正在添加"); | |
list.add(str); | |
System.out.println(Thread.currentThread().getName() + ":添加成功"); | |
} finally { | |
lock.writeLock().unlock(); | |
} | |
} | |
public void toString(boolean b){ | |
lock.readLock().lock(); | |
try { | |
System.out.println(Thread.currentThread().getName() + ":正在读取"); | |
System.out.println(Thread.currentThread().getName() + ":" + list); | |
System.out.println(Thread.currentThread().getName() + ":读取成功"); | |
} finally { | |
lock.readLock().unlock(); | |
} | |
} | |
} |
执行结果,主要查看与 ReentrantLock
类的区别
可以看到的是,写入线程不会被抢锁,而读线程能被其他读线程插入。
四、辅助类
在juc并发包中,还有一些辅助工具类,让我们可以更好的使用多线程。
1)CountDownLatch
package com.banmoon.utils; | |
import cn.hutool.core.date.DateUtil; | |
import cn.hutool.core.util.StrUtil; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.TimeUnit; | |
public class CountDownLatchTest { | |
public static void main(String[] args) throws InterruptedException { | |
CountDownLatch count = new CountDownLatch(5); | |
for (int i = 1; i <= 5; i++) { | |
new Thread(() -> { | |
try { | |
System.out.println(StrUtil.format("{}:{}", Thread.currentThread().getName(), DateUtil.now())); | |
TimeUnit.SECONDS.sleep(2); | |
count.countDown(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}, "线程"+i).start(); | |
} | |
// 等待上面几个线程完成,才能进行下面的逻辑 | |
count.await(); | |
System.out.println("主线程:" + DateUtil.now()); | |
} | |
} |
CountDownLatch
对线程进行减法计数,计算还有多少线程没有完成任务。
直到指定的线程数,到达指定位置后,才进行下一步的操作。
2)CyclicBarrier
如果上述 CountDownLatch
是通过减法计算来达到屏障,那么 CyclicBarrier
就是通过加法计算来达到屏障。
package com.banmoon.utils; | |
import cn.hutool.core.util.StrUtil; | |
import java.util.concurrent.*; | |
public class CyclicBarrierTest { | |
public static void main(String[] args) { | |
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { | |
System.out.println("======= 当前阶段已完成 ======="); | |
}); | |
new Thread(new Demo(cyclicBarrier), "线程A").start(); | |
new Thread(new Demo(cyclicBarrier), "线程B").start(); | |
new Thread(new Demo(cyclicBarrier), "线程C").start(); | |
} | |
} | |
class Demo implements Runnable{ | |
private CyclicBarrier cyclicBarrier; | |
public Demo(CyclicBarrier cyclicBarrier) { | |
this.cyclicBarrier = cyclicBarrier; | |
} | |
void run() { | |
try { | |
String threadName = Thread.currentThread().getName(); | |
System.out.println(StrUtil.format("{}:准备就绪", threadName)); | |
TimeUnit.SECONDS.sleep(3); | |
// 等待所有线程都就绪 | |
cyclicBarrier.await(); | |
System.out.println(StrUtil.format("{}:上台发言", threadName)); | |
TimeUnit.SECONDS.sleep(3); | |
// 等待所有线程都上台发言 | |
cyclicBarrier.await(); | |
System.out.println(StrUtil.format("{}:散会回家", threadName)); | |
TimeUnit.SECONDS.sleep(1); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
与 CountDownLatch
类似,当线程全部达到一个共同屏障时,从而再向下进行执行。
但和其不一样的是,CyclicBarrier
可以执行多次,设立多点屏障。出现问题导致的计数异常后,也可以重新进行计数
3)Semaphore
Semaphore
通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。
比如去餐厅吃饭,没有座位需要进行等待,直到别人吃完有座位后,才能进入。
package com.banmoon.utils; | |
import java.util.concurrent.Semaphore; | |
import java.util.concurrent.TimeUnit; | |
public class SemaphoreTest { | |
public static void main(String[] args) { | |
Semaphore semaphore = new Semaphore(3); | |
for (int i = 1; i <= 6; i++) { | |
new Thread(() -> { | |
try { | |
semaphore.acquire(); | |
System.out.println(Thread.currentThread().getName() + ":排队完成,抓紧吃饭"); | |
TimeUnit.SECONDS.sleep(3); | |
System.out.println(Thread.currentThread().getName() + ":吃完了离开"); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} finally { | |
semaphore.release(); | |
} | |
}, "线程" + i).start(); | |
} | |
} | |
} |
适用于有限的资源使用,对同一个资源最多允许几个线程可以访问它。直到访问结束后才释放,让其他线程进入可以访问。
和 synchornized
包住一段代码块比较类似,但 Semaphore
允许多个线程,而前者只有一个。
五、阻塞队列
在Java线程池的讲解中,我初步的讲了阻塞队列的功能。但在此,我还是得详细讲讲,什么是阻塞队列。
阻塞队列,顾名思义就是会阻塞的队列。而队列的基本操作就只有两个,存和取。所以阻塞就此产生,有些存会发生阻塞,有些取会发生阻塞。
下面就一起来看看juc包中的阻塞队列吧
1)ArrayBlockingQueue
Array结构,没问题吧,基于数组结构实现的队列。既然叫阻塞队列,那就必然会有阻塞,有阻塞会有锁吧。简单看看源码
public class ArrayBlockingQueue<E> extends AbstractQueue<E> | |
implements BlockingQueue<E>, java.io.Serializable { | |
// 内容存在这的 | |
final Object[] items; | |
// 锁,保护所有的访问 | |
final ReentrantLock lock; | |
// 关于取操作的 Conditionprivate final Condition notEmpty; | |
// 关于存操作的 Conditionprivate final Condition notFull; | |
// 构造方法,传入一个容量。还有两个重载,具体自己看源码吧public ArrayBlockingQueue(int capacity) { | |
this(capacity, false); | |
} | |
// 取数public E take() throws InterruptedException { | |
final ReentrantLock lock = this.lock; | |
// 上锁lock.lockInterruptibly(); | |
try { | |
// 判断个数是否为0,是则阻塞进行等待while (count == 0) | |
notEmpty.await(); | |
// 返回取数结果return dequeue(); | |
} finally { | |
// 解锁lock.unlock(); | |
} | |
} | |
// 存数 | |
public void put(E e) throws InterruptedException { | |
// 检查元素是否为空,为空则会抛出空指针异常 | |
checkNotNull(e); | |
final ReentrantLock lock = this.lock; | |
// 上锁lock.lockInterruptibly(); | |
try { | |
// 如果个数已经和队列容量相等了,则阻塞进行等待while (count == items.length) | |
notFull.await(); | |
// 存数 | |
enqueue(e); | |
} finally { | |
// 解锁lock.unlock(); | |
} | |
} | |
} |
ArrayBlockingQueue
有一把 Lock
锁以及它的两个 Condition
监听器,分别来对队列为空或满的时候,进行阻塞操作。
上面源码只是一小部分,dequeue
和 enqueue
方法就交给你们去看啦,不要怕,挺简单的。
测试一下,取和存之间照成的阻塞
package com.banmoon.queue; | |
import cn.hutool.core.date.DateUtil; | |
import cn.hutool.core.util.StrUtil; | |
import java.util.Date; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
public class ArrayBlockingQueueTest { | |
public static void main(String[] args) throws InterruptedException { | |
putTest(); | |
System.out.println("========= 分割线 ========="); | |
takeTest(); | |
} | |
public static void putTest() throws InterruptedException { | |
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3); | |
// LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3); | |
new Thread(() -> { | |
try { | |
// 3秒后再取出 | |
TimeUnit.SECONDS.sleep(3); | |
String ele = queue.take(); | |
System.out.println(ele + "已取出:" + DateUtil.formatTime(new Date())); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}).start(); | |
queue.put("1"); | |
System.out.println(StrUtil.format("{}已插入,{}", 1, DateUtil.formatTime(new Date()))); | |
queue.put("2"); | |
System.out.println(StrUtil.format("{}已插入,{}", 2, DateUtil.formatTime(new Date()))); | |
queue.put("3"); | |
System.out.println(StrUtil.format("{}已插入,{}", 3, DateUtil.formatTime(new Date()))); | |
queue.put("4"); | |
System.out.println(StrUtil.format("{}已插入,{}", 4, DateUtil.formatTime(new Date()))); | |
} | |
private static void takeTest() throws InterruptedException { | |
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3); | |
// LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3); | |
// 其他线程插入数据for (int i = 1; i <= 3; i++) { | |
String str = StrUtil.toString(i); | |
new Thread(() -> { | |
try { | |
TimeUnit.SECONDS.sleep(2); | |
queue.put(str); | |
System.out.println(StrUtil.format("{}已插入,{}", str, DateUtil.formatTime(new Date()))); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}).start(); | |
} | |
// 直接取数 | |
System.out.println(queue.take() + "已取出:" + DateUtil.formatTime(new Date())); | |
System.out.println(queue.take() + "已取出:" + DateUtil.formatTime(new Date())); | |
System.out.println(queue.take() + "已取出:" + DateUtil.formatTime(new Date())); | |
} | |
} |
执行结果,不要问我为什么分割线下面取出插入搞反了,问就是打印慢了。在插入的结束的马上,就马上唤醒被阻塞的取的线程。
就这样会出现取出在前的打印问题。
2)LinkedBlockingQueue
在简单查看了 ArrayBlockingQueue
的源码,它只有一把 Lock
锁,
但 LinkedBlockingQueue
不同,它有两把锁。简单看看源码吧
public class LinkedBlockingQueue<E> extends AbstractQueue<E> | |
implements BlockingQueue<E>, java.io.Serializable { | |
// 好的出现了,静态内部类,链表形式的指针引向。一个节点对象存一个元素,同时指向下一个节点对象static class Node<E> { | |
E item; | |
Node<E> next; | |
Node(E x) { item = x; } | |
} | |
// 容量private final int capacity; | |
// 当前的元素个数private final AtomicInteger count = new AtomicInteger(); | |
// 头部节点transient Node<E> head; | |
// 最后的节点private transient Node<E> last; | |
// take、poll、etc等方法,取操作持有的锁private final ReentrantLock takeLock = new ReentrantLock(); | |
// 不为空的状态Conditionprivate final Condition notEmpty = takeLock.newCondition(); | |
// put、offer、etc等方法,存操作持有的锁private final ReentrantLock putLock = new ReentrantLock(); | |
// 容量未满的状态Conditionprivate final Condition notFull = putLock.newCondition(); | |
} |
对应的取存操作方法都差不多,这边的测试代码就不贴了。和上面的 ArrayBlockingQueue
差不多,把注释掉的代码打开换成 LinkedBlockingQueue
就好。
3)存取的4组API
在上述的阻塞队列中,只举例了一组存取的方法。不过除了这一组,还有其他存取操作的API。
这边以 ArrayBlockingQueue
为例,总结进行测试一下。
功能说明 | 存 | 取 |
队列空或者满的时候会抛出异常 | add(E e) | remove() |
满了还去存则直接返回false空了还去取就会返回null | offer(E e) | poll() |
满了还去存就会一直阻塞,直到被唤醒空了还去取就会一直阻塞,直到被唤醒 | put(E e) | take() |
满了还去存就会阻塞一段时间,超过后就返回false空了还去取就会阻塞一段时间,超过时间后就会返回null | offer(E e, long timeout, TimeUnit unit) | poll(long timeout, TimeUnit unit) |
简单测试下这四组API,可以根据自己业务需求来选择对应的API
package com.banmoon.queue; | |
import cn.hutool.core.date.DateUtil; | |
import cn.hutool.core.util.StrUtil; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
public class FourApiTest { | |
public static void main(String[] args) throws InterruptedException { | |
// test1(); | |
// test2(); | |
// test3();test4(); | |
} | |
/** | |
* 会抛出异常 | |
*/private static void test1() { | |
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2); | |
System.out.println(queue.add("A")); | |
System.out.println(queue.add("B")); | |
// System.out.println(queue.add("C"));// 队列已满,将报错 | |
System.out.println("======== 分割线 ========"); | |
System.out.println(queue.remove()); | |
System.out.println(queue.remove()); | |
// System.out.println(queue.remove());// 队列已空,将报错 | |
} | |
/** | |
* 满了还去存则直接返回false,空了还去取就会返回null | |
*/private static void test2() { | |
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2); | |
System.out.println(queue.offer("A")); | |
System.out.println(queue.offer("B")); | |
System.out.println(queue.offer("C"));// 队列已满,存失败就直接返回false | |
System.out.println("======== 分割线 ========"); | |
System.out.println(queue.poll()); | |
System.out.println(queue.poll()); | |
System.out.println(queue.poll());// 队列已空,直接返回null | |
} | |
/** | |
* 满了还去存就会一直阻塞,直到被唤醒,空了还去取就会一直阻塞,直到被唤醒 | |
* @throws InterruptedException | |
*/private static void test3() throws InterruptedException { | |
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2); | |
queue.put("A"); | |
System.out.println("A已插入"); | |
queue.put("B"); | |
System.out.println("B已插入"); | |
// queue.put("C");// 队列已满,到此将会阻塞 | |
// System.out.println("C已插入"); | |
System.out.println("======== 分割线 ========"); | |
System.out.println(queue.take()); | |
System.out.println(queue.take()); | |
// System.out.println(queue.take());// 队列已空,将会持续阻塞 | |
} | |
/** | |
* 满了还去存就会阻塞一段时间,超过后就返回false | |
* 空了还去取就会阻塞一段时间,超过时间后就会返回null | |
*/private static void test4() throws InterruptedException { | |
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2); | |
String template = "{}:{}"; | |
System.out.println(StrUtil.format(template, queue.offer("A", 2, TimeUnit.SECONDS), DateUtil.now())); | |
System.out.println(StrUtil.format(template, queue.offer("B", 2, TimeUnit.SECONDS), DateUtil.now())); | |
System.out.println(StrUtil.format(template, queue.offer("C", 2, TimeUnit.SECONDS), DateUtil.now()));// 队列已满,等待2秒后返回false | |
System.out.println("======== 分割线 ========"); | |
System.out.println(StrUtil.format(template, queue.poll(2, TimeUnit.SECONDS), DateUtil.now())); | |
System.out.println(StrUtil.format(template, queue.poll(2, TimeUnit.SECONDS), DateUtil.now())); | |
System.out.println(StrUtil.format(template, queue.poll(2, TimeUnit.SECONDS), DateUtil.now()));// 队列已空,等待2秒后返回null | |
} | |
} |
test1
执行结果,会抛出异常
test2
执行结果,满了还去存则直接返回false,空了还去取就会返回null
test3
执行结果,可以看到程序一直都没有关闭。存的在等待位置,取的在等元素
test4
执行结果,注意看时间,不会死等
4)SynchronousQueue
这是一个比较特殊的阻塞队列,存取互相阻塞。生产者存入一个元素就马上阻塞,必须被另一个线程消费者取出这个元素,生产者才解锁。
简单测试一下
package com.banmoon.queue; | |
import cn.hutool.core.util.StrUtil; | |
import java.util.concurrent.SynchronousQueue; | |
import java.util.concurrent.TimeUnit; | |
public class SynchronousQueueTest { | |
public static void main(String[] args) { | |
SynchronousQueue<String> queue = new SynchronousQueue<>(); | |
new Thread(() -> { | |
try { | |
String name = Thread.currentThread().getName(); | |
queue.put("A"); | |
System.out.println(name + "插入A成功"); | |
TimeUnit.SECONDS.sleep(1); | |
queue.put("B"); | |
System.out.println(name + "插入B成功"); | |
TimeUnit.SECONDS.sleep(1); | |
queue.put("C"); | |
System.out.println(name + "插入C成功"); | |
TimeUnit.SECONDS.sleep(1); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}, "线程A").start(); | |
new Thread(() -> { | |
try { | |
String name = Thread.currentThread().getName(); | |
String template = "{}取出了{}"; | |
System.out.println(StrUtil.format(template, name, queue.take())); | |
TimeUnit.SECONDS.sleep(1); | |
System.out.println(StrUtil.format(template, name, queue.take())); | |
TimeUnit.SECONDS.sleep(1); | |
System.out.println(StrUtil.format(template, name, queue.take())); | |
TimeUnit.SECONDS.sleep(1); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}, "线程B").start(); | |
} | |
} |
执行结果,存取交替进行,没问题
现在我们把线程B的代码注释掉,让其只有生产的线程。再次查看结果
好的,被阻塞了。也就是在15行存入后,线程A直接阻塞了,要不然怎么连16行的插入成功信息都没有打印呢?
所以,我们需要进入 SynchronousQueue
的源码,简单查看下
public class SynchronousQueue<E> extends AbstractQueue<E> | |
implements BlockingQueue<E>, java.io.Serializable { | |
// 首先出现了就是一个抽象静态内部类,还有它的两个子类 | |
abstract static class Transferer<E> { | |
abstract E transfer(E e, boolean timed, long nanos); | |
} | |
// Transferer的子类,也是一个静态内部类。栈结构,后进先出static final class TransferStack<E> extends Transferer<E> {} | |
// Transferer的子类,也是一个静态内部类。队列结构,先进先出static final class TransferQueue<E> extends Transferer<E> {} | |
// 转让器private transient volatile Transferer<E> transferer; | |
// 无参构造器 | |
public SynchronousQueue() { | |
this(false); | |
} | |
// 构造器,传入是否公平,如果公平,则创建队列结构的转让器,否则创建栈结构的转让器public SynchronousQueue(boolean fair) { | |
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); | |
} | |
// 存方法public void put(E e) throws InterruptedException { | |
// 判断是否为空if (e == null) throw new NullPointerException(); | |
// 直接通过转让器来存if (transferer.transfer(e, false, 0) == null) { | |
Thread.interrupted(); | |
throw new InterruptedException(); | |
} | |
} | |
// 取方法public E take() throws InterruptedException { | |
// 又是通过转让器来取?还是同样的方法?E e = transferer.transfer(null, false, 0); | |
// 如果取到的元素为null,直接抛出打断异常if (e != null) | |
return e; | |
Thread.interrupted(); | |
throw new InterruptedException(); | |
} | |
} |
看到这,我有点蒙,看来还是逃避不过要查看 TransferStack
或者 TransferQueue
这两个内部类啊。后续开个专章来讲述源码好了。简单说下它的流程,transferer
方法
- 根据实参是否为null来判断出是哪种操作,并为其打上标记,这边分为
DATA
和REQUEST
DATA
说明是存,REQUEST
说明是取- 判断队列是否为空,为空就将此操作线程作为节点阻塞住
- 判断队列是否为空,不为空就判断队尾的节点模式和此操作的模式是否匹配
- 匹配的意思是指,一个
DATA
操作就要匹配一个REQUEST
- 如果模式相等,没有匹配上,就将此操作线程作为节点阻塞住
- 如果模式不相等,可以匹配上,则将
DATA
操作的元素交给REQUEST
,然后进行消除。
TransferStack
和TransferQueue
的区别在与 判断队尾节点的模式是否相等,TransferQueue
是将头部的节点进行匹配消除,而TransferStack
全部都是队尾的节点。这也体现的队列的栈的区别,一个是先进先出,一个是后进先出。 源码内部还使用到了CAS自旋锁,计划出一章关于锁类型的文章。
5)LinkedTransferQueue
如果理解了前面的 SynchronousQueue
的话,那么 LinkedTransferQueue
就很好理解了。
简单来说,SynchronousQueue
中的 TransferQueue
直接维护在 LinkedTransferQueue
里面,少了一层抽象内部类。
如果 SynchronousQueue
存取操作都会阻塞,只有配对上才会唤醒。那么 LinkedTransferQueue
做了一定的简化和增强,其中一项就是可以自己决定是否阻塞存取操作。
一起来测试一下吧
package com.banmoon.queue; | |
import cn.hutool.core.date.DateUtil; | |
import cn.hutool.core.util.StrUtil; | |
import java.util.Date; | |
import java.util.concurrent.LinkedTransferQueue; | |
import java.util.concurrent.TimeUnit; | |
public class LinkedTransferQueueTest { | |
public static void main(String[] args) throws InterruptedException { | |
test1(); | |
// test2(); | |
// test3(); | |
} | |
/** | |
* 效果与SynchronousQueue基本一致 | |
* transfer插入时,没有匹配的取操作则会阻塞 | |
* @throws InterruptedException | |
*/private static void test1() throws InterruptedException { | |
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>(); | |
new Thread(() -> { | |
try { | |
for (int i = 0; i < 3; i++) { | |
String str = queue.take(); | |
TimeUnit.SECONDS.sleep(1); | |
System.out.println(StrUtil.format("取到了{},{}", str, DateUtil.formatTime(new Date()))); | |
} | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}).start(); | |
queue.transfer("A"); | |
System.out.println("插入了A"); | |
queue.transfer("B"); | |
System.out.println("插入了B"); | |
queue.transfer("C"); | |
System.out.println("插入了C"); | |
} | |
/** | |
* 存元素时 | |
* 如果有取操作阻塞的话,则进行匹配,返回true | |
* 没有取操作的话,不阻塞,直接返回false | |
*/private static void test2() throws InterruptedException { | |
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>(); | |
for (int i = 0; i < 3; i++) { | |
new Thread(() -> { | |
try { | |
System.out.println(StrUtil.format("取到了{},{}", queue.take(), DateUtil.formatTime(new Date()))); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}).start(); | |
} | |
// 等待三个取的操作都执行完 | |
TimeUnit.SECONDS.sleep(2); | |
System.out.println(StrUtil.format("插入A:{},{}", queue.tryTransfer("A"), DateUtil.formatTime(new Date()))); | |
System.out.println(StrUtil.format("插入B:{},{}", queue.tryTransfer("B"), DateUtil.formatTime(new Date()))); | |
System.out.println(StrUtil.format("插入C:{},{}", queue.tryTransfer("C"), DateUtil.formatTime(new Date()))); | |
} | |
/** | |
* 存操作时,没有取操作匹配,将会等待一段时间再进行返回 | |
* @throws InterruptedException | |
*/private static void test3() throws InterruptedException { | |
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>(); | |
new Thread(() -> { | |
for (int i = 0; i < 3; i++) { | |
try { | |
TimeUnit.SECONDS.sleep(1); | |
System.out.println(StrUtil.format("取到了{},{}", queue.take(), DateUtil.formatTime(new Date()))); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
}).start(); | |
System.out.println(StrUtil.format("插入A:{},{}", queue.tryTransfer("A", 3, TimeUnit.SECONDS), DateUtil.formatTime(new Date()))); | |
System.out.println(StrUtil.format("插入B:{},{}", queue.tryTransfer("B", 3, TimeUnit.SECONDS), DateUtil.formatTime(new Date()))); | |
System.out.println(StrUtil.format("插入C:{},{}", queue.tryTransfer("C", 3, TimeUnit.SECONDS), DateUtil.formatTime(new Date()))); | |
} | |
} |
执行 test1
,transfer
方法插入的操作会阻塞,直到有取的操作进入相匹配,这效果和 SynchronousQueue
一样
执行 test2
。简单来说,存元素时,如果已有取操作阻塞了,将返回true。否则,直接返回false
执行 test3
,存操作时,没有取操作匹配,将会等待一段时间再进行返回
5)PriorityBlockingQueue
PriorityBlockingQueue
优先级阻塞队列,存元素时不会阻塞,取元素时为空则阻塞。和其他阻塞队列相比,他的数组维护了一个二叉堆,对元素进行了优先级的排序。
测试下列代码,自定义实现比较器,这边就按照数字大小排序
package com.banmoon.queue; | |
import java.util.Comparator; | |
import java.util.concurrent.PriorityBlockingQueue; | |
public class PriorityBlockingQueueTest { | |
public static void main(String[] args) throws InterruptedException { | |
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<Integer>(3, new MyComparator()); | |
queue.put(1); | |
queue.put(3); | |
queue.put(2); | |
System.out.println(queue.take()); | |
System.out.println(queue.take()); | |
System.out.println(queue.take()); | |
} | |
} | |
class MyComparator implements Comparator<Integer> { | |
int compare(Integer o1, Integer o2) { | |
if(o1==o2) | |
return 0; | |
return o1>o2? -1: 1; | |
} | |
} |
执行结果
6)LinkedBlockingDeque
这个和 LinkedBlockingQueue
好像,我可以称其为 LinkedBlockingQueue
的升级版。
LinkedBlockingDeque
被称为双端队列,是因为 LinkedBlockingDeque
可以往链表两头插入元素。他的存储结构是链表,同 LinkedBlockingQueue
一致,又因为存取可以在双端进行,所以不能像 LinkedBlockingQueue
一样给两把锁,这里保持了和 ArrayBlockingQueue
一样,仅有一把锁保持线程安全。
package com.banmoon.queue; | |
import java.util.concurrent.LinkedBlockingDeque; | |
public class LinkedBlockingDequeTest { | |
public static void main(String[] args) throws InterruptedException { | |
LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(4); | |
deque.put("A"); | |
deque.put("B"); | |
deque.put("C"); | |
deque.putFirst("D"); | |
System.out.println(deque.take()); | |
System.out.println(deque.takeLast()); | |
} | |
} |
执行结果,此队列中还有其他的api,都可以指定在队首或者队尾存元素,取元素同理
六、最后
在以前,并没有熟悉去使用过并发包的东西,在这次整理后,我对并发包有一定的了解。尤其是阻塞队列这一块,在看源码的时候还是挺有意思的。
当然本文没有列出所有使用 Lock
锁的api方法,提供上jkd8在线文档,学习要对着文档。
关于本文出现的代码示例,已提交至码云,只看文章不懂时,一定要敲代码进行理解。
我是半月,祝你幸福!!!