C Linux实现线程池技术

C/C++
112
0
0
2024-08-22

介绍

C Linux实现线程池技术

作者第一次编写的线程池,推荐使用的时候修改thread_manager函数中部分逻辑

支持库

#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

代码

ThreadPool.h

typedef struct MissionAttr
{
    void (*mission)(void *);
    void *param;
    time_t createTime;
    char state;
} MissionAttr;
 
typedef struct MissionNode
{
    MissionAttr *mission;
    struct MissionNode *next;
    struct MissionNode *back;
} MissionNode;
 
typedef struct MissionList
{
    MissionNode *front;
    MissionNode *final;
} MissionList;
 
typedef struct ThreadAttr
{
    pthread_t id;
    time_t createTime;
    char state;
} ThreadAttr;
 
typedef struct ThreadNode
{
    ThreadAttr *threadAttr;
    struct ThreadNode *next;
    struct ThreadNode *back;
} ThreadNode;
 
typedef struct ThreadList
{
    ThreadNode *front;
    ThreadNode *final;
} ThreadList;
 
typedef struct ThreadPool
{
    int controlThreadId;
    ThreadList *threadList;
    MissionList *missionWaiting;
    MissionList *missionList;
    pthread_t managerID;
    int waitingMissionNumber;
    int hanldingMissionNumber;
    int busyNumber;
    int freeNumber;
    int needCloseNumber;
    int maxNumber;
    int minNumber;
    int existNumber;
    char close;
    pthread_mutex_t managerMutex;
    pthread_mutex_t missionMutex;
    pthread_mutex_t threadMutex;
    pthread_mutex_t workerMutex;
    pthread_cond_t workerCond;
} ThreadPool;
 
typedef struct ThreadArgs
{
    ThreadPool *threadPool;
    ThreadNode *threadNode;
} ThreadArgs;
 
// 主要函数
// 创建线程池
ThreadPool *create_thread_pool(int minNumber, int maxNumber);
// 提交任务
void thread_pool_submit(ThreadPool *threadPool, void *func, void *args);
// 启动线程池
int thread_pool_run(ThreadPool *threadPool);
// 关闭并释放线程池
void thread_shutdown_and_free(ThreadPool *threadPool);
 
// 其他函数
// 管理者线程
void *thread_manager(void *args);
// 工作者线程
void *thread_worker(void *args);
// 创建工作者线程
int creater_thread_worker(ThreadPool *threadPool, int number);
// 获取等待中的任务
MissionNode *get_mission(ThreadPool *threadPool);
// 释放完成的任务
void free_mission(ThreadPool *threadPool, MissionNode *missionNode);
 
// 基础函数
// 申请内存修复版
void *fixMalloc(size_t size);

ThreadPool.c

#include "./ThreadPool.h"
void *fixMalloc(size_t size)
{
    void *tmp = NULL;
    while (1)
    {
        tmp = malloc(size);
        if (tmp)
        {
            break;
        }
        sleep(1);
    }
    return tmp;
}
 
ThreadPool *create_thread_pool(int minNumber, int maxNumber)
{
    ThreadPool *threadPool = (ThreadPool *)fixMalloc(sizeof(ThreadPool));
    memset(threadPool, 0, sizeof(ThreadPool));
    threadPool->minNumber = minNumber;
    threadPool->maxNumber = maxNumber;
    pthread_mutex_init(&threadPool->missionMutex, NULL);
    pthread_mutex_init(&threadPool->threadMutex, NULL);
    pthread_mutex_init(&threadPool->workerMutex, NULL);
    pthread_cond_init(&threadPool->workerCond, NULL);
    pthread_mutex_init(&threadPool->managerMutex, NULL);
    threadPool->threadList = (ThreadList *)fixMalloc(sizeof(ThreadList));
    threadPool->missionList = (MissionList *)fixMalloc(sizeof(MissionList));
    threadPool->missionWaiting = (MissionList *)fixMalloc(sizeof(MissionList));
    memset(threadPool->threadList, 0, sizeof(ThreadList));
    memset(threadPool->missionList, 0, sizeof(MissionList));
    memset(threadPool->missionWaiting, 0, sizeof(ThreadList));
    threadPool->controlThreadId = 0;
    threadPool->waitingMissionNumber = 0;
    threadPool->hanldingMissionNumber = 0;
    threadPool->busyNumber = 0;
    threadPool->freeNumber = 0;
    threadPool->needCloseNumber = 0;
    threadPool->existNumber = 0;
    return threadPool;
}
 
