Pipeline 设计模式的优缺点和实践案例

Java
509
0
0
2023-04-18

一、概述

img

有同学提到几个问题,本文简单探讨下。 (1)例子中 Pipeline 的代码使用硬编码也可以实现,为什么要用这个模式,有什么好处? (2)Pipeline 设计模式在实际的编码中是怎样体现的? (3)Pipeline 设计模式有什么缺点?如何避免?

二、答疑

2.1 为什么要用 Pipeline 设计模式,而不是硬编码?

“为什么用 XXX 设计模式,而不是硬编码?” 这个问题对其他设计模式都适用。 这个问题分为两个维度来回答: (1) 该设计模式有什么优点? (2) 该设计模式的适用场景有哪些?

2.1.1 Pipeline 设计模式 的优点

Pipeline 设计模式的优点主要有以下三点:降低耦合、增加灵活性、提高性能。

  • 降低耦合度。Pipeline 设计模式将不同的处理逻辑封装成独立的阶段,每个阶段只关注自己的输入和输出,不需要知道其他阶段的细节。这样可以方便地增加、删除或修改阶段,而不影响整个流程的运行。
  • 增加灵活性。Pipeline设计模式可以通过配置化来实现不同的业务走不同的流程,而不需要修改代码。这样可以根据需求变化快速地调整流程,提高开发效率和可维护性。
  • 提高性能。Pipeline设计模式可以利用多线程或异步机制来并行执行不同的阶段,从而提高整个流程的吞吐量和响应时间。

2.1.2 Pipeline 设计模式的常见场景

一般来说,某个处理流程可以拆分成多个处理步骤,不同的步骤之间相对独立,数据在不同的步骤之间传递,可以通过特定编排来完成一个复杂的任务,此时可以考虑使用 Pipeline 设计模式。

下面给出一些常见的场景: 数据处理:当需要对大量数据进行处理时,通常需要将处理过程分为多个阶段。例如,数据清洗、转换、归一化、特征提取等阶段都可以作为 Pipeline 中的一部分。

图像处理:在图像处理中,需要对图像进行多个处理阶段,例如颜色空间转换、滤波、边缘检测、特征提取等。这些处理步骤可以被组合成一个 Pipeline,以便可以轻松地处理整个图像数据集。

构建 DevOps 流水线:在软件开发过程中,需要对代码进行多个处理阶段,例如代码编译、单元测试、代码分析、代码部署等。这些步骤可以被组合成一个 Pipeline,以构成整个开发过程。

2.2 实际工作中是怎样落地的?

很多人觉得上面讲的 Pipeline 设计模式可能不够接地气,那么实际工作中 Pipeline 有哪些常见的落地方式?

2.2.1 Java Stream API

Java Stream API 就是一种典型的流水线落地方式。 下面是一个简单的Java Stream的示例代码,它使用了 filtermapcollect操作,从一个字符串列表中筛选出以字母"A"开头的字符串,并转换为大写,然后收集到一个新的列表中。

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamExample {

    public static void main(String[] args) {
        // Create a list of strings
        List<String> list = Arrays.asList("Apple", "Banana", "Orange", "Pear", "Avocado");

        // Create a stream from the list
        // Filter the strings that start with "A"
        // Map the strings to upper case
        // Collect the results into a new list
        List<String> result = list.stream()
                .filter(s -> s.startsWith("A"))
                .map(s -> s.toUpperCase())
                .collect(Collectors.toList());

        // Print the result
        System.out.println(result); // [APPLE, AVOCADO]
    }
}

日常开发中,通常查询出底层数据,通过筛选和映射,转为所需的结构。由于 Java 的 Stream 比较简单和常用,在这里就不作过多陈述。

2.2.2 业务编排

比如你工作中有一个这种需求:需要做一个物料(新闻资讯、短视频等)推荐系统,有以下几个步骤:物料召回(根据业务需求从 MySQL 、ES 或二方接口中查询候选物料)、黑名单过滤(有些物料不允许透出)、观看记录过滤(观看过的不能透出,需要过滤掉)、按类型粗排(同个类目或主题只能保留 M 个)、算法精排(调用算法系统进行打分)、业务置顶(根据业务需要对某些物料置顶)、按 size 截断(返回请求所需的 size)等步骤。

