介绍
基于 C 语言与 POSIX 线程(pthread)在 Linux 上实现的线程池
这是作者第一次编写的线程池;实际使用时,建议根据自身需求重写 thread_manager 函数中的调度逻辑
支持库
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
代码
ThreadPool.h
// One submitted task ("mission"): the callback, its argument and bookkeeping.
typedef struct MissionAttr
{
// Callback executed by a worker thread; it is invoked as mission(param).
void (*mission)(void *);
// Opaque argument passed to the callback. The pool stores the pointer only;
// it never copies or frees what it points to.
void *param;
// Wall-clock time at which thread_pool_submit created this record.
time_t createTime;
// 0 when submitted; thread_worker sets it to 1 once a worker has taken the task.
char state;
} MissionAttr;
// Doubly linked list node that carries one mission.
typedef struct MissionNode
{
// Heap-allocated task record; released together with the node.
MissionAttr *mission;
// Next node towards the tail, or NULL at the tail.
struct MissionNode *next;
// Previous node towards the head, or NULL at the head.
struct MissionNode *back;
} MissionNode;
// Head/tail pair of a doubly linked mission list; both are NULL when empty.
typedef struct MissionList
{
// First node (the one get_mission removes from the waiting queue).
MissionNode *front;
// Last node (where new nodes are appended).
MissionNode *final;
} MissionList;
// Per-worker bookkeeping record.
typedef struct ThreadAttr
{
// Thread id field. thread_manager joins a node at shutdown only when this
// is non-zero; creater_thread_worker initialises it from a local that is
// still 0 at that point.
pthread_t id;
// Wall-clock time at which creater_thread_worker built this record.
time_t createTime;
// Values assigned in this file: 0 = just created, 1 = idle,
// 2 = fetching a mission, 3 = running a mission, -1 = removing itself.
char state;
} ThreadAttr;
// Doubly linked list node that represents one worker thread.
typedef struct ThreadNode
{
// Heap-allocated bookkeeping record; released together with the node.
ThreadAttr *threadAttr;
// Next node towards the tail, or NULL at the tail.
struct ThreadNode *next;
// Previous node towards the head, or NULL at the head.
struct ThreadNode *back;
} ThreadNode;
// Head/tail pair of the worker list.
typedef struct ThreadList
{
// First worker node; thread_manager starts its shutdown walk here.
ThreadNode *front;
// Last worker node; creater_thread_worker appends after it.
ThreadNode *final;
} ThreadList;
// The pool object shared by the submitting thread, the manager and all workers.
typedef struct ThreadPool
{
// Set to 0 by create_thread_pool; not referenced anywhere else in this file.
int controlThreadId;
// List of worker nodes (appended by creater_thread_worker).
ThreadList *threadList;
// Queue of submitted missions that no worker has taken yet.
MissionList *missionWaiting;
// Missions that get_mission has handed to a worker and that are not freed yet.
MissionList *missionList;
// Id of the manager thread; written by thread_pool_run, joined on shutdown.
pthread_t managerID;
// Incremented by thread_pool_submit, decremented when a worker takes a mission.
int waitingMissionNumber;
// Missions currently being executed (field name is spelled this way throughout).
int hanldingMissionNumber;
// Workers currently executing a mission.
int busyNumber;
// Workers currently idle.
int freeNumber;
// Set by thread_manager to existNumber - minNumber when nothing is waiting;
// a worker that sees a value > 0 removes itself and decrements it.
int needCloseNumber;
// Upper bound on workers, enforced in creater_thread_worker.
int maxNumber;
// Number of workers the manager starts initially and keeps as the floor.
int minNumber;
// Workers that have started and not yet removed themselves.
int existNumber;
// Shutdown flag; set to 1 by thread_shutdown_and_free.
char close;
// Locked by thread_manager while it snapshots the counters.
pthread_mutex_t managerMutex;
// Held by thread_worker around get_mission/free_mission and counter updates.
pthread_mutex_t missionMutex;
// Held while the worker list is modified (creation and self-removal).
pthread_mutex_t threadMutex;
// Mutex paired with workerCond.
pthread_mutex_t workerMutex;
// Condition variable idle workers wait on; signalled by thread_manager.
pthread_cond_t workerCond;
} ThreadPool;
// Start-up argument bundle handed to thread_worker through pthread_create.
typedef struct ThreadArgs
{
// Pool the worker belongs to.
ThreadPool *threadPool;
// The worker's own node in the pool's thread list.
ThreadNode *threadNode;
} ThreadArgs;
// Public API
// Allocate and initialise a pool that keeps between minNumber and maxNumber
// workers. No thread is started until thread_pool_run is called.
ThreadPool *create_thread_pool(int minNumber, int maxNumber);
// Queue a task. func is stored as the mission callback (called as
// func(args) on a worker thread); calls with func == NULL or after
// shutdown has begun are ignored.
void thread_pool_submit(ThreadPool *threadPool, void *func, void *args);
// Start the manager thread. Returns the pthread_create result (0 on success).
int thread_pool_run(ThreadPool *threadPool);
// Set the close flag and join the manager thread, which releases the pool.
// The pool pointer must not be used afterwards.
void thread_shutdown_and_free(ThreadPool *threadPool);
// Internal functions
// Manager thread entry point; args is the ThreadPool *.
void *thread_manager(void *args);
// Worker thread entry point; args is a ThreadArgs *.
void *thread_worker(void *args);
// Start up to `number` additional workers (capped by maxNumber).
// Returns 0 on success, -2 if pthread_create fails.
int creater_thread_worker(ThreadPool *threadPool, int number);
// Move the oldest waiting mission to the in-flight list and return it,
// or return NULL when nothing is waiting.
MissionNode *get_mission(ThreadPool *threadPool);
// Unlink a finished mission from the in-flight list and free it.
void free_mission(ThreadPool *threadPool, MissionNode *missionNode);
// Helpers
// malloc wrapper that keeps retrying (sleeping 1 s between attempts)
// instead of returning NULL.
void *fixMalloc(size_t size);
ThreadPool.c
#include "./ThreadPool.h"
/**
 * Allocate `size` bytes, retrying once per second until malloc succeeds.
 *
 * This function never returns NULL; under memory pressure it blocks the
 * calling thread instead. A zero-byte request is rounded up to one byte:
 * malloc(0) is allowed to return NULL even on success, which would
 * otherwise make the retry loop spin forever.
 *
 * @param size number of bytes requested
 * @return a non-NULL pointer the caller releases with free()
 */
