结合 golang 中分析协程框架的实现

Golang
51
0
0
2025-01-19

协程

协程可以说是 golang 中的有名的框架,本文主要分析 Github 项目 Ntyco 协程框架的实现,由于本人目前 golang 写的不多,因此不会对 golang 的源码进行分析,只是根据 golang 的协程调度来分析 c 语言版本调度。

协程的基本元素

golang 中大名鼎鼎的协程有这样三个元素 G M P,G 表示 goroutinue 协程,m 是内核元素, p 表示处理器,用来管理和执行协程。

如果使用 c 语言的话,那么内核线程使用 epoll 进行管理最好,而在 golang ,当然我们主要实现的就是 goroutine 和 process ,说白了,我们需要设计一个协程的数据结构并对其进行操作,然后实现一个协程调度其对齐进行调度,并且和内核之间进行通信。

源码分析

项目中的代码结构如下:

协程代码结构

所有的实现代码都在 core 目录下。

所有实现代码结构

其中最关键的代码在 nty_coroutine 、 nty_schedule 和 nty_socket 中,其中 nty_tree ,是对红黑树的各种操作实现,nty_queue 是对队列的操作实现。

我们先从协程开始,关于协程的数据结构定义如下:

typedef struct _nty_coroutine { //协程的定义
//private
#ifdef _USE_UCONTEXT
ucontext_t ctx;
#else
nty_cpu_ctx ctx;
#endif
proc_coroutine func; //调度的函数
void *arg; //参数
void *data; //数据
size_t stack_size;
size_t last_stack_size;
nty_coroutine_status status; //协程状态
nty_schedule *sched; //调度器
uint64_t birth; //诞生时间
uint64_t id; //协程id
#if CANCEL_FD_WAIT_UINT64
int fd;
unsigned short events; //POLL_EVENT
#else
int64_t fd_wait;
#endif
char funcname[64]; //函数名
struct _nty_coroutine *co_join; //加入的协程类型
void **co_exit_ptr;
void *stack;
void *ebp;
uint32_t ops;
uint64_t sleep_usecs;
RB_ENTRY(_nty_coroutine) sleep_node; //红黑树维护的沉睡节点
RB_ENTRY(_nty_coroutine) wait_node; //红黑树维护的等待节点
LIST_ENTRY(_nty_coroutine) busy_next; //繁忙的接下一个位置
TAILQ_ENTRY(_nty_coroutine) ready_next; //等待
TAILQ_ENTRY(_nty_coroutine) defer_next;
TAILQ_ENTRY(_nty_coroutine) cond_next;
TAILQ_ENTRY(_nty_coroutine) io_next;
TAILQ_ENTRY(_nty_coroutine) compute_next;
struct {
void *buf;
size_t nbytes;
int fd;
int ret;
int err;
} io; // io 结构
struct _nty_coroutine_compute_sched *compute_sched; //计算的调度
int ready_fds;
struct pollfd *pfds;
nfds_t nfds;
} nty_coroutine;

说几个关键的内容一个是协程的状态,golang 中协程状态有三种,一种是 sleep 沉睡,另一种是 ready 在准备,然后正在运行,这里用了红黑树维护各个节点,没然后用队列维护正在准备的节点。

定义的状态如下:

typedef enum {//协程状态的类型
NTY_COROUTINE_STATUS_WAIT_READ,
NTY_COROUTINE_STATUS_WAIT_WRITE,
NTY_COROUTINE_STATUS_NEW,
NTY_COROUTINE_STATUS_READY,
NTY_COROUTINE_STATUS_EXITED,
NTY_COROUTINE_STATUS_BUSY,
NTY_COROUTINE_STATUS_SLEEPING,
NTY_COROUTINE_STATUS_EXPIRED,
NTY_COROUTINE_STATUS_FDEOF,
NTY_COROUTINE_STATUS_DETACH,
NTY_COROUTINE_STATUS_CANCELLED,
NTY_COROUTINE_STATUS_PENDING_RUNCOMPUTE,
NTY_COROUTINE_STATUS_RUNCOMPUTE,
NTY_COROUTINE_STATUS_WAIT_IO_READ,
NTY_COROUTINE_STATUS_WAIT_IO_WRITE,
NTY_COROUTINE_STATUS_WAIT_MULTI
} nty_coroutine_status;

上述状态就是各种细分的协程状态。

协程定义之后,就要对数据结构进行操作,然后让协程跟我们的调度进行进行交互。

