Java并发多线程一文讲完

Java
285
0
0
2023-10-12

在接下来的15分钟里,您将学习如何通过线程、任务和执行器服务并行执行代码。

三大部分

重点:

你为什么不会用 多线程 ,因为你不懂生产者与消费者。

一般分三步:

多线程一般用来执行这样的操作。比如说取数据。去多个平台拿 几千个数据,可以分多个线程去拿。然后将取来的数据放在并发容器中。后面的处理从容器中拿数据 去处理。 相当于分了两步走。

取数据

处理数据

写数据

第1部分:线程和执行器

第2部分:同步和锁

第3部分:原子变量和ConcurrentMap

并发API最初是在Java 5发行版中引入的,然后随着每个新的 Java 发行版逐步增强。本文中展示的大多数概念也适用于较老版本的 java 。然而,我的代码示例侧重于Java 8,并大量使用lambda表达式和其他新特性。如果您还不熟悉lambdas,我建议您先阅读我的Java 8教程。

第一部分Threads and Executors

Threads and Runnables#

所有现代操作系统都通过进程和线程支持并发。进程是通常独立于彼此运行的程序的实例,例如,如果你启动一个java程序,操作系统会产生一个新的进程,它与其他程序并行运行。在这些进程中,我们可以利用线程并发地执行代码,这样我们就可以最大限度地利用CPU的可用内核。

Java从JDK 1.0开始支持线程。在启动一个新 线程 之前,您必须指定该线程要执行的代码, 通常称为任务

这是通过实现Runnable来实现的——一个函数式接口,定义了一个单独的 void no-args方法run(),如下面的例子所示:

 Runnable task = () -> {
    String threadName =  thread .currentThread().getName();
    System.out.println("Hello " + threadName);
};
    
task.run();     //main:当前执行任务的线程是main线程      直接执行run 并非开启线程

Thread thread = new Thread(task);
thread.start();  //拿到的name : Thread-           //开启线程

System.out.println("Done!"); 

[ 功能性接口 ] @FunctionalInterface

由于Runnable是一个功能性接口,我们可以利用Java 8 lambda表达式将当前线程名打印到控制台。首先,在启动新线程之前,我们在主线程上直接执行可运行对象。

控制台的结果可能是这样的:

 Hello main
Hello Thread-
Done! 

或者:

 Hello main
Done!
Hello Thread- 

由于并发执行,我们无法预测runnable是在打印’done’之前还是之后被调用。顺序是不确定的,因此在较大的应用程序中并发编程是一项复杂的任务。

线程可以在一定的时间内处于休眠状态 。这对于在本文后面的代码示例中模拟长时间运行的任务非常方便:

 Runnable runnable = () -> {
    try {
        String name = Thread.currentThread().getName();
        System.out.println("Foo " + name);
        TimeUnit.SECONDS.sleep();
        System.out.println("Bar " + name);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread thread = new Thread(runnable);
thread.start(); 

当您运行上面的代码时,您会注意到第一个打印语句和第二个打印语句之间有一秒钟的延迟。TimeUnit是一个用于处理时间单位的枚举。或者,你也可以通过调用Thread.sleep(1000)来达到同样的目的。

使用Thread类可能非常繁琐且容易出错。

由于这个原因,并发API早在2004年Java 5发布时就引入了。

该API位于包java.util.concurrent中,包含许多用于处理并发编程的有用类。

从那时起,随着每个新的Java版本的发布,并发API都得到了增强,甚至Java 8也提供了处理并发性的新类和方法。

现在让我们更深入地了解并发API中最重要的部分之一——执行器服务。

Executors#

并发API引入了ExecutorService的概念,作为直接使用线程的 高级替代

executor能够运行异步任务,并且通常管理 线程池 ,所以我们不必手动创建新线程。内部池的所有线程将被用于revenant任务,因此我们可以在应用程序的整个生命周期中使用单个执行器服务运行任意数量的并发任务。

下面是第一个使用executor的线程示例:

 ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
});
 
