Java 基础(十五)并发工具包 concurrent

Java
282
0
0
2023-05-27
标签   Java并发


Java 基础(十五)并发工具包 concurrent

本文目录:

  • java.util.concurrent – Java 并发包简介
  • 阻塞队列 B Lock ingQueue
  • 数组阻塞队列 ArrayBlockingQueue
  • 延迟队列 DelayQueue
  • 链阻塞队列 LinkedBlockingQueue
  • 具有优先级的阻塞队列 PriorityBlockingQueue
  • 同步队列 SynchronousQueue
  • 阻塞双端队列 BlockingDeque
  • 链阻塞双端队列 LinkedBlockingDeque
  • 并发 Map ConcurrentMap
  • 并发导航映射 ConcurrentNavigableMap
  • 闭锁 ConutDownLatch
  • 栅栏 CyclicBarrier
  • 交换机 Exchanger
  • 信号量 Semaphore
  • 执行器服务 ExecutorService
  • 线程池 执行者 ThreadPoolExecutor
  • 定时执行者服务 ScheduledExecutorService
  • 使用 ForkJoinPool 进行分叉和合并
  • 锁 Lock
  • 读写锁 ReadWriteLock
  • 原子性布尔 AtomicBoolean
  • 原子性整型 AtomicInteger
  • 原子性长整型 AtomicLong
  • 原子性引用型 AtomicReference

本章内容比较多,我自己也是边学边总结,所以拖到今天才出炉。另外,建议学习本文的小伙伴在学习的过程中,把代码 copy 出去run 一下,有助于理解。

1.java.util.concurrent Java 并发工具包

这是 Java5 添加的一个并发工具包。这个包包含了一系列能够让 Java 的并发编程变得更加简单轻松的类。在这之前,你需要自己手动去实现相关的工具类。

本文将和大家一起学习 java.util.concurrent包里的这些类,学完之后我们可以尝试如何在项目中使用它们。

2.阻塞队列 BlockingQueue

java.util.concurrent 包里面的 BlockingQueue 接口表示一个 线程 安放如和提取实例的队列。

这里我们不会讨论在 Java 中实现一个你自己的 BlockingQueue。

BlockingQueue 用法

BlockingQueue 通常用于一个线程生产对象,而另一个线程消费这些对象的场景。下图是对这个原理的阐述:


一个线程往里边放,另外一个线程从里边取的一个 BlockingQueue.png

一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,负责生产的线程将会在往里面插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。负责消费的线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。

BlockingQueue 的方法

BlockingQueue 具有4组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

四组不同的行为方式解释:

1.抛异常:如果试图的操作无法立即执行,抛一个异常2.特定值:如果试图的操作无法立即执行,返回一个特定的值(一般是 true/false)3.阻塞:如果试图的操作无法立即执行,该方法将会发生阻塞,直到能执行4.超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定的值以告知该操作是否成功。

无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 会抛出一个 NullPointerException。

可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉,那么你可以调用诸如remove(o)方法啦将队列中的特定对象进行移除。但是这么干相率并不高,因此尽量不要用这一类方法,除非迫不得已。

BlockingQueue 的实现

BlockingQueue 是个借口,你可以通过它的实现之一来使用 BlockingQueue。concurrent 包里面有如下几个类实现了 BlockingQueue:

  • ArrayBlockingQueue
  • DelayQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue
  • SynchronousQueue

Java 中使用 BlockingQueue 的例子

这是一个 Java 钟使用 BlockingQueue 的示例,本示例使用的是 BlockingQueue 借口的 ArrayBlockingQueue 实现。首先,BlockingQueueExample 类分别在两个独立的线程中启动了一个 Producer 和 Consumer 。Producer 向一个共享的 BlockingQueue 中注入字符串,而 Consumer 则会从中把它们拿出来。

 public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer). start ();
        new Thread(consumer).start();
    }
}

class Producer implements Runnable {
    private BlockingQueue<String> queue = null;

    Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            queue.put("1");
            Thread. sleep (1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException ignored) {
        }
    }
}

class Consumer implements Runnable {
    private BlockingQueue queue = null;

    Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}  

这个例子很简单呐,我就不加文字描述了。

3.数组阻塞队列 ArrayBlockingQueue

ArrayBlockingQueue 类实现了 BlockingQueue 接口。

ArrayBlockingQueue 是衣蛾有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的原色。它有一个同一时间存储元素数量的上线。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了。

ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

以下是在使用 ArrayBlockingQueue 的时候对其初始化的一个示例:

 BlockingQueue queue = new ArrayBlockingQueue(1024);
try {
    queue.put("1");
    Object object = queue.take();
} catch (InterruptedException e) {
    e.printStackTrace();
}  

4.延迟队列 DelayQueue

DelayQueue 实现了 BlockingQueue 接口DelayQueue 对元素进行持有知道一个特定的延迟到期。注入其中的元素必须实现 concurrent.Delay 接口,该接口定义:

 public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit var1);
}  

DelayQueue 将会在每个元素的 getDelay()方法返回的值的时间段之后才释放掉该元素。如果返回的是 0 或者负值,延迟将被认为过期,该元素将会在 DelayQueue 的下一次 take 被调用的时候被释放掉。

传递给 getDelay 方法的 getDelay 实例是一个枚举型,它表明了将要延迟的时间段。

TimeUnit 枚举的取值单位都能顾名思义,这里就带过了。

上面我们可以看到 Delayed 接口继承了 Comparable 接口,这也就意味着 Delayed 对象之间可以进行对比。这个可能在对 DelayeQueue 队列中的元素进行排序时有用,因此它们可以根据过期时间进行有序释放。

以下是使用 DelayQueue 的例子:

 public class DelayQueueExample {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedElement> queue = new DelayQueue<>();
        DelayedElement element1 = new DelayedElement(1000);
        DelayedElement element2 = new DelayedElement(0);
        DelayedElement element3 = new DelayedElement(400);
        queue.put(element1);
        queue.put(element2);
        queue.put(element3);
        DelayedElement e = queue.take();
        System.out.println("e1:" + e.delayTime);
        DelayedElement e2 = queue.take();
        System.out.println("e2:" + e2.delayTime);
        DelayedElement e3 = queue.take();
        System.out.println("e3:" + e3.delayTime);
    }
}

class DelayedElement implements Delayed {
    long delayTime;
    long tamp;

    DelayedElement(long delay) {
        delayTime = delay;
        tamp = delay + System.currentTimeMillis();
    }

