https://github.com/go-mysql-org/go-mysql 是一个实现了MySQL协议和binlog协议的工具库,可以用来实现主从复制(Replication)、增量同步(Incremental dumping)、客户端(Client)、虚拟服务端(Fake server)、高可用(Failover)以及MySQL驱动(database/sql like driver)。前面介绍的《golang源码分析:mysql同步工具gravity(2)》就是基于这个库实现的。下面我们开始体验一下如何使用这个库。
1,Replication
package main
import (
"context"
"os"
"time"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
)
// main connects to a MySQL server as a replication client and dumps every
// binlog event it receives to stdout. It runs until an error other than the
// per-read timeout occurs, in which case it reports the error and exits with
// status 1.
func main() {
	// Create a binlog syncer with a unique server id; the server id must be
	// different from every other MySQL server's.
	// Flavor is "mysql" or "mariadb".
	cfg := replication.BinlogSyncerConfig{
		ServerID: 100,
		Flavor:   "mysql",
		Host:     "127.0.0.1",
		Port:     3306,
		User:     "root",
		Password: "",
	}
	syncer := replication.NewBinlogSyncer(cfg)

	// fail reports err on stderr, releases the syncer and terminates the
	// program. os.Exit skips deferred calls, so Close is invoked explicitly.
	fail := func(what string, err error) {
		os.Stderr.WriteString(what + ": " + err.Error() + "\n")
		syncer.Close()
		os.Exit(1)
	}

	// Start sync with the specified binlog file and position.
	// NOTE(review): the server normally identifies binlogs by the names shown
	// in SHOW BINARY LOGS; confirm that a full filesystem path is accepted.
	binlogFile := "/usr/local/var/mysql/mysql-bin.000020"
	// The first event of a binlog file sits at offset 4, so 4 is the smallest
	// meaningful start position.
	binlogPos := uint32(4)
	streamer, err := syncer.StartSync(mysql.Position{Name: binlogFile, Pos: binlogPos})
	if err != nil {
		fail("start sync", err)
	}

	// Or you can start a GTID replication like:
	//   streamer, err := syncer.StartSyncGTID(gtidSet)
	// A MySQL GTID set looks like "de278ad0-2106-11e4-9f8e-6edd0ca20947:1-2".
	// A MariaDB GTID set looks like "0-1-100".

	// Read events with a per-call timeout. To block indefinitely instead, pass
	// context.Background() directly to GetEvent.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		ev, err := streamer.GetEvent(ctx)
		cancel()
		if err == context.DeadlineExceeded {
			// No event arrived within the timeout; try again.
			continue
		}
		if err != nil {
			// ev is not usable when an error is returned, so stop here
			// instead of dereferencing it.
			fail("get event", err)
		}
		// Dump event.
		ev.Dump(os.Stdout)
	}
}
它的原理是把自己伪装成一个从库,从主库接收并解析MySQL的binlog,从而实现主从复制。
2,Incremental dumping
package main
import (
"github.com/go-mysql-org/go-mysql/canal"
"github.com/siddontang/go-log/log"
)
// MyEventHandler is the event handler registered with canal in this example.
// It embeds canal.DummyEventHandler and defines its own OnRow and String.
type MyEventHandler struct {
	canal.DummyEventHandler
}

// OnRow logs the action and the rows carried by the event and always
// returns nil.
func (*MyEventHandler) OnRow(ev *canal.RowsEvent) error {
	action, rows := ev.Action, ev.Rows
	log.Infof("%s %v\n", action, rows)
	return nil
}

