Golang source code analysis: go-mysql-elasticsearch (2)

Golang
2024-01-19
Tags: Elasticsearch

The entry point of go-mysql-elasticsearch is mysql/go-mysql-elasticsearch/cmd/go-mysql-elasticsearch/main.go; its core logic is as follows:

var configFile = flag.String("config", "./etc/river.toml", "go-mysql-elasticsearch config file")

func main() {
  flag.Parse()
  cfg, err := river.NewConfigWithFile(*configFile)
  // ...
  r, err := river.NewRiver(cfg)
  go func() {
    r.Run()
  }()
  // ... wait for a shutdown signal, then:
  r.Close()
}
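The excerpt elides how the process decides when to call Close. A minimal sketch of the usual pattern (not the project's exact main.go): run the river in the background and block on OS signals.

package main

import (
  "os"
  "os/signal"
  "syscall"

  "github.com/siddontang/go-mysql-elasticsearch/river"
)

func main() {
  cfg, err := river.NewConfigWithFile("./etc/river.toml")
  if err != nil {
    panic(err)
  }
  r, err := river.NewRiver(cfg)
  if err != nil {
    panic(err)
  }

  go r.Run() // replication and syncing run in the background

  // block until SIGINT/SIGTERM, then release all resources
  sc := make(chan os.Signal, 1)
  signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM)
  <-sc
  r.Close()
}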

Reading the config file is just decoding a TOML file: github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/config.go

func NewConfig(data string) (*Config, error) {
  var c Config
  _, err := toml.Decode(data, &c)
  // ...
  return &c, nil
}
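toml.Decode here is the BurntSushi/toml decoder: it unmarshals the TOML text into the Config struct through `toml` struct tags. A tiny self-contained illustration of the same mechanism (the struct below is illustrative, not the project's Config):

package main

import (
  "fmt"

  "github.com/BurntSushi/toml"
)

type demoConfig struct {
  MyAddr string `toml:"my_addr"`
  EsAddr string `toml:"es_addr"`
}

func main() {
  data := `
my_addr = "127.0.0.1:3306"
es_addr = "127.0.0.1:9200"
`
  var c demoConfig
  if _, err := toml.Decode(data, &c); err != nil {
    panic(err)
  }
  fmt.Println(c.MyAddr, c.EsAddr) // 127.0.0.1:3306 127.0.0.1:9200
}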

Then the River object is initialized in github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/river.go: it loads the master (binlog position) info, creates the canal object, prepares the rules, prepares the canal, and finally builds the ES config and creates the ES client.

func NewRiver(c *Config) (*River, error) {
      r := new(River)
      if r.master, err = loadMasterInfo(c.DataDir); err != nil {
      if err = r.newCanal(); err != nil {
      if err = r.prepareRule(); err != nil {
      if err = r.prepareCanal(); err != nil {
      if err = r.canal.CheckBinlogRowImage("FULL"); err != nil {
      cfg := new(elastic.ClientConfig)
      r.es = elastic.NewClient(cfg)
      go InitStatus(r.c.StatAddr, r.c.StatPath)

The River object holds a canal, an ES client, and a set of rules:

type River struct {
  c *Config // parsed river.toml

  canal *canal.Canal // binlog consumer

  rules map[string]*Rule // ruleKey(schema, table) -> sync rule

  ctx    context.Context
  cancel context.CancelFunc

  wg sync.WaitGroup

  es *elastic.Client // HTTP client for Elasticsearch

  master *masterInfo // persisted binlog position

  syncCh chan interface{} // carries posSaver and []*elastic.BulkRequest items
}

Initializing the canal simply decides, based on the configuration, which objects to sync:

func (r *River) newCanal() error {
  cfg := canal.NewDefaultConfig()
  // ...
  for _, s := range r.c.Sources {
    for _, t := range s.Tables {
      cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, s.Schema+"\\."+t)
    }
  }

  var err error
  r.canal, err = canal.NewCanal(cfg)
  return errors.Trace(err)
}
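So for a source with schema "test" and table "t1" the appended pattern is the string test\.t1, and canal matches replicated schema.table names against these regular expressions. A quick illustration of the escaping (plain regexp, not project code):

package main

import (
  "fmt"
  "regexp"
)

func main() {
  schema, table := "test", "t1"
  pattern := schema + "\\." + table // same concatenation as newCanal
  fmt.Println(pattern)              // test\.t1

  re := regexp.MustCompile(pattern)
  fmt.Println(re.MatchString("test.t1")) // true
  fmt.Println(re.MatchString("testXt1")) // false: the escaped dot only matches a literal "."
}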

Preparing the rules determines which tables get synced:

func (r *River) prepareRule() error {
    wildtables, err := r.parseSource()
    for _, rule := range r.c.Rules {
    rule.prepare()
      for key, rule := range r.rules {
      if rule.TableInfo, err = r.canal.GetTable(rule.Schema, rule.Table); err != nil {

When preparing the canal, its event handler is initialized:

func (r *River) prepareCanal() error {
  for _, rule := range r.rules {
    tables = append(tables, rule.Table)
  }
  // a single schema is dumped table by table, otherwise whole databases are dumped
  r.canal.AddDumpTables(db, tables...)
  r.canal.AddDumpDatabases(keys...)

  r.canal.SetEventHandler(&eventHandler{r})

Once the preparation is done, the River is started:

func (r *River) Run() error {
  go r.syncLoop()
  pos := r.master.Position()
  if err := r.canal.RunFrom(pos); err != nil {

First comes the sync loop: it keeps collecting the changes produced by the binlog, flushes them to ES in batches, and saves the sync position when needed.

func (r *River) syncLoop() {
  for {
    select {
    case v := <-r.syncCh:
      switch v := v.(type) {
      case posSaver:
        // ...
      case []*elastic.BulkRequest:
        // ...
      }
    }

    if needFlush {
      // TODO: retry some times?
      if err := r.doBulk(reqs); err != nil {
        // ...
      }
    }
    if needSavePos {
      if err := r.master.Save(pos); err != nil {
        // ...
      }
    }
  }
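The pattern is a standard Go batching loop: block on a select, accumulate requests from the channel, and flush either when the batch is large enough, on a ticker, or when shutting down. A self-contained sketch of the same idea (simplified; bulk size and interval are illustrative):

package main

import (
  "fmt"
  "time"
)

func batchLoop(in <-chan string, done <-chan struct{}) {
  const bulkSize = 128
  ticker := time.NewTicker(500 * time.Millisecond)
  defer ticker.Stop()

  var batch []string
  flush := func() {
    if len(batch) == 0 {
      return
    }
    fmt.Println("flushing", len(batch), "items") // stand-in for doBulk
    batch = batch[:0]
  }

  for {
    select {
    case v := <-in:
      batch = append(batch, v)
      if len(batch) >= bulkSize {
        flush()
      }
    case <-ticker.C:
      flush()
    case <-done:
      flush()
      return
    }
  }
}

func main() {
  in := make(chan string)
  done := make(chan struct{})
  go batchLoop(in, done)
  for i := 0; i < 5; i++ {
    in <- fmt.Sprintf("req-%d", i)
  }
  time.Sleep(time.Second)
  close(done)
}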

Finally, the teardown:

func (r *River) Close() {
  r.cancel()
  r.canal.Close()
  r.master.Close()
  r.wg.Wait()
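Close follows the usual context-plus-WaitGroup shutdown pattern: cancel the context so the loops exit, close the underlying resources, then wait for the goroutines to finish. A generic self-contained sketch of that pattern (not the project's code):

package main

import (
  "context"
  "fmt"
  "sync"
  "time"
)

type worker struct {
  ctx    context.Context
  cancel context.CancelFunc
  wg     sync.WaitGroup
}

func newWorker() *worker {
  w := &worker{}
  w.ctx, w.cancel = context.WithCancel(context.Background())
  return w
}

func (w *worker) run() {
  w.wg.Add(1)
  go func() {
    defer w.wg.Done()
    for {
      select {
      case <-w.ctx.Done():
        return
      case <-time.After(100 * time.Millisecond):
        fmt.Println("working...")
      }
    }
  }()
}

func (w *worker) close() {
  w.cancel()  // tell the loop to stop
  w.wg.Wait() // wait until it actually exits
}

func main() {
  w := newWorker()
  w.run()
  time.Sleep(300 * time.Millisecond)
  w.close()
}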

The event handler is the core of the sync logic: github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/sync.go

type eventHandler struct {
  r *River
}

Its core functions are as follows:

func (h *eventHandler) OnRotate(e *replication.RotateEvent) error {
func (h *eventHandler) OnTableChanged(schema, table string) error {
      err := h.r.updateRule(schema, table)
func (h *eventHandler) OnDDL(nextPos mysql.Position, _ *replication.QueryEvent) error {    
func (h *eventHandler) OnXID(nextPos mysql.Position) error {

Depending on the action, each row event is classified as an insert, delete, or update, turned into bulk requests, and pushed to syncCh, where the main sync loop consumes it.

func (h *eventHandler) OnRow(e *canal.RowsEvent) error {
      rule, ok := h.r.rules[ruleKey(e.Table.Schema, e.Table.Name)]
      switch e.Action {
      case canal.InsertAction:
        reqs, err = h.r.makeInsertRequest(rule, e.Rows)
      case canal.DeleteAction:
        reqs, err = h.r.makeDeleteRequest(rule, e.Rows)
      case canal.UpdateAction:
        reqs, err = h.r.makeUpdateRequest(rule, e.Rows)
      default:
        err = errors.Errorf("invalid rows action %s", e.Action)
      }
      h.r.syncCh <- reqs
func (r *River) makeInsertRequest(rule *Rule, rows [][]interface{}) ([]*elastic.BulkRequest, error) {
      return r.makeRequest(rule, canal.InsertAction, rows)
}
const (
  UpdateAction = "update"
  InsertAction = "insert"
  DeleteAction = "delete"
)
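One detail that matters when reading makeUpdateRequest: for an update event, canal delivers the affected rows in e.Rows as alternating before/after pairs, so the request builder walks the slice two entries at a time. A small sketch of that iteration shape (illustrative, not the project's code):

package main

import "fmt"

// For an update event, canal delivers e.Rows as alternating old/new rows:
// old1, new1, old2, new2, ...
func walkUpdatePairs(rows [][]interface{}) {
  for i := 0; i+1 < len(rows); i += 2 {
    before, after := rows[i], rows[i+1]
    fmt.Println("old:", before, "-> new:", after)
  }
}

func main() {
  rows := [][]interface{}{
    {1, "alice"}, {1, "alicia"}, // one updated row: before, after
    {2, "bob"}, {2, "robert"},
  }
  walkUpdatePairs(rows)
}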

ES requests are generated according to the rules:

func (r *River) makeRequest(rule *Rule, action string, rows [][]interface{}) ([]*elastic.BulkRequest, error) {
  for _, values := range rows {
    id, err := r.getDocID(rule, values)
    if parentID, err = r.getParentID(rule, values, rule.Parent); err != nil {
    req := &elastic.BulkRequest{Index: rule.Index, Type: rule.Type, ID: id, Parent: parentID, Pipeline: rule.Pipeline}
    if action == canal.DeleteAction {
      req.Action = elastic.ActionDelete
      esDeleteNum.WithLabelValues(rule.Index).Inc()
    } else {
      r.makeInsertReqData(req, rule, values)
      esInsertNum.WithLabelValues(rule.Index).Inc()
    }
func (r *River) getDocID(rule *Rule, row []interface{}) (string, error) {
  if rule.ID == nil {
    ids, err = rule.TableInfo.GetPKValues(row)
  } else {
    for _, column := range rule.ID {
      value, err := rule.TableInfo.GetColumnValue(column, row)

Finally the requests are sent to ES in batches; the ES "client" is a thin wrapper around an HTTP client.

func (r *River) doBulk(reqs []*elastic.BulkRequest) error {
      if resp, err := r.es.Bulk(reqs); err != nil {

github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/elastic/client.go

type BulkRequest struct {
  Action   string
  Index    string
  Type     string
  ID       string
  Parent   string
  Pipeline string

  Data map[string]interface{}
}
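For reference, the _bulk endpoint expects newline-delimited JSON: one action/metadata line per BulkRequest, followed by a document line for index/update (delete has none). Roughly what a serialized insert plus delete look like on the wire (illustrative values):

package main

import "fmt"

func main() {
  // Illustrative _bulk body: one "index" (insert) and one "delete" action.
  // Each line, including the last, must be terminated by a newline.
  body := `{"index":{"_index":"myindex","_type":"mytype","_id":"1"}}
{"name":"alice","age":30}
{"delete":{"_index":"myindex","_type":"mytype","_id":"2"}}
`
  fmt.Print(body)
}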
type ClientConfig struct {
  HTTPS    bool
  Addr     string
  User     string
  Password string
}
func NewClient(conf *ClientConfig) *Client {
      c := new(Client)
      c.c = &http.Client{}
func (c *Client) Bulk(items []*BulkRequest) (*BulkResponse, error) {
  reqURL := fmt.Sprintf("%s://%s/_bulk", c.Protocol, c.Addr)
  return c.DoBulk(reqURL, items)
}
func (c *Client) DoBulk(url string, items []*BulkRequest) (*BulkResponse, error) {
      resp, err := c.DoRequest("POST", url, &buf)

At the bottom, it calls the standard library HTTP client's Do method:

func (c *Client) DoRequest(method string, url string, body *bytes.Buffer) (*http.Response, error) {
      req, err := http.NewRequest(method, url, body)
      resp, err := c.c.Do(req)

github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/metrics.go initializes the Prometheus metrics endpoint:

func InitStatus(addr string, path string) {
  http.Handle(path, promhttp.Handler())
  http.ListenAndServe(addr, nil)
}
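The esInsertNum / esDeleteNum counters seen earlier are ordinary Prometheus counter vectors labeled by index, and InitStatus simply exposes them over HTTP. A minimal sketch of defining and serving such a counter (metric name and address are illustrative):

package main

import (
  "net/http"

  "github.com/prometheus/client_golang/prometheus"
  "github.com/prometheus/client_golang/prometheus/promhttp"
)

var esInsertNum = prometheus.NewCounterVec(
  prometheus.CounterOpts{
    Name: "es_insert_num", // illustrative metric name
    Help: "Number of documents inserted into Elasticsearch.",
  },
  []string{"index"},
)

func main() {
  prometheus.MustRegister(esInsertNum)
  esInsertNum.WithLabelValues("myindex").Inc()

  http.Handle("/metrics", promhttp.Handler())
  http.ListenAndServe("127.0.0.1:12800", nil) // illustrative address
}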

Rule preparation normalizes the index and type names: github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/rule.go

func (r *Rule) prepare() error {
  // ...
  r.Index = strings.ToLower(r.Index)
  r.Type = strings.ToLower(r.Type)
  return nil
}

The master (binlog position) info is loaded from, and persisted to, a local file: github.com/siddontang/go-mysql-elasticsearch@v0.0.0-20200822025838-fe261969558b/river/master.go

func loadMasterInfo(dataDir string) (*masterInfo, error) {
      m.filePath = path.Join(dataDir, "master.info")
      f, err := os.Open(m.filePath)
      _, err = toml.DecodeReader(f, &m)

The actual binlog parsing is delegated to canal: github.com/siddontang/go-mysql@v0.0.0-20190524062908-de6c3a84bcbe/canal/canal.go. The core functions used are:

func NewCanal(cfg *Config) (*Canal, error) {
      c := new(Canal)
      c.eventHandler = &DummyEventHandler{}
      if err = c.prepareDumper(); err != nil {
      if err = c.prepareSyncer(); err != nil {
      if err := c.checkBinlogRowFormat(); err != nil {
func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
      key := fmt.Sprintf("%s.%s", db, table)
func (c *Canal) CheckBinlogRowImage(image string) error {
      if res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "binlog_row_image"`); err != nil {
func (c *Canal) RunFrom(pos mysql.Position) error {
  c.master.Update(pos)
  return c.Run()
}
func (c *Canal) Run() error {
  return c.run()
}
func (c *Canal) run() error {
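To see the canal layer in isolation, here is a minimal hedged sketch of a consumer built directly on go-mysql's canal: it embeds canal.DummyEventHandler so only the callbacks we care about need overriding, registers the handler, and starts replicating from a given position. Connection settings and the start position are illustrative:

package main

import (
  "fmt"

  "github.com/siddontang/go-mysql/canal"
  "github.com/siddontang/go-mysql/mysql"
)

type myHandler struct {
  canal.DummyEventHandler // provides no-op defaults for the other callbacks
}

func (h *myHandler) OnRow(e *canal.RowsEvent) error {
  fmt.Println(e.Action, e.Table.Schema, e.Table.Name, len(e.Rows))
  return nil
}

func main() {
  cfg := canal.NewDefaultConfig()
  cfg.Addr = "127.0.0.1:3306" // illustrative connection settings
  cfg.User = "root"
  cfg.Password = ""
  cfg.IncludeTableRegex = []string{`test\.t1`}

  c, err := canal.NewCanal(cfg)
  if err != nil {
    panic(err)
  }
  c.SetEventHandler(&myHandler{})

  // start replicating from a known binlog position (illustrative)
  if err := c.RunFrom(mysql.Position{Name: "mysql-bin.000001", Pos: 4}); err != nil {
    panic(err)
  }
}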

Overall, the logic is clean and straightforward: the tool parses the MySQL binlog and syncs the data carried by binlog events to Elasticsearch over HTTP, with configured rules deciding how each table is mapped and synced.