Kafka 入门

Java
621
0
0
2022-05-01
标签   Kafka

一、生产者消费者模型与阻塞队列

肯德基扫码点餐流程

大多数人都在 KFC 扫码点过餐。首先你会扫描二维码进入小程序点餐,完成付款;付完款之后,你会收到你的取餐码,等到你的订单做好了,肯德基的服务员会通知你到前台取餐。其模型如下图所示:

生产者消费者模型

上面介绍的肯德基扫码点餐流程实际上就是一个生产者消费者模型在生活中的经典应用。生产者消费者模型是一种程序设计模式,其被广泛应用在解耦消息队列等场景。

在一个系统中,存在生产者和消费者两种角色,它们通过内存缓冲区进行通信,生产者产生消费者需要的资料,消费者把资料做成产品。生产者消费者模型如下图所示:

阻塞队列

我们要知道,内存缓冲区的容量大小不是无穷无尽的,就好比一个停车场,当所有的停车位都被占满时,外面的车辆只有等待里面的车位腾出来才可以进入。

  • 生产者生产数据到缓冲区,消费者从缓冲区中取数据
  • 生产者的生产速度远远大于消费者的消费速度时;如果缓冲区已经满了,那么生产者的线程需要被阻塞
  • 消费者的消费速度远远大于生产者的生产速度时;如果缓冲区为空,那么消费者的线程需要被阻塞

如何让生产者,消费者的线程在上述情况中挂起?我们可以使用阻塞队列(BlockingQueue)

我们来看一个示例程序:

代码地址:github.com/jinrunheng/blocking-que...

生产者生产速度大于消费者消费速度

Producer

