SpringBoot 整合 RabbitMQ 集群

Java
521
0
0
2022-11-14

引入依赖

    <dependencies> 
        <dependency> 
            <groupId>org.springframework.boot</groupId> 
            <artifactId>spring-boot-starter</artifactId> 
        </dependency>

        <dependency> 
            <groupId>org.springframework.boot</groupId> 
            <artifactId>spring-boot-starter-amqp</artifactId> 
        </dependency> 
    </dependencies>

详细配置如下

 rabbitmq: 
    addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
#    port: 
    ##集群配置 addresses之间用逗号隔开 
    # addresses: ip:port,ip:port 
    password: admin 
    username: 123456 
    virtual-host: / # 连接到rabbitMQ的vhost 
    requested-heartbeat: #指定心跳超时,单位秒,0为不指定;默认60s 
    publisher-confirms: #是否启用 发布确认 
    publisher-reurns: # 是否启用发布返回 
    connection-timeout: #连接超时,单位毫秒,0表示无穷大,不超时 
    cache: 
      channel.size: # 缓存中保持的channel数量 
      channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel 
      connection.size: # 缓存的连接数,只有是CONNECTION模式时生效 
      connection.mode: # 连接工厂缓存模式:CHANNEL 和 CONNECTION 
    listener: 
      simple.auto-startup: # 是否启动时自动启动容器 
      simple.acknowledge-mode: # 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto 
      simple.concurrency: # 最小的消费者数量 
      simple.max-concurrency: # 最大的消费者数量 
      simple.prefetch: # 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量. 
      simple.transaction-size: # 指定一个事务处理的消息数量,最好是小于等于prefetch的数量. 
      simple.default-requeue-rejected: # 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系) 
      simple.idle-event-interval: # 多少长时间发布空闲容器时间,单位毫秒 
      simple.retry.enabled: # 监听重试是否可用 
      simple.retry.max-attempts: # 最大重试次数 
      simple.retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔 
      simple.retry.multiplier: # 应用于上一重试间隔的乘数 
      simple.retry.max-interval: # 最大重试时间间隔 
      simple.retry.stateless: # 重试是有状态or无状态 
    template: 
      mandatory: # 启用强制信息;默认false 
      receive-timeout: # receive() 操作的超时时间 
      reply-timeout: # sendAndReceive() 操作的超时时间 
      retry.enabled: # 发送重试是否可用 
      retry.max-attempts: # 最大重试次数 
      retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔 
      retry.multiplier: # 应用于上一重试间隔的乘数 
      retry.max-interval: #最大重试时间间隔

Spring AMQP的主要对象

AMQP 官网文档地址

类 作用 Queue 对应RabbitMQ中Queue AmqpTemplate 接口,用于向RabbitMQ发送和接收Message RabbitTemplate AmqpTemplate的实现类 @RabbitListener 指定消息接收方,可以配置在类和方法上 @RabbitHandler 指定消息接收方,只能配置在方法上,可以与@RabbitListener一起使用 Message 对RabbitMQ消息的封装 Exchange 对RabbitMQ的Exchange的封装,子类有TopicExchange、FanoutExchange和DirectExchange等 Binding 将一个Queue绑定到某个Exchange,本身只是一个声明,并不做实际绑定操作 AmqpAdmin 接口,用于Exchange和Queue的管理,比如创建/删除/绑定等,自动检查Binding类并完成绑定操作 RabbitAdmin AmqpAdmin的实现类 ConnectionFactory 创建Connection的工厂类,RabbitMQ也有一个名为ConnectionFactory的类但二者没有继承关系,Spring ConnectionFactory可以认为是对RabbitMQ ConnectionFactory的封装 CachingConnectionFactory Spring ConnectionFactory的实现类,可以用于缓存Channel和Connection Connection Spring中用于创建Channel的连接类,RabbitMQ也有一个名为Connection的类,但二者没有继承关系,Spring Connection是对RabbitMQ Connection的封装 SimpleConnection Spring Connection的实现类,将实际工作代理给RabbitMQ的Connection类 MessageListenerContainer 接口,消费端负责与RabbitMQ服务器保持连接并将Message传递给实际的@RabbitListener/@RabbitHandler处理 RabbitListenerContainerFactory 接口,用于创建MessageListenerContainer SimpleMessageListenerContainer MessageListenerContainer的实现类 SimpleRabbitListenerContainerFactory RabbitListenerContainerFactory的实现类 RabbitProperties 用于配置Spring AMQP的Property类

