架构图
当有多个消费者时,我们的消息会被哪个消费者消费?我们又如何均衡消费者消费信息的多少?
主要又两种模式
1、轮询模式的分发:一个消费者一条,按均分配
2、公平分发:根据消费者的消费能力进行公平分发,处理快的多处理,处理慢的少处理,按劳分配。
Work模式-轮询模式(Round-Robin)
- 类型:无
- 特点:该模式接收消息是当有多个消费者接入时,罅隙的分配模式是一个消费者一条,直至消息消费完成。
生产者消费者代码
public class Producer {public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.33.110");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;try {
connection = connectionFactory.newConnection("生产者");
channel = connection.createChannel();
// 准备发送消息内容for (int i = 0; i < 20; i++) {String msg = "message:"+i;
channel.basicPublish("","queue1",null,msg.getBytes());}
System.out.println("发送消息成功!");} catch (Exception e) {
e.printStackTrace();
System.out.println("消息发送出现异常");} finally {// 关闭通道释放连接if (channel != null && channel.isOpen()) {try {
channel.close();} catch (IOException e) {
e.printStackTrace();} catch (TimeoutException e) {
e.printStackTrace();}}if (connection != null && connection.isOpen()) {try {
connection.close();} catch (IOException e) {
e.printStackTrace();}}}}
}
消费者 Work1 代码
public class Work1 {public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.33.110");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;try {
connection = connectionFactory.newConnection("消费者-Work1");
channel = connection.createChannel();
Channel finalChannel = channel;
// finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {public void handle(String consumerTag, Delivery message) throws IOException {try {
System.out.println("Work1-收到的消息是:" + new String(message.getBody(), "UTF-8"));
Thread.sleep(800);} catch (InterruptedException e) {
e.printStackTrace();}}}, new CancelCallback() {public void handle(String consumerTag) throws IOException {
System.out.println("Work1-获取消息失败");}});
System.out.println("Work1-开始接收消息");
System.in.read();} catch (IOException e) {
e.printStackTrace();} catch (TimeoutException e) {
e.printStackTrace();} finally {// 关闭通道释放连接if (channel != null && channel.isOpen()) {try {
channel.close();} catch (IOException e) {
e.printStackTrace();} catch (TimeoutException e) {
e.printStackTrace();}}if (connection != null && connection.isOpen()) {try {
connection.close();} catch (IOException e) {
e.printStackTrace();}}}}
}
消费者 Work2
public class Work2 {public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.33.110");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;try {
connection = connectionFactory.newConnection("消费者-Work2");
channel = connection.createChannel();
Channel finalChannel = channel;
// finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {public void handle(String consumerTag, Delivery message) throws IOException {try {
System.out.println("Work1-收到的消息是:" + new String(message.getBody(), "UTF-8"));
Thread.sleep(200);} catch (InterruptedException e) {
e.printStackTrace();}}}, new CancelCallback() {public void handle(String consumerTag) throws IOException {
System.out.println("Work2-获取消息失败");}});
System.out.println("Work2-开始接收消息");
System.in.read();} catch (IOException e) {
e.printStackTrace();} catch (TimeoutException e) {
e.printStackTrace();} finally {// 关闭通道释放连接if (channel != null && channel.isOpen()) {try {
channel.close();} catch (IOException e) {
e.printStackTrace();} catch (TimeoutException e) {
e.printStackTrace();}}if (connection != null && connection.isOpen()) {try {
connection.close();} catch (IOException e) {
e.printStackTrace();}}}}
}
首先创建队列 queue1,然后启动 Work1 和 Work2 两个消费者,接着启动 Producer 生产者。可以看到如下效果
这就是轮询分发模式,但是会出现问题,该模式不会因为实际情况的网络带宽的延迟,服务器资源等来进行合理分配
公平分发测试
消费者代码不变,Work1 和 Work2 消费者的代码变为手动应答机制,这里以 Work1 代码为例
public class Work1 {public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.33.110");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;try {
connection = connectionFactory.newConnection("消费者-Work1");
channel = connection.createChannel();
final Channel finalChannel = channel;// 定义指标,qos=1,默认是没有设置,为null,所以默认为轮询分发,1 表示每次从队列中取多少条数据
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", false, new DeliverCallback() {public void handle(String consumerTag, Delivery message) throws IOException {try {
System.out.println("Work1-收到的消息是:" + new String(message.getBody(), "UTF-8"));
Thread.sleep(800);
finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false);} catch (InterruptedException e) {
e.printStackTrace();}}}, new CancelCallback() {public void handle(String consumerTag) throws IOException {
System.out.println("Work1-获取消息失败");}});
System.out.println("Work1-开始接收消息");
System.in.read();} catch (IOException e) {
e.printStackTrace();} catch (TimeoutException e) {
e.printStackTrace();} finally {// 关闭通道释放连接if (channel != null && channel.isOpen()) {try {
channel.close();} catch (IOException e) {
e.printStackTrace();} catch (TimeoutException e) {
e.printStackTrace();}}if (connection != null && connection.isOpen()) {try {
connection.close();} catch (IOException e) {
e.printStackTrace();}}}}
}
代码执行效果如下,由于 Work2 执行效率快很多,所以 Wokr2 消费了大部分的消息