void *fixMalloc(size_t size)
{
    if (size == 0)
    {
        size = 1;
    }

    void *block = malloc(size);
    while (block == NULL)
    {
        /* Back off before retrying so a starved system is not hammered. */
        sleep(1);
        block = malloc(size);
    }
    return block;
}
/**
 * Allocate and initialise an idle thread pool.
 *
 * All counters start at 0, the three lists start empty and the mutexes and
 * condition variable are initialised with default attributes. No thread is
 * created here; call thread_pool_run to start the manager.
 *
 * @param minNumber number of workers the manager starts and keeps alive
 * @param maxNumber upper bound on the number of workers
 * @return the new pool (never NULL, because fixMalloc does not return NULL)
 */
ThreadPool *create_thread_pool(int minNumber, int maxNumber)
{
    /* Zeroing the whole object also clears every counter and the close flag. */
    ThreadPool *threadPool = fixMalloc(sizeof *threadPool);
    memset(threadPool, 0, sizeof *threadPool);

    threadPool->minNumber = minNumber;
    threadPool->maxNumber = maxNumber;

    pthread_mutex_init(&threadPool->missionMutex, NULL);
    pthread_mutex_init(&threadPool->threadMutex, NULL);
    pthread_mutex_init(&threadPool->workerMutex, NULL);
    pthread_mutex_init(&threadPool->managerMutex, NULL);
    pthread_cond_init(&threadPool->workerCond, NULL);

    /* sizeof *ptr keeps each size tied to the object actually being cleared. */
    threadPool->threadList = fixMalloc(sizeof *threadPool->threadList);
    memset(threadPool->threadList, 0, sizeof *threadPool->threadList);

    threadPool->missionList = fixMalloc(sizeof *threadPool->missionList);
    memset(threadPool->missionList, 0, sizeof *threadPool->missionList);

    threadPool->missionWaiting = fixMalloc(sizeof *threadPool->missionWaiting);
    memset(threadPool->missionWaiting, 0, sizeof *threadPool->missionWaiting);

    return threadPool;
}
/**
 * Append a task to the pool's waiting queue.
 *
 * The call is ignored when threadPool or func is NULL, or once shutdown has
 * been requested (threadPool->close set).
 *
 * @param threadPool pool to submit to
 * @param func       task entry point; must point to a function of type
 *                   void (*)(void *), it is called as func(args) on a worker
 * @param args       opaque pointer passed to func; the pool stores only the
 *                   pointer, so the pointee must stay valid until the task ran
 */
