一、Hive增量查询Hudi表
同步Hive
我们在写数据时,可以配置同步Hive参数,生成对应的Hive表,用来查询Hudi表,具体来说,在写入过程中传递了两个由table name
命名的Hive表。例如,如果table name = hudi_tbl
,我们得到
hudi_tbl
实现了由 HoodieParquetInputFormat
支持的数据集的读优化视图,从而提供了纯列式数据
hudi_tbl_rt
实现了由 HoodieParquetRealtimeInputFormat
支持的数据集的实时视图,从而提供了基础数据和日志数据的合并视图
上面的两条对比摘自官网,这里解释一下:其中实时视图_rt
表只有在MOR表同步Hive元数据时才会有,并且hudi_tbl
在表类型为MOR时并且为配置skipROSuffix=true
时才为读优化视图,当为false(默认为false)时,读优化视图应该为hudi_tbl_ro
,当表类型为COW时,hudi_tbl
应该为实时视图,所以官网对这一块解释有一点问题大家注意
增量查询
修改配置hive-site.xml
在Hive SQL白名单里添加hoodie.*,其他均为已存在的配置,还可以根据需要添加其他白名单,如:tez.*|parquet.*|planner.*
hive.security.authorization.sqlstd.confwhitelist.append hoodie.*|mapred.*|hive.*|mapreduce.*|spark.*
设置参数
以表名为hudi_tbl为例
连接Hive connect/Hive Shell
设置该表为增量表
set hoodie.hudi_tbl.consume.mode=INCREMENTAL;
设置增量开始的时间戳(不包含),作用:起到文件级别过滤,减少map数
set hoodie.hudi_tbl.consume.start.timestamp=20211015182330;
设置增量消费的commit次数,默认设置为-1即可,表示增量消费到目前新数据
set hoodie.hudi_tbl.consume.max.commits=-1;
自己根据需要修改commit次数
查询语句
select * from hudi_tbl where `_hoodie_commit_time` > "20211015182330";
因小文件合并机制,在新的commit时间戳的文件中,包含旧数据,因此需要再加where做二次过滤
注:这里的设置设置参数有效范围为connect session
Hudi 0.9.0版本只支持表名参数,不支持数据库限定,这样设置了hudi_tbl
为增量表后,所有数据库的该表名的表查询时都为增量查询模式,起始时间等参数为最后一次设定值,在后面的新版本中,添加了数据库限定,如hudi数据库
二、Spark SQL增量查询Hudi表
编程方式(DF+SQL)
先看一下官方文档上Spark SQL增量查询的方式
地址1:https://hudi.apache.org/cn/docs/quick-start-guide#incremental-query
地址2:https://hudi.apache.org/cn/docs/querying_data#incremental-query
它是先通过spark.read中添加增量参数的形式读Hudi表为DF,然后将DF注册成临时表,最后通过Spark SQL查询临时表的形式,实现增量查询的
参数
- hoodie.datasource.query.type=incremental 查询类型,值为incremental时代表增量查询,默认值snapshot,增量查询时,该参数必填
- hoodie.datasource.read.begin.instanttime 增量查询开始时间,必填 例如:20221126170009762
- hoodie.datasource.read.end.instanttime 增量查询结束时间,非必填 例如:20221126170023240
- hoodie.datasource.read.incr.path.glob 增量查询指定分区路径,非必填 例如 /dt=2022-11/
查询范围 (BEGIN_INSTANTTIME,END_INSTANTTIME],也就是大于开始时间(不包含),小于等于结束时间(包含),如果没有指定结束时间,那么查询大于BEGIN_INSTANTTIME到现在为止最新的数据,如果指定INCR_PATH_GLOB,那么只在指定分区路径下面查询对应的数据
代码示例
import org.apache.hudi.DataSourceReadOptions.{BEGIN_INSTANTTIME, END_INSTANTTIME, INCR_PATH_GLOB, QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
val tableName = "test_hudi_incremental"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| dt string
|) using hudi
| partitioned by (dt)
| options (
| primaryKey = 'id',
| preCombineField = 'ts',
| type = 'cow'
| )
|""".stripMargin)
spark.sql(s"insert into $tableName values (1,'hudi',10,100,'2022-11-25')")
spark.sql(s"insert into $tableName values (2,'hudi',10,100,'2022-11-25')")
spark.sql(s"insert into $tableName values (3,'hudi',10,100,'2022-11-26')")
spark.sql(s"insert into $tableName values (4,'hudi',10,100,'2022-12-26')")
spark.sql(s"insert into $tableName values (5,'hudi',10,100,'2022-12-27')")
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
val basePath = table.storage.properties("path")
// incrementally query data
val incrementalDF = spark.read.format("hudi").
option(QUERY_TYPE.key, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME.key, beginTime).
option(END_INSTANTTIME.key, endTime).
option(INCR_PATH_GLOB.key, "/dt=2022-11*/*").
load(basePath)
// table(tableName)
incrementalDF.createOrReplaceTempView(s"temp_$tableName")
spark.sql(s"select * from temp_$tableName").show()
spark.stop()
结果
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|price| ts| dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
| 20221126165954300|20221126165954300...| id:1| dt=2022-11-25|de99b299-b9de-423...| 1|hudi| 10.0|100|2022-11-25|
| 20221126170009762|20221126170009762...| id:2| dt=2022-11-25|de99b299-b9de-423...| 2|hudi| 10.0|100|2022-11-25|
| 20221126170030470|20221126170030470...| id:5| dt=2022-12-27|75f8a760-9dc3-452...| 5|hudi| 10.0|100|2022-12-27|
| 20221126170023240|20221126170023240...| id:4| dt=2022-12-26|4751225d-4848-4dd...| 4|hudi| 10.0|100|2022-12-26|
| 20221126170017119|20221126170017119...| id:3| dt=2022-11-26|2272e513-5516-43f...| 3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
+-----------------+
| commit_time|
+-----------------+
|20221126170030470|
|20221126170023240|
|20221126170017119|
|20221126170009762|
|20221126165954300|
+-----------------+
20221126170009762
20221126170023240
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|price| ts| dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
| 20221126170017119|20221126170017119...| id:3| dt=2022-11-26|2272e513-5516-43f...| 3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
注释掉INCR_PATH_GLOB,结果
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|price| ts| dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
| 20221127155346067|20221127155346067...| id:4| dt=2022-12-26|33e7a2ed-ea28-428...| 4|hudi| 10.0|100|2022-12-26|
| 20221127155339981|20221127155339981...| id:3| dt=2022-11-26|a5652ae0-942a-425...| 3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
继续注释掉END_INSTANTTIME,结果
20221127161253433
20221127161311831
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|price| ts| dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
| 20221127161320347|20221127161320347...| id:5| dt=2022-12-27|7b389e57-ca44-4aa...| 5|hudi| 10.0|100|2022-12-27|
| 20221127161311831|20221127161311831...| id:4| dt=2022-12-26|2707ce02-548a-422...| 4|hudi| 10.0|100|2022-12-26|
| 20221127161304742|20221127161304742...| id:3| dt=2022-11-26|264bc4a9-930d-4ec...| 3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
可以看到不包含起始时间,包含结束时间
纯SQL方式
一般项目上都采用纯SQL方式进行增量查询,这样比较方便,纯SQL的方式参数和上面讲的参数是一样的,接下来看一下怎么用纯SQL方式实现
建表造数
create table hudi.test_hudi_incremental (
id int,
name string,
price double,
ts long,
dt string
) using hudi
partitioned by (dt)
options (
primaryKey = 'id',
preCombineField = 'ts',
type = 'cow'
);
insert into hudi.test_hudi_incremental values (1,'a1', 10, 1000, '2022-11-25');
insert into hudi.test_hudi_incremental values (2,'a2', 20, 2000, '2022-11-25');
insert into hudi.test_hudi_incremental values (3,'a3', 30, 3000, '2022-11-26');
insert into hudi.test_hudi_incremental values (4,'a4', 40, 4000, '2022-12-26');
insert into hudi.test_hudi_incremental values (5,'a5', 50, 5000, '2022-12-27');
看一下有哪些commit_time
select distinct(_hoodie_commit_time) from test_hudi_incremental order by _hoodie_commit_time
+----------------------+
| _hoodie_commit_time |
+----------------------+
| 20221130163618650 |
| 20221130163703640 |
| 20221130163720795 |
| 20221130163726780 |
| 20221130163823274 |
+----------------------+
纯SQL方式(一)
使用Call Procedures:copy_to_temp_view
、copy_to_table
,目前这两个命令已经合到master,由scxwhite 苏乘祥贡献,这俩参数差不多,建议使用copy_to_temp_view
,因为copy_to_table
会先将数据落盘而copy_to_temp_view
是创建的临时表,效率会高一点,且数据落盘无意义,后面还要将落盘的表删掉。
支持的参数
- table
- query_type
- view_name
- begin_instance_time
- end_instance_time
- as_of_instant
- replace
- global
测试SQL
call copy_to_temp_view(table => 'test_hudi_incremental', query_type => 'incremental',
view_name => 'temp_incremental', begin_instance_time=> '20221130163703640', end_instance_time => '20221130163726780');
select _hoodie_commit_time, id, name, price, ts, dt from temp_incremental;
结果
+----------------------+-----+-------+--------+-------+-------------+
| _hoodie_commit_time | id | name | price | ts | dt |
+----------------------+-----+-------+--------+-------+-------------+
| 20221130163726780 | 4 | a4 | 40.0 | 4000 | 2022-12-26 |
| 20221130163720795 | 3 | a3 | 30.0 | 3000 | 2022-11-26 |
+----------------------+-----+-------+--------+-------+-------------+
可以看到这种方式是可以实现增量查询的,但是需要注意,如果需要修改增量查询的起始时间,那么就需要重复执行copy_to_temp_view,但是因为临时表temp_incremental已经存在,要么新起个表名,要么先删掉,再创建新的,我建议先删掉,通过下面的命令删除
drop view if exists temp_incremental;
纯SQL方式(二)
PR地址:https://github.com/apache/hudi/pull/7182
这个PR同样由scxwhite
贡献,目前只支持Spark3.2以上的版本(目前社区未合并)
增量查询SQL
select id, name, price, ts, dt from tableName
[
'hoodie.datasource.query.type'=>'incremental',
'hoodie.datasource.read.begin.instanttime'=>'$instant1',
'hoodie.datasource.read.end.instanttime'=>'$instant2'
]
这种方式,是支持了一种新的语法,在查询SQL后通过在[]添加参数的形式,感兴趣的话可以拉一下代码,自己打包试一下
纯SQL方式(三)
最终的效果如下
select
/*+
hoodie_prop(
'default.h1',
map('hoodie.datasource.read.begin.instanttime', '20221127083503537', 'hoodie.datasource.read.end.instanttime', '20221127083506081')
),
hoodie_prop(
'default.h2',
map('hoodie.datasource.read.begin.instanttime', '20221127083508715', 'hoodie.datasource.read.end.instanttime', '20221127083511803')
)
*/
id, name, price, ts
from (
select id, name, price, ts
from default.h1
union all
select id, name, price, ts
from default.h2
)
是在hint中添加增量查询相关的参数,先指定表名再写参数,但是文章好像未给出完整的代码地址,大家有时间可以自己试一下
纯SQL方式(四)
这种方式,是我按照Hive增量查询Hudi的方式修改的源码,通过set的方式实现增量查询
PR地址:https://github.com/apache/hudi/pull/7339
我们已经知道Hudi的DefaultSource.createRelation
中的optParam
s参数为readDataSourceTable
中的options = table.storage.properties ++ pathOption,也就是表本身属性中的配置参数+path,之后在createRelation
并没有接收其他参数,所以不能通过set参数的形式进行查询
和Hive增量查询一样,指定具体表名的增量查询参数
set hoodie.test_hudi_incremental.datasource.query.type=incremental
set hoodie.test_hudi_incremental.datasource.read.begin.instanttime=20221130163703640;
select _hoodie_commit_time, id, name, price, ts, dt from test_hudi_incremental;
+----------------------+-----+-------+--------+-------+-------------+
| _hoodie_commit_time | id | name | price | ts | dt |
+----------------------+-----+-------+--------+-------+-------------+
| 20221130163823274 | 5 | a5 | 50.0 | 5000 | 2022-12-27 |
| 20221130163726780 | 4 | a4 | 40.0 | 4000 | 2022-12-26 |
| 20221130163720795 | 3 | a3 | 30.0 | 3000 | 2022-11-26 |
+----------------------+-----+-------+--------+-------+-------------+
如果不同的库下面有相同的表名,则可以通过库名.表名的形式
## 需要先开启使用数据库名称限定表名的配置,开启后上面不加库名的配置就失效了
set hoodie.query.use.database = true;
set hoodie.hudi.test_hudi_incremental.datasource.query.type=incremental;
set hoodie.hudi.test_hudi_incremental.datasource.read.begin.instanttime=20221130163703640;
set hoodie.hudi.test_hudi_incremental.datasource.read.end.instanttime=20221130163726780;
set hoodie.hudi.test_hudi_incremental.datasource.read.incr.path.glob=/dt=2022-11*/*;
refresh table test_hudi_incremental;
select _hoodie_commit_time, id, name, price, ts, dt from test_hudi_incremental;
+----------------------+-----+-------+--------+-------+-------------+
| _hoodie_commit_time | id | name | price | ts | dt |
+----------------------+-----+-------+--------+-------+-------------+
| 20221130163720795 | 3 | a3 | 30.0 | 3000 | 2022-11-26 |
+----------------------+-----+-------+--------+-------+-------------+
大家可以自己试一下,不同的库表关联的情形
这里需要注意一点,更新参数后,需要先refresh table
,再查询,否则查询时修改的参数不生效,因为会使用缓存中的参数
这种方式只是简单地修改了一下源码,使set的参数对查询生效
为了避免有些读者嫌打包麻烦,这里给大家提供了hudi-spark3.1-bundle_2.12-0.13.0-SNAPSHOT.jar
的下载地址:https://download.csdn.net/download/dkl12/87221476
三、Flink SQL增量查询Hudi表
官网文档
地址:https://hudi.apache.org/cn/docs/querying_data#incremental-query
参数
- read.start-commit 增量查询开始时间 对于流读,如果不指定该值,默认取最新的instantTime,也就是流读默认从最新的instantTime开始读(包含最新的)。对于批读,如果不指定该参数,只指定read.end-commit,则实现时间旅行的功能,可查询历史记录
- read.end-commit 增量查询结束时间 不指定该参数则默认读取到最新的记录,该参数一般只适用于批读,因为流读一般的需求是查询所有的增量数据
- read.streaming.enabled 是否流读 默认false
- read.streaming.check-interval 流读的检查时间间隔,单位秒(s),默认值60,也就是一分钟
查询范围 [BEGIN_INSTANTTIME,END_INSTANTTIME],既包含开始时间又包含结束时间,对于默认值可参考上面的参数说明
版本
建表造数:
- Hudi 0.9.0
- Spark 2.4.5
我这里建表造数使用Hudi Spark SQL 0.9.0,目的是为了模拟项目上用Java Client和Spark SQL创建的Hudi表,以验证Hudi Flink SQL增量查询时是否兼容旧版本的Hudi表(大家没有这种需求的,可以使用任何方式正常造数)
查询
- Hudi 0.13.0-SNAPSHOT
- Flink 1.14.3 (增量查询)
- Spark 3.1.2 (主要是为了使用Call Procedures命令查看commit信息)
建表造数
-- Spark SQL Hudi 0.9.0
create table hudi.test_flink_incremental (
id int,
name string,
price double,
ts long,
dt string
) using hudi
partitioned by (dt)
options (
primaryKey = 'id',
preCombineField = 'ts',
type = 'cow'
);
insert into hudi.test_flink_incremental values (1,'a1', 10, 1000, '2022-11-25');
insert into hudi.test_flink_incremental values (2,'a2', 20, 2000, '2022-11-25');
update hudi.test_flink_incremental set name='hudi2_update' where id = 2;
insert into hudi.test_flink_incremental values (3,'a3', 30, 3000, '2022-11-26');
insert into hudi.test_flink_incremental values (4,'a4', 40, 4000, '2022-12-26');
用show_commits看一下有哪些commits(这里查询用的是Hudi的master,因为show_commits是在0.11.0版本开始支持的,也可以通过使用hadoop命令查看.hoodie文件夹下的.commit文件)
call show_commits(table => 'hudi.test_flink_incremental');
20221205152736
20221205152723
20221205152712
20221205152702
20221205152650
Flink SQL创建Hudi内存表
CREATE TABLE test_flink_incremental (
id int PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
price double,
ts bigint,
dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://cluster1/warehouse/tablespace/managed/hive/hudi.db/test_flink_incremental'
);
建表时不指定增量查询相关的参数,我们在查询时动态指定,这样比较灵活。 动态指定参数方法,在查询语句后面加上如下形式的语句
/*+
options(
'read.start-commit' = '20221205152723',
'read.end-commit'='20221205152736'
)
*/
批读
Flink SQL读Hudi有两种模式:批读和流读。默认批读,先看一下批读的增量查询
验证是否包含起始时间和默认结束时间
select * from test_flink_incremental
/*+
options(
'read.start-commit' = '20221205152723' --起始时间对应id=3的记录
)
*/
结果包含起始时间,不指定结束时间默认读到最新的数据
id name price ts dt
4 a4 40.0 4000 dt=2022-12-26
3 a3 30.0 3000 dt=2022-11-26
验证是否包含结束时间
select * from test_flink_incremental
/*+
options(
'read.start-commit' = '20221205152712', --起始时间对应id=2的记录
'read.end-commit'='20221205152723' --结束时间对应id=3的记录
)
*/
结果包含结束时间
id name price ts dt
3 a3 30.0 3000 dt=2022-11-26
2 hudi2_update 20.0 2000 dt=2022-11-25
验证默认开始时间
这种情况是指定结束时间,但不指定开始时间,如果都不指定,则读表所有的最新版本的记录
select * from test_flink_incremental
/*+
options(
'read.end-commit'='20221205152712' --结束时间对应id=2的更新记录
)
*/
结果:只查询end-commit对应的记录
id name price ts dt
2 hudi2_update 20.0 2000 dt=2022-11-25
时间旅行(查询历史记录)
验证是否可以查询历史记录,我们更新id为2的name,更新前name为a2,更新后为hudi2_update,我们验证一下,是否可以通过FlinkSQL查询Hudi历史记录,预期结果查出id=2,name=a2
select * from test_flink_incremental
/*+
options(
'read.end-commit'='20221205152702' --结束时间对应id=2的历史记录
)
*/
结果:可以正确查询历史记录
id name price ts dt
2 a2 20.0 2000 dt=2022-11-25
流读
开启流读的参数
read.streaming.enabled = true
流读不需要设置结束时间,因为一般的需求是读所有的增量数据,我们只需要验证开始时间就好了
验证默认开始时间
select * from test_flink_incremental
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4'
)
*/
结果:从最新的instantTime开始增量读取,也就是默认的read.start-commit为最新的instantTime
id name price ts dt
4 a4 40.0 4000 dt=2022-12-26
验证指定开始时间
select * from test_flink_incremental
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4',
'read.start-commit' = '20221205152712'
)
*/
结果
id name price ts dt
2 hudi2_update 20.0 2000 dt=2022-11-25
3 a3 30.0 3000 dt=2022-11-26
4 a4 40.0 4000 dt=2022-11-26
如果想第一次查询全部的历史数据,可以将start-commit设置的早一点,比如设置到去年:'read.start-commit' = '20211205152712'
select * from test_flink_incremental
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4',
'read.start-commit' = '20211205152712'
)
*/
id name price ts dt
1 a1 10.0 1000 dt=2022-11-25
2 hudi2_update 20.0 2000 dt=2022-11-25
3 a3 30.0 3000 dt=2022-11-26
4 a4 40.0 4000 dt=2022-11-26
验证流读的连续性
验证新的增量数据进来,是否可以持续消费Hudi增量数据,验证数据的准确一致性,为了方便验证,我可以使用Flink SQL增量流读Hudi表然后Sink到MySQL表中,最后通过读取MySQL表中的数据验证数据的准确性
Flink SQL读写MySQL需要配置jar包,将flink-connector-jdbc_2.12-1.14.3.jar放到lib下即可,下载地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.3/flink-connector-jdbc_2.12-1.14.3.jar
先在MySQL中创建一张Sink表
-- MySQL
CREATE TABLE `test_sink` (
`id` int(11),
`name` text DEFAULT NULL,
`price` int(11),
`ts` int(11),
`dt` text DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Flink中创建对应的sink表
create table test_sink (
id int,
name string,
price double,
ts bigint,
dt string
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8',
'username' = 'root',
'password' = 'root-123',
'table-name' = 'test_sink',
'sink.buffer-flush.max-rows' = '1'
);
然后流式增量读取Hudi表Sink Mysql
insert into test_sink
select * from test_flink_incremental
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4',
'read.start-commit' = '20221205152712'
)
*/
这样会起一个长任务,一直处于running状态,我们可以在yarn-session界面上验证这一点
然后先在MySQL中验证一下历史数据的准确性
再利用Spark SQL往source表插入两条数据
-- Spark SQL
insert into hudi.test_flink_incremental values (5,'a5', 50, 5000, '2022-12-07');
insert into hudi.test_flink_incremental values (6,'a6', 60, 6000, '2022-12-07');
我们增量读取的间隔设置的4s,成功插入数据等待4s后,再在MySQL表中验证一下数据
发现新增的数据已经成功Sink到MySQL中了,并且数据没有重复
最后验证一下更新的增量数据,Spark SQL更新Hudi source表
-- Spark SQL
update hudi.test_flink_incremental set name='hudi5_update' where id = 5;
继续验证结果
结果是更新的增量数据也会insert到MySQL中的sink表,但是不会更新原来的数据
那如果想实现更新的效果呢?我们需要在MySQL和Flink的sink表中加上主键字段,两者缺一不可,如下
-- MySQL
CREATE TABLE `test_sink` (
`id` int(11),
`name` text DEFAULT NULL,
`price` int(11),
`ts` int(11),
`dt` text DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Flink SQL
create table test_sink (
id int PRIMARY KEY NOT ENFORCED,
name string,
price double,
ts bigint,
dt string
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8',
'username' = 'root',
'password' = 'root-123',
'table-name' = 'test_sink',
'sink.buffer-flush.max-rows' = '1'
);
将刚才起的长任务关掉,重新执行刚才的insert语句,先跑一下历史数据,最后再验证一下增量效果
-- Spark SQL
update hudi.test_flink_incremental set name='hudi6_update' where id = 6;
insert into hudi.test_flink_incremental values (7,'a7', 70, 7000, '2022-12-07');
可以看到,达到了预期效果,对于id=6的执行更新操作,对于id=7的执行插入操作