C语言实现线程池

C/C++
196
0
0
2024-03-12

C语言标准库中并没有提供线程池的实现,线程池需要手搓

实现线程池的基本思路是:先创建几个固定的线程,让每个线程运行起来,然后通过互斥锁和条件变量使得每个线程进入等待状态,当需要分派线程时,改变条件变量,使得某个线程退出等待状态开始执行传入的函数参数,执行完后重新进入等待状态。

同时实现了一个队列来存储需要执行的任务。

Task结构体用于表示线程池需要执行的任务,包括属性函数指针和函数参数。

typedef struct {
    void (*function)(void *); // 函数指针,表示任务的函数
    void *argument;          // 函数参数
} Task;

ThreadPool结构体用于表示线程池,包括内嵌实现的队列,用的是循环索引数组模拟实现的队列,互斥锁和条件变量,固定大小的线程组,还有一个是否销毁线程池的标记。

typedef struct {
    Task *tasks;            // 任务数组
    int size;                 // 当前任务数量
    int front;                // 队头索引
    int rear;                 // 队尾索引
    pthread_mutex_t mutex;    // 互斥锁
    pthread_cond_t condition; // 条件变量
    pthread_t *threads;       // 线程数组
    int shutdown;          // 是否销毁线程池
} ThreadPool;

初始化线程池,创建POOLSIZE个线程,创建日志文件,初始化互斥锁和条件变量。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define POOLSIZE 4
void init_thread_pool(ThreadPool *threadPool) {
    threadPool->tasks = (Task *) malloc(sizeof(Task) * POOLSIZE);
    threadPool->size = 0;
    threadPool->front = 0;
    threadPool->rear = 0;
    threadPool->shutdown=-1;
    pthread_mutex_init(&threadPool->mutex, NULL);
    pthread_cond_init(&threadPool->condition, NULL);
    threadPool->threads = (pthread_t *) malloc(sizeof(pthread_t) * POOLSIZE);
    for (int i = 0; i < POOLSIZE; ++i) {
        pthread_create(&threadPool->threads[i], NULL, execute, threadPool);
    }
}

提交任务到线程池,如果当前线程池的任务数量等于拥有的线程数,说明没有可以用的线程,进入等待,直到有空闲的线程,那么将任务添加到任务队列中,通知线程执行新任务,并写日志记录线程被分派事件。

void submit_task(ThreadPool *threadPool, void (*function)(void *), void *argument) {
    pthread_mutex_lock(&threadPool->mutex);
    while (threadPool->size == POOLSIZE) {    // 等待直到有空闲位置
        pthread_cond_wait(&threadPool->condition, &threadPool->mutex);
    }
    // 添加任务到队列
    threadPool->tasks[threadPool->rear].function = function;
    threadPool->tasks[threadPool->rear].argument = argument;
    threadPool->rear = (threadPool->rear + 1) % POOLSIZE;
    threadPool->size++;
    // 通知线程有新任务
    pthread_cond_signal(&threadPool->condition);
    pthread_mutex_unlock(&threadPool->mutex);
}

最重要的是这个一直工作的工作线程,当线程池中没有任务时一直处于等待状态,当有任务时,就从任务队列中取出一个任务,释放互斥锁,执行任务后回收该线程,并写日志记录线程被回收事件,如果线程池没有被销毁,就继续等待任务。

void *execute(void *arg) {
    ThreadPool *threadPool = (ThreadPool *) arg;
    while(threadPool->shutdown){
        pthread_mutex_lock(&threadPool->mutex);
        while (threadPool->size == 0) {    // 等待直到有任务
            pthread_cond_wait(&threadPool->condition, &threadPool->mutex);
        }
        // 取出任务
        Task task = threadPool->tasks[threadPool->front];
        threadPool->front = (threadPool->front + 1) % POOLSIZE;
        --threadPool->size;
        // 执行任务
        pthread_cond_signal(&threadPool->condition);
        pthread_mutex_unlock(&threadPool->mutex);
        task.function(task.argument);
        free(task.argument);
    }
    return NULL;
}

