用C++实现线程池可以说是计算机专业本科生的一道经典练习题。本文将从一个传统的线程池实现讲起。
一、线程池和任务
我们看一下线程池类的基本结构。线程池本质是有一些线程在后台等待队列执行任务,我们只需要将任务存储在队列中。线程会从任务队列中获取任务执行。这里最基本的成员就是线程队列,任务队列,条件变量用于通知线程组任务事件,队列锁避免竞争,终止条件。
/// A minimal thread pool: worker threads wait in the background for jobs
/// pushed onto a shared queue and execute them one by one.
///
/// Usage: call Start() to spawn the workers, QueueJob() to submit work,
/// busy() to poll whether jobs are still queued, and Stop() to shut down.
class ThreadPool {
public:
    ThreadPool() = default;

    // The pool owns threads and a mutex, neither of which can be copied.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /// Stops and joins any workers that are still running. Without this,
    /// destroying a pool whose threads are still joinable would call
    /// std::terminate. Calling Stop() beforehand remains valid.
    ~ThreadPool() { Stop(); }

    /// Spawns the worker threads.
    void Start();
    /// Appends `job` to the queue and wakes one waiting worker.
    void QueueJob(const std::function<void()>& job);
    /// Signals termination and joins all workers.
    void Stop();
    /// Returns true while the job queue is non-empty.
    bool busy();

private:
    /// Body executed by every worker thread.
    void ThreadLoop();

    bool should_terminate = false;           // Tells threads to stop looking for jobs
    std::mutex queue_mutex;                  // Prevents data races to the job queue
    std::condition_variable mutex_condition; // Allows threads to wait on new jobs or termination
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> jobs;
};
同时线程组应该实现的功能有以下几个函数
ThreadPool::Start
创建线程池。比较高效的做法是根据硬件并发数创建相应数量(num_threads)的线程。如果线程数量不合适(例如远多于硬件支持的并发数),使用线程池甚至可能比串行执行更慢。
每个线程都在等待新的task调度并执行。
/// Spawns the worker threads, one per hardware thread reported by the system.
/// Each worker runs ThreadLoop() until termination is requested.
void ThreadPool::Start() {
    // hardware_concurrency() is only a hint and may return 0 when the value
    // cannot be determined; fall back to a single worker so that queued
    // jobs are still executed.
    const uint32_t hinted = std::thread::hardware_concurrency();
    const uint32_t num_threads = (hinted == 0) ? 1 : hinted;

    threads.reserve(threads.size() + num_threads);
    for (uint32_t ii = 0; ii < num_threads; ++ii) {
        // Construct the thread in place instead of moving a temporary.
        threads.emplace_back(&ThreadPool::ThreadLoop, this);
    }
}
ThreadPool::ThreadLoop
这是每个线程执行的死循环主体:在任务队列中寻找待处理的任务并执行。
void ThreadPool::ThreadLoop() { | |
while (true) { | |
std::function<void()> job; | |
{ | |
std::unique_lock<std::mutex> lock(queue_mutex); | |
mutex_condition.wait(lock, [this] { | |
return !jobs.empty() || should_terminate; | |
}); | |
if (should_terminate) { | |
return; | |
} | |
job = jobs.front(); | |
jobs.pop(); | |
} | |
job(); | |
} | |
} |
ThreadPool::QueueJob
加入新的任务到队列中,这里避免竞争,加入队列锁。
void ThreadPool::QueueJob(const std::function<void()>& job) { | |
{ | |
std::unique_lock<std::mutex> lock(queue_mutex); | |
jobs.push(job); | |
} | |
mutex_condition.notify_one(); | |
} |
调用方传入回调函数,以及要处理的内容(例如通过 lambda 捕获传入):
// Example call: submit a capture-less lambda as the job.
thread_pool->QueueJob([] { /* ... */ });
ThreadPool::busy
bool ThreadPool::busy() { | |
bool poolbusy; | |
{ | |
std::unique_lock<std::mutex> lock(queue_mutex); | |
poolbusy = !jobs.empty(); | |
} | |
return poolbusy; | |
} |
busy()函数 主要用在while循环, 在整个线程组销毁之前,清理任务队列剩余未处理的任务。
ThreadPool::Stop
终止线程池,这里主要是各种线程的join操作.
void ThreadPool::Stop() { | |
{ | |
std::unique_lock<std::mutex> lock(queue_mutex); | |
should_terminate = true; | |
} | |
mutex_condition.notify_all(); | |
for (std::thread& active_thread : threads) { | |
active_thread.join(); | |
} | |
threads.clear(); | |
} |
二、主要流程
以下是一个可以运行起来的代码。
我们先创建线程队列。这里使用最基础的std::thread
// Worker threads that make up the pool.
std::vector< std::thread > workers;
创建每个工作线程要执行的循环体(一个 lambda):线程在其中不断等待任务、从任务队列取出任务并执行。
// Worker body executed by every thread in the pool.
[this]
{
    for(;;)
    {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(this->queue_mutex);
            // Sleep until the pool is stopping or a task is available.
            this->condition.wait(lock,
                [this]{ return this->stop || !this->tasks.empty(); });
            // Exit only once the pool is stopping AND the queue is drained.
            if(this->stop && this->tasks.empty())
                return;
            task = std::move(this->tasks.front());
            this->tasks.pop();
        }
        // Execute outside the lock so other workers can dequeue meanwhile.
        task();
    }
}
接下来把任务加入队列,加入之后调用条件变量的 notify_one 唤醒一个正在等待任务的线程。
// add new work item to the pool | |
template<class F, class... Args> | |
auto ThreadPool::enqueue(F&& f, Args&&... args) | |
-> std::future<typename std::result_of<F(Args...)>::type> | |
{ | |
using return_type = typename std::result_of<F(Args...)>::type; | |
auto task = std::make_shared< std::packaged_task<return_type()> >( | |
std::bind(std::forward<F>(f), std::forward<Args>(args)...) | |
); | |
std::future<return_type> res = task->get_future(); | |
{ | |
std::unique_lock<std::mutex> lock(queue_mutex); | |
// don't allow enqueueing after stopping the pool | |
if(stop) | |
throw std::runtime_error("enqueue on stopped ThreadPool"); | |
tasks.emplace([task](){ (*task)(); }); | |
} | |
condition.notify_one(); | |
return res; | |
} |
完整的可运行的代码如下
/// Fixed-size thread pool whose enqueue() returns a std::future for each
/// submitted task. Workers are started by the constructor and joined by the
/// destructor.
class ThreadPool {
public:
    /// Launches the given number of worker threads.
    ThreadPool(size_t);

    // The pool owns threads and a mutex, neither of which can be copied.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /// Queues `f(args...)` for execution and returns a future for its result.
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>;

    /// Signals the workers to stop and joins them.
    ~ThreadPool();

private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;
    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};
// the constructor just launches some amount of workers | |
/// Launches `threads` worker threads. Each worker loops forever: it waits
/// for a task, removes it from the queue and runs it, and exits once the
/// pool is stopping and the queue is empty.
/// @param threads Number of worker threads to start.
inline ThreadPool::ThreadPool(size_t threads)
    : stop(false)
{
    try
    {
        for(size_t i = 0; i < threads; ++i)
            workers.emplace_back(
                [this]
                {
                    for(;;)
                    {
                        std::function<void()> task;
                        {
                            std::unique_lock<std::mutex> lock(this->queue_mutex);
                            // Sleep until stopping or a task is available.
                            this->condition.wait(lock,
                                [this]{ return this->stop || !this->tasks.empty(); });
                            if(this->stop && this->tasks.empty())
                                return;
                            task = std::move(this->tasks.front());
                            this->tasks.pop();
                        }
                        // Run outside the lock so other workers can dequeue.
                        task();
                    }
                }
            );
    }
    catch(...)
    {
        // Starting a thread failed part-way through. The destructor does not
        // run for a partially constructed object, and destroying joinable
        // threads would call std::terminate, so shut down the workers that
        // did start before propagating the error.
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for(std::thread &worker: workers)
            worker.join();
        throw;
    }
}
// add new work item to the pool | |
template<class F, class... Args> | |
auto ThreadPool::enqueue(F&& f, Args&&... args) | |
-> std::future<typename std::result_of<F(Args...)>::type> | |
{ | |
using return_type = typename std::result_of<F(Args...)>::type; | |
auto task = std::make_shared< std::packaged_task<return_type()> >( | |
std::bind(std::forward<F>(f), std::forward<Args>(args)...) | |
); | |
std::future<return_type> res = task->get_future(); | |
{ | |
std::unique_lock<std::mutex> lock(queue_mutex); | |
// don't allow enqueueing after stopping the pool | |
if(stop) | |
throw std::runtime_error("enqueue on stopped ThreadPool"); | |
tasks.emplace([task](){ (*task)(); }); | |
} | |
condition.notify_one(); | |
return res; | |
} | |
// the destructor joins all threads | |
/// Sets the stop flag, wakes every worker and joins them all. Workers finish
/// the tasks still in the queue before exiting, so destruction blocks until
/// the queue has been drained.
inline ThreadPool::~ThreadPool()
{
    {
        // Set the flag under the lock so a worker cannot miss it between
        // evaluating its wait predicate and going to sleep.
        std::lock_guard<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}