协程
协程可以说是 golang 中的有名的框架,本文主要分析 Github 项目 Ntyco 协程框架的实现,由于本人目前 golang 写的不多,因此不会对 golang 的源码进行分析,只是根据 golang 的协程调度来分析 c 语言版本调度。
协程的基本元素
golang 中大名鼎鼎的协程有这样三个元素 G M P,G 表示 goroutinue 协程,m 是内核元素, p 表示处理器,用来管理和执行协程。
如果使用 c 语言的话,那么内核线程使用 epoll 进行管理最好,而在 golang ,当然我们主要实现的就是 goroutine 和 process ,说白了,我们需要设计一个协程的数据结构并对其进行操作,然后实现一个协程调度其对齐进行调度,并且和内核之间进行通信。
源码分析
项目中的代码结构如下:
所有的实现代码都在 core 目录下。
其中最关键的代码在 nty_coroutine 、 nty_schedule 和 nty_socket 中,其中 nty_tree ,是对红黑树的各种操作实现,nty_queue 是对队列的操作实现。
我们先从协程开始,关于协程的数据结构定义如下:
typedef struct _nty_coroutine { //协程的定义 | |
//private | |
ucontext_t ctx; | |
nty_cpu_ctx ctx; | |
proc_coroutine func; //调度的函数 | |
void *arg; //参数 | |
void *data; //数据 | |
size_t stack_size; | |
size_t last_stack_size; | |
nty_coroutine_status status; //协程状态 | |
nty_schedule *sched; //调度器 | |
uint64_t birth; //诞生时间 | |
uint64_t id; //协程id | |
int fd; | |
unsigned short events; //POLL_EVENT | |
int64_t fd_wait; | |
char funcname[64]; //函数名 | |
struct _nty_coroutine *co_join; //加入的协程类型 | |
void **co_exit_ptr; | |
void *stack; | |
void *ebp; | |
uint32_t ops; | |
uint64_t sleep_usecs; | |
RB_ENTRY(_nty_coroutine) sleep_node; //红黑树维护的沉睡节点 | |
RB_ENTRY(_nty_coroutine) wait_node; //红黑树维护的等待节点 | |
LIST_ENTRY(_nty_coroutine) busy_next; //繁忙的接下一个位置 | |
TAILQ_ENTRY(_nty_coroutine) ready_next; //等待 | |
TAILQ_ENTRY(_nty_coroutine) defer_next; | |
TAILQ_ENTRY(_nty_coroutine) cond_next; | |
TAILQ_ENTRY(_nty_coroutine) io_next; | |
TAILQ_ENTRY(_nty_coroutine) compute_next; | |
struct { | |
void *buf; | |
size_t nbytes; | |
int fd; | |
int ret; | |
int err; | |
} io; // io 结构 | |
struct _nty_coroutine_compute_sched *compute_sched; //计算的调度 | |
int ready_fds; | |
struct pollfd *pfds; | |
nfds_t nfds; | |
} nty_coroutine; |
说几个关键的内容一个是协程的状态,golang 中协程状态有三种,一种是 sleep 沉睡,另一种是 ready 在准备,然后正在运行,这里用了红黑树维护各个节点,没然后用队列维护正在准备的节点。
定义的状态如下:
typedef enum {//协程状态的类型 | |
NTY_COROUTINE_STATUS_WAIT_READ, | |
NTY_COROUTINE_STATUS_WAIT_WRITE, | |
NTY_COROUTINE_STATUS_NEW, | |
NTY_COROUTINE_STATUS_READY, | |
NTY_COROUTINE_STATUS_EXITED, | |
NTY_COROUTINE_STATUS_BUSY, | |
NTY_COROUTINE_STATUS_SLEEPING, | |
NTY_COROUTINE_STATUS_EXPIRED, | |
NTY_COROUTINE_STATUS_FDEOF, | |
NTY_COROUTINE_STATUS_DETACH, | |
NTY_COROUTINE_STATUS_CANCELLED, | |
NTY_COROUTINE_STATUS_PENDING_RUNCOMPUTE, | |
NTY_COROUTINE_STATUS_RUNCOMPUTE, | |
NTY_COROUTINE_STATUS_WAIT_IO_READ, | |
NTY_COROUTINE_STATUS_WAIT_IO_WRITE, | |
NTY_COROUTINE_STATUS_WAIT_MULTI | |
} nty_coroutine_status; |
上述状态就是各种细分的协程状态。
协程定义之后,就要对数据结构进行操作,然后让协程跟我们的调度进行进行交互。
//初始化协程 | |
static void nty_coroutine_init(nty_coroutine *co) { //协程初始化 | |
getcontext(&co->ctx); //获取协程上下文 | |
co->ctx.uc_stack.ss_sp = co->sched->stack; | |
co->ctx.uc_stack.ss_size = co->sched->stack_size; | |
co->ctx.uc_link = &co->sched->ctx; | |
// printf("TAG21\n"); | |
makecontext(&co->ctx, (void (*)(void)) _exec, 1, (void*)co); | |
// printf("TAG22\n"); | |
void **stack = (void **)(co->stack + co->stack_size); | |
stack[-3] = NULL; | |
stack[-2] = (void *)co; | |
co->ctx.esp = (void*)stack - (4 * sizeof(void*)); | |
co->ctx.ebp = (void*)stack - (3 * sizeof(void*)); | |
co->ctx.eip = (void*)_exec; | |
co->status = BIT(NTY_COROUTINE_STATUS_READY); //将协程状态设置为准备状态 | |
} | |
//协程的调度函数 | |
void nty_coroutine_yield(nty_coroutine *co) { //协程调度函数 | |
co->ops = 0; //协程操作标志,表示正在调度 | |
if ((co->status & BIT(NTY_COROUTINE_STATUS_EXITED)) == 0) { //协程退出 | |
_save_stack(co); //保存栈 | |
} | |
swapcontext(&co->ctx, &co->sched->ctx); //交换上下文 | |
_switch(&co->sched->ctx, &co->ctx); | |
} |
这里只展示了对于协程的调度以及对于协程的初始化,另外还有协程创建,状态转换,加入不同状态的队列中,这一部分的内容相对较为容易。
协程调度
协程定义出来之后,我们需要这样的前置只是,协程到底要怎么调度,这就是我们需要对栈进行操作,在 x86 处理器上,我们汇编代码都是在栈上进行处理的,如果了解过 liunx 操作系统,我们就知道操作系统在从用户态切换到内核态就需要进行系统调用,这个时候会保留进程的上下文,然后从内核态执行完毕后就恢复过来。
因此我们在协程调度的数据结构中定义如下。
typedef struct _nty_schedule { //协程调度结构 | |
uint64_t birth; //协程诞生时间 | |
ucontext_t ctx; | |
nty_cpu_ctx ctx; | |
void *stack; | |
size_t stack_size; //栈大小 | |
int spawned_coroutines; | |
uint64_t default_timeout; //默认过期时间 | |
struct _nty_coroutine *curr_thread; //当前协程 | |
int page_size; | |
int poller_fd; //poll fd | |
int eventfd; //时间的fd | |
struct epoll_event eventlist[NTY_CO_MAX_EVENTS]; //epoll 时间 | |
int nevents; | |
int num_new_events; //新时间个数 | |
pthread_mutex_t defer_mutex; //锁 | |
nty_coroutine_queue ready; //准备好的队列 | |
nty_coroutine_queue defer; //结束队列 | |
nty_coroutine_link busy; //繁忙的协程 | |
nty_coroutine_rbtree_sleep sleeping; //沉睡的协程 | |
nty_coroutine_rbtree_wait waiting; //等待的协程 | |
//private | |
} nty_schedule; //调度器的定义 |
我们来看一下这个协程调度器的大概作用,首先就是跟协程进行交互,对协程的上下文进行保存加载。
另外就是跟内核线程进行交互,他需要将协程的内容加入内核 epoll 中进行调度。
先从调度器的创建开始
int nty_schedule_create(int stack_size) { //协程调度器的创建 | |
int sched_stack_size = stack_size ? stack_size : NTY_CO_MAX_STACKSIZE; //创建一个传入的栈大小 | |
nty_schedule *sched = (nty_schedule*)calloc(1, sizeof(nty_schedule)); //创建一个调度器结构 | |
if (sched == NULL) { | |
printf("Failed to initialize scheduler\n"); | |
return -1; | |
} | |
assert(pthread_setspecific(global_sched_key, sched) == 0); //设置线程数据键位调度器结构 | |
sched->poller_fd = nty_epoller_create(); //创建epoller | |
if (sched->poller_fd == -1) { | |
printf("Failed to initialize epoller\n"); | |
nty_schedule_free(sched); | |
return -2; | |
} | |
nty_epoller_ev_register_trigger(); //对调度的协程时间进行操作,加入 epoll | |
sched->stack_size = sched_stack_size; //调度器栈大小 | |
sched->page_size = getpagesize(); //页大小 | |
int ret = posix_memalign(&sched->stack, sched->page_size, sched->stack_size); //分配页对齐内存 | |
assert(ret == 0); | |
sched->stack = NULL; | |
bzero(&sched->ctx, sizeof(nty_cpu_ctx)); | |
sched->spawned_coroutines = 0; //创建的协程 | |
sched->default_timeout = 3000000u; //默认过期时间 | |
RB_INIT(&sched->sleeping); //红黑树初始化 | |
RB_INIT(&sched->waiting); | |
sched->birth = nty_coroutine_usec_now(); //获取当前时间 | |
TAILQ_INIT(&sched->ready); //队列初始化 | |
TAILQ_INIT(&sched->defer); | |
LIST_INIT(&sched->busy); //链表初始化 |
这里调度器就是对各种状态协程调度,同时针对内核线程的 epoll 也要进行交互。
注意,因为我们协程不可能只创建一个, golang 中,有多个 Process 进行处理,因此调度上也会需要对各调度的数据结构。
接着实现就是一些协程数据结构的对协程在各个节点之间转换操作,因为篇幅原因就不再赘述,我们最后来看,调度器跑起来的函数
void nty_schedule_run(void) { | |
nty_schedule *sched = nty_coroutine_get_sched(); //获取调度 | |
if (sched == NULL) return ; | |
while (!nty_schedule_isdone(sched)) {//当还有协程运行 | |
// 1. expired --> sleep rbtree | |
nty_coroutine *expired = NULL; | |
while ((expired = nty_schedule_expired(sched)) != NULL) { //超过设定的过期时间 | |
nty_coroutine_resume(expired); //恢复协程然后执行 | |
} | |
// 2. ready queue //再在准备的队列 | |
nty_coroutine *last_co_ready = TAILQ_LAST(&sched->ready, _nty_coroutine_queue); | |
while (!TAILQ_EMPTY(&sched->ready)) { | |
nty_coroutine *co = TAILQ_FIRST(&sched->ready); | |
TAILQ_REMOVE(&co->sched->ready, co, ready_next); | |
if (co->status & BIT(NTY_COROUTINE_STATUS_FDEOF)) { | |
nty_coroutine_free(co); | |
break; | |
} | |
nty_coroutine_resume(co); | |
if (co == last_co_ready) break; | |
} | |
// 3. wait rbtree //对等待的队列 | |
nty_schedule_epoll(sched); | |
while (sched->num_new_events) { | |
int idx = --sched->num_new_events; | |
struct epoll_event *ev = sched->eventlist+idx; | |
int fd = ev->data.fd; | |
int is_eof = ev->events & EPOLLHUP; | |
if (is_eof) errno = ECONNRESET; | |
nty_coroutine *co = nty_schedule_search_wait(fd); | |
if (co != NULL) { | |
if (is_eof) { | |
co->status |= BIT(NTY_COROUTINE_STATUS_FDEOF); | |
} | |
nty_coroutine_resume(co); | |
} | |
is_eof = 0; | |
} | |
} | |
nty_schedule_free(sched); | |
return ; | |
} |
上述就是一个针对睡眠,准备和等待队列的一个调度,睡眠的队列有个时间,当这个协程创建时间超过了这个时间,那么就要进入调度执行,对于准备的队列直接从队列中取然后执行,最后就是执行等待的队列。
内核epoll 的调度
在调度 run 函数中,我们开到最后是对 epoll 的调度,集中在 nty_schedule_epoll,nty_schedule_search_wait ,nty_coroutine_resume。
我们先来看看 nty_schedule_epoll 函数上:
static int nty_schedule_epoll(nty_schedule *sched) { | |
sched->num_new_events = 0; | |
struct timespec t = {0, 0}; | |
uint64_t usecs = nty_schedule_min_timeout(sched); | |
if (usecs && TAILQ_EMPTY(&sched->ready)) { //对 ready 的队列进行计算 | |
t.tv_sec = usecs / 1000000u; | |
if (t.tv_sec != 0) { | |
t.tv_nsec = (usecs % 1000u) * 1000u; | |
} else { | |
t.tv_nsec = usecs * 1000u; | |
} | |
} else { | |
return 0; | |
} | |
int nready = 0; | |
while (1) { | |
nready = nty_epoller_wait(t); | |
if (nready == -1) { | |
if (errno == EINTR) continue; | |
else assert(0); | |
} | |
break; | |
} | |
sched->nevents = 0; | |
sched->num_new_events = nready; | |
return 0; | |
} |
函数较为见到那,通过 nty_epoller_wait 得到 nready 的时间,然后将时间的数量设置为 nready ,然后调度器后边就是会处理时间。
另外就是 nty_schedule_search_wait 就是从 wait 的红黑树中找到就绪时间,最后就是协程的恢复执行的一个过程。
结尾
上述就是一个简单的协程框架的分析,源码整个部分较为复杂,很多细节没有讲到,建议大家结合 golang 的原理跟着查看,关键内容在这个调度器的实现上,源码中还有 poll socket 之类网络 I/O的封装,不过不再本文讲述的范围内了。