// => Hello pool--thread-1      池!! 

executor类为创建不同类型的executor服务提供了方便的工厂方法。在这个示例中,我们使用一个线程池大小为1的执行器。

结果与上面的示例类似,但在运行代码时,您会注意到一个重要的区别:java进程永远不会停止!必须显式地停止执行器,否则它们会继续监听新的任务。

ExecutorService为此提供了两个方法:shutdown()等待当前运行的任务完成,而 shutdown Now()中断所有正在运行的任务并立即关闭执行器。

这是我通常关闭executor的首选方式:

 try {
    System.out.println("attempt to shutdown executor");
    executor.shutdown();
    executor.awaitTermination(, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
    System.err.println("tasks interrupted");
}
finally {
    if (!executor.isTerminated()) {
        System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
} 

执行程序通过等待一定的时间来终止当前运行的任务来轻轻地关闭。在最多5秒之后,执行程序最终通过中断所有正在运行的任务而关闭。

Callables and Futures#

除了Runnable,执行器还支持另一种名为Callable的任务。可调用对象是函数接口,就像可运行对象一样,但它们返回一个值,而不是空的。

这个 lambda表达式 定义了一个可调用对象,在休眠1秒后返回一个整数:

 Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep();
        return;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
}; 

可调用对象可以像可运行对象一样提交给executor服务。但是可调用对象的结果呢?因为submit()不会等到任务完成,所以执行器服务不能直接返回可调用对象的结果。相反,执行程序返回一个Future类型的特殊结果,可以在以后的某个时间点使用该结果检索实际结果。

 ExecutorService executor = Executors.newFixedThreadPool();
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

 Integer  result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result); 

在将可调用对象提交给执行器之后,我们首先通过isDone()检查未来对象是否已经完成了执行。我非常确定情况并非如此,因为上面的可调用对象在返回整数之前会休眠一秒钟。

调用方法get()会阻塞当前线程,并等待可调用对象完成返回实际结果123。现在,未来终于完成了,我们可以在控制台看到以下结果:

 future done? false
future done? true
result: 

future与底层执行器服务紧密耦合。请记住,如果你关闭执行器,每个未终止的future都会抛出异常:

 executor.shutdownNow();
future.get(); 

您可能已经注意到,执行程序的创建与前面的示例略有不同。我们使用newFixedThreadPool(1)来创建一个由大小为1的线程池支持的执行器服务。这相当于newSingleThreadExecutor(),但是我们稍后可以通过简单地传递一个大于1的值来增加池的大小。

Timeouts#

对future.get()的任何调用都将阻塞并等待,直到底层可调用对象被终止。在最坏的情况下,可调用对象永远运行—从而使您的应用程序无响应。你可以通过传递一个超时来抵消这些情况:

 ExecutorService executor = Executors.newFixedThreadPool();

Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep();
        return;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
});

future.get(, TimeUnit.SECONDS); 

执行上述代码会导致TimeoutException:

 Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:) 

您可能已经猜到了抛出此异常的原因:我们指定的最大等待时间为1秒,但可调用对象实际上需要2秒才能返回结果。

InvokeAll#

executor支持通过invokeAll()一次性批量提交多个可调用对象。此方法接受一组可调用对象并返回一个executor支持通过invokeAll()一次性批量提交多个可调用对象。此方法接受一组可调用对象并返回一个futures列表。

 ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task",
        () -> "task",
        () -> "task");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    . forEach (System.out::println); 

在这个例子中,我们利用Java 8函数流来处理调用invokeAll返回的所有future。我们首先将每个future映射到它的返回值,然后将每个值打印到控制台。如果你还不熟悉流,请阅读我的Java 8流教程。

InvokeAny#

批量提交可调用对象的另一种方法是invokeAny(),它的工作原理与invokeAll()略有不同。这个方法不会返回未来的对象,而是阻塞直到第一个可调用对象终止并返回该可调用对象的结果。

