RabbitMQ 入门案例 - fanout 模式

Java
315
0
0
2022-04-14
标签   RabbitMQ

发布订阅模式

图解

RabbitMQ 入门案例 - fanout 模式

具体实现

类型 fanout

特点 Fanout - 发布订阅模式,是一种广播机制,它是没有路由 key 的模式

生产者代码

public class Producer {
  public static void main(String[] args) {
    // 所有的中间件技术都是基于 TCP/IP 协议基础之上构建的协议规范,只不过 rabbitmq 遵循的是 AMQP 协议
    // ip port
    // 1 创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    // 2 设置连接属性
    connectionFactory.setHost("192.168.33.110");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("admin");
    connectionFactory.setPassword("test");
    connectionFactory.setVirtualHost("/");
    Connection connection = null;
    Channel channel = null;
    try {
      // 3 从连接工厂中获取连接 Connection
      connection = connectionFactory.newConnection("生产者");
      // 4 通过连接获取通道 Channel
      channel = connection.createChannel();

      // 5 准备发送消息的内容
      String message = "This is a routing message";

      // 6 准备交换机
      String exchangeName = "fanout-exchange";
      // 7 定义路由 key
      String routingKey = "";
      // 8 指定交换机类型
      String type = "fanout";


      // 7 发送消息给队列 queue
      /*
       * @param1:交换机名称
       * @param2:队列名称/routingKey
       * @param3:属性配置
       * @param4:发送消息的内容
       * #.course.* queue3...
       */ 
      channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
      System.out.println("发送消息成功");
    } catch (IOException e) {
      e.printStackTrace();
    } catch (TimeoutException e) {
      e.printStackTrace();
    } finally {
      // 7 关闭通道
      if (channel!= null && channel.isOpen()) {
        try {
          channel.close();
        } catch (IOException e) {
          e.printStackTrace();
        } catch (TimeoutException e) {
          e.printStackTrace();
        }
      }
      // 8 关闭连接
      if (connection != null && connection.isOpen()) {
        try {
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }
}

消费者代码


public class Consumer {
  public static Runnable runnable = new Runnable() {
    public void run() {
      // 所有的中间件技术都是基于 TCP/IP 协议基础之上构建的协议规范,只不过 rabbitmq 遵循的是 AMQP 协议
      // ip port
      // 1 创建连接工厂
      ConnectionFactory connectionFactory = new ConnectionFactory();
      // 2 设置连接属性
      connectionFactory.setHost("192.168.33.110");
      connectionFactory.setPort(5672);
      connectionFactory.setUsername("admin");
      connectionFactory.setPassword("test");
      connectionFactory.setVirtualHost("/");
      // 获取队列的名称
      final String queueName = Thread.currentThread().getName();
      Connection connection = null;
      Channel channel = null;
      try {
        // 3 从连接工厂中获取连接
        connection = connectionFactory.newConnection("生产者");
        // 4 通过连接获取通道 Channel
        channel = connection.createChannel();
        // 5 声明队列 queue 存储消息
        /*
         * 如果队列不存在,则会创建
         * RabbitMQ 不允许创建两个相同的队列名称,否则会报错
         *
         * @param1:queue 队列的名称
         * @param2:durable 队列是否持久化
         * @param3:exclusive 是否排他,即是否私有,如果为 true,会对当前队列加锁,其它通道不能访问,并且连接自动关闭
         * @param4:autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息
         * @param5:arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
         */ 
        Channel finalChannel = channel;
        finalChannel.basicConsume(queueName, true, new DeliverCallback() {
          public void handle(String s, Delivery delivery) throws IOException {
            System.out.println(delivery.getEnvelope().getDeliveryTag());
            System.out.println(queueName+":收到消息是:\t" + new String(delivery.getBody(), "UTF-8"));
          }
        }, new CancelCallback() {
          public void handle(String s) throws IOException {
            System.out.println("接收失败");
          }
        });

        System.out.println(queueName+":开始接收消息");
//        System.in.read();
      } catch (IOException e) {
        e.printStackTrace();
      } catch (TimeoutException e) {
        e.printStackTrace();
      } finally {
        // 7 关闭通道
        if (channel!= null && channel.isOpen()) {
          try {
            channel.close();
          } catch (IOException e) {
            e.printStackTrace();
          } catch (TimeoutException e) {
            e.printStackTrace();
          }
        }
        // 8 关闭连接
        if (connection != null && connection.isOpen()) {
          try {
            connection.close();
          } catch (IOException e) {
            e.printStackTrace();
          }
        }
      }
    }
  };
  public static void main(String[] args) {
    // 启动三个线程去执行
    new Thread(runnable,"queue1").start();
    new Thread(runnable,"queue2").start();
  }
}