golang源码分析:go-mysql-elasticsearch(1)

Golang
170
0
0
2024-01-19
标签   Elasticsearch

https://github.com/go-mysql-org/go-mysql-elasticsearch是一款通过解析mysql的binlog将mysql数据同步到es的同步工具,支持正则表达式多表同步。下面我们分析下如何使用。首先是安装

git clone https://github.com/go-mysql-org/go-mysql-elasticsearch
 make
GO111MODULE=on go build -o bin/go-mysql-elasticsearch ./cmd/go-mysql-elasticsearch

或者

go install github.com/siddontang/go-mysql-elasticsearch/cmd/go-mysql-elasticsearch

然后准备数据库:

create database test;
use test;
CREATE TABLE `t9` (   `id` int NOT NULL AUTO_INCREMENT,   `name` varchar(50) CHARACTER SET utf8 DEFAULT NULL,   `remark` varchar(50) CHARACTER SET utf8 DEFAULT NULL,   `add_time` datetime DEFAULT NULL,   PRIMARY KEY (`id`),   KEY `ix_t2_name` (`name`) ) ENGINE=InnoDB AUTO_INCREMENT=3829 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Query OK, 0 rows affected, 2 warnings (0.01 sec)
create table tid_0( `id`  int(11)   not null,`tag` varchar(256),`desc` varchar(256),primary key (`id`));
create table tfilter (id  int(11),c1   int(11) , c2   int(11) , name varchar(256),primary key(id));
create table tfield (id    int(11) ,tags      varchar(25),keywords  varchar(256),primary key (id));
create table t (id int(11) ,name varchar(256) ,primary key (id));

然后准备我们的配置文件river.toml

# MySQL address, user and password
# user must have replication privilege in MySQL.
my_addr = "127.0.0.1:3306"
my_user = "root"
my_pass = ""
my_charset = "utf8"

# Set true when elasticsearch use https
#es_https = false
# Elasticsearch address
es_addr = "127.0.0.1:9200"
# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
es_user = ""
es_pass = ""

# Path to store data, like master.info, if not set or empty,
# we must use this to support breakpoint resume syncing. 
# TODO: support other storage, like etcd. 
data_dir = "./var"

# Inner Http status address
stat_addr = "127.0.0.1:12800"
stat_path = "/metrics"

# pseudo server id like a slave 
server_id = 1001

# mysql or mariadb
flavor = "mysql"

# mysqldump execution path
# if not set or empty, ignore mysqldump.
mysqldump = "mysqldump"

# if we have no privilege to use mysqldump with --master-data,
# we must skip it.
#skip_master_data = false

# minimal items to be inserted in one bulk
bulk_size = 128

# force flush the pending requests if we don't have enough items >= bulk_size
flush_bulk_time = "200ms"

# Ignore table without primary key
skip_no_pk_table = false

# MySQL data source
[[source]]
schema = "test"

# Only below tables will be synced into Elasticsearch.
# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023
# I don't think it is necessary to sync all tables in a database.
tables = ["t", "t_[0-9]{1}","tid_[0-9]{1}", "tfield", "tfilter"]

# Below is for special rule mapping

# Very simple example
# 
# desc t;
# +-------+--------------+------+-----+---------+-------+
# | Field | Type         | Null | Key | Default | Extra |
# +-------+--------------+------+-----+---------+-------+
# | id    | int(11)      | NO   | PRI | NULL    |       |
# | name  | varchar(256) | YES  |     | NULL    |       |
# +-------+--------------+------+-----+---------+-------+
# 
# The table `t` will be synced to ES index `test` and type `t`.
[[rule]]
schema = "test"
table = "t"
index = "test"
type = "t"

# Wildcard table rule, the wildcard table must be in source tables 
# All tables which match the wildcard format will be synced to ES index `test` and type `t`.
# In this example, all tables must have same schema with above table `t`;
[[rule]]
schema = "test"
table = "t_[0-9]{1}"
index = "test"
type = "t"

# Simple field rule 
#
# desc tfield;
# +----------+--------------+------+-----+---------+-------+
# | Field    | Type         | Null | Key | Default | Extra |
# +----------+--------------+------+-----+---------+-------+
# | id       | int(11)      | NO   | PRI | NULL    |       |
# | tags     | varchar(256) | YES  |     | NULL    |       |
# | keywords | varchar(256) | YES  |     | NULL    |       |
# +----------+--------------+------+-----+---------+-------+
#
[[rule]]
schema = "test"
table = "tfield"
index = "test"
type = "tfield"

[rule.field]
# Map column `id` to ES field `es_id`
id="es_id"
# Map column `tags` to ES field `es_tags` with array type 
tags="es_tags,list"
# Map column `keywords` to ES with array type
keywords=",list"

