golang mysql的连接池的具体使用

Golang
341
0
0
2023-03-29
目录
  • 1.mysql-通过sql建立连接池
  • 2.mysql-gorm 建立连接池
  • 3.连接池相较于单个client
  • 4.通用连接池
  • 参考

1.mysql-通过sql建立连接池

连接池用sql.Open函数创建连接池,可是此时只是初始化了连接池,并没有创建任何连接。连接创建都是惰性的,只有当你真正使用到连接的时候,连接池才会创建连接。连接池很重要,它直接影响着你的程序行为。

连接池的工作原来却相当简单。当你的函数(例如Exec,Query)调用需要访问底层数据库的时候,函数首先会向连接池请求一个连接。如果连接池有空闲的连接,则返回给函数。否则连接池将会创建一个新的连接给函数。一旦连接给了函数,连接则归属于函数。函数执行完毕后,要不把连接所属权归还给连接池,要么传递给下一个需要连接的(Rows)对象,最后使用完连接的对象也会把连接释放回到连接池。

请求一个连接的函数有好几种,执行完毕处理连接的方式稍有差别,大致如下:

  • db.Ping() 调用完毕后会马上把连接返回给连接池。
  • db.Exec() 调用完毕后会马上把连接返回给连接池,但是它返回的Result对象还保留这连接的引用,当后面的代码需要处理结果集的时候连接将会被重用。
  • db.Query() 调用完毕后会将连接传递给sql.Rows类型,当然后者迭代完毕或者显示的调用.Clonse()方法后,连接将会被释放回到连接池。
  • db.QueryRow()调用完毕后会将连接传递给sql.Row类型,当.Scan()方法调用之后把连接释放回到连接池。
  • db.Begin() 调用完毕后将连接传递给sql.Tx类型对象,当.Commit()或.Rollback()方法调用后释放连接。

因为每一个连接都是惰性创建的,如何验证sql.Open调用之后,sql.DB对象可用呢?通常使用db.Ping()方法初始化,调用了Ping之后,连接池一定会初始化一个数据库连接。

连接失败关于连接池另外一个知识点就是你不必检查或者尝试处理连接失败的情况。当你进行数据库操作的时候,如果连接失败了,database/sql会帮你处理。实际上,当从连接池取出的连接断开的时候,database/sql会自动尝试重连10次。仍然无法重连的情况下会自动从连接池再获取一个或者新建另外一个。

连接池配置配置连接池有两个的方法:

  • db.SetMaxOpenConns(n int) 设置打开数据库的最大连接数。包含正在使用的连接和连接池的连接。如果你的函数调用需要申请一个连接,并且连接池已经没有了连接或者连接数达到了最大连接数。此时的函数调用将会被block,直到有可用的连接才会返回。设置这个值可以避免并发太高导致连接mysql出现too many connections的错误。该函数的默认设置是0,表示无限制。
  • db.SetMaxIdleConns(n int) 设置连接池中的保持连接的最大连接数。默认也是0,表示连接池不会保持释放会连接池中的连接的连接状态:即当连接释放回到连接池的时候,连接将会被关闭。这会导致连接再连接池中频繁的关闭和创建。

对于连接池的使用依赖于你是如何配置连接池,如果使用不当会导致下面问题:

  • 大量的连接空闲,导致额外的工作和延迟。
  • 连接数据库的连接过多导致错误。
  • 连接阻塞。
  • 连接池有超过十个或者更多的死连接,限制就是10次重连。

数据库标准接口里面有3个方法用于设置连接池的属性: SetConnMaxLifetime, SetMaxIdleConns, SetMaxOpenConns

  • SetConnMaxLifetime: 设置一个连接的最长生命周期,因为数据库本身对连接有一个超时时间的设置,如果超时时间到了数据库会单方面断掉连接,此时再用连接池内的连接进行访问就会出错, 因此这个值往往要小于数据库本身的连接超时时间
  • SetMaxIdleConns: 连接池里面允许Idel的最大连接数, 这些Idel的连接 就是并发时可以同时获取的连接,也是用完后放回池里面的互用的连接, 从而提升性能。
  • SetMaxOpenConns: 设置最大打开的连接数,默认值为0表示不限制。控制应用于数据库建立连接的数量,避免过多连接压垮数据库。

项目结构

.
+--- go.mod
+--- go.sum
+--- main.go
+--- pool
|   +--- sql-pool.go

代码pool

package pool

import (
	"database/sql"
	"fmt"
	_ "github.com/go-sql-driver/mysql"
	"log"
	"time"
)

var DB *sql.DB

func init()  {
	DB, _ = sql.Open("mysql", "root:root@tcp(localhost:3306)/test?charset=utf8&parseTime=True&loc=Local") // 使用本地时间,即东八区,北京时间
	 // set pool params
	DB.SetMaxOpenConns(2000)
	DB.SetMaxIdleConns(1000)
	DB.SetConnMaxLifetime(time.Minute * 60) // mysql default conn timeout=8h, should < mysql_timeout
	 err := DB.Ping()
	 if err != nil {
	 	log.Fatalf("database init failed, err: ", err)
	 }
	log.Println("mysql conn pool has initiated.")
}