//初始化协程
static void nty_coroutine_init(nty_coroutine *co) { //协程初始化
#ifdef _USE_UCONTEXT
getcontext(&co->ctx); //获取协程上下文
co->ctx.uc_stack.ss_sp = co->sched->stack;
co->ctx.uc_stack.ss_size = co->sched->stack_size;
co->ctx.uc_link = &co->sched->ctx;
// printf("TAG21\n");
makecontext(&co->ctx, (void (*)(void)) _exec, 1, (void*)co);
// printf("TAG22\n");
#else
void **stack = (void **)(co->stack + co->stack_size);
stack[-3] = NULL;
stack[-2] = (void *)co;
co->ctx.esp = (void*)stack - (4 * sizeof(void*));
co->ctx.ebp = (void*)stack - (3 * sizeof(void*));
co->ctx.eip = (void*)_exec;
#endif
co->status = BIT(NTY_COROUTINE_STATUS_READY); //将协程状态设置为准备状态
}
//协程的调度函数
void nty_coroutine_yield(nty_coroutine *co) { //协程调度函数
co->ops = 0; //协程操作标志,表示正在调度
#ifdef _USE_UCONTEXT
if ((co->status & BIT(NTY_COROUTINE_STATUS_EXITED)) == 0) { //协程退出
_save_stack(co); //保存栈
}
swapcontext(&co->ctx, &co->sched->ctx); //交换上下文
#else
_switch(&co->sched->ctx, &co->ctx);
#endif
}

这里只展示了对于协程的调度以及对于协程的初始化,另外还有协程创建,状态转换,加入不同状态的队列中,这一部分的内容相对较为容易。

协程调度

协程定义出来之后,我们需要这样的前置只是,协程到底要怎么调度,这就是我们需要对栈进行操作,在 x86 处理器上,我们汇编代码都是在栈上进行处理的,如果了解过 liunx 操作系统,我们就知道操作系统在从用户态切换到内核态就需要进行系统调用,这个时候会保留进程的上下文,然后从内核态执行完毕后就恢复过来。

因此我们在协程调度的数据结构中定义如下。

typedef struct _nty_schedule { //协程调度结构
uint64_t birth; //协程诞生时间
#ifdef _USE_UCONTEXT //ucontext_t 实现上下文
ucontext_t ctx;
#else //cpu 保存上下文
nty_cpu_ctx ctx;
#endif
void *stack;
size_t stack_size; //栈大小
int spawned_coroutines;
uint64_t default_timeout; //默认过期时间
struct _nty_coroutine *curr_thread; //当前协程
int page_size;
int poller_fd; //poll fd
int eventfd; //时间的fd
struct epoll_event eventlist[NTY_CO_MAX_EVENTS]; //epoll 时间
int nevents;
int num_new_events; //新时间个数
pthread_mutex_t defer_mutex; //锁
nty_coroutine_queue ready; //准备好的队列
nty_coroutine_queue defer; //结束队列
nty_coroutine_link busy; //繁忙的协程
nty_coroutine_rbtree_sleep sleeping; //沉睡的协程
nty_coroutine_rbtree_wait waiting; //等待的协程
//private
} nty_schedule; //调度器的定义

我们来看一下这个协程调度器的大概作用,首先就是跟协程进行交互,对协程的上下文进行保存加载。

另外就是跟内核线程进行交互,他需要将协程的内容加入内核 epoll 中进行调度。

先从调度器的创建开始

