14. 多线程案例(2)——阻塞队列

Java
268
0
0
2022-11-28
标签   Java多线程

生产者消费者模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

阻塞队列是并发编程中的一个重要基础组件,帮助我们实现“生产者-消费者模型”(是一种典型的处理并发编程的模型)

如果入队列太快了,继续入队列就会阻塞,一直阻塞到有其他线程去消费队列了,才能继续入队列;如果出队列操作太快了,队列空了,也会阻塞,一直阻塞到没有其他线程生产了元素,才能继续出队列

阻塞队列也符合先进先出的规则

阻塞队列实现

//阻塞队列
public class ThreadDemo21 {
    static class BlockingQueue{
        private int[] array = new int[1000];
        private int head = 0;
        private int tail = 0;
        private int size = 0;

        //队列的基本操作(入队列/出队列)(没有去队首元素操作) 
        //1.阻塞版本的入队列 
        public void put(int value) throws InterruptedException {
            //把value放到队尾 
            synchronized (this) {
                if (size == array.length) {
                    wait();
                }
                array[tail] = value;
                tail++;
                if (tail == array.length){
                    tail = 0;
                }
                size++;
                notify();
            }
        }

        //2.阻塞版本的出队列 
        public int take() throws InterruptedException {
            int ret = -1;
            synchronized (this) {
                if (size == 0){
                    wait();
                }
                ret = array[head];
                head++;
                if (head == array.length){
                    head = 0;
                }
                size--;
                notify();
            }
            return ret;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue blockingQueue = new BlockingQueue();

        //生产者:生产者生产的慢 
        Thread t1 = new Thread(){
            @Override 
            public void run() {
                for (int i = 0; i < 10000; i++){
                    try {
                        blockingQueue.put(i);
                        System.out.println("生产元素:"+ i);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        t1.start();

        //消费者:这里设置的消费的快 
        Thread t2 = new Thread(){
            @Override 
            public void run() {
                while (true){
                    try {
                        int ret = blockingQueue.take();
                        System.out.println("消费元素:"+ ret);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        t2.start();

        t1.join();
        t2.join();
    }
}