    @ Override 
    public long getDelay(@NonNull TimeUnit unit) {
        return tamp - System.currentTimeMillis();
//        return -1;
    }

    @Override
    public int compareTo(@NonNull Delayed o) {
        return tamp - ((DelayedElement) o).tamp > 0 ? 1 : -1;
    }
}  

运行结果:

 e1:0
e2:400
e3:1000  

在 take 取出 e2的时候,会阻塞。compareTo 决定队列中的取出顺序getDelay 决定是否能取出元素,如果无法取出则阻塞线程。

具体玩法,大家可以自行思考,我看了一下别人用DelayQueue,能玩出很多花样,在某些特定的需求很方便。

5.链阻塞队列 LinkedBlockingQueue

LinkedBlockingQueue 类也实现了 BlockingQueue接口。LinkedBlockingQueue 内部以一个链式结构对其元素进行存储。如果需要的话,这一链式结构可以选择一个上线。如果没有定义上线,将使用 Ingeter.MAX_VALUE 作为上线。

LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个。

使用方式同 ArrayBlockingQueue。

6.具有优先级的阻塞队列 PriorityBlockingQueue

PriorityBlockingQueue 类也实现了 BlockingQueue 接口。

PriorityBlockingQueue 是一个无界的并发队列。它使用了和 PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。PriorityQueue 的代码分析在集合中讲了,感兴趣的小伙伴可以回头去阅读。

所有插入到 PriorityBlockingQueue 的元素必须实现 Comparable 接口或者在构造方法中传入Comparator。

注意:PriorityBlockingQueue 对于具有相等优先级的元素并不强制任何特定的行为。

同时注意:如果你从一个 PriorityBlockingQueue 获得一个 Iterator 的话,该 Iterator并不能保证它对元素的遍历是按照优先顺序的。原理在之前的文章中分析过~

使用方法同上。

7.同步队列 SynchronousQueue

SynchronousQueue 类实现了 BlockingQueue 接口。SynchronousQueue 是一个特殊的队列,它的内部同时只能容纳单个元素。如果该队列已有一个元素的话,试图向队列中插入一个新元素的线程将会阻塞,知道另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。据此,把这个类称作一个队列显然是夸大其词,它更多像是一个汇合点。

使用方法和 ArrayBlockingQueue 一样吧,区别就是 SynchronousQueue 只能保存一个元素。

可以理解成这个,哈哈哈new ArrayBlockingQueue<>(1);

8.阻塞双端队列 BlockingDeque

BlockingDeque 接口在 concurrent 包里,表示一个线程安放入和提取实例的双端队列。

BlockingDeque 类是一个双端队列,在不能够插入元素时,它将阻塞住试图插入元素的线程;在不能够抽取元素时,它将阻塞住试图抽取的线程。

deque 是“Double Ended Queue”的缩写。因此,双端队列是一个你可以从任意一段插入或者抽取元素的队列。

BlockingDeque 的使用

在线程既是生产者又是这个队列的消费者的时候可以用到 BlockingDeque。如果生产者线程需要在队列的两端都可以插入数据,消费者线程需要在队列的两端都可以移除数据,这时候也可以用 BlockingDeque。BlockingDeque 图解:

一个线程生产元素,并把它们插入到队列的任意一段。如果双端队列已满,插入线程将被阻塞,知道一个移除线程从队列中移除了一个元素。

BlockingDeque 的方法

BlockingDeque 具有4组不同的方法用于插入、移除以及对双端队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

1.抛异常:如果试图的操作无法立即执行,抛一个异常

2.特定值:如果试图的操作无法立即执行,返回一个特定的值(一般是 true/false)

3.阻塞:如果试图的操作无法立即执行,该方法将会发生阻塞,直到能执行

4.超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定的值以告知该操作是否成功。

这一段文字有没有感觉特别眼熟,hahah~其实它和 BlockingQueue 一样。

BlockingDeque 继承自 BlockingQueue

BlockingDeque 接口继承自 BlockingQueue 接口。这就意味着你可以像使用一个 BlockingQueue 那样使用 BlockingDeque。如果你这么干的话,各种插入方法将会把新元素添加到双端队列的尾端,而移除方法将会把双端队列的首端元素移除。正如 BlockingQueue 接口的插入和移除方法一样。

BlockingDeque 的实现

既然 BlockingDeque 是一个接口,那么肯定有实现类,它的实现类不多,就一个:

  • LinkedBlockingDeque

BlockingDeque 代码示例

这个真没什么好说的。。。

 BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
deque.addFirst("1");
deque.addLast("2");
String two = deque.takeLast();
String one = deque.takeFirst();  

9.链阻塞双端队列LinkedBlockingDeque

LinkedB lock ingDeque 类实现了 BlockingDeque 接口。

不想写描述了,跳过了昂~

10.并发 Map(映射)ConcurrentMap

ConcurrentMap 接口表示了一个能够对别人的访问(插入和提取)进行并发处理的 Map。ConcurrentMap 除了从其父接口 java.util.Map 继承来的方法之外还有一些额外的原子性方法。

ConcurrentMap 的实现

concurrent 包里面就一个类实现了 ConcurrentMap 接口

  • ConcurrentHashMap

ConcurrentHashMap

ConcurrentHashMap 和 HashTable 类很相似,但 ConcurrentHashMap 能提供比 HashTable 更好的并发性能。在你从中读取对象的时候,ConcurrentHashMap 并不会把整个 Map 锁住。此外,在你向其写入对象的时候,ConcurrentHashMap 也不会锁住整个 Map,它的内部只是把 Map 中正在被写入的部分锁定。 其实就是把 synchronized 同步整个方法改为了同步方法里面的部分代码。

另外一个不同点是,在被遍历的时候,即使是 ConcurrentHashMap 被改动,它也不会抛 ConcurrentModificationException。尽管 Iterator 的设计不是为多个线程同时使用。

使用例子:

 public class ConcurrentHashMapExample {

    public static void main(String[] args) {
//        HashMap<String, String> map = new HashMap<>();
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("1", "a");
        map.put("2", "b");
        map.put("3", "c");
        map.put("4", "d");
        map.put("5", "e");
        map.put("6", "f");
        map.put("7", "g");
        map.put("8", "h");
        new Thread1(map).start();
        new Thread2(map).start();

    }

}

class Thread1 extends Thread {

    private final Map map;

    Thread1(Map map) {
        this.map = map;
    }

    @Override
    public void run() {
        super.run();
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        map.remove("6");
    }
}

class Thread2 extends Thread {

    private final Map map;

    Thread2(Map map) {
        this.map = map;
    }

