Golang Source Code Analysis: the MySQL Sync Tool gravity (2)

Golang
2024-01-19

Having analyzed how gravity works and how to use it, we can now dig into its source code. gravity has five entry points, corresponding to five tools.

1. cmd/dcp/main.go

go func() {
  closed <- dcp.StartLocal(&barrierConfig, collectorConfigs, &checkerConfig, shutDown, alarm)
}()
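
The surrounding main presumably just blocks until StartLocal returns or an OS signal arrives; a sketch of that common pattern (not dcp's exact code), reusing the closed and shutDown channels from the excerpt:

sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
select {
case err := <-closed:
  if err != nil {
    log.Fatal(err)
  }
case <-sig:
  close(shutDown) // ask StartLocal to shut down
  <-closed        // wait for it to finish
}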

2. cmd/gravity/main.go — the entry point of the sync tool itself. It first registers a series of plugins, then starts an HTTP server and watches the config file for changes:

log.RegisterExitHandler(func() {
  hplugin.CleanupClients()
})

server, err := app.NewServer(cfg.PipelineConfig)

go func() {
  http.Handle("/metrics", promhttp.Handler())
  http.HandleFunc("/reset", resetHandler(server, cfg.PipelineConfig))
  http.HandleFunc("/status", statusHandler(server, cfg.PipelineConfig.PipelineName, hash))
  http.HandleFunc("/healthz", healthzHandler(server))
  err = http.ListenAndServe(cfg.HttpAddr, nil)
}()

err = server.Start()

watcher, err := fsnotify.NewWatcher()
err = watcher.Add(cfg.ConfigFile)
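
The reload handling itself is not shown above. A minimal sketch of the usual fsnotify pattern, assuming the goal is to reset the pipeline when the config file is written (illustrative, not gravity's exact code):

for {
  select {
  case ev := <-watcher.Events:
    if ev.Op&fsnotify.Write == fsnotify.Write {
      log.Infof("config %s changed, resetting pipeline", ev.Name)
      // e.g. stop the old server and build a new one from the fresh config
    }
  case werr := <-watcher.Errors:
    log.Error(werr)
  }
}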

3. cmd/padder/main.go

if err := padder.Pad(cfg.PadderConfig); err != nil {

4. cmd/prometheus_etcd_sd/main.go: fetches service metadata from etcd and watches the cluster state:

client, err := utils.CreateEtcdClient(*etcdServer)

srvs = services{}
res, err := client.Get(context.TODO(), *servicesPrefix, etcd.WithPrefix())
for _, kv := range res.Kvs {
  srvs.handle(kv, srvs.update)
}
srvs.persist()

updates := client.Watch(context.TODO(), *servicesPrefix, etcd.WithPrefix())
for resp := range updates {
  for _, res := range resp.Events {
    log.Infoln(res.Type, string(res.Kv.Key), string(res.Kv.Value))

    h := srvs.update
    if res.Type == mvccpb.DELETE {
      h = srvs.delete
    }
    srvs.handle(res.Kv, h)
  }
  srvs.persist()
}
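
srvs.persist() presumably writes the discovered targets to disk in Prometheus's file-based service-discovery format so that Prometheus can pick them up. A hedged sketch of writing such a file (the targetGroup fields follow the documented file_sd JSON schema; everything else is made up for illustration):

type targetGroup struct {
  Targets []string          `json:"targets"`
  Labels  map[string]string `json:"labels,omitempty"`
}

func writeFileSD(path string, groups []targetGroup) error {
  b, err := json.MarshalIndent(groups, "", "  ")
  if err != nil {
    return err
  }
  // write to a temp file first, then rename, so Prometheus never reads a partial file
  tmp := path + ".tmp"
  if err := os.WriteFile(tmp, b, 0644); err != nil {
    return err
  }
  return os.Rename(tmp, path)
}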

5. cmd/verifier/main.go: checks that the source and target are valid, i.e. that their table schemas match.

source, err := utils.CreateDBConnection(c.Source)
target, err := utils.CreateDBConnection(c.Target)
sourceTables, err := schema_store.GetTablesFromDB(source, c.Source.Schema)
targetTables, err := schema_store.GetTablesFromDB(target, c.Target.Schema)
targetTables = filter(targetTables, c.TargetTable)
if !reflect.DeepEqual(sourceTables, targetTables) {

Let's start with the StartLocal function that dcp calls, defined in dcp/local_server.go. It first calls barrier.Start to create a table that records the sync position. It then starts the collectors (both MySQL and gRPC collectors are supported) and spawns a goroutine per collector to funnel messages into a shared channel, which is drained by calling c.Consume(m) on each Msg.

func StartLocal(barrierConfig *barrier.Config, collectorConfigs []collector.Config, checkerConfig *checker.Config, shutdown chan struct{}, alarm chan<- checker.Result) error {
  barrier.Start(barrierConfig, shutdown)
  allMsg := make(chan *dcp.Message, 100)
  for _, cfg := range collectorConfigs {
    // pick a collector implementation based on the config type
    switch cfg := cfg.(type) {
    case *collector.MysqlConfig:
      c = collector.NewMysqlCollector(cfg)
    case *collector.GrpcConfig:
      c = collector.NewGrpc(cfg)
    }
    c.Start()
    // forward each collector's messages into the shared allMsg channel
    go func(c <-chan *dcp.Message) {
      for m := range c {
        allMsg <- m
      }
    }(c.GetChan())
    collectors = append(collectors, c)
  }
  // drain the merged stream; Consume lives on the checker (see below)
  for {
    select {
    case m := <-allMsg:
      c.Consume(m)
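
The goroutine-per-collector forwarding above is the classic channel fan-in pattern. A self-contained sketch of that pattern (illustrative, not gravity code):

package fanin

import "sync"

// FanIn merges several input channels into one output channel, the same way
// StartLocal funnels every collector's messages into allMsg. The output
// channel is closed once every input channel has been drained.
func FanIn[T any](inputs ...<-chan T) <-chan T {
  out := make(chan T, 100)
  var wg sync.WaitGroup
  wg.Add(len(inputs))
  for _, in := range inputs {
    go func(in <-chan T) {
      defer wg.Done()
      for v := range in {
        out <- v
      }
    }(in)
  }
  go func() {
    wg.Wait()
    close(out)
  }()
  return out
}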

Let's first look at one of the collectors, the MySQL collector, in dcp/collector/mysql.go:

func NewMysqlCollector(config *MysqlConfig) Interface {
  syncer := replication.NewBinlogSyncer(cfg)
  gtid, err := mysql.ParseMysqlGTIDSet(id)
  schemaTables := make(map[string]tagInfo)
  for _, tagConfig := range config.TagConfigs {
    for _, v := range tagConfig.Tables {
      schemaTables[schemaTbl2Key(v.Schema, v.Table)] = tagInfo{
        tag:           tagConfig.Tag,
        primaryKeyIdx: v.PrimaryKeyIdx,
      }
    }
  }

It first parses the GTID set, then assembles the tag information from the configuration. Its Start method kicks off the data sync:

func (c *Mysql) Start() {
  go c.mainLoop()
}

mainLoop dispatches on the binlog event type:

func (c *Mysql) mainLoop() {
  for {
    rawEvent, err := c.streamer.GetEvent(context.Background())
    switch evt := rawEvent.Event.(type) {
    case *replication.RowsEvent:
      schemaName, tableName := string(evt.Table.Schema), string(evt.Table.Table)
      switch rawEvent.Header.EventType {
      case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
        for _, row := range evt.Rows {
          if isBarrier {
            for _, v := range c.tagInfo {
              msg := &dcp.Message{
                Tag:       v.tag,
                Id:        c.getNextGtid(),
                Timestamp: uint64(rawEvent.Header.Timestamp),
              }
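
For orientation, go-mysql groups row changes into three event families, and update events carry the rows as before/after pairs. A sketch of how a consumer typically walks evt.Rows (illustrative, not the collector's code):

switch rawEvent.Header.EventType {
case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
  // every entry in evt.Rows is one inserted row ([]interface{} of column values)
  for _, row := range evt.Rows {
    _ = row
  }
case replication.UPDATE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
  // rows come in pairs: evt.Rows[i] is the before image, evt.Rows[i+1] the after image
  for i := 0; i+1 < len(evt.Rows); i += 2 {
    before, after := evt.Rows[i], evt.Rows[i+1]
    _, _ = before, after
  }
case replication.DELETE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
  // every entry in evt.Rows is one deleted row
  for _, row := range evt.Rows {
    _ = row
  }
}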

The gRPC collector in dcp/collector/grpc.go works similarly:

func (s *Grpc) Start() {
  lis, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port))
  go s.server.Serve(lis)
}

func (s *Grpc) Process(stream dcp.DCPService_ProcessServer) error {
  for {
    msg, err := stream.Recv()
    s.output <- msg

    err = stream.Send(&dcp.Response{
      Id: msg.Id,
    })
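
Filled in with the error handling the excerpt omits, the Recv/Send loop of a bidirectional stream handler looks roughly like this (a sketch built on the generated dcp types shown above, not a verbatim copy):

func (s *Grpc) Process(stream dcp.DCPService_ProcessServer) error {
  for {
    msg, err := stream.Recv()
    if err == io.EOF {
      return nil // client closed its send side
    }
    if err != nil {
      return err
    }
    s.output <- msg // hand the message to the collector's output channel
    // acknowledge by echoing the message id back to the client
    if err := stream.Send(&dcp.Response{Id: msg.Id}); err != nil {
      return err
    }
  }
}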

Finally, the Consume method, in dcp/checker/checker.go:

func (s *checker) Consume(msg *dcp.Message) {
  idx, ok := s.tagIndex[msg.Tag]
  s.buffers[idx].Write(msg)

The GTID parsing above uses a function from the go-mysql library, github.com/siddontang/go-mysql@v0.0.0-20190312052122-c6ab05a85eb8/mysql/mysql_gtid.go:

func ParseMysqlGTIDSet(str string) (GTIDSet, error) {
  if set, err := ParseUUIDSet(sp[i]); err != nil {
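
A quick usage sketch of that function (the GTID string is made up):

gset, err := mysql.ParseMysqlGTIDSet("de278ad0-2106-11e4-9f8e-6edd0ca20947:1-57")
if err != nil {
  log.Fatal(err)
}
fmt.Println(gset.String()) // prints the normalized GTID set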

StartSyncGTID, the function that actually starts streaming replication events, also comes from this library, github.com/siddontang/go-mysql@v0.0.0-20190312052122-c6ab05a85eb8/replication/binlogsyncer.go:

func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
  if err := b.prepare(); err != nil {
  switch b.cfg.Flavor {
  case MariaDBFlavor:
    err = b.writeBinlogDumpMariadbGTIDCommand(gset)
  default:
    // default use MySQL
    err = b.writeBinlogDumpMysqlGTIDCommand(gset)
  }
  return b.startDumpStream(), nil

func (b *BinlogSyncer) startDumpStream() *BinlogStreamer {
  s := newBinlogStreamer()
  b.wg.Add(1)
  go b.onStream(s)
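
Putting the two library calls together, a consumer of go-mysql typically looks like this (connection parameters and the GTID string are made up for the example):

cfg := replication.BinlogSyncerConfig{
  ServerID: 1001,
  Flavor:   "mysql",
  Host:     "127.0.0.1",
  Port:     3306,
  User:     "repl",
  Password: "secret",
}
syncer := replication.NewBinlogSyncer(cfg)
gset, _ := mysql.ParseMysqlGTIDSet("de278ad0-2106-11e4-9f8e-6edd0ca20947:1-57")
streamer, err := syncer.StartSyncGTID(gset)
if err != nil {
  log.Fatal(err)
}
for {
  ev, err := streamer.GetEvent(context.Background())
  if err != nil {
    log.Fatal(err)
  }
  ev.Dump(os.Stdout) // a real consumer would switch on ev.Event instead
}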

Recording the sync state requires initializing a MySQL table; the implementation is in dcp/barrier/barrier.go:

func Start(config *Config, shutdown chan struct{}) {
_, err = db.Exec("CREATE database if not EXISTS " + DB_NAME)
  _, err = db.Exec(`
CREATE TABLE IF NOT EXISTS ` + DB_TABLE_NAME + ` (
  id int(11) unsigned NOT NULL AUTO_INCREMENT,
  offset BIGINT unsigned NOT NULL DEFAULT 0,
  ts timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`)
r, err := db.Exec("INSERT IGNORE INTO " + DB_TABLE_NAME + "(id, offset) VALUES (" + RECORD_ID + ", 0);")
go barrierLoop(ticker, db, shutdown)

The database and table names are defined as follows:

const (
  DB_NAME           = "drc"
  TABLE_NAME        = "barrier"
  DB_TABLE_NAME     = DB_NAME + "." + TABLE_NAME
  RECORD_ID         = "1"
  OFFSET_COLUMN_SEQ = 1
)
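
barrierLoop itself is not shown above; presumably it bumps the offset column on every tick, so that the resulting row change travels through the binlog and serves as a barrier marker for the collectors. A hedged sketch of that pattern (the SQL and the loop body are assumptions, not gravity's code):

func barrierLoop(ticker *time.Ticker, db *sql.DB, shutdown chan struct{}) {
  for {
    select {
    case <-ticker.C:
      // each update is replicated via the binlog and acts as a barrier event
      if _, err := db.Exec("UPDATE " + DB_TABLE_NAME + " SET offset = offset + 1 WHERE id = " + RECORD_ID); err != nil {
        log.Error(err)
      }
    case <-shutdown:
      ticker.Stop()
      return
    }
  }
}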

With the first command covered, let's focus on the core command, gravity. To find its entry point, start with the Makefile:

default: build
build:
  $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/gravity cmd/gravity/main.go
  #$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/padder cmd/padder/main.go

cmd/gravity/main.go calls NewServer in pkg/app/server.go:

func NewServer(pipelineConfig config.PipelineConfigV3) (*Server, error) {
  server, err := ParsePlugins(pipelineConfig)
  if p, err := newer.NewPositionCache(); err != nil {

Server.Start then starts the Output, Scheduler, PositionCache, and Input plugins in turn:

func (s *Server) Start() error {
  switch o := s.Output.(type) {
  case core.SynchronousOutput:
    if err := o.Start(); err != nil {
  case core.AsynchronousOutput:
    if err := o.Start(s.Scheduler); err != nil {
  }
  if err := s.Scheduler.Start(s.Output); err != nil {
  if err := s.PositionCache.Start(); err != nil {
  if err := s.Input.Start(s.Emitter, s.Output.GetRouter(), s.PositionCache); err != nil {

The output plugin interfaces are defined in pkg/core/output.go:

type SynchronousOutput interface {
  Start() error
  Output
}
type AsynchronousOutput interface {
  Start(msgAcker MsgAcker) error
  Output
}
type Output interface {
  Execute(msgs []*Msg) error
  GetRouter() Router
  Close()
}
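
To make the contract concrete, here is a toy output that satisfies core.SynchronousOutput (purely illustrative; a real plugin also implements the registry's Configure hook and registers itself in init()):

// stdoutOutput is a minimal synchronous output that just prints messages.
type stdoutOutput struct{}

func (o *stdoutOutput) Start() error { return nil }
func (o *stdoutOutput) Close()       {}

// GetRouter is required by the Output interface; nil is returned here only for brevity.
func (o *stdoutOutput) GetRouter() core.Router { return nil }

func (o *stdoutOutput) Execute(msgs []*core.Msg) error {
  for _, m := range msgs {
    fmt.Printf("%+v\n", m)
  }
  return nil
}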

Plugins are parsed and wired up in pkg/app/server.go:

func ParsePlugins(pipelineConfig config.PipelineConfigV3) (*Server, error) {
  plugin, err := registry.GetPlugin(registry.OutputPlugin, pipelineConfig.OutputPlugin.Type)
  if err := plugin.Configure(pipelineConfig.PipelineName, pipelineConfig.OutputPlugin.Config); err != nil {
  plugin, err = registry.GetPlugin(registry.SchedulerPlugin, pipelineConfig.SchedulerPlugin.Type)
  fs, err := filters.NewFilters(pipelineConfig.FilterPlugins)
  e, err := emitter.NewEmitter(fs, server.Scheduler)
  input, err := newInput(pipelineConfig.PipelineName, inputPlugins)

The Input plugin, for example, is looked up from a map by plugin type and name:

func newInput(pipelineName string, inputConfig config.InputConfig) (core.Input, error) {
      plugin, err := registry.GetPlugin(registry.InputPlugin, inputConfig.Type)

pkg/registry/registry.go

func GetPlugin(pluginType PluginType, name string) (Plugin, error) {
  plugins, ok := registry[pluginType]
  p, ok := plugins[name]
  return p(), nil
}

var registry map[PluginType]map[string]PluginFactory

func RegisterPluginFactory(pluginType PluginType, name string, v PluginFactory) {
  registry[pluginType][name] = v

If the plugin was registered as a singleton, the factory returns the registered instance directly; otherwise it uses reflection to create a fresh copy:

func RegisterPlugin(pluginType PluginType, name string, v Plugin, singleton bool) {
  if singleton {
    pf = func() Plugin {
      return v
    }
  } else {
    pf = func() Plugin {
      return reflect.New(reflect.TypeOf(v).Elem()).Interface().(Plugin)
    }
  }
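
The reflect.New trick above yields a fresh zero value of the same concrete type on every call, so non-singleton plugins never share state. A tiny self-contained demonstration (illustrative, not gravity code):

package main

import (
  "fmt"
  "reflect"
)

type demoPlugin struct{ n int }

func main() {
  var proto interface{} = &demoPlugin{n: 42}
  fresh := reflect.New(reflect.TypeOf(proto).Elem()).Interface().(*demoPlugin)
  fmt.Println(fresh.n) // 0: a brand-new instance, not the registered prototype
}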

When a plugin is registered, this factory is then handed to:

RegisterPluginFactory(pluginType, name, pf)

The plugin types are defined as follows:

const (
  InputPlugin              PluginType = "input"
  PositionRepo             PluginType = "positionRepo"
  OutputPlugin             PluginType = "output"
  FilterPlugin             PluginType = "filters"
  MatcherPlugin            PluginType = "matcher"
  SchedulerPlugin          PluginType = "scheduler"
  SQLExecutionEnginePlugin PluginType = "sqlExecutionEngine"
)

For example, the accept filter registers itself in pkg/filters/accept_filter.go:

func init() {
  registry.RegisterPlugin(registry.FilterPlugin, AcceptFilterName, &acceptFilterFactoryType{}, true)
}

And the mysql input plugin registers itself in pkg/inputs/mysql/input.go:

func init() {
  registry.RegisterPlugin(registry.InputPlugin, Name, &input{}, false)
}
const Name = "mysql"
type input struct {
  core.Input
}
func (i *input) Configure(pipelineName string, data map[string]interface{}) error {    
func (i *input) NewPositionCache() (position_cache.PositionCacheInterface, error) {

The streaming input plugin is defined in pkg/inputs/mysqlstream/input.go:

const inputStreamKey = "mysqlstream"
const Name = "mysql-stream"

func (plugin *mysqlStreamInputPlugin) Start(emitter core.Emitter, router core.Router, positionCache position_cache.PositionCacheInterface) error {
  sourceDB, err := utils.CreateDBConnection(plugin.cfg.Source)
  sourceSchemaStore, err := schema_store.NewSimpleSchemaStoreFromDBConn(sourceDB)
  plugin.binlogChecker, err = binlog_checker.NewBinlogChecker(
    plugin.pipelineName,
    plugin.probeDBConfig,
    plugin.probeSQLAnnotation,
    BinlogProbeInterval,
    plugin.cfg.DisableBinlogChecker)
  if err := plugin.binlogChecker.Start(); err != nil {
  gravityServerID := utils.GenerateRandomServerID()
  plugin.binlogTailer, err = NewBinlogTailer(
    plugin.pipelineName,
    plugin.cfg,
    gravityServerID,
    positionCache,
    sourceSchemaStore,
    sourceDB,
    emitter,
    router,
    plugin.binlogChecker,
    nil)
  if err := plugin.binlogTailer.Start(); err != nil {

pkg/inputs/mysqlstream/binlog_tailer.go

if err := utils.CheckBinlogFormat(tailer.sourceDB); err != nil {
position, exist, err := tailer.positionCache.Get()
binlogPositionValue, ok := position.Value.(helper.BinlogPositionsValue)
streamer, err := tailer.getBinlogStreamer(binlogPositionValue.CurrentPosition.BinlogGTID)
currentPosition, err := GetCurrentPositionValue(tailer.positionCache)
for {
  // ctx, cancel := context.WithTimeout(tailer.ctx, binlogSyncerTimeout)
  e, err := streamer.GetEvent(tailer.ctx)
  switch ev := e.Event.(type) {
  case *replication.RotateEvent:
    sourcePosition := gomysql.Position{Name: string(ev.NextLogName), Pos: uint32(ev.Position)}