void thread_pool_submit(ThreadPool *threadPool, void *func, void *args)
{
    if (!threadPool->close && func != NULL)
    {
        MissionList *missionWaiting = threadPool->missionWaiting;
        MissionNode *newMissionNode = (MissionNode *)fixMalloc(sizeof(MissionNode));
        newMissionNode->mission = (MissionAttr *)fixMalloc(sizeof(MissionAttr));
        newMissionNode->mission->createTime = time(NULL);
        newMissionNode->mission->mission = func;
        newMissionNode->mission->param = args;
        newMissionNode->mission->state = 0;
        newMissionNode->next = NULL;
        if (missionWaiting->final == NULL)
        {
            missionWaiting->front = newMissionNode;
        }
        else
        {
            newMissionNode->back = missionWaiting->final;
            missionWaiting->final->next = newMissionNode;
        }
 
        missionWaiting->final = newMissionNode;
        threadPool->waitingMissionNumber = threadPool->waitingMissionNumber + 1;
    }
}
 
MissionNode *get_mission(ThreadPool *threadPool)
{
    MissionList *missionWaiting = threadPool->missionWaiting;
    if (missionWaiting->front == NULL)
    {
        return NULL;
    }
    MissionList *missionList = threadPool->missionList;
    MissionNode *missionNode = missionWaiting->front;
    if (missionWaiting->front == missionWaiting->final)
    {
        missionWaiting->final = NULL;
    }
    missionWaiting->front = missionNode->next;
    if (missionNode->next)
    {
        missionNode->next->back = NULL;
    }
 
    missionNode->next = NULL;
 
    if (missionList->front == NULL)
    {
        missionList->front = missionNode;
    }
    else
    {
        missionNode->back = missionList->final;
        missionList->final->next = missionNode;
    }
 
    missionList->final = missionNode;
 
    return missionNode;
}
 
void free_mission(ThreadPool *threadPool, MissionNode *missionNode)
{
    MissionList *missionList = threadPool->missionList;
    if (missionNode->back == NULL)
    {
        missionList->front = missionNode->next;
    }
 
    if (missionNode->back)
    {
        missionNode->back->next = missionNode->next;
    }
 
    if (missionNode->next)
    {
        missionNode->next->back = missionNode->back;
    }
    else
    {
        missionList->final = missionNode->back;
    }
 
    if (missionNode->mission)
        free(missionNode->mission);
    if (missionNode)
        free(missionNode);
}
 
void *thread_worker(void *args)
{
    ThreadArgs *threadArgs = (ThreadArgs *)args;
    ThreadPool *threadPool = threadArgs->threadPool;
    ThreadNode *selfNode = threadArgs->threadNode;
 
    threadPool->existNumber = threadPool->existNumber + 1;
    threadPool->freeNumber = threadPool->freeNumber + 1;
    selfNode->threadAttr->state = 1;
 
    pthread_mutex_lock(&threadPool->workerMutex);
 
    while (threadPool->needCloseNumber <= 0 && !threadPool->close)
    {
        // 等待任务
        pthread_cond_wait(&threadPool->workerCond, &threadPool->workerMutex);
        pthread_mutex_unlock(&threadPool->workerMutex);
        if (threadPool->waitingMissionNumber > 0)
        {
            pthread_mutex_lock(&threadPool->missionMutex);
            threadPool->freeNumber = threadPool->freeNumber - 1;
            selfNode->threadAttr->state = 2;
            if (threadPool->close)
                break;
            MissionNode *missionNode = get_mission(threadPool);
            MissionAttr *missionAttr = NULL;
            if (missionNode != NULL)
            {
                missionAttr = missionNode->mission;
            }
            selfNode->threadAttr->state = 3;
            if (missionAttr != NULL)
            {
                missionAttr->state = 1;
            }
            threadPool->busyNumber = threadPool->busyNumber + 1;
            threadPool->waitingMissionNumber = threadPool->waitingMissionNumber - 1;
            threadPool->hanldingMissionNumber = threadPool->hanldingMissionNumber + 1;
            if (threadPool->close)
                break;
            pthread_mutex_unlock(&threadPool->missionMutex);
            if (missionAttr != NULL)
                if (missionAttr->mission)
                {
                    missionAttr->mission(missionAttr->param);
                }
            pthread_mutex_lock(&threadPool->missionMutex);
            if (threadPool->close)
                break;
            if (missionAttr != NULL)
                if (missionAttr->mission)
                {
                    free_mission(threadPool, missionNode);
                }
            threadPool->hanldingMissionNumber = threadPool->hanldingMissionNumber - 1;
            threadPool->busyNumber = threadPool->busyNumber - 1;
            threadPool->freeNumber = threadPool->freeNumber + 1;
            selfNode->threadAttr->state = 1;
            pthread_mutex_unlock(&threadPool->missionMutex);
        }
    }
    if (threadPool->close)
    {
        pthread_mutex_unlock(&threadPool->workerMutex);
        pthread_exit(NULL);
    }
    // 线程自杀
    pthread_mutex_lock(&threadPool->threadMutex);
    ThreadList *threadList = threadPool->threadList;
    selfNode->threadAttr->state = -1;
    if (threadList->final)
        if (threadList->final->threadAttr)
            if (threadList->final->threadAttr->id == selfNode->threadAttr->id)
            {
                threadList->final = selfNode->back;
            }
    if (threadList->front)
        if (threadList->front->threadAttr)
            if (threadList->front->threadAttr->id == selfNode->threadAttr->id)
            {
                threadList->front = selfNode->next;
            }
    if (selfNode->back)
        if (selfNode->back->next)
        {
            selfNode->back->next = selfNode->next;
        }
    if (selfNode->next)
        if (selfNode->next->back)
        {
            selfNode->next->back = selfNode->back;
        }
    free(selfNode->threadAttr);
    free(selfNode);
    threadPool->existNumber = threadPool->existNumber - 1;
    threadPool->freeNumber = threadPool->freeNumber - 1;
    threadPool->needCloseNumber = threadPool->needCloseNumber - 1;
    pthread_mutex_unlock(&threadPool->threadMutex);
    pthread_mutex_unlock(&threadPool->workerMutex);
    pthread_exit(NULL);
}
 