为了测试这种行为,我们使用这个助手方法来模拟具有不同持续时间的可调用对象。该方法返回一个可调用对象,休眠一段时间,直到返回给定的结果:

 Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
} 
 ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
    callable("task", 2),
    callable("task", 1),
    callable("task", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task 

上面的示例使用了通过newWorkStealingPool()创建的另一种执行器类型。这个工厂方法是Java 8的一部分,它返回一个 Fork JoinPool类型的执行器,其工作方式与普通的执行器略有不同。ForkJoinPools不是使用固定大小的线程池,而是根据给定的 并行度大小(默认为主机CPU的可用内核数)创建

forkjoinpool从Java 7开始就存在了,本系列后面的教程将详细介绍。让我们通过深入了解预定的执行器来结束本教程。

Scheduled Executors#

我们已经学习了如何在执行器上提交和运行任务。为了定期多次运行公共任务,我们可以利用调度的线程池。

ScheduledExecutorService能够将任务调度为周期性运行或在一定时间过后运行一次。

这个代码示例在初始延迟3秒后调度一个任务运行:

 ScheduledExecutorService executor = Executors.newScheduledThreadPool();

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task,, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep();

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay); 

调度一个任务会产生一个特定类型schedulefuture的future,除了future之外,它还提供了getDelay()方法来检索剩余的延迟。这个延迟过后,任务将并发执行。

为了定时调度任务,executor提供了两个方法scheduleAtFixedRate()和scheduleWithFixedDelay()。第一种方法能够以固定的时间速率执行任务,例如,如本例中所示的每秒一次:

 ScheduledExecutorService executor = Executors.newScheduledThreadPool();

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay =;
int period =;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS); 

此外,该方法接受一个初始延迟,该延迟描述了任务首次执行之前的起始等待时间。

请记住,scheduleAtFixedRate()并不考虑任务的实际持续时间。因此,如果您指定的周期是1秒,但任务需要2秒才能执行,那么线程池很快就会达到最大容量。

在这种情况下,你应该考虑使用scheduleWithFixedDelay()来代替。此方法的工作原理与上面描述的对应方法类似。不同的是,等待时间适用于一个任务的结束和下一个任务的开始。例如:

 ScheduledExecutorService executor = Executors.newScheduledThreadPool();

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep();
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted");
    }
};

executor.scheduleWithFixedDelay(task,, 1, TimeUnit.SECONDS); 

这个示例在一次执行的结束和下一次执行的开始之间以固定的一秒的延迟调度一个任务。初始延迟为0,任务持续时间为2秒。所以我们最终的执行间隔是0 3s 6 9等等。正如您所看到的,如果您不能预测计划任务的持续时间,那么scheduleWithFixedDelay()非常方便。

第二部分Synchronization and Locks

synchronized #

在前面的教程中,我们学习了如何通过执行器服务并行执行代码。在编写此类多线程代码时,必须特别注意从多个线程并发访问共享可变变量的情况。假设我们想增加一个整数,这个整数可以同时从多个线程访问。

我们用increment()方法定义了一个字段计数,将计数增加1:

 int count =;

void increment() {
    count = count +;
} 

当从多个线程并发调用这个方法时,我们会遇到严重的麻烦:

 ExecutorService executor = Executors.newFixedThreadPool();

IntStream.range(, 10000)
    .forEach(i -> executor.submit(this::increment));

stop(executor);

System.out.println(count);  // 

实际结果会随着上述代码的每次执行而变化,而不是看到固定的结果计数10000。原因是我们在不同的线程上共享可变变量,而没有同步对该变量的访问,这会导致竞态条件。

为了增加这个值,必须执行三个步骤:(i)读取当前值,(ii)将这个值增加1,(iii)将新值写入变量。如果两个线程并行执行这些步骤,那么两个线程可能同时执行步骤1,从而读取相同的当前值。这将导致写入丢失,因此实际结果更低。在上面的示例中,由于对count的并发非同步访问,丢失了35个增量,但是当您自己执行代码时,可能会看到不同的结果。