伪代码如下:

// 定义一个Pipeline接口,表示一个流水线
public interface Pipeline<T> {
    // 添加一个阶段到流水线
    void addStage(Stage<T> stage);
    // 执行流水线
    void execute(T input);
}

// 定义一个Stage接口,表示一个阶段
public interface Stage<T> {
    // 处理输入数据,并返回输出数据
    T process(T input);
}

// 定义一个PipelineContext类,表示流水线的上下文
public class PipelineContext<T> {
    // 存储流水线的阶段列表
    private List<Stage<T>> stages;
    // 存储流水线的当前索引
    private int index;

    public PipelineContext() {
        stages = new ArrayList<>();
        index = 0;
    }

    // 添加一个阶段到上下文
    public void addStage(Stage<T> stage) {
        stages.add(stage);
    }

    // 执行上下文中的下一个阶段
    public void invokeNext(T input) {
        if (index < stages.size()) {
            Stage<T> stage = stages.get(index++);
            stage.process(input);

         }
    }
}     


// 定义一个RecContext类,表示推荐的上下文
public class RecContext<T> {
    // 存储推荐中的物料列表
    private List<T> items; 

    // 其他属性

    public PipelineContext() {
        items = new ArrayList<>();
    }

    // 省略其他方法
}     


// 定义一个DefaultPipeline类,实现Pipeline接口
public class DefaultPipeline<T> implements Pipeline<T> {
    // 创建一个PipelineContext对象
    private PipelineContext<T> context;

    public DefaultPipeline() {
        context = new PipelineContext<>();
    }

    @Override
    public void addStage(Stage<T> stage) {
        context.addStage(stage);
    }

    @Override
    public void execute(T input) {
        context.invokeNext(input);
    }
}

// 定义一个物料类,表示推荐系统的输入和输出数据
public class Material {
    // 物料的id
    private String id;
    
    // 物料的类型(资讯、视频等)
    private String type;
    
    // 物料的评分(算法精排后的结果)
    private double score;
    
    // 省略构造方法、getters和setters

}


// 定义一个物料召回阶段类,实现Stage接口
public class MaterialRecallStage implements Stage<RecContext<Material>> {

    @Override
    public RecContext<Material> process(RecContext<Material> context) {
        // 根据用户的兴趣、行为等特征,从物料库中召回一批候选物料,并设置到 context 的 items中
        
        // 省略具体实现细节
        return context;
    }
}

// 定义一个黑名单过滤阶段类,实现Stage接口
public class BlacklistFilterStage implements Stage<RecContext<Material>> {

    @Override
    public RecContext<Material> process(RecContext<Material> context) {
        // 根据用户的黑名单设置,过滤掉不符合条件的物料,并设置到 context 的 items中
        
        // 省略具体实现细节
        return context;
    }
}


// 定义一个观看记录过滤阶段类,实现Stage接口
public class WatchRecordFilterStage implements Stage<RecContext<Material>> {

    @Override
    public RecContext<Material> process(RecContext<Material> context) {
        // 根据用户的观看记录,过滤掉已经观看过的物料,并设置到 context 的 items中
        
        // 省略具体实现细节
        return context;
    }
}

// 定义一个按类型粗排阶段类,实现Stage接口
public class TypeSortStage implements Stage<RecContext<Material>> {

    @Override
    public RecContext<Material> process(RecContext<Material> context) {
        // 根据用户的偏好和物料的类型,按照一定的规则对物料进行粗排,并设置到 context 的 items中
       
        // 省略具体实现细节
        return context;
    }
}

// 定义一个算法精排阶段类,实现Stage接口
public class AlgorithmSortStage implements Stage<RecContext<Material>> {

    @Override
    public RecContext<Material> process(RecContext<Material> context) {
        // 根据用户的特征和物料的特征,使用机器学习模型对物料进行打分,排序后设置到 context 的 items中
       
        // 省略具体实现细节
        return context;
    }
}

// 定义一个业务置顶阶段类,实现Stage接口
public class BusinessTopStage implements Stage<RecContext<Material>> {

