C语言标准库中并没有提供线程池的实现,线程池需要手搓
实现线程池的基本思路是:先创建几个固定的线程,让每个线程运行起来,然后通过互斥锁和条件变量使得每个线程进入等待状态,当需要分派线程时,改变条件变量,使得某个线程退出等待状态开始执行传入的函数参数,执行完后重新进入等待状态。
同时实现了一个队列来存储需要执行的任务。
Task结构体用于表示线程池需要执行的任务,包括属性函数指针和函数参数。
typedef struct {
void (*function)(void *); // 函数指针,表示任务的函数
void *argument; // 函数参数
} Task;
ThreadPool结构体用于表示线程池,包括内嵌实现的队列,用的是循环索引数组模拟实现的队列,互斥锁和条件变量,固定大小的线程组,还有一个是否销毁线程池的标记。
typedef struct {
Task *tasks; // 任务数组
int size; // 当前任务数量
int front; // 队头索引
int rear; // 队尾索引
pthread_mutex_t mutex; // 互斥锁
pthread_cond_t condition; // 条件变量
pthread_t *threads; // 线程数组
int shutdown; // 是否销毁线程池
} ThreadPool;
初始化线程池,创建POOLSIZE个线程,创建日志文件,初始化互斥锁和条件变量。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define POOLSIZE 4
void init_thread_pool(ThreadPool *threadPool) {
threadPool->tasks = (Task *) malloc(sizeof(Task) * POOLSIZE);
threadPool->size = 0;
threadPool->front = 0;
threadPool->rear = 0;
threadPool->shutdown=-1;
pthread_mutex_init(&threadPool->mutex, NULL);
pthread_cond_init(&threadPool->condition, NULL);
threadPool->threads = (pthread_t *) malloc(sizeof(pthread_t) * POOLSIZE);
for (int i = 0; i < POOLSIZE; ++i) {
pthread_create(&threadPool->threads[i], NULL, execute, threadPool);
}
}
提交任务到线程池,如果当前线程池的任务数量等于拥有的线程数,说明没有可以用的线程,进入等待,直到有空闲的线程,那么将任务添加到任务队列中,通知线程执行新任务,并写日志记录线程被分派事件。
void submit_task(ThreadPool *threadPool, void (*function)(void *), void *argument) {
pthread_mutex_lock(&threadPool->mutex);
while (threadPool->size == POOLSIZE) { // 等待直到有空闲位置
pthread_cond_wait(&threadPool->condition, &threadPool->mutex);
}
// 添加任务到队列
threadPool->tasks[threadPool->rear].function = function;
threadPool->tasks[threadPool->rear].argument = argument;
threadPool->rear = (threadPool->rear + 1) % POOLSIZE;
threadPool->size++;
// 通知线程有新任务
pthread_cond_signal(&threadPool->condition);
pthread_mutex_unlock(&threadPool->mutex);
}
最重要的是这个一直工作的工作线程,当线程池中没有任务时一直处于等待状态,当有任务时,就从任务队列中取出一个任务,释放互斥锁,执行任务后回收该线程,并写日志记录线程被回收事件,如果线程池没有被销毁,就继续等待任务。
void *execute(void *arg) {
ThreadPool *threadPool = (ThreadPool *) arg;
while(threadPool->shutdown){
pthread_mutex_lock(&threadPool->mutex);
while (threadPool->size == 0) { // 等待直到有任务
pthread_cond_wait(&threadPool->condition, &threadPool->mutex);
}
// 取出任务
Task task = threadPool->tasks[threadPool->front];
threadPool->front = (threadPool->front + 1) % POOLSIZE;
--threadPool->size;
// 执行任务
pthread_cond_signal(&threadPool->condition);
pthread_mutex_unlock(&threadPool->mutex);
task.function(task.argument);
free(task.argument);
}
return NULL;
}
线程池还有一个线程的销毁功能,设置线程池销毁标记,等待所有线程结束后释放线程内存,并销毁互斥锁和条件变量。
void shutdown_thread_pool(ThreadPool *threadPool) {
threadPool->shutdown=0;
for (int i = 0; i < POOLSIZE; ++i) {
pthread_join(threadPool->threads[i], NULL);
}
free(threadPool->threads);
free(threadPool->tasks);
pthread_mutex_destroy(&threadPool->mutex);
pthread_cond_destroy(&threadPool->condition);
}
使用例子
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define POOLSIZE 4
typedef struct {
void (*function)(void *); // 函数指针,表示任务的函数
void *argument; // 函数参数
} Task;
typedef struct {
Task *tasks; // 任务数组
int size; // 当前任务数量
int front; // 队头索引
int rear; // 队尾索引
pthread_mutex_t mutex; // 互斥锁
pthread_cond_t condition; // 条件变量
pthread_t *threads; // 线程数组
int shutdown; // 是否销毁线程池
} ThreadPool;
// 执行任务的线程函数
void *execute(void *arg) {
ThreadPool *threadPool = (ThreadPool *) arg;
while (threadPool->shutdown) {
pthread_mutex_lock(&threadPool->mutex);
while (threadPool->size == 0) { // 等待直到有任务
pthread_cond_wait(&threadPool->condition, &threadPool->mutex);
}
// 取出任务
Task task = threadPool->tasks[threadPool->front];
threadPool->front = (threadPool->front + 1) % POOLSIZE;
--threadPool->size;
// 执行任务
pthread_cond_signal(&threadPool->condition);
pthread_mutex_unlock(&threadPool->mutex);
task.function(task.argument);
free(task.argument);
}
return NULL;
}
// 初始化线程池
void init_thread_pool(ThreadPool *threadPool) {
threadPool->tasks = (Task *) malloc(sizeof(Task) * POOLSIZE);
threadPool->size = 0;
threadPool->front = 0;
threadPool->rear = 0;
threadPool->shutdown = -1;
pthread_mutex_init(&threadPool->mutex, NULL);
pthread_cond_init(&threadPool->condition, NULL);
threadPool->threads = (pthread_t *) malloc(sizeof(pthread_t) * POOLSIZE);
for (int i = 0; i < POOLSIZE; ++i) {
pthread_create(&threadPool->threads[i], NULL, execute, threadPool);
}
}
// 销毁线程池
void shutdown_thread_pool(ThreadPool *threadPool) {
threadPool->shutdown = 0;
for (int i = 0; i < POOLSIZE; ++i) {
pthread_join(threadPool->threads[i], NULL);
}
free(threadPool->threads);
free(threadPool->tasks);
pthread_mutex_destroy(&threadPool->mutex);
pthread_cond_destroy(&threadPool->condition);
}
// 向线程池中添加任务
void submit_task(ThreadPool *threadPool, void (*function)(void *), void *argument) {
pthread_mutex_lock(&threadPool->mutex);
while (threadPool->size == POOLSIZE) { // 等待直到有空闲位置
pthread_cond_wait(&threadPool->condition, &threadPool->mutex);
}
// 添加任务到队列
threadPool->tasks[threadPool->rear].function = function;
threadPool->tasks[threadPool->rear].argument = argument;
threadPool->rear = (threadPool->rear + 1) % POOLSIZE;
threadPool->size++;
// 通知线程有新任务
pthread_cond_signal(&threadPool->condition);
pthread_mutex_unlock(&threadPool->mutex);
}
void *f(void*arg) {
printf("1\n");
return NULL;
}
int main() {
ThreadPool threadPool;
init_thread_pool(&threadPool);
// 添加一些任务到线程池
for (int i = 0; i < 10; ++i) {
submit_task(&threadPool, (void (*)(void *)) &f, NULL);
}
// 等待任务执行完毕
shutdown_thread_pool(&threadPool);
return 0;
}