导读: 基于上篇文章 ,我们可以实现将官方提供的 logstash-output-java_output_example 项目在本地构建、测试和打包使用。本文将基于此 Demo 之上讨论如实现一个输出到 MySQL 的 output插件,本例子只是一个简单的例子,并未达到生产可用。目的是想通过该例子让各位熟悉如何用 Java 编写 Logstash 插件及分享我编写和设计插件的思路供各位参考。
// logstash-outup-java-my SQL -plugin 插件 Github 地址
项目结构及核心文件
1、主要项目结构可划分为三块
- 核心处理逻辑类
- 测试类
- Gradle 依赖管理
2、核心文件 JavaOutputExample.class
// 官方文档 How to write a Java output plugin
#_package_and_deploy_
在官方文档中已详细描述 JavaOutputExample 这个类每一部分的作用和含义,大致为以下几部分
- 构造方法:插件必须提供一个可收 id,、 config uration、Context 的构造方法,初始化参数
- PluginConfigSpec:允许开发人员通过设置名称、数据类型、弃用状态、必需状态和默认值来指定该插件支持的setting设置。
- Output 方法:编写核心输出逻辑,们将在该方法中具体编写输出到 MySQL 的逻辑
- Stop and await Stop
编写和设计的思路
我在编写该插件时参考了 Github 上一个采用 Ruby 语言编写的 logstash-output-jdbc 项目,该项目这个插件允许您使用 JDBC 适配器输出到 SQL 数据库(支持多种数据库)。关于如何使用该插件可以参考 。
// logstash-output-jdbc GitHub
在该项目中其使用了 Hikari 作为数据源,并引入各种数据库对应的 JDBC 驱动。通过安装该插件后我们在 logstash.conf 配置以下代码即可实现输入到 mysql 数据库中。
output {
jdbc {
driver_jar_path => "/usr/local/logstash/jdbc/mysql-connector-java-.0.19.jar"
driver_class => "com.mysql.jdbc.Driver"
#数据库连接
connection_string => "jdbc:mysql://xxxxxxxxx.ads.aliyuncs.com:/数据库名?user=xxxxx&password=xxxxxx"
#insert语句
statement => [ "insert into v_ods_media_show_log (`idfa`) value(?)",
"idfa"]
}
}
参考了该项目,我尝试使用 Java 语言编写输出到 MySQL 的插件,核心设计点如下:
- 可配置参数
- SQL 简化
- 批处理
- 重试机制
先附上修改后的 JavaOutputExample 类实现代码(省略部分非核心代码)
@LogstashPlugin(name = "java_output_example")
public class JavaOutputExample implements Output {
private final String id;
private final Integer flushSize;
private final Integer retryMaxCount;
private final Integer retryIntervalTime;
private final CountDownLatch done = new CountDownLatch();
private volatile boolean stopped = false;
private PreparedStatement pstm = null;
private String statement;
private List<String> fields;
public static final String STANDARD_FORMAT = "yyyy-MM-dd HH:mm:ss";
/**
* all plugins must provide a constructor that accepts id, Configuration, and Context
*/ public JavaOutputExample(final String id, final Configuration config, final Context context) {
this.id = id;
this.flushSize = Integer.parseInt(config.get(PluginConfigParams.FLUSH_SIZE));
this.retryMaxCount = Integer.parseInt(config.get(PluginConfigParams.RETRY_MAX_COUNT));
this.retryIntervalTime = Integer.parseInt(config.get(PluginConfigParams.RETRY_INTERVAL_TIME));
this.statement = config.get(PluginConfigParams.STATEMENT);
// init HikariPool
HikariPool.setUpPool(config);
}
@Override
public void output(final Collection<Event> events) {
// divide and rule
if (events.size() >) {
List<List<Event>> lists = Slicer.fixedGrouping(new ArrayList<>(events), this.flushSize);
for (List<Event> list : lists) {
retryingSubmit(list);
}
}
}
@Override
public Collection<PluginConfigSpec<?>> configSchema() {
// should return a list of all configuration options for this plugin
return Arrays.asList(
PluginConfigParams.DRIVER_CLASS,
PluginConfigParams.CONNECTION_STRING,
PluginConfigParams.STATEMENT,
PluginConfigParams.CONNECTION_TIMEOUT,
PluginConfigParams.MAX_POOL_SIZE,
PluginConfigParams.FLUSH_SIZE);
}
/**
* deal Statement
*、Find all {value} in the statement, and extract the fields in {} and put them in the list
*、replace {value} to ?
*
* @param statement
* @return
*/ private DealedStatement dealStatement(String statement) {
List<String> fields = new ArrayList<>();
Pattern pattern = Pattern.compile("\{[\w]*\}");
Matcher matcher = pattern.matcher(statement);
while (matcher.find()) {
String e = matcher.group();
String substring = e.substring(, e.length() - 1);
fields.add(substring);
statement = statement.replace(e, "?");
}
DealedStatement ds = new DealedStatement(statement, fields);
return ds;
}
private void paramReplace(PreparedStatement prepStatement, Object param, int i) throws SQLException {
// PreparedStatement's index is starting with
if (param instanceof Integer) {
int value = ((Integer) param).intValue();
prepStatement.setInt(i +, value);
} else if (param instanceof String) {
String s = (String) param;
prepStatement.setString(i +, s);
} else if (param instanceof Double) {
double d = ((Double) param).doubleValue();
prepStatement.setDouble(i +, d);
} else if (param instanceof Float) {
float f = ((Float) param).floatValue();
prepStatement.setFloat(i +, f);
} else if (param instanceof Long) {
long l = ((Long) param).longValue();
prepStatement.setLong(i +, l);
} else if (param instanceof Boolean) {
boolean b = ((Boolean) param).booleanValue();
prepStatement.setBoolean(i +, b);
} else if (param instanceof org.logstash.Timestamp) {
// use joda
prepStatement.setString(i +, ((Timestamp) param).getTime().toString(STANDARD_FORMAT));
}
}
/**
* Retry submit
*/ private void retryingSubmit(List<Event> events) {
Integer retryCount =;
while (retryCount < retryMaxCount) {
if (submit(events)) {
break ;
} else {
retryCount++;
System.err.println("Execution exception,retry count: " + retryCount);
try {
TimeUnit.MILLISECONDS.sleep(retryIntervalTime);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
/**
* Submit
*/ private boolean submit(List<Event> events) {
Connection conn = null;
try {
conn = HikariPool.getConnection();
// deal prepareStatement
DealedStatement ds = dealStatement(statement);
fields = ds.getFields();
pstm = conn.prepareStatement(ds.getPreparedStatement());
for (Event event : events) {
// param replace
for (int i =; i < fields.size(); i++) {
paramReplace(pstm, event.getField(fields.get(i)), i);
}
pstm.addBatch();
}
// Batch processing
pstm.executeBatch();
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
} finally {
// close
if (pstm != null) {
try {
pstm.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
1、SQL 简化设计
在 logstash-output-jdbc 项目中其设计的思路为通过 statement 参数接收一个数组,数组的第一个位置放置一条预编译 SQL,之后每一个数组位按顺序对应该 SQL 的 “?”。
statement => [ "insert into v_ods_media_show_log (`idfa`) value(?)",
"idfa"]
不过上面写法似乎有些过于繁琐,我基于 MyBatis SQL 使用 #{param} 语法灵感,因为 logstash中 # 为注释,所以我设计为使用 {param} 替换参数,之后 Statement 可简化如下。
statement => "INSERT INTO datahub_test.v_dim_agent_id(
agent_id,agent_name,platform,insert_time,update_time)
value({agent_id},{agent_name},{platform},DEFAULT,DEFAULT)"
实现的逻辑则是将语句中 {param} 的参数找出来,并将它们的param值存到一个列表中,并将所有{param} 都替换为 ?,行成一条预编译语句。具体实现细节在 dealStatement() 方法中。
2、批处理
在 logstash-output-jdbc 项目中是为每条 INSERT 语句创建一个 Connect 并逐条执行,这样效率较低,所以我采用了批处理来提高效率。
// 将多条语句加入到同一个 Batch
pstm.addBatch();
pstm.executeBatch();
注意:
- MySQL 执行 executeBatch 需要在 URL 加上 rewriteBatchedStatements=true
这里提一下output() 方法里的切块设计,通过 Slicer.fixedGrouping 配置将传入来的数据进行切块,如传入 1W 条,可设置切块大小为 2000.则切成 5块进行批处理,也是采用了分而治之的思想。
3、 重试 机制
重试机制设计比较简单通过 retryMaxCount 最大重试次数和 retryIntervalTime 每次重试间隔(毫秒) 来实现,在output 方法中调用retryingSubmit,代码如下:
public void output(final Collection<Event> events) {
// 切块
if (events.size() >) {
List<List<Event>> lists = Slicer.fixedGrouping(new ArrayList<>(events), this.flushSize);
for (List<Event> list : lists) {
// 重试
retryingSubmit(list);
}
}
}
// 重试
private void retryingSubmit(List<Event> events) {
Integer retryCount =;
while (retryCount < retryMaxCount) {
if (submit(events)) {
break;
} else {
retryCount++;
System.err.println("Execution exception,retry count: " + retryCount);
try {
TimeUnit.MILLISECONDS.sleep(retryIntervalTime);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
4、可配置参数
本插件只提供了部分基础的可配置参数
最后
以上就是用 Java 编写 Logstash 输出到 MySQL 插件的简单例子,本例子意在提供一个设计思路和帮助各位熟悉如何采用 Java 编写 logstash 插件。该插件并未应用于生产之上,只是作为学习的 Demo,设计上还有诸多可提升的地方,各位如果有好的建议欢迎提出讨论,共同进步。
感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。