RabbitMQ 入门案例 - Work 模式 - 轮询模式

Java
448
0
0
2022-04-14
标签   RabbitMQ

架构图

RabbitMQ 入门案例 - Work 模式 - 轮询模式

当有多个消费者时,我们的消息会被哪个消费者消费?我们又如何均衡消费者消费信息的多少?

主要又两种模式

1、轮询模式的分发:一个消费者一条,按均分配

2、公平分发:根据消费者的消费能力进行公平分发,处理快的多处理,处理慢的少处理,按劳分配。

Work模式-轮询模式(Round-Robin)

  • 类型:无
  • 特点:该模式接收消息是当有多个消费者接入时,罅隙的分配模式是一个消费者一条,直至消息消费完成。

生产者消费者代码

public class Producer {public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.33.110");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;try {
            connection = connectionFactory.newConnection("生产者");
            channel = connection.createChannel();

            // 准备发送消息内容for (int i = 0; i < 20; i++) {String msg = "message:"+i;
                channel.basicPublish("","queue1",null,msg.getBytes());}
            System.out.println("发送消息成功!");} catch (Exception e) {
            e.printStackTrace();
            System.out.println("消息发送出现异常");} finally {// 关闭通道释放连接if (channel != null && channel.isOpen()) {try {
                    channel.close();} catch (IOException e) {
                    e.printStackTrace();} catch (TimeoutException e) {
                    e.printStackTrace();}}if (connection != null && connection.isOpen()) {try {
                    connection.close();} catch (IOException e) {
                    e.printStackTrace();}}}}
}

消费者 Work1 代码

public class Work1 {public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.33.110");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;try {
            connection = connectionFactory.newConnection("消费者-Work1");
            channel = connection.createChannel();
            Channel finalChannel = channel;
//            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {public void handle(String consumerTag, Delivery message) throws IOException {try {
                        System.out.println("Work1-收到的消息是:" + new String(message.getBody(), "UTF-8"));
                        Thread.sleep(800);} catch (InterruptedException e) {
                        e.printStackTrace();}}}, new CancelCallback() {public void handle(String consumerTag) throws IOException {
                    System.out.println("Work1-获取消息失败");}});
            System.out.println("Work1-开始接收消息");
            System.in.read();} catch (IOException e) {
            e.printStackTrace();} catch (TimeoutException e) {
            e.printStackTrace();} finally {// 关闭通道释放连接if (channel != null && channel.isOpen()) {try {
                    channel.close();} catch (IOException e) {
                    e.printStackTrace();} catch (TimeoutException e) {
                    e.printStackTrace();}}if (connection != null && connection.isOpen()) {try {
                    connection.close();} catch (IOException e) {
                    e.printStackTrace();}}}}
}

消费者 Work2

public class Work2 {public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.33.110");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;try {
            connection = connectionFactory.newConnection("消费者-Work2");
            channel = connection.createChannel();
            Channel finalChannel = channel;
//            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {public void handle(String consumerTag, Delivery message) throws IOException {try {
                        System.out.println("Work1-收到的消息是:" + new String(message.getBody(), "UTF-8"));
                        Thread.sleep(200);} catch (InterruptedException e) {
                        e.printStackTrace();}}}, new CancelCallback() {public void handle(String consumerTag) throws IOException {
                    System.out.println("Work2-获取消息失败");}});
            System.out.println("Work2-开始接收消息");
            System.in.read();} catch (IOException e) {
            e.printStackTrace();} catch (TimeoutException e) {
            e.printStackTrace();} finally {// 关闭通道释放连接if (channel != null && channel.isOpen()) {try {
                    channel.close();} catch (IOException e) {
                    e.printStackTrace();} catch (TimeoutException e) {
                    e.printStackTrace();}}if (connection != null && connection.isOpen()) {try {
                    connection.close();} catch (IOException e) {
                    e.printStackTrace();}}}}
}

首先创建队列 queue1,然后启动 Work1 和 Work2 两个消费者,接着启动 Producer 生产者。可以看到如下效果

RabbitMQ 入门案例 - Work 模式 - 轮询模式

RabbitMQ 入门案例 - Work 模式 - 轮询模式

这就是轮询分发模式,但是会出现问题,该模式不会因为实际情况的网络带宽的延迟,服务器资源等来进行合理分配

公平分发测试

消费者代码不变,Work1 和 Work2 消费者的代码变为手动应答机制,这里以 Work1 代码为例
public class Work1 {public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.33.110");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;try {
            connection = connectionFactory.newConnection("消费者-Work1");
            channel = connection.createChannel();
            final Channel finalChannel = channel;// 定义指标,qos=1,默认是没有设置,为null,所以默认为轮询分发,1 表示每次从队列中取多少条数据
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", false, new DeliverCallback() {public void handle(String consumerTag, Delivery message) throws IOException {try {
                        System.out.println("Work1-收到的消息是:" + new String(message.getBody(), "UTF-8"));
                        Thread.sleep(800);
                        finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false);} catch (InterruptedException e) {
                        e.printStackTrace();}}}, new CancelCallback() {public void handle(String consumerTag) throws IOException {
                    System.out.println("Work1-获取消息失败");}});
            System.out.println("Work1-开始接收消息");
            System.in.read();} catch (IOException e) {
            e.printStackTrace();} catch (TimeoutException e) {
            e.printStackTrace();} finally {// 关闭通道释放连接if (channel != null && channel.isOpen()) {try {
                    channel.close();} catch (IOException e) {
                    e.printStackTrace();} catch (TimeoutException e) {
                    e.printStackTrace();}}if (connection != null && connection.isOpen()) {try {
                    connection.close();} catch (IOException e) {
                    e.printStackTrace();}}}}
}

RabbitMQ 入门案例 - Work 模式 - 轮询模式

代码执行效果如下,由于 Work2 执行效率快很多,所以 Wokr2 消费了大部分的消息

RabbitMQ 入门案例 - Work 模式 - 轮询模式

RabbitMQ 入门案例 - Work 模式 - 轮询模式