package com.github.blockingqueuedemo;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                Thread.sleep(20);
                queue.put(i);
                System.out.println(Thread.currentThread().getName() + " produce. " + "blocking queue size is :" + queue.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Consumer

package com.github.blockingqueuedemo;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override 
    public void run() {
        while (true) {
            try {
                // 0 - 1000ms 消费一个数据,可以认为 生产速度大于消费速度
                Thread.sleep(new Random().nextInt(1000));
                queue.take();
                System.out.println(Thread.currentThread().getName() + " consume. " + "blocking queue size is :" + queue.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Main

package com.github.blockingqueuedemo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Main {
    public static void main(String[] args) {
        // 阻塞队列的最大长度为 10
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        // 一个生产者,最多生产 100 个数据  
        new Thread(new Producer(queue)).start();

        // 三个消费者线程  
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }
}

程序执行结果:

Thread-0 produce. blocking queue size is :1
Thread-0 produce. blocking queue size is :2
Thread-0 produce. blocking queue size is :3
Thread-0 produce. blocking queue size is :4
Thread-1 consume. blocking queue size is :3
Thread-0 produce. blocking queue size is :4
Thread-0 produce. blocking queue size is :5
Thread-0 produce. blocking queue size is :6
Thread-0 produce. blocking queue size is :7
Thread-0 produce. blocking queue size is :8
Thread-0 produce. blocking queue size is :9
Thread-0 produce. blocking queue size is :10
Thread-3 consume. blocking queue size is :9
Thread-0 produce. blocking queue size is :10
Thread-2 consume. blocking queue size is :9
Thread-0 produce. blocking queue size is :10
Thread-1 consume. blocking queue size is :9
... ...

我们看到,生产者的生产速度大于消费者消费速度的情况下,生产者会一直生产,直至阻塞队列为满,然后进入到阻塞状态,之后消费者开始消费。

消费者的消费速度大于生产者的生产速度

将消费者的代码改动为:

Consumer

package com.github.blockingqueuedemo;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override 
    public void run() {
        while (true) {
            try {
                // 0-10ms 消费一个数据,可以认为,消费速度要大于生产速度
                Thread.sleep(new Random().nextInt(10));
                queue.take();
                System.out.println(Thread.currentThread().getName() + " consume. " + "blocking queue size is :" + queue.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

程序执行的结果:

Thread-1 consume. blocking queue size is :0
Thread-0 produce. blocking queue size is :1
Thread-0 produce. blocking queue size is :1
Thread-3 consume. blocking queue size is :0
Thread-0 produce. blocking queue size is :1
Thread-2 consume. blocking queue size is :0
Thread-0 produce. blocking queue size is :1
Thread-1 consume. blocking queue size is :0
Thread-0 produce. blocking queue size is :1
Thread-3 consume. blocking queue size is :0
Thread-0 produce. blocking queue size is :1
Thread-2 consume. blocking queue size is :0
Thread-0 produce. blocking queue size is :1
Thread-1 consume. blocking queue size is :0
Thread-0 produce. blocking queue size is :1
Thread-3 consume. blocking queue size is :0
... ...

我们可以看到,当消费者的消费速度大于生产者的生产速度时,如果阻塞队列为空,那么消费者线程会进入阻塞状态。

二、消息队列(MQ)

什么是消息队列?

首先,消息(Message) 指的是我们要传输的数据,队列(Queue)指的就是存放我们传输数据的内存缓冲区。

消息队列从字面意思来看,它就是一个存放数据的容器,其本质还是使用了生产者消费者模型,生产者将数据放入到消息队列中,消费者从消息队列里取出数据。

一般来说,消息队列是一种异步的服务间通信方式,是分布式系统中重要的组件;其主要解决 应用耦合异步消息流量削峰 等问题,实现高性能,高可用,可伸缩的一致性框架。常用的消息队列框架有:RabbitMQ,Kafka 等。

消息队列的使用场景

1. 系统解耦

举例:用户下订单

用户下订单后,订单系统需要通知库存系统,传统的做法是订单系统来调用库存系统的接口

这样做的缺点是,两者产生了耦合关系,假如库存系统无法访问,那么订单减库存将会失败,从而导致用户下订单失败。

这个时候,可以使用消息队列来解除系统模块之间造成的耦合

用户下订单后,订单系统作为生产者,将消息写入消息队列,返回用户订单下单成功;库存系统作为消费者订阅下单的消息,进行库存操作。即便库存系统出现了问题,也不会影响用户正常下订单,可以等到库存系统恢复,再来处理 MQ 中的消息。这样一来,我们就实现了订单系统与库存系统的解耦。为了保证库存的数量,可以将消息队列的大小设置为当前库存的数量,这样就可以保证库存的商品一定是有的。

2. 异步通信

举例:依旧是用户下订单

假设用户下订单有几个流程:

  1. 支付
  2. 优惠券系统
  3. 积分系统
  4. 发送短信
  5. … …

如果使用传统的串行方式:

我们看到如果使用传统的方式来做下订单的逻辑,第一个问题是各个系统之间的耦合,如果优惠券系统错误导致添加优惠券失败,短信系统出现问题导致发短信失败,就会导致用户下订单失败;第二个问题是性能问题,用户下订单目前已经集成了好几个系统模块了,如果后续还要添加其他的功能,那么随着支付的链路越来越长,用户下订单就会变成一个非常耗时的操作,严重影响用户的体验。

我们可以使用消息队列将串行的方式变为异步通信的方式:

我们使用消息队列将消息异步分发给各个系统模块,不仅做到了模块之间的解耦,还优化了系统的速度。

3. 流量削峰

举例:

例如秒杀活动,双十一活动,在某个时间点,服务器会一瞬间收到大量的请求,你的服务器,Redis,MySQL 的承受能力都不一样,如果不做流量削峰处理,很有可能导致服务器宕机。

我们可以使用消息队列做流量削峰,将用户所有的请求放到 MQ 中,按照自己的服务器处理能力来设置每秒服务器能处理多少请求。

消息队列的通讯模型

消息队列主要有两种通讯模型:PTP 以及 Pub/Sub 模型

1. PTP

PTP 即 Point to Point ,点对点通讯

生产者发送一条消息到 MQ,只有一个消费者才能收到

2. Pub/Sub

Pub/Sub 即:Publisher/Subscriber ,发布/订阅通讯模型

发布者发送消息到 Topic,所有订阅了 Topic 的订阅者都可以收到消息

三、Kafka

Kafka是由Apache软件基金会开发的一个开源流处理平台,由ScalaJava编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。

Kafka 的应用

  • 消息队列(采用 Pub/Sub 模型)
  • 日志收集
  • 用户行为追踪
  • 流式处理
  • … …

Kafka 的特点

  • 高吞吐量(可以处理 TB 级的异步消息)
  • 消息持久化
  • 高可靠性
  • 高扩展性

Kafka 模型

Kafka 术语

1. Topic

在 Kafka 中,Topic 是一个存储消息的逻辑概念,可以认为是一个消息集合。具体的物理存储是基于 Partition 来的。一个 Topic 可以划分为多个分区(Partition),且每个 Topic 至少有一个分区。

2. Broker

一台 Kafka 服务器就是一个 Broker 。一个 Kafka 集群由多个 Broker 组成,然后通过 Zookeeper 来进行集群的管理。

如果 Topic 有 N 个 Partition,集群有 N 个 Broker ,那么每个 Broker 存储该 Topic 的一个 Partition。

如果某 Topic 有 N 个 Partition,集群有 (N+M) 个 Broker,那么其中有 N 个 Broker存储该 Topic的一个Partition,剩下的 M 个 Broker 不存储该 Topic 的 Partition数据。

如果某 Topic有 N 个 Partition,集群中 Broker数目少于 N 个,那么一个 Broker 存储该 Topic 的一个或多个 Partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。

3. Partition

Partition (分区),是 Kafka 下数据存储的基本单元。Topic 是一个逻辑的概念,Partition 则是物理的概念。分区是实际存储在 Broker 上的。每个 Partition 都是一个有序的队列,通过提升分区数量可以提升同一个 Topic 的数据吞吐量。

4. offset

每个消息被添加到 Partition 时,都会分配唯一的 offset,以保证 Partition 内消息的顺序性。消费者通过 offset 定位并读取消息,且哥哥消费者持有的 offset 是自己的消费进度。

5. Replica

Replica 即副本,也就是 Partition 的一个备份。一个分区(Partition)只能有一个leader,但是可以设置多个副本(follower),同一分区的副本不能在同一台机器上。也就是说如果有2 台 Broker,那么一个分区就最多会有 1 个副本。leader 的主要作用是:完成与生产者、消费者的交互;follower的主要作用是:做数据备份,当 leader 发生故障时,某个 follower 会成为新的 leader,以此来保证 Kafka 的可用性。

Kafka 下载与安装

Kafka 官网:kafka.apache.org/

Kafka 下载:kafka.apache.org/downloads

Kafka 的基本使用

下载好 Kafka 后,我们来使用基本的生产者消费者模型来发送,接收消息。

  1. 启动 Zookeeper

在 Kafka 的安装包 bin 目录下运行命令:

sh zookeeper-server-start.sh ../config/zookeeper.properties

启动 Zookeeper

  1. 启动 Kafka

启动 Zookeeper 后,新开一个 Terminal ,在 Kafka 的安装包 bin 目录下运行命令:

sh kafka-server-start.sh ../config/server.properties

启动 Kafka

  1. 创建一个 Topic

再开一个 Terminal 窗口,进入到 Kafka 安装包 bin 目录下运行命令:

sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

我们创建了一个主题(Topic) test;查看刚刚创建的主,使用命令:

sh kafka-topics.sh --list --bootstrap-server localhost:9092

可以看到,返回结果:

➜  bin sh kafka-topics.sh --list --bootstrap-server localhost:9092
test

说明我们创建 Topic 成功。

  1. 调用生产者发送消息

新开一个 Terminal 窗口,进入到 Kafka 安装包 bin 目录下,使用命令:

sh kafka-console-producer.sh --broker-list localhost:9092 --topic test

发送消息内容如下:

  1. 调用消费者接受消息

新开一个 Terminal 窗口,进入到 Kafka 安装包 bin 目录下,使用命令:

sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

我们可以看到消费者接受到了生产者发送的消息:

开启 Zookeeper 和 Kafka 有严谨的顺序,一定要先启动ZooKeeper 再启动Kafka;先关闭kafka ,再关闭zookeeper ,顺序不可以改变。

Spring 整合 Kafka

  • 引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
  • 配置 application,properties
# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-consumer-group
# 是否自动提交消费者的偏移量
spring.kafka.consumer.enable-auto-commit=true
# 提交的频率 单位为ms
spring.kafka.consumer.auto-commit-interval=3000
  • 访问 Kafka
  • 生产者
kafkaTemplate.send(topic,data);
  • 消费者
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record){...}

示例程序

源码地址:github.com/jinrunheng/spring-kafka-...

首先开启 Zookeeper 与 Kafka 服务

KafkaProducer

package com.github.springkafkademo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

    @Autowired 
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic, content);
    }
}

KafkaConsumer

package com.github.springkafkademo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"test"}) 
    public void receiveMessage(ConsumerRecord record) {
        System.out.println(record.value());
    }
}

Test

package com.github.springkafkademo;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;

@SpringBootTest
@ContextConfiguration(classes = SpringKafkaDemoApplication.class)
class SpringKafkaDemoApplicationTests {

    @Autowired 
    private KafkaProducer producer;

    @Test 
    public void kafka() {
        producer.sendMessage("test", "hello");
        producer.sendMessage("test", "kafka");

        try {
            Thread.sleep(1000 * 10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

运行测试,程序 10 秒后,消费者接受到生产者的数据

hello
kafka

参考文章

www.zhihu.com/question/54152397

www.cnblogs.com/chentingk/p/649710...

www.cnblogs.com/weixuqin/p/1143098...

www.cnblogs.com/qingyunzong/p/9004...

blog.csdn.net/xiaolyuh123/article/...