消息队列Kafka、RabbitMQ

编程/开发
490
0
0
2023-06-13

Dubbo远程调用的性能问题

Dubbo调用在微服务项目中普遍存在

这些Dubbo调用都是同步的

“同步”指:A(消费者)调用B(生产者)的服务A在发起调用后,在B返回之前只能等待

直到B返回结果后A才能运行

订单减库

Dubbo消费者发送调用后进入阻塞状态,这个状态表示改线程仍占用内存资源,但是什么动作都不做

如果生产者运行耗时较久,消费者就一直等待,如果消费者利用这个时间,那么可以处理更多请求,业务整体效率

实际情况下,Dubbo有些必要的返回值必须等待,但是不必要等待的服务返回值,我们可以不等待去做别的事情

这种情况下我们就要使用消息队列

什么是消息队列

消息队列(Message Queue)简称 MQ

消息队列是采用”异步(两个微服务项目并不需要同时完成请求)”的方式来传递数据完成业务操作流程的业务处理方式

消息队列的特征

  • 利用异步的特性,提高服务器的运行效率,减少因为远程调用出现的线程等待阻塞
  • 削峰填谷:在并发峰值超过当前系统处理能力时,我们将没处理的信息保存在消息队列中,在后面出现的较闲的时间中去处理,直到所有数据依次处理完成,能够防止在并发峰值时短时间大量请求而导致的系统不稳定
  • 消息队列的延时:因为是异步执行,请求的发起者并不知道消息何时能处理完,如果业务不能接收这种延迟,就不要使用消息队列

常见消息队列

  • Kafka : 性能好功能弱:适合大数据量,高并发的情况,大数据领域使用较多
  • RabbitMQ : 功能强性能一般:适合发送需求复杂的消息队列, java 业务中使用较多
  • RocketMQ: 阿里 的
  • ActiveMQ:前几年流行的,老项目可能用到
常见面试题:消息队列异常处理

如果我们真的将上面生成订单业务里,减少库存的操作从正常流程中剥离到消息队列

那么如果库存减少过程中发送异常,就不能由Seata接收了,因为异步的处理无法和Seata通信

意思是如果使用了消息队列,队列中处理数据过程发送异常,那么就要用特殊的方法处理问题

处理方式就是手写代码进行回滚,一般情况就是在stock,模块再向order模块发送消息,order模块接收到消息后进行进一步处理

如果order模块进一步处理时又发生异常,我们可以再向一个实现设置好的消息队列中发送消息

这个消息队列没有处理者,我们称之为”死信队列”,一个正常运行的程序,会定期有人处理死信队列信息

Kafka

什么是Kafka

Kafka是由Apache软件基金会开发的一个开源流处理平台,由 Scala 和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。Kafka最初是由 LinkedIn 开发,并随后于2011年初开源。

kafka软件结构

Kafka Cluster(Kafka集群)

Partition(分片)

Producer:消息的发送方,也就是消息的来源,Kafka中的生产者

order就是消息的发送方

consumer :消息的接收方,也是消息的目标,Kafka中的消费者

​ stock就是消息的接收方法

Topic:话题或主题的意思,消息的收发双方要依据同一个话题名称,才不会将信息错发给别人

Record:消息记录,就是生产者和消费者传递的信息内容,保存在指定的Topic中

Kafka的特征与优势

Kafka作为消息队列,它和其他同类产品相比,突出的特点就是性能强大

Kafka将消息队列中的信息保存在硬盘中

Kafka对硬盘的读取规则进行优化后,效率能够接近内存

硬盘的优化规则主要依靠”顺序读写,零拷贝,日志压缩等技术”

Kafka处理队列中数据的默认设置:

  • Kafka队列信息能够一直向硬盘中保存(理论上没有大小限制)
  • Kafka默认队列中的信息保存7天,可以配置这个时间,缩短这个时间可以减少Kafka的磁盘消耗

Kafka的安装和配置

