mysql 连接池的实现

C/C++
6
0
0
2024-11-21

连接池

涉及后端的数据交互管理的时候,我们在应用层总是希望将一些过程进行封装进行规模化管理,池化技术基本就是来干这种事情的,线程池,内存池,连接池,请求池等等都是来干这种事情的,当然如果从算法层面来说,这种就是用空间来换时间的做法。我们的很多著名的算法也是基于这样的方式来优化的,著名的 KMP 算法,通过维护一个 next 数组,来降低算法的时间复杂度。

请求池说白了,就是应用程序跟一些后台组件连接过程,如果每次连接都要经历连接的创建,使用然后销毁太消耗性能了,那我来创建后不销毁不就行了,那就需要管理这些连接,每次需要连接的时候,我从这里边取就行了,用完我放回来。

代码实现

本次实现用 c++ ,所以首先不是定义结构体了,而是先定义类,我们可以先来想一想到底我们需要什么养的类,首先就是连接池就是肯定要封装一个连接池的类,这个类是为了管理各种连接(这里是 mysql )。有了连接池,然后我们的连接也要封装成一个类,这是为了方便管理。

然后回到我们mysql 查询的一个过程,首先就是 MYSQL 的准备阶段,准备好各种数据进行连接,然后建立和数据库连接之后我们需要进行各种数据库的增删改查操作,得到 sql 查询结果然后保存或者返回到前端。为了完成整个连接,我们需要对这个过程进行封装。

准备阶段封装

首先就是一个准备阶段的类

#include <mysql.h> //mysql c api
class CPrepareStatement{
public:
	CPrepareStatement();
	virtual ~CPrepareStatement();

	bool Init(MYSQL* mysql, string& sql);  //初始化 测试连接,准备sql,然后设置参数 sql 查询可以直到返回数量
	
    void SetParam(uint32_t index, int& value);  //设置各种参数 各种类型
	void SetParam(uint32_t index, uint32_t& value);
    void SetParam(uint32_t index, string& value);
    void SetParam(uint32_t index, const string& value);
    bool ExecuteUpdate();   //执行更新
	uint32_t GetInsertId();  //获取 ID
private:
	MYSQL_STMT*	m_stmt;
	MYSQL_BIND*	m_param_bind;
	uint32_t	m_param_cnt;
};

这个类的实现如下:

/// 准备状态,设置 sql 语句进行查询阶段
CPrepareStatement :: CPrepareStatement()
{
    m_stmt = NULL;  //m_stmt 是mysql 的句柄 
	m_param_bind = NULL;  //参数绑定数组,sql 查询参数数组
	m_param_cnt = 0;  //参数数量
}

CPrepareStatement::~CPrepareStatement()
{
   if(m_stmt)
   {
   //关闭 mysql 句柄
   	 mysql_stmt_close(m_stmt);
	 m_stmt = NULL;
   }

   if(m_param_bind)
   {
   //绑定参数删除
      delete[] m_param_bind;
	  m_param_bind = NULL;
   	}
}

bool CPrepareStatement::Init(MYSQL * mysql, string & sql)
{
    //测试连接是否活跃
    mysql_ping(mysql);
    //初始化一个连接句柄
	m_stmt = mysql_stmt_init(mysql);
	if(!m_stmt)
	{
		log_error("mysql_stmt_init failed\n");
		return false;
	}

	//准备sql 语句
	if(mysql_stmt_prepare(m_stmt, sql.c_str(), sql.size()))
	{
	   log_error("mysql_stmt_prepare failed: %s\n", mysql_stmt_error(m_stmt));
	   return false;
	}

    //获取参数的数量
	m_param_cnt = mysql_stmt_param_count(m_stmt);
	if(m_param_cnt > 0)
	{
	    //数组用于绑定参数
	    m_param_bind = new MYSQL_BIND[m_param_cnt];
		if(!m_param_bind)
		{
		   log_error("new failed\n");
		   return false;
		}
        //分配内存初始化为 0
		memset(m_param_bind, 0, sizeof(MYSQL_BIND) * m_param_cnt);
	}

	return true;
}

//通过数组的 index 索引设置参数值
void CPrepareStatement::SetParam(uint32_t index, int & value)
{
   if(index >= m_param_cnt)
   	{
   	   log_error("index too large: %d\n", index);
	   return ;
   	}

   m_param_bind[index].buffer_type = MYSQL_TYPE_LONG;
   m_param_bind[index].buffer = &value;
}

