整体核心
生产者模块
创建 springboot-rabbitmq-producer 的 springboot 项目
项目结构如下
web.xml 配置
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.yml 配置
application.yml 配置
server:
port: 9900
servlet:
context-path: /producer
spring:
application:
name: springboot-rabbitmq-producer
rabbitmq:
host: xxx.xxx.xxx.xxx
port: 5672
virtual-host: /
username: xxx
password: xxx
服务层代码
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 模拟用户下单
* @param userId
* @param productId
* @param num
*/
public void makeOrder(String userId,String productId,int num) {
// 1:根据id查询商品是否充足
// 2:保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生成成功:"+orderId);
// 3:通过 MQ 来完成消息的分发
// 交换机,路由 key/queue 队列名称,消息内容
String exchangeName = "fanout_order_exchange";
String routingKey = "";
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
}
}
config 配置类
@Configuration
public class RabbitMqConfiguration {
// 1;声明注册 fanout 模式的交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_order_exchange",true,false);
}
// 2:声明队列 sms.fanout.queue email.fanout.queue weChat.fanout.queue
@Bean
public Queue smsQueue () {
return new Queue("sms.fanout.queue",true);
}
@Bean
public Queue emailQueue () {
return new Queue("email.fanout.queue",true);
}
@Bean
public Queue weChatQueue () {
return new Queue("weChat.fanout.queue",true);
}
// 3;完成绑定关系(队列和交换机完成绑定关系)
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
@Bean
public Binding weChatBinding() {
return BindingBuilder.bind(weChatQueue()).to(fanoutExchange());
}
}
测试类
@SpringBootTest
class SpringbootRabbitmqProducerApplicationTests {
@Autowired
private OrderService orderService;
@Test
void contextLoads() {
orderService.makeOrder("1","1",12);
}
}
消费者模块
项目结构如下
和消费者模块区别在于修改一下端口
服务层代码如下
@Service
// 定义监听的队列
@RabbitListener(queues = {"email.fanout.queue"})
public class EmailConsumer {
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("email fanout--接收到的订单信息是:->" + message);
}
}
@Service
@RabbitListener(queues = {"sms.fanout.queue"})
public class SMSConsumer {
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("sms fanout--接收到的订单信息是:->" + message);
}
}
@Service
@RabbitListener(queues = {"weChat.fanout.queue"})
public class WeChatConsumer {
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("weChat fanout--接收到的订单信息是:->" + message);
}
}
启动消费者模块,再启动生产者模块,效果如下