目录
- 协程解决了什么问题
- 简介
- 对协程的抽象
- 如何保存上下文信息
- 协程的调度
- 总结
协程解决了什么问题
我们先从一次网络IO请求过程中的read操作为例,请求数据会先拷贝到系统内核空间中,再从操作系统的内核空间拷贝到应用程序的用户空间中。从内核空间将数据拷贝到用户空间过程中,会经历两个阶段:
- 等待数据准备
- 拷贝数据
因为有这两个阶段,所以就有了各种网络IO的模型:
同步编程:应用程序等待IO结果(比如等待打开一个大的文件,或者等待远端服务器的响应),阻塞当前线程。
- 优点:逻辑简单。
- 缺点:效率太低,其他与IO无关的业务也要等待IO的响应。
异步多线程/进程:将IO操作频繁的逻辑、或者单纯的IO操作独立到一/多个线程中,业务线程与IO线程间靠通信/全局变量来共享数据。
- 优点:充分利用CPU资源,防止阻塞资源。
- 缺点:线程切换代价相对较高,异步逻辑代码复杂。
异步消息+回调函数:设计一个消息循环处理器,接收外部消息(包括系统通知和网络报文等),收到消息时调用注册的回调函数。
- 优点:充分利用CPU资源,防止阻塞资源。
- 缺点:代码逻辑复杂。
而协程,就是用同步的语义去解决异步问题,即业务逻辑看起来是同步的,但实际上并不阻塞当前线程(一般是靠事件循环处理来分发消息)。所以协程实际上是在单线程的环境下实现的应用程序级别的并发,就是把本来由操作系统控制的切换+保存状态在应用程序里面实现了。
由于协程在应用程序级别来处理任务,所以协程更像是一个函数,只是比普通的函数多了两个动作:yield()和resume(),即让出和恢复。让出的时候我们需要将寄存器的协程上下文保存起来,恢复的时候再将上下文重新压入寄存器,继续执行。
简介
Libtask 是一个简单的协程库,它展示了最简单的一种协程实现方式。操作系统只能看见一个内核线程,无法感知到客户端协程的存在。
Libtask中的协程是协作式的,也就是说,使用的不是时间片轮转算法,调度器根据先来先服务的策略来执行就绪队列中的协程,只有当每个协程主动退出时调度器才会把CPU分配给下一个协程。
对协程的抽象
在libtask中,协程被抽象成一个Task结构体,结构体中的字段用于描述协程的相关信息:
// 一个Task可以看成是一个需要异步执行的任务,coroutine的抽象描述
struct Task
{
char name[];
char state[];
// 前后指针
Task *next;
Task *prev;
Task *allnext;
Task *allprev;
// 执行上下文
Context context;
// 睡眠时间
uvlong alarmtime;
uint id;
// 协程栈指针
uchar *stk;
// 协程栈大小
uint stksize;
// 协程是否退出了
int exiting;
// 在在alltask的中的索引下标
int alltaskslot;
// 是否是系统协程
int system;
// 是否在就绪状态
int ready;
// Task需要执行的函数
void (*startfn)(void*);
// startfn的参数
void *startarg;
// 自定义数据
void *udata;
};
创建协程
int taskcreate(void (*fn)(void*), void *arg, uint stack)
{
int id;
Task *t;
// 分配task和stack的空间
t = taskalloc(fn, arg, stack);
// 协程的数量+
taskcount++;
id = t->id;
if(nalltask% == 0){
alltask = realloc(alltask, (nalltask+)*sizeof(alltask[0]));
if(alltask == nil){
fprint(, "out of memory\n");
abort();
}
}
// 记录位置
t->alltaskslot = nalltask;
// 保存到alltask中
alltask[nalltask++] = t;
// 修改状态为就绪,可以被调度,并且加入到就绪队列
taskready(t);
return id;
}
我们可以使用taskcreate函数来创建协程,在taskcreate函数中,首先会调用taskalloc函数为Task和执行栈分配内存,然后初始化协程的上下文信息,在此之后,一个协程就被创建成功了。
/*
* taskalloc函数的主要逻辑是申请Task结构体所需的内存和执行时栈的内存,
* 然后初始化各个字段。在此之后,一个协程就被创建成功了,接着执行taskready
* 把协程加入就绪队列中。
* */
static Task*
taskalloc(void (*fn)(void*), void *arg, uint stack)
{
Task *t;
sigset_t zero;
uint x, y;
ulong z;
/* allocate the task and stack together */
// 结构体本身的大小和栈大小
// 协程栈大小是*1024
t = malloc(sizeof *t+stack);
if(t == nil){
fprint(, "taskalloc malloc: %r\n");
abort();
}
memset(t,, sizeof *t);
// 栈的内存位置
t->stk = (uchar*)(t+);
// 栈大小
t->stksize = stack;
// 协程id
t->id = ++taskidgen;
// 协程工作函数和参数
t->startfn = fn;
t->startarg = arg;
/* do a reasonable initialization */
memset(&t->context.uc,, sizeof t->context.uc);
sigemptyset(&zero);
// 初始化uc_sigmask字段为空,即不阻塞信号
sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask);
/* must initialize with current context */
// 初始化uc字段
if(getcontext(&t->context.uc) <){
fprint(, "getcontext: %r\n");
abort();
}
/* call makecontext to do the real work. */
/* leave a few words open on both ends */
// 设置协程执行时的栈位置和大小
t->context.uc.uc_stack.ss_sp = t->stk+;
t->context.uc.uc_stack.ss_size = t->stksize-;
#if defined(__sun__) && !defined(__MAKECONTEXT_V_SOURCE) /* sigh */
#warning "doing sun thing"
/* can avoid this with __MAKECONTEXT_V_SOURCE but only on SunOS 5.9 */
t->context.uc.uc_stack.ss_sp =
(char*)t->context.uc.uc_stack.ss_sp
+t->context.uc.uc_stack.ss_size;
#endif
/*
* All this magic is because you have to pass makecontext a
* function that takes some number of word-sized variables,
* and on-bit machines pointers are bigger than words.
*/
//print("make %p\n", t);
z = (ulong)t;
y = z;
z >>=; /* hide undefined 32-bit shift from 32-bit compilers */
x = z>>;
// 保存信息到uc字段
makecontext(&t->context.uc, (void(*)())taskstart,, y, x);
return t;
}
创建好一个协程之后,taskcreate函数会调用taskready()函数把协程的状态修改为就绪态,并加入到就绪队列中。
/*
* 修改协程的状态并加入就绪队列
* */
void taskready(Task *t)
{
t->ready =;
addtask(&taskrunqueue, t);
}
如何保存上下文信息
我们可以发现,在调用taskalloc函数初始化协程的时候,我们还会对协程的上下文进行初始化,以下代码的流程是将当前CPU寄存器的上下文信息保存到当前Task的上下文中,同时将当前Task的栈位置和大小保存进上下文中,最后将协程的工作函数保存到上下文信息中。
// 将上下文置为零值
memset(&t->context.uc,, sizeof t->context.uc);
// 将信号集zero清空
sigemptyset(&zero);
// 初始化uc_sigmask字段为空,即不阻塞信号
sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask);
// 将当前上下文信息保存到t-context.uc结构体中
if(getcontext(&t->context.uc) <){
fprint(, "getcontext: %r\n");
abort();
}
// 设置协程执行时的栈位置和大小
t->context.uc.uc_stack.ss_sp = t->stk+;
t->context.uc.uc_stack.ss_size = t->stksize-;
z = (ulong)t;
y = z;
z >>=;
x = z>>;
// 设置协程的工作函数到上下文信息中
makecontext(&t->context.uc, (void(*)())taskstart,, y, x);
ucontext族函数
其实对协程上下文的初始化以及保存是通过linux下的ucontext族函数来实现的。
ucontext_t结构体
我们发现Task结构体中有一个Context字段,这个字段其实就是对ucontext_t结构体的封装,ucontext_t结构体用于保存当前的上下文信息,它的结构是这样的:
typedef struct ucontext
{
unsigned long int uc_flags;
struct ucontext *uc_link;//后序上下文
__sigset_t uc_sigmask;// 信号屏蔽字掩码
stack_t uc_stack;// 上下文所使用的栈
// 保存的上下文的寄存器信息
// 比如pc、sp、bp
// pc程序计数器:记录下一条指令的地址
// sp堆栈指针:指向函数调用栈栈顶的指针,所以新数据入栈将存入sp+的地址
// bp基址指针:指向函数调用栈的首地址
mcontext_t uc_mcontext;
long int uc_filler[];
} ucontext_t;
//其中mcontext_t 定义如下
typedef struct
{
gregset_t __ctx(gregs);//所装载寄存器
fpregset_t __ctx(fpregs);//寄存器的类型
} mcontext_t;
//其中gregset_t 定义如下
typedef greg_t gregset_t[NGREG];//包括了所有的寄存器的信息
getcontext()函数
函数原型:
int getcontext(ucontext_t* ucp)
getcontext()函数的底层是通过汇编来实现的,其主要的功能是将当前运行到的寄存器信息保存到参数ucp中。
setcontext()函数
函数原型:
int setcontext(const ucontext_t *ucp)
setcontext()函数的作用是将ucontext_t结构体变量ucp中的上下文信息重新恢复到cpu中并执行。
makecontext()函数
函数原型:
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...)
makecontext()函数的主要功能是设置协程的工作函数到上下文(ucontext_t)中,同时在用户设置的栈上保存一些信息,并且设置栈顶指针的值到上下文信息中。
argc是入口函数的参数个数,后面的...是具体的入口函数参数,该参数必须是整形值。
swapcontext()函数
函数原型:
int swapcontext(ucontext_t *oucp, ucontext_t *ucp)
该函数可以将当前cpu中的上下文信息保存到oucp结构体变量中,然后将ucp结构体的上下文信息恢复到cpu中。
这里可以理解为调用了两个函数,第一次是调用了getcontext(oucp)然后再调用setcontext(ucp)。
协程的调度
在使用taskcreate创建协程的时候,这个函数内部会调用taskready函数修改新建协程的状态并加入就绪队列taskrunqueue中,taskrunqueue中的协程需要一个调度器来调度执行。
tasklib库中实现了一个协程调度中心的函数。调度中心会不断的从就绪队列中取出协程来执行,它的核心逻辑是这样的:
- 从就绪队列中拿出一个协程t,并把t移出就绪队列。
- 通过contextswitch函数将协程t的上下文信息切换到taskschedcontext中执行。
- 将协程t切换回调度中心,如果t已经退出,修改数据结构,然后回收他的内存,然后继续调度其它的协程执行。这里的调度机制比较简单,是非抢占式的协作式调度,没有时间片的概念,一个协程的执行时间由自己决定,放弃执行的权力也是自己控制的,当协程不想执行了可以调用taskyield()函数让出cpu。
static void taskscheduler(void)
{
int i;
Task *t;
taskdebug("scheduler enter");
for(;;){
// 如果没有就绪态协程了,就退出
if(taskcount ==)
exit(taskexitval);
// 从就绪队列中拿出一个协程
t = taskrunqueue.head;
if(t == nil){
fprint(, "no runnable tasks! %d tasks stalled\n", taskcount);
exit();
}
// 从就绪队列中删除这个协程
deltask(&taskrunqueue, t);
// 将协程状态改为非就绪态
t->ready =;
// 保存正在执行的协程
taskrunning = t;
// 切换次数+
tasknswitch++;
taskdebug("run %d (%s)", t->id, t->name);
// 切换到t执行,将当前cpu中的上下文信息保存到taskschedcontext中
// 然后将t->context中的上下文信息恢复到cpu中执行
contextswitch(&taskschedcontext, &t->context);
// 执行结束
taskrunning = nil;
// 刚才执行的协程t退出了
if(t->exiting){
// 如果不是系统协程,协程个数减一
if(!t->system)
taskcount--;
// 保存当前协程在alltask的索引
i = t->alltaskslot;
// 将最后一个协程切换到当前协程的位置,因为当前协程要退出了
alltask[i] = alltask[--nalltask];
// 更新被置换协程的索引
alltask[i]->alltaskslot = i;
// 释放堆内存
free(t);
}
}
}
从上述调度器执行的代码中可以发现,我们使用contextswitch函数实现了协程间的上下文切换,这个函数的内部调用了swapcontext(&from->uc, &to->uc)函数,这个函数将当前cpu中的上下文信息保存到from结构体变量中,然后将to结构体的上下文信息恢复到cpu中执行。
执行结束之后,会重新将上下文切换回调度中心。
static void contextswitch(Context *from, Context *to)
{
if(swapcontext(&from->uc, &to->uc) <){
fprint(, "swapcontext failed: %r\n");
assert();
}
}
其实我们还可以通过调用taskyield函数来控制协程在没有执行完就主动让出,那么当前正在执行的task会被 插入就绪队列的尾部,等待后续的调度,然后调度器会从就绪队列的头部重新取出一个task来执行。
/*
* 协程主动让出CPU
*.将主动让出的协程重新加入到就绪队列
*.将当前协程的状态标记为让出
*.执行协程切换的逻辑
* */
int taskyield(void)
{
int n;
// 协程的让出次数
n = tasknswitch;
// 将当前主动让出的协程放进等待队列
taskready(taskrunning);
// 标记当前协程的状态为“让出”
taskstate("yield");
// 切换协程
taskswitch();
// 等于说明当前只有自己一个协程,调度的时候taskswitch加1,所以这里要减1
return tasknswitch - n -;
}
我们可以发现,切换流程的时候实际上是调用了taskswitch()函数,这个函数内部会调用contextswitch函数来切换上下文。
/*
* 切换协程
* */
void taskswitch(void)
{
needstack();
// 将当前CPU中的上下文信息保存到taskrunning->context结构体中
// 然后将调度中心上下文恢复到CPU中执行
contextswitch(&taskrunning->context, &taskschedcontext);
}
总结
所以整个调度流程是这样的:
每一个协程对应一个Task结构体。然后调度中心不断地按照先进先出的方式去调度协程的执行就可以。因为没有抢占机制,所以调度中心是依赖协程本身去驱动的,协程需要主动让出cpu,把上下文切换回调度中心,调度中心才能进行下一轮的调度。
当然我们也可以调用taskyield函数主动让出CPU,他会将当前正在执行的task插入就绪队列的尾部,等待后续的调度,然后调度器会从就绪队列的头部重新取出一个task来执行。