golang源码分析:go-mysql(4)binlog增量同步

Golang
207
0
0
2024-01-19

实现binlog增量同步(Incremental dumping)需要哪些步骤呢?获取配置,初始化同步器,找到上一次同步位置,开启同步,并处理解析到的事件,整体流程如下:

  // Minimal incremental-sync loop: configure, create the syncer, start from the
  // last known position, then consume events one by one.
  cfg := replication.BinlogSyncerConfig{}
  syncer := replication.NewBinlogSyncer(cfg)
  // Keyed fields make it explicit which value is the file name and which is the offset.
  streamer, err := syncer.StartSync(mysql.Position{Name: binlogFile, Pos: binlogPos})
  if err != nil {
    // Without a streamer there is nothing to read from; do not continue with a nil value.
    panic(err)
  }
  for {
    ev, err := streamer.GetEvent(context.Background())
    if err != nil {
      // ev is nil when an error is returned, so dumping it would dereference nil.
      panic(err)
    }
    // Dump event
    ev.Dump(os.Stdout)
  }

配置定义如下,定义了binlog同步需要的必要参数:

// BinlogSyncerConfig is the configuration for BinlogSyncer.
type BinlogSyncerConfig struct {
  // ServerID is the unique ID in cluster.
  ServerID uint32
  // Flavor is "mysql" or "mariadb"; if not set, "mysql" is used by default.
  Flavor string


  // Host is for MySQL server host.
  Host string
  // Port is for MySQL server port.
  Port uint16
  // User is for MySQL user.
  User string
  // Password is for MySQL password.
  Password string


  // Localhost is the local hostname reported when registering as a slave.
  // If not set, use os.Hostname() instead.
  Localhost string


  // Charset is for MySQL client character set
  Charset string


  // SemiSyncEnabled enables semi-sync or not.
  SemiSyncEnabled bool


  // RawModeEnabled is for not parsing binlog event.
  RawModeEnabled bool


  // If not nil, use the provided tls.Config to connect to the database using TLS/SSL.
  TLSConfig *tls.Config


  // Use replication.Time structure for timestamp and datetime.
  // We will use Local location for timestamp and UTC location for datetime.
  ParseTime bool


  // If ParseTime is false, convert TIMESTAMP into this specified timezone. If
  // ParseTime is true, this option will have no effect and TIMESTAMP data will
  // be parsed into the local timezone and a full time.Time struct will be
  // returned.
  //
  // Note that MySQL TIMESTAMP columns are offset from the machine local
  // timezone while DATETIME columns are offset from UTC. This is consistent
  // with documented MySQL behaviour as it returns TIMESTAMP in local timezone
  // and DATETIME in UTC.
  //
  // Setting this to UTC effectively equalizes the TIMESTAMP and DATETIME time
  // strings obtained from MySQL.
  TimestampStringLocation *time.Location


  // Use decimal.Decimal structure for decimals.
  UseDecimal bool


  // RecvBufferSize sets the size in bytes of the operating system's receive buffer associated with the connection.
  RecvBufferSize int


  // HeartbeatPeriod is the master heartbeat period.
  HeartbeatPeriod time.Duration


  // ReadTimeout is the read timeout.
  ReadTimeout time.Duration


  // MaxReconnectAttempts is the maximum number of attempts to re-establish a broken connection;
  // zero or a negative number means infinite retry.
  // This configuration will not work if DisableRetrySync is true.
  MaxReconnectAttempts int


  // DisableRetrySync disables re-sync for a broken connection.
  DisableRetrySync bool


  // VerifyChecksum presumably enables verification of the binlog event checksum
  // (inferred from the name — confirm in the parser).
  // Only works when MySQL/MariaDB variable binlog_checksum=CRC32.
  // For MySQL, binlog_checksum was introduced since 5.6.2, but CRC32 was set as default value since 5.6.6 .
  // https://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.html#option_mysqld_binlog-checksum
  // For MariaDB, binlog_checksum was introduced since MariaDB 5.3, but CRC32 was set as default value since MariaDB 10.2.1 .
  // https://mariadb.com/kb/en/library/replication-and-binary-log-server-system-variables/#binlog_checksum
  VerifyChecksum bool


  // DumpCommandFlag is used to send binlog dump command. Default 0, aka BINLOG_DUMP_NEVER_STOP.
  // For MySQL, BINLOG_DUMP_NEVER_STOP and BINLOG_DUMP_NON_BLOCK are available.
  // https://dev.mysql.com/doc/internals/en/com-binlog-dump.html#binlog-dump-non-block
  // For MariaDB, BINLOG_DUMP_NEVER_STOP, BINLOG_DUMP_NON_BLOCK and BINLOG_SEND_ANNOTATE_ROWS_EVENT are available.
  // https://mariadb.com/kb/en/library/com_binlog_dump/
  // https://mariadb.com/kb/en/library/annotate_rows_event/
  DumpCommandFlag uint16


  // Option is a function used to apply settings outside of BinlogSyncerConfig, between the mysql connection and COM_REGISTER_SLAVE.
  // For MariaDB: slave_gtid_ignore_duplicates, skip_replication, slave_until_gtid.
  Option func(*client.Conn) error


  // Logger is the logger the syncer writes to (for example, an Infof message on binlog rotation).
  Logger loggers.Advanced


  // Dialer sets the client.Dialer.
  // NOTE(review): presumably used when opening the connection to the server — confirm.
  Dialer client.Dialer


  // RowsEventDecodeFunc is a callback taking a *RowsEvent and the raw bytes.
  // NOTE(review): presumably a custom decoder for rows events — confirm against the parser.
  RowsEventDecodeFunc func(*RowsEvent, []byte) error


  // DiscardGTIDSet controls whether parseEvent attaches the current GTID set (GSet)
  // to XID and Query events: when false the set is attached, when true that step is skipped.
  DiscardGTIDSet bool
}