幸运的是,Java从早期就通过synchronized关键字支持线程同步。当增加计数时,我们可以使用synchronized来修复上述竞态条件:

 synchronized void incrementSync() {
    count = count +;
} 

当同时使用incrementSync()时,我们得到所需的结果计数10000。不再出现竞态条件,每次执行代码,结果都是稳定的:

 ExecutorService executor = Executors.newFixedThreadPool();

IntStream.range(, 10000)
    .forEach(i -> executor.submit(this::incrementSync));

stop(executor);

System.out.println(count);  // 

synchronized关键字也可以作为块语句使用。

 void incrementSync() {
    synchronized (this) {
        count = count +;
    }
} 

Java在内部使用一个所谓的监视器,也称为监视器锁或内在锁来管理同步。这个监视器绑定到一个对象,例如,当使用同步方法时,每个方法共享相应对象的同一个监视器。

所有隐式监视器都实现了可重入特征。可重入意味着锁绑定到当前线程。一个线程可以安全地多次获得同一个锁而不会陷入死锁(例如,一个同步方法调用同一个对象上的另一个同步方法)。

Locks#

与通过synchronized关键字使用隐式锁不同,Concurrency API支持 Lock 接口指定的各种显式锁。锁支持用于更细粒度的锁控制的各种方法,因此比隐式监视器更有表现力。

标准 JDK 中有多个锁实现,将在下面的部分中演示。

ReentrantLock#

类ReentrantLock是一个互斥锁,其基本行为与通过synchronized关键字访问的隐式监视器相同,但具有扩展功能。顾名思义,这个锁实现了可重入的特征,就像隐式监视器一样。

让我们看看上面的示例是如何使用

 ReentrantLock lock = new ReentrantLock();
int count =;

void increment() {
    lock.lock();
    try {
        count++;
    } finally {
        lock.unlock();
    }
} 

锁通过lock()获取,通过unlock()释放。将代码封装到try/finally块中非常重要,以确保在出现异常时解锁。这个方法是线程安全的,就像同步对等体一样。如果另一个线程已经获得了锁,随后调用lock()会暂停当前线程,直到锁被解锁。在任何给定时间,只有一个线程可以持有锁。

锁支持各种细粒度控制方法,如下面的示例所示:

 ExecutorService executor = Executors.newFixedThreadPool();
ReentrantLock lock = new ReentrantLock();

executor.submit(() -> {
    lock.lock();
    try {
        sleep();
    } finally {
        lock.unlock();
    }
});

executor.submit(() -> {
    System.out.println("Locked: " + lock.isLocked());
    System.out.println("Held by me: " + lock.isHeldByCurrentThread());
    boolean locked = lock.tryLock();
    System.out.println("Lock acquired: " + locked);
});

stop(executor); 

当第一个任务持有锁一秒钟时,第二个任务获得关于锁当前状态的不同信息:

 Locked: true
Held by me: false
Lock acquired: false 

作为lock()的替代方法,tryLock()尝试在不暂停当前线程的情况下获取锁。必须使用布尔结果来检查在访问任何共享可变变量之前是否已经实际获得了锁。

ReadWriteLock#

接口ReadWriteLock指定了另一种类型的锁,维护一对锁用于读写访问。读写锁背后的思想是,只要没有人写入可变变量,并发读取可变变量通常是安全的。因此,只要没有线程持有写锁,多个线程就可以同时持有读锁。这可以在读比写更频繁的情况下提高性能和吞吐量。

 ExecutorService executor = Executors.newFixedThreadPool();
Map<String, String> map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();

executor.submit(() -> {
    lock.writeLock().lock();
    try {
        sleep();
        map.put("foo", "bar");
    } finally {
        lock.writeLock().unlock();
    }
}); 

