前言
开始讲netpoller之前先讲一下socket网络编程和epoll,不管go也好,nginx也好,只要是开启网络服务就绕不开socket,它是网络服务的基础。下一章网络编程先讲讲socket的基础函数。
网络编程
socket(购买电话机)
int socket(int domain, int type, int protocol);
socket函数对应于普通文件的打开操作。普通文件的打开操作返回一个文件描述字,而socket()用于创建一个socket描述符(socket descriptor),它唯一标识一个socket。这个socket描述字跟文件描述字一样,后续的操作都有用到它,把它作为参数,通过它来进行一些读写操作。
正如可以给fopen的传入不同参数值,以打开不同的文件。创建socket的时候,也可以指定不同的参数创建不同的socket描述符,socket函数的三个参数分别为:
domain:即协议域,又称为协议族(family)。常用的协议族有,AF_INET(IPv4)、AF_INET6(IPv6)、AF_LOCAL(或称AF_UNIX,Unix域socket)、AF_ROUTE等等。协议族决定了socket的地址类型,在通信中必须采用对应的地址,如AF_INET决定了要用ipv4地址(32位的)与端口号(16位的)的组合、AF_UNIX决定了要用一个绝对路径名作为地址。
type:指定socket类型。常用的socket类型有,SOCK_STREAM(流式套接字)、SOCK_DGRAM(数据报式套接字)、SOCK_RAW、SOCK_PACKET、SOCK_SEQPACKET等等 ,linux2.1.17版本后增加了SOCK_NONBLOCK,代表了非阻塞模式
示例:
int s = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
protocol:就是指定协议。常用的协议有,IPPROTO_TCP、PPTOTO_UDP、IPPROTO_SCTP、IPPROTO_TIPC等,它们分别对应TCP传输协议、UDP传输协议、STCP传输协议、TIPC传输协议。
bind(接电话线)
bind()函数把一个地址族中的特定地址赋给socket。例如对应AF_INET、AF_INET6就是把一个ipv4或ipv6地址和端口号组合赋给socket。
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
函数的三个参数分别为:
sockfd:即socket描述字,它是通过socket()函数创建了,唯一标识一个socket。bind()函数就是将给这个描述字绑定一个名字
addr:一个const struct sockaddr *指针,指向要绑定给sockfd的协议地址,例如”127.0.0.1:80”
addrlen: 对应的是地址的长度。
listen(等电话中)
int listen(int sockfd, int backlog);
backlog: socket可以排队的最大连接个数
当有多个客户端一起请求的时候,服务端不可能来多少就处理多少,这样如果并发太多,就会因为性能的因素发生拥塞,然后造成雪崩。所有就搞了一个队列,先将请求放在队列里面,一个个来。socket_listen里面的第二个参数backlog就是设置这个队列的长度。如果将队列长度设置成10,那么如果有20个请求一起过来,服务端就会先放10个请求进入这个队列,因为长度只有10。然后其他的就直接拒绝。tcp协议这时候不会发送rst给客户端,这样的话客户端就会重新发送SYN,以便能进入这个队列。
如果三次握手完成了,就会将完成三次握手的请求取出来,放入另一个队列中,这样队列就空出一个位置,其他重发SYN的请求就可以进入队列中。
connect(打电话)
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
sockaddr:socket地址,”127.0.0.1:8080”
addrlen:地址长度
accept(接电话)
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
sockaddr:一个指针,接收客户端socket地址,”127.0.0.1:36820”
addrlen:客户端地址长度
read/write/recv/send (通话)
read函数是负责从fd中读取内容。
write函数将buf中的nbytes字节内容写入文件描述符fd。
io多路复用
所谓 I/O 多路复用指的就是 select/poll/epoll 这一系列的多路选择器:支持单一线程同时监听多个文件描述符(I/O 事件),阻塞等待,并在其中某个文件描述符可读写时收到通知。 I/O 复用其实复用的不是 I/O 连接,而是复用线程,让一个 thread of control 能够处理多个连接(I/O 事件)。
select
#include <sys/select.h>
/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
// 和 select 紧密结合的四个宏:
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
理解 select 的关键在于理解 fd_set,为说明方便,取 fd_set 长度为 1 字节,fd_set 中的每一 bit 可以对应一个文件描述符 fd,则 1 字节长的 fd_set 最大可以对应 8 个 fd。select 的调用过程如下:
- 执行 FD_ZERO(&set), 则 set 用位表示是
0000,0000
- 若 fd=5, 执行 FD_SET(fd, &set); 后 set 变为 0001,0000(第 5 位置为 1)
- 再加入 fd=2, fd=1,则 set 变为
0001,0011
- 执行 select(6, &set, 0, 0, 0) 阻塞等待
- 若 fd=1, fd=2 上都发生可读事件,则 select 返回,此时 set 变为
0000,0011
(注意:没有事件发生的 fd=5 被清空)
基于上面的调用过程,可以得出 select 的特点:
- 可监控的文件描述符个数取决于 sizeof(fd_set) 的值。假设服务器上 sizeof(fd_set)=512,每 bit 表示一个文件描述符,则服务器上支持的最大文件描述符是 512*8=4096。
- 将 fd 加入 select 监控集的同时,还要再使用一个数据结构 array 保存放到 select 监控集中的 fd,一是用于在 select 返回后,array 作为源数据和 fd_set 进行 FD_ISSET 判断。二是 select 返回后会把以前加入的但并无事件发生的 fd 清空,则每次开始 select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO 最先),扫描 array 的同时取得 fd 最大值 maxfd,用于 select 的第一个参数
- 可见 select 模型必须在 select 前循环 array(加 fd,取 maxfd),select 返回后循环 array(FD_ISSET 判断是否有事件发生)
所以,select 有如下的缺点:
- 最大并发数限制:使用 32 个整数的 32 位,即 32*32=1024 来标识 fd,虽然可修改,但是有以下第 2, 3 点的瓶颈
- 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大
- 性能衰减严重:每次 kernel 都需要线性扫描整个 fd_set,所以随着监控的描述符 fd 数量增长,其 I/O 性能会线性下降
poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 结构而不是 select 的 fd_set 结构,poll 解决了最大文件描述符数量限制的问题,但是同样需要从用户态拷贝所有的 fd 到内核态,也需要线性遍历所有的 fd 集合,所以它和 select 只是实现细节上的区分,并没有本质上的区别。
epoll
struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
函数
epoll_create(int size);
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
epoll_wailt
等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
优势
由于epoll的实现机制与select/poll机制完全不同,上面所说的 select的缺点在epoll上不复存在。
设想一下如下场景:有100万个客户端同时与一个服务器进程保持着TCP连接。而每一时刻,通常只有几百上千个TCP连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发?
在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。
epoll的设计和实现与select完全不同。把原先的select/poll调用分成了3个部分:
1)调用epoll_create()建立一个epoll对象
2)调用epoll_ctl向epoll对象中添加这100万个连接的套接字
3)调用epoll_wait收集发生的事件的连接
如此一来,要实现上面说是的场景,只需要在进程启动时建立一个epoll对象,然后在需要的时候向这个epoll对象中添加或者删除连接。同时,epoll_wait的效率也非常高,因为调用epoll_wait时,并没有一股脑的向操作系统复制这100万个连接的句柄数据,内核也不需要去遍历全部的连接。
与 select&poll 相比,epoll 分清了高频调用和低频调用。例如,epoll_ctl 相对来说就是非频繁调用的,而 epoll_wait 则是会被高频调用的。所以 epoll 利用 epoll_ctl 来插入或者删除一个 fd,实现用户态到内核态的数据拷贝,这确保了每一个 fd 在其生命周期只需要被拷贝一次,而不是每次调用 epoll_wait 的时候都拷贝一次。 epoll_wait 则被设计成几乎没有入参的调用,相比 select&poll 需要把全部监听的 fd 集合从用户态拷贝至内核态的做法,epoll 的效率就高出了一大截。
在实现上 epoll 采用红黑树来存储所有监听的 fd,而红黑树本身插入和删除性能比较稳定,时间复杂度 O(logN)。通过 epoll_ctl 函数添加进来的 fd 都会被放在红黑树的某个节点内,所以,重复添加是没有用的。当把 fd 添加进来的时候时候会完成关键的一步:该 fd 会与相应的设备(网卡)驱动程序建立回调关系,也就是在内核中断处理程序为它注册一个回调函数,在 fd 相应的事件触发(中断)之后(设备就绪了),内核就会调用这个回调函数,该回调函数在内核中被称为: ep_poll_callback
,这个回调函数其实就是把这个 fd 添加到 rdllist 这个双向链表(就绪链表)中。epoll_wait 实际上就是去检查 rdllist 双向链表中是否有就绪的 fd,当 rdllist 为空(无就绪 fd)时挂起当前进程,直到 rdllist 非空时进程才被唤醒并返回。
边缘/水平触发模式
Level_triggered(水平触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!!!如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率
Edge_triggered(边缘触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你!!!这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符
为什么ET模式要用非阻塞socket
假设buffer有1024字节,一次读取的数据很大,超过1024的大小:这种情况就需要读取多次,也就是要调用多次recv函数,通常我们会用while循环一直读取,无论ET还是LT,这样LT模式就显得很鸡肋,所以一般不用LT。在ET模式下,(1)、如果用阻塞IO+while循环,当最后一个数据读取完后,程序是无法立刻跳出while循环的,因为阻塞IO会在 while(true){ int len=recv(); }这里阻塞住,除非对方关闭连接或者recv出错,这样程序就无法继续往下执行,这一次的epoll_wait没有办法处理其它的连接,会造成延迟、并发度下降。(2)、如果是非阻塞IO+while循环,当读取完数据后,recv会立即返回-1,并将errno设置为EAGAIN或EWOULDBLOCK,这就表示数据已经读取完成,已经没有数据了,可以退出循环了。这样就不会像阻塞IO一样卡在那里,这就减少了不必要的等待时间,性能自然更高。
网络编程+epoll代码示例
int main(){
struct epoll_event ev, events[MAX_EVENTS];
int addrlen, listenfd, conn_sock, nfds, epfd, fd, i, nread, n;
struct sockaddr_in local, remote;
char buf[BUFSIZ];
//创建listen socket
if( (listenfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK , 0)) < 0) {
perror("sockfd\n");
exit(1);
}
bzero(&local, sizeof(local));
local.sin_family = AF_INET;
local.sin_addr.s_addr = htonl(INADDR_ANY);;
local.sin_port = htons(PORT);
if( bind(listenfd, (struct sockaddr *) &local, sizeof(local)) < 0) {
perror("bind\n");
exit(1);
}
listen(listenfd, 20);
epfd = epoll_create(MAX_EVENTS);
if (epfd == -1) {
perror("epoll_create");
exit(EXIT_FAILURE);
}
ev.events = EPOLLIN;
ev.data.fd = listenfd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) == -1) {
perror("epoll_ctl: listen_sock");
exit(EXIT_FAILURE);
}
for (;;) {
nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_pwait");
exit(EXIT_FAILURE);
}
for (i = 0; i < nfds; ++i) {
fd = events[i].data.fd;
if (fd == listenfd) {
while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote,
(size_t *)&addrlen)) > 0) {
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_sock;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock,
&ev) == -1) {
perror("epoll_ctl: add");
exit(EXIT_FAILURE);
}
}
if (conn_sock == -1) {
if (errno != EAGAIN && errno != ECONNABORTED
&& errno != EPROTO && errno != EINTR)
perror("accept");
}
continue;
}
if (events[i].events & EPOLLIN) {
n = 0;
while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) {
n += nread;
}
if (nread == -1 && errno != EAGAIN) {
perror("read error");
}
ev.data.fd = fd;
ev.events = events[i].events | EPOLLOUT;
if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == -1) {
perror("epoll_ctl: mod");
}
}
if (events[i].events & EPOLLOUT) {
sprintf(buf, "HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\nHello World", 11);
int nwrite, data_size = strlen(buf);
n = data_size;
while (n > 0) {
nwrite = write(fd, buf + data_size - n, n);
if (nwrite < n) {
if (nwrite == -1 && errno != EAGAIN) {
perror("write error");
}
break;
}
n -= nwrite;
}
close(fd);
}
}
}
return 0;
}
go netpoller
Go netpoller 基本原理
Go netpoller 通过在底层对 epoll/kqueue/iocp 的封装,从而实现了使用同步编程模式达到异步执行的效果。总结来说,所有的网络操作都以网络描述符 netFD 为中心实现。netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 park 住,直到这个 netFD 上再次发生读写事件,才将此 goroutine 给 ready 激活重新运行。显然,在底层通知 goroutine 再次发生读写等事件的方式就是 epoll/kqueue/iocp 等事件驱动机制。
Go 是一门跨平台的编程语言,而不同平台针对特定的功能有不用的实现,这当然也包括了 I/O 多路复用技术,比如 Linux 里的 I/O 多路复用有 select
、poll
和 epoll
,而 freeBSD 或者 MacOS 里则是 kqueue
,而 Windows 里则是基于异步 I/O 实现的 iocp
,等等;因此,Go 为了实现底层 I/O 多路复用的跨平台,分别基于上述的这些不同平台的系统调用实现了多版本的 netpollers
数据结构
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc
// Writev cache.
iovecs *[]syscall.Iovec
// Semaphore signaled when file is closed.
csema uint32
// Non-zero if this file has been set to blocking mode.
isBlocking uint32
// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable. IsStream bool
// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection. ZeroReadIsEOF bool
// Whether this is a file rather than a network socket.
isFile bool
}
代码示例(服务端)
package main
import (
"fmt"
"log" "net")
func main() {
listen, err := net.Listen("tcp", ":8888")
if err != nil {
log.Println("listen error: ", err)
return
}
for {
conn, err := listen.Accept()
if err != nil {
log.Println("accept error: ", err)
break
}
// start a new goroutine to handle the new connection.
go HandleConn(conn)
}
}
func HandleConn(conn net.Conn) {
defer conn.Close()
packet := make([]byte, 1024)
for {
// block here if socket is not available for reading data.
n, err := conn.Read(packet)
if err != nil {
log.Println("read socket error: ", err)
return
}
fmt.Println(string(packet[:n]))
// same as above, block here if socket is not available for writing.
_, _ = conn.Write([]byte("hi"))
}
}
代码示例(客户端)
package main
import (
"fmt"
"net"
"testing"
)
func TestClient(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:8888")
if err != nil {
panic(err)
}
defer conn.Close()
packet := make([]byte, 1024)
d := "hello"
_, err = conn.Write([]byte(d))
if err != nil {
panic(err)
}
n, err := conn.Read(packet)
if err != nil {
panic(err)
}
fmt.Println(string(packet[:n]))
}
epoll对应方法
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
// Go 对上面三个调用的封装
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList
net.Listen
调用 net.Listen
之后,底层会通过 Linux 的系统调用 socket
方法创建一个 fd 分配给 listener,并用以来初始化 listener 的 netFD
,接着调用 netFD 的 listenStream
方法完成对 socket 的 bind&listen 操作以及对 netFD
的初始化
- 调用
epollcreate1
(netpollinit) 创建一个 epoll 实例epfd
,作为整个 runtime 的唯一 event-loop 使用; - 将
netpollBreakRd
通知信号量封装成epollevent
事件结构体注册(netpollopen)进 epoll 实例。
Listener.Accept()
netpoll
accept socket 的工作流程如下:
- 服务端的 netFD 在
listen
时会创建 epoll 的实例,并将 listenerFD 加入 epoll 的事件队列 - netFD 在
accept
时将返回的 connFD 也加入 epoll 的事件队列 - netFD 在读写时出现
syscall.EAGAIN
错误,通过 pollDesc 的waitRead
方法将当前的 goroutine park 住,直到 ready,从 pollDesc 的waitRead
中返回
Conn.Read/Conn.Write
read/write 的工作流程如下:
- 在
accept
成功时会生成客户端connFD,并将fd 加入 epoll 的事件队列 - netFD 在read/write时出现
syscall.EAGAIN
错误,通过 pollDesc 的waitRead
方法将当前的 goroutine park 住,直到 ready,从 pollDesc 的waitRead
中返回 - 核心是调用 syscall.Read/syscall.Write 系统调用
epoll_wait时机
首先,client 连接 server 的时候,listener 通过 accept 调用接收新 connection,每一个新 connection 都启动一个 goroutine 处理,accept 调用会把该 connection 的 fd 连带所在的 goroutine 上下文信息封装注册到 epoll 的监听列表里去,当 goroutine 调用conn.Read
或者conn.Write
等需要阻塞等待的函数时,会被gopark
给封存起来并使之休眠,让 P 去执行本地调度队列里的下一个可执行的 goroutine,往后 Go scheduler 会在循环调度的runtime.schedule()
函数以及 sysmon 监控线程中调用runtime.netpoll
以获取可运行的 goroutine 列表并通过调用 injectglist 把剩下的 g 放入全局调度队列或者当前 P 本地调度队列去重新执行。
那么当 I/O 事件发生之后,netpoller 是通过什么方式唤醒那些在 I/O wait 的 goroutine 的?答案是通过 runtime.netpoll
。
runtime.netpoll
的核心逻辑是:
- 根据调用方的入参 delay,设置对应的调用
epollwait
的 timeout 值; - 调用
epollwait
等待发生了可读/可写事件的 fd; - 循环
epollwait
返回的事件列表,处理对应的事件类型, 组装可运行的 goroutine 链表并返回。