void CPrepareStatement::SetParam(uint32_t index, uint32_t & value)
{
   if(index >= m_param_cnt)
   	{
   	   log_error("index too large: %d\n", index);
	   return ;
   	}

   m_param_bind[index].buffer_type = MYSQL_TYPE_LONG;
   m_param_bind[index].buffer = &value;
}

void CPrepareStatement::SetParam(uint32_t index, string & value)
{
   if(index >= m_param_cnt)
   	{
   	  log_error("index too large: %d\n", index);
	  return ;
   	}

   m_param_bind[index].buffer_type = MYSQL_TYPE_STRING;
   m_param_bind[index].buffer = (char*)value.c_str();
   m_param_bind[index].buffer_length = value.size();
}

void CPrepareStatement::SetParam(uint32_t index, const string & value)
{
   if(index >= m_param_cnt)
   	{
   	  log_error("index too large: %d\n", index);
	  return ;
   	}

   m_param_bind[index].buffer_type = MYSQL_TYPE_STRING;
   m_param_bind[index].buffer = (char*)value.c_str();
   m_param_bind[index].buffer_length = value.size();
}

bool CPrepareStatement::ExecuteUpdate()
{
    if(!m_stmt)
    {
       log_error("no m_stmt\n");
	   return false;
    }
	//绑定各种参数
	
    if (mysql_stmt_bind_param(m_stmt, m_param_bind))
	{
		log_error("mysql_stmt_bind_param failed: %s\n", mysql_stmt_error(m_stmt));
		return false;
	}

    //执行语句
	if (mysql_stmt_execute(m_stmt))
	{
		log_error("mysql_stmt_execute failed: %s\n", mysql_stmt_error(m_stmt));
		return false;
	}

    //返回执行结果
	if (mysql_stmt_affected_rows(m_stmt) == 0)
	{
		log_error("ExecuteUpdate have no effect\n");
		return false;
	}

	return true;
}

//获取插入 id
uint32_t CPrepareStatement::GetInsertId()
{
	return mysql_stmt_insert_id(m_stmt);
}
////

CDBConn::CDBConn(CDBPool *pPool)
{
	m_pDBPool = pPool;
	m_mysql = NULL;
}

CDBConn::~CDBConn()
{
	if (m_mysql)
	{
		mysql_close(m_mysql);
	}
}

sql 执行结果封装

因为 mysql 连接池和连接紧密关联,我们先把返回结果进行封装。

class CResultSet{
public:
	CResultSet(MYSQL_RES* res);
	virtual ~CResultSet();

	bool next();
	int GetInt(const char* key);
	char* GetString(const char* key);
private:
	int _GetIndex(const char* key);  //通过key 获取 index
	MYSQL_RES* m_res;
	MYSQL_ROW* m_row;
	map<string, int> m_key_map;
	
};

实现如下:

//对 sql 查询结果的封装
CResultSet :: CResultSet(MYSQL_RES * res)
{
    m_res = res;    //执行sql 返回结果
 
	int num_fields = mysql_num_fields(m_res);  //参数数量
	MYSQL_FIELD *fields = mysql_fetch_fields(m_res);  //字段
	for(int i = 0; i < num_fields; i++){
		m_key_map.insert(make_pair(fields[i].name, i));  以字段名,索引存储结果
	}
}

CResultSet::~CResultSet()
{
   if(m_res)
   {
       mysql_free_result(m_res);
	   m_res = NULL;
   	}
}

bool CResultSet::Next()
{
   m_row = mysql_fetch_row(m_res);  //获取下一行数据,每次用于获取下一行数据
   if(m_row)
   	{
   	    return true;
   	}
   else
   	{
   	   return false;
   	}
}

int CResultSet::_GetIndex(const char * key)
{
   map<string, int>::iterator it = m_key_map.find(key);  //通过字段查找index 索引
   if(it == m_key_map.end())
   	{
   	   return -1;
   	}
   else
   	{
   	   return it->second;
   	}
}

int CResultSet::GetInt(const char * key)
{
   int idx = _GetIndex(key);
   if(idx == -1)
   	{
   	   return 0;
   	}
   else
   	{
   	   return atoi(m_row[idx]);
   	}
}

