生产消费者模式

Java
367
0
0
2022-04-21

生产消费者模式

使用阻塞队列控制消息的产生和消费

代码实现:

class MessageQueue {

    private final LinkedList<Message> list = new LinkedList<>();
    private final int capacity;

    public MessageQueue (int capacity) {

        this.capacity = capacity;
    }

    public Message take () {
        boolean first = true;
        synchronized (list) {
            // 空队列检测 
            while (list.isEmpty()) {
                if (first) {
                    System.out.println("queue is empty!");
                    first = false;
                }
                try {
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.notifyAll();
            return list.removeFirst();
        }
    }

    public void put (Message message) {
        boolean first = true;
        synchronized (list) {
            // 溢出检测 
            while (list.size() == capacity) {
                if (first) {
                    System.out.println("queue is full!");
                    first = false;
                }
                try {
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.addLast(message);
            list.notifyAll();
        }
    }
}

final class Message {

    private final int id;
    private final Object message;


    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }

    @Override 
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", message=" + message +
                '}';
    }
}

测试代码:

public class ProduceConsume {

    public static void main(String[] args) {

        MessageQueue queue = new MessageQueue(3);

        for (int i = 0; i < 6; i++) {
            int id = i;
            new Thread(() -> {
                Message message = new Message(id, "消息" + id);
                queue.put(message);
                System.out.println("put message:" + message);
            }, "producer:" + i).start();
        }

        new Thread(() -> {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("take message:" + queue.take());
            }
        }, "consumer").start();
    }
}

运行效果:

生产消费者模式