void thread_pool_submit(ThreadPool *threadPool, void *func, void *args)
{
    if (threadPool == NULL || func == NULL || threadPool->close)
    {
        return;
    }

    /* Build the node outside the lock so allocation never stalls workers. */
    MissionNode *newMissionNode = fixMalloc(sizeof *newMissionNode);
    newMissionNode->mission = fixMalloc(sizeof *newMissionNode->mission);
    newMissionNode->mission->createTime = time(NULL);
    newMissionNode->mission->mission = (void (*)(void *))func;
    newMissionNode->mission->param = args;
    newMissionNode->mission->state = 0;
    newMissionNode->next = NULL;
    /* Must be explicit: malloc'ed memory is not zeroed, and free_mission
     * later follows this pointer. */
    newMissionNode->back = NULL;

    /* Workers pop from this queue while holding missionMutex, so the push
     * has to take the same lock. */
    pthread_mutex_lock(&threadPool->missionMutex);
    MissionList *missionWaiting = threadPool->missionWaiting;
    if (missionWaiting->final == NULL)
    {
        missionWaiting->front = newMissionNode;
    }
    else
    {
        newMissionNode->back = missionWaiting->final;
        missionWaiting->final->next = newMissionNode;
    }
    missionWaiting->final = newMissionNode;
    threadPool->waitingMissionNumber = threadPool->waitingMissionNumber + 1;
    pthread_mutex_unlock(&threadPool->missionMutex);
}
/**
 * Move the oldest waiting mission to the in-flight list.
 *
 * The node is unlinked from threadPool->missionWaiting and appended to
 * threadPool->missionList, where it stays until free_mission removes it.
 * No locking is done here; thread_worker calls this with missionMutex held.
 *
 * @param threadPool pool whose queues are updated
 * @return the moved node, or NULL when the waiting queue is empty
 */
MissionNode *get_mission(ThreadPool *threadPool)
{
    MissionList *missionWaiting = threadPool->missionWaiting;
    MissionNode *missionNode = missionWaiting->front;
    if (missionNode == NULL)
    {
        return NULL;
    }

    /* Detach the head of the waiting queue. */
    if (missionWaiting->front == missionWaiting->final)
    {
        missionWaiting->final = NULL;
    }
    missionWaiting->front = missionNode->next;
    if (missionNode->next != NULL)
    {
        missionNode->next->back = NULL;
    }
    missionNode->next = NULL;

    /* Append to the in-flight list. `back` is rewritten in both branches so
     * a stale or uninitialised link can never reach free_mission. */
    MissionList *missionList = threadPool->missionList;
    if (missionList->front == NULL)
    {
        missionNode->back = NULL;
        missionList->front = missionNode;
    }
    else
    {
        missionNode->back = missionList->final;
        missionList->final->next = missionNode;
    }
    missionList->final = missionNode;
    return missionNode;
}
/**
 * Unlink a mission from the in-flight list and release it.
 *
 * Both the MissionAttr record and the node itself are freed, so the caller
 * must not touch missionNode afterwards. No locking is done here.
 *
 * @param threadPool  pool whose missionList contains the node
 * @param missionNode node to remove; must not be NULL
 */
