发布订阅模式
图解
具体实现
类型 fanout
特点 Fanout - 发布订阅模式,是一种广播机制,它是没有路由 key 的模式
生产者代码
public class Producer { | |
public static void main(String[] args) { | |
// 所有的中间件技术都是基于 TCP/IP 协议基础之上构建的协议规范,只不过 rabbitmq 遵循的是 AMQP 协议 | |
// ip port | |
// 1 创建连接工厂 | |
ConnectionFactory connectionFactory = new ConnectionFactory(); | |
// 2 设置连接属性 | |
connectionFactory.setHost("192.168.33.110"); | |
connectionFactory.setPort(5672); | |
connectionFactory.setUsername("admin"); | |
connectionFactory.setPassword("test"); | |
connectionFactory.setVirtualHost("/"); | |
Connection connection = null; | |
Channel channel = null; | |
try { | |
// 3 从连接工厂中获取连接 Connection | |
connection = connectionFactory.newConnection("生产者"); | |
// 4 通过连接获取通道 Channel | |
channel = connection.createChannel(); | |
// 5 准备发送消息的内容 | |
String message = "This is a routing message"; | |
// 6 准备交换机 | |
String exchangeName = "fanout-exchange"; | |
// 7 定义路由 key | |
String routingKey = ""; | |
// 8 指定交换机类型 | |
String type = "fanout"; | |
// 7 发送消息给队列 queue | |
/* | |
* @param1:交换机名称 | |
* @param2:队列名称/routingKey | |
* @param3:属性配置 | |
* @param4:发送消息的内容 | |
* #.course.* queue3... | |
*/ | |
channel.basicPublish(exchangeName,routingKey,null,message.getBytes()); | |
System.out.println("发送消息成功"); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} catch (TimeoutException e) { | |
e.printStackTrace(); | |
} finally { | |
// 7 关闭通道 | |
if (channel!= null && channel.isOpen()) { | |
try { | |
channel.close(); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} catch (TimeoutException e) { | |
e.printStackTrace(); | |
} | |
} | |
// 8 关闭连接 | |
if (connection != null && connection.isOpen()) { | |
try { | |
connection.close(); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} | |
} |
消费者代码
public class Consumer { | |
public static Runnable runnable = new Runnable() { | |
public void run() { | |
// 所有的中间件技术都是基于 TCP/IP 协议基础之上构建的协议规范,只不过 rabbitmq 遵循的是 AMQP 协议 | |
// ip port | |
// 1 创建连接工厂 | |
ConnectionFactory connectionFactory = new ConnectionFactory(); | |
// 2 设置连接属性 | |
connectionFactory.setHost("192.168.33.110"); | |
connectionFactory.setPort(5672); | |
connectionFactory.setUsername("admin"); | |
connectionFactory.setPassword("test"); | |
connectionFactory.setVirtualHost("/"); | |
// 获取队列的名称 | |
final String queueName = Thread.currentThread().getName(); | |
Connection connection = null; | |
Channel channel = null; | |
try { | |
// 3 从连接工厂中获取连接 | |
connection = connectionFactory.newConnection("生产者"); | |
// 4 通过连接获取通道 Channel | |
channel = connection.createChannel(); | |
// 5 声明队列 queue 存储消息 | |
/* | |
* 如果队列不存在,则会创建 | |
* RabbitMQ 不允许创建两个相同的队列名称,否则会报错 | |
* | |
* @param1:queue 队列的名称 | |
* @param2:durable 队列是否持久化 | |
* @param3:exclusive 是否排他,即是否私有,如果为 true,会对当前队列加锁,其它通道不能访问,并且连接自动关闭 | |
* @param4:autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息 | |
* @param5:arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 | |
*/ | |
Channel finalChannel = channel; | |
finalChannel.basicConsume(queueName, true, new DeliverCallback() { | |
public void handle(String s, Delivery delivery) throws IOException { | |
System.out.println(delivery.getEnvelope().getDeliveryTag()); | |
System.out.println(queueName+":收到消息是:\t" + new String(delivery.getBody(), "UTF-8")); | |
} | |
}, new CancelCallback() { | |
public void handle(String s) throws IOException { | |
System.out.println("接收失败"); | |
} | |
}); | |
System.out.println(queueName+":开始接收消息"); | |
// System.in.read(); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} catch (TimeoutException e) { | |
e.printStackTrace(); | |
} finally { | |
// 7 关闭通道 | |
if (channel!= null && channel.isOpen()) { | |
try { | |
channel.close(); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} catch (TimeoutException e) { | |
e.printStackTrace(); | |
} | |
} | |
// 8 关闭连接 | |
if (connection != null && connection.isOpen()) { | |
try { | |
connection.close(); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} | |
}; | |
public static void main(String[] args) { | |
// 启动三个线程去执行 | |
new Thread(runnable,"queue1").start(); | |
new Thread(runnable,"queue2").start(); | |
} | |
} |