上面的例子首先获取写锁,以便在休眠一秒钟后给映射添加一个新值。在这个任务完成之前,有两个其他任务正在被提交,试图从映射中读取条目并休眠一秒钟:

 Runnable readTask = () -> {
    lock.readLock().lock();
    try {
        System.out.println(map.get("foo"));
        sleep();
    } finally {
        lock.readLock().unlock();
    }
};

executor.submit(readTask);
executor.submit(readTask);

stop(executor); 

当您执行这个代码示例时,您会注意到两个读任务都必须等待整整一秒,直到写任务完成。释放写锁后,两个读任务将并行执行,并将结果同时打印到控制台。它们不必彼此等待完成,因为只要没有其他线程持有写锁,就可以安全地并发获得读锁。

StampedLock#

Java 8附带了一种名为StampedLock的新型锁,它也支持读锁和写锁,就像上面的例子一样。与ReadWriteLock相比,StampedLock的锁定方法返回一个由长值表示的戳记。您可以使用这些戳记来释放锁或检查锁是否仍然有效。另外,标记锁支持另一种锁模式,称为乐观锁。

让我们重写最后一个示例代码来使用StampedLock而不是ReadWriteLock:

 ExecutorService executor = Executors.newFixedThreadPool();
Map<String, String> map = new HashMap<>();
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.writeLock();
    try {
        sleep();
        map.put("foo", "bar");
    } finally {
        lock.unlockWrite(stamp);
    }
});

Runnable readTask = () -> {
    long stamp = lock.readLock();
    try {
        System.out.println(map.get("foo"));
        sleep();
    } finally {
        lock.unlockRead(stamp);
    }
};

executor.submit(readTask);
executor.submit(readTask);

stop(executor); 

通过readLock()或writeLock()获得读或写锁将返回一个戳记,该戳记稍后用于finally块中的解锁。请记住,经过标记的锁不会实现可重入的特征。每次对lock的调用都会返回一个新的戳记,如果没有可用的锁,则阻塞,即使同一个线程已经持有一个锁。所以你必须特别注意不要陷入死锁。

就像前面的ReadWriteLock例子一样,两个读任务都必须等待,直到写锁被释放。然后两个读任务同时打印到控制台,因为只要没有写锁,多个读任务不会互相阻塞。

下一个例子演示了乐观锁定:

 ExecutorService executor = Executors.newFixedThreadPool();
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.tryOptimisticRead();
    try {
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
        sleep();
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
        sleep();
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
    } finally {
        lock.unlock(stamp);
    }
});

executor.submit(() -> {
    long stamp = lock.writeLock();
    try {
        System.out.println("Write Lock acquired");
        sleep();
    } finally {
        lock.unlock(stamp);
        System.out.println("Write done");
    }
});

stop(executor); 

乐观读锁是通过调用tryOptimisticRead()来获得的,它总是返回一个戳,而不会阻塞当前线程,无论锁是否实际可用。如果已经有一个激活的写锁,返回的戳记等于零。您总是可以通过调用lock.validate(stamp)来检查戳记是否有效。

执行上述代码的结果如下:

 Optimistic Lock Valid: true
Write Lock acquired
Optimistic Lock Valid: false
Write done
Optimistic Lock Valid: false 

乐观锁在获得锁后立即有效。与普通的读锁相比,乐观锁不会阻止其他线程立即获得写锁。在将第一个线程发送到睡眠状态一秒钟后,第二个线程获得写锁,而无需等待乐观读锁被释放。从这里开始,乐观读锁不再有效。即使释放了写锁,乐观读锁仍然无效。

因此,在使用乐观锁时,您必须在每次访问任何共享可变变量后验证锁,以确保读取仍然有效。

有时,将读锁转换为写锁而不解锁和再次锁定是很有用的。StampedLock为此提供了tryConvertToWriteLock()方法,如下例所示:

 ExecutorService executor = Executors.newFixedThreadPool();
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.readLock();
    try {
        if (count ==) {
            stamp = lock.tryConvertToWriteLock(stamp);
            if (stamp ==L) {
                System.out.println("Could not convert to write lock");
                stamp = lock.writeLock();
            }
            count =;
        }
        System.out.println(count);
    } finally {
        lock.unlock(stamp);
    }
});