void free_mission(ThreadPool *threadPool, MissionNode *missionNode)
{
    MissionList *inFlight = threadPool->missionList;
    MissionNode *before = missionNode->back;
    MissionNode *after = missionNode->next;

    /* Repair the forward link, or move the head if this was the first node. */
    if (before != NULL)
    {
        before->next = after;
    }
    else
    {
        inFlight->front = after;
    }

    /* Repair the backward link, or move the tail if this was the last node. */
    if (after != NULL)
    {
        after->back = before;
    }
    else
    {
        inFlight->final = before;
    }

    free(missionNode->mission);
    free(missionNode);
}
void *thread_worker(void *args)
{
ThreadArgs *threadArgs = (ThreadArgs *)args;
ThreadPool *threadPool = threadArgs->threadPool;
ThreadNode *selfNode = threadArgs->threadNode;
threadPool->existNumber = threadPool->existNumber + 1;
threadPool->freeNumber = threadPool->freeNumber + 1;
selfNode->threadAttr->state = 1;
pthread_mutex_lock(&threadPool->workerMutex);
while (threadPool->needCloseNumber <= 0 && !threadPool->close)
{
// 等待任务
pthread_cond_wait(&threadPool->workerCond, &threadPool->workerMutex);
pthread_mutex_unlock(&threadPool->workerMutex);
if (threadPool->waitingMissionNumber > 0)
{
pthread_mutex_lock(&threadPool->missionMutex);
threadPool->freeNumber = threadPool->freeNumber - 1;
selfNode->threadAttr->state = 2;
if (threadPool->close)
break;
MissionNode *missionNode = get_mission(threadPool);
MissionAttr *missionAttr = NULL;
if (missionNode != NULL)
{
missionAttr = missionNode->mission;
}
selfNode->threadAttr->state = 3;
if (missionAttr != NULL)
{
missionAttr->state = 1;
}
threadPool->busyNumber = threadPool->busyNumber + 1;
threadPool->waitingMissionNumber = threadPool->waitingMissionNumber - 1;
threadPool->hanldingMissionNumber = threadPool->hanldingMissionNumber + 1;
if (threadPool->close)
break;
pthread_mutex_unlock(&threadPool->missionMutex);
if (missionAttr != NULL)
if (missionAttr->mission)
{
missionAttr->mission(missionAttr->param);
}
pthread_mutex_lock(&threadPool->missionMutex);
if (threadPool->close)
break;
if (missionAttr != NULL)
if (missionAttr->mission)
{
free_mission(threadPool, missionNode);
}
threadPool->hanldingMissionNumber = threadPool->hanldingMissionNumber - 1;
threadPool->busyNumber = threadPool->busyNumber - 1;
threadPool->freeNumber = threadPool->freeNumber + 1;
selfNode->threadAttr->state = 1;
pthread_mutex_unlock(&threadPool->missionMutex);
}
}
if (threadPool->close)
{
pthread_mutex_unlock(&threadPool->workerMutex);
pthread_exit(NULL);
}
// 线程自杀
pthread_mutex_lock(&threadPool->threadMutex);
ThreadList *threadList = threadPool->threadList;
selfNode->threadAttr->state = -1;
if (threadList->final)
if (threadList->final->threadAttr)
if (threadList->final->threadAttr->id == selfNode->threadAttr->id)
{
threadList->final = selfNode->back;
}
if (threadList->front)
if (threadList->front->threadAttr)
if (threadList->front->threadAttr->id == selfNode->threadAttr->id)
{
threadList->front = selfNode->next;
}
if (selfNode->back)
if (selfNode->back->next)
{
selfNode->back->next = selfNode->next;
}
if (selfNode->next)
if (selfNode->next->back)
{
selfNode->next->back = selfNode->back;
}
free(selfNode->threadAttr);
free(selfNode);
threadPool->existNumber = threadPool->existNumber - 1;
threadPool->freeNumber = threadPool->freeNumber - 1;
threadPool->needCloseNumber = threadPool->needCloseNumber - 1;
pthread_mutex_unlock(&threadPool->threadMutex);
pthread_mutex_unlock(&threadPool->workerMutex);
pthread_exit(NULL);
}
/**
 * Start additional worker threads and append them to the pool's thread list.
 *
 * The request is capped so that existNumber + number never exceeds
 * maxNumber; a non-positive result starts nothing. threadMutex is held for
 * the whole call and is released on every return path.
 *
 * @param threadPool pool to grow
 * @param number     how many workers the caller would like to add
 * @return 0 on success, -2 if pthread_create failed (workers started before
 *         the failure keep running)
 */