    @Override
    public void run() {
        super.run();
        Set set = map.keySet();
        for (Object next : set) {
            System.out.println(next + ":" + map.get(next));
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}  

打印结果:

 1:a
2:b
3:c
4:d
5:e
7:g
8:h  

思考题:用这个 Map 行不行?Map map = Collections.synchronizedMap(new HashMap());

哈哈哈~答案很简单,思考一下就行了。

11.并发导航映射 ConcurrentNaviagbleMap

ConcurrentNavigableMap 是一个支持并发的 NavigableMap,它还能让他的子 Map 具备并发访问的能力,所谓的“子 map”指的是诸如 headMap(),subMap(),tailMap()之类的方法返回的 map.

NavigableMap

NavigableMap 这个接口之前集合中遗漏了。这里稍微补充一下吧,首先继承关系是 NavigableMap继承自 SortedMap 继承自 Map。

SortedMap从名字就可以看出,在Map的基础上增加了排序的功能。它要求key与key之间是可以相互比较的,从而达到排序的目的。

而NavigableMap是继承于SortedMap,目前只有TreeMap和ConcurrentNavigableMap两种实现方式。它本质上添加了搜索选项到接口,主要为红黑树服务。先来了解下它新增的几个方法

主要方法

  • lowerEntry(K key)返回小于 key 的最大值的节点
  • lowerKey(K key)返回小于 key 的最大值节点的 key
  • floorEntry(K key)返回小于等于 key 的最大值节点
  • floorKey(K key)返回小于等于 key 的最大值节点 key
  • ceilingEntry(K key)返回大于等于 key 的最小节点
  • ceilingkey(K key)返回大于等于 key 的最小节点的 key
  • higherEntry(K key)返回大于 key 的最小节点
  • higherKey(K key)返回大于 key 的最小节点 key
  • firstEntry()返回最小key 节点
  • lastEntry()返回最大 key 节点
  • descendingMap()获取反序的 map
  • navigableKeySet()获取升序迭代器
  • decendingKeySet()获取降序的迭代器
  • subMap(K from,K to)截取 map
  • headMap(K toKey)截取小于等于 toKey 的 map
  • tailMao(K fromKey)截取大于等于 key 的 map

额,讲完了。。。。。就不举了吧~

12.闭锁 CountDownLatch

java.util.concurrent.CountDownLatch 是一个并发构造,它允许一个或多个线程等待一系列指定操作的完成。

CountDownLatch 以一个给定的数量初始化。countDown()每被调用一次,这一数量就建议。通过调用 await()方法之一,线程可以阻塞等待这一数量到达零。

下面是一个简单的示例,Decrementer 三次调用 countDown()之后,等待中的 Waiter 才会从 await()调用中释放出来。

 public class CountDownLatchExample {

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(3);

        Waiter waiter = new Waiter(latch);
        Decrementer decrementer = new Decrementer(latch);

        new Thread(waiter).start();
        new Thread(decrementer).start();

    }

}

class Waiter implements Runnable {

    CountDownLatch latch = null;

    public Waiter(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Waiter Released");
    }
}

class Decrementer implements Runnable {

    CountDownLatch latch = null;

    Decrementer(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {

        try {
            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}  

运行结果

 Waiter Released  

嗯,用法大概就是酱紫。我再给大家举个实际的例子吧~

有时候会有这样的需求,多个线程同时工作,然后其中几个可以随意并发执行,但有一个线程需要等其他线程工作结束后,才能开始。举个例子,开启多个线程分块下载一个大文件,每个线程只下载固定的一截,最后由另外一个线程来拼接所有的分段,那么这时候我们可以考虑使用CountDownLatch来控制并发。

13.栅栏 CyclicBarrier

CyclicBarrier 类是一种同步机制,它能对处理一些算法的线程实现同步。换句话说,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。

这个文字很好理解吧,没理解的把上面这段话再读一遍。

图示如下:


两个线程在栅栏旁等待对方

通过调用 CuclicBarrier 对象的 await()方法,两个线程可以实现互相等待。一旦 N 个线程在等待 CyclicBarrier 达成,所有线程将被释放掉去继续执行。

创建一个 CyclicBarrier

在创建一个 CyclicBarrier 的时候你需要定义有多少线程在被释放之前等待栅栏。创建 CyclicBarrier 示例:

 CyclicBarrier barrier = new CyclicBarrier(2);  

等待一个 CyclicBarrier

以下演示了如何让一个线程等待一个 CyclicBarrier:barrier.await();当然,你也可以为等待线程设定一个超时时间。等待超过了超时时间之后,即便还没有达成 N 个线程等待 CyclicBarrier 的条件,该线程也会被释放出来。以下是定义超时时间示例:barrier.await(10,TimeUnit.SECONDS);

当然,满足以下条件也可以让等待 CyclicBarrier 的线程释放:

  • 最后一个线程也到达 CyclicBarrier(调用 await()方法)
  • 当前线程被其他线程打断(其他线程调用了这个线程的 interrupt()方法)
  • 其他等待栅栏的线程被打断
  • 其他等待栅栏的线程因超时而被释放
  • 外部线程调用了栅栏的 CyclicBarrier.reset()方法

CyclicBarrier 行动

CyclicBarrier 支持一个栅栏行动,栅栏行动是一个 Runnable 实例,一旦最后等待栅栏的线程抵达,该实例将被执行。你可以在 CyclicBarrier 的构造方法中将 Runnable 栅栏行动传给它:

CyclicBarrier barrier = new CyclicBarrier(2,barrierAction);

CyclicBarrier 示例代码

 public class CyclicBarrierExample {

    public static void main(String[] args) {
        Runnable barrier1Action = new Runnable() {
            public void run() {
                System.out.println("BarrierAction 1 executed ");
            }
        };
        Runnable barrier2Action = new Runnable() {
            public void run() {
                System.out.println("BarrierAction 2 executed ");
            }
        };

        CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action);
        CyclicBarrier barrier2 = new CyclicBarrier(2, barrier2Action);

        CyclicBarrierRunnable barrierRunnable1 =
                new CyclicBarrierRunnable(barrier1, barrier2);

        new Thread(barrierRunnable1).start();
        new Thread(barrierRunnable1).start();


    }

}

class CyclicBarrierRunnable implements Runnable {

    CyclicBarrier barrier1 = null;
    CyclicBarrier barrier2 = null;

    CyclicBarrierRunnable(
            CyclicBarrier barrier1,
            CyclicBarrier barrier2) {

        this.barrier1 = barrier1;
        this.barrier2 = barrier2;
    }

    public void run() {
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() +
                    " waiting at barrier 1");
            this.barrier1.await();

            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() +
                    " waiting at barrier 2");
            this.barrier2.await();

            System.out.println(Thread.currentThread().getName() +
                    " done!");

        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}  

思考一下程序的运行结果~