对于消息的发送方而言,需要进行如下配置

  1. 配置 CachingConnectionFactory
  2. 配置 Exchange/Queue/Binding
  3. 配置 RabbitAdmin 创建上一步的 Exchange/Queue/Binding
  4. 配置 RabbitTemplate 用于发送消息,RabbitTemplate通过CachingConnectionFactory 获取到 Connection,然后想指定 Exchange发送

消息的消费方需要进行如下配置

  1. 配置 CachingConnectionFactory
  2. 配置 Exchange/Queue/Binding
  3. 配置 RabbitAdmin 创建上一步的 Exchange/Queue/Binding
  4. 配置 RabbitListenerContainerFactory
  5. 配置 @RabbitListener/@RabbitHandler 用于接收消息

默认情况下的配置如下

配置项 默认值 作用 host localhost RabbitMQ服务器地址 port 5672 RabbitMQ服务器端口 username guest 用户名 password guest 密码 virtualHost / RabbitMQ 虚拟主机名 publisherConfirms false 设置是否启用生产方确认 publisherReturns false 设置是否启用生产方消息返回 ssl 对象 配置 SSL,默认停用 template 对象 设置 RabbitTemplate template.retry 默认停用 设置RabbitTemplate发送消息时的重试,主要用于RabbitTemplate与RabbitMQ之间的网络连接 template.mandatory false 设置发送消息失败时(无接收queue)是否return 消息,与return callback一并使用 template.exchange “” 默认发送的exchange template.routingKey “” 默认发送消息时的routing key template.defaultReceiveQueue null 默认接收消息的queue listener.simple 对象 设置SimpleRabbitListenerContainerFactory listener.direct 对象 设置DirectRabbitListenerContainerFactory listener.simple.concurrency null 并发消费方数量 listener.simple.acknowledgeMode AUTO 设置消费方确认模式,这里的AUTO与RabbitMQ的自动确认不是一回事 listener.simple.prefetch 250 设置消费方一次性接收消息的条数 listener.simple.defaultRequeueRejected true 当Listener发生异常时是否requeue listener.simple.retry 对象 设置Listener的重试机制,默认停用,当启用时,Listener对于消息处理过程中的异常将进行requeue重试,超过重试次数再抛弃,此时AmqpRejectAndDontRequeueException异常也会被重试

通过配置类加载

@Configuration
public class RabbitClusterConfig {
    @Value("${spring.rabbitmq.addresses}")
    private String addresses;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Bean 
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // connectionFactory.setHost(); 
        // connectionFactory.setPort();
        connectionFactory.setAddresses(addresses);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);

        return connectionFactory;
    }

    @Bean 
    // 如果需要对 rabbitTemplate 设置不同的回调类,需要设置原型模式,不然回调类只能有一个 
    @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE,proxyMode = ScopedProxyMode.TARGET_CLASS)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 设置消布确认回调,即当消息达到交换机回调 
        // rabbitTemplate.setConfirmCallback(); 
        // 消息(带有 RoutingKey)到达交换机,与交换机的所有所有绑定的键进行匹配,匹配不到触发回调 
        // rabbitTemplate.setReturnsCallback(); 
        return rabbitTemplate;
    }

    public static final String CLUSTER_EXCHANGE_NAME = "cluster_direct_exchange";
    public static final String CLUSTER_QUEUE_NAME = "cluster_direct_queue";
    public static final String CLUSTER_ROUTING_KEY = "cluster";

    @Bean 
    public DirectExchange clusterDirectExchange() {
        return new DirectExchange(CLUSTER_EXCHANGE_NAME,true,false);
    }

    @Bean 
    public Queue clusterDirectQueue() {
        return new Queue(CLUSTER_QUEUE_NAME,true,false,false);
    }

    @Bean 
    public Binding clusterDirectBinding(@Qualifier("clusterDirectQueue") Queue clusterDirectQueue,
                                        @Qualifier("clusterDirectExchange") DirectExchange clusterDirectExchange) {
        return BindingBuilder.bind(clusterDirectQueue).to(clusterDirectExchange).with(CLUSTER_ROUTING_KEY);
    }
}