stop(executor); 

该任务首先获得一个读锁,并将字段count的当前值打印到控制台。但是如果当前值是0,我们想赋值一个新的值23。我们首先必须将读锁转换为写锁,以避免破坏其他线程潜在的并发访问。调用tryConvertToWriteLock()不会阻塞,但可能返回一个零标记,表示当前没有可用的写锁。在这种情况下,我们调用writeLock()来阻塞当前线程,直到有一个写锁可用。

Semaphores信号量#

除了锁之外,并发API还支持计数信号量。锁通常授予对变量或资源的独占访问权,而信号量能够维护整个许可集。这在必须限制对应用程序某些部分的并发访问量的不同场景中非常有用。

下面是一个如何限制对sleep(5)模拟的长时间运行任务的访问的例子:

 ExecutorService executor = Executors.newFixedThreadPool();

Semaphore semaphore = new Semaphore();

Runnable longRunningTask = () -> {
    boolean permit = false;
    try {
        permit = semaphore.tryAcquire(, TimeUnit.SECONDS);
        if (permit) {
            System.out.println("Semaphore acquired");
            sleep();
        } else {
            System.out.println("Could not acquire semaphore");
        }
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    } finally {
        if (permit) {
            semaphore.release();
        }
    }
}

IntStream.range(, 10)
    .forEach(i -> executor.submit(longRunningTask));

stop(executor); 

执行器可能并发运行10个任务,但我们使用的信号量为5,因此将并发访问限制为5。使用try/finally块来正确释放信号量是很重要的,即使在异常的情况下也是如此。

 Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore 

这些信号量允许访问由sleep(5)模拟的实际长时间运行的操作,最多可以访问5个。随后对tryAcquire()的每次调用都会超过1秒的最大等待超时,导致无法获得任何信号量的适当控制台输出。

第三部分Atomic Variables and ConcurrentMap

为简单起见,本教程的代码示例使用了这里定义的两个辅助方法sleep(seconds)和stop(executor)。

AtomicInteger#

包java.concurrent.atomic包含许多有用的类来执行原子操作。当您可以安全地在多个线程上并行执行操作,而不像我前面的教程中所示使用synchronized关键字或锁时,操作就是原子操作。

在内部,原子类大量使用比较和交换(CAS),这是大多数现代cpu直接支持的原子指令。这些指令通常比通过锁进行同步快得多。因此,我的建议是,如果您只需要同时更改一个可变变量,那么最好选择原子类而不是锁。

现在让我们为几个示例选择一个原子类:AtomicInteger

 AtomicInteger atomicInt = new AtomicInteger();

ExecutorService executor = Executors.newFixedThreadPool();

IntStream.range(, 1000)
    .forEach(i -> executor.submit(atomicInt::incrementAndGet));

stop(executor);

System.out.println(atomicInt.get());    // => 

通过使用AtomicInteger代替Integer,我们可以在线程安全的情况下并发地增加这个数字,而不需要同步对该变量的访问。方法incrementAndGet()是一个原子操作,因此我们可以安全地从多个线程调用该方法。

tomicInteger支持各种原子操作。updateAndGet()方法接受lambda表达式,以便对整数执行任意算术操作:

 AtomicInteger atomicInt = new AtomicInteger();

ExecutorService executor = Executors.newFixedThreadPool();

IntStream.range(, 1000)
    .forEach(i -> {
        Runnable task = () ->
            atomicInt.updateAndGet(n -> n +);
        executor.submit(task);
    });

stop(executor);

System.out.println(atomicInt.get());    // => 

方法accumulateAndGet()接受另一种类型为IntBinaryOperator的lambda表达式。在下一个示例中,我们使用这个方法并发地将从0到1000的所有值加起来:

 AtomicInteger atomicInt = new AtomicInteger();

ExecutorService executor = Executors.newFixedThreadPool();