最好将我们kafka软件的解压位置设置在一个根目录

然后路径不要有空格和中文

文件位置

我们要创建一个空目录用于保存Kafka运行过程中产生的数据

本次创建名称为data的空目录

下面开始配置启动信息

先到G:kafkaconfig下配置有文件 Zookeeper .properties

找到dataDir属性修改如下

 dataDir=G:/data  

注意G盘和data文件夹名称,匹配自己电脑的真实路径和文件夹名称

还要修改server.properties配置文件

 log.dirs=G:/data  

修改注意事项和上面相同

Zookeeper简介

我们在启动kafka前必须先启动Zookeeper

zoo:动物园

keeper:园长

可以引申为管理动物的人

早期,每个服务器系统中的软件在安装后运行都需要一些配置

那么软件多了,配置会比较复杂

我们使用Zookeeper之后,可以创建一个新的管理各种软件配置的文件管理系统

在Zookeeper中,可以修改服务器系统中的所有软件配置

长此以往,很多软件就删除了自己写配置文件的功能,而直接从Zookeeper中获取

Kafka就是需要将配置编写在Zookeeper中的软件之一

Kafka的启动

启动Zookeeper

进入路径G:kafkabinwindows

输入cmd进入dos命令行

 G:kafkabinwindows>zookeeper-server-start.bat ....configzookeeper.properties  

启动kafka

总体方式一样,输入不同指令

 G:kafkabinwindows>kafka-server-start.bat ....configserver.properties  

附录

Mac系统启动Kafka服务命令 (参考):

 # 进入Kafka文件夹
cd Documents/kafka_.13-2.4.1/bin/
# 动Zookeeper服务
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties 
# 启动Kafka服务
./kafka-server-start.sh -daemon ../config/server.properties   

Mac系统关闭Kafka服务命令(参考):

 # 关闭Kafka服务
./kafka-server-stop.sh 
# 启动Zookeeper服务
./zookeeper-server-stop.sh  

在启动kafka时有一个常见错误

 wmic不是内部或外部命令  

这样的提示,需要安装wmic命令,安装方式参考百度查找

Kafka使用Demo

不要关闭Zookeeper和Kafka的dos窗口

我们再csmall项目中编写一个简单的Demo学习Kafka的使用

在csmall-cart-webapi模块中

添加依赖

 <dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>  

修改yml文件配置

 spring:
  kafka:
    # 定义kafka的位置
     bootstrap -servers: localhost:9092
    # 话题的分组名称,是必须配置的
    # 为了区分当前项目和其他项目使用的,防止了不同项目相同话题的干扰或错乱
    # 本质是在话题名称前添加项目名称为前缀来防止的
     Consumer :
      group-id: csmall  

Spring Boot启动类中添加注解

 @SpringBootApplication
@Enable Dubbo 
// 启动kafka的功能
@EnableKafka
// 为了测试kafka,我们可以周期性的发送消息到消息队列
// 使用SpringBoot自带的调度工具即可
@EnableScheduling
public class CsmallCartWebapiApplication {

  public  static   void  main(String[] args) {
    SpringApplication.run(CsmallCartWebapiApplication.class, args);
  }
}  

下面我们就可以实现周期性的向kafka发送消息并接收的操作了

编写消息的发送

cart-webapi包下创建kafka包

包中创建Producer类来发送消息

 // 我们需要周期性的向Kafka发送消息
// 需要将具备SpringBoot调度功能的类保存到Spring容器才行
@Component
public class Producer {

  // 能够实现将消息发送到Kafka的对象
  // 只要Kafka配置正确,这个对象会自动保存到Spring容器中,我们直接装配即可
  // KafkaTemplate<[话题名称的类型],[传递消息的类型]>
  @Autowired
   private  KafkaTemplate<String,String> kafkaTemplate;