char* CResultSet::GetString(const char * key)
{
    int idx = _GetIndex(key);
	if(idx == -1)
	{
	    return NULL;
	}
	else
	{
		return m_row[idx];
	}
}

可以看到,上述类就是对返回结果封装,首先要明确 sql 查询返回结果是多个字段,每个字段多行,因此将字段名和数据下标对应,那么我们可以通过字段名查询下标,然后获取要查询每一行字段具体数据。

连接的封装

连接的封装如下:

class CBPool;

class CDBConn{
public:
	CBConn(CDBPool* pDBPool);  //创建连接从连接池中获取
	virtual ~CDBConn();
	int Init();  //初始化连接

   //增删查改的函数
	bool ExecuteCreate(cosnt char *sql_query);  
	bool ExecuteDrop(const char* sql_query);
	CResultSet* ExecuteQuery(cosnt char* sql_query);
	bool ExecuteUpdate(const char* sql_query, bool care_affected_rows=true);
	uint32_t GetInsertId();

    //事务的封装函数
	bool StartTransaction();
	bool Commit();
	bool Rollback();
	MYSQL* GetMysql(){
		return m_mysql;
	}
private:
    CDBPool* m_pDBPoll;
	MYSQL*  m_mysql;
	char m_escape_string[MAX_ESCAPE_STRING_LEN + 1];
};

这个连接是我们实际使用连接池要用的类,但是因为连接不是自己创建而是从连接池获取,因此跟一般封装其实很类似,就是在初始化和连接结束需要跟连接池交互。

下边是具体实现:

CDBConn::CDBConn(CDBPool *pPool)
{
	m_pDBPool = pPool;
	m_mysql = NULL;
}

CDBConn::~CDBConn()
{
	if (m_mysql)
	{
		mysql_close(m_mysql);
	}
}

int CDBConn::Init()
{
	m_mysql = mysql_init(NULL);	// mysql_标准的mysql c client对应的api
	if (!m_mysql)
	{
		log_error("mysql_init failed\n");
		return 1;
	}

	my_bool reconnect = true;
	mysql_options(m_mysql, MYSQL_OPT_RECONNECT, &reconnect);	// 配合mysql_ping实现自动重连
	mysql_options(m_mysql, MYSQL_SET_CHARSET_NAME, "utf8mb4");	// utf8mb4和utf8区别

	// ip 端口 用户名 密码 数据库名
	if (!mysql_real_connect(m_mysql, m_pDBPool->GetDBServerIP(), m_pDBPool->GetUsername(), m_pDBPool->GetPasswrod(),
							m_pDBPool->GetDBName(), m_pDBPool->GetDBServerPort(), NULL, 0))
	{
		log_error("mysql_real_connect failed: %s\n", mysql_error(m_mysql));
		return 2;
	}

	return 0;
}

const char *CDBConn::GetPoolName()
{
	return m_pDBPool->GetPoolName();
}

//增删改查具体实现
bool CDBConn::ExecuteCreate(const char *sql_query)
{
	mysql_ping(m_mysql);
	// mysql_real_query 实际就是执行了SQL
	if (mysql_real_query(m_mysql, sql_query, strlen(sql_query)))
	{
		log_error("mysql_real_query failed: %s, sql: start transaction\n", mysql_error(m_mysql));
		return false;
	}

	return true;
}
bool CDBConn::ExecuteDrop(const char *sql_query)
{
	mysql_ping(m_mysql);	// 如果端开了,能够自动重连

	if (mysql_real_query(m_mysql, sql_query, strlen(sql_query)))
	{
		log_error("mysql_real_query failed: %s, sql: start transaction\n", mysql_error(m_mysql));
		return false;
	}

	return true;
}

CResultSet *CDBConn::ExecuteQuery(const char *sql_query)
{
	mysql_ping(m_mysql);

	if (mysql_real_query(m_mysql, sql_query, strlen(sql_query)))
	{
		log_error("mysql_real_query failed: %s, sql: %s\n", mysql_error(m_mysql), sql_query);
		return NULL;
	}
	// 返回结果
	MYSQL_RES *res = mysql_store_result(m_mysql);	// 返回结果
	if (!res)
	{
		log_error("mysql_store_result failed: %s\n", mysql_error(m_mysql));
		return NULL;
	}

	CResultSet *result_set = new CResultSet(res);	// 存储到CResultSet
	return result_set;
}

