Java 高并发之设计模式

Java
273
0
0
2023-10-07

优质文章,及时送达

作者:大道方圆

链接:cnblogs.com/xdecode/p/9137793.html

本文主要讲解几种常见并行模式, 具体目录结构如下图.

Java 高并发之设计模式

单例

单例是最常见的一种设计模式, 一般用于全局对象管理, 比如xml配置读写之类的.

一般分为懒汉式, 饿汉式.

懒汉式: 方法上加synchronized

 public static synchronized Singleton getInstance { 
    if (single == ) { 
        single = new Singleton; 
    } 
    return single; 
} 

这种方式, 由于每次获取示例都要获取锁, 不推荐使用, 性能较差

懒汉式: 使用双检锁 + volatile

 private volatile Singleton singleton = ; 
public static Singleton getInstance { 
    if (singleton == ) { 
        synchronized (Singleton.class) { 
            if (singleton == ) { 
                singleton = new Singleton; 
            } 
        } 
    } 
    return singleton; 
} 

本方式是对直接在方法上加锁的一个优化, 好处在于只有第一次初始化获取了锁.

后续调用getInstance已经是无锁状态. 只是写法上稍微繁琐点.

至于为什么要volatile关键字, 主要涉及到jdk指令重排, 详见之前的博文: Java 内存模型与指令重排

懒汉式: 使用静态内部类

 public class Singleton { 
    private static class LazyHolder { 
        private static final Singleton INSTANCE = new Singleton; 
    } 
private Singleton {} 
    public static final Singleton getInstance { 
        return LazyHolder.INSTANCE; 
    } 
} 

该方式既解决了同步问题, 也解决了写法繁琐问题. 推荐使用改写法.

缺点在于无法响应事件来重新初始化INSTANCE.

饿汉式

 public class Singleton1 {  
private Singleton1 {} 
    private static final Singleton1 single = new Singleton1; 
    public static Singleton1 getInstance { 
        return single; 
    } 
} 

缺点在于对象在一开始就直接初始化了.

Future模式

该模式的核心思想是异步调用. 有点类似于异步的ajax请求.

当调用某个方法时, 可能该方法耗时较久, 而在主函数中也不急于立刻获取结果.

因此可以让调用者立刻返回一个凭证, 该方法放到另外 线程 执行,

后续主函数拿凭证再去获取方法的执行结果即可, 其结构图如下

Java 高并发之设计模式

jdk中内置了Future模式的支持, 其接口如下:

Java 高并发之设计模式

通过FutureTask实现

注意其中两个耗时操作.

  • 如果doOtherThing耗时2s, 则整个函数耗时2s左右.
  • 如果doOtherThing耗时0.2s, 则整个函数耗时取决于RealData.costTime, 即1s左右结束.
 public class FutureDemo1 { 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
        FutureTask<String> future = new FutureTask<String>(new Callable<String> { 
            @Override 
            public String call throws Exception { 
                return new RealData.costTime; 
            } 
        }); 
        ExecutorService service = Executors.newCachedThreadPool; 
        service.submit(future); 

        System.out.println("RealData方法调用完毕"); 
        // 模拟主函数中其他耗时操作 
        doOtherThing; 
        // 获取RealData方法的结果 
        System.out.println(future.get); 
    } 

    private static void doOtherThing throws InterruptedException { 
        Thread.sleep(2000L); 
    } 
} 

class RealData { 

    public String costTime { 
        try { 
            // 模拟RealData耗时操作 
            Thread.sleep(1000L); 
            return "result"; 
        } catch (InterruptedException e) { 
            e.printStackTrace; 
        } 
        return "exception"; 
    } 

} 

通过Future实现

与上述FutureTask不同的是, RealData需要实现Callable接口

 public class FutureDemo2 { 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
        ExecutorService service = Executors.newCachedThreadPool; 
        Future<String> future = service.submit(new RealData2); 

        System.out.println("RealData2方法调用完毕"); 
        // 模拟主函数中其他耗时操作 
        doOtherThing; 
        // 获取RealData2方法的结果 
        System.out.println(future.get); 
    } 

    private static void doOtherThing throws InterruptedException { 
        Thread.sleep(2000L); 
    } 
} 

class RealData2 implements Callable<String>{ 

    public String costTime { 
        try { 
            // 模拟RealData耗时操作 
            Thread.sleep(1000L); 
            return "result"; 
        } catch (InterruptedException e) { 
            e.printStackTrace; 
        } 
        return "exception"; 
    } 

    @Override 
    public String call throws Exception { 
        return costTime; 
    } 
} 