func checkErr(err error)  {
	if err != nil {
		log.Println(err)
		panic(err)
	}
}

func createTable() {
	db := DB
	table := `CREATE TABLE IF NOT EXISTS test.user (
	 user_id INT(11) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '用户编号',
	 user_name VARCHAR(45) NOT NULL COMMENT '用户名称',
	 user_age TINYINT(3) UNSIGNED NOT NULL DEFAULT 0 COMMENT '用户年龄',
	 user_sex TINYINT(3) UNSIGNED NOT NULL DEFAULT 0 COMMENT '用户性别',
	 PRIMARY KEY (user_id))
	 ENGINE = InnoDB
	 AUTO_INCREMENT = 1
	 DEFAULT CHARACTER SET = utf8
	 COLLATE = utf8_general_ci
	 COMMENT = '用户表'`
	if _, err := db.Exec(table); err != nil {
		checkErr(err)
	}
}

func insert() {
	db := DB
	stmt, err := db.Prepare(`INSERT user (user, age) values (?, ?)`)
	checkErr(err)
	res, err := stmt.Exec("Elvis", 26)
	checkErr(err)
	id, err := res.LastInsertId()
	checkErr(err)
	log.Println(id)
}

func query() {
	db := DB
	rows, err := db.Query("SELECT * FROM user")
	checkErr(err)
	for rows.Next() {
		var userId int
		var userName string
		var userAge int
		var userSex int
		rows.Columns()
		err = rows.Scan(&userId, &userName, &userAge, &userSex)
		checkErr(err)
		fmt.Println(userId)
		fmt.Println(userName)
		fmt.Println(userAge)
		fmt.Println(userSex)
	}
}

func queryToMap() {
	db := DB
	rows, err := db.Query("SELECT * FROM user")
	checkErr(err)
	//字典类型
	//构造scanArgs、values两个数组,scanArgs的每个值指向values相应值的地址
	columns, _ := rows.Columns()
	scanArgs := make([]interface{}, len(columns))
	values := make([]interface{}, len(columns))
	for i := range values {
		scanArgs[i] = &values[i]
	}
	for rows.Next() {
		//将行数据保存到record字典
		err = rows.Scan(scanArgs...)
		record := make(map[string]string)
		for i, col := range values {
			if col != nil {
				record[columns[i]] = string(col.([]byte))
			}
		}
		fmt.Println(record)
	}
}

func update() {
	db := DB
	stmt, err := db.Prepare(`UPDATE user SET user_age=?,user_sex=? WHERE user_id=?`)
	checkErr(err)
	res, err := stmt.Exec(21, 2, 1)
	checkErr(err)
	num, err := res.RowsAffected()
	checkErr(err)
	fmt.Println(num)
}

func remove() {
	db := DB
	stmt, err := db.Prepare(`DELETE FROM user WHERE user_id=?`)
	checkErr(err)
	res, err := stmt.Exec(1)
	checkErr(err)
	num, err := res.RowsAffected()
	checkErr(err)
	fmt.Println(num)
}

main

package main

import (
	"fmt"
	"log"
	"net/http"

	. "go-mysql-pool-v1/pool"
)

func main()  {
	http.HandleFunc("/pool", pool)
	log.Println("server is up now...")
	http.ListenAndServe(":8080", nil)
}

func pool(w http.ResponseWriter, r *http.Request)  {
	rows, err := DB.Query(`select * from user limit 1`)
	defer rows.Close()
	checkErr(err)

	columns, _ := rows.Columns()
	scanArgs := make([]interface{}, len(columns))
	values := make([]interface{}, len(columns))
	for j := range values {
		scanArgs[j] = &values[j]
	}

	record := make(map[string]string)
	for rows.Next() {
		err = rows.Scan(scanArgs...)
		for i, col := range values {
			if col != nil {
				record[columns[i]] = string(col.([]byte))
			}
		}
	}
	log.Println(record)
	fmt.Fprintf(w, "finish")
}

func checkErr(err error)  {
	if err != nil {
		log.Println(err)
		panic(err)
	}
}

可以通过工具ab测试连接池性能,用法见最下方注。

2.mysql-gorm 建立连接池

其实gorm的连接池设置,底层还是用的database/sql的设置连接池的方法,无非就是加一层gorm自身的一些设置。

以下示例为gorm v2版本,v1版本通过github.com/jinzhu/gorm,如mysql的驱动导入需要加_

代码结构

.
+--- config
|   +--- config.go
|   +--- config.json
+--- go.mod
+--- go.sum
+--- main.go
+--- pool
|   +--- gorm-pool.go
+--- test.db

config.json

{
  "database": {
    "name": "test",
    "type": "mysql",
    "host": "localhost",
    "port": "3306",
    "user": "root",
    "password": "root",
    "table_prefix": ""
  }
}

config.go

package config

import (
	"encoding/json"
	"os"
)