int creater_thread_worker(ThreadPool *threadPool, int number)
{
    int result = 0;

    pthread_mutex_lock(&threadPool->threadMutex);

    int room = threadPool->maxNumber - threadPool->existNumber;
    if (number > room)
    {
        number = room;
    }

    ThreadList *threadList = threadPool->threadList;
    for (int i = 0; i < number; i++)
    {
        pthread_t threadID = 0;
        ThreadNode *newThreadNode = fixMalloc(sizeof *newThreadNode);
        ThreadArgs *threadArgs = fixMalloc(sizeof *threadArgs);

        /* Arguments handed to the worker thread. */
        threadArgs->threadNode = newThreadNode;
        threadArgs->threadPool = threadPool;

        /* Bookkeeping record for the new worker. */
        newThreadNode->threadAttr = fixMalloc(sizeof *newThreadNode->threadAttr);
        newThreadNode->threadAttr->createTime = time(NULL);
        /* NOTE(review): this stores the placeholder 0, not the new thread's
         * id, because the thread does not exist yet. thread_manager joins
         * only nodes with a non-zero id, so recording the real id here
         * switches shutdown to "join every worker" - review the worker exit
         * path before making that change. */
        newThreadNode->threadAttr->id = threadID;
        newThreadNode->threadAttr->state = 0;
        newThreadNode->next = NULL;
        newThreadNode->back = threadList->final;

        if (pthread_create(&threadID, NULL, thread_worker, threadArgs) != 0)
        {
            /* Nothing was linked yet, so just drop the allocations. */
            free(newThreadNode->threadAttr);
            free(newThreadNode);
            free(threadArgs);
            result = -2;
            break;
        }

        /* Link at the tail. Emptiness is decided from the list itself:
         * existNumber is updated asynchronously by the workers and may
         * still be 0 after earlier nodes have already been appended. */
        if (threadList->final != NULL)
        {
            threadList->final->next = newThreadNode;
        }
        else
        {
            threadList->front = newThreadNode;
        }
        threadList->final = newThreadNode;
    }

    pthread_mutex_unlock(&threadPool->threadMutex);
    return result;
}
/* Free every node of a mission list, each node's MissionAttr, and finally
 * the list header itself. */
static void free_mission_list(MissionList *list)
{
    MissionNode *node = list->front;
    while (node != NULL)
    {
        MissionNode *following = node->next;
        free(node->mission);
        free(node);
        node = following;
    }
    free(list);
}

/**
 * Manager thread entry point.
 *
 * Starts minNumber workers, then polls every 100 ms until the close flag is
 * set:
 *  - missions waiting: add two workers if the backlog exceeds the worker
 *    count, then signal one worker per runnable mission;
 *  - nothing waiting: publish how many workers are above minNumber in
 *    needCloseNumber and wake everyone so the surplus can exit.
 * After close is set it wakes all workers, joins those whose recorded id is
 * non-zero, and releases the lists, the synchronisation objects and the pool.
 *
 * NOTE(review): workers whose recorded id is 0 are not joined, so they can
 * still be running when the pool is freed below.
 *
 * @param args the ThreadPool * to manage; freed before this thread exits
 * @return always NULL
 */
void *thread_manager(void *args)
{
    ThreadPool *threadPool = (ThreadPool *)args;

    creater_thread_worker(threadPool, threadPool->minNumber);

    while (!threadPool->close)
    {
        pthread_mutex_lock(&threadPool->managerMutex);
        int waitingMissionNumber = threadPool->waitingMissionNumber;
        int existNumber = threadPool->existNumber;
        pthread_mutex_unlock(&threadPool->managerMutex);

        /* Deliberately simple scheduling policy; replace it for real use. */
        if (waitingMissionNumber > 0)
        {
            if (waitingMissionNumber > existNumber)
            {
                creater_thread_worker(threadPool, 2);
            }
            int runNumber = existNumber < waitingMissionNumber ? existNumber : waitingMissionNumber;
            for (int i = 0; i < runNumber; i++)
            {
                pthread_cond_signal(&threadPool->workerCond);
            }
        }
        else
        {
            threadPool->needCloseNumber = existNumber - threadPool->minNumber;
            pthread_cond_broadcast(&threadPool->workerCond);
        }
        usleep(1000 * 100);
    }

    /* Wake every worker for shutdown. Taking workerMutex first closes the
     * window in which a worker has tested the close flag but has not yet
     * entered pthread_cond_wait; without it that worker would miss this
     * one-off broadcast. */
    pthread_mutex_lock(&threadPool->workerMutex);
    pthread_cond_broadcast(&threadPool->workerCond);
    pthread_mutex_unlock(&threadPool->workerMutex);

    /* Join and release the worker nodes. */
    ThreadList *threadList = threadPool->threadList;
    ThreadNode *threadNode = threadList->front;
    while (threadNode != NULL)
    {
        ThreadNode *following = threadNode->next;
        pthread_t threadID = threadNode->threadAttr->id;
        if (threadID)
        {
            pthread_join(threadID, NULL);
        }
        free(threadNode->threadAttr);
        free(threadNode);
        threadNode = following;
    }
    free(threadList);

    /* Release missions never started, then missions handed to workers. */
    free_mission_list(threadPool->missionWaiting);
    free_mission_list(threadPool->missionList);

    pthread_mutex_destroy(&threadPool->missionMutex);
    pthread_mutex_destroy(&threadPool->workerMutex);
    pthread_mutex_destroy(&threadPool->threadMutex);
    pthread_mutex_destroy(&threadPool->managerMutex);
    pthread_cond_destroy(&threadPool->workerCond);
    free(threadPool);
    return NULL;
}
/**
 * Start the pool by launching its manager thread.
 *
 * The manager's id is stored in threadPool->managerID so that
 * thread_shutdown_and_free can join it later.
 *
 * @param threadPool pool created by create_thread_pool
 * @return the pthread_create result: 0 on success, an error number otherwise
 */
