生产者-消费者模式
使用阻塞队列控制消息的产生和消费
代码实现:
class MessageQueue { | |
private final LinkedList<Message> list = new LinkedList<>(); | |
private final int capacity; | |
public MessageQueue (int capacity) { | |
this.capacity = capacity; | |
} | |
public Message take () { | |
boolean first = true; | |
synchronized (list) { | |
// 空队列检测 | |
while (list.isEmpty()) { | |
if (first) { | |
System.out.println("queue is empty!"); | |
first = false; | |
} | |
try { | |
list.wait(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
list.notifyAll(); | |
return list.removeFirst(); | |
} | |
} | |
public void put (Message message) { | |
boolean first = true; | |
synchronized (list) { | |
// 溢出检测 | |
while (list.size() == capacity) { | |
if (first) { | |
System.out.println("queue is full!"); | |
first = false; | |
} | |
try { | |
list.wait(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
list.addLast(message); | |
list.notifyAll(); | |
} | |
} | |
} | |
/**
 * A message carried through the queue: a numeric id plus an arbitrary payload.
 *
 * <p>Both fields are {@code final} and assigned once in the constructor. The
 * payload reference is stored as given (it is not copied).
 */
final class Message {
    private final int id;
    private final Object message;

    /**
     * @param id      identifier of this message
     * @param message payload object; stored as-is
     */
    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }

    /**
     * Renders this message as {@code Message{id=<id>, message=<payload>}}.
     *
     * @return the textual form of this message
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Message{");
        text.append("id=").append(id);
        text.append(", message=").append(message);
        return text.append('}').toString();
    }
}
测试代码:
public class ProduceConsume { | |
public static void main(String[] args) { | |
MessageQueue queue = new MessageQueue(3); | |
for (int i = 0; i < 6; i++) { | |
int id = i; | |
new Thread(() -> { | |
Message message = new Message(id, "消息" + id); | |
queue.put(message); | |
System.out.println("put message:" + message); | |
}, "producer:" + i).start(); | |
} | |
new Thread(() -> { | |
while (true) { | |
try { | |
TimeUnit.SECONDS.sleep(1); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
System.out.println("take message:" + queue.take()); | |
} | |
}, "consumer").start(); | |
} | |
} |
运行效果: