线程池实现基于C++可以说是一道经典的计算机本科学生练习题。本篇文章会从一个传统实现的线程池开始讲起。
一、线程池和任务
我们看一下线程池类的基本结构。线程池本质是有一些线程在后台等待队列执行任务,我们只需要将任务存储在队列中。线程会从任务队列中获取任务执行。这里最基本的成员就是线程队列,任务队列,条件变量用于通知线程组任务事件,队列锁避免竞争,终止条件。
class ThreadPool {
public:
void Start();
void QueueJob(const std::function<void()>& job);
void Stop();
bool busy();
private:
void ThreadLoop();
bool should_terminate = false; // Tells threads to stop looking for jobs
std::mutex queue_mutex; // Prevents data races to the job queue
std::condition_variable mutex_condition; // Allows threads to wait on new jobs or termination
std::vector<std::thread> threads;
std::queue<std::function<void()>> jobs;
};
同时线程组应该实现的功能有以下几个函数
ThreadPool::Start
创建线程池,比较有效率的办法是根据硬件并发数来创建相应数量num_threads 的线程组。这样不会有性能问题,创建不合适数量的线程组,可能用线程组比串行执行更慢。
每个线程都在等待新的task调度并执行。
void ThreadPool::Start() {
const uint32_t num_threads = std::thread::hardware_concurrency(); // Max # of threads the system supports
for (uint32_t ii = 0; ii < num_threads; ++ii) {
threads.emplace_back(std::thread(&ThreadPool::ThreadLoop,this))
}
}
ThreadPool::ThreadLoop
这个是死循环主体. ,在任务队列种寻找待处理的任务。
void ThreadPool::ThreadLoop() {
while (true) {
std::function<void()> job;
{
std::unique_lock<std::mutex> lock(queue_mutex);
mutex_condition.wait(lock, [this] {
return !jobs.empty() || should_terminate;
});
if (should_terminate) {
return;
}
job = jobs.front();
jobs.pop();
}
job();
}
}
ThreadPool::QueueJob
加入新的任务到队列中,这里避免竞争,加入队列锁。
void ThreadPool::QueueJob(const std::function<void()>& job) {
{
std::unique_lock<std::mutex> lock(queue_mutex);
jobs.push(job);
}
mutex_condition.notify_one();
}
调用放传入回调函数指针,和处理内容指针
thread_pool->QueueJob([] { /* ... */ });
ThreadPool::busy
bool ThreadPool::busy() {
bool poolbusy;
{
std::unique_lock<std::mutex> lock(queue_mutex);
poolbusy = !jobs.empty();
}
return poolbusy;
}
busy()函数 主要用在while循环, 在整个线程组销毁之前,清理任务队列剩余未处理的任务。
ThreadPool::Stop
终止线程池,这里主要是各种线程的join操作.
void ThreadPool::Stop() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
should_terminate = true;
}
mutex_condition.notify_all();
for (std::thread& active_thread : threads) {
active_thread.join();
}
threads.clear();
}
二、主要流程
以下是一个可以运行起来的代码。
我们先创建线程队列。这里使用最基础的std::thread
std::vector< std::thread > workers;
创建job,并把job添加到每个线程
[this]
{
for(;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
接下来加任务到队列中,这里加完之后调用条件notify唤醒 等待任务的 线程。
// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
完整的可运行的代码如下
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
class ThreadPool {
public:
ThreadPool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this]
{
for(;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
#endif