int nty_schedule_create(int stack_size) { //协程调度器的创建
int sched_stack_size = stack_size ? stack_size : NTY_CO_MAX_STACKSIZE; //创建一个传入的栈大小
nty_schedule *sched = (nty_schedule*)calloc(1, sizeof(nty_schedule)); //创建一个调度器结构
if (sched == NULL) {
printf("Failed to initialize scheduler\n");
return -1;
}
assert(pthread_setspecific(global_sched_key, sched) == 0); //设置线程数据键位调度器结构
sched->poller_fd = nty_epoller_create(); //创建epoller
if (sched->poller_fd == -1) {
printf("Failed to initialize epoller\n");
nty_schedule_free(sched);
return -2;
}
nty_epoller_ev_register_trigger(); //对调度的协程时间进行操作,加入 epoll
sched->stack_size = sched_stack_size; //调度器栈大小
sched->page_size = getpagesize(); //页大小
#ifdef _USE_UCONTEXT
int ret = posix_memalign(&sched->stack, sched->page_size, sched->stack_size); //分配页对齐内存
assert(ret == 0);
#else
sched->stack = NULL;
bzero(&sched->ctx, sizeof(nty_cpu_ctx));
#endif
sched->spawned_coroutines = 0; //创建的协程
sched->default_timeout = 3000000u; //默认过期时间
RB_INIT(&sched->sleeping); //红黑树初始化
RB_INIT(&sched->waiting);
sched->birth = nty_coroutine_usec_now(); //获取当前时间
TAILQ_INIT(&sched->ready); //队列初始化
TAILQ_INIT(&sched->defer);
LIST_INIT(&sched->busy); //链表初始化

这里调度器就是对各种状态协程调度,同时针对内核线程的 epoll 也要进行交互。

注意,因为我们协程不可能只创建一个, golang 中,有多个 Process 进行处理,因此调度上也会需要对各调度的数据结构。

接着实现就是一些协程数据结构的对协程在各个节点之间转换操作,因为篇幅原因就不再赘述,我们最后来看,调度器跑起来的函数

void nty_schedule_run(void) {
nty_schedule *sched = nty_coroutine_get_sched(); //获取调度
if (sched == NULL) return ;
while (!nty_schedule_isdone(sched)) {//当还有协程运行
// 1. expired --> sleep rbtree
nty_coroutine *expired = NULL;
while ((expired = nty_schedule_expired(sched)) != NULL) { //超过设定的过期时间
nty_coroutine_resume(expired); //恢复协程然后执行
}
// 2. ready queue //再在准备的队列
nty_coroutine *last_co_ready = TAILQ_LAST(&sched->ready, _nty_coroutine_queue);
while (!TAILQ_EMPTY(&sched->ready)) {
nty_coroutine *co = TAILQ_FIRST(&sched->ready);
TAILQ_REMOVE(&co->sched->ready, co, ready_next);
if (co->status & BIT(NTY_COROUTINE_STATUS_FDEOF)) {
nty_coroutine_free(co);
break;
}
nty_coroutine_resume(co);
if (co == last_co_ready) break;
}
// 3. wait rbtree //对等待的队列
nty_schedule_epoll(sched);
while (sched->num_new_events) {
int idx = --sched->num_new_events;
struct epoll_event *ev = sched->eventlist+idx;
int fd = ev->data.fd;
int is_eof = ev->events & EPOLLHUP;
if (is_eof) errno = ECONNRESET;
nty_coroutine *co = nty_schedule_search_wait(fd);
if (co != NULL) {
if (is_eof) {
co->status |= BIT(NTY_COROUTINE_STATUS_FDEOF);
}
nty_coroutine_resume(co);
}
is_eof = 0;
}
}
nty_schedule_free(sched);
return ;
}

上述就是一个针对睡眠,准备和等待队列的一个调度,睡眠的队列有个时间,当这个协程创建时间超过了这个时间,那么就要进入调度执行,对于准备的队列直接从队列中取然后执行,最后就是执行等待的队列。

内核epoll 的调度

在调度 run 函数中,我们开到最后是对 epoll 的调度,集中在 nty_schedule_epoll,nty_schedule_search_wait ,nty_coroutine_resume。

我们先来看看 nty_schedule_epoll 函数上:

static int nty_schedule_epoll(nty_schedule *sched) {
sched->num_new_events = 0;
struct timespec t = {0, 0};
uint64_t usecs = nty_schedule_min_timeout(sched);
if (usecs && TAILQ_EMPTY(&sched->ready)) { //对 ready 的队列进行计算
t.tv_sec = usecs / 1000000u;
if (t.tv_sec != 0) {
t.tv_nsec = (usecs % 1000u) * 1000u;
} else {
t.tv_nsec = usecs * 1000u;
}
} else {
return 0;
}
int nready = 0;
while (1) {
nready = nty_epoller_wait(t);
if (nready == -1) {
if (errno == EINTR) continue;
else assert(0);
}
break;
}
sched->nevents = 0;
sched->num_new_events = nready;
return 0;
}

函数较为见到那,通过 nty_epoller_wait 得到 nready 的时间,然后将时间的数量设置为 nready ,然后调度器后边就是会处理时间。

另外就是 nty_schedule_search_wait 就是从 wait 的红黑树中找到就绪时间,最后就是协程的恢复执行的一个过程。

结尾

上述就是一个简单的协程框架的分析,源码整个部分较为复杂,很多细节没有讲到,建议大家结合 golang 的原理跟着查看,关键内容在这个调度器的实现上,源码中还有 poll socket 之类网络 I/O的封装,不过不再本文讲述的范围内了。