IntStream.range(, 1000)
    .forEach(i -> {
        Runnable task = () ->
            atomicInt.accumulateAndGet(i, (n, m) -> n + m);
        executor.submit(task);
    });

stop(executor);

System.out.println(atomicInt.get());    // => 

LongAdder#

类LongAdder作为AtomicLong的替代方案,可以用于连续向数字添加值。

 ExecutorService executor = Executors.newFixedThreadPool();

IntStream.range(, 1000)
    .forEach(i -> executor.submit(adder::increment));

stop(executor);

System.out.println(adder.sumThenReset());   // => 

LongAdder提供了add()和increment()方法,就像原子号类一样,而且也是线程安全的。但是,这个类不是汇总单个结果,而是在内部维护一组变量,以减少线程争用。实际的结果可以通过调用sum()或sumThenReset()来检索。

sumThenReset()。

当来自多个线程的更新比读取更常见时,此类通常比原子数更可取。这是捕获统计数据时经常出现的情况,例如,你想统计web服务器上服务的请求数。longader的缺点是内存消耗较高,因为一组变量保存在内存中。

LongAccumulator#

LongAccumulator是LongAdder的一个更通用的版本。与执行简单的添加操作不同,LongAccumulator类围绕LongBinaryOperator类型的lambda表达式构建,如下面的代码示例所示:

 LongBinaryOperator op = (x, y) -> * x + y;
LongAccumulator accumulator = new LongAccumulator(op,L);

ExecutorService executor = Executors.newFixedThreadPool();

IntStream.range(, 10)
    .forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

stop(executor);

System.out.println(accumulator.getThenReset());     // => 

我们创建了一个LongAccumulator,函数为2 * x + y,初始值为1。每次调用累加(i)时,当前结果和值i都作为参数传递给lambda表达式。

LongAccumulator就像LongAdder一样在内部维护一组变量来减少线程之间的争用。

ConcurrentMap#

接口ConcurrentMap扩展了映射接口,并定义了最有用的并发集合类型之一。Java 8通过向该接口添加新方法来引入函数式编程。

 ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r", "d2");
map.put("c", "p0"); 

forEach()方法接受bicconsumer类型的lambda表达式,并将映射的键和值作为参数传递。它可以作为for-each循环的替代来遍历并发映射的条目。迭代在当前线程上按顺序执行。

 map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value)); 

putIfAbsent()方法仅在给定键不存在值的情况下才会将一个新值放入映射中。至少对于这个方法的ConcurrentHashMap实现是线程安全的,就像put()一样,所以当你从不同的线程并发访问map时,你不必同步:

 String value = map.putIfAbsent("c", "p1");
System.out.println(value);    // p 

getOrDefault()方法返回给定键的值。如果该键不存在,则返回传递的默认值:

 String value = map.getOrDefault("hi", "there");
System.out.println(value);    // there 

replaceAll()方法接受BiFunction类型的lambda表达式。BiFunctions接受两个参数并返回单个值。在这种情况下,函数被调用时带有键和每个映射条目的值,并返回一个为当前键分配的新值:

 map.replaceAll((key, value) -> "r".equals(key) ? "d3" : value);
System.out.println(map.get("r"));    // d3 

我们不替换map compute()的所有值,而是转换一个条目。该方法接受要计算的键和指定值转换的双函数。

 map.compute("foo", (key, value) -> value + value);
System.out.println(map.get("foo"));   // barbar 

除了compute()之外,还存在两个变体:computeIfAbsent()和computeIfPresent()。这些方法的函数形参仅在键不存在或存在时分别被调用。

最后,可以使用merge()方法将新值与映射中的现有值统一起来。Merge接受一个键,一个要合并到现有条目中的新值,以及一个bi-function来指定两个值的合并行为:

 map.merge("foo", "boo", (oldVal, newVal) -> newVal + " was " + oldVal);
System.out.println(map.get("foo"));   // boo was foo 

ConcurrentHashMap#

