生产消费者模式
使用阻塞队列控制消息的产生和消费
代码实现:
class MessageQueue {
private final LinkedList<Message> list = new LinkedList<>();
private final int capacity;
public MessageQueue (int capacity) {
this.capacity = capacity;
}
public Message take () {
boolean first = true;
synchronized (list) {
// 空队列检测
while (list.isEmpty()) {
if (first) {
System.out.println("queue is empty!");
first = false;
}
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.notifyAll();
return list.removeFirst();
}
}
public void put (Message message) {
boolean first = true;
synchronized (list) {
// 溢出检测
while (list.size() == capacity) {
if (first) {
System.out.println("queue is full!");
first = false;
}
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.addLast(message);
list.notifyAll();
}
}
}
final class Message {
private final int id;
private final Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", message=" + message +
'}';
}
}
测试代码:
public class ProduceConsume {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(3);
for (int i = 0; i < 6; i++) {
int id = i;
new Thread(() -> {
Message message = new Message(id, "消息" + id);
queue.put(message);
System.out.println("put message:" + message);
}, "producer:" + i).start();
}
new Thread(() -> {
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("take message:" + queue.take());
}
}, "consumer").start();
}
}
运行效果: