目录
- 为什么使用多级时间轮的方式
- 单级时间轮
- 多级时间轮
- 头文件
- 实现文件
为什么使用多级时间轮的方式
有序队列实现定时器
- 添加/删除任务: 遍历每一个节点, 找到相应的位置插入, 因此时间复杂度为O(n)
- 处理到期任务: 取出最小定时任务为首节点, 因此时间复杂度为O(1)
红黑树实现定时器
有序队列的性能瓶颈在于插入任务和删除任务(查找排序),而树形结构能对其进行优化
- 添加/删除/查找任务: 红黑树能将排序的的时间复杂度降到O(log2N)
- 处理到期任务: 红黑树查找到最后过期的任务节点为最左侧节点,因此时间复杂度为O(log2N)
最小堆实现定时器
- 添加/查找任务: 时间复杂度为O(log2N)
- 删除任务: 时间复杂度为O(n), 可通过辅助数据结构(map)来加快删除操作
- 处理到期任务: 最小节点为根节点, 时间复杂度为O(1)
跳表实现定时器
- 添加/删除/查找任务: 时间复杂度为O(log2N)
- 处理到期任务: 最小节点为最左侧节点, 时间复杂度为O(1), 但空间复杂度比较高, 为O(1.5n)
以上数据结构之所以没法做到O(1)的复杂度,究其原因是所有定时器节点挂在一条链表(或一棵树)上。 总有那么一点缺陷但是在定时器要求精度高的场景下是不允许的
就比如0点1秒钟需要执行某个任务,但是因为定时任务过多,需要一个一个遍历对比时间,导致任务延期了
最好的解决办法就是使用多级时间轮的方式,linux内核使用的定时器就是这个算法, 讲解多级时间轮前我们需要先了解单时间轮如何运作的
单级时间轮
单时间轮只有一个由bucket串起来的轮子,简单来说就是一个数组而已,每一个index对应一秒或者毫秒
假设从当前时刻0s开始计时,1s时定时器节点挂在bucket[1]下,2s时到期的定时器节点挂在bucket[2]下……
由于bucket是一个数组,能直接根据下标定位到具体定时器节点链,因此添加删除节点、定时器到期执行的时间复杂度均为O(1)。(有Hash内味了)
但使用这个定时器所受的限制也显而易见:待添加的timer到期时间必须在8s以内。这显然不能满足实际需求。当然要扩展也很容易,直接增加bucket的个数就可以了。
如果按照正常需要最大的到期时间范围要达到 2^32个(4294967296),如果采用上面这样的单时间轮,我们就需要4294967296个 bucket这会带来巨大的内存消耗,显然是需要优化改进的。
改进的单时间轮其实是一个对时间和空间折中的思路,即不会像单时间轮那样有O(1)的时间复杂度,但也不会像单时间轮那样对bucket个数有巨大的需求。其原理也很简单,就是重复转圈,假设一圈60秒,那么有一个任务是在2分钟后运行那么转2圈就行,但是也会有问题
因为一圈就60个插槽,那么如果有100万个任务呢,那么就导致每一个插槽都关联一个很长的链表,每遍历一个插槽都要遍历当前插槽内的所有任务判断是否到期,那么这就蜕化为使用链表方式实现定时器了
rotation表示节点在时间轮转了几圈后才到期,当前时间指针指向某个bucket时,不能像简单时间轮那样直接对bucket下的所有节点执行超时动作,而是需要对链表中节点遍历一遍,判断轮子转动的次数是否等于节点中的rotation值,当两者相等时,方可执行超时操作。
上面所述的时间轮都是单枪匹马战斗的,因此很难在时间和空间上都达到理想效果。Linux所实现的多时间轮算法,借鉴了日常生活中水表的度量方法,通过低刻度走得快的轮子带动高一级刻度轮子走动的方法,达到了仅使用较少刻度即可表示很大范围度量值的效果。
多级时间轮
多级时间轮的算法很简单就是创建多个单时间轮进行联动,就如上图3个时间轮,时分秒, 1天24小时那么就定义一个长度为24的数组,一个小数60分钟那么就定义一个长度为60的数组,秒也一样, 那么我们来说说怎么查找任务和执行任务以及添加和删除任务的,修改任务
添加任务
- 先找到对应的小时时间轮,然后依据任务的小时时间将任务插入到对应的插槽里
- 如果插槽里已有数据那么就使用链表的方式给链接起来
执行任务
- 规则是秒走一圈,分钟走一格,分钟走一圈,时钟走一格
- 系统运行时先计算初始时间,秒当前因该在第几个插槽,分钟当前在第几个插槽…,然后从小时时间轮里开始看看当前插槽内有没有需要执行的定时任务
- 如果小时时间轮插槽内有任务那么就将任务下发到分钟,如果分钟有任务那么就下发到秒钟
- 当秒时间轮执行到有任务的插槽里,那么就创建一个线程执行任务即可
删除任务
- 先在小时时间钟里遍历找到对应的任务,如果没有那么到分钟时间钟里找…
- 找到任务后删除任务即可
修改和查询都和删除任务都是一个道理,先找到任务然后在进行操作
锁的设计
执行,添加,删除,修改任务的时候都需要锁住任务本身任务移交需要锁住目标的插槽
描述全套过程:
此刻当前时间是02:59:01
我有一个任务是03:00:02
(上图蓝色方块),然后我们只需要将任务存放在小时数组的第[3]个插槽里即可,之后进行初始化各个时间轮的指针小时[2],分钟[59],秒[1]
,然后秒每走一圈,分钟走一格,分钟走一圈时钟走一格,此刻时钟[3]里有任务,那么下发到分钟里,分钟[0]检测到有任务,那么下发到秒钟[2]里,当秒走到[2]位置的时候发现有任务,那么就创建一个线程进行执行任务
下面我们就设计一个多级时间轮的定时器年,月,日,时,分,秒,毫秒
(7层定时器)
注意: 不可能每毫秒都处理,不然压力太大了,我们允许定时器误差10毫秒左右,也就是10毫秒走一格,那么1000/10=100 ,也就是毫秒时间轮长度为100
还有就是定时器都要配套一种时间解析语言: 常用的有cron, 但是我使用的是我自己研发的c语言-自研定时器计划任务语法
头文件
#ifndef STUDY_TIMER_H
#define STUDY_TIMER_H
#include <pthread.h>
#include <unistd.h>
#include "../util/time_util.h"
#include "../structure/char_kv_list.h"
#include "thread_pool.h"
#include "../util/str_util.h"
#include "../util/timer_resolver.h"
#include <stdio.h>
#include <stdlib.h>
typedef struct timer_node {
struct timer_node *next; //指向下一个节点
long expires; //到期时间
TimerResolver *timerResolver; //任务计划解析后对象
char *strResolver; //保留任务计划字符串,用于后期重新解析
void * (*func)(void *); //回调函数
void *args; //回调函数参数
char *timer_id; //定时器名称
int timer_status; //定时器状态(0:停止,1:启动,2:待删除,3确认删除,4重启任务)
int timer_dynamic; //0:等待执行,1:执行中 (用于展示任务的状况)
void * returnValue; //任务的返回值
} TimerNode;
//时间轮
typedef struct timer_wheel {
int slot_num; //时间轮槽数量
int type; //时间轮类型(0毫秒,1秒,2分,3时,4天,5月,6年)
int slot_index; //时间轮槽索引
CharKvList *slot_array; //时间轮插槽数组
pthread_mutex_t *locks; //插槽锁
} TimerWheel;
//7层时间轮
typedef struct timer {
TimerWheel *wheel_millisecond; //1级时间轮 (毫秒) 1s = 1000ms=1000个槽,按照10毫秒一个槽,那么1s就有100个槽
TimerWheel *wheel_second; //2级时间轮 (秒) 1m = 60s=60个槽,按照1秒一个槽,那么1m就有60个槽
TimerWheel *wheel_minute; //3级时间轮 (分钟) 1h = 60m=60个槽,按照1分钟一个槽,那么1h就有60个槽
TimerWheel *wheel_hour; //4级时间轮 (小时) 1d = 24h=24个槽,按照1小时一个槽,那么1d就有24个槽
TimerWheel *wheel_day; //5级时间轮 (天) 1天=30个槽,按照1天一个槽,那么1月就有30个槽
TimerWheel *wheel_month; //6级时间轮 (月) 1月=12个槽,按照1月一个槽,那么1年就有12个槽
TimerWheel *wheel_year; //7级时间轮 (年) 年=10000个槽,按照1年一个槽,那么10000年就有10000个槽
pthread_t *threadIDs; // 工作的线程IDs
int busyNum; // 忙的任务数量
pthread_mutex_t thread_busyNum; //忙的任务数量线程ID
int total; // 总的任务数量
pthread_mutex_t thread_total; //总的任务数量线程ID
ThreadPool *pool; // 线程池
CharKvList *tasks; //全部的任务
} Timer;
typedef struct par_timer_Wheel{
TimerWheel *wheel;
Timer *timer;
} ParTimerWheel;
typedef struct par_timer_Node{
TimerNode *timerNode;
Timer *timer;
} ParTimerNode;
#define TIMER_STATUS_STOP 0 //未启动
#define TIMER_STATUS_START 1 //已启动
#define TIMER_STATUS_DEL 2 //待删除 (不会删除任务,只是将任务状态改为待删除)
#define TIMER_STATUS_NOTARIZE_DEL 3 //确认删除(会删除任务,通过外部因素来改变任务状态进行删除)
#define TIMER_STATUS_RESTART 4 //重启任务(重置执行计划,然后将任务状态改为已启动,那么任务就会重新执行)
#define TIMER_DYNAMIC_AWAIT 0 //等待执行
#define TIMER_DYNAMIC_RUN 1 //执行中
//毫秒级时间轮
#define WHEEL_MILLISECOND_SLOT_NUM 100 //1级时间轮 (毫秒) 1s = 1000ms=1000个槽,按照10毫秒一个槽,那么1s就有100个槽
//秒级时间轮
#define WHEEL_SECOND_SLOT_NUM 60 //2级时间轮 (秒) 1m = 60s=60个槽,按照1秒一个槽,那么1m就有60个槽
//分钟级时间轮
#define WHEEL_MINUTE_SLOT_NUM 60 //3级时间轮 (分钟) 1h = 60m=60个槽,按照1分钟一个槽,那么1h就有60个槽
//小时级时间轮
#define WHEEL_HOUR_SLOT_NUM 24 //4级时间轮 (小时) 1d = 24h=24个槽,按照1小时一个槽,那么1d就有24个槽
//天级时间轮
#define WHEEL_DAY_SLOT_NUM 30 //5级时间轮 (天) 1天=30个槽,按照1天一个槽,那么1月就有30个槽
//月级时间轮
#define WHEEL_MONTH_SLOT_NUM 12 //6级时间轮 (月) 1月=12个槽,按照1月一个槽,那么1年就有12个槽
//年级时间轮
#define WHEEL_YEAR_SLOT_NUM 10000 //7级时间轮 (年) 年=10000个槽,按照1年一个槽,那么10000年就有10000个槽
Timer *create_timer(int maxSask);
void add_timer_sask(Timer *timer,char *timer_resolver,char *timer_name,void*(*func)(void *),void *args);
int getBusyNum( Timer *timer);
int getTotal( Timer *timer);
void updateTaskStatusDEL( Timer *timer,char *timer_id);
void updateTaskStatusRESTART( Timer *timer,char *timer_id);
void updateTaskStatusSTART( Timer *timer,char *timer_id);
void updateTaskTimerResolver( Timer *timer,char *timer_id,char * strResolver);
void updateTaskStatusSTOP( Timer *timer,char *timer_id);
#endif //STUDY_TIMER_H
实现文件
#include "timer.h"
static void *manager_millisecond(void *);
static void *manager_second_minute_hour_day_month_year(void *);
static void *sask_manager( void *);
static void *sask_transfer(TimerWheel *,TimerWheel *);
static void timer_node_sask_print(TimerNode *timerNode){
char *string = format_time(timerNode->expires, "%Y-%m-%d %H:%M:%S");
printf("%s任务,预计执行时间: %ld(时间戳)---%s(格式化时间)\n",timerNode->timer_id,timerNode->expires,string);
}
//创建参数
ParTimerWheel *create_par_timer_wheel(TimerWheel *timer_wheel, Timer *timer) {
ParTimerWheel *wheel = (ParTimerWheel *) malloc(sizeof(ParTimerWheel));
wheel->wheel = timer_wheel;
wheel->timer = timer;
return wheel;
}
//创建参数
ParTimerNode *create_par_timer_Node(Timer *timer, TimerNode *timerNode) {
ParTimerNode *parTimerTimerNode = malloc(sizeof(ParTimerNode));
parTimerTimerNode->timer = timer;
parTimerTimerNode->timerNode = timerNode;
return parTimerTimerNode;
}
//创建节点
TimerNode *create_timer_node(char *timerResolver, char *timer_id, void*(*func)(void *), void *args) {
TimerNode *timerNode = (TimerNode *) malloc(sizeof(TimerNode));
//解析定时计划
TimerResolver *timerResolver1 = create_timer_resolver(timerResolver);
//获取定时计划执行时间(10位时间戳(秒级))
long expires = resolver(timerResolver1);
//获取计划类型
int type = get_plan_type(timerResolver1);
int timer_status = type == 0 ? TIMER_STATUS_STOP : TIMER_STATUS_START;//如果是0那么是停止状态
timerNode->func = func;
timerNode->args = args;
timerNode->timerResolver = timerResolver1;
timerNode->expires = expires;
timerNode->timer_id = timer_id;
timerNode->timer_status = timer_status;
timerNode->timer_dynamic = TIMER_DYNAMIC_AWAIT; //默认等待执行
timerNode->next = NULL;
timerNode->strResolver = timerResolver;//保存原始任务解析字符串
timer_node_sask_print(timerNode);
return timerNode;
}
//创建时间轮
TimerWheel *create_timer_wheel(int slot_num, int type) {
TimerWheel *timerWheel = (TimerWheel *) malloc(sizeof(TimerWheel));
timerWheel->slot_num = slot_num;
timerWheel->slot_index = 0;
timerWheel->type = type;
timerWheel->locks = (pthread_mutex_t *) malloc(sizeof(pthread_mutex_t) * timerWheel->slot_num);
//创建时间轮槽锁
for (int i = 0; i < timerWheel->slot_num; ++i) {
pthread_mutex_init(&timerWheel->locks[i], NULL);
}
timerWheel->slot_array = createCharKvList(slot_num);
//初始化所有槽为空
for (int i = 0; i < timerWheel->slot_num; ++i) {
addCharKvList(timerWheel->slot_array, NULL, NULL);
}
return timerWheel;
}
//创建定时器(最大可执行任务数量为最大任务数*10超过了那么就可能导致崩溃,而最大任务数为代表同一时间能执行的任务)
Timer *create_timer(int maxSask) {
Timer *timer = (Timer *) malloc(sizeof(Timer));
timer->wheel_millisecond = create_timer_wheel(WHEEL_MILLISECOND_SLOT_NUM, 0);
timer->wheel_second = create_timer_wheel(WHEEL_SECOND_SLOT_NUM, 1);
timer->wheel_minute = create_timer_wheel(WHEEL_MINUTE_SLOT_NUM, 2);
timer->wheel_hour = create_timer_wheel(WHEEL_HOUR_SLOT_NUM, 3);
timer->wheel_day = create_timer_wheel(WHEEL_DAY_SLOT_NUM, 4);
timer->wheel_month = create_timer_wheel(WHEEL_MONTH_SLOT_NUM, 5);
timer->wheel_year = create_timer_wheel(WHEEL_YEAR_SLOT_NUM, 6);
timer->tasks= createCharKvList(maxSask);
timer->busyNum = 0;
timer->thread_busyNum = (pthread_mutex_t ) malloc(sizeof(pthread_mutex_t) );
timer->total = 0;
timer->thread_total = (pthread_mutex_t ) malloc(sizeof(pthread_mutex_t));
//创建线程池(最小线程数为160)
int min = 10;
timer->pool = threadPoolCreate(min, min + maxSask, maxSask * 10);
timer->threadIDs = (pthread_t *) malloc(sizeof(pthread_t) * 3);
pthread_create(&timer->threadIDs[0], NULL, manager_millisecond, timer);
pthread_create(&timer->threadIDs[1], NULL, manager_second_minute_hour_day_month_year, timer);
pthread_create(&timer->threadIDs[2], NULL, sask_manager, timer);
return timer;
}
void addBusyNum( Timer *timer){
//加锁
pthread_mutex_lock(&timer->thread_busyNum);
timer->busyNum++;
//解锁
pthread_mutex_unlock(&timer->thread_busyNum);
}
void subBusyNum( Timer *timer){
//加锁
pthread_mutex_lock(&timer->thread_busyNum);
timer->busyNum--;
//解锁
pthread_mutex_unlock(&timer->thread_busyNum);
}
void addTotal( Timer *timer){
//加锁
pthread_mutex_lock(&timer->thread_total);
timer->total++;
//解锁
pthread_mutex_unlock(&timer->thread_total);
}
void subTotal( Timer *timer){
//加锁
pthread_mutex_lock(&timer->thread_total);
timer->total--;
//解锁
pthread_mutex_unlock(&timer->thread_total);
}
//获取工作中的任务数量
int getBusyNum( Timer *timer){
return timer->busyNum;
}
//总任务数量
int getTotal( Timer *timer){
return timer->total;
}
//修改任务状态,为确认删除
void updateTaskStatusDEL( Timer *timer,char *timer_id){
CharKvListData *pData = getCharKvListByKey(timer->tasks, timer_id);
if(pData->data!=NULL){
TimerNode *timerNode = (TimerNode *) pData->data;
timerNode->timer_status = TIMER_STATUS_NOTARIZE_DEL;//修改为确认删除
}
}
//重启任务(前提任务没有被删除)
void updateTaskStatusRESTART( Timer *timer,char *timer_id){
CharKvListData *pData = getCharKvListByKey(timer->tasks, timer_id);
if(pData->data!=NULL){
TimerNode *timerNode = (TimerNode *) pData->data;
if(timerNode->timer_status!=TIMER_STATUS_NOTARIZE_DEL){
timerNode->timer_status = TIMER_STATUS_RESTART;//修改为重启任务
}
}
}
//修改任务状态为启动(前提任务必须是暂停状态)
void updateTaskStatusSTART( Timer *timer,char *timer_id){
CharKvListData *pData = getCharKvListByKey(timer->tasks, timer_id);
if(pData->data!=NULL){
TimerNode *timerNode = (TimerNode *) pData->data;
if(timerNode->timer_status==TIMER_STATUS_STOP){
timerNode->timer_status = TIMER_STATUS_START;//修改为启动
}
}
}
//修改任务状态为停止(前提任务不能是确认删除)
void updateTaskStatusSTOP( Timer *timer,char *timer_id){
CharKvListData *pData = getCharKvListByKey(timer->tasks, timer_id);
if(pData->data!=NULL){
TimerNode *timerNode = (TimerNode *) pData->data;
if(timerNode->timer_status!=TIMER_STATUS_NOTARIZE_DEL){
timerNode->timer_status = TIMER_STATUS_STOP;//修改为停止
}
}
}
//修改指定任务的执行计划并重启任务
void updateTaskTimerResolver( Timer *timer,char *timer_id,char * strResolver){
CharKvListData *pData = getCharKvListByKey(timer->tasks, timer_id);
if(pData->data!=NULL){
TimerNode *timerNode = (TimerNode *) pData->data;
timerNode->strResolver = strResolver;
timerNode->timer_status = TIMER_STATUS_RESTART;//修改为重启任务
}
}
//给指定时间轮添加任务
void add_timer_wheel_node(TimerWheel *wheel, TimerNode *timerNode) {
if (timerNode == NULL) {
return;
}
//获取时间轮槽索引
int slot_index = 0;
switch (wheel->type) {
case 0:
slot_index = timerNode->expires % WHEEL_MILLISECOND_SLOT_NUM;
break;
case 1:
slot_index = get_second(timerNode->expires);
break;
case 2:
slot_index = get_minute(timerNode->expires);
break;
case 3:
slot_index = get_hour(timerNode->expires);
break;
case 4:
slot_index = get_day(timerNode->expires);
break;
case 5:
slot_index = get_month(timerNode->expires);
break;
case 6:
slot_index = get_year(timerNode->expires);
break;
default:
printf("不存在的时间轮类型");
exit(1);
}
//给目标时间轮槽位加锁
pthread_mutex_lock(&wheel->locks[slot_index]);
//添加任务到目标时间轮槽位
CharKvListData *pData = getCharKvList(wheel->slot_array, slot_index);
if (pData->data == NULL) {//如果槽位是空的那么创建一个链表
pData->data = timerNode;
} else {//这个槽位已经有任务了,我们需要把任务添加到链表中最后
TimerNode *head = (TimerNode *) pData->data;
TimerNode *pNode = (TimerNode *) pData->data;
while (pNode != NULL) {
head = pNode;
if(str_equals(pNode->timer_id,timerNode->timer_id)){
printf("%s任务名称重复了,添加任务失败",pNode->timer_id);
return;
}
pNode = pNode->next;
}
head->next = timerNode;
}
//解锁
pthread_mutex_unlock(&wheel->locks[slot_index]);
}
//任务添加到时间轮
void add_timer_wheel(Timer *timer, TimerNode *timerNode) {
//获取执行时间的年月日时分秒
int sask_second = get_second(timerNode->expires);
int sask_minute = get_minute(timerNode->expires);
int sask_hour = get_hour(timerNode->expires);
int sask_day = get_day(timerNode->expires);
int sask_month = get_month(timerNode->expires);
int sask_year = get_year(timerNode->expires);
int current_second = get_current_second();
int current_minute = get_current_minute();
int current_hour = get_current_hour();
int current_day = get_current_day();
int current_month = get_current_month();
int current_year = get_current_year();
//将任务添加到对应的时间轮中
// 1.如果当前年大于等于任务的年
if (current_year >= sask_year) {
// 2.如果当前月大于等于任务的月
if (current_month >= sask_month) {
// 3.如果当前日大于等于任务的日
if (current_day >= sask_day) {
// 4.如果当前时大于等于任务的时
if (current_hour >= sask_hour) {
// 5.如果当前分大于等于任务的分
if (current_minute >= sask_minute) {
// 6.如果当前秒大于等于任务的秒
if (current_second >= sask_second) {
add_timer_wheel_node(timer->wheel_millisecond, timerNode);
} else {
//添加到秒时间轮
add_timer_wheel_node(timer->wheel_second, timerNode);
}
} else {
//添加到分时间轮
add_timer_wheel_node(timer->wheel_minute, timerNode);
}
} else {
//添加到时时间轮
add_timer_wheel_node(timer->wheel_hour, timerNode);
}
} else {
//添加到天时间轮
add_timer_wheel_node(timer->wheel_hour, timerNode);
}
} else {
//添加到月时间轮
add_timer_wheel_node(timer->wheel_month, timerNode);
}
} else {
//添加到年时间轮
add_timer_wheel_node(timer->wheel_year, timerNode);
}
}
void add_timer_sask(Timer *timer, char *timer_resolver, char *timer_name, void*(*func)(void *), void *args) {
//创建任务节点
TimerNode *timerNode = create_timer_node(timer_resolver, timer_name, func, args);
add_timer_wheel(timer, timerNode);//任务添加到时间轮
addTotal(timer);//添加任务总数
addCharKvList(timer->tasks, timerNode->timer_id, timerNode);//全部任务汇总
}
//根据当前的任务获取下次执行的任务
void next_timer_sask(TimerNode *timerNode) {
//获取定时计划执行时间(10位时间戳(秒级))
long expires = resolver(timerNode->timerResolver);
if (expires == 0) {
timerNode->timer_status = TIMER_STATUS_DEL;//任务已经执行完毕
}
timerNode->expires = expires;
timer_node_sask_print(timerNode);
}
//任务执行
void thread_sask_run(void *args) {
ParTimerNode *node = (ParTimerNode *) args;
Timer *pTimer = node->timer;
TimerNode *pNode = node->timerNode;
addBusyNum(pTimer); //添加忙碌线程数
pNode->timer_dynamic=TIMER_DYNAMIC_RUN;
void *pVoid = pNode->func(pNode->args); //执行任务
pNode->returnValue=pVoid;
pNode->timer_dynamic=TIMER_DYNAMIC_AWAIT;
subBusyNum(pTimer);//减少忙碌线程数
free(node);
}
static void manager_millisecond_sask(ParTimerWheel *parTimerTimerWheel) {
Timer *pTimer = parTimerTimerWheel->timer;
TimerWheel *pWheel = parTimerTimerWheel->wheel;
//获取当前时间
long now = get_current_timestamp();
//获取当前插槽
int slot_index = pWheel->slot_index;
//获取当前插槽的锁
pthread_mutex_t *lock = &pWheel->locks[slot_index];
//加锁
pthread_mutex_lock(lock);
//获取当前插槽的链表
CharKvList *list = pWheel->slot_array;
//获取当前插槽的节点
CharKvListData *charKvListData = getCharKvList(list, slot_index);
if (charKvListData->data != NULL) {
//获取当前插槽的链表,如果当前插槽的节点不为空,那么就执行当前插槽的所有任务
TimerNode *parent = charKvListData->data;
TimerNode *node = charKvListData->data;
while (node != NULL) {
parent=node;
//如果当前节点的过期时间小于当前时间,并且状态是可执行那么就执行回调函数
if (node->expires <= now && node->timer_status == TIMER_STATUS_START) {
ParTimerNode *pNode = create_par_timer_Node(pTimer, node);
//执行回调函数
threadPoolAdd(pTimer->pool,thread_sask_run, pNode);//添加任务到线程池
//从新计算过期时间,并且判断任务是否已经执行完毕,如果任务已经执行完毕,那么就不需要放入时间轮了
next_timer_sask(node);
if (node->timer_status != TIMER_STATUS_DEL) {
//添加任务到对应的时间轮里
add_timer_wheel(pTimer, node);
} else {
node->timer_status=TIMER_STATUS_DEL; //设置任务状态为带删除
//移除链表中的任务
if(node->next==NULL) {
parent->next = NULL;
break; //跳出循环
}else{
parent->next=node->next;
node = parent->next;
continue;
}
}
}
node = node->next;
}
//将当前插槽的节点设置为NULL
charKvListData->data = NULL;
charKvListData->key = NULL;
}
//解锁
pthread_mutex_unlock(lock);
}
//定时器管理-(毫秒)(如果插槽里有任务那么就会执行插槽内的所有任务,而任务最多延迟1秒执行)
static void *manager_millisecond(void *arg) {
Timer *timer = (Timer *) arg;
TimerWheel *pWheel = timer->wheel_millisecond;
//初始化插槽获取当前时间的毫秒数
long millisecond = get_current_millisecond() / 10;
pWheel->slot_index = millisecond;//设置当前插槽的索引
ParTimerWheel *pTimerWheel = create_par_timer_wheel(pWheel, timer);
//反复循环
while (1) {
//执行时间轮的任务
manager_millisecond_sask(pTimerWheel);
//休眠10毫秒
usleep(10000);
//插槽索引+1
pWheel->slot_index = pWheel->slot_index + 1;
//如果插槽索引大于插槽数量那么就重置插槽索引
if (pWheel->slot_index >= pWheel->slot_num) {
pWheel->slot_index = 0;
}
}
}
static void *manager_second_minute_hour_day_month_year(void *arg) {
Timer *timer = (Timer *) arg;
TimerWheel *pWheel_millisecond = timer->wheel_millisecond;
TimerWheel *pWheel_second = timer->wheel_second;
TimerWheel *pWheel_minute = timer->wheel_minute;
TimerWheel *pWheel_hour = timer->wheel_hour;
TimerWheel *pWheel_day = timer->wheel_day;
TimerWheel *pWheel_month = timer->wheel_month;
TimerWheel *pWheel_year = timer->wheel_year;
//初始化插槽获取当前时间的秒数
pWheel_second->slot_index = get_current_second();//设置当前插槽的索引
pWheel_minute->slot_index = get_current_minute();//设置当前插槽的索引
pWheel_hour->slot_index = get_current_hour();//设置当前插槽的索引
pWheel_day->slot_index = get_current_day();//设置当前插槽的索引
pWheel_month->slot_index = get_current_month();//设置当前插槽的索引
pWheel_year->slot_index = get_current_year();//设置当前插槽的索引
while (1) {
int second = get_current_second();
int minute = get_current_minute();
int hour = get_current_hour();
int day = get_current_day();
int month = get_current_month();
int year = get_current_year();
if (pWheel_year->slot_index == year
&& pWheel_month->slot_index == month
&& pWheel_day->slot_index == day
&& pWheel_hour->slot_index == hour
&& pWheel_minute->slot_index == minute
&& pWheel_second->slot_index == second) {
usleep(500000);// 休眠500毫秒
continue;
}
int cu_pWheel_second = pWheel_second->slot_index;
int cu_pWheel_minute = pWheel_minute->slot_index;
int cu_pWheel_hour = pWheel_hour->slot_index;
int cu_pWheel_day = pWheel_day->slot_index;
int cu_pWheel_month = pWheel_month->slot_index;
int cu_pWheel_year = pWheel_year->slot_index;
pWheel_second->slot_index = second;
pWheel_minute->slot_index = minute;
pWheel_hour->slot_index = hour;
pWheel_day->slot_index = day;
pWheel_month->slot_index = month;
pWheel_year->slot_index = year;
if(pWheel_second->slot_index!=cu_pWheel_second){
sask_transfer(pWheel_second,pWheel_millisecond);
// printf("second===second:%d,minute:%d,hour:%d,day:%d,month:%d,year:%d\n", second, minute, hour, day, month, year);
}
if(pWheel_minute->slot_index!=cu_pWheel_minute){
sask_transfer(pWheel_minute,pWheel_second);
}
if(pWheel_hour->slot_index!=cu_pWheel_hour){
sask_transfer(pWheel_hour,pWheel_minute);
}
if(pWheel_day->slot_index!=cu_pWheel_day){
sask_transfer(pWheel_day,pWheel_hour);
}
if(pWheel_month->slot_index!=cu_pWheel_month){
sask_transfer(pWheel_month,pWheel_day);
}
if(pWheel_year->slot_index!=cu_pWheel_year){
sask_transfer(pWheel_year,pWheel_month);
}
}
}
static void *sask_transfer(TimerWheel *pWheel,TimerWheel *nextWheel) {
//判断当前插槽是否有任务,如果有任务那么就下发任务
CharKvListData *pData = getCharKvList(pWheel->slot_array, pWheel->slot_index);
if (pData->data != NULL) {
//将任务下发
add_timer_wheel_node(nextWheel, pData->data);
//清除当前插槽的数据
pData->data = NULL;
}
}
//任务管理
static void *sask_manager( void *args){
Timer *timer=(Timer *)args;
while (1){
//迭代所有的任务
CharKvListIterator *pIterator = createCharKvListIterator(timer->tasks);
while (hasNextCharKvList(pIterator)) {
CharKvListData *pData = nextCharKvList(pIterator);
TimerNode *pNode = (TimerNode *) pData->data;
if (pNode->timer_status == TIMER_STATUS_NOTARIZE_DEL) {//如果任务状态为删除状态那么就删除任务
//删除节点
removeCharKvListByKey(timer->tasks, pNode->timer_id);
free( pNode->timerResolver);//释放定时计划对象
//释放任务节点
free(pNode);
subTotal(timer);//减少总线任务数
printf("=================删除任务:%s\n", pNode->timer_id);
}else if(pNode->timer_status==TIMER_STATUS_RESTART){//重启任务
printf("=================%s:,任务重新加载\n", pNode->timer_id);
//重新解析定时计划
TimerResolver *timerResolver1 = create_timer_resolver( pNode->strResolver);
long expires = resolver(timerResolver1);
pNode->timerResolver = timerResolver1;
pNode->expires = expires;
pNode->timer_status=TIMER_STATUS_START;//设置任务状态为启动
timer_node_sask_print(pNode);//打印任务信息
//将任务重新加入到对应的时间轮中
add_timer_wheel(timer, pNode);
}
}
sleep(1);//休眠1秒检测一次
}
}