然后初始化同步器,它的定义如下,github.com/go-mysql-org/go-mysql@v1.7.0/replication/binlogsyncer.go:

// BinlogSyncer syncs binlog event from server.
type BinlogSyncer struct {
  // m is a read/write mutex.
  // NOTE(review): the code that locks it is not shown here — confirm what it guards.
  m sync.RWMutex


  // cfg is the configuration passed to NewBinlogSyncer.
  cfg BinlogSyncerConfig


  // c is the connection to the server; it is assigned in registerSlave and
  // read from in onStream.
  c *client.Conn


  // wg is a wait group.
  // NOTE(review): presumably tracks the streaming goroutine — confirm.
  wg sync.WaitGroup


  // parser is created in NewBinlogSyncer and used by parseEvent to decode packets.
  parser *BinlogParser


  // nextPos is the binlog position to continue from; it is updated when a
  // RotateEvent is parsed and used by retrySync and the semi-sync ACK.
  nextPos Position


  // prevGset and currGset are the GTID sets maintained by parseEvent as GTID
  // events arrive; retrySync resumes from prevGset.
  prevGset, currGset GTIDSet


  // instead of GTIDSet.Clone, use this to speed up calculate prevGset.
  // It holds the most recently seen MySQL GTID event, which is added to
  // prevGset when the next GTID event arrives.
  prevMySQLGTIDEvent *GTIDEvent


  // running is a state flag.
  // NOTE(review): presumably true while a sync is in progress — confirm.
  running bool


  // ctx is watched by onStream and parseEvent; when it is done, streaming stops.
  ctx    context.Context
  // cancel is the cancel function; presumably paired with ctx — confirm.
  cancel context.CancelFunc


  // lastConnectionID is the connection ID recorded in registerSlave right after connecting.
  lastConnectionID uint32


  // retryCount is incremented in onStream before each retrySync attempt.
  retryCount int
}

初始化同步器的时候会指定解析器,以及失败处理器,代码位于github.com/go-mysql-org/go-mysql@v1.7.0/replication/binlogsyncer.go:

func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
  b := new(BinlogSyncer)
  b.cfg = cfg
  b.parser = NewBinlogParser()
  b.parser.SetFlavor(cfg.Flavor)

解析器位于github.com/go-mysql-org/go-mysql@v1.7.0/replication/parser.go:

// NewBinlogParser returns a new BinlogParser whose table map is allocated and
// empty; every other field is left at its zero value.
func NewBinlogParser() *BinlogParser {
  return &BinlogParser{
    tables: make(map[uint64]*TableMapEvent),
  }
}
// BinlogParser holds the state used while parsing binlog events.
type BinlogParser struct {
  // "mysql" or "mariadb", if not set, use "mysql" by default
  flavor string


  // format is a FormatDescriptionEvent.
  // NOTE(review): presumably the most recently parsed one — confirm.
  format *FormatDescriptionEvent


  // tables is allocated empty by NewBinlogParser and maps a uint64 key to a
  // TableMapEvent (presumably keyed by table ID — confirm).
  tables map[uint64]*TableMapEvent


  // for rawMode, we only parse FormatDescriptionEvent and RotateEvent
  rawMode bool


  // NOTE(review): these names mirror BinlogSyncerConfig.ParseTime and
  // TimestampStringLocation — confirm they are copied from the config.
  parseTime               bool
  timestampStringLocation *time.Location


  // used to start/stop processing
  stopProcessing uint32


  // NOTE(review): useDecimal and verifyChecksum mirror the BinlogSyncerConfig
  // fields of the same name; ignoreJSONDecodeErr presumably tolerates JSON
  // decoding failures — confirm all three against the parsing code.
  useDecimal          bool
  ignoreJSONDecodeErr bool
  verifyChecksum      bool


  // rowsEventDecodeFunc is a callback taking a *RowsEvent and raw bytes.
  // NOTE(review): presumably a custom rows-event decoder — confirm.
  rowsEventDecodeFunc func(*RowsEvent, []byte) error
}

开启同步的时候需要给定上次同步的位置:

github.com/go-mysql-org/go-mysql@v1.7.0/mysql/position.go

// Position identifies a location in the binlog for filename + position based
// replication.
type Position struct {
  // Name is the binlog file name.
  Name string
  // Pos is the position within that binlog file.
  Pos  uint32
}

github.com/go-mysql-org/go-mysql@v1.7.0/replication/binlogsyncer.go定位到指定位置后就开启同步

