用Java编写Logstash输出到MySQL的插件

Java
223
0
0
2024-02-23

导读: 基于上篇文章 ,我们可以实现将官方提供的 logstash-output-java_output_example 项目在本地构建、测试和打包使用。本文将基于此 Demo 之上讨论如实现一个输出到 MySQL 的 output插件,本例子只是一个简单的例子,并未达到生产可用。目的是想通过该例子让各位熟悉如何用 Java 编写 Logstash 插件及分享我编写和设计插件的思路供各位参考。

 // logstash-outup-java-my SQL -plugin 插件  Github  地址

项目结构及核心文件

1、主要项目结构可划分为三块

  • 核心处理逻辑类
  • 测试类
  • Gradle 依赖管理

2、核心文件 JavaOutputExample.class

 // 官方文档 How to write a Java output plugin
#_package_and_deploy_ 

在官方文档中已详细描述 JavaOutputExample 这个类每一部分的作用和含义,大致为以下几部分

  • 构造方法:插件必须提供一个可收 id,、 config uration、Context 的构造方法,初始化参数
  • PluginConfigSpec:允许开发人员通过设置名称、数据类型、弃用状态、必需状态和默认值来指定该插件支持的setting设置。
  • Output 方法:编写核心输出逻辑,们将在该方法中具体编写输出到 MySQL 的逻辑
  • Stop and await Stop

编写和设计的思路

我在编写该插件时参考了 Github 上一个采用 Ruby 语言编写的 logstash-output-jdbc 项目,该项目这个插件允许您使用 JDBC 适配器输出到 SQL 数据库(支持多种数据库)。关于如何使用该插件可以参考 。

 // logstash-output-jdbc GitHub

在该项目中其使用了 Hikari 作为数据源,并引入各种数据库对应的 JDBC 驱动。通过安装该插件后我们在 logstash.conf 配置以下代码即可实现输入到 mysql 数据库中。

 output {
   jdbc {
      driver_jar_path => "/usr/local/logstash/jdbc/mysql-connector-java-.0.19.jar"
      driver_class => "com.mysql.jdbc.Driver"
      #数据库连接
      connection_string => "jdbc:mysql://xxxxxxxxx.ads.aliyuncs.com:/数据库名?user=xxxxx&password=xxxxxx"
      #insert语句
      statement => [ "insert into v_ods_media_show_log (`idfa`) value(?)",
             "idfa"]
   }
} 

参考了该项目,我尝试使用 Java 语言编写输出到 MySQL 的插件,核心设计点如下:

  • 可配置参数
  • SQL 简化
  • 批处理
  • 重试机制

先附上修改后的 JavaOutputExample 类实现代码(省略部分非核心代码)

 @LogstashPlugin(name = "java_output_example")
public class JavaOutputExample implements Output {

    private final String id;
    private final  Integer  flushSize;
    private final Integer retryMaxCount;
    private final Integer retryIntervalTime;
    private final CountDownLatch done = new CountDownLatch();
    private volatile boolean stopped = false;
    private PreparedStatement pstm = null;
    private String statement;
    private List<String> fields;
    public static final String STANDARD_FORMAT = "yyyy-MM-dd HH:mm:ss";

    /**
     * all plugins must provide a constructor that accepts id, Configuration, and Context
     */    public JavaOutputExample(final String id, final Configuration config, final Context context) {
        this.id = id;
        this.flushSize = Integer.parseInt(config.get(PluginConfigParams.FLUSH_SIZE));
        this.retryMaxCount = Integer.parseInt(config.get(PluginConfigParams.RETRY_MAX_COUNT));
        this.retryIntervalTime = Integer.parseInt(config.get(PluginConfigParams.RETRY_INTERVAL_TIME));
        this.statement = config.get(PluginConfigParams.STATEMENT);
        // init HikariPool
        HikariPool.setUpPool(config);
    }