 Thread-0 waiting at barrier 1
Thread-1 waiting at barrier 1
BarrierAction 1 executed 
Thread-1 waiting at barrier 2
Thread-0 waiting at barrier 2
BarrierAction 2 executed 
Thread-0 done!
Thread-1 done!  

14.交换机 Exchanger

Exchanger 类表示一种两个线程可以进行互相交换对象的会和点。这种机制图如下:


两个线程通过一个 Exchanger 交换对象

交换对象的动作由Exchanger 的两个 exchange()方法中的其中一个完成。以下是一个示例:

 public class ExchangerExample {

    public static void main(String[]args){
        Exchanger exchanger = new Exchanger();

        ExchangerRunnable exchangerRunnable1 =
                new ExchangerRunnable(exchanger, "Thread-0数据");

        ExchangerRunnable exchangerRunnable2 =
                new ExchangerRunnable(exchanger, "Thread-1数据");

        new Thread(exchangerRunnable1).start();
        new Thread(exchangerRunnable2).start();

    }

}
 class ExchangerRunnable implements Runnable{

    Exchanger exchanger = null;
    Object    object    = null;

    ExchangerRunnable(Exchanger exchanger, Object object) {
        this.exchanger = exchanger;
        this.object = object;
    }

    public void run() {
        try {
            Object previous = this.object;

            this.object = exchanger.exchange(this.object);

            System.out.println(
                    Thread.currentThread().getName() +
                            " exchanged " + previous + " for " + this.object
            );
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}  

输出结果:

 Thread-1 exchanged Thread-1数据 for Thread-0数据
Thread-0 exchanged Thread-0数据 for Thread-1数据  

当一个线程到达exchange调用点时,如果它的伙伴线程此前已经调用了此方法,那么它的伙伴会被调度唤醒并与之进行对象交换,然后各自返回。

在常见的 生产者-消费者 模型中用于同步数据。

15.信号量 Semaphore

Semaphore类是一个计数信号量。这就意味着它具备两个主要方法:

  • acquire()获得
  • release()释放

计数信号量由一个指定数量的“许可”初始化。每调用一次 acquire(),一个许可会被调用线程取走。没调用一次 release(),一个许可会被还给信号量。因此,在没有任何 release()调用时,最多有 N 个线程能够通过 acquire()方法,N 是该信号量初始化时的许可的指定数量。这些许可只是一个简单的计数器。没有啥奇特的地方。

Semaphore 用途

信号量主要有两种用途:

1.保护一个重要(代码)部分防止一次超过 N 个线程进入2.在两个线程之间发送信号

Semaphore 用法

如果你将信号量用于保护一个重要部分,试图进入这一部分的代码通常会首先尝试获得一个许可,然后才能进入重要部分(代码块),执行完之后,再把许可释放掉:

 Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
//重要部分代码块
semaphore.release();  

在线程之间发生信号

如果你将一个信号量用于在两个线程之间发送信号,通常你应该用一个线程调用 acquire()方法,而另一个线程调用 release()方法。

如果没有可用的许可,acquire()调用将会阻塞,知道一个许可被另一个线程释放出来。同理,如果无法往信号量释放更多许可时,一个 release()方法调用也会阻塞。

通过这个可以对多个线程进行协调。比如,如果线程1将一个对象插入到了一个共享列表(list)之后调用了 acquire(),而线程2则从该列表中获取一个对象之前调用了release(),这时你其实已经创建了一个阻塞队列。信号量中可用的许可的数量也就等同于该则是队列能够持有的元素个数。

公平

没有办法保证线程能够公平地从信号量中获得许可。也就是说,无法担保第一个调用 acquire()的线程会是第一个获得许可的线程。如果第一个线程在等待一个许可时发生阻塞,而第二个线程来索要一个许可的时候刚好有一个许可被释放出来,那么它就可能在第一个线程之前获得许可。如果需要强制公平,Semaphore 类有一个具有一个布尔类型的参数的构造子,通过这个参数以告知 Semaphore 是否要强制公平。强制公平会影响到并发性能,所以除非你确实需要它,否则不要启动它。

以下是如何在公平模式创建一个 Semaphore 的示例:

Semaphore semaphore = new Semaphore(1,ture);

更多方法

  • acquire()获取一个许可
  • availablePermits()返回当前可用许可数
  • drainPermits()获取并返回立即可用的所有许可
  • getQueueThreads()返回一个集合,包含可能等待获取的数量
  • hasQueueThreads()返回正在等待获取的线程的估计数目
  • isFair()如果此信号量的公平设置为 true,则返回 true
  • reducePermits(int)根据指定的缩减量减小可用许可的数量
  • relaese()释放一个许可

具体参考 JDK 文档吧

使用案例

 public class SemaphoreExample {

    public static void main(String[]args){
        Semaphore semaphore = new Semaphore(3);

        new Thread(new ThreadSemaphore(semaphore)).start();
        new Thread(new ThreadSemaphore(semaphore)).start();
        new Thread(new ThreadSemaphore(semaphore)).start();
        new Thread(new ThreadSemaphore(semaphore)).start();
        new Thread(new ThreadSemaphore(semaphore)).start();

    }

}

 class ThreadSemaphore implements Runnable{

    private final Semaphore semaphore;

     ThreadSemaphore(Semaphore semaphore){
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName()+"获取到锁进来");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        semaphore.release();

    }
}  

打印结果是先打印三条线程,1秒后再打印两条线程。

脑补一下,我们当年抢第一代红米手机的时候,抢购界面是不是一直在排队,是不是用信号量可以实现呢,10w 台手机,同时只允许1000个用户购买(Semaphore 的许可为1000个),然后运气好突然排队进去了(有人购买成功,释放了许可,运气好分配给了你),然后红米抢购到手。。。

16.执行器服务 ExecutorService

ExecutorService 接口表示一个异步执行机制,使我们能够在后台执行任务。因此一个 ExecutorService 很类似一个线程池。实际上,存在于 concurrent 包里的 ExecutorService 实现就是一个线程池实现。

ExecutorService 例子

以下是一个简单的 ExecutorService 例子:

 ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});
executorService.shutdown();  

首先使用 newFixedThreadPool 工厂方法创建一个 ExecutorService。这里创建了一个是个线程执行任务的线程池。然后,将一个 Runnable 接口的匿名实现类传给 execute()方法。这将导致 ExecutorService 中的某个线程执行该 Runnable。

任务委派

下图说明了一个线程是如歌将一个任务委托给一个 ExecutorService 去异步执行的:

一个线程将一个任务委派给一个 ExecutorService 去异步执行

一旦该线程将任务委派给 ExecutorService,该线程将继续它自己的执行,独立于该任务的执行。

ExecutorService 实现

既然 ExecutorService 是个接口,如果你想用它的话,还得去使用它的实现类。

  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor

创建一个 ExecutorService

ExecutorService 的创建依赖于你使用的具体实现。但是你也可以使用 Executors 工厂类来创建 ExecutorService 实例。以下是几个创建 ExecutorService 实例的例子:

 ExecutorService executorService1 = Executors.newSingleThreadExecutor();
ExecutorService executorService2 = Executors.newFixedThreadPool(10);
ExecutorService executorService3 = Executors.newScheduledThreadPool(10);  

ExecutorService 使用

有几种不同的方式来将任务委托给 ExecutorService 去执行:

  • execute(Runnable)
  • submit(Runnable)
  • submit(Callable)
  • invokeAny(Collection)
  • invokeAll(Collection)

接下来我们来一个一个看这些方法

  • execute(Runnable)

execute 方法要求一个 Runnable 对象,然后对它进行异步执行。以下是使用 ExecutorService 执行一个 Runnable 的示例:

 ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("Asynchronous task");
    }
});
System.out.println("Asynchronous task");
executorService.shutdown();  

没有办法得知被执行的 Runnable 的执行结果。如果需要的话,得使用 Callable

  • submit(Runnable)

sunmit 方法也要求一个 Runnable 实现类,但它返回一个 Future 对象。这个 Future 对象可以用来检查 Runnable 是否已经执行完毕。

以下是 ExecutorService submit 示例:

 ExecutorService executorService = Executors.newSingleThreadExecutor();
Future future = executorService.submit(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});
future.get(); //returns null if the task has finished correctly.  
  • submit(Callable)
  • submit(Callable)方法类似于 submit(Runnable)方法,除了它所要求的参数类型之外。Callable 实例除了它的 call()方法能够返回一个结果之外和一个 Runnable 很像。Runnable.run()不能返回一个结果。

Callable 的结果可以通过 submit(Callable)方法返回的 Future 对象进行获取。以下是一个 ExecutorService Callable 示例:

 ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Object> future = executorService.submit(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        System.out.println("Asynchronous Callable");
        return "Callable Result";
        }
    });
System.out.println("future.get() = " + future.get());  

以上代码输出:

 Asynchronous Callable
future.get() = Callable Result  

注意:future.get()会阻塞线程直到 Callable 执行结束。你可以把这个当成是一个有返回值的线程。

  • invokeAny()

invokeAny()方法要求一系列的 Callable 或者其子接口的实例对象。调用这个方法并不会返回一个 Future,但它返回其中一个 Callable 对象的结果。无法保证返回的是哪个 Callable 的结果,只能表明其中一个已经执行结束。

 ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> set = new HashSet<>();
set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});

set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});
String result = executorService.invokeAny(set);
System.out.println("result = " + result);
executorService.shutdown();  

执行结果就不看了,自行测试吧

 ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> set = new HashSet<>();
set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});

set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});
List<Future<String>> list = executorService.invokeAll(set);
for (Future<String> future : list)
    System.out.println("result = " + future.get());
executorService.shutdown();  

执行结果自行测试。。。

ExecutorService 关闭

使用完 ExecutorService 之后,应该将其关闭,以使其中的线程不再允许。比如,如果你的应用是通过一个 main 方法启动的,之后 main 方法退出了你的应用,如果你的应用有一个活动的 ExecutorService,它将还会保持运行。ExecutorService 里的活动线程阻止了 JVM 的关闭。要终止 ExecutorService 里的线程,你需要调用 ExecutorService 的 shutdown 方法。

ExecutorService 并不会立即关闭,但它将不再接受新的任务,而且一旦所有线程都完成了当前任务的时候,ExecutorService 将会关闭。在 shutdown 被调用之前所有提交给ExecutorService 的任务都被执行。如果你想立即关闭 ExecutorService,你可以调用 shutdownNow 方法,这样会立即尝试停止所有执行中的任务,并忽略掉那些已提交但尚未开始处理的任务。无法保证执行任务的正确执行。可能它们被停止了,也可能已经执行结束。

17.线程池执行者 ThreadPoolExecutor

ThreadPoolExecutor 是 ExecutorService 接口的一个实现。ThreadPoolExecutor 使用其内部池中的线程执行给定任务(Callable 或者 Runnable)。ThreadPoolExecutor 包含的线程池能够包含不同数量的线程,池中线程的数量由以下变量决定:

  • corePoolSize
  • maximumPoolSize
  • 当一个任务委托给线程池时,如果池中线程数量低于 corePoolSize,一个新的线程将被创建,即使池中可能尚未有空闲线程。
  • 如果内部任务队列已满,而且有至少 corePoolSize 正在运行,但是运行线程的数量低于 maximumPoolSize,一个新的线程将被创建去执行该任务。

ThreadPoolExecutor 图解:

一个ThreadPoolExecutor

创建一个 ThreadPoolExecutor

ThreadPoolExecutor 有若干个可用构造方法。比如:

 int corePoolSize = 5;
int maxPoolSize = 10;
long keepAliveTime = 5000;
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
            maxPoolSize,
            keepAliveTime,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());  

但是,除非你确实需要显式为 ThreadPoolExecutor 定义所有参数,使用 Executors 类中的额工厂方法之一会更加方便。

18.定时执行者服务 ScheduleExecutorService

ScheduleExecutorService 是一个 ExecutorService,它能够将任务延后执行,或者间隔固定时间多次执行。任务由一个工作者线程异步执行,而不是由提交任务给 ScheduleExecutorService 的那个线程执行。

ScheduleExecutorService 例子

以下是一个简单的 ScheduleExecutorService 示例:

 ScheduledExecutorService scheduledExecutorService =
                Executors.newScheduledThreadPool(5);
scheduledExecutorService.schedule(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        System.out.println("Executed!");
        return "Called!";
    }
}, 5, TimeUnit.SECONDS);
scheduledExecutorService.shutdown();  