另外Future本身还提供了一些额外的简单控制功能, 其API如下

 // 取消任务 
boolean cancel(boolean mayInterruptIfRunning); 
// 是否已经取消 
boolean isCancelled; 
// 是否已经完成 
boolean isDone; 
// 取得返回对象 
V get throws InterruptedException, ExecutionException; 
// 取得返回对象, 并可以设置超时时间 
V get(long timeout, TimeUnit unit) 
throws InterruptedException, ExecutionException, TimeoutException; 

生产消费者模式

生产者-消费者模式是一个经典的多线程设计模式. 它为多线程间的协作提供了良好的解决方案。

在生产者-消费者模式中,通常由两类线程,即若干个生产者线程和若干个消费者线程。

生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。

生产者和消费者之间则通过共享内存缓冲区进行通信, 其结构图如下

Java 高并发之设计模式

PCData为我们需要处理的元数据模型, 生产者构建PCData, 并放入缓冲队列.

消费者从缓冲队列中获取数据, 并执行计算.

生产者核心代码

 while(isRunning) { 
    Thread.sleep(r.nextInt(SLEEP_TIME)); 
    data = new PCData(count.incrementAndGet); 
    // 构造任务数据 
    System.out.println(data + " is put into queue"); 
    if (!queue.offer(data, 2, TimeUnit.SECONDS)) { 
        // 将数据放入队列缓冲区中 
        System.out.println("faild to put data : " + data); 
    } 
} 

消费者核心代码

 while (true) { 
    PCData data = queue.take; 
    // 提取任务 
    if (data != ) { 
        // 获取数据, 执行计算操作 
        int re = data.getData * 10; 
        System.out.println("after cal, value is : " + re); 
        Thread.sleep(r.nextInt(SLEEP_TIME)); 
    } 
} 

生产消费者模式可以有效对数据解耦, 优化系统结构.

降低生产者和消费者线程相互之间的依赖与性能要求.

一般使用BlockingQueue作为数据缓冲队列, 他是通过锁和阻塞来实现数据之间的同步,

如果对缓冲队列有性能要求, 则可以使用基于CAS无锁设计的ConcurrentLinkedQueue.

分而治之

严格来讲, 分而治之不算一种模式, 而是一种思想.

它可以将一个大任务拆解为若干个小任务并行执行, 提高系统吞吐量.

我们主要讲两个场景, Master-Worker模式, ForkJoin 线程池 .

Master-Worker模式

该模式核心思想是系统由两类进行协助工作: Master进程, Worker进程.

Master负责接收与分配任务, Worker负责处理任务. 当各个Worker处理完成后,

将结果返回给Master进行归纳与总结.

Java 高并发之设计模式

假设一个场景, 需要计算100个任务, 并对结果求和, Master持有10个子进程.

Master代码

 public class MasterDemo { 
    // 盛装任务的集合 
    private ConcurrentLinkedQueue<TaskDemo> workQueue = new ConcurrentLinkedQueue<TaskDemo>; 
    // 所有worker 
    private HashMap<String, Thread> workers = new HashMap<>; 
    // 每一个worker并行执行任务的结果 
    private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<>; 

    public MasterDemo(WorkerDemo worker, int workerCount) { 
        // 每个worker对象都需要持有queue的引用, 用于领任务与提交结果 
        worker.setResultMap(resultMap); 
        worker.setWorkQueue(workQueue); 
        for (int i = 0; i < workerCount; i++) { 
            workers.put("子节点: " + i, new Thread(worker)); 
        } 
    } 

    // 提交任务 
    public void submit(TaskDemo task) { 
        workQueue.add(task); 
    } 

    // 启动所有的子任务 
    public void execute{ 
        for (Map.Entry<String, Thread> entry : workers.entrySet) { 
            entry.getValue.start; 
        } 
    } 

    // 判断所有的任务是否执行结束 
    public boolean isComplete { 
        for (Map.Entry<String, Thread> entry : workers.entrySet) { 
            if (entry.getValue.getState != Thread.State.TERMINATED) { 
                return false; 
            } 
        } 

        return true; 
    } 

    // 获取最终汇总的结果 
    public int getResult { 
        int result = 0; 
        for (Map.Entry<String, Object> entry : resultMap.entrySet) { 
            result += Integer.parseInt(entry.getValue.toString); 
        } 

        return result; 
    } 

} 