    @Override
    public void output(final Collection<Event> events) {
        // divide and rule
        if (events.size() >) {
            List<List<Event>> lists = Slicer.fixedGrouping(new ArrayList<>(events), this.flushSize);
            for (List<Event> list : lists) {
                retryingSubmit(list);
            }
        }
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema() {
        // should return a list of all configuration options for this plugin
        return Arrays.asList(
                PluginConfigParams.DRIVER_CLASS,
                PluginConfigParams.CONNECTION_STRING,
                PluginConfigParams.STATEMENT,
                PluginConfigParams.CONNECTION_TIMEOUT,
                PluginConfigParams.MAX_POOL_SIZE,
                PluginConfigParams.FLUSH_SIZE);
    }

    /**
     * deal Statement
     *、Find all {value} in the statement, and extract the fields in {} and put them in the list
     *、replace {value} to ?
     *
     * @param statement
     * @return
     */    private DealedStatement dealStatement(String statement) {
        List<String> fields = new ArrayList<>();
        Pattern pattern = Pattern.compile("\{[\w]*\}");
        Matcher matcher = pattern.matcher(statement);
        while (matcher.find()) {
            String e = matcher.group();
            String substring = e.substring(, e.length() - 1);
            fields.add(substring);
            statement = statement.replace(e, "?");
        }
        DealedStatement ds = new DealedStatement(statement, fields);
        return ds;
    }

    private void paramReplace(PreparedStatement prepStatement, Object param, int i) throws SQLException {
        // PreparedStatement's index is starting with
        if (param instanceof Integer) {
            int value = ((Integer) param).intValue();
            prepStatement.setInt(i +, value);
        } else if (param instanceof String) {
            String s = (String) param;
            prepStatement.setString(i +, s);
        } else if (param instanceof Double) {
            double d = ((Double) param).doubleValue();
            prepStatement.setDouble(i +, d);
        } else if (param instanceof Float) {
            float f = ((Float) param).floatValue();
            prepStatement.setFloat(i +, f);
        } else if (param instanceof Long) {
            long l = ((Long) param).longValue();
            prepStatement.setLong(i +, l);
        } else if (param instanceof Boolean) {
            boolean b = ((Boolean) param).booleanValue();
            prepStatement.setBoolean(i +, b);
        } else if (param instanceof org.logstash.Timestamp) {
            // use joda
            prepStatement.setString(i +, ((Timestamp) param).getTime().toString(STANDARD_FORMAT));
        }
    }

    /**
     * Retry submit
     */    private void retryingSubmit(List<Event> events) {
        Integer retryCount =;
        while (retryCount < retryMaxCount) {
            if (submit(events)) {
                 break ;
            } else {
                retryCount++;
                System.err.println("Execution exception,retry count: " + retryCount);
                try {
                    TimeUnit.MILLISECONDS.sleep(retryIntervalTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
        }
    }

    /**
     * Submit
     */    private boolean submit(List<Event> events) {
        Connection conn = null;
        try {
            conn = HikariPool.getConnection();
            // deal prepareStatement
            DealedStatement ds = dealStatement(statement);
            fields = ds.getFields();
            pstm = conn.prepareStatement(ds.getPreparedStatement());

            for (Event event : events) {
                // param replace
                for (int i =; i < fields.size(); i++) {
                    paramReplace(pstm, event.getField(fields.get(i)), i);
                }
                pstm.addBatch();
            }
            // Batch processing
            pstm.executeBatch();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        } finally {
            //  close 
            if (pstm != null) {
                try {
                    pstm.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

1、SQL 简化设计

在 logstash-output-jdbc 项目中其设计的思路为通过 statement 参数接收一个数组,数组的第一个位置放置一条预编译 SQL,之后每一个数组位按顺序对应该 SQL 的 “?”。

 statement => [ "insert into v_ods_media_show_log (`idfa`) value(?)",
             "idfa"] 

不过上面写法似乎有些过于繁琐,我基于 MyBatis SQL 使用 #{param} 语法灵感,因为 logstash中 # 为注释,所以我设计为使用 {param} 替换参数,之后 Statement 可简化如下。

 statement => "INSERT INTO datahub_test.v_dim_agent_id(
                agent_id,agent_name,platform,insert_time,update_time)
                value({agent_id},{agent_name},{platform},DEFAULT,DEFAULT)" 

实现的逻辑则是将语句中 {param} 的参数找出来,并将它们的param值存到一个列表中,并将所有{param} 都替换为 ?,行成一条预编译语句。具体实现细节在 dealStatement() 方法中。

2、批处理

在 logstash-output-jdbc 项目中是为每条 INSERT 语句创建一个 Connect 并逐条执行,这样效率较低,所以我采用了批处理来提高效率。

 // 将多条语句加入到同一个 Batch
pstm.addBatch();
pstm.executeBatch(); 

注意:

  • MySQL 执行 executeBatch 需要在 URL 加上 rewriteBatchedStatements=true

这里提一下output() 方法里的切块设计,通过 Slicer.fixedGrouping 配置将传入来的数据进行切块,如传入 1W 条,可设置切块大小为 2000.则切成 5块进行批处理,也是采用了分而治之的思想。

3、 重试 机制

重试机制设计比较简单通过 retryMaxCount 最大重试次数和 retryIntervalTime 每次重试间隔(毫秒) 来实现,在output 方法中调用retryingSubmit,代码如下:

 public void output(final Collection<Event> events) {
        // 切块
        if (events.size() >) {
            List<List<Event>> lists = Slicer.fixedGrouping(new ArrayList<>(events), this.flushSize);
            for (List<Event> list : lists) {
                // 重试
                retryingSubmit(list);
            }
        }
    }

// 重试
 private void retryingSubmit(List<Event> events) {
        Integer retryCount =;
        while (retryCount < retryMaxCount) {
            if (submit(events)) {
                break;
            } else {
                retryCount++;
                System.err.println("Execution exception,retry count: " + retryCount);
                try {
                    TimeUnit.MILLISECONDS.sleep(retryIntervalTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
        }
    } 

4、可配置参数

本插件只提供了部分基础的可配置参数

最后

以上就是用 Java 编写 Logstash 输出到 MySQL 插件的简单例子,本例子意在提供一个设计思路和帮助各位熟悉如何采用 Java 编写 logstash 插件。该插件并未应用于生产之上,只是作为学习的 Demo,设计上还有诸多可提升的地方,各位如果有好的建议欢迎提出讨论,共同进步。

感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。