  // 每隔秒向Kafka发送信息
  int i=;
  // fixedRate是周期运行,单位毫秒ms就是10秒
  @Scheduled(fixedRate =)
  // 这个方法只要启动SpringBoot项目就会按上面设置的时间运行
  public void sendMessage(){
    // 实例化一个Cart类型对象,用于发送消息
    Cart cart=new Cart();
    cart.setId(i++);
    cart.setCommodityCode("PC");
    cart.setPrice(RandomUtils.nextInt()+200);
    cart.setCount(RandomUtils.nextInt()+1);
    cart.setUserId("UU");
    // 将cart对象转换为json格式字符串
    Gson gson=new Gson();
    // 执行转换
    String json=gson.toJson(cart);
    System.out.println("本次发送的消息为:"+ json );
    // 执行发送
    // send([话题名称],[发送的消息]),需要遵循上面kafkaTemplate声明的 泛型 类型
    kafkaTemplate.send("myCart",json);
  }
}  

创建一个叫Consumer的类来接收消息

 // 因为Kafka接收消息是自动的,所以这个类也必须交由Spring容器管理
@Component
public class Consumer {

  // SpringKafka框架接收Kafka中的消息使用监听机制
  // SpringKafka框架提供一个监听器,专门负责关注指定的话题名称
  // 只要该话题名称中有消息,会自动获取该消息,并调用下面方法
  @KafkaListener(topics = "myCart")
  // 上面注解和下面方法关联,方法的参数就是接收到的消息
  public void received(ConsumerRecord<String,String> record){
    // 方法参数类型必须是ConsumerRecord
    // ConsumerRecord<[话题名称类型],[消息类型]>
    // 获取消息内容
    String json=record.value();
    // 要想在java中使用,需要转换为java对象
    Gson gson=new Gson();
    // 将json转换为java对象,需要提供转换目标类型的反射
    Cart cart=gson.fromJson(json,Cart.class);
    System.out.println("接收到对象为:"+cart);
  }

}  

RabbitMQ

什么是RabbitMQ

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。 AMQP :Advanced Message Queue,高级消息队列协议。它是 应用层协议 的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。 RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ特征