int creater_thread_worker(ThreadPool *threadPool, int number)
{
    if ((number + threadPool->existNumber) > threadPool->maxNumber)
    {
        number = threadPool->maxNumber - threadPool->existNumber;
    }
    pthread_mutex_lock(&threadPool->threadMutex);
    int i, ref;
    for (i = 0; i < number; i++)
    {
        pthread_t threadID = 0;
        ThreadNode *newThreadNode = (ThreadNode *)fixMalloc(sizeof(ThreadNode));
        ThreadArgs *threadArgs = (ThreadArgs *)fixMalloc(sizeof(ThreadArgs));
 
        // 设置Woker线程参数
        threadArgs->threadNode = newThreadNode;
        threadArgs->threadPool = threadPool;
 
        // 初始化线程节点
        newThreadNode->threadAttr = (ThreadAttr *)fixMalloc(sizeof(ThreadAttr));
        newThreadNode->back = NULL;
        newThreadNode->next = NULL;
 
        // 设置线程属性
        newThreadNode->threadAttr->createTime = time(NULL);
        newThreadNode->threadAttr->id = threadID;
        newThreadNode->threadAttr->state = 0;
 
        // 插入线程链表
 
        if (threadPool->existNumber == 0)
        {
            threadPool->threadList->front = newThreadNode;
        }
        else
        {
            ThreadNode *threadNode = threadPool->threadList->final;
            if (threadNode)
            {
                threadNode->next = newThreadNode;
            }
            newThreadNode->back = threadNode;
        }
 
        threadPool->threadList->final = newThreadNode;
 
        ref = pthread_create(&threadID, NULL, (void *)thread_worker, (void *)threadArgs);
        if (ref != 0)
        {
            return -2;
        }
    }
    pthread_mutex_unlock(&threadPool->threadMutex);
 
    return 0;
}
 