Worker代码

 public class WorkerDemo implements Runnable{ 

    private ConcurrentLinkedQueue<TaskDemo> workQueue; 
    private ConcurrentHashMap<String, Object> resultMap; 

    @Override 
    public void run { 
        while (true) { 
            TaskDemo input = this.workQueue.poll; 
            // 所有任务已经执行完毕 
            if (input == ) { 
                break; 
            } 
            // 模拟对task进行处理, 返回结果 
            int result = input.getPrice; 
            this.resultMap.put(input.getId + "", result); 
            System.out.println("任务执行完毕, 当前线程: " + Thread.currentThread.getName); 
        } 
    } 

    public ConcurrentLinkedQueue<TaskDemo> getWorkQueue { 
        return workQueue; 
    } 

    public void setWorkQueue(ConcurrentLinkedQueue<TaskDemo> workQueue) { 
        this.workQueue = workQueue; 
    } 

    public ConcurrentHashMap<String, Object> getResultMap { 
        return resultMap; 
    } 

    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { 
        this.resultMap = resultMap; 
    } 
} 
 public class TaskDemo { 

    private int id; 
    private String name; 
    private int price; 

    public int getId { 
        return id; 
    } 

    public void setId(int id) { 
        this.id = id; 
    } 

    public String getName { 
        return name; 
    } 

    public void setName(String name) { 
        this.name = name; 
    } 

    public int getPrice { 
        return price; 
    } 

    public void setPrice(int price) { 
        this.price = price; 
    } 
} 

主函数测试

 MasterDemo master = new MasterDemo(new WorkerDemo, 10); 
for (int i = 0; i < 100; i++) { 
    TaskDemo task = new TaskDemo; 
    task.setId(i); 
    task.setName("任务" + i); 
    task.setPrice(new Random.nextInt(10000)); 
    master.submit(task); 
} 

master.execute; 

while (true) { 
    if (master.isComplete) { 
        System.out.println("执行的结果为: " + master.getResult); 
        break; 
    } 
} 

ForkJoin线程池

该线程池是jdk7之后引入的一个并行执行任务的框架, 其核心思想也是将任务分割为子任务,

有可能子任务还是很大, 还需要进一步拆解, 最终得到足够小的任务.

将分割出来的子任务放入双端队列中, 然后几个启动线程从双端队列中获取任务执行.

子任务执行的结果放到一个队列里, 另起线程从队列中获取数据, 合并结果.

Java 高并发之设计模式

假设我们的场景需要计算从0到20000000L的累加求和. CountTask继承自RecursiveTask, 可以携带返回值.

每次分解大任务, 简单的将任务划分为100个等规模的小任务, 并使用fork提交子任务.

在子任务中通过THRESHOLD设置子任务分解的阈值, 如果当前需要求和的总数大于THRESHOLD, 则子任务需要再次分解,

如果子任务可以直接执行, 则进行求和操作, 返回结果. 最终等待所有的子任务执行完毕, 对所有结果求和.

 public class CountTask extends RecursiveTask<Long>{ 
    // 任务分解的阈值 
    private static final int THRESHOLD = 10000; 
    private long start; 
    private long end; 


    public CountTask(long start, long end) { 
        this.start = start; 
        this.end = end; 
    } 

    public Long compute { 
        long sum = 0; 
        boolean canCompute = (end - start) < THRESHOLD; 
        if (canCompute) { 
            for (long i = start; i <= end; i++) { 
                sum += i; 
            } 
        } else { 
            // 分成100个小任务 
            long step = (start + end) / 100; 
            ArrayList<CountTask> subTasks = new ArrayList<CountTask>; 
            long pos = start; 
            for (int i = 0; i < 100; i++) { 
                long lastOne = pos + step; 
                if (lastOne > end) { 
                    lastOne = end; 
                } 
                CountTask subTask = new CountTask(pos, lastOne); 
                pos += step + 1; 
                // 将子任务推向线程池 
                subTasks.add(subTask); 
                subTask.fork; 
            } 

            for (CountTask task : subTasks) { 
                // 对结果进行join 
                sum += task.join; 
            } 
        } 
        return sum; 
    } 

    public static void main(String[] args) throws ExecutionException, InterruptedException { 
        ForkJoinPool pool = new ForkJoinPool; 
        // 累加求和 0 -> 20000000L 
        CountTask task = new CountTask(0, 20000000L); 
        ForkJoinTask<Long> result = pool.submit(task); 
        System.out.println("sum result : " + result.get); 
    } 
} 

ForkJoin线程池使用一个无锁的栈来管理空闲线程, 如果一个工作线程暂时取不到可用的任务, 则可能被挂起.

挂起的线程将被压入由线程池维护的栈中, 待将来有任务可用时, 再从栈中唤醒这些线程.

-END-