首先一个内置5个线程的 ScheduleExecutorService 被创建,之后一个 Callable 接口的匿名类示例被创建然后传递给 schedule()方法。后边的两参数定义了 Callable 将在5秒钟之后被执行。

ScheduleExecutorService 实现

既然是一个接口,要使用它的话就得使用 concurrent 包下的实现类

  • ScheduleThreadPoolExecutor

创建一个 ScheduleExecutorService

如何创建一个 ScheduleExecutorService,取决于你采用它的实现类。但是你也可以使用 Executors 工厂类来创建一个 ScheduleExecutorService 实例。比如:

 ScheduledExecutorService scheduledExecutorService =
                Executors.newScheduledThreadPool(5);  

ScheduleExecutorService 使用

一旦你创建了一个 ScheduleExecutorService,你可以通过调用它的以下方法:

  • shcedule(Callable task,long delay,TimeUnit timeunit)
  • shcedule(Runnable task,long delay,TimeUnit timeunit)
  • shceduleAtFixedRate(Runnable task,long initialDelay,long period,TimeUtil timeutil)
  • shceduleWithFixedDelay(Runnable task,long initialDelay,long period,TimeUtil timeutil)

下面我们就简单看一下这些方法。

  • schedule(Callable task,long delay,TimeUnit timeUnit)
  • 这个方法计划指定的 Callable 在给定的延迟之后执行。
  • 这个方法返回一个 ScheduleFuture,通过它你可以在它被执行前对它进行取消,或者在它执行之后获取结果。

以下是一个示例:

 ScheduledExecutorService scheduledExecutorService =
                Executors.newScheduledThreadPool(5);
ScheduledFuture<Object> schedule = scheduledExecutorService.schedule(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        System.out.println("Executed!");
        return "Called!";
    }
}, 5, TimeUnit.SECONDS);
System.out.println(schedule.get());
scheduledExecutorService.shutdown();  

输出结果:

 Executed!
Called!  
  • shcedule(Runnable task,long delay,TimeUnit timeUnit)

除了 Runnable 无法返回一个结果之外,这一方法工作起来就像一个 Callable 作为一个参数的那个版本的方法一样,因此 ScheduleFuture.get()在任务执行结束之后返回 null。

  • scheduleAtFixedRate(Runnable,long initialDelay,long period,TimeUnit tomeUnit)这一方法规划一个任务将被定期执行。该任务将会在某个 initialDelay 之后得到执行,然后每个 period 时间之后重复执行。如果给的任务的执行抛出了异常,该任务将不再执行。如果没有任何异常的话,这个任务将会持续循环执行到 ScheduleExecutorService 被关闭。如果一个任务占用了比计划的时间间隔更长的时候,下一次执行将在当前执行结束执行才开始。计划任务在同一时间不会有多个线程同时执行。
  • scheduleWithFixedDelay(Runnable r,long initalDelay,long period,TimeUnit timeUnit)

除了 period 有不同的解释之外这个方法和 scheduleAtFixedRate()非常像。scheduleAtFixedRate()方法中,period 被解释为前一个执行的开始和下一个执行的开始之间的间隔时间。而在本方法中,period 则被解释为前一个执行的结束和下一个执行开始之间的间隔。

ShceduleExecutorService 关闭

正如 ExecutorService,在你使用结束之后,你需要吧 ScheduleExecutorService 关闭掉。否则他将导致 JVM 继续运行,即使所有其他线程已经全部被关闭。你可以从 ExecutorService 接口继承来的 shutdown()或 shutdownNow()方法将 ScheduleExecutorService 关闭。

19.使用 ForkJoinPool 进行分叉和合并

ForkJoinPool 在 Java7中被引入。它和 ExecutorService 很相似,除了一点不同。ForkJoinPool 让我们可以很方便把任务分裂成几个更小的任务,这些分裂出来的任务也将会提交给 ForkJoinPool。任务可以继续分割成更小的子任务,只要它还能分割。可能听起来有点抽象,因此本节中我们将会解释 ForkJoinPool 是如何工作的,还有任务分割是如何进行的。

分叉和合并解释

在我们开始看 ForkJoinPool 之前,我们先来简要解释一下分叉和合并的原理。分叉和合并原理包含两个递归进行的步骤。两个步骤分别是分叉步骤和合并步骤。

  • 分叉

一个使用了分叉和合并原理的任务可以将自己分叉(分割)为更小的子任务,这些子任务可以被并发执行。如下图所示:

分叉

通过把自己分割成多个子任务,每个子任务可以由不同的 CPU 并发执行,或者被同一个 CPU 上的不同线程执行。

只有当给的任务过大,把它分割成几个子任务才有意义。把任务分割成子任务有一点的开销,因此对于小型任务,这个分割的消耗可能比每个子任务并发执行的消耗还要大。

什么时候把一个任务分割成子任务是有意义的,这个界限也称作一个阈值。折腰看每个任务对有意义阈值的决定。很大程度取决于它要做的工作的种类。

  • 合并

当一个任务将自己分割成若干子任务之后,该任务将进入等待所有子任务的结束之中。一旦子任务执行结束,该任务可以把所有结果合并到同一结果。图示如下:

合并

当然,并非所有类型的任务都会返回一个结果。如果这个任务并不返还一个结果,它只需等待所有子线程执行完毕。也就不需要结果合并。

ForkJoinPool

ForkJoinPool 是一个特殊的线程池,她的设计是为了更好的配合 分叉-合并 任务分割的工作。ForkJoinPool 也在 concurrent 包中。

可以通过其构造方法创建一个 ForkJoinPool。 ForkJoinPool 构造函数的参数定义了 ForkJoinPool 的并行级别,并行级别表示分叉的线程或 CPU 数量。

创建示例:ForkJoinPool forkJoinPool = new ForkJoinPool(4);

提交任务到 ForkJoinPool

就像提交任务到 ExecutorService那样,把任务提交到 ForkJoinPool。你可以提交两种类型的任务。一种是没有任何返回值的,另一种是有返回值的。这两周任务分别有 RecursiveAction 和 RecursiveTask 表示。接下来介绍如何使用这两种类型的任务,以及如何对它们进行提交。

RecursiveAction

RecursiveAction 是一种没有返回值的任务。它只是做一些工作,比如写数据到磁盘,然后就退出了。一个 RecursiveAction 可以把自己的工作分割成更小的几块,这样它们可以由独立的线程或者 CPU 执行。你可以通过集成来实现一个 RecursiveAction。示例如下:

 public class RecursiveActionExample {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(40);
        MyRecursiveAction myRecursiveAction = new MyRecursiveAction(240);
        forkJoinPool.invoke(myRecursiveAction);

    }
}