void *thread_manager(void *args)
{
    // 管理者
    ThreadPool *threadPool = (ThreadPool *)args;
    int minNumber = threadPool->minNumber;
    int waitingMissionNumber = 0;
    int existNumber = 0;
    int needCloseNumber = 0;
    int busyNumber = 0;
    int runNumber = 0;
    int i;
    pthread_t threadID;
    creater_thread_worker(threadPool, minNumber);
    while (!threadPool->close)
    {
        pthread_mutex_lock(&threadPool->managerMutex);
        waitingMissionNumber = threadPool->waitingMissionNumber;
        existNumber = threadPool->existNumber;
        needCloseNumber = threadPool->needCloseNumber;
        busyNumber = threadPool->busyNumber;
        pthread_mutex_unlock(&threadPool->managerMutex);
 
        // 这里的逻辑写的很随意,推荐使用时重写此逻辑
        if (waitingMissionNumber > 0)
        {
            if (waitingMissionNumber > existNumber)
            {
                creater_thread_worker(threadPool, 2);
            }
            runNumber = existNumber < waitingMissionNumber ? existNumber : waitingMissionNumber;
            for (i = 0; i < runNumber; i++)
            {
                pthread_cond_signal(&threadPool->workerCond);
            }
        }
        else
        {
            threadPool->needCloseNumber = existNumber - threadPool->minNumber;
            pthread_cond_broadcast(&threadPool->workerCond);
        }
 
        usleep(1000 * 100);
    }
 
    // 唤醒所有线程
    pthread_cond_broadcast(&threadPool->workerCond);
 
    // 释放Worker线程
    ThreadList *threadList = threadPool->threadList;
    ThreadNode *threadNode = threadList->front;
    ThreadNode *tmpThreadNode = NULL;
    while (threadNode != NULL)
    {
        threadID = threadNode->threadAttr->id;
        if (threadID)
        {
            pthread_join(threadID, NULL);
        }
        free(threadNode->threadAttr);
        tmpThreadNode = threadNode->next;
        free(threadNode);
        threadNode = tmpThreadNode;
    }
    free(threadList);
 
    MissionList *missionWaiting = threadPool->missionWaiting;
    MissionNode *missionWaitingNode = missionWaiting->front;
    MissionNode *tmpMissionWaitingNode = NULL;
    // 释放等待列队
    while (missionWaitingNode != NULL)
    {
        free(missionWaitingNode->mission);
        tmpMissionWaitingNode = missionWaitingNode->next;
        free(missionWaitingNode);
        missionWaitingNode = tmpMissionWaitingNode;
    }
    free(missionWaiting);
 
    // 释放任务列队
    MissionList *missionList = threadPool->missionList;
    MissionNode *missionNode = missionList->front;
    MissionNode *tmpMissionNode = NULL;
    while (missionNode != NULL)
    {
        free(missionNode->mission);
        tmpMissionNode = missionNode->next;
        free(missionNode);
        missionNode = tmpMissionNode;
    }
    free(missionList);
 
    pthread_mutex_destroy(&threadPool->missionMutex);
    pthread_mutex_destroy(&threadPool->workerMutex);
    pthread_mutex_destroy(&threadPool->threadMutex);
    pthread_mutex_destroy(&threadPool->managerMutex);
 
    pthread_cond_destroy(&threadPool->workerCond);
 
    free(threadPool);
    pthread_exit(NULL);
}
 
int thread_pool_run(ThreadPool *threadPool)
{
    int ref = pthread_create(&threadPool->managerID, NULL, thread_manager, (void *)threadPool);
    return ref;
}
 
void thread_shutdown_and_free(ThreadPool *threadPool)
{
    threadPool->close = 1;
    pthread_join(threadPool->managerID, NULL);
}

示例

#include <stdio.h>
#include "./ThreadPool.c"
 
struct testData
{
    int a;
};
 
ThreadPool *pool = NULL;
int times = 0;
void test(void *a)
{
    // 模拟工作
 
    // 处理参数
    int *b = (int *)a;
    times++;
    sleep(*b);
}
 
int main()
{
    // 初始化线程池
    pool = create_thread_pool(1, 50);
    // 启动线程池
    thread_pool_run(pool);
 
    int i, a = 1;
    // 提交任务
    for (i = 0; i < 110; i++)
    {
        // 指针传参(无参数填NULL)
        thread_pool_submit(pool, &test, &a);
    }
 
    i = 0;
    while (i < 52)
    {
        printf("当前任务数量:%d\n", pool->waitingMissionNumber);
        printf("当前线程数量:%d\n", pool->existNumber);
        printf("当前执行线程:%d\n", pool->busyNumber);
        printf("当前空闲线程:%d\n", pool->freeNumber);
        printf("当前执行任务:%d\n", pool->hanldingMissionNumber);
        printf("当前自杀线程:%d\n", pool->needCloseNumber);
        printf("函数执行次数:%d\n", times);
        printf("================================\n");
        sleep(1);
        int rands = (rand() / 1000000000.0) * 20 + 10;
        printf("当前随机数:%d\n", rands);
        // 模拟不定时提交
        if (rands > 40)
            for (i = 0; i < rands; i++)
            {
                thread_pool_submit(pool, &test, &a);
            }
    }
 
    // 关机并释放
    thread_shutdown_and_free(pool);
 
    return 0;
}

编译命令示例

gcc ./main.c -o ./test.out -lpthread