架构图
当有多个消费者时,我们的消息会被哪个消费者消费?我们又如何均衡消费者消费信息的多少?
主要又两种模式
1、轮询模式的分发:一个消费者一条,按均分配
2、公平分发:根据消费者的消费能力进行公平分发,处理快的多处理,处理慢的少处理,按劳分配。
Work模式-轮询模式(Round-Robin)
- 类型:无
- 特点:该模式接收消息是当有多个消费者接入时,罅隙的分配模式是一个消费者一条,直至消息消费完成。
生产者消费者代码
public class Producer {public static void main(String[] args) { | |
ConnectionFactory connectionFactory = new ConnectionFactory(); | |
connectionFactory.setHost("192.168.33.110"); | |
connectionFactory.setPort(5672); | |
connectionFactory.setUsername("admin"); | |
connectionFactory.setPassword("test"); | |
connectionFactory.setVirtualHost("/"); | |
Connection connection = null; | |
Channel channel = null;try { | |
connection = connectionFactory.newConnection("生产者"); | |
channel = connection.createChannel(); | |
// 准备发送消息内容for (int i = 0; i < 20; i++) {String msg = "message:"+i; | |
channel.basicPublish("","queue1",null,msg.getBytes());} | |
System.out.println("发送消息成功!");} catch (Exception e) { | |
e.printStackTrace(); | |
System.out.println("消息发送出现异常");} finally {// 关闭通道释放连接if (channel != null && channel.isOpen()) {try { | |
channel.close();} catch (IOException e) { | |
e.printStackTrace();} catch (TimeoutException e) { | |
e.printStackTrace();}}if (connection != null && connection.isOpen()) {try { | |
connection.close();} catch (IOException e) { | |
e.printStackTrace();}}}} | |
} |
消费者 Work1 代码
public class Work1 {public static void main(String[] args) { | |
ConnectionFactory connectionFactory = new ConnectionFactory(); | |
connectionFactory.setHost("192.168.33.110"); | |
connectionFactory.setPort(5672); | |
connectionFactory.setUsername("admin"); | |
connectionFactory.setPassword("test"); | |
connectionFactory.setVirtualHost("/"); | |
Connection connection = null; | |
Channel channel = null;try { | |
connection = connectionFactory.newConnection("消费者-Work1"); | |
channel = connection.createChannel(); | |
Channel finalChannel = channel; | |
// finalChannel.basicQos(1); | |
finalChannel.basicConsume("queue1", true, new DeliverCallback() {public void handle(String consumerTag, Delivery message) throws IOException {try { | |
System.out.println("Work1-收到的消息是:" + new String(message.getBody(), "UTF-8")); | |
Thread.sleep(800);} catch (InterruptedException e) { | |
e.printStackTrace();}}}, new CancelCallback() {public void handle(String consumerTag) throws IOException { | |
System.out.println("Work1-获取消息失败");}}); | |
System.out.println("Work1-开始接收消息"); | |
System.in.read();} catch (IOException e) { | |
e.printStackTrace();} catch (TimeoutException e) { | |
e.printStackTrace();} finally {// 关闭通道释放连接if (channel != null && channel.isOpen()) {try { | |
channel.close();} catch (IOException e) { | |
e.printStackTrace();} catch (TimeoutException e) { | |
e.printStackTrace();}}if (connection != null && connection.isOpen()) {try { | |
connection.close();} catch (IOException e) { | |
e.printStackTrace();}}}} | |
} |
消费者 Work2
public class Work2 {public static void main(String[] args) { | |
ConnectionFactory connectionFactory = new ConnectionFactory(); | |
connectionFactory.setHost("192.168.33.110"); | |
connectionFactory.setPort(5672); | |
connectionFactory.setUsername("admin"); | |
connectionFactory.setPassword("test"); | |
connectionFactory.setVirtualHost("/"); | |
Connection connection = null; | |
Channel channel = null;try { | |
connection = connectionFactory.newConnection("消费者-Work2"); | |
channel = connection.createChannel(); | |
Channel finalChannel = channel; | |
// finalChannel.basicQos(1); | |
finalChannel.basicConsume("queue1", true, new DeliverCallback() {public void handle(String consumerTag, Delivery message) throws IOException {try { | |
System.out.println("Work1-收到的消息是:" + new String(message.getBody(), "UTF-8")); | |
Thread.sleep(200);} catch (InterruptedException e) { | |
e.printStackTrace();}}}, new CancelCallback() {public void handle(String consumerTag) throws IOException { | |
System.out.println("Work2-获取消息失败");}}); | |
System.out.println("Work2-开始接收消息"); | |
System.in.read();} catch (IOException e) { | |
e.printStackTrace();} catch (TimeoutException e) { | |
e.printStackTrace();} finally {// 关闭通道释放连接if (channel != null && channel.isOpen()) {try { | |
channel.close();} catch (IOException e) { | |
e.printStackTrace();} catch (TimeoutException e) { | |
e.printStackTrace();}}if (connection != null && connection.isOpen()) {try { | |
connection.close();} catch (IOException e) { | |
e.printStackTrace();}}}} | |
} |
首先创建队列 queue1,然后启动 Work1 和 Work2 两个消费者,接着启动 Producer 生产者。可以看到如下效果
这就是轮询分发模式,但是会出现问题,该模式不会因为实际情况的网络带宽的延迟,服务器资源等来进行合理分配
公平分发测试
消费者代码不变,Work1 和 Work2 消费者的代码变为手动应答机制,这里以 Work1 代码为例 | |
public class Work1 {public static void main(String[] args) { | |
ConnectionFactory connectionFactory = new ConnectionFactory(); | |
connectionFactory.setHost("192.168.33.110"); | |
connectionFactory.setPort(5672); | |
connectionFactory.setUsername("admin"); | |
connectionFactory.setPassword("test"); | |
connectionFactory.setVirtualHost("/"); | |
Connection connection = null; | |
Channel channel = null;try { | |
connection = connectionFactory.newConnection("消费者-Work1"); | |
channel = connection.createChannel(); | |
final Channel finalChannel = channel;// 定义指标,qos=1,默认是没有设置,为null,所以默认为轮询分发,1 表示每次从队列中取多少条数据 | |
finalChannel.basicQos(1); | |
finalChannel.basicConsume("queue1", false, new DeliverCallback() {public void handle(String consumerTag, Delivery message) throws IOException {try { | |
System.out.println("Work1-收到的消息是:" + new String(message.getBody(), "UTF-8")); | |
Thread.sleep(800); | |
finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false);} catch (InterruptedException e) { | |
e.printStackTrace();}}}, new CancelCallback() {public void handle(String consumerTag) throws IOException { | |
System.out.println("Work1-获取消息失败");}}); | |
System.out.println("Work1-开始接收消息"); | |
System.in.read();} catch (IOException e) { | |
e.printStackTrace();} catch (TimeoutException e) { | |
e.printStackTrace();} finally {// 关闭通道释放连接if (channel != null && channel.isOpen()) {try { | |
channel.close();} catch (IOException e) { | |
e.printStackTrace();} catch (TimeoutException e) { | |
e.printStackTrace();}}if (connection != null && connection.isOpen()) {try { | |
connection.close();} catch (IOException e) { | |
e.printStackTrace();}}}} | |
} |
代码执行效果如下,由于 Work2 执行效率快很多,所以 Wokr2 消费了大部分的消息