项目架构如下
服务层代码如下
@Service
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "email.topic.queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "*.email.#"
))
public class TopicEmailConsumer {@RabbitHandlerpublic void receiveMessage(String message) {System.out.println("email fanout--接收到的订单信息是:->" + message);}
}
@Service
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "sms.topic.queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "#.sms.#"
))
public class TopicSMSConsumer {@RabbitHandlerpublic void receiveMessage(String message) {System.out.println("sms fanout--接收到的订单信息是:->" + message);}
}
@Service
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "weChat.topic.queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "weChat.#"
))
public class TopicWeChatConsumer {@RabbitHandlerpublic void receiveMessage(String message) {System.out.println("weChat fanout--接收到的订单信息是:->" + message);}
}
这里使用的是 RabbitMQ 提供的注解的方式来进行队列和交换机进行绑定,启动消费者,可以看到创建了 Topic 类型的交换机,并且进行了交换机的绑定。
消费者代码如下
public void makeOrderTopic(String userId,String productId,int num) {// 1:根据id查询商品是否充足// 2:保存订单String orderId = UUID.randomUUID().toString();System.out.println("订单生成成功:"+orderId);// 3:通过 MQ 来完成消息的分发// 交换机,路由 key/queue 队列名称,消息内容String exchangeName = "topic_order_exchange";String routingKey = "com.email.sms";
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);}