go-mysql-elasticsearch的入口位于:mysql/go-mysql-elasticsearch/cmd/go-mysql-elasticsearch/main.go核心逻辑如下:
var configFile = flag.String("config", "./etc/river.toml", "go-mysql-elasticsearch config file")
func main() {
cfg, err := river.NewConfigWithFile(*configFile)
r, err := river.NewRiver(cfg)
go func() {
r.Run()
r.Close()
读取配置文件的过程就是解析toml文件github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/config.go
func NewConfig(data string) (*Config, error) {
_, err := toml.Decode(data, &c)
然后初始化River对象,github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/river.go包括加载master信息,初始化canal对象,加载规则,准备canal对象,最后初始化es配置,并初始化es客户端。
func NewRiver(c *Config) (*River, error) {
r := new(River)
if r.master, err = loadMasterInfo(c.DataDir); err != nil {
if err = r.newCanal(); err != nil {
if err = r.prepareRule(); err != nil {
if err = r.prepareCanal(); err != nil {
if err = r.canal.CheckBinlogRowImage("FULL"); err != nil {
cfg := new(elastic.ClientConfig)
r.es = elastic.NewClient(cfg)
go InitStatus(r.c.StatAddr, r.c.StatPath)
river对象包含了一个canal,一个es client和一批规则
// River wires a MySQL binlog reader (canal) to an Elasticsearch client:
// row events captured by canal are matched against rules and forwarded
// through syncCh to a background sync loop that batches them to ES.
type River struct {
c *Config // parsed river.toml configuration
canal *canal.Canal // binlog reader/parser
rules map[string]*Rule // sync rules keyed by "schema.table" (see ruleKey)
ctx context.Context
cancel context.CancelFunc // cancels background work; invoked by Close
wg sync.WaitGroup // waited on in Close to drain background goroutines
es *elastic.Client // HTTP-based Elasticsearch client
master *masterInfo // persisted binlog position (data_dir/master.info)
syncCh chan interface{} // carries posSaver and []*elastic.BulkRequest values
}
canal的初始化过程就是根据配置决定同步哪些对象:
func (r *River) newCanal() error {
cfg := canal.NewDefaultConfig()
for _, s := range r.c.Sources {
for _, t := range s.Tables {
cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, s.Schema+"\\."+t)
r.canal, err = canal.NewCanal(cfg)
准备规则的过程决定了同步哪些表:
func (r *River) prepareRule() error {
wildtables, err := r.parseSource()
for _, rule := range r.c.Rules {
rule.prepare()
for key, rule := range r.rules {
if rule.TableInfo, err = r.canal.GetTable(rule.Schema, rule.Table); err != nil {
准备canal的时候,初始化了canal的事件处理器:
func (r *River) prepareCanal() error {
for _, rule := range r.rules {
tables = append(tables, rule.Table)
r.canal.AddDumpDatabases(keys...)
r.canal.SetEventHandler(&eventHandler{r})
准备工作做完后就开始启动River来开始运行:
func (r *River) Run() error {
go r.syncLoop()
pos := r.master.Position()
if err := r.canal.RunFrom(pos); err != nil {
首先是同步循环,在循环内部不断收集binlog产生的变动,然后批量同步给es,按需保存同步进度。
func (r *River) syncLoop() {
for {
select {
case v := <-r.syncCh:
switch v := v.(type) {
case posSaver:
case []*elastic.BulkRequest:
if needFlush {
// TODO: retry some times?
if err := r.doBulk(reqs); err != nil {
if needSavePos {
if err := r.master.Save(pos); err != nil {
最后是销毁操作:
func (r *River) Close() {
r.cancel()
r.canal.Close()
r.master.Close()
r.wg.Wait()
event handler是处理同步逻辑的核心:github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/sync.go
// eventHandler receives canal's binlog callbacks (OnRow, OnDDL, OnXID,
// OnRotate, OnTableChanged) and forwards the resulting work to the
// owning River via r.syncCh.
type eventHandler struct {
r *River
}
其核心函数如下:
func (h *eventHandler) OnRotate(e *replication.RotateEvent) error {
func (h *eventHandler) OnTableChanged(schema, table string) error {
err := h.r.updateRule(schema, table)
func (h *eventHandler) OnDDL(nextPos mysql.Position, _ *replication.QueryEvent) error {
func (h *eventHandler) OnXID(nextPos mysql.Position) error {
根据不同的操作,将它分类成插入,删除和更新操作,最后将操作同步给syncCh。由事件主循环消费。
func (h *eventHandler) OnRow(e *canal.RowsEvent) error {
rule, ok := h.r.rules[ruleKey(e.Table.Schema, e.Table.Name)]
switch e.Action {
case canal.InsertAction:
reqs, err = h.r.makeInsertRequest(rule, e.Rows)
case canal.DeleteAction:
reqs, err = h.r.makeDeleteRequest(rule, e.Rows)
case canal.UpdateAction:
reqs, err = h.r.makeUpdateRequest(rule, e.Rows)
default:
err = errors.Errorf("invalid rows action %s", e.Action)
}
h.r.syncCh <- reqs
// makeInsertRequest builds Elasticsearch bulk index requests for newly
// inserted rows by delegating to the generic makeRequest with InsertAction.
func (r *River) makeInsertRequest(rule *Rule, rows [][]interface{}) ([]*elastic.BulkRequest, error) {
return r.makeRequest(rule, canal.InsertAction, rows)
}
// Row-event action names reported by canal for binlog row events;
// eventHandler.OnRow switches on these to build the matching ES request.
const (
UpdateAction = "update" // row UPDATE event
InsertAction = "insert" // row INSERT event
DeleteAction = "delete" // row DELETE event
)
根据规则生成es请求:
func (r *River) makeRequest(rule *Rule, action string, rows [][]interface{}) ([]*elastic.BulkRequest, error) {
for _, values := range rows {
id, err := r.getDocID(rule, values)
if parentID, err = r.getParentID(rule, values, rule.Parent); err != nil {
req := &elastic.BulkRequest{Index: rule.Index, Type: rule.Type, ID: id, Parent: parentID, Pipeline: rule.Pipeline}
if action == canal.DeleteAction {
req.Action = elastic.ActionDelete
esDeleteNum.WithLabelValues(rule.Index).Inc()
} else {
r.makeInsertReqData(req, rule, values)
esInsertNum.WithLabelValues(rule.Index).Inc()
}
func (r *River) getDocID(rule *Rule, row []interface{}) (string, error) {
if rule.ID == nil {
ids, err = rule.TableInfo.GetPKValues(row)
for _, column := range rule.ID {
value, err := rule.TableInfo.GetColumnValue(column, row)
最后通过批量请求发送给es,es请求是通过http client进行封装的
func (r *River) doBulk(reqs []*elastic.BulkRequest) error {
if resp, err := r.es.Bulk(reqs); err != nil {
github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/elastic/client.go
// BulkRequest describes one operation in an Elasticsearch _bulk payload.
type BulkRequest struct {
Action string // one of the elastic.Action* constants (e.g. ActionDelete)
Index string // target ES index (lowercased in Rule.prepare)
Type string // target ES mapping type (lowercased in Rule.prepare)
ID string // document id, derived from the row's PK or Rule.ID columns
Parent string // parent document id when the rule defines a parent relation
Pipeline string // ingest pipeline name, taken from Rule.Pipeline
Data map[string]interface{} // document body for index/update operations
}
// ClientConfig holds the connection settings for the Elasticsearch
// HTTP client created by NewClient.
type ClientConfig struct {
HTTPS bool // use https:// instead of http:// when talking to ES
Addr string // host:port of the Elasticsearch endpoint
User string // basic-auth user name (empty to disable auth)
Password string // basic-auth password
}
func NewClient(conf *ClientConfig) *Client {
c := new(Client)
c.c = &http.Client{}
// Bulk sends the given requests to the Elasticsearch _bulk endpoint
// and returns the parsed bulk response.
func (c *Client) Bulk(items []*BulkRequest) (*BulkResponse, error) {
	return c.DoBulk(fmt.Sprintf("%s://%s/_bulk", c.Protocol, c.Addr), items)
}
func (c *Client) DoBulk(url string, items []*BulkRequest) (*BulkResponse, error) {
resp, err := c.DoRequest("POST", url, &buf)
最后调用了标准http库的Do方法:
func (c *Client) DoRequest(method string, url string, body *bytes.Buffer) (*http.Response, error) {
req, err := http.NewRequest(method, url, body)
resp, err := c.c.Do(req)
github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/metrics.go初始化了metrics监控
// InitStatus exposes Prometheus metrics over HTTP: it registers the
// promhttp handler at path and serves on addr. NewRiver launches it in
// its own goroutine, so it blocks until the server stops.
// NOTE(review): the error from ListenAndServe is silently dropped — if
// the listener fails (e.g. port already in use) metrics simply become
// unavailable with no log output.
func InitStatus(addr string, path string) {
http.Handle(path, promhttp.Handler())
http.ListenAndServe(addr, nil)
}
github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/rule.go
func (r *Rule) prepare() error {
r.Index = strings.ToLower(r.Index)
r.Type = strings.ToLower(r.Type)
r.canal.AddDumpTables(db, tables...)
github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/master.go
func loadMasterInfo(dataDir string) (*masterInfo, error) {
m.filePath = path.Join(dataDir, "master.info")
f, err := os.Open(m.filePath)
_, err = toml.DecodeReader(f, &m)
github.com/siddontang/go-mysql@v0.0.0-20190524062908-de6c3a84bcbe/canal/canal.go最终解析binlog的任务交给了canal,用到的核心函数如下:
func NewCanal(cfg *Config) (*Canal, error) {
c := new(Canal)
c.eventHandler = &DummyEventHandler{}
if err = c.prepareDumper(); err != nil {
if err = c.prepareSyncer(); err != nil {
if err := c.checkBinlogRowFormat(); err != nil {
func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
key := fmt.Sprintf("%s.%s", db, table)
func (c *Canal) CheckBinlogRowImage(image string) error {
if res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "binlog_row_image"`); err
// RunFrom starts syncing from the given binlog position: it records pos
// as the current master position, then enters the blocking Run loop.
func (c *Canal) RunFrom(pos mysql.Position) error {
c.master.Update(pos)
return c.Run()
}
// Run starts the binlog sync loop; it delegates to the unexported run.
func (c *Canal) Run() error {
return c.run()
}
func (c *Canal) run() error {
总的来说,整个逻辑还是非常简洁清晰的,它通过解析mysql binlog,然后将binlog事件的数据通过http请求同步到es。通过配置规则来决定如何同步。