func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
        if err := b.prepareSyncPos(pos); err != nil {
        return b.startDumpStream(), nil

定位到同步位置,包括如下准备工作:注册从库,允许半同步。

func (b *BinlogSyncer) prepareSyncPos(pos Position) error {
        if err := b.prepare(); err != nil {
        if err := b.writeBinlogDumpCommand(pos); err != nil {
func (b *BinlogSyncer) prepare() error {
        if err := b.registerSlave(); err != nil {
        if err := b.enableSemiSync(); err != nil {
func (b *BinlogSyncer) registerSlave() error {
        b.c, err = b.newConnection(b.ctx)
        b.lastConnectionID = b.c.GetConnectionID()
        if r, err := b.c.Execute("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'"); err != nil {
        s, _ := r.GetString(0, 1)
        if _, err = b.c.Execute(`SET @master_binlog_checksum='NONE'`); err != nil {
        if err = b.writeRegisterSlaveCommand(); err != nil {
        if _, err = b.c.ReadOKPacket(); err != nil {
          if _, err = b.c.Execute(fmt.Sprintf("SET @slave_uuid = '%s', @replica_uuid = '%s'", serverUUID, serverUUID)); err != nil {  
func (b *BinlogSyncer) enableSemiSync() error {
        if r, err := b.c.Execute("SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled';"); err != nil {
        _, err := b.c.Execute(`SET @rpl_semi_sync_slave = 1;`)

开启同步后会开启一个独立的协程来解析binlog:

func (b *BinlogSyncer) startDumpStream() *BinlogStreamer {
        s := NewBinlogStreamer()
        go b.onStream(s)
func NewBinlogStreamer() *BinlogStreamer {
  s := new(BinlogStreamer)
  s.ch = make(chan *BinlogEvent, 10240)

同步的过程,会先将解析的结果放入BinlogStreamer的channel里面,然后在for循环中消费这个channel

// BinlogStreamer gets the streaming event.
type BinlogStreamer struct {
  // ch carries parsed binlog events; NewBinlogStreamer creates it with a
  // buffer of 10240 and GetEvent receives from it.
  ch  chan *BinlogEvent
  // ech carries an error to the consumer; GetEvent receives from it.
  ech chan error
  // err holds the error GetEvent received from ech.
  err error
}

解析binlog过程中,会对不同事件进行不同处理,然后放入channel里

func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
for {
    data, err := b.c.ReadPacket()
        for {
        select {
        case <-b.ctx.Done():
          s.close()
          return
        case <-time.After(time.Second):
          b.retryCount++
          if err = b.retrySync(); err != nil {
        switch data[0] {
      case OK_HEADER:
        if err = b.parseEvent(s, data); err != nil {
          s.closeWithError(err)
          return
        }
      case ERR_HEADER:
        err = b.c.HandleErrorPacket(data)
        s.closeWithError(err)
        return
      case EOF_HEADER:
func (b *BinlogSyncer) retrySync() error {
        b.parser.Reset()
        if err := b.prepareSyncGTID(b.prevGset); err != nil {
        if err := b.prepareSyncPos(b.nextPos); err != nil {

核心函数是parseEvent,放入channel就在这个函数里:

func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
e, err := b.parser.Parse(data)
switch event := e.Event.(type) {
  case *RotateEvent:
    b.nextPos.Name = string(event.NextLogName)
    b.nextPos.Pos = uint32(event.Position)
    b.cfg.Logger.Infof("rotate to %s", b.nextPos)
  case *GTIDEvent:
    if b.prevGset == nil {
      break
    }
    if b.currGset == nil {
      b.currGset = b.prevGset.Clone()
    }
    u, _ := uuid.FromBytes(event.SID)
    b.currGset.(*MysqlGTIDSet).AddGTID(u, event.GNO)
    if b.prevMySQLGTIDEvent != nil {
      u, _ = uuid.FromBytes(b.prevMySQLGTIDEvent.SID)
      b.prevGset.(*MysqlGTIDSet).AddGTID(u, b.prevMySQLGTIDEvent.GNO)
    }
    b.prevMySQLGTIDEvent = event
  case *MariadbGTIDEvent:
    if b.prevGset == nil {
      break
    }
    if b.currGset == nil {
      b.currGset = b.prevGset.Clone()
    }
    prev := b.currGset.Clone()
    err = b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID)
    if err != nil {
      return errors.Trace(err)
    }
    // right after reconnect we will see same gtid as we saw before, thus currGset will not get changed
    if !b.currGset.Equal(prev) {
      b.prevGset = prev
    }
  case *XIDEvent:
    if !b.cfg.DiscardGTIDSet {
      event.GSet = getCurrentGtidSet()
    }
  case *QueryEvent:
    if !b.cfg.DiscardGTIDSet {
      event.GSet = getCurrentGtidSet()
    }
  }
  select {
  case s.ch <- e:
  case <-b.ctx.Done():
    needStop = true
  }
  if needACK {
    err := b.replySemiSyncACK(b.nextPos)  
func (b *BinlogSyncer) replySemiSyncACK(p Position) error {
  data[pos] = SemiSyncIndicator
  err := b.c.WritePacket(data)
  for {
    ev, _ := streamer.GetEvent(context.Background())
    // Dump event
    ev.Dump(os.Stdout)
  }

接着就来到了最后读取解析到的event的过程,它就是简单的channel的读取过程:

func (s *BinlogStreamer) GetEvent(ctx context.Context) (*BinlogEvent, error) {
  select {
  case c := <-s.ch:
    return c, nil
  case s.err = <-s.ech:
    return nil, s.err
  case <-ctx.Done():
    return nil, ctx.Err()
  }

读取后我们可以进行简单dump

// Dump writes the event to w: the header is dumped first, followed by the
// event body.
func (e *BinlogEvent) Dump(w io.Writer) {
  e.Header.Dump(w)
  e.Event.Dump(w)
}
// BinlogStreamer is the consumer-facing handle for streamed binlog events.
type BinlogStreamer struct {
  // ch carries parsed binlog events; GetEvent receives from it.
  ch  chan *BinlogEvent
  // ech carries an error to the consumer; GetEvent receives from it.
  ech chan error
  // err holds the error GetEvent received from ech.
  err error
}

类似的函数还有:

func (s *BinlogStreamer) GetEventWithStartTime(ctx context.Context, startTime time.Time) (*BinlogEvent, error) {
  startUnix := startTime.Unix()
  select {
  case c := <-s.ch:
    if int64(c.Header.Timestamp) >= startUnix {
      return c, nil
func (s *BinlogStreamer) DumpEvents() []*BinlogEvent {
  count := len(s.ch)
  events := make([]*BinlogEvent, 0, count)
  for i := 0; i < count; i++ {
    events = append(events, <-s.ch)
// AddEventToStreamer pushes ev onto the streamer's event channel.
//
// It blocks until either the event has been sent, in which case nil is
// returned, or an error arrives on the streamer's error channel, in which case
// that error is returned.
func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error {
  var failure error
  select {
  case s.ch <- ev:
    // Event delivered; failure stays nil.
  case failure = <-s.ech:
  }
  return failure
}