// String returns the fixed handler name "MyEventHandler".
func (*MyEventHandler) String() string {
	const name = "MyEventHandler"
	return name
}
// main builds a canal for the canal_test table in the test database,
// registers MyEventHandler for row events and runs the canal. Any error from
// creating or running the canal is logged as fatal.
func main() {
	cfg := canal.NewDefaultConfig()
	cfg.Addr = "127.0.0.1:3306"
	cfg.User = "root"
	// Only the canal_test table in the test database is of interest.
	cfg.Dump.TableDB = "test"
	cfg.Dump.Tables = []string{"canal_test"}

	c, err := canal.NewCanal(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Register a handler to handle RowsEvent.
	c.SetEventHandler(&MyEventHandler{})

	// Start canal. Run returns an error; surface it instead of discarding it
	// so a failed sync does not look like a clean exit.
	if err := c.Run(); err != nil {
		log.Fatal(err)
	}
}
// create table canal_test (id int );
// Query OK, 0 rows affected (0.01 sec)
// insert into canal_test (id) values (12);
// Query OK, 1 row affected (0.00 sec)
// [2023/04/04 00:36:46] [info] binlogsyncer.go:813 rotate to (mysql-bin.000007, 4)
// [2023/04/04 00:36:46] [info] sync.go:63 received fake rotate event, next log name is mysql-bin.000007
// [2023/04/04 00:37:20] [info] main.go:13 insert [[12]]
它的原理和主从复制一样,只不过它是增量的复制。
3,Client
package main
import (
"fmt"
"github.com/go-mysql-org/go-mysql/client"
"github.com/go-mysql-org/go-mysql/mysql"
)
// main demonstrates the go-mysql client: it connects, pings, runs an insert
// and a select, reads the result set in several ways and finally streams a
// larger select row by row. On the first error it prints the error and
// returns; the deferred Close calls still run.
func main() {
	// Connect to MySQL at 127.0.0.1:3306 with user root, an empty password
	// and database test.
	conn, err := client.Connect("127.0.0.1:3306", "root", "", "test")
	if err != nil {
		fmt.Println("connect:", err)
		return
	}
	defer conn.Close()

	// Or use an SSL/TLS connection if the MySQL server supports TLS:
	//   conn, err := client.Connect("127.0.0.1:3306", "root", "", "test", func(c *client.Conn) { c.UseSSL(true) })
	// Or set your own client-side certificates for identity verification:
	//   tlsConfig := client.NewClientTLSConfig(caPem, certPem, keyPem, false, "your-server-name")
	//   conn, err := client.Connect("127.0.0.1:3306", "root", "", "test", func(c *client.Conn) { c.SetTLSConfig(tlsConfig) })

	if err := conn.Ping(); err != nil {
		fmt.Println("ping:", err)
		return
	}

	// Insert.
	// NOTE(review): "table" is a placeholder (and a reserved word in MySQL);
	// substitute a real table name before running.
	r, err := conn.Execute(`insert into table (id, name) values (1, "abc")`)
	if err != nil {
		fmt.Println("insert:", err)
		return
	}
	// Get last insert id.
	println(r.InsertId)
	// Or affected rows count.
	println(r.AffectedRows)

	// Select.
	r, err = conn.Execute(`select id, name from table where id = 1`)
	if err != nil {
		fmt.Println("select:", err)
		return
	}
	// Close the result so its memory can be reused (not required, but useful).
	defer r.Close()

	// Handle the result set.
	v, err := r.GetInt(0, 0)
	if err != nil {
		fmt.Println("get int:", err)
		return
	}
	fmt.Println(v)
	if _, err = r.GetIntByName(0, "id"); err != nil {
		fmt.Println("get int by name:", err)
		return
	}

	// Direct access to fields.
	for _, row := range r.Values {
		for _, val := range row {
			_ = val.Value() // interface{}
			// or
			if val.Type == mysql.FieldValueTypeFloat {
				_ = val.AsFloat64() // float64
			}
		}
	}

	// Stream a large select: the callback is invoked once per row.
	var result mysql.Result
	err = conn.ExecuteSelectStreaming(`select id, name from table LIMIT 100500`, &result, func(row []mysql.FieldValue) error {
		for idx, val := range row {
			field := result.Fields[idx]
			// You must not keep the FieldValue.AsString() value after this
			// callback is done. Copy it if you need it.
			fmt.Println(val, field)
		}
		return nil
	}, nil)
	if err != nil {
		fmt.Println("streaming select:", err)
		return
	}
}
既然实现了mysql协议,作为mysql客户端自然不在话下。
4,Fake server
package main
import (
"log"
"net"
"github.com/go-mysql-org/go-mysql/server"
)
// main runs a minimal fake MySQL server: it listens on 127.0.0.1:4000,
// accepts a single client connection, wraps it as a MySQL server connection
// for user root with an empty password, and handles commands until
// HandleCommand returns an error, which is then logged as fatal.
func main() {
	// Listen for connections on localhost port 4000.
	listener, listenErr := net.Listen("tcp", "127.0.0.1:4000")
	if listenErr != nil {
		log.Fatal(listenErr)
	}

	// Accept exactly one connection.
	rawConn, acceptErr := listener.Accept()
	if acceptErr != nil {
		log.Fatal(acceptErr)
	}

	// Create a connection with user root and an empty password.
	// A custom handler can be supplied here in place of EmptyHandler.
	mysqlConn, connErr := server.NewConn(rawConn, "root", "", server.EmptyHandler{})
	if connErr != nil {
		log.Fatal(connErr)
	}

	// Keep handling commands for as long as they succeed; the first error
	// ends the loop and is fatal.
	var cmdErr error
	for cmdErr == nil {
		cmdErr = mysqlConn.HandleCommand()
	}
	log.Fatal(cmdErr)
}
它接受TCP连接,解析MySQL协议,响应客户端请求,不过每条SQL语句内部如何执行,需要自己定义。
4,Failover
package main
import (
"context"
"github.com/go-mysql-org/go-mysql/client"
log "github.com/sirupsen/logrus"
)
// main demonstrates the client connection pool: it creates a pool, borrows a
// connection, runs a query, opens a transaction and returns the connection to
// the pool on exit. Every error is logged instead of being discarded.
func main() {
	pool := client.NewPool(log.Debugf, 100, 400, 5, "127.0.0.1:3306", `root`, ``, `test`)
	// ...
	ctx := context.Background()
	conn, err := pool.GetConn(ctx)
	if err != nil {
		// Nothing has been borrowed yet, so exiting immediately is safe.
		log.Fatalf("get connection from pool: %v", err)
	}
	defer pool.PutConn(conn)

	if _, err := conn.Execute("show databases;"); err != nil {
		log.Errorf("show databases: %v", err)
		return
	}

	if err := conn.Begin(); err != nil {
		log.Errorf("begin transaction: %v", err)
		return
	}
	// Deferred calls run last-in-first-out, so this rollback executes before
	// PutConn and the connection is not returned to the pool with a
	// transaction still open.
	defer func() {
		if err := conn.Rollback(); err != nil {
			log.Errorf("rollback: %v", err)
		}
	}()
	// etc...
}
Failover的原理是通过选主的方式将从库提升为主库;上面的示例演示的是client包提供的连接池(Pool)用法。
5,database/sql like driver
package main
import (
"database/sql"
_ "github.com/go-mysql-org/go-mysql/driver"
)
// main opens a database/sql handle through the go-mysql driver and closes it
// again. It panics if the handle cannot be opened or closed.
func main() {
	// dsn format: "user:password@addr?dbname"
	dsn := "root@127.0.0.1:3306?test"
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		// db must not be used when Open fails; calling Close on it here
		// would dereference a nil handle.
		panic(err)
	}
	if err := db.Close(); err != nil {
		panic(err)
	}
}
它实现了Go标准库database/sql的驱动接口,可以作为MySQL的驱动包使用。