Golang source code analysis: gravity, a MySQL sync tool (1)

Golang
2024-01-19

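gravity (https://github.com/moiot/gravity) is a MySQL synchronization tool open-sourced by Mobike. It is built from a set of simple plugins, which gives it good extensibility while keeping the architecture simple. Below we first look at how it works and how to use it, and then analyze its source code.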

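Gravity is composed of the following plugins:

Input: the adapter for the data source. For example, it parses the MySQL binlog to produce gravity's internal data structure, core.Msg.

Filter: transforms the data produced by Input, e.g. filtering out some data, renaming table columns, or encrypting certain columns.

Scheduler: schedules the data flow, writing the data produced by Input to Output. It defines the data consistency strategy; the default Scheduler preserves the order of modifications to the same row.

Output: writes the incoming data to the target store, such as Kafka or MySQL (TiDB). In doing so it applies the routing strategy defined by the Router plugin.

Matcher: matches the data produced by the Input plugin. Both the Filter and Router plugins use the Matcher plugin to match data.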

Users can write their own plugins of any of these kinds to meet specific business requirements.

Its core data structure, core.Msg, is defined as follows:

type DDLMsg struct {
  Statement string
}

type DMLMsg struct {
  Operation DMLOp
  Data      map[string]interface{}
  Old       map[string]interface{}
  Pks       map[string]interface{}
  PkColumns []string
}

type Msg struct {
  Type      MsgType
  Host      string
  Database  string
  Table     string
  Timestamp time.Time

  DdlMsg *DDLMsg
  DmlMsg *DMLMsg
  // ... (other fields omitted)
}
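As an illustration, this is roughly what an insert of id=1 into test.test_source_table could look like in these structs. The excerpt above does not show DMLOp or MsgType, so the sketch declares them locally as string types with hypothetical constants (MsgDML, Insert); gravity's real definitions may differ. Old is left nil here, on the assumption that it carries the pre-update values of an UPDATE.

```go
package main

import (
	"fmt"
	"time"
)

// Local stand-ins for types referenced by core.Msg; the constant values
// are hypothetical and exist only for this illustration.
type MsgType string
type DMLOp string

const (
	MsgDML MsgType = "dml"
	Insert DMLOp   = "insert"
)

type DDLMsg struct {
	Statement string
}

type DMLMsg struct {
	Operation DMLOp
	Data      map[string]interface{}
	Old       map[string]interface{}
	Pks       map[string]interface{}
	PkColumns []string
}

type Msg struct {
	Type      MsgType
	Host      string
	Database  string
	Table     string
	Timestamp time.Time

	DdlMsg *DDLMsg
	DmlMsg *DMLMsg
}

// newInsertMsg builds the message for a single-row INSERT.
// The primary-key values are copied out of the row data into Pks.
func newInsertMsg(db, table string, row map[string]interface{}, pkCols []string) *Msg {
	pks := make(map[string]interface{}, len(pkCols))
	for _, c := range pkCols {
		pks[c] = row[c]
	}
	return &Msg{
		Type:      MsgDML,
		Database:  db,
		Table:     table,
		Timestamp: time.Now(),
		DmlMsg: &DMLMsg{
			Operation: Insert,
			Data:      row,
			Pks:       pks,
			PkColumns: pkCols,
		},
	}
}

func main() {
	m := newInsertMsg("test", "test_source_table", map[string]interface{}{"id": 1}, []string{"id"})
	fmt.Println(m.Database, m.Table, m.DmlMsg.Operation, m.DmlMsg.Pks["id"])
	// test test_source_table insert 1
}
```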

gravity supports two databases, MySQL and MongoDB, and both support three sync modes: batch, stream, and replication.

mode = "stream": incremental sync
mode = "batch": batch (full) sync
mode = "replication": first do a batch-mode table scan, then switch to stream mode automatically.
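The three modes can be summarized as the phases a job passes through. The phases function below is a hypothetical illustration of the descriptions above, not gravity's code. For the batch-then-stream handoff in replication mode to lose nothing, the stream phase generally has to resume from a binlog position taken before the scan started; how gravity records that position is not covered in this part.

```go
package main

import "fmt"

// phases returns the stages a sync job runs through for a given input mode.
// Names are illustrative only.
func phases(mode string) ([]string, error) {
	switch mode {
	case "batch":
		return []string{"batch"}, nil // one-off table scan
	case "stream":
		return []string{"stream"}, nil // tail the binlog only
	case "replication":
		return []string{"batch", "stream"}, nil // scan first, then tail
	default:
		return nil, fmt.Errorf("unknown mode %q", mode)
	}
}

func main() {
	for _, m := range []string{"batch", "stream", "replication", "full"} {
		p, err := phases(m)
		fmt.Println(m, p, err)
	}
}
```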

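Before using the tool, the following prerequisites must be met:

1. The MySQL binlog must run in GTID mode, i.e. GTID_MODE=ON.

2. Create a _gravity account and grant it replication-related privileges.

3. The tables involved must already exist in both the source and target MySQL clusters.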

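A quick refresher: MySQL offers two ways to specify where replication starts:

1. By binlog file name and position: anonymous transactions (Anonymous_gtid_log_event)

2. By GTID (global transaction identifier): GTID transactions (Gtid_log_event)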
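Both kinds of position appear in gravity's log further below: a file/offset pair such as (mysql-bin.000006, 4) and a GTID set such as b35ff9d4-2ff7-11eb-ba5c-0802f4c2f06c:1-8, which reads as "transactions 1 through 8 from the server with that UUID". The sketch models both; the type and function names are hypothetical, and it only handles a single-UUID set (multi-server sets separated by commas are out of scope).

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// FilePos identifies a position by binlog file name and byte offset.
type FilePos struct {
	Name string
	Pos  uint32
}

// Interval is an inclusive range of transaction numbers.
type Interval struct{ Start, End int64 }

// GTIDSet is a simplified GTID set for a single source server UUID.
type GTIDSet struct {
	UUID      string
	Intervals []Interval
}

// parseGTIDSet parses forms like "uuid:1-8" or "uuid:1-3:11:47-49".
func parseGTIDSet(s string) (GTIDSet, error) {
	parts := strings.Split(s, ":")
	if len(parts) < 2 {
		return GTIDSet{}, fmt.Errorf("invalid GTID set %q", s)
	}
	set := GTIDSet{UUID: parts[0]}
	for _, p := range parts[1:] {
		lo, hi, isRange := strings.Cut(p, "-")
		start, err := strconv.ParseInt(lo, 10, 64)
		if err != nil {
			return GTIDSet{}, err
		}
		end := start // a single number is a one-transaction interval
		if isRange {
			if end, err = strconv.ParseInt(hi, 10, 64); err != nil {
				return GTIDSet{}, err
			}
		}
		if end < start {
			return GTIDSet{}, fmt.Errorf("invalid interval %q", p)
		}
		set.Intervals = append(set.Intervals, Interval{start, end})
	}
	return set, nil
}

// Count returns the number of transactions contained in the set.
func (g GTIDSet) Count() int64 {
	var n int64
	for _, iv := range g.Intervals {
		n += iv.End - iv.Start + 1
	}
	return n
}

func main() {
	pos := FilePos{Name: "mysql-bin.000006", Pos: 4}
	set, err := parseGTIDSet("b35ff9d4-2ff7-11eb-ba5c-0802f4c2f06c:1-8")
	if err != nil {
		panic(err)
	}
	fmt.Println(pos.Name, pos.Pos, set.UUID, set.Count())
	// mysql-bin.000006 4 b35ff9d4-2ff7-11eb-ba5c-0802f4c2f06c 8
}
```

A file/offset pair is only meaningful on the server that wrote that file, whereas a GTID set names transactions globally, which is why GTID-based positioning survives failover more gracefully.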

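Since MySQL 5.7.6, GTID mode can be switched on and off online; the GTID_MODE variable takes the following values:

1. OFF: only anonymous transactions can be replicated.

2. OFF_PERMISSIVE: newly generated transactions are anonymous, but GTID transactions can also be replicated.

3. ON_PERMISSIVE: newly generated transactions are GTID transactions, but anonymous transactions can also be replicated.

4. ON: only GTID transactions can be replicated.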
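One detail worth knowing when switching online: MySQL only lets GTID_MODE move one step at a time along OFF, OFF_PERMISSIVE, ON_PERMISSIVE, ON (and back the same way). The hypothetical helper below lists the SET statements needed to get from the current mode to a target mode. It covers only this ordering rule; the full online procedure also requires ENFORCE_GTID_CONSISTENCY=ON before reaching ON and waiting for ongoing anonymous transactions to finish, which the sketch does not handle.

```go
package main

import "fmt"

// gtidModes lists GTID_MODE values in the order MySQL allows transitions:
// each change may move at most one position.
var gtidModes = []string{"OFF", "OFF_PERMISSIVE", "ON_PERMISSIVE", "ON"}

func modeIndex(m string) int {
	for i, v := range gtidModes {
		if v == m {
			return i
		}
	}
	return -1
}

// gtidModeSteps returns the SET statements that move GTID_MODE from cur to
// target one step at a time (empty if they are already equal).
func gtidModeSteps(cur, target string) ([]string, error) {
	i, j := modeIndex(cur), modeIndex(target)
	if i < 0 || j < 0 {
		return nil, fmt.Errorf("unknown GTID_MODE: %q -> %q", cur, target)
	}
	step := 1
	if j < i {
		step = -1
	}
	var stmts []string
	for k := i; k != j; {
		k += step
		stmts = append(stmts, "SET @@GLOBAL.GTID_MODE = "+gtidModes[k]+";")
	}
	return stmts, nil
}

func main() {
	stmts, err := gtidModeSteps("OFF", "ON")
	if err != nil {
		panic(err)
	}
	for _, s := range stmts {
		fmt.Println(s)
	}
	// SET @@GLOBAL.GTID_MODE = OFF_PERMISSIVE;
	// SET @@GLOBAL.GTID_MODE = ON_PERMISSIVE;
	// SET @@GLOBAL.GTID_MODE = ON;
}
```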

Now let's get started. First, configure MySQL:

% vi /usr/local/etc/my.cnf

[mysqld]
server_id=4
log_bin=mysql-bin
enforce-gtid-consistency=ON 
gtid-mode=ON
binlog_format=ROW
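After the restart, it is worth confirming that the running server actually picked these settings up. The sketch assumes you have already fetched the variables (for example with SHOW GLOBAL VARIABLES) into a map keyed by their lower-case names; checkVars and its list of expectations are this example's own, derived from the my.cnf above rather than from gravity's code.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// expected holds the values implied by the my.cnf above, keyed by the
// lower-case names that SHOW GLOBAL VARIABLES reports.
var expected = map[string]string{
	"log_bin":                  "ON",
	"gtid_mode":                "ON",
	"enforce_gtid_consistency": "ON",
	"binlog_format":            "ROW",
}

// checkVars returns one message per variable that is missing or has an
// unexpected value; an empty result means the settings look right.
func checkVars(vars map[string]string) []string {
	var problems []string
	for name, want := range expected {
		got, ok := vars[name]
		switch {
		case !ok:
			problems = append(problems, fmt.Sprintf("%s: not reported", name))
		case !strings.EqualFold(got, want):
			problems = append(problems, fmt.Sprintf("%s = %s, want %s", name, got, want))
		}
	}
	if id, ok := vars["server_id"]; !ok || id == "0" {
		problems = append(problems, "server_id must be set to a non-zero value")
	}
	sort.Strings(problems) // map iteration order is random; keep output stable
	return problems
}

func main() {
	vars := map[string]string{
		"log_bin": "ON", "gtid_mode": "OFF_PERMISSIVE",
		"enforce_gtid_consistency": "ON", "binlog_format": "ROW", "server_id": "4",
	}
	for _, p := range checkVars(vars) {
		fmt.Println(p) // gtid_mode = OFF_PERMISSIVE, want ON
	}
}
```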

Restart MySQL:

% mysql.server restart
Shutting down MySQL
.. SUCCESS!
Starting MySQL
. SUCCESS!

Create the account and grant privileges:

CREATE USER _gravity IDENTIFIED BY 'xxx';
GRANT SELECT, RELOAD, LOCK TABLES, REPLICATION SLAVE, REPLICATION CLIENT, CREATE, INSERT, UPDATE, DELETE ON *.* TO '_gravity'@'%';
GRANT ALL PRIVILEGES ON _gravity.* TO '_gravity'@'%';

Create the source and target tables:

CREATE TABLE `test`.`test_source_table` (
  `id` int(11),
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `test`.`test_target_table` (
  `id` int(11),
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Build from source:

% make 
go build -ldflags '-X "github.com/moiot/gravity/pkg/utils.Version=0.0.1+git.e934fa33" -X "github.com/moiot/gravity/pkg/utils.BuildTS=2023-04-02 02:50:11" -X "github.com/moiot/gravity/pkg/utils.GitHash=e934fa33dfbf5a155fb17ad65acf36ad91f4f616" -X "github.com/moiot/gravity/pkg/utils.GitBranch=master"' -o bin/gravity cmd/gravity/main.go
#go build -ldflags '-X "github.com/moiot/gravity/pkg/utils.Version=0.0.1+git.e934fa33" -X "github.com/moiot/gravity/pkg/utils.BuildTS=2023-04-02 02:50:11" -X "github.com/moiot/gravity/pkg/utils.GitHash=e934fa33dfbf5a155fb17ad65acf36ad91f4f616" -X "github.com/moiot/gravity/pkg/utils.GitBranch=master"' -o bin/padder cmd/padder/main.go
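The -ldflags '-X importpath.name=value' options in the make output are how the version, build time, git hash and branch end up in the binary (they reappear in the startup log below): the Go linker overwrites the initial value of a package-level string variable. A standalone illustration; the main.Version and main.GitHash names are our own, standing in for github.com/moiot/gravity/pkg/utils.Version and friends.

```go
package main

import "fmt"

// These defaults are replaced at link time, e.g.:
//   go build -ldflags '-X "main.Version=0.0.1" -X "main.GitHash=abc123"' -o demo .
// -X only affects string variables that are uninitialized or initialized to
// a constant string expression (not via a function call).
var (
	Version = "unknown"
	GitHash = "unknown"
)

func versionLine() string {
	return fmt.Sprintf("Release Version: %s, Git Commit Hash: %s", Version, GitHash)
}

func main() {
	fmt.Println(versionLine())
}
```

Built without -ldflags, the program prints the "unknown" defaults; built with the flags shown in the comment, it prints the injected values.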

Configure the sync parameters in mysql2mysql.toml:

  # name (required)
  name = "mysql2mysqlDemo"
  version = "1.0"
  
  # optional. 
  # database name used to store position, heartbeat, etc. 
  # default to "_gravity"
  internal-db-name = "_gravity"
  
  #
  # The definition of Input. `mysqlbinlog` is used for this definition.
  #
  [input]
  type = "mysql"
  mode = "stream"
  [input.config.source]
  host = "127.0.0.1"
  username = "root"
  password = ""
  port = 3306
  location = "Local"

  #
  # The definition of Output. `mysql` is used for this definition.
  #
  [output]
  type = "mysql"
  [output.config.target]
  host = "127.0.0.1"
  username = "root"
  password = ""
  port = 3306
  location = "Local"

  # The definition of the routing rule
  [[output.config.routes]]
  match-schema = "test"
  match-table = "test_source_table"
  target-schema = "test"
  target-table = "test_target_table"
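Conceptually, each [[output.config.routes]] entry maps a source (schema, table) pair to a target pair. Below is a sketch of that lookup with hypothetical Route and resolve names. It uses path.Match glob patterns so that a pattern such as "user_*" would also match; that wildcard behaviour is an assumption for illustration, not something this article shows gravity doing. The first matching route wins.

```go
package main

import (
	"fmt"
	"path"
)

// Route mirrors the four keys of one [[output.config.routes]] entry.
type Route struct {
	MatchSchema, MatchTable   string
	TargetSchema, TargetTable string
}

// matches reports whether pattern matches name; a malformed pattern never matches.
func matches(pattern, name string) bool {
	ok, err := path.Match(pattern, name)
	return err == nil && ok
}

// resolve returns the destination of the first route matching (schema, table).
func resolve(routes []Route, schema, table string) (string, string, bool) {
	for _, r := range routes {
		if matches(r.MatchSchema, schema) && matches(r.MatchTable, table) {
			return r.TargetSchema, r.TargetTable, true
		}
	}
	return "", "", false
}

func main() {
	routes := []Route{{
		MatchSchema: "test", MatchTable: "test_source_table",
		TargetSchema: "test", TargetTable: "test_target_table",
	}}
	s, t, ok := resolve(routes, "test", "test_source_table")
	fmt.Println(s, t, ok) // test test_target_table true
	_, _, ok = resolve(routes, "test", "other_table")
	fmt.Println(ok) // false
}
```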

Start the sync:

bin/gravity -config mysql2mysql.toml

You should see output like the following:

INFO[0000] xxhash backend: GoUnsafe                     
{"level":"info","msg":"Welcome to gravity.","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"Release Version: 0.0.1+git.e934fa33","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"Git Commit Hash: e934fa33dfbf5a155fb17ad65acf36ad91f4f616","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"Git Branch: master","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"UTC Build Time: 2023-04-02 02:50:11","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"[output-mysql] Using mysql-replace-engine","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"[batchScheduler] {NrWorker:10 MaxBatchPerWorker:1 QueueSize:1024 SlidingWindowSize:10240 NrRetries:3 RetrySleepString:1s HealthyThreshold:0 RetrySleep:1s}","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"DSN is root:@tcp(127.0.0.1:3306)/?interpolateParams=true\u0026timeout=5s\u0026readTimeout=5s\u0026writeTimeout=5s\u0026parseTime=true\u0026collation=utf8mb4_general_ci\u0026loc=Local","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"DSN is root:@tcp(127.0.0.1:3306)/?interpolateParams=true\u0026timeout=5s\u0026readTimeout=5s\u0026writeTimeout=5s\u0026parseTime=true\u0026collation=utf8mb4_general_ci\u0026loc=Local","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"DSN is root:@tcp(127.0.0.1:3306)/?interpolateParams=true\u0026timeout=5s\u0026readTimeout=5s\u0026writeTimeout=5s\u0026parseTime=true\u0026collation=utf8mb4_general_ci\u0026loc=Local","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"[Server] start input","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"DSN is root:@tcp(127.0.0.1:3306)/?interpolateParams=true\u0026timeout=5s\u0026readTimeout=5s\u0026writeTimeout=5s\u0026parseTime=true\u0026collation=utf8mb4_general_ci\u0026loc=Local","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"DSN is root:@tcp(127.0.0.1:3306)/?interpolateParams=true\u0026timeout=5s\u0026readTimeout=5s\u0026writeTimeout=5s\u0026parseTime=true\u0026collation=utf8mb4_general_ci\u0026loc=Local","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"[binlog_checker] start","time":"2023-04-02T22:54:38+08:00"}
[2023/04/02 22:54:38] [info] binlogsyncer.go:141 create BinlogSyncer with config {9467353 mysql 127.0.0.1 3306 root    false false <nil> true UTC false 0 0s 0s 0 false 0}
{"level":"info","msg":"[binlogTailer] start","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"[binlogTailer] getBinlogStreamer gtid: b35ff9d4-2ff7-11eb-ba5c-0802f4c2f06c:1-8","time":"2023-04-02T22:54:38+08:00"}
[2023/04/02 22:54:38] [info] binlogsyncer.go:380 begin to sync binlog from GTID set b35ff9d4-2ff7-11eb-ba5c-0802f4c2f06c:1-8
[2023/04/02 22:54:38] [info] binlogsyncer.go:211 register slave for master server 127.0.0.1:3306
{"level":"info","msg":"[Server] started","time":"2023-04-02T22:54:38+08:00"}
[2023/04/02 22:54:38] [info] binlogsyncer.go:731 rotate to (mysql-bin.000006, 4)
{"level":"info","msg":"[binlogTailer] skip rotate event: source binlog Name mysql-bin.000006, source binlog Pos: 4; store Name: mysql-bin.000006, store Pos: 2498","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"[batchScheduler.SubmitMsg] added new sliding window, key=mysqlstream, type=ctl","time":"2023-04-02T22:54:38+08:00"}
{"level":"info","msg":"[staticSlidingWindow] init nextItemToCommit: core.Msg{ mysqlstream-1 ctl {xid {mysql-bin.000006 %!s(uint32=2577) b35ff9d4-2ff7-11eb-ba5c-0802f4c2f06c:1-9}} }","time":"2023-04-02T22:54:38+08:00"}
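The [batchScheduler] line shows 10 workers (NrWorker:10). A common way to combine parallel workers with the per-row ordering guarantee described earlier is to route each change by a hash of its table and primary key, so every change to the same row lands on the same worker and is applied in order. The sketch illustrates that idea with FNV-1a from the standard library; workerFor is hypothetical, and gravity's actual scheduler (sliding windows, batching, retries) is more involved and is left for the source analysis.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// workerFor picks a worker index for a row change. Hashing the table name
// plus the primary-key values sends every change to the same row to the
// same worker, preserving per-row order while different rows run in parallel.
func workerFor(db, table string, pks map[string]interface{}, nrWorker int) int {
	h := fnv.New32a()
	h.Write([]byte(db + "." + table))
	// Sort the key columns so the hash does not depend on map iteration order.
	cols := make([]string, 0, len(pks))
	for c := range pks {
		cols = append(cols, c)
	}
	sort.Strings(cols)
	for _, c := range cols {
		fmt.Fprintf(h, "|%s=%v", c, pks[c])
	}
	return int(h.Sum32() % uint32(nrWorker))
}

func main() {
	const nrWorker = 10 // NrWorker:10 in the log above
	a := workerFor("test", "test_source_table", map[string]interface{}{"id": 1}, nrWorker)
	b := workerFor("test", "test_source_table", map[string]interface{}{"id": 1}, nrWorker)
	fmt.Println(a == b) // true: same row, same worker
}
```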

Let's insert some data to check whether it gets synchronized:

mysql> use test;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> select * from test_target_table;
Empty set (0.00 sec)

mysql> select * from test_source_table;
Empty set (0.00 sec)

mysql> insert into test_source_table values(1);
Query OK, 1 row affected (0.00 sec)

mysql> select * from test_source_table;
+----+
| id |
+----+
|  1 |
+----+
1 row in set (0.00 sec)

mysql> select * from test_target_table;
+----+
| id |
+----+
|  1 |
+----+
1 row in set (0.00 sec)

By default, a unique-key conflict does not raise an error; the existing row in the target table is simply overwritten.

mysql> insert into test_target_table values(2);
Query OK, 1 row affected (0.00 sec)

mysql> insert into test_source_table values(2);
Query OK, 1 row affected (0.01 sec)


mysql> select * from test_target_table;
+----+
| id |
+----+
|  1 |
|  2 |
+----+
2 rows in set (0.00 sec)
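This overwrite-on-conflict behaviour is consistent with the "Using mysql-replace-engine" line in the startup log: with MySQL's REPLACE INTO, a row that collides on a primary key or unique index is deleted before the new row is inserted, so no duplicate-key error is raised. Below is a sketch of turning one row into such a statement; buildReplace is hypothetical and not gravity's SQL builder. Columns are sorted for a deterministic statement and values are passed as placeholders.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// quoteIdent wraps a MySQL identifier in backticks, doubling embedded backticks.
func quoteIdent(s string) string {
	return "`" + strings.ReplaceAll(s, "`", "``") + "`"
}

// buildReplace returns a REPLACE INTO statement with ? placeholders and the
// matching argument list for one row.
func buildReplace(schema, table string, row map[string]interface{}) (string, []interface{}) {
	cols := make([]string, 0, len(row))
	for c := range row {
		cols = append(cols, c)
	}
	sort.Strings(cols)

	quoted := make([]string, len(cols))
	marks := make([]string, len(cols))
	args := make([]interface{}, len(cols))
	for i, c := range cols {
		quoted[i] = quoteIdent(c)
		marks[i] = "?"
		args[i] = row[c]
	}
	stmt := fmt.Sprintf("REPLACE INTO %s.%s (%s) VALUES (%s)",
		quoteIdent(schema), quoteIdent(table),
		strings.Join(quoted, ", "), strings.Join(marks, ", "))
	return stmt, args
}

func main() {
	stmt, args := buildReplace("test", "test_target_table", map[string]interface{}{"id": 2})
	fmt.Println(stmt) // REPLACE INTO `test`.`test_target_table` (`id`) VALUES (?)
	fmt.Println(args) // [2]
}
```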

By default, the plugins do not synchronize DDL, but the statements are recorded in gravity's log.

mysql> alter table test_source_table add column name char(255) not null default '';
Query OK, 0 rows affected (0.01 sec)
Records: 0  Duplicates: 0  Warnings: 0
mysql> show create table test_source_table\G
*************************** 1. row ***************************
       Table: test_source_table
Create Table: CREATE TABLE `test_source_table` (
  `id` int NOT NULL,
  `name` char(255) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
1 row in set (0.00 sec)

mysql> show create table test_target_table\G
*************************** 1. row ***************************
       Table: test_target_table
Create Table: CREATE TABLE `test_target_table` (
  `id` int NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
1 row in set (0.00 sec)
Meanwhile, gravity only logs the DDL statement; the target table is left unchanged:

{"level":"info","msg":"QueryEvent: database: test, sql: alter table test_source_table add column name char(255) not null default '', position: {mysql-bin.000006 105150 b35ff9d4-2ff7-11eb-ba5c-0802f4c2f06c:1-212}","time":"2023-04-02T23:00:44+08:00"}
{"level":"info","msg":"[binlogTailer] ddl done with gtid: b35ff9d4-2ff7-11eb-ba5c-0802f4c2f06c:1-212, stmt: alter table test_source_table add column name char(255) not null default ''","time":"2023-04-02T23:00:44+08:00"}
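Because the DDL arrives as a QueryEvent carrying plain SQL text, a consumer that logs rather than applies it first has to recognize it, bearing in mind that row-based binlogs also emit statements such as BEGIN as query events. A rough keyword-based sketch; isDDL is a hypothetical helper, and a robust implementation would use a SQL parser, since leading comments or unusual statements can defeat simple prefix matching.

```go
package main

import (
	"fmt"
	"strings"
)

// ddlKeywords are the statement prefixes this sketch treats as DDL.
var ddlKeywords = []string{"ALTER", "CREATE", "DROP", "RENAME", "TRUNCATE"}

// isDDL reports whether sql starts with a DDL keyword (case-insensitive).
// Statements such as BEGIN are not DDL.
func isDDL(sql string) bool {
	fields := strings.Fields(sql)
	if len(fields) == 0 {
		return false
	}
	first := strings.ToUpper(fields[0])
	for _, kw := range ddlKeywords {
		if first == kw {
			return true
		}
	}
	return false
}

func main() {
	for _, q := range []string{
		"alter table test_source_table add column name char(255) not null default ''",
		"BEGIN",
	} {
		fmt.Println(isDDL(q), q)
	}
}
```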

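The advantage of this approach is that, unlike DM (TiDB Data Migration), gravity does not frequently get stuck on DDL problems, nor does sync fail because sharded source tables have inconsistent schemas.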

For monitoring, gravity uses the common Grafana + Prometheus combination.

A Docker image is also provided:

docker run -v ${PWD}/config.toml:/etc/gravity/config.toml -d --net=host moiot/gravity:latest

That covers how to use gravity. In the next installment we will analyze its source code in detail.