1. 可靠性 (Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

2. 灵活的路由 (Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能, rabbitmq 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

3. 消息集群 (Clustering)多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker

4. 高可用 (Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

5 .多种协议 (Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、 MQTT 等等。

6. 多语言客户端 (Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、 Ruby 等等。

7. 管理界面 (Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

8. 跟踪机制 (Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

9. 插件机制 (Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

下载软件

RabbitMQ是Erlang语言开发的,所以要先安装Erlang语言的运行环境

下载 Erlang 的官方路径

安装的话就是双击

不要安装在中文路径和有空格的路径下!!!

下载RabbitMQ

下载地址

安装也是双击即可

不要安装在中文路径和有空格的路径下!!!

RabbitMQ的结构

结构图

和Kafka不同,Kafka是使用话题名称来收发信息,结构简单

RabbitMQ是使用 交换机 路由key指定要发送消息的队列

消息的发送者发送消息时,需要指定交换机和路由key名称

消息的接收方接收消息时,只需要指定队列的名称

在编写代码上,相比于Kafka,每个业务要编写一个配置类

这个配置类中要绑定交换机和路由key的关系,以及路由Key和队列的关系

配置Erlang的环境变量

要想运行RabbitMQ必须保证系统有Erlang的环境变量

配置Erlang环境变量

把安装Erlang的bin目录配置在环境变量Path的属性中

环境变量

启动RabbitMQ

找到RabbitMQ的安装目录

可能是:

 G:pgmrabbitrabbitmq_server-.10.1sbin  

具体路径根据自己的情况寻找

地址栏运行cmd

输入启动指令如下

 G:pgmrabbitrabbitmq_server-.10.1sbin>rabbitmq-plugins enable rabbitmq_management  
 rabbitmq-plugins enable rabbitmq_management  

结果如下

命令结果

运行完成后

可以在Window任务管理器中的服务选项卡里找到RabbitMQ的服务(Ctrl+Shift+ESC)

另外的验证方法:

打开浏览器访问

登录界面用户名密码

guest

guest

登录成功后看到RabbitMQ运行的状态

如果启动失败,需要重新安装

参考路径如下

利用RabbitMQ完成消息的收发

csmall-stock-webapi项目中测试RabbitMQ

可以利用之前我们使用 quartz 实现的每隔一段时间输出当前日期信息的方法改为发送消息

添加依赖

 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>  

yml文件配置

 spring:
  rabbitmq:
    host: localhost
    port:
    username: guest
    password: guest
    virtual-host: /  

交换机路由Key队列的配置类

RabbitMQ要求我们再java代码级别设置交换机路由Key队列的关系

我们再quartz包下,创建config包

包中创建配置信息类

 // SpringBoot整合RabbitMQ之后
// 这些配置信息要保存在Spring容器中,所以这些配置也要交给SpringBoot管理
@Configuration
public class RabbitMQConfig {
  // 声明需要使用的交换机路由Key队列的名称
  public static final String STOCK_EX="stock_ex";
  public static final String STOCK_ROUT="stock_rout";
  public static final String STOCK_QUEUE="stock_queue";

  // 声明交换机,需要几个声明几个,这里就一个
  // 方法中实例化交换机对象,确定名称,保存到Spring容器
  @Bean
  public DirectExchange stockDirectExchange(){
    return new DirectExchange(STOCK_EX);
  }

  // 声明队列,需要几个声明几个,这里就一个
  // 方法中实例化队列对象,确定名称,保存到Spring容器
  @Bean
  public Queue stockQueue(){
    return new Queue(STOCK_QUEUE);
  }

  // 声明路由Key(交换机和队列的关系),需要几个声明几个,这里就一个
  // 方法中实例化路由Key对象,确定名称,保存到Spring容器
  @Bean
  public Binding stockBinding(){
    return BindingBuilder.bind(stockQueue()).to(stockDirectExchange())
      .with(STOCK_ROUT);
  }

}  

RabbitMQ发送消息

我们再QuartzJob类中输出时间的代码后继续编写代码

实现RabbitMQ消息的发送

 public class QuartzJob implements Job {

  // RabbitTemplate就是amqp框架提供的发送消息的对象
  @Autowired
  private RabbitTemplate rabbitTemplate;

  @Override
  public void execute(JobExecutionContext jobExecutionContext) 
  			throws JobExecution Exception  {
    //输出当前时间
    System.out.println("--------"+ LocalDateTime.now() +"-------");
    // 先简单的发送一个 字符串 
    rabbitTemplate.convertAndSend(RabbitMQConfig.STOCK_EX,
                   RabbitMQConfig.STOCK_ROUT,"接收到减少库存的消息");

  }
}  

我们可以通过修改QuartzConfig类中的Cron表达式修改调用的周期

 CronScheduleBuilder cronScheduleBuilder=
  CronScheduleBuilder.cronSchedule("/10 * * * * ?");  

接收RabbitMQ的消息

quartz包下再创建一个新的类用于接收信息

RabbitMQConsumer代码如下

 // 这个对象也是需要交由Spring容器管理的,才能实现监听Spring容器中保存的队列的效果
@Component
// 和Kafka不同的是Kafka在一个方法上声明监听器
// 而RabbitMQ是在类上声明,监听具体的队列名称
@RabbitListener(queues = {RabbitMQConfig.STOCK_QUEUE})
public class RabbitMQConsumer {

  // 监听了类,但是运行代码的一定是个方法
  // 框架要求这个类中只允许一个方法包含下面这个注解
  // 表示这个方法是处理消息的方法
  // 方法的参数就是消息的值
  @RabbitHandler
  public void process(String str){
    System.out.println("接收到的消息为:"+str);
  }

}  

启动NacosRabbitMQSeata

启动stock-webapi

根据Cron表达式,消息会在0/10/20/30/40/50秒数时运行

测试成功表示一切正常

学习记录,如有侵权请联系删除