目录
- 示例
- initListener
- serve
- activateEventLoops
- polling
前面说了go自带的原生netpoll模型,大致的流程就是每一个新的连接都会开启一个goroutine去处理,这样的处理的过程简单,高效,充分利用了go的底层的能力。
但是这里有几个问题:在accept阶段,是否可以让多个线程同时去accept?这样的话就不用每来一个连接就开启一个goroutine。
同时,看过accept源码的都知道,只会有一个goroutine去accept连接;因为这个监听套接字在创建的时候就被设置成了非阻塞,所以在没有新连接时,这个goroutine会被Go runtime调用gopark挂起。
解决办法是开启端口复用,也就是SO_REUSEPORT功能:多个监听套接字绑定同一个端口,由内核把新连接分发给其中一个。这样一方面可以避免惊群效应,另一方面可以让多个线程并行地accept连接。
接下来看一个demo,这里使用的是gnet框架(GitHub地址:https://github.com/panjf2000/gnet)。
示例
接下来看一段基于reactor的示例。先通过 go run main.go 运行,
然后执行 curl -i 127.0.0.1:8080,可以看到返回了我们期望的结果。示例代码如下:
package main | |
import ( | |
"flag" | |
"fmt" | |
"log" | |
"strconv" | |
"strings" | |
"time" | |
"unsafe" | |
"learn/http/gnet" | |
) | |
var res string | |
type request struct { | |
proto, method string | |
path, query string | |
head, body string | |
remoteAddr string | |
} | |
type httpServer struct { | |
*gnet.EventServer | |
} | |
var ( | |
errMsg = "Internal Server Error" | |
errMsgBytes = []byte(errMsg) | |
) | |
type httpCodec struct { | |
req request | |
} | |
func (hc *httpCodec) Encode(c gnet.Conn, buf []byte) (out []byte, err error) { | |
if c.Context() == nil { | |
return buf, nil | |
} | |
return appendResp(out, " Error", "", errMsg+"\n"), nil | |
} | |
func (hc *httpCodec) Decode(c gnet.Conn) (out []byte, err error) { | |
buf := c.Read() | |
c.ResetBuffer() | |
// process the pipeline | |
var leftover []byte | |
pipeline: | |
leftover, err = parseReq(buf, &hc.req) | |
// bad thing happened | |
if err != nil { | |
c.SetContext(err) | |
return nil, err | |
} else if len(leftover) == len(buf) { | |
// request not ready, yet | |
return | |
} | |
out = appendHandle(out, res) | |
buf = leftover | |
goto pipeline | |
} | |
func (hs *httpServer) OnInitComplete(srv gnet.Server) (action gnet.Action) { | |
//log.Printf("HTTP server is listening on %s (multi-cores: %t, loops: %d)\n", | |
// srv.Addr.String(), srv.Multicore, srv.NumEventLoop) | |
return | |
} | |
func (hs *httpServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) { | |
if c.Context() != nil { | |
// bad thing happened | |
out = errMsgBytes | |
action = gnet.Close | |
return | |
} | |
// handle the request | |
out = frame | |
return | |
} | |
func main() { | |
var port int | |
var multicore bool | |
// Example command: go run http.go --port --multicore=true | |
flag.IntVar(&port, "port",, "server port") | |
flag.BoolVar(&multicore, "multicore", true, "multicore") | |
flag.Parse() | |
res = "Hello World!\r\n" | |
http := new(httpServer) | |
hc := new(httpCodec) | |
// Start serving! | |
log.Fatal(gnet.Serve(http, fmt.Sprintf("tcp://:%d", port), gnet.WithMulticore(multicore), gnet.WithCodec(hc), gnet.WithNumEventLoop(), gnet.WithReusePort(true))) | |
} | |
// appendHandle handles the incoming request and appends the response to | |
// the provided bytes, which is then returned to the caller. | |
func appendHandle(b []byte, res string) []byte { | |
return appendResp(b, " OK", "", res) | |
} | |
// appendResp appends a valid HTTP/1.1 response to the provided bytes and
// returns the extended slice.
//
// The status param should be the code plus text such as "200 OK".
// The head parameter should be a series of lines ending with "\r\n" or empty.
// A Content-Length header is emitted only when body is non-empty.
//
// The Date header is formatted from the current time converted to UTC: the
// layout ends in the literal "GMT", so formatting local time would produce a
// wrong timestamp on any host whose zone is not UTC.
func appendResp(b []byte, status, head, body string) []byte {
	b = append(b, "HTTP/1.1"...)
	b = append(b, ' ')
	b = append(b, status...)
	b = append(b, '\r', '\n')
	b = append(b, "Server: gnet\r\n"...)
	b = append(b, "Date: "...)
	b = time.Now().UTC().AppendFormat(b, "Mon, 02 Jan 2006 15:04:05 GMT")
	b = append(b, '\r', '\n')
	if len(body) > 0 {
		b = append(b, "Content-Length: "...)
		b = strconv.AppendInt(b, int64(len(body)), 10)
		b = append(b, '\r', '\n')
	}
	b = append(b, head...)
	// Blank line terminating the header section.
	b = append(b, '\r', '\n')
	if len(body) > 0 {
		b = append(b, body...)
	}
	return b
}
// bs reinterprets b as a string without copying.
//
// The returned string shares b's backing array, so it is only valid while the
// caller leaves b unmodified.
func bs(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}
func parseReq(data []byte, req *request) (leftover []byte, err error) { | |
sdata := bs(data) | |
var i, s int | |
var head string | |
var clen int | |
q := - | |
// method, path, proto line | |
for ; i < len(sdata); i++ { | |
if sdata[i] == ' ' { | |
req.method = sdata[s:i] | |
for i, s = i+, i+1; i < len(sdata); i++ { | |
if sdata[i] == '?' && q == - { | |
q = i - s | |
} else if sdata[i] == ' ' { | |
if q != - { | |
req.path = sdata[s:q] | |
req.query = req.path[q+ : i] | |
} else { | |
req.path = sdata[s:i] | |
} | |
for i, s = i+, i+1; i < len(sdata); i++ { | |
if sdata[i] == '\n' && sdata[i-] == '\r' { | |
req.proto = sdata[s:i] | |
i, s = i+, i+1 | |
break | |
} | |
} | |
break | |
} | |
} | |
break | |
} | |
} | |
if req.proto == "" { | |
return data, fmt.Errorf("malformed request") | |
} | |
head = sdata[:s] | |
for ; i < len(sdata); i++ { | |
if i > && sdata[i] == '\n' && sdata[i-1] == '\r' { | |
line := sdata[s : i-] | |
s = i + | |
if line == "" { | |
req.head = sdata[len(head)+ : i+1] | |
i++ | |
if clen > { | |
if len(sdata[i:]) < clen { | |
break | |
} | |
req.body = sdata[i : i+clen] | |
i += clen | |
} | |
return data[i:], nil | |
} | |
if strings.HasPrefix(line, "Content-Length:") { | |
n, err := strconv.ParseInt(strings.TrimSpace(line[len("Content-Length:"):]),, 64) | |
if err == nil { | |
clen = int(n) | |
} | |
} | |
} | |
} | |
// not enough data | |
return data, nil | |
} |
看一下这个源码解析,还是先从gnet.Serve看起来
gnet.Serve
// Serve starts handling events for the specified address. | |
// | |
// Address should use a scheme prefix and be formatted | |
// like `tcp://.168.0.10:9851` or `unix://socket`. | |
// Valid network schemes: | |
// tcp - bind to both IPv and IPv6 | |
// tcp - IPv4 | |
// tcp - IPv6 | |
// udp - bind to both IPv and IPv6 | |
// udp - IPv4 | |
// udp - IPv6 | |
// unix - Unix Domain Socket | |
// | |
// The "tcp" network scheme is assumed when one is not specified. | |
func Serve(eventHandler EventHandler, protoAddr string, opts ...Option) (err error) { | |
// 加载用户指定的配置 | |
options := loadOptions(opts...) | |
logging.Debugf("default logging level is %s", logging.LogLevel()) | |
var ( | |
logger logging.Logger | |
flush func() error | |
) | |
if options.LogPath != "" { | |
if logger, flush, err = logging.CreateLoggerAsLocalFile(options.LogPath, options.LogLevel); err != nil { | |
return | |
} | |
} else { | |
logger = logging.GetDefaultLogger() | |
} | |
if options.Logger == nil { | |
options.Logger = logger | |
} | |
defer func() { | |
if flush != nil { | |
_ = flush() | |
} | |
logging.Cleanup() | |
}() | |
// The maximum number of operating system threads that the Go program can use is initially set to, | |
// which should also be the maximum amount of I/O event-loops locked to OS threads that users can start up. | |
// 为了防止线程过多 | |
if options.LockOSThread && options.NumEventLoop > { | |
logging.Errorf("too many event-loops under LockOSThread mode, should be less than,000 "+ | |
"while you are trying to set up %d\n", options.NumEventLoop) | |
return errors.ErrTooManyEventLoopThreads | |
} | |
if rbc := options.ReadBufferCap; rbc <= { | |
options.ReadBufferCap =x10000 | |
} else { | |
options.ReadBufferCap = internal.CeilToPowerOfTwo(rbc) | |
} | |
// 解析addr | |
network, addr := parseProtoAddr(protoAddr) | |
// 初始化listener | |
var ln *listener | |
if ln, err = initListener(network, addr, options); err != nil { | |
return | |
} | |
defer ln.close() | |
return serve(eventHandler, ln, options, protoAddr) | |
} |
可以看出来参数是EventHandler 这样的interface
type ( | |
// EventHandler represents the server events' callbacks for the Serve call. | |
// Each event has an Action return value that is used manage the state | |
// of the connection and server. | |
EventHandler interface { | |
// OnInitComplete fires when the server is ready for accepting connections. | |
// The parameter:server has information and various utilities. | |
OnInitComplete(server Server) (action Action) | |
// OnShutdown fires when the server is being shut down, it is called right after | |
// all event-loops and connections are closed. | |
OnShutdown(server Server) | |
// OnOpened fires when a new connection has been opened. | |
// The parameter:c has information about the connection such as it's local and remote address. | |
// Parameter:out is the return value which is going to be sent back to the client. | |
// It is generally not recommended to send large amounts of data back to the client in OnOpened. | |
// | |
// Note that the bytes returned by OnOpened will be sent back to client without being encoded. | |
OnOpened(c Conn) (out []byte, action Action) | |
// OnClosed fires when a connection has been closed. | |
// The parameter:err is the last known connection error. | |
OnClosed(c Conn, err error) (action Action) | |
// PreWrite fires just before any data is written to any client socket, this event function is usually used to | |
// put some code of logging/counting/reporting or any prepositive operations before writing data to client. | |
PreWrite() | |
// React fires when a connection sends the server data. | |
// Call c.Read() or c.ReadN(n) within the parameter:c to read incoming data from client. | |
// Parameter:out is the return value which is going to be sent back to the client. | |
React(frame []byte, c Conn) (out []byte, action Action) | |
// Tick fires immediately after the server starts and will fire again | |
// following the duration specified by the delay return value. | |
Tick() (delay time.Duration, action Action) | |
} | |
// EventServer is a built-in implementation of EventHandler which sets up each method with a default implementation, | |
// you can compose it with your own implementation of EventHandler when you don't want to implement all methods | |
// in EventHandler. | |
EventServer struct{} | |
) |
initListener
然后看一下初始化监听
func initListener(network, addr string, options *Options) (l *listener, err error) { | |
var sockopts []socket.Option | |
// 判断是否开启重复使用端口 | |
if options.ReusePort || strings.HasPrefix(network, "udp") { | |
sockopt := socket.Option{SetSockopt: socket.SetReuseport, Opt:} | |
sockopts = append(sockopts, sockopt) | |
} | |
// 是否开启nagle算法 默认是关闭 | |
if options.TCPNoDelay == TCPNoDelay && strings.HasPrefix(network, "tcp") { | |
sockopt := socket.Option{SetSockopt: socket.SetNoDelay, Opt:} | |
sockopts = append(sockopts, sockopt) | |
} | |
// 设置socket的recv buffer | |
if options.SocketRecvBuffer > { | |
sockopt := socket.Option{SetSockopt: socket.SetRecvBuffer, Opt: options.SocketRecvBuffer} | |
sockopts = append(sockopts, sockopt) | |
} | |
// 设置socket的send buffer | |
if options.SocketSendBuffer > { | |
sockopt := socket.Option{SetSockopt: socket.SetSendBuffer, Opt: options.SocketSendBuffer} | |
sockopts = append(sockopts, sockopt) | |
} | |
l = &listener{network: network, addr: addr, sockopts: sockopts} | |
err = l.normalize() | |
return | |
} |
normalize最后调用的是tcpSocket方法。
// tcpSocket creates an endpoint for communication and returns a file descriptor that refers to that endpoint. | |
// Argument `reusePort` indicates whether the SO_REUSEPORT flag will be assigned. | |
func tcpSocket(proto, addr string, sockopts ...Option) (fd int, netAddr net.Addr, err error) { | |
var ( | |
family int | |
ipvonly bool | |
sockaddr unix.Sockaddr | |
) | |
// 获取地址 | |
if sockaddr, family, netAddr, ipvonly, err = getTCPSockaddr(proto, addr); err != nil { | |
return | |
} | |
// 调用 底层的socket方法 | |
// 调用 unix.Socket(family, sotype|unix.SOCK_NONBLOCK|unix.SOCK_CLOEXEC, proto) | |
if fd, err = sysSocket(family, unix.SOCK_STREAM, unix.IPPROTO_TCP); err != nil { | |
err = os.NewSyscallError("socket", err) | |
return | |
} | |
defer func() { | |
if err != nil { | |
_ = unix.Close(fd) | |
} | |
}() | |
if family == unix.AF_INET && ipv6only { | |
if err = SetIPvOnly(fd, 1); err != nil { | |
return | |
} | |
} | |
// 添加率socket的一些自定义参数 | |
for _, sockopt := range sockopts { | |
if err = sockopt.SetSockopt(fd, sockopt.Opt); err != nil { | |
return | |
} | |
} | |
// bind | |
if err = os.NewSyscallError("bind", unix.Bind(fd, sockaddr)); err != nil { | |
return | |
} | |
// 设置半连接数量的最大值 | |
// Set backlog size to the maximum. | |
err = os.NewSyscallError("listen", unix.Listen(fd, listenerBacklogMaxSize)) | |
return | |
} | |
serve
func serve(eventHandler EventHandler, listener *listener, options *Options, protoAddr string) error { | |
// Figure out the proper number of event-loops/goroutines to run. | |
numEventLoop := | |
if options.Multicore { | |
numEventLoop = runtime.NumCPU() | |
} | |
if options.NumEventLoop > { | |
numEventLoop = options.NumEventLoop | |
} | |
// 实例化server | |
svr := new(server) | |
svr.opts = options | |
svr.eventHandler = eventHandler | |
svr.ln = listener | |
// 判断选择的轮训方式 默认是RoundRobin | |
switch options.LB { | |
case RoundRobin: | |
svr.lb = new(roundRobinLoadBalancer) | |
case LeastConnections: | |
svr.lb = new(leastConnectionsLoadBalancer) | |
case SourceAddrHash: | |
svr.lb = new(sourceAddrHashLoadBalancer) | |
} | |
svr.cond = sync.NewCond(&sync.Mutex{}) | |
if svr.opts.Ticker { | |
svr.tickerCtx, svr.cancelTicker = context.WithCancel(context.Background()) | |
} | |
svr.codec = func() ICodec { | |
if options.Codec == nil { | |
return new(BuiltInFrameCodec) | |
} | |
return options.Codec | |
}() | |
server := Server{ | |
svr: svr, | |
Multicore: options.Multicore, | |
Addr: listener.lnaddr, | |
NumEventLoop: numEventLoop, | |
ReusePort: options.ReusePort, | |
TCPKeepAlive: options.TCPKeepAlive, | |
} | |
switch svr.eventHandler.OnInitComplete(server) { | |
case None: | |
case Shutdown: | |
return nil | |
} | |
// 开启svr的start | |
if err := svr.start(numEventLoop); err != nil { | |
svr.closeEventLoops() | |
svr.opts.Logger.Errorf("gnet server is stopping with error: %v", err) | |
return err | |
} | |
defer svr.stop(server) | |
allServers.Store(protoAddr, svr) | |
return nil | |
} | |
func (svr *server) start(numEventLoop int) error { | |
if svr.opts.ReusePort || svr.ln.network == "udp" { | |
// 启动eventLoops的事件循环 | |
return svr.activateEventLoops(numEventLoop) | |
} | |
return svr.activateReactors(numEventLoop) | |
} | |
然后看一下activateEventLoops方法。
activateEventLoops
func (svr *server) activateEventLoops(numEventLoop int) (err error) { | |
var striker *eventloop | |
// Create loops locally and bind the listeners. | |
for i :=; i < numEventLoop; i++ { | |
ln := svr.ln | |
if i > && (svr.opts.ReusePort || ln.network == "udp") { | |
// 再次调用initListener这个方法 生成新的socket | |
if ln, err = initListener(svr.ln.network, svr.ln.addr, svr.opts); err != nil { | |
return | |
} | |
} | |
var p *netpoll.Poller | |
if p, err = netpoll.OpenPoller(); err == nil { | |
// 实例化eventloop | |
el := new(eventloop) | |
el.ln = ln | |
el.svr = svr | |
el.poller = p | |
el.buffer = make([]byte, svr.opts.ReadBufferCap) | |
el.connections = make(map[int]*conn) | |
el.eventHandler = svr.eventHandler | |
// 添加监听的套接字 | |
// 注意这里的loopAccept是一个回调函数 | |
_ = el.poller.AddRead(el.ln.packPollAttachment(el.loopAccept)) | |
// 注册 | |
svr.lb.register(el) | |
// Start the ticker. | |
if el.idx == && svr.opts.Ticker { | |
striker = el | |
} | |
} else { | |
return | |
} | |
} | |
// Start event-loops in background. | |
svr.startEventLoops() | |
go striker.loopTicker(svr.tickerCtx) | |
return | |
} |
然后 看一下 OpenPoller方法
// OpenPoller instantiates a poller. | |
func OpenPoller() (poller *Poller, err error) { | |
// 创建poller实例 | |
poller = new(Poller) | |
// 调用 epoll_create | |
if poller.fd, err = unix.EpollCreate(unix.EPOLL_CLOEXEC); err != nil { | |
poller = nil | |
err = os.NewSyscallError("epoll_create", err) | |
return | |
} | |
// 创建eventfd用来唤醒epoll | |
if poller.wfd, err = unix.Eventfd(, unix.EFD_NONBLOCK|unix.EFD_CLOEXEC); err != nil { | |
_ = poller.Close() | |
poller = nil | |
err = os.NewSyscallError("eventfd", err) | |
return | |
} | |
poller.wfdBuf = make([]byte,) | |
// eventfd加入到监听中 | |
if err = poller.AddRead(&PollAttachment{FD: poller.wfd}); err != nil { | |
_ = poller.Close() | |
poller = nil | |
return | |
} | |
// 实例化asyncTaskQueue和priorAsyncTaskQueue | |
poller.asyncTaskQueue = queue.NewLockFreeQueue() | |
poller.priorAsyncTaskQueue = queue.NewLockFreeQueue() | |
return | |
} |
然后看一下loopAccept 这个方法
func (el *eventloop) loopAccept(_ netpoll.IOEvent) error { | |
if el.ln.network == "udp" { | |
return el.loopReadUDP(el.ln.fd) | |
} | |
// 因为前面在initListener这里只运行了bind方法 所以这里accept | |
nfd, sa, err := unix.Accept(el.ln.fd) | |
if err != nil { | |
if err == unix.EAGAIN { | |
return nil | |
} | |
el.getLogger().Errorf("Accept() fails due to error: %v", err) | |
return os.NewSyscallError("accept", err) | |
} | |
// 获取到了以后设置为非阻塞 | |
if err = os.NewSyscallError("fcntl nonblock", unix.SetNonblock(nfd, true)); err != nil { | |
return err | |
} | |
netAddr := socket.SockaddrToTCPOrUnixAddr(sa) | |
if el.svr.opts.TCPKeepAlive > && el.svr.ln.network == "tcp" { | |
err = socket.SetKeepAlive(nfd, int(el.svr.opts.TCPKeepAlive/time.Second)) | |
logging.LogErr(err) | |
} | |
// 根据套接字实例化连接 | |
c := newTCPConn(nfd, el, sa, netAddr) | |
// 在epoll中添加监听 | |
if err = el.poller.AddRead(c.pollAttachment); err == nil { | |
el.connections[c.fd] = c | |
return el.loopOpen(c) | |
} | |
return err | |
} |
然后看一下 startEventLoops 这个方法
func (svr *server) startEventLoops() { | |
// iterate 就是运行下面的方法 | |
svr.lb.iterate(func(i int, el *eventloop) bool { | |
svr.wg.Add() | |
go func() { | |
// 调用loopRun | |
el.loopRun(svr.opts.LockOSThread) | |
svr.wg.Done() | |
}() | |
return true | |
}) | |
} | |
func (el *eventloop) loopRun(lockOSThread bool) { | |
if lockOSThread { | |
runtime.LockOSThread() | |
defer runtime.UnlockOSThread() | |
} | |
defer func() { | |
el.closeAllConns() | |
el.ln.close() | |
el.svr.signalShutdown() | |
}() | |
// 调用Polling 注意这里Polling里面传的是一个方法 | |
err := el.poller.Polling(func(fd int, ev uint) (err error) { | |
// 注意里面这个连接有事件发生的时候 | |
if c, ok := el.connections[fd]; ok { | |
// Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN unless you're% | |
// sure what you're doing! | |
// Re-ordering can easily introduce bugs and bad side-effects, as I found out painfully in the past. | |
// We should always check for the EPOLLOUT event first, as we must try to send the leftover data back to | |
// client when any error occurs on a connection. | |
// | |
// Either an EPOLLOUT or EPOLLERR event may be fired when a connection is refused. | |
// In either case loopWrite() should take care of it properly: | |
//) writing data back, | |
//) closing the connection. | |
if ev&netpoll.OutEvents != && !c.outboundBuffer.IsEmpty() { | |
// 写事件 | |
if err := el.loopWrite(c); err != nil { | |
return err | |
} | |
} | |
// If there is pending data in outbound buffer, then we should omit this readable event | |
// and prioritize the writable events to achieve a higher performance. | |
// | |
// Note that the client may send massive amounts of data to server by write() under blocking mode, | |
// resulting in that it won't receive any responses before the server read all data from client, | |
// in which case if the socket send buffer is full, we need to let it go and continue reading the data | |
// to prevent blocking forever. | |
// 读事件 | |
if ev&netpoll.InEvents != && (ev&netpoll.OutEvents == 0 || c.outboundBuffer.IsEmpty()) { | |
return el.loopRead(c) | |
} | |
return nil | |
} | |
// 说明只是可以建立新的连接 | |
return el.loopAccept(ev) | |
}) | |
el.getLogger().Debugf("event-loop(%d) is exiting due to error: %v", el.idx, err) | |
} |
polling
这个方法是比较重要的,也是阻塞在epoll上面,去监听fd的事件
// Polling blocks the current goroutine, waiting for network-events. | |
func (p *Poller) Polling(callback func(fd int, ev uint) error) error { | |
el := newEventList(InitPollEventsCap) | |
var wakenUp bool | |
msec := - | |
for { | |
// 使用epoll_wait | |
n, err := unix.EpollWait(p.fd, el.events, msec) | |
if n == || (n < 0 && err == unix.EINTR) { | |
msec = - | |
runtime.Gosched() | |
continue | |
} else if err != nil { | |
logging.Errorf("error occurs in epoll: %v", os.NewSyscallError("epoll_wait", err)) | |
return err | |
} | |
msec = | |
// 判断每个套接字的事件 | |
for i :=; i < n; i++ { | |
ev := &el.events[i] | |
// 判断是不是唤醒的 | |
if fd := int(ev.Fd); fd != p.wfd { | |
switch err = callback(fd, ev.Events); err { | |
case nil: | |
case errors.ErrAcceptSocket, errors.ErrServerShutdown: | |
return err | |
default: | |
logging.Warnf("error occurs in event-loop: %v", err) | |
} | |
} else { // poller is awaken to run tasks in queues. | |
wakenUp = true | |
_, _ = unix.Read(p.wfd, p.wfdBuf) | |
} | |
} | |
// 进行唤醒 | |
if wakenUp { | |
wakenUp = false | |
task := p.priorAsyncTaskQueue.Dequeue() | |
// 运行任务 | |
for ; task != nil; task = p.priorAsyncTaskQueue.Dequeue() { | |
switch err = task.Run(task.Arg); err { | |
case nil: | |
case errors.ErrServerShutdown: | |
return err | |
default: | |
logging.Warnf("error occurs in user-defined function, %v", err) | |
} | |
// 放入任务 | |
queue.PutTask(task) | |
} | |
for i :=; i < MaxAsyncTasksAtOneTime; i++ { | |
if task = p.asyncTaskQueue.Dequeue(); task == nil { | |
break | |
} | |
switch err = task.Run(task.Arg); err { | |
case nil: | |
case errors.ErrServerShutdown: | |
return err | |
default: | |
logging.Warnf("error occurs in user-defined function, %v", err) | |
} | |
queue.PutTask(task) | |
} | |
atomic.StoreInt(&p.netpollWakeSig, 0) | |
if (!p.asyncTaskQueue.Empty() || !p.priorAsyncTaskQueue.Empty()) && atomic.CompareAndSwapInt(&p.netpollWakeSig, 0, 1) { | |
for _, err = unix.Write(p.wfd, b); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Write(p.wfd, b) { | |
} | |
} | |
} | |
if n == el.size { | |
el.expand() | |
} else if n < el.size>> { | |
el.shrink() | |
} | |
} | |
} |
这里主要分析的是开启reuse port的情况:配置了多少个event-loop,就会创建多少个监听套接字和poller(epoll实例),每个event-loop由一个goroutine驱动,这样goroutine的数量就是固定的,不会出现随连接数暴增的情况。同时,每次accept得到的连接都会被设置成非阻塞,event-loop不会阻塞在read和write这样的io调用上,通过这些设计保证了整个流程的高性能。