int thread_pool_run(ThreadPool *threadPool)
{
    return pthread_create(&threadPool->managerID, NULL, thread_manager, (void *)threadPool);
}
/**
 * Request shutdown and wait for the manager thread to finish.
 *
 * The manager thread frees the pool once it observes the close flag, so the
 * pointer must not be used after this call returns. A NULL pool is ignored.
 *
 * @param threadPool pool previously started with thread_pool_run
 */
void thread_shutdown_and_free(ThreadPool *threadPool)
{
    if (threadPool == NULL)
    {
        return;
    }

    /* Copy the id first: as soon as close is set the manager may free the
     * pool, so threadPool must not be dereferenced after that store. */
    pthread_t managerID = threadPool->managerID;
    threadPool->close = 1;
    pthread_join(managerID, NULL);
}
示例
#include <stdio.h>
#include "./ThreadPool.c"
// Sample payload type for the example; nothing else in the visible example
// refers to it.
struct testData
{
int a;
};
// Pool used by the example; created in main.
ThreadPool *pool = NULL;
// Number of times test() has been entered; incremented on worker threads.
int times = 0;
/**
 * Example mission: count the call, then simulate work by sleeping.
 *
 * @param a pointer to an int holding the number of seconds to sleep, or
 *          NULL for "no argument" (the call is counted and returns at once)
 */
void test(void *a)
{
    /* Many workers run this concurrently, so the shared counter needs an
     * atomic increment (GCC/Clang builtin; the example is built with gcc). */
    __sync_fetch_and_add(&times, 1);

    if (a == NULL)
    {
        return;
    }

    /* Simulate work for the requested number of seconds. */
    int *seconds = (int *)a;
    sleep(*seconds);
}
/**
 * Example driver: create a pool of 1..50 workers, submit an initial burst of
 * 110 missions, then print the pool counters once per second for 52 rounds,
 * occasionally submitting another random-sized burst, and finally shut the
 * pool down.
 *
 * @return 0 on success, 1 if the manager thread could not be started
 */
int main()
{
    /* Create the pool. */
    pool = create_thread_pool(1, 50);

    /* Start the manager thread; without it no mission would ever run. */
    if (thread_pool_run(pool) != 0)
    {
        fprintf(stderr, "failed to start the thread pool manager\n");
        return 1;
    }

    /* Every mission sleeps for `a` seconds; `a` outlives all submissions. */
    int a = 1;

    /* Initial burst (pass NULL as the last argument when there is none). */
    for (int i = 0; i < 110; i++)
    {
        thread_pool_submit(pool, &test, &a);
    }

    /* The round counter is separate from the burst counter below, so the
     * monitor loop runs exactly 52 times. */
    for (int round = 0; round < 52; round++)
    {
        printf("当前任务数量:%d\n", pool->waitingMissionNumber);
        printf("当前线程数量:%d\n", pool->existNumber);
        printf("当前执行线程:%d\n", pool->busyNumber);
        printf("当前空闲线程:%d\n", pool->freeNumber);
        printf("当前执行任务:%d\n", pool->hanldingMissionNumber);
        printf("当前自杀线程:%d\n", pool->needCloseNumber);
        printf("函数执行次数:%d\n", times);
        printf("================================\n");
        sleep(1);
        int rands = (rand() / 1000000000.0) * 20 + 10;
        printf("当前随机数:%d\n", rands);
        /* Simulate irregular submissions. */
        if (rands > 40)
        {
            for (int j = 0; j < rands; j++)
            {
                thread_pool_submit(pool, &test, &a);
            }
        }
    }

    /* Shut down and release the pool. */
    thread_shutdown_and_free(pool);
    return 0;
}
编译命令示例
gcc ./main.c -o ./test.out -lpthread