# Filter rule 
#
# desc tfilter;
# +-------+--------------+------+-----+---------+-------+
# | Field | Type         | Null | Key | Default | Extra |
# +-------+--------------+------+-----+---------+-------+
# | id    | int(11)      | NO   | PRI | NULL    |       |
# | c1    | int(11)      | YES  |     | 0       |       |
# | c2    | int(11)      | YES  |     | 0       |       |
# | name  | varchar(256) | YES  |     | NULL    |       |
# +-------+--------------+------+-----+---------+-------+
#
[[rule]]
schema = "test"
table = "tfilter"
index = "test"
type = "tfilter"

# Only sync following columns
filter = ["id", "name"]

# id rule
#
# desc tid_[0-9]{4};
# +----------+--------------+------+-----+---------+-------+
# | Field    | Type         | Null | Key | Default | Extra |
# +----------+--------------+------+-----+---------+-------+
# | id       | int(11)      | NO   | PRI | NULL    |       |
# | tag      | varchar(256) | YES  |     | NULL    |       |
# | desc     | varchar(256) | YES  |     | NULL    |       |
# +----------+--------------+------+-----+---------+-------+
#
[[rule]]
schema = "test"
table = "tid_[0-9]{1}"
index = "test"
type = "t"
# The es doc's id will be `id`:`tag`
# It is useful for merge muliple table into one type while theses tables have same PK 
id = ["id", "tag"]

启动es

% docker run -d --name elasticsearch  -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.17.6
db1d853a4926b5e0f93fe24ea2b34a9aeb70c7057e66ada39705f1c4f6951a63

查看es的索引信息

 % curl  -H "Content-Type:application/json" -XGET http://127.0.0.1:9200/_cat/indices
green open .geoip_databases 4i1YaRoUSmie4QWfwkBLCw 1 0 42 0 39.9mb 39.9mb

然后启动我们的同步服务:

% ./bin/go-mysql-elasticsearch -config=../../mysql/mysql-es/river.toml

可以看到下面的日志:

[2023/04/09 23:36:32] [info] binlogsyncer.go:141 create BinlogSyncer with config {1001 mysql 127.0.0.1 3306 root   utf8 false false <nil> false UTC false 0 0s 0s 0 false 0}
[2023/04/09 23:36:32] [info] dump.go:180 skip dump, use last binlog replication pos (mysql-bin.000009, 7696) or GTID set <nil>
[2023/04/09 23:36:32] [info] binlogsyncer.go:362 begin to sync binlog from position (mysql-bin.000009, 7696)
[2023/04/09 23:36:32] [info] binlogsyncer.go:211 register slave for master server 127.0.0.1:3306
[2023/04/09 23:36:32] [info] sync.go:25 start sync binlog at binlog file (mysql-bin.000009, 7696)
[2023/04/09 23:36:32] [info] binlogsyncer.go:731 rotate to (mysql-bin.000009, 7696)
[2023/04/09 23:36:32] [info] sync.go:71 rotate binlog to (mysql-bin.000009, 7696)
[2023/04/09 23:36:32] [info] master.go:54 save position (mysql-bin.000009, 7696)

在mysql执行两个插入操作

mysql> insert into tid_7 (id,tag,`desc`)values(1,"aa","aaaa"),(2,"bb","bbbbb");
Query OK, 2 rows affected (0.02 sec)
Records: 2  Duplicates: 0  Warnings: 0
mysql> insert into tid_7 (id,tag,`desc`)values(3,"aa","aaaa"),(4,"bb","bbbbb");
Query OK, 2 rows affected (0.01 sec)
Records: 2  Duplicates: 0  Warnings: 0

可以看到同步工具binlog日志的变化

[2023/04/09 23:43:28] [info] sync.go:25 start sync binlog at binlog file (mysql-bin.000009, 7696)
[2023/04/09 23:43:28] [info] binlogsyncer.go:731 rotate to (mysql-bin.000009, 7696)
[2023/04/09 23:43:28] [info] sync.go:71 rotate binlog to (mysql-bin.000009, 7696)
[2023/04/09 23:43:28] [info] master.go:54 save position (mysql-bin.000009, 7696)
[2023/04/09 23:45:43] [info] master.go:54 save position (mysql-bin.000009, 8009)
[2023/04/09 23:46:10] [info] master.go:54 save position (mysql-bin.000009, 8322)

接着看下索引信息

% curl  -H "Content-Type:application/json" -XGET http://127.0.0.1:9200/_cat/indices
green  open .geoip_databases 4i1YaRoUSmie4QWfwkBLCw 1 0 42 0 39.9mb 39.9mb
yellow open test             XDB0pmPFQD2oilTF9x0lNg 1 1  4 0    9kb    9kb


% curl  -H "Content-Type:application/json" -XGET http://127.0.0.1:9200/test/_mapping
{"test":{"mappings":{"properties":{"desc":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"id":{"type":"long"},"tag":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}}}}}}%

可以看到,表的信息已经同步到es的index里面去了。