/*
1.执行成功,则返回受影响的行的数目,如果最近一次查询失败的话,函数返回 -1

2.对于delete,将返回实际删除的行数.

3.对于update,如果更新的列值原值和新值一样,如update tables set col1=10 where id=1;
id=1该条记录原值就是10的话,则返回0。

mysql_affected_rows返回的是实际更新的行数,而不是匹配到的行数。
*/
bool CDBConn::ExecuteUpdate(const char *sql_query, bool care_affected_rows)
{
	mysql_ping(m_mysql);

	if (mysql_real_query(m_mysql, sql_query, strlen(sql_query)))
	{
		log_error("mysql_real_query failed: %s, sql: %s\n", mysql_error(m_mysql), sql_query);
		//g_master_conn_fail_num ++;
		return false;
	}

	if (mysql_affected_rows(m_mysql) > 0)
	{
		return true;
	}
	else
	{ // 影响的行数为0时
		if (care_affected_rows)
		{ // 如果在意影响的行数时, 返回false, 否则返回true
			log_error("mysql_real_query failed: %s, sql: %s\n\n", mysql_error(m_mysql), sql_query);
			return false;
		}
		else
		{
			log_warn("affected_rows=0, sql: %s\n\n", sql_query);
			return true;
		}
	}
}
//事务的封装
bool CDBConn::StartTransaction()
{
	mysql_ping(m_mysql);

	if (mysql_real_query(m_mysql, "start transaction\n", 17))
	{
		log_error("mysql_real_query failed: %s, sql: start transaction\n", mysql_error(m_mysql));
		return false;
	}

	return true;
}

bool CDBConn::Rollback()
{
	mysql_ping(m_mysql);

	if (mysql_real_query(m_mysql, "rollback\n", 8))
	{
		log_error("mysql_real_query failed: %s, sql: rollback\n", mysql_error(m_mysql));
		return false;
	}

	return true;
}

bool CDBConn::Commit()
{
	mysql_ping(m_mysql);

	if (mysql_real_query(m_mysql, "commit\n", 6))
	{
		log_error("mysql_real_query failed: %s, sql: commit\n", mysql_error(m_mysql));
		return false;
	}

	return true;
}
uint32_t CDBConn::GetInsertId()
{
	return (uint32_t)mysql_insert_id(m_mysql);
}

上述逻辑较为简单,最后就是连接池的封装和实现了。

连接池的封装

class CDBPool {	// 只是负责管理连接CDBConn,真正干活的是CDBConn
public:
	CDBPool() {}
	CDBPool(const char* pool_name, const char* db_server_ip, uint16_t db_server_port,
			const char* username, const char* password, const char* db_name, 
			int max_conn_cnt);
	virtual 	~CDBPool();

	int 		Init();		// 连接数据库,创建连接
	CDBConn* 	GetDBConn(const int timeout_ms = -1);	// 获取连接资源
	void 		RelDBConn(CDBConn* pConn);	// 归还连接资源

	const char* GetPoolName() { return m_pool_name.c_str(); }
	const char* GetDBServerIP() { return m_db_server_ip.c_str(); }
	uint16_t 	GetDBServerPort() { return m_db_server_port; }
	const char* GetUsername() { return m_username.c_str(); }
	const char* GetPasswrod() { return m_password.c_str(); }
	const char* GetDBName() { return m_db_name.c_str(); }
private:
	string 		m_pool_name;	// 连接池名称
	string 		m_db_server_ip;	// 数据库ip
	uint16_t	m_db_server_port; // 数据库端口
	string 		m_username;  	// 用户名
	string 		m_password;		// 用户密码
	string 		m_db_name;		// db名称
	int			m_db_cur_conn_cnt;	// 当前启用的连接数量
	int 		m_db_max_conn_cnt;	// 最大连接数量
	list<CDBConn*>	m_free_list;	// 空闲的连接

	list<CDBConn*>	m_used_list;		// 记录已经被请求的连接
	std::mutex m_mutex;
    std::condition_variable m_cond_var;
	bool m_abort_request = false;
	// CThreadNotify	m_free_notify;	// 信号量
};

我们看到其中很多 get 方法,这种就直接在这里实现。

后边是具体是实现