class MyRecursiveAction extends RecursiveAction {
    private long workLoad = 0;

    public MyRecursiveAction(long workLoad) {
        this.workLoad = workLoad;
    }

    @Override
    protected void compute() {
        //if work is above threshold, break tasks up into smaller tasks
        if (this.workLoad > 16) {
            System.out.println("Splitting workLoad : " + this.workLoad);
            List<MyRecursiveAction> subtasks =
                    new ArrayList<>();
            subtasks.addAll(createSubtasks());
            for (RecursiveAction subtask : subtasks) {
                subtask.fork();
            }
        } else {
            System.out.println("Doing workLoad myself: " + this.workLoad);
        }
    }

    private List<MyRecursiveAction> createSubtasks() {
        List<MyRecursiveAction> subtasks =
                new ArrayList<>();
        MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
        MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);
        subtasks.add(subtask1);
        subtasks.add(subtask2);
        return subtasks;
    }
}  

例子跟简单。MyRecursiveAction 将一个虚构的 workLoad 作为参数传给自己的构造方法。如果 wrokLoad 高于一个特定的阈值,该工作将分割为几个子工作,子工作继续分割。如果 workLoad 高于一个特定阈值,该工作将被分割为几个子工作,子工作继续分割。如果 workLoad 低于特定阈值,该工作将有 MyRecursiveAction 自己执行。

运行结果:

 Splitting workLoad : 240
Splitting workLoad : 120
Splitting workLoad : 120
Splitting workLoad : 60
Splitting workLoad : 60
Splitting workLoad : 60
Splitting workLoad : 30
Splitting workLoad : 30
Splitting workLoad : 30
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Splitting workLoad : 60
Splitting workLoad : 30
Doing workLoad myself: 15  

RecursiveTask

RecursiveTask 是一种会返回结果的任务。它可以将自己的工作分割为若干更小任务,并将这些子任务的执行结果合并到一个集体结果。可以有几个水平的分割和合并。以下是一个 RecursiveTask 示例:

 public class RecursiveTaskExample {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(40);
        MyRecursiveTask myRecursiveAction = new MyRecursiveTask(240);
        Object invoke = forkJoinPool.invoke(myRecursiveAction);
        System.out.println("mergedResult = " + invoke);
    }
}

class MyRecursiveTask extends RecursiveTask<Long> {

    private long workLoad = 0;

    public MyRecursiveTask(long workLoad) {
        this.workLoad = workLoad;
    }

    protected Long compute() {

        //if work is above threshold, break tasks up into smaller tasks
        if (this.workLoad > 16) {
            System.out.println("Splitting workLoad : " + this.workLoad);

            List<MyRecursiveTask> subtasks =
                    new ArrayList<>();
            subtasks.addAll(createSubtasks());

            for (MyRecursiveTask subtask : subtasks) {
                subtask.fork();
            }

            long result = 0;
            for (MyRecursiveTask subtask : subtasks) {
                result += subtask.join();
            }
            return result;

        } else {
            System.out.println("Doing workLoad myself: " + this.workLoad);
            return workLoad * 3;
        }
    }

    private List<MyRecursiveTask> createSubtasks() {
        List<MyRecursiveTask> subtasks =
                new ArrayList<>();

        MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
        MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);

        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }
}  

注意是如何通过 ForkJoinPool.invoke()方法的调用来获取最终执行结果的。

运行结果:

 Splitting workLoad : 240
Splitting workLoad : 120
Splitting workLoad : 120
Splitting workLoad : 60
Splitting workLoad : 30
Doing workLoad myself: 15
Splitting workLoad : 60
Splitting workLoad : 30
Doing workLoad myself: 15
Splitting workLoad : 30
Splitting workLoad : 60
Splitting workLoad : 30
Splitting workLoad : 60
Doing workLoad myself: 15
Splitting workLoad : 30
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
mergedResult = 720  

ForkJoinPool 评论

貌似并非每个人都对 Java7里面的 ForkJoinPool 满意,也就是说,这里面会有坑,在你计划在自己的项目里使用 ForkJoinPool 之前最好阅读一下这篇文章《一个 Java 分叉-合并 带来的灾祸》

haha…文章是英文版本的,可以用浏览器插件翻译,或者自行百度吧。

20.锁 Lock

Lock 是一个类似于 Synchronized 块的线程同步机制。但是 Lock 比 Synchronized 块更加灵活、精细。

Lock 例子

既然 Lock 是一个接口,在程序中总需要使用它的实现类之一来使用它。以下是一个简单示例:

 Lock lock = new ReentrantLock();
lock.lock();
//同步代码
lock.unLock();  

首先创建了一个 Lock 对象。之后调用了它的 lock()方法。这时候这个 lock 实例就被锁住啦。任何其他再过来调用 lock()方法的线程将会被锁阻塞住,直到锁定 lock 线程的实例的线程调用了 unlock()方法。最后 unlock()被调用了,lock 对象解锁了,其他线程可以对它进行锁定了。

Lock 实现

concurrent 包下 Lock 的实现类如下:

  • ReentrantLock

Lock 和 Synchronized 代码块的主要不同点

  • Synchronized 代码块不能够保证进入访问等待的线程的先后顺序
  • 你不能传递任何参数给一个 Synchronized 代码块的入口。因此,对于 Synchronized 代码块的访问等待设置超时时间是不可能的事情。
  • Synchronized 块必须被完整地包含在单个方法里。而一个 Lock 对象可以把它的 lock()和 unLock()方法的调用放在不同的方法里。

Lock 的方法

Lock 接口主要有以下几个方法

  • lock()
  • lockInterruptibly()
  • tryLock()
  • tryLock(long timeout,TimeUnit unit)
  • unLock()

lock()将 Lock 实例锁定。如果该 Lock 实例已被锁定,调用lock()方法的线程将会被阻塞,直到 Lock 实例解锁。lockInterruptibly()方法将会被调用线程锁定,除非该线程将被打断。此外,如果一个线程在通过这个方法来锁定 Lock 对象时进入阻塞等待,而它被打断了的话,该线程将会退出这个方法调用。tryLock()方法视图立即锁定 Lock 实例。如果锁定成功,它将返回 true,如果 Lock 实例已经被锁定,则返回 false。这一方法用不阻塞。tryLock(long timeout,TimeUnit unit)的工作类似于 tryLock()方法,除了它在放弃锁定 Lock 之前等待一个给定的超时时间之外。unlock()方法对 Lock 实例解锁。一个 Lock 实现将只允许锁定了该对象的线程来调用此方法。其他线程对 unlock()方法调用将会抛出异常。