    @Override
    public RecContext<Material> process(RecContext<Material> context) {
        // 根据业务的需求,对部分物料进行置顶操作,并设置到 context 的 items中
        
        // 省略具体实现细节
        return context;
    }
}

// 定义一个按size截断阶段类,实现Stage接口
public class SizeCutStage implements Stage<RecContext<Material>> {

    @Override
    public RecContext<Material> process(RecContext<Material> context) {
        // 根据请求中的 size 数量,对物料的数量进行截断,并设置到 context 的 items中
        
        // 省略具体实现细节
        return context;
    }
}

// 定义一个测试类,用来创建和执行流水线
public class Test {

    public static void main(String[] args) {
        // 创建一个物料对象,作为流水线的输入数据
        RecContext<Material> recContext = new RecContext<Material>();
        
        // 创建一个流水线对象
        Pipeline<RecContext<Material>> pipeline = new DefaultPipeline<>();
        
        // 添加各个阶段到流水线中
        pipeline.addStage(new MaterialRecallStage());
        pipeline.addStage(new BlacklistFilterStage());
        pipeline.addStage(new WatchRecordFilterStage());
        pipeline.addStage(new TypeSortStage());
        pipeline.addStage(new AlgorithmSortStage());
        pipeline.addStage(new BusinessTopStage());
        pipeline.addStage(new SizeCutStage());
        
        // 执行流水线
        pipeline.execute(recContext);
        
        // 输出流水线的结果
        System.out.println(material);
    }
}

每个流程可以配置为 Spring 的 Bean, 流程的编排可以使用动态配置进行控制。 比如粗排、置顶等步骤有多种方式可选,可以根据业务需要通过修改动态配置进行替换。 还可以自研框架实现某些步骤之间的并行执行,比如将 bean 的 name 进行配置,其中中括号的部分可以并行执行:

[videoRecall, newsRecall,topicRecal],blacklist,
    recordFilter,typeSorce,algorithmSort,businessTop,sizeCut

通过这里例子,大家可以更好地理解, Pipeline 设计模式的优点,不同的步骤可以相互独立降低耦合,灵活组合复用,部分步骤之间可以采用并行执行的方式提高性能。

2.3 Pipeline 模式有哪些缺点?

每种设计模式都有自己的局限性,下面给出 Pipeline 设计模式的几个缺点:

  • 可读性不强。因为 Pipeline 设计模式是可配置化的,且配置经常在外部(比如数据库里的一个 JSON 、携程的 Apollo 动态配置),所以不容易看出整个流程的逻辑和细节。
  • 调试困难。因为 Pipeline 设计模式涉及多个阶段的协作,如果某个阶段出现问题,不容易快速定位和修复。
  • 性能损耗。因为 Pipeline 设计模式需要在每个阶段之间传递数据,如果每个阶段是跨机器的,会增加内存和网络的开销。

当然这些缺点也不是不能解决的: (1)针对可读性不强的问题,我们可以在请求的入口处贴出配置的地址,方便代码和配置关联。由于每个步骤非常独立,做好每个步骤的代码可读性也可以在一定程度上解决问题。 (2)针对调试和排查问题困难的问题,使用 Pipeline 设计模式时,关键地方一定要打好日志,方便快速定位、排查问题。 (3)我们可以通过一些调整提高性能,比如上述物料推荐业务而言,需要调用算法平台的服务去打分,我们可以在打分前进行粗排,只将粗排分数较高的传给算法平台,用户和物料特征不需要传递给算法平台,算法平台自己去查询相关物料和用户特征再打分等。

三、总结

学习的目的还是为了应用,大家在学习设计模式时,要主动和 JDK 源码,和自己使用的二方和三方框架的设计相结合,要主动和日常的业务场景相结合,以便更好的做到学以致用。 每种设计模式都有自己的适用场景、优点和缺点,我们要注重掌握,并且不仅要了解某种设计模式存在的问题,还要积极思考如何解决。 通常来说,对于非常简单的场景,杀鸡焉用牛刀,直接编码即可;对于复杂场景建议优先考虑遵循设计原则,使用经典的设计模式,以提高代码的可重用性、可读性、灵活性、可拓展性、安全性和降低代码复杂度等。