//构造函数,较为简单
CDBPool :: CDBPool(const char * pool_name, const char * db_server_ip, uint16_t db_server_port, const char * username, const char * password, const char * db_name, int max_conn_cnt)
{
     m_pool_name = pool_name;
	 m_db_server_ip = db_server_ip;
	 m_db_server_port = db_server_port;
	 m_username = username;
	 m_password = password;
	 m_db_max_conn_cnt = max_conn_cnt;
	 m_db_cur_conn_cnt = MIN_DB_CONN_CNT;
}
//析构函数,将占有内存的删除
CDBPool::~CDBPool()
{
   std::lock_guard<std::mutex> lock(m_mutex);
   m_abort_request = true;
   m_cond_var.notify_all();

   for(list<CDBConn*>::itrator it = m_free_list.begin(); it != m_free_list.end(); it++)
   	{
   	   CDBConn* pConn = *it;
	   delete pConn;
   	}

   m_free_list.clear();
}
//生成规定数量的连接,然后放入 m_free_list
int CDBPool::Init()
{
   for(int i = 0; i < m_db_cur_conn_cnt; i++)
   {
      CDBConn* pDBConn = new CDBConn(this);
	  int ret = pDBConn->Init();
	  if(ret)
	  {
	     delete pDBConn;
		 return ret;
	  }

	  m_free_list.push_back(pDBConn)
   }

   return 0;
}

//从连接池获取连接
CDBConn* CDBPool::GetDBConn(const int timeout_ms)
{
     //加锁
   std::unique_lock<std::mutex> lock(m_mutex);
   if(m_abort_request)
   {
      log_warn("have abort\n");
	  return NULL;
   }

   if(m_free_list.empty())
   {
      //到达最大连接数量
      if(m_db_cur_conn_cnt >= m_db_max_conn_cnt)
      {  
         //马上获取,就要返回值
         if(timeout_ms < 0)
         {
            log_info("wait ms: %d\n", timeout_ms);
			m_cond_var.wait(lock, [this]
			{
			   return (!m_free_list.empty()) | m_abort_request;

			})
				//等待获取
         }else{
            m_cond_var.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this]
			{
			     return (!m_free_list.empty()) | m_abort_request;
			})

			if(m_free_list.empty())
			{
			   return NULL;
			}
			

         }

		if(m_abort_request) 
		{
			log_warn("have aboort\n");
			return NULL;
		}
      }
	  else // 还没有到最大连接则创建连接
		{
			CDBConn *pDBConn = new CDBConn(this);	//新建连接
			int ret = pDBConn->Init();
			if (ret)
			{
				log_error("Init DBConnecton failed\n\n");
				delete pDBConn;
				return NULL;
			}
			else
			{
				m_free_list.push_back(pDBConn);
				m_db_cur_conn_cnt++;
				log_info("new db connection: %s, conn_cnt: %d\n", m_pool_name.c_str(), m_db_cur_conn_cnt);
			}
		}
	}

	CDBConn *pConn = m_free_list.front();	// 获取连接
	m_free_list.pop_front();	// STL 吐出连接,从空闲队列删除
	// pConn->setCurrentTime();  // 伪代码
	m_used_list.push_back(pConn);		// 

	return pConn;
}
/*
上述逻辑比较简单,就是规定一个连接池最大数量,如果当前使用连接数量等于最大数据,那就等着,timeout 等不及了那就返回一个结果或者错误
如果小于最大连接数量那就创建一个连接,然后加入连接池,然后从连接池取出来连接,加入正在使用的行列
*/

//归还连接,先查看连接是不是在连接池里边,如果在那就不需要归还,如果不在就从使用过的连接移除然后归还
void CDBPool::RelDBConn(CDBConn *pConn)
{
	std::lock_guard<std::mutex> lock(m_mutex);

	list<CDBConn *>::iterator it = m_free_list.begin();
	for (; it != m_free_list.end(); it++)	// 避免重复归还
	{
		if (*it == pConn)	
		{
			break;
		}
	}

	if (it == m_free_list.end())
	{
		m_used_list.remove(pConn);
		m_free_list.push_back(pConn);
		m_cond_var.notify_one();		// 通知取队列
	} else 
	{
		log_error("RelDBConn failed\n");
	}
} 	

结语

至此连接池基本已经结束,连接池其实主要封装的连接阶段,添加准备和结果封装只是为了行文方便,理解起来也比较简单,就是一个连接队列和一个使用队列,然后通过锁和线程关联在一起。