上面所有这些方法都是ConcurrentMap接口的一部分,因此对该接口的所有实现都可用。此外,最重要的实现ConcurrentHashMap也得到了进一步的增强,使用了两个新方法来在映射上执行并行操作。

就像并行流一样,这些方法使用一个特殊的ForkJoinPool,可通过Java 8中的ForkJoinPool. commonpool()获得。这个池使用预置的并行度,这取决于可用核的数量。在我的机器上有四个CPU内核,这导致了三个并行性:

 System.out.println(ForkJoinPool.getCommonPoolParallelism());  // 

这个值可以通过设置以下JVM参数来增加或减少:

 -Djava.util.concurrent.ForkJoinPool.common.parallelism= 

为了演示目的,我们使用了相同的示例映射,但这次我们处理的是具体实现ConcurrentHashMap,而不是接口ConcurrentMap,因此我们可以访问这个类中的所有公共方法:

 ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r", "d2");
map.put("c", "p0"); 

Java 8引入了三种并行操作:forEach、search和reduce。这些操作有四种形式,接受带键、值、项和键值对参数的函数。

所有这些方法都使用一个称为parallelismThreshold的通用参数。此阈值指示并行执行操作时的最小收集大小。例如,如果你通过了500的阈值,而map的实际大小是499,那么操作将在一个线程上顺序执行。在下一个示例中,为了演示目的,我们使用一个阈值1来强制并行执行。

ForEach#

forEach()方法能够并行地遍历映射的键-值对。使用当前迭代步骤的键和值调用bicconsumer类型的lambda表达式。为了可视化并行执行,我们将当前线程名打印到控制台。请记住,在我的情况下,底层ForkJoinPool最多使用三个线程。

 map.forEach(, (key, value) ->
    System.out.printf("key: %s; value: %s; thread: %s\n",
        key, value, Thread.currentThread().getName()));

// key: r; value: d2; thread: main
// key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-
// key: han; value: solo; thread: ForkJoinPool.commonPool-worker-
// key: c; value: p0; thread: main 

Search#

search()方法接受一个BiFunction,返回当前键值对的非空搜索结果,如果当前迭代不匹配所需的搜索条件,则返回null。一旦返回非空结果,就会抑制进一步的处理。请记住,ConcurrentHashMap是无序的。搜索函数不应依赖于映射的实际处理顺序。如果映射的多个条目匹配给定的搜索函数,则结果可能是不确定的。

 String result = map.search(, (key, value) -> {
    System.out.println(Thread.currentThread().getName());
    if ("foo".equals(key)) {
        return value;
    }
    return null;
});
System.out.println("Result: " + result);

// ForkJoinPool.commonPool-worker-
// main
// ForkJoinPool.commonPool-worker-
// Result: bar 
 String result = map.searchValues(, value -> {
    System.out.println(Thread.currentThread().getName());
    if (value.length() >) {
        return value;
    }
    return null;
});

System.out.println("Result: " + result);

// ForkJoinPool.commonPool-worker-
// main
// main
// ForkJoinPool.commonPool-worker-
// Result: solo 

Reduce#

方法reduce()从Java 8 Streams中已经知道,它接受两个BiFunction类型的lambda表达式。第一个函数将每个键值对转换为任意类型的单个值。第二个函数将所有转换后的值合并为一个结果,忽略任何可能的空值。

 String result = map.reduce(,
    (key, value) -> {
        System.out.println("Transform: " + Thread.currentThread().getName());
        return key + "=" + value;
    },
    (s, s2) -> {
        System.out.println("Reduce: " + Thread.currentThread().getName());
        return s + ", " + s2;
    });

System.out.println("Result: " + result);

// Transform: ForkJoinPool.commonPool-worker-
// Transform: main
// Transform: ForkJoinPool.commonPool-worker-
// Reduce: ForkJoinPool.commonPool-worker-
// Transform: main
// Reduce: main
// Reduce: main
// Result: r=d2, c3=p0, han=solo, foo=bar