发布订阅模式
图解
具体实现
类型 fanout
特点 Fanout - 发布订阅模式,是一种广播机制,它是没有路由 key 的模式
生产者代码
public class Producer {
public static void main(String[] args) {
// 所有的中间件技术都是基于 TCP/IP 协议基础之上构建的协议规范,只不过 rabbitmq 遵循的是 AMQP 协议
// ip port
// 1 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2 设置连接属性
connectionFactory.setHost("192.168.33.110");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 3 从连接工厂中获取连接 Connection
connection = connectionFactory.newConnection("生产者");
// 4 通过连接获取通道 Channel
channel = connection.createChannel();
// 5 准备发送消息的内容
String message = "This is a routing message";
// 6 准备交换机
String exchangeName = "fanout-exchange";
// 7 定义路由 key
String routingKey = "";
// 8 指定交换机类型
String type = "fanout";
// 7 发送消息给队列 queue
/*
* @param1:交换机名称
* @param2:队列名称/routingKey
* @param3:属性配置
* @param4:发送消息的内容
* #.course.* queue3...
*/
channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
System.out.println("发送消息成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 7 关闭通道
if (channel!= null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
消费者代码
public class Consumer {
public static Runnable runnable = new Runnable() {
public void run() {
// 所有的中间件技术都是基于 TCP/IP 协议基础之上构建的协议规范,只不过 rabbitmq 遵循的是 AMQP 协议
// ip port
// 1 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2 设置连接属性
connectionFactory.setHost("192.168.33.110");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
// 获取队列的名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 3 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4 通过连接获取通道 Channel
channel = connection.createChannel();
// 5 声明队列 queue 存储消息
/*
* 如果队列不存在,则会创建
* RabbitMQ 不允许创建两个相同的队列名称,否则会报错
*
* @param1:queue 队列的名称
* @param2:durable 队列是否持久化
* @param3:exclusive 是否排他,即是否私有,如果为 true,会对当前队列加锁,其它通道不能访问,并且连接自动关闭
* @param4:autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息
* @param5:arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
*/
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(delivery.getEnvelope().getDeliveryTag());
System.out.println(queueName+":收到消息是:\t" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("接收失败");
}
});
System.out.println(queueName+":开始接收消息");
// System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 7 关闭通道
if (channel!= null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
// 启动三个线程去执行
new Thread(runnable,"queue1").start();
new Thread(runnable,"queue2").start();
}
}