线程池还有一个线程的销毁功能,设置线程池销毁标记,等待所有线程结束后释放线程内存,并销毁互斥锁和条件变量。

void shutdown_thread_pool(ThreadPool *threadPool) {
    threadPool->shutdown=0;
    for (int i = 0; i < POOLSIZE; ++i) {
        pthread_join(threadPool->threads[i], NULL);
    }
    free(threadPool->threads);
    free(threadPool->tasks);
    pthread_mutex_destroy(&threadPool->mutex);
    pthread_cond_destroy(&threadPool->condition);
}

使用例子

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define POOLSIZE 4

typedef struct {
    void (*function)(void *); // 函数指针,表示任务的函数
    void *argument;          // 函数参数
} Task;

typedef struct {
    Task *tasks;            // 任务数组
    int size;                 // 当前任务数量
    int front;                // 队头索引
    int rear;                 // 队尾索引
    pthread_mutex_t mutex;    // 互斥锁
    pthread_cond_t condition; // 条件变量
    pthread_t *threads;       // 线程数组
    int shutdown;          // 是否销毁线程池
} ThreadPool;

// 执行任务的线程函数
void *execute(void *arg) {
    ThreadPool *threadPool = (ThreadPool *) arg;
    while (threadPool->shutdown) {
        pthread_mutex_lock(&threadPool->mutex);
        while (threadPool->size == 0) {    // 等待直到有任务
            pthread_cond_wait(&threadPool->condition, &threadPool->mutex);
        }
        // 取出任务
        Task task = threadPool->tasks[threadPool->front];
        threadPool->front = (threadPool->front + 1) % POOLSIZE;
        --threadPool->size;
        // 执行任务
        pthread_cond_signal(&threadPool->condition);
        pthread_mutex_unlock(&threadPool->mutex);
        task.function(task.argument);
        free(task.argument);
    }
    return NULL;
}

// 初始化线程池
void init_thread_pool(ThreadPool *threadPool) {
    threadPool->tasks = (Task *) malloc(sizeof(Task) * POOLSIZE);
    threadPool->size = 0;
    threadPool->front = 0;
    threadPool->rear = 0;
    threadPool->shutdown = -1;
    pthread_mutex_init(&threadPool->mutex, NULL);
    pthread_cond_init(&threadPool->condition, NULL);
    threadPool->threads = (pthread_t *) malloc(sizeof(pthread_t) * POOLSIZE);
    for (int i = 0; i < POOLSIZE; ++i) {
        pthread_create(&threadPool->threads[i], NULL, execute, threadPool);
    }
}

// 销毁线程池
void shutdown_thread_pool(ThreadPool *threadPool) {
    threadPool->shutdown = 0;
    for (int i = 0; i < POOLSIZE; ++i) {
        pthread_join(threadPool->threads[i], NULL);
    }
    free(threadPool->threads);
    free(threadPool->tasks);
    pthread_mutex_destroy(&threadPool->mutex);
    pthread_cond_destroy(&threadPool->condition);
}

// 向线程池中添加任务
void submit_task(ThreadPool *threadPool, void (*function)(void *), void *argument) {
    pthread_mutex_lock(&threadPool->mutex);
    while (threadPool->size == POOLSIZE) {    // 等待直到有空闲位置
        pthread_cond_wait(&threadPool->condition, &threadPool->mutex);
    }
    // 添加任务到队列
    threadPool->tasks[threadPool->rear].function = function;
    threadPool->tasks[threadPool->rear].argument = argument;
    threadPool->rear = (threadPool->rear + 1) % POOLSIZE;
    threadPool->size++;
    // 通知线程有新任务
    pthread_cond_signal(&threadPool->condition);
    pthread_mutex_unlock(&threadPool->mutex);
}

void *f(void*arg) {
    printf("1\n");
    return NULL;
}

int main() {
    ThreadPool threadPool;
    init_thread_pool(&threadPool);
    // 添加一些任务到线程池
    for (int i = 0; i < 10; ++i) {
        submit_task(&threadPool, (void (*)(void *)) &f, NULL);
    }
    // 等待任务执行完毕
    shutdown_thread_pool(&threadPool);
    return 0;
}