21.读写锁 ReadWriteLock

读写锁是一种先进的线程锁机制。它能够允许多个线程在同一时间对某特定资源进行读取,但同一时间内只能有一个线程对其进行写入。读写锁的理念在于多个线程能够对一个共享资源进行读取,而不会导致并发问题。并发问题的发生场景在于对一个共享资源的读和写操作的同时进行,或者多个读写操作并发进行。

ReadWrite Lock 锁规则

一个线程在对受保护资源在读或者写之前对 ReadWriteLock 锁定的规则如下:

  • 读锁:如果没有任何写操作线程锁定 ReadWriteLock,并且没有任何写操作线程要求一个写锁(但还没有获得该锁)。因此,可以有多个读操作线程对该锁进行锁定。
  • 写锁:如果没有任何读操作或者写操作。因此,在写操作的时候,只能有一个线程对该锁进行锁定。

ReadWriteLock 实现

ReadWriteLock 是个接口,如果你想使用它的话就得去使用它的实现类之一。concurrent 包提供了一个实现类:

  • ReentrantReadWriteLock

ReadWriteLock 代码示例以下是 ReadWriteLock 的创建以及如何使用它进行读、写锁定的简单示例代码:

 ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readWriteLock.readLock().locl();
//干点事情
readWriteLock.readLock().unlock();

ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readWriteLock.writeLock().locl();
//干点事情
readWriteLock.writeLock().unlock();  

注意如何使用 ReadWriteLock 对两种锁示例的持有。一个对读访问进行保护,一个对写访问进行保护。

当然,这里的“读写”你可以根据需求灵活变化。

22.原子性布尔 AtomicBoolean

AtomicBoolean 类为我们提供了一个可以用原子方式进行读和谐的布尔值,它还拥有一些先进的原子性操作,比如 compareAndSet()。AtomicBoolean 类位于 concurrent.atomic 包。

创建一个 AtomicBoolean

你可以这样创建一个 AtomicBoolean。AtomicBoolean atomicBoolean = new AtomicBoolean();以上示例新建了一个默认值为 false 的 AtomicBoolean。如果你想要为 AtomicBoolean 示例设置一个显示的初始值,那么你可以将初始值传给 AtomicBoolean 的构造参数。AtomicBoolean atomicBoolean = new AtomicBoolean(true);

获取 AtomicBoolean 的值

你可以通过使用 get()方法来获取一个 AtomicBoolean 的值。示例如下:

 AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean value = atomicBoolean.get();  

以上代码执行后 value 的值为 true。

设置 AtomicBoolean 的值

你可以通过 set() 方法来设置 AtomicBoolean 的值:

 AtomicBoolean atomicBoolean = new AtomicBoolean(true);
atomicBoolean.set(false);  

交互 AtomicBoolean 的值

你可以通过 getAndSet()方法来交换一个 AtomicBoolean 实例的值。getAndSet()方法将返回 AtomicBoolean 当前的值,并将为 AtomicBoolean 设置一个新值,示例如下:

 AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean oldValue = atomicBoolean.getAndSet(false);  

以上代码执行后 oldValue 变量的值为 true,atomicBoolean 实例将持有 false 值,代码成功将 AtomicBoolean 当前值 true 交换为 false。

比较并设置 AtomicBoolean 的值

compareAndSet()方法允许你对 AtomicBoolean 的当前值与与一个期望值进行比较,如果当前值等于期望值的话,将会对 AtomicBoolean 设定一个新值。compareAndSet()方法是原子性质的,因此在同一时间之内有耽搁线程执行她。因此 compareAndSet()方法可被用于一些类似于锁的同步的简单实现。以下是一个 compareAndSet()示例:

 AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean expectedValue = true;
boolean newVaule = false;
boolean wasNewValueSet = atomicBoolean.compareAndSet(expectedValue,newValue);  

本示例针对 AtomicBoolean 的当前值与 true 值进行比较,如果相等,将 AtomicBoolean 的值更新为 false。

有什么用?

可能有些小伙伴到这里还是有点懵逼,这个原子布尔到底有什么用,给大家看一个示例代码:

 class XxxService {

    private static AtomicBoolean initState = new AtomicBoolean(false);
    private static AtomicBoolean initFinish = new AtomicBoolean(false);
    private static XxxService instance;

    private XxxService() {
    }

    public static XxxService getInstance() {
        if (initState.compareAndSet(false, true)) {
            //TODO 写初始化代码
            initFinish.set(true);
        }
        while(!initFinish.get()){
            Thread.yield();
        }

        return instance;
    }
}  

假如程序需要在多线程的情况下初始化一个类,并且保证只初始化一次,完美解决并发问题。

23.原子性整形 AtomicIngteger

同22,略

24.原子性长整型 AtomicBooleanLong

同22,略

25.原子性引用型 AtomicReference

AtomicReference 提供了一个可以被原子性读和写的对象引用变量。原子性的意思是多个想要改变同一个 AtomicReference 的线程不会导致 AtomicReference 处于不一致的状态。AtomicReference 还有一个 compareAndSet()方法,通过它你可以将当前引用于一个期望值(引用)进行比较,如果相等,在该 AtomicReference 对象内部设置一个新的引用。

创建一个 AtomicReference

创建 AtomicReference 如下:AtomicReference atomicReference = new AtomicReference();如果你需要使用一个指定引用创建 AtomicReference,可以:String initialReference = “the initialyl reference string”; AtomicReference atomicReference = new AtomicReference(initialReference);

创建泛型 AtomicReference

你可以使用 Java 泛型来创建一个泛型 AtomicReference。示例:AtomicReference<String> atomicReference = new AtomicReference();你也可以为泛型 AtomicReference 设置一个初始值。示例:String initialReference = “the initialyl reference string”; AtomicReference<String> atomicReference = new AtomicReference<>(initialReference);

获取 AtomicReference 引用

你可以通过 AtomicReference 的 get()方法来获取保存在 AtomicReference 里的引用.

设置 AtomicReference 引用

AtomicReference.set(V newValue);

比较并设置 AtomicReference 引用

使用 compareAndSet()

和 volatile 关键字的区别

敲黑板!!!

Atomic 和 volatile的区别很简单,Atomic 保证读写操作同步,但是 volatile 只保证写的操作,并没有保证读的操作同步。

具体原理牵涉到虚拟机的层次了,感兴趣的小伙伴可自行学习。