type Database struct {
	Type        string `json:"type"`
	Host        string `json:"host"`
	Port        string `json:"port"`
	User        string `json:"user"`
	Password    string `json:"password"`
	Name        string `json:"name"`
	TablePrefix string `json:"table_prefix"`
}

var DatabaseSetting = &Database{}


type Config struct {
	Database *Database `json:"database"`
}

var GlobalConfigSetting = &Config{}

func init()  {
	// win path is abs-path, linux -> config.json
	filePtr, err := os.Open("D:\\demo1\\src\\demo\\demo06\\go-mysql-pool-v2\\config\\config.json")
	if err != nil {
		return
	}
	defer filePtr.Close()

	// json decode
	decoder := json.NewDecoder(filePtr)
	err = decoder.Decode(GlobalConfigSetting)
	DatabaseSetting = GlobalConfigSetting.Database
}

gorm-pool.go注意设置表前缀和单复数

package pool

import (
	"fmt"
	"go-mysql-pool-v2/config"
	"gorm.io/driver/mysql"
	"gorm.io/driver/postgres"
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
	"gorm.io/gorm/schema"
	"log"
	"time"
)

var db *gorm.DB

// gorm v2
func init()  {
	var dbURi string
	var dialector gorm.Dialector
	if config.DatabaseSetting.Type == "mysql" {
		dbURi = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&parseTime=true",
			config.DatabaseSetting.User,
			config.DatabaseSetting.Password,
			config.DatabaseSetting.Host,
			config.DatabaseSetting.Port,
			config.DatabaseSetting.Name)
		dialector = mysql.New(mysql.Config{
			DSN:                       dbURi, // data source name
			DefaultStringSize:         256,   // default size for string fields
			DisableDatetimePrecision:  true,  // disable datetime precision, which not supported before MySQL 5.6
			DontSupportRenameIndex:    true,  // drop & create when rename index, rename index not supported before MySQL 5.7, MariaDB
			DontSupportRenameColumn:   true,  // `change` when rename column, rename column not supported before MySQL 8, MariaDB
			SkipInitializeWithVersion: false, // auto configure based on currently MySQL version
		})
	} else if config.DatabaseSetting.Type == "postgres" {
		dbURi = fmt.Sprintf("host=%s port=%s user=%s dbname=%s sslmode=disable password=%s",
			config.DatabaseSetting.Host,
			config.DatabaseSetting.Port,
			config.DatabaseSetting.User,
			config.DatabaseSetting.Name,
			config.DatabaseSetting.Password)
		dialector = postgres.New(postgres.Config{
			DSN:                  "user=gorm password=gorm dbname=gorm port=9920 sslmode=disable TimeZone=Asia/Shanghai",
			PreferSimpleProtocol: true, // disables implicit prepared statement usage
		})
	} else { // sqlite3
		dbURi = fmt.Sprintf("test.db")
		dialector = sqlite.Open("test.db")
	}

	conn, err := gorm.Open(dialector, &gorm.Config{ // 如果不用设置表前缀及复数,nil
		NamingStrategy: schema.NamingStrategy{
			TablePrefix: config.DatabaseSetting.TablePrefix, // set table prefix
			SingularTable: true, // set table singular
		},
	})
	if err != nil {
		log.Println(err.Error())
	}
	sqlDB, err := conn.DB()
	if err != nil {
		log.Println("connect db server failed.")
	}
	sqlDB.SetMaxOpenConns(100)
	sqlDB.SetMaxIdleConns(10)
	sqlDB.SetConnMaxLifetime(600*time.Second)

	db = conn
}

// open api
func GetDB() *gorm.DB {
	sqlDB, err := db.DB()
	if err != nil {
		log.Println("connect db server failed.")
	}
	if err = sqlDB.Ping(); err != nil {
		sqlDB.Close()
	}

	return db
}

main

package main

import (
	"go-mysql-pool-v2/pool"
	"gorm.io/gorm"
	"log"
)

type Product struct {
	gorm.Model // default add id stats time, id as primary key
	Code string
	Price uint
}

func main()  {
	log.Println("gorm init...")
	SetupModel()
}

func SetupModel()  {
	db := pool.GetDB()
	// auto migrate
	db.AutoMigrate(&Product{})
	// create record
	db.Create(&Product{Code: "L1212", Price: 1000})
}

go.mod

module go-mysql-pool-v2

go 1.16

replace go-mysql-pool-v2 => ../go-mysql-pool-v2

require (
	gorm.io/driver/mysql v1.3.4
	gorm.io/driver/postgres v1.3.4
	gorm.io/driver/sqlite v1.3.4
	gorm.io/gorm v1.23.5
)

3.连接池相较于单个client

4.通用连接池

ubuntu安装ab工具,apt-get install apache2-utils
// get 一般请求
ab -n 1000 -c 1000 http://xxx

// post json请求
ab -n 100000 -c 400 -p tempPara.txt -T application/json http://xxx

tempPara.txt内容:
{"driverId": 17,"pageNo": 1,"pageSize": 20,"status": 1}

参考