How can we implement a canal of our own in Go? github.com/go-mysql-org/go-mysql gives us exactly this ability: it already parses the MySQL protocol, implements the replica-side synchronization flow on top of it, and exposes a handful of plugin points. To build a custom canal, all we need to do is implement those plugin points. Below we walk through the source code to see how. Implementing a canal takes four steps: initialize the configuration, create a canal instance, set the event handler, and start processing.
cfg := canal.NewDefaultConfig()
c, err := canal.NewCanal(cfg)
c.SetEventHandler(&MyEventHandler{})
c.Run()
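Putting the four steps together, here is a minimal end-to-end sketch. The address, user, and password are placeholders for your own environment, and the handler embeds canal.DummyEventHandler so every method we do not override falls back to a no-op:
package main

import (
	"log"

	"github.com/go-mysql-org/go-mysql/canal"
)

// trivialHandler embeds DummyEventHandler, so only OnRow is customized.
type trivialHandler struct {
	canal.DummyEventHandler
}

// OnRow prints every row change canal delivers to us.
func (h *trivialHandler) OnRow(e *canal.RowsEvent) error {
	log.Printf("%s %v", e.Action, e.Rows)
	return nil
}

func main() {
	cfg := canal.NewDefaultConfig()
	cfg.Addr = "127.0.0.1:3306" // placeholder: your MySQL address
	cfg.User = "root"           // placeholder: a user with replication privileges
	cfg.Password = ""           // placeholder

	c, err := canal.NewCanal(cfg)
	if err != nil {
		log.Fatal(err)
	}
	c.SetEventHandler(&trivialHandler{})

	// Run blocks: it dumps existing data first, then follows the binlog.
	if err := c.Run(); err != nil {
		log.Fatal(err)
	}
}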
The configuration is defined in github.com/go-mysql-org/go-mysql@v1.7.0/canal/config.go. NewDefaultConfig() returns a Config pre-filled with defaults; its fields describe the source connection and the tables to be synced, and control how the synchronization proceeds:
type Config struct {
Addr string `toml:"addr"`
User string `toml:"user"`
Password string `toml:"password"`
Charset string `toml:"charset"`
ServerID uint32 `toml:"server_id"`
Flavor string `toml:"flavor"`
HeartbeatPeriod time.Duration `toml:"heartbeat_period"`
ReadTimeout time.Duration `toml:"read_timeout"`
// IncludeTableRegex or ExcludeTableRegex should contain database name
// Only a table which matches IncludeTableRegex and dismatches ExcludeTableRegex will be processed
// eg, IncludeTableRegex : [".*\\.canal"], ExcludeTableRegex : ["mysql\\..*"]
// this will include all database's 'canal' table, except database 'mysql'
// Default IncludeTableRegex and ExcludeTableRegex are empty, this will include all tables
IncludeTableRegex []string `toml:"include_table_regex"`
ExcludeTableRegex []string `toml:"exclude_table_regex"`
// discard row event without table meta
DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"`
Dump DumpConfig `toml:"dump"`
UseDecimal bool `toml:"use_decimal"`
ParseTime bool `toml:"parse_time"`
TimestampStringLocation *time.Location
// SemiSyncEnabled enables semi-sync or not.
SemiSyncEnabled bool `toml:"semi_sync_enabled"`
// maximum number of attempts to re-establish a broken connection, zero or negative number means infinite retry.
// this configuration will not work if DisableRetrySync is true
MaxReconnectAttempts int `toml:"max_reconnect_attempts"`
// whether disable re-sync for broken connection
DisableRetrySync bool `toml:"disable_retry_sync"`
// Set TLS config
TLSConfig *tls.Config
//Set Logger
Logger loggers.Advanced
//Set Dialer
Dialer client.Dialer
}
See the source code for the details of each parameter.
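As an illustration, a hand-tuned configuration might look like the sketch below; the database name, regexes, and mysqldump path are made-up values for illustration, not defaults of the library:
cfg := canal.NewDefaultConfig()
cfg.Addr = "127.0.0.1:3306"
cfg.User = "repl"       // assumed user with replication privileges
cfg.Password = "secret" // assumed
cfg.Flavor = "mysql"    // or "mariadb"
cfg.ServerID = 1001     // must be unique among the replicas of this primary

// Only sync shop.orders and skip everything under mysql.*.
// Note the regexes must include the database name, e.g. "db\\.table".
cfg.IncludeTableRegex = []string{"shop\\.orders"}
cfg.ExcludeTableRegex = []string{"mysql\\..*"}

// The initial snapshot is produced by mysqldump; point Dump at the binary.
// If ExecutionPath is left empty, the dump phase is skipped.
cfg.Dump.ExecutionPath = "/usr/bin/mysqldump" // assumed path
cfg.Dump.Databases = []string{"shop"}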
The canal instance is initialized in github.com/go-mysql-org/go-mysql@v1.7.0/canal/canal.go:
func NewCanal(cfg *Config) (*Canal, error) {
c.dumpDoneCh = make(chan struct{})
c.eventHandler = &DummyEventHandler{}
c.parser = parser.New()
c.tables = make(map[string]*schema.Table)
c.master = &masterInfo{logger: c.cfg.Logger}
if err = c.prepareDumper(); err != nil {
if err = c.prepareSyncer(); err != nil {
if err := c.checkBinlogRowFormat(); err != nil {
if n := len(c.cfg.IncludeTableRegex); n > 0 {
if n := len(c.cfg.ExcludeTableRegex); n > 0 {
It first creates a canal object and fills in a series of defaults, such as the dummy event handler, and then sets up the parser and the syncer. The Canal struct is defined as follows:
type Canal struct {
m sync.Mutex
cfg *Config
parser *parser.Parser
master *masterInfo
dumper *dump.Dumper
dumped bool
dumpDoneCh chan struct{}
syncer *replication.BinlogSyncer
eventHandler EventHandler
connLock sync.Mutex
conn *client.Conn
tableLock sync.RWMutex
tables map[string]*schema.Table
errorTablesGetTime map[string]time.Time
tableMatchCache map[string]bool
includeTableRegex []*regexp.Regexp
excludeTableRegex []*regexp.Regexp
delay *uint32
ctx context.Context
cancel context.CancelFunc
}
Next, let's look at the dummy event handler, implemented in github.com/go-mysql-org/go-mysql@v1.7.0/canal/handler.go:
type DummyEventHandler struct {
}
func (h *DummyEventHandler) OnRotate(*replication.EventHeader, *replication.RotateEvent) error {
return nil
}
It implements every method of the EventHandler interface, but each one does nothing. The EventHandler interface has 8 methods; to build a custom canal we only need to provide our own versions of these 8 methods:
type EventHandler interface {
OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error
// OnTableChanged is called when the table is created, altered, renamed or dropped.
// You need to clear the associated data like cache with the table.
// It will be called before OnDDL.
OnTableChanged(header *replication.EventHeader, schema string, table string) error
OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error
OnRow(e *RowsEvent) error
OnXID(header *replication.EventHeader, nextPos mysql.Position) error
OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
String() string
}
Having seen how the Canal object is initialized, the next step is to install our own event handler in place of the dummy one, c.SetEventHandler(&MyEventHandler{}):
func (c *Canal) SetEventHandler(h EventHandler) {
c.eventHandler = h
}
How should the event handler be defined? We can embed the dummy event handler and implement only the methods we care about, for example:
type MyEventHandler struct {
canal.DummyEventHandler
}
func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
log.Infof("%s %v\n", e.Action, e.Rows)
return nil
}
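Any other interface method can be overridden the same way. For example, here is a sketch of persisting the replication position so a restart can resume from it; the file path is an arbitrary choice, and it assumes the os, fmt, mysql, and replication packages are imported:
// OnPosSynced is called when canal considers the position safe to persist.
// Here we write it to a local file; a real tool might use etcd, a database, etc.
func (h *MyEventHandler) OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error {
	data := fmt.Sprintf("%s:%d", pos.Name, pos.Pos)
	return os.WriteFile("./binlog-pos.txt", []byte(data), 0o644)
}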
With this preparation done, we reach the run step, c.Run(). The source is in github.com/go-mysql-org/go-mysql@v1.7.0/canal/canal.go:
func (c *Canal) Run() error {
return c.run()
}
It first records the start timestamp, then dumps the data that needs to be synced, and finally starts the binlog synchronization.
func (c *Canal) run() error {
c.master.UpdateTimestamp(uint32(time.Now().Unix()))
err := c.tryDump()
if err := c.runSyncBinlog(); err != nil {
The dump step lives in github.com/go-mysql-org/go-mysql@v1.7.0/canal/dump.go. It first reads the position and GTID set that have already been synced, then starts the dump:
func (c *Canal) tryDump() error {
pos := c.master.Position()
gset := c.master.GTIDSet()
return c.dump()
A quick primer on MySQL binlog replication: MySQL offers two ways to specify where replication starts:
- by binlog file name and position, carried as anonymous transactions (Anonymous_gtid_log_event)
- by GTID (global transaction ID), carried as GTID transactions (Gtid_log_event)
The GTID-based approach has a clear advantage for primary/replica failover in a one-primary, many-replica topology, and it also makes day-to-day diagnosis of replication problems easier. Since MySQL 5.7.6, GTID mode can be switched on and off dynamically; its GTID_MODE parameter takes the following values (a quick way to check it from Go is sketched after the list):
OFF - only anonymous transactions may be replicated
OFF_PERMISSIVE - new transactions are anonymous, but GTID transactions may also be replicated
ON_PERMISSIVE - new transactions carry GTIDs, but anonymous transactions may also be replicated
ON - only GTID transactions may be replicated
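As a sanity check before relying on GTID-based replication, you can query the server's current mode. The sketch below uses go-mysql's client package; the address and credentials are placeholders:
package main

import (
	"fmt"
	"log"

	"github.com/go-mysql-org/go-mysql/client"
)

func main() {
	// placeholders: adjust the address, user, and password for your server
	conn, err := client.Connect("127.0.0.1:3306", "root", "", "")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	r, err := conn.Execute("SELECT @@GLOBAL.gtid_mode")
	if err != nil {
		log.Fatal(err)
	}
	mode, _ := r.GetString(0, 0) // ON / ON_PERMISSIVE / OFF_PERMISSIVE / OFF
	fmt.Println("gtid_mode =", mode)
}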
GTID sets are parsed in github.com/go-mysql-org/go-mysql@v1.7.0/mysql/gtid.go:
func ParseGTIDSet(flavor string, s string) (GTIDSet, error) {
switch flavor {
case MySQLFlavor:
return ParseMysqlGTIDSet(s)
case MariaDBFlavor:
return ParseMariadbGTIDSet(s)
The MySQL and MariaDB implementations differ slightly; see github.com/go-mysql-org/go-mysql@v1.7.0/mysql/mysql_gtid.go:
func ParseMysqlGTIDSet(str string) (GTIDSet, error) {
if set, err := ParseUUIDSet(sp[i]); err != nil {
type MysqlGTIDSet struct {
Sets map[string]*UUIDSet
}
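As a usage sketch, parsing a GTID set string by flavor looks like this; the UUID and intervals are made-up sample values:
// A MySQL GTID set looks like "server_uuid:interval[:interval]...".
gset, err := mysql.ParseGTIDSet(mysql.MySQLFlavor,
	"3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5:11-18")
if err != nil {
	log.Fatal(err)
}
fmt.Println(gset.String()) // prints the normalized form of the set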
These two GTID set types correspond to the MySQL and MariaDB flavors handled above. Next, let's look at the dump step in detail:
func (c *Canal) dump() error {
c.master.UpdateTimestamp(uint32(time.Now().Unix()))
h := &dumpParseHandler{c: c}
if c.master.GTIDSet() != nil {
gset, err := c.GetMasterGTIDSet()
if c.cfg.Dump.SkipMasterData {
pos, err := c.GetMasterPos()
if err := c.dumper.DumpAndParse(h); err != nil {
pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)}
c.master.Update(pos)
c.master.UpdateGTIDSet(h.gset)
if err := c.eventHandler.OnPosSynced(nil, pos, c.master.GTIDSet(), true); err != nil {
During the dump, canal first reads the already-synced position or GTID set, then runs the dump-and-parse step; when it finishes, it updates the synced position and GTID set, and finally calls our eventHandler with a position-synced event. The binlog sync position is recorded in masterInfo:
type masterInfo struct {
sync.RWMutex
pos mysql.Position
gset mysql.GTIDSet
timestamp uint32
logger loggers.Advanced
}
The dump-and-parse step uses dumpParseHandler, defined as follows:
type dumpParseHandler struct {
c *Canal
name string
pos uint64
gset mysql.GTIDSet
}
It implements the ParseHandler interface:
type ParseHandler interface {
// Parse CHANGE MASTER TO MASTER_LOG_FILE=name, MASTER_LOG_POS=pos;
BinLog(name string, pos uint64) error
GtidSet(gtidsets string) error
Data(schema string, table string, values []string) error
}
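Nothing stops us from implementing ParseHandler ourselves if we only want the dump half of canal. Below is a sketch that counts dumped rows per table using the same Dumper API that canal uses; the mysqldump path, address, credentials, and database name are placeholders, and the fmt and dump packages are assumed to be imported:
// rowCounter implements dump.ParseHandler and simply tallies rows per table.
type rowCounter struct {
	counts map[string]int
}

func (h *rowCounter) BinLog(name string, pos uint64) error { return nil }
func (h *rowCounter) GtidSet(gtidsets string) error        { return nil }

func (h *rowCounter) Data(schema string, table string, values []string) error {
	h.counts[schema+"."+table]++
	return nil
}

func countRows() error {
	// placeholders: mysqldump path, server address, user, password
	d, err := dump.NewDumper("/usr/bin/mysqldump", "127.0.0.1:3306", "root", "")
	if err != nil {
		return err
	}
	d.AddDatabases("shop") // assumed database name
	h := &rowCounter{counts: map[string]int{}}
	if err := d.DumpAndParse(h); err != nil {
		return err
	}
	fmt.Println(h.counts)
	return nil
}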
Back in canal, dumpParseHandler implements it as follows:
func (h *dumpParseHandler) GtidSet(gtidsets string) (err error) {
	if h.gset != nil {
		err = h.gset.Update(gtidsets) // merge into the existing set
	} else {
		h.gset, err = mysql.ParseGTIDSet("mysql", gtidsets)
	}
func (h *dumpParseHandler) BinLog(name string, pos uint64) error {
h.name = name
h.pos = pos
return nil
}
The Data function deserves special attention:
func (h *dumpParseHandler) Data(db string, table string, values []string) error {
tableInfo, err := h.c.GetTable(db, table)
for i, v := range values {
if v == "NULL" {
vs[i] = nil
} else if v == "_binary ''" {
vs[i] = []byte{}
} else if v[0] != '\'' {
if tableInfo.Columns[i].Type == schema.TYPE_NUMBER || tableInfo.Columns[i].Type == schema.TYPE_MEDIUM_INT {
} else if tableInfo.Columns[i].Type == schema.TYPE_FLOAT {
f, err := strconv.ParseFloat(v, 64)
} else if tableInfo.Columns[i].Type == schema.TYPE_DECIMAL {
if h.c.cfg.UseDecimal {
} else if strings.HasPrefix(v, "0x") {
buf, err := hex.DecodeString(v[2:])
events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs}, nil)
return h.c.eventHandler.OnRow(events)
It parses each dumped row, deserializing the raw text values into Go runtime values, and then fires the second event, OnRow.
The dump itself is implemented in github.com/go-mysql-org/go-mysql@v1.7.0/dump/dumper.go. The parsing side runs in a separate goroutine and skips the tables excluded by the configuration:
func (d *Dumper) DumpAndParse(h ParseHandler) error {
r, w := io.Pipe()
go func() {
err := Parse(r, h, !d.masterDataSkipped)
err := d.Dump(w)
Parsing happens inside a for loop:
func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error {
for {
line, err := rb.ReadString('\n')
line = strings.TrimRightFunc(line, func(c rune) bool {
return c == '\r' || c == '\n'
})
if err := h.GtidSet(gtidStr); err != nil {
if err = h.BinLog(name, pos); err != nil && err != ErrSkip {
if err = h.Data(db, table, values); err != nil && err != ErrSkip {
On the other end of the pipe is the dump itself, which simply shells out to the mysqldump tool:
func (d *Dumper) Dump(w io.Writer) error {
_, err := w.Write([]byte(fmt.Sprintf("USE `%s`;\n", d.TableDB)))
cmd := exec.Command(d.ExecutionPath, args...)
return cmd.Run()
// Unlike mysqldump, Dumper is designed for parsing and syncing data easily.
type Dumper struct {
// mysqldump execution path, like mysqldump or /usr/bin/mysqldump, etc...
ExecutionPath string
Addr string
User string
Password string
Protocol string
// Will override Databases
Tables []string
TableDB string
Databases []string
Where string
Charset string
IgnoreTables map[string][]string
ExtraOptions []string
ErrOut io.Writer
masterDataSkipped bool
maxAllowedPacket int
hexBlob bool
// see detectColumnStatisticsParamSupported
isColumnStatisticsParamSupported bool
}
Once the dump completes, the binlog sync begins; see github.com/go-mysql-org/go-mysql@v1.7.0/canal/sync.go:
func (c *Canal) runSyncBinlog() error {
s, err := c.startSyncer()
for {
ev, err := s.GetEvent(c.ctx)
c.updateReplicationDelay(ev)
if ev.Header.LogPos == 0 {
switch e := ev.Event.(type) {
case *replication.RotateEvent:
fakeRotateLogName := string(e.NextLogName)
if fakeRotateLogName != c.master.Position().Name {
pos := c.master.Position()
switch e := ev.Event.(type) {
case *replication.RotateEvent:
if err = c.eventHandler.OnRotate(ev.Header, e); err != nil {
case *replication.RowsEvent:
// we only focus row based event
err = c.handleRowsEvent(ev)
case *replication.XIDEvent:
savePos = true
// try to save the position later
if err := c.eventHandler.OnXID(ev.Header, pos);
c.master.UpdateGTIDSet(e.GSet)
case *replication.MariadbGTIDEvent:
// try to save the GTID later
gtid, err := mysql.ParseMariadbGTIDSet(e.GTID.String())
if err := c.eventHandler.OnGTID(ev.Header, gtid); err != nil {
case *replication.GTIDEvent:
u, _ := uuid.FromBytes(e.SID)
gtid, err := mysql.ParseMysqlGTIDSet(fmt.Sprintf("%s:%d", u.String(), e.GNO))
if err := c.eventHandler.OnGTID(ev.Header, gtid); err != nil {
case *replication.QueryEvent:
stmts, _, err := c.parser.Parse(string(e.Query), "", "")
for _, stmt := range stmts {
nodes := parseStmt(stmt)
for _, node := range nodes {
if node.db == "" {
node.db = string(e.Schema)
}
if err = c.updateTable(ev.Header, node.db, node.table);
if len(nodes) > 0 {
savePos = true
force = true
// Now we only handle Table Changed DDL, maybe we will support more later.
if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
if savePos {
c.master.Update(pos)
c.master.UpdateTimestamp(ev.Header.Timestamp)
if err := c.eventHandler.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force); err != nil {
It first starts the syncer from the saved position, then streams the binlog content; as each event is processed, it is dispatched to the corresponding method of our event handler. The function that starts the syncer from the saved position (or GTID set) is defined as follows:
func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
s, err := c.syncer.StartSync(pos)
s, err := c.syncer.StartSyncGTID(gset)
Row changes are usually what we care about most. The implementation below parses MySQL binlog row events and converts them into insert, update, and delete actions, which are finally handed to our OnRow handler (a dispatch example is sketched after the code):
func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
switch e.Header.EventType {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
action = InsertAction
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
action = DeleteAction
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
action = UpdateAction
events := newRowsEvent(t, action, ev.Rows, e.Header)
return c.eventHandler.OnRow(events)
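In our own handler, OnRow typically switches on that action. A fuller version of the earlier OnRow method might look like the sketch below, using the standard library log package; note that for updates, Rows carries (before, after) pairs:
func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
	table := e.Table.Schema + "." + e.Table.Name
	switch e.Action {
	case canal.InsertAction:
		for _, row := range e.Rows {
			log.Printf("insert into %s: %v", table, row)
		}
	case canal.UpdateAction:
		// update events arrive as (before, after) pairs
		for i := 0; i < len(e.Rows)-1; i += 2 {
			log.Printf("update %s: %v -> %v", table, e.Rows[i], e.Rows[i+1])
		}
	case canal.DeleteAction:
		for _, row := range e.Rows {
			log.Printf("delete from %s: %v", table, row)
		}
	}
	return nil
}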
To sum up, MySQL binlog synchronization has three main steps: parse the sync snapshot (the position information), parse the binlog, and deliver the changes to the downstream consumer. go-mysql wraps this whole complex process and exposes it through 8 abstract methods; by implementing only the ones we need, we can build our own custom canal tool.