RabbitMQ 代码示例

Java
443
0
0
2022-04-14
标签   RabbitMQ

Web 管理页面

RabbitMQ

通过代码 debug 来对 Web 管理页面有个更直观的认识

生产者代码如下

public class Producer {public static void main(String[] args) {// 所有的中间件技术都是基于 TCP/IP 协议基础之上构建的协议规范,只不过 rabbitmq 遵循的是 AMQP 协议// ip port// 1 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.33.110");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2 创建连接 Connection
            connection = connectionFactory.newConnection("生产者");// 3 通过连接获取通道 Channel
            channel = connection.createChannel();// 4 通过创建交换机,声明队列,绑定关系,路由 key,发送消息和接收消息String queueName = "queue1";/*
             * 队列名字
             * 是否具有持久化 durable=false 所谓持久化消息是否存盘,如果是false 非持久化 true 持久化?非持久化会存盘么,会存盘,但是会随着服务器的重启丢失
             * 排他性,是否具有独立线程
             * 是否自动删除,随着最后一个消费者消息消费完毕之后,是否把队列自动删除
             * 携带一些附属参数
             */
            channel.queueDeclare(queueName,false,false,false,null);// 5 准备消息内容String message = "hello,world";// 6 发送消息给队列 queue// 交换机,队列、路由key,消息是否持久化,消息内容主体
            channel.basicPublish("",queueName,null,message.getBytes());System.out.println("发送消息成功");} catch (IOException e) {
            e.printStackTrace();} catch (TimeoutException e) {
            e.printStackTrace();} finally {// 7 关闭通道if (channel!= null && channel.isOpen()) {try {
                    channel.close();} catch (IOException e) {
                    e.printStackTrace();} catch (TimeoutException e) {
                    e.printStackTrace();}}// 8 关闭连接if (connection != null && connection.isOpen()) {try {
                    connection.close();} catch (IOException e) {
                    e.printStackTrace();}}}}
}

当代码执行到创建连接 Connection 时,可以在 Connections 一栏看到连接信息。

RabbitMQ

当代码执行到创建通道时,看到 Channels 一栏信息

RabbitMQ

当代码执行到队列时候,看到 queues 一栏的信息

RabbitMQ

执行结束之后可以看到队列中的消息增加了一个

RabbitMQ

消费着代码如下

public class Consumer {public static void main(String[] args) {// 所有的中间件技术都是基于 TCP/IP 协议基础之上构建的协议规范,只不过 rabbitmq 遵循的是 AMQP 协议// ip port// 1 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.33.110");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2 创建连接 Connection
            connection = connectionFactory.newConnection("生产者");// 3 通过连接获取通道 Channel
            channel = connection.createChannel();// 4 通过创建交换机,声明队列,绑定关系,路由 key,发送消息和接收消息
            channel.basicConsume("queue1", true, new DeliverCallback() {public void handle(String s, Delivery delivery) throws IOException {System.out.println("收到消息是:\t" + new String(delivery.getBody(), "UTF-8"));}}, new CancelCallback() {public void handle(String s) throws IOException {System.out.println("接收失败");}});

            System.out.println("开始接收消息");System.in.read();} catch (IOException e) {
            e.printStackTrace();} catch (TimeoutException e) {
            e.printStackTrace();} finally {// 7 关闭通道if (channel!= null && channel.isOpen()) {try {
                    channel.close();} catch (IOException e) {
                    e.printStackTrace();} catch (TimeoutException e) {
                    e.printStackTrace();}}// 8 关闭连接if (connection != null && connection.isOpen()) {try {
                    connection.close();} catch (IOException e) {
                    e.printStackTrace();}}}}
}

当代码执行到 System.in.read() 时,此时消息都被消费了

RabbitMQ

当服务重启后,消息队列消失。