golang源码分析:go-mysql(3)mysql客户端

Golang
203
0
0
2024-01-19

mysql Client进行一次查询需要三个核心操作:获得连接、测试连接是否可用、发送请求。示例如下:

// Minimal usage example: connect, ping, then run a statement.
// The error results of Connect and Execute are discarded with `_`, and the
// error returned by Ping is not checked.
conn, _ := client.Connect("127.0.0.1:3306", "root", "", "test")
conn.Ping()
r, _ := conn.Execute(`insert into table (id, name) values (1, "abc")`)

首先我们看下获取连接的过程:github.com/go-mysql-org/go-mysql@v1.7.0/client/conn.go

func Connect(addr string, user string, password string, dbName string, options ...func(*Conn)) (*Conn, error) {
        dialer := &net.Dialer{}
        return ConnectWithDialer(ctx, "", addr, user, password, dbName, dialer.DialContext, options...)
func ConnectWithDialer(ctx context.Context, network string, addr string, user string, password string, dbName string, dialer Dialer, options ...func(*Conn)) (*Conn, error) {
  c := new(Conn)
  conn, err := dialer(ctx, network, addr)
  if err = c.handshake(); err != nil {
    

握手过程中需要进行身份认证:

  func (c *Conn) handshake() error {
        if err = c.readInitialHandshake(); err != nil {
        if err := c.writeAuthHandshake(); err != nil {
        if err := c.handleAuthResult(); err != nil {

表示连接的结构体定义如下:

// Conn is the client-side connection type. It embeds *packet.Conn and adds
// per-connection state alongside it.
type Conn struct {
  *packet.Conn


  // NOTE(review): presumably the values passed to Connect (user, password,
  // dbName) — the assignments are not shown in this excerpt.
  user      string
  password  string
  db        string
  tlsConfig *tls.Config
  proto     string


  // server capabilities
  capability uint32
  // client-set capabilities only
  ccaps uint32


  attributes map[string]string


  status uint16


  charset string


  salt           []byte
  authPluginName string


  // connectionID is set in readInitialHandshake, decoded as a little-endian
  // uint32 from the initial handshake packet.
  connectionID uint32
}

在握手的第一步(读取server端发来的初始握手包)中,就可以获得server端分配的本次连接的连接ID:github.com/go-mysql-org/go-mysql@v1.7.0/client/auth.go

func (c *Conn) readInitialHandshake() error {
        data, err := c.ReadPacket()
        return errors.Annotate(c.handleErrorPacket(data), "read initial handshake error")
        c.connectionID = binary.LittleEndian.Uint32(data[pos : pos+4])
        pos += 4

一般通过conn.Ping()命令测试连接的可用性

func (c *Conn) Ping() error {
    if err := c.writeCommand(COM_PING); err != nil {
    if _, err := c.readOK(); err != nil {

可以看到它是一个单独的命令,mysql的命令列表如下:

// Command bytes. Values are assigned sequentially by iota starting at 0
// (COM_SLEEP = 0 ... COM_RESET_CONNECTION = 31), so the order of the entries
// determines each command's numeric value and must not be changed.
// Of these, COM_PING is written by Conn.Ping, COM_QUERY by Conn.exec and
// COM_STMT_PREPARE by Conn.Prepare.
const (
  COM_SLEEP byte = iota
  COM_QUIT
  COM_INIT_DB
  COM_QUERY
  COM_FIELD_LIST
  COM_CREATE_DB
  COM_DROP_DB
  COM_REFRESH
  COM_SHUTDOWN
  COM_STATISTICS
  COM_PROCESS_INFO
  COM_CONNECT
  COM_PROCESS_KILL
  COM_DEBUG
  COM_PING
  COM_TIME
  COM_DELAYED_INSERT
  COM_CHANGE_USER
  COM_BINLOG_DUMP
  COM_TABLE_DUMP
  COM_CONNECT_OUT
  COM_REGISTER_SLAVE
  COM_STMT_PREPARE
  COM_STMT_EXECUTE
  COM_STMT_SEND_LONG_DATA
  COM_STMT_CLOSE
  COM_STMT_RESET
  COM_SET_OPTION
  COM_STMT_FETCH
  COM_DAEMON
  COM_BINLOG_DUMP_GTID
  COM_RESET_CONNECTION
)

执行请求的场景可以分为两种:不带参数的请求和带参数的请求(github.com/go-mysql-org/go-mysql@v1.7.0/client/conn.go)。不带参数的请求直接执行;带参数的请求需要先预编译,然后再执行。

func (c *Conn) Execute(command string, args ...interface{}) (*Result, error) {
        if len(args) == 0 {
          return c.exec(command)
        if s, err := c.Prepare(command); err != nil {
        r, err = s.Execute(args...)

执行的过程就是向server发送COM_QUERY命令

func (c *Conn) exec(query string) (*Result, error) {
        if err := c.writeCommandStr(COM_QUERY, query); err != nil {
        return c.readResult(false)

github.com/go-mysql-org/go-mysql@v1.7.0/client/req.go

// writeCommandStr sends a command whose argument is a string: it converts arg
// to a byte slice with utils.StringToByteSlice and delegates to
// writeCommandBuf, returning that call's error.
func (c *Conn) writeCommandStr(command byte, arg string) error {
  return c.writeCommandBuf(command, utils.StringToByteSlice(arg))
}
func (c *Conn) writeCommandBuf(command byte, arg []byte) error {
        err := c.WritePacket(data.B)

github.com/go-mysql-org/go-mysql@v1.7.0/packet/conn.go

func (c *Conn) WritePacket(data []byte) error {

预编译的请求发送在github.com/go-mysql-org/go-mysql@v1.7.0/client/stmt.go,对应命令:COM_STMT_PREPARE

func (c *Conn) Prepare(query string) (*Stmt, error) {
        if err := c.writeCommandStr(COM_STMT_PREPARE, query); err != nil {
        data, err := c.ReadPacket()
        s := new(Stmt)
func (c *Conn) writeCommandStr(command byte, arg string) error {
    return c.writeCommandBuf(command, utils.StringToByteSlice(arg))
func (s *Stmt) Execute(args ...interface{}) (*Result, error) {
        if err := s.write(args...); err != nil {
        return s.conn.readResult(true)

发送命令的过程如下,它先填充字段类型信息,然后填充字段的值,然后进行发送:

func (s *Stmt) write(args ...interface{}) error {
for i := range args {
  switch v := args[i].(type) {
    case int8:
      paramTypes[i<<1] = MYSQL_TYPE_TINY
      paramValues[i] = []byte{byte(v)}
s.conn.ResetSequence()
return s.conn.WritePacket(data)
}

每次发送之前都会重置序列号。

// ResetSequence sets the connection's Sequence field back to 0.
// Stmt.write calls it immediately before WritePacket.
func (c *Conn) ResetSequence() {
  c.Sequence = 0
}

mysql的类型信息定义如下:

// Type codes, first range. Values are assigned sequentially by iota starting
// at 0 (MYSQL_TYPE_DECIMAL = 0 ... MYSQL_TYPE_TIME2 = 19), so the entry order
// determines each code and must not be changed.
// Stmt.write stores these codes in paramTypes (e.g. MYSQL_TYPE_TINY for an
// int8 argument).
const (
  MYSQL_TYPE_DECIMAL byte = iota
  MYSQL_TYPE_TINY
  MYSQL_TYPE_SHORT
  MYSQL_TYPE_LONG
  MYSQL_TYPE_FLOAT
  MYSQL_TYPE_DOUBLE
  MYSQL_TYPE_NULL
  MYSQL_TYPE_TIMESTAMP
  MYSQL_TYPE_LONGLONG
  MYSQL_TYPE_INT24
  MYSQL_TYPE_DATE
  MYSQL_TYPE_TIME
  MYSQL_TYPE_DATETIME
  MYSQL_TYPE_YEAR
  MYSQL_TYPE_NEWDATE
  MYSQL_TYPE_VARCHAR
  MYSQL_TYPE_BIT

  // mysql 5.6
  MYSQL_TYPE_TIMESTAMP2
  MYSQL_TYPE_DATETIME2
  MYSQL_TYPE_TIME2
)

// Type codes, second range. iota restarts at 0 in this block, so the values
// run sequentially from 0xf5 (MYSQL_TYPE_JSON) to 0xff (MYSQL_TYPE_GEOMETRY);
// the entry order determines each code and must not be changed.
const (
  MYSQL_TYPE_JSON byte = iota + 0xf5
  MYSQL_TYPE_NEWDECIMAL
  MYSQL_TYPE_ENUM
  MYSQL_TYPE_SET
  MYSQL_TYPE_TINY_BLOB
  MYSQL_TYPE_MEDIUM_BLOB
  MYSQL_TYPE_LONG_BLOB
  MYSQL_TYPE_BLOB
  MYSQL_TYPE_VAR_STRING
  MYSQL_TYPE_STRING
  MYSQL_TYPE_GEOMETRY
)

总结一下,mysql client实现起来很简单:建立连接,按照mysql协议对请求进行编码,然后发送到服务端。