go-mysql-elasticsearch的入口位于mysql/go-mysql-elasticsearch/cmd/go-mysql-elasticsearch/main.go,核心逻辑如下:
var configFile = flag.String("config", "./etc/river.toml", "go-mysql-elasticsearch config file") | |
func main() { | |
cfg, err := river.NewConfigWithFile(*configFile) | |
r, err := river.NewRiver(cfg) | |
go func() { | |
r.Run() | |
r.Close() |
读取配置文件的过程就是解析toml文件github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/config.go
func NewConfig(data string) (*Config, error) { | |
_, err := toml.Decode(data, &c) |
然后初始化River对象(github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/river.go),包括加载master信息,初始化canal对象,加载规则,准备canal对象,最后初始化es配置,并初始化es客户端。
func NewRiver(c *Config) (*River, error) { | |
r := new(River) | |
if r.master, err = loadMasterInfo(c.DataDir); err != nil { | |
if err = r.newCanal(); err != nil { | |
if err = r.prepareRule(); err != nil { | |
if err = r.prepareCanal(); err != nil { | |
if err = r.canal.CheckBinlogRowImage("FULL"); err != nil { | |
cfg := new(elastic.ClientConfig) | |
r.es = elastic.NewClient(cfg) | |
go InitStatus(r.c.StatAddr, r.c.StatPath) |
river对象包含了一个canal,一个es client和一批规则
type River struct { | |
c *Config | |
canal *canal.Canal | |
rules map[string]*Rule | |
ctx context.Context | |
cancel context.CancelFunc | |
wg sync.WaitGroup | |
es *elastic.Client | |
master *masterInfo | |
syncCh chan interface{} | |
} |
canal的初始化过程就是根据配置决定同步哪些对象:
func (r *River) newCanal() error { | |
cfg := canal.NewDefaultConfig() | |
for _, s := range r.c.Sources { | |
for _, t := range s.Tables { | |
cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, s.Schema+"\\."+t) | |
r.canal, err = canal.NewCanal(cfg) |
准备规则的过程决定了同步哪些表:
func (r *River) prepareRule() error { | |
wildtables, err := r.parseSource() | |
for _, rule := range r.c.Rules { | |
rule.prepare() | |
for key, rule := range r.rules { | |
if rule.TableInfo, err = r.canal.GetTable(rule.Schema, rule.Table); err != nil { |
准备canal的时候,初始化了canal的事件处理器:
func (r *River) prepareCanal() error { | |
for _, rule := range r.rules { | |
tables = append(tables, rule.Table) | |
r.canal.AddDumpDatabases(keys...) | |
r.canal.SetEventHandler(&eventHandler{r}) |
准备工作做完后就开始启动River来开始运行:
func (r *River) Run() error { | |
go r.syncLoop() | |
pos := r.master.Position() | |
if err := r.canal.RunFrom(pos); err != nil { |
首先是同步循环,在循环内部不断收集binlog产生的变动,然后批量同步给es,按需保存同步进度。
func (r *River) syncLoop() { | |
for { | |
select { | |
case v := <-r.syncCh: | |
switch v := v.(type) { | |
case posSaver: | |
case []*elastic.BulkRequest: | |
if needFlush { | |
// TODO: retry some times? | |
if err := r.doBulk(reqs); err != nil { | |
if needSavePos { | |
if err := r.master.Save(pos); err != nil { |
最后是销毁操作:
func (r *River) Close() { | |
r.cancel() | |
r.canal.Close() | |
r.master.Close() | |
r.wg.Wait() |
event handler是处理同步逻辑的核心:github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/sync.go
type eventHandler struct { | |
r *River | |
} |
其核心函数如下:
func (h *eventHandler) OnRotate(e *replication.RotateEvent) error { | |
func (h *eventHandler) OnTableChanged(schema, table string) error { | |
err := h.r.updateRule(schema, table) | |
func (h *eventHandler) OnDDL(nextPos mysql.Position, _ *replication.QueryEvent) error { | |
func (h *eventHandler) OnXID(nextPos mysql.Position) error { |
根据不同的操作,将它分类成插入,删除和更新操作,最后将操作同步给syncCh。由事件主循环消费。
func (h *eventHandler) OnRow(e *canal.RowsEvent) error { | |
rule, ok := h.r.rules[ruleKey(e.Table.Schema, e.Table.Name)] | |
switch e.Action { | |
case canal.InsertAction: | |
reqs, err = h.r.makeInsertRequest(rule, e.Rows) | |
case canal.DeleteAction: | |
reqs, err = h.r.makeDeleteRequest(rule, e.Rows) | |
case canal.UpdateAction: | |
reqs, err = h.r.makeUpdateRequest(rule, e.Rows) | |
default: | |
err = errors.Errorf("invalid rows action %s", e.Action) | |
} | |
h.r.syncCh <- reqs | |
func (r *River) makeInsertRequest(rule *Rule, rows [][]interface{}) ([]*elastic.BulkRequest, error) { | |
return r.makeRequest(rule, canal.InsertAction, rows) | |
} | |
const ( | |
UpdateAction = "update" | |
InsertAction = "insert" | |
DeleteAction = "delete" | |
) |
根据规则生成es请求:
func (r *River) makeRequest(rule *Rule, action string, rows [][]interface{}) ([]*elastic.BulkRequest, error) { | |
for _, values := range rows { | |
id, err := r.getDocID(rule, values) | |
if parentID, err = r.getParentID(rule, values, rule.Parent); err != nil { | |
req := &elastic.BulkRequest{Index: rule.Index, Type: rule.Type, ID: id, Parent: parentID, Pipeline: rule.Pipeline} | |
if action == canal.DeleteAction { | |
req.Action = elastic.ActionDelete | |
esDeleteNum.WithLabelValues(rule.Index).Inc() | |
} else { | |
r.makeInsertReqData(req, rule, values) | |
esInsertNum.WithLabelValues(rule.Index).Inc() | |
} | |
func (r *River) getDocID(rule *Rule, row []interface{}) (string, error) { | |
if rule.ID == nil { | |
ids, err = rule.TableInfo.GetPKValues(row) | |
for _, column := range rule.ID { | |
value, err := rule.TableInfo.GetColumnValue(column, row) |
最后通过批量请求发送给es,es请求是通过http client进行封装的
func (r *River) doBulk(reqs []*elastic.BulkRequest) error { | |
if resp, err := r.es.Bulk(reqs); err != nil { |
github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/elastic/client.go
type BulkRequest struct { | |
Action string | |
Index string | |
Type string | |
ID string | |
Parent string | |
Pipeline string | |
Data map[string]interface{} | |
} | |
type ClientConfig struct { | |
HTTPS bool | |
Addr string | |
User string | |
Password string | |
} | |
func NewClient(conf *ClientConfig) *Client { | |
c := new(Client) | |
c.c = &http.Client{} | |
func (c *Client) Bulk(items []*BulkRequest) (*BulkResponse, error) { | |
reqURL := fmt.Sprintf("%s://%s/_bulk", c.Protocol, c.Addr) | |
return c.DoBulk(reqURL, items) | |
} | |
func (c *Client) DoBulk(url string, items []*BulkRequest) (*BulkResponse, error) { | |
resp, err := c.DoRequest("POST", url, &buf) |
最后调用了标准http库的Do方法:
func (c *Client) DoRequest(method string, url string, body *bytes.Buffer) (*http.Response, error) { | |
req, err := http.NewRequest(method, url, body) | |
resp, err := c.c.Do(req) |
github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/metrics.go初始化了metrics监控(基于Prometheus)
func InitStatus(addr string, path string) { | |
http.Handle(path, promhttp.Handler()) | |
http.ListenAndServe(addr, nil) | |
} |
github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/rule.go
func (r *Rule) prepare() error { | |
r.Index = strings.ToLower(r.Index) | |
r.Type = strings.ToLower(r.Type) | |
r.canal.AddDumpTables(db, tables...) |
github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/master.go
func loadMasterInfo(dataDir string) (*masterInfo, error) { | |
m.filePath = path.Join(dataDir, "master.info") | |
f, err := os.Open(m.filePath) | |
_, err = toml.DecodeReader(f, &m) |
github.com/siddontang/go-mysql@v0.0.0-20190524062908-de6c3a84bcbe/canal/canal.go最终解析binlog的任务交给了canal,用到的核心函数如下:
func NewCanal(cfg *Config) (*Canal, error) { | |
c := new(Canal) | |
c.eventHandler = &DummyEventHandler{} | |
if err = c.prepareDumper(); err != nil { | |
if err = c.prepareSyncer(); err != nil { | |
if err := c.checkBinlogRowFormat(); err != nil { | |
func (c *Canal) GetTable(db string, table string) (*schema.Table, error) { | |
key := fmt.Sprintf("%s.%s", db, table) | |
func (c *Canal) CheckBinlogRowImage(image string) error { | |
if res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "binlog_row_image"`); err | |
func (c *Canal) RunFrom(pos mysql.Position) error { | |
c.master.Update(pos) | |
return c.Run() | |
} | |
func (c *Canal) Run() error { | |
return c.run() | |
} | |
func (c *Canal) run() error { |
总的来说,整个逻辑还是非常简洁清晰的,它通过解析mysql binlog,然后将binlog事件的数据通过http请求同步到es。通过配置规则来决定如何同步。