Processes and Threads
进程
操作系统会为该应用程序创建一个进程。作为一个应用程序,它像一个为所有资源而运行的容器。这些资源包括内存地址空间、文件句柄(文件描述符)、设备和线程。
不同的应用程序使用的内存空间不同,在操作系统里,就是用进程来做的资源管理、隔离。
线程
线程是操作系统调度的一种执行路径,用于在处理器执行我们在函数中编写的代码。一个进程从一个线程开始,即主线程,当该线程终止时,进程终止。这是因为主线程是应用程序的原点。
main方法不是以主线程运行
go
主线程是一个物理线程,而main函数只是作为goroutine运行的,但是main退出,其他goroutine也会退出。
Java
main方法并不是主线程运行,也就是main方法退出,JVM进程不一定退出,main里开启的子线程会继续运行。java虚拟机(相当于进程)退出的时机是:虚拟机中所有存活的线程都是守护线程。
Goroutines and Parallelism
操作系统调度线程在可用处理器上运行,Go运行时调度 goroutines 在绑定到单个操作系统线程的逻辑处理器中运行(P)。即使使用这个单一的逻辑处理器和操作系统线程,也可以调度数十万 goroutine 以惊人的效率和性能并发运行。
Go 的并发更高效
goroutine增加,只会导致少量内存增加,不会增加操作系统线程,而且goroutine的上下文切换就交给了go runtime,也就是内核态改为用户态。
Java前不久也推出类似goroutine的虚拟线程。
Concurrency is not Parallelism.
如果将运行时配置为使用多个逻辑处理器,则调度程序将在这些逻辑处理器之间分配 goroutine,这将导致 goroutine 在不同的操作系统线程上运行。但是,要获得真正的并行性,您需要在具有多个物理处理器的计算机上运行程序。否则,goroutines 将针对单个物理处理器并发运行,即使 Go 运行时使用多个逻辑处理器。
Keep yourself busy or do the work yourself
package main
import (
"fmt"
"log"
"net/http"
)
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello, GopherCon SG")
})
go func() {
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}()
for {
}
}
main协程异步调用阻塞函数http.ListenAndServe()
后,自己啥也不干,空转CPU
要不异步调用后做其他事情,要不就自己来做
import (
"fmt"
"log"
"net/http"
)
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello, GopherCon SG")
})
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}
- log.Fatal 内部调用 os.Exit() 会导致defer不执行,建议用panic
http.ListenAndServe() 怎么阻塞的?
for {
rw, err := l.Accept()
if err != nil {
select {
case <-srv.getDoneChan():
return ErrServerClosed
default:
}
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
time.Sleep(tempDelay)
continue
}
return err
}
connCtx := ctx
if cc := srv.ConnContext; cc != nil {
connCtx = cc(connCtx, rw)
if connCtx == nil {
panic("ConnContext returned nil")
}
}
tempDelay = 0
c := srv.newConn(rw)
c.setState(c.rwc, StateNew, runHooks) // before Serve can return
go c.serve(connCtx)
}
- 使用for循环不断获取请求
- 通过新启协程委派出去,这里用协程池复用协程应该能提升性能
Leave concurrency to the caller
函数启动 goroutine,则必须向调用方提供显式停止该goroutine 的方法。例如http的ListenAndServe()
和Shutdown()
一样。
通常,将异步执行函数的决定权交给该函数的调用方通常更容易。
读取目录信息
func ListDirectory(dir string)([]string,error)
- 存在的问题
- 必须读完所有数据才结束,对大型目录而言,即使已经读取到想要的数据,也得等读取完。
- 需要将所以目录信息加载到内存,占用内存空间大。
func ListDirectory(dir string) chan string
- 内部启动goroutine读取数据填充到chan,读取完毕关闭chan
- 没有保存所有目录信息到内存,占用内存空间小。
- 存在的问题
- 调用者无法区分读取期间出现error,和空目录导致的error。
- chan可能没有包含完整的数据,因为读取时可能发生错误。
- 即使得到想要的数据,也无法终止读取操作。
func WalkDir(root string, fn fs.WalkDirFunc) error
type WalkDirFunc func(path string, d DirEntry, err error) error
- 这是标准库里的实现方式
- WalkDirFunc能感知到读取文件和目录产生的error。
- WalkDirFunc可以返回SkipDir,让WalkDir跳过当前目录的读取。
- WalkDirFunc返回的其他error,将导致WalkDir终止,并返回此error。
Never start a goroutine without knowning when it will stop
一个应用需要同时启动两个端口监听,一个是业务的,另一个是获取应用状态信息的。
版本一
func main() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
fmt.Fprintln(resp, "Hello, QCon!")
})
go http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux) // debug
http.ListenAndServe("0.0.0.0:8080", mux) // app traffic
}
版本二
func serveApp() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
fmt.Fprintln(resp, "Hello, QCon!")
})
http.ListenAndServe("0.0.0.0:8080", mux)
}
func serveDebug() {
http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux)
}
func main() {
go serveDebug()
serveApp()
}
- 解耦、拆分、封装
- 同样把并发交给调用者决定
- 应用无法感知serveDebug的goroutine退出。我们要确保应用的必要goroutine退出时,停止应用程序。
- serveApp退出会导致应用退出,进而由进程管理者来决定是否重启。就像函数的并发交给调用者一样,应用的重启也应该交给应用外的程序,例如K8S,Linux进程管理工具supervisor。
版本三
func serveApp() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
fmt.Fprintln(resp, "Hello, QCon!")
})
if err := http.ListenAndServe("0.0.0.0:8080", mux); err != nil {
log.Fatal(err)
}
}
func serveDebug() {
if err := http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux); err != nil {
log.Fatal(err)
}
}
func main() {
go serveDebug()
go serveApp()
select {}
}
- ListenAndServer返回nil(编码时注意有没有这种可能),不会终止应用
log.Fatal
调用os.Exit
,无条件退出进程。不会触发defer
,导致无法通知其他goroutine停止、无法关闭资源等。
版本四
func serveApp() error {
mux := http.NewServeMux()
mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
fmt.Fprintln(resp, "Hello, QCon!")
})
return http.ListenAndServe("0.0.0.0:8080", mux)
}
func serveDebug() error {
return http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux)
}
func main() {
done := make(chan error, 2)
go func() {
done <- serveDebug()
}()
go func() {
done <- serveApp()
}()
for i := 0; i < cap(done); i++ {
if err := <-done; err != nil {
fmt.Println("error: %v", err)
}
}
}
- 获取到goroutine退出的error
- 某个goroutine退出,并没有通知其他goroutine退出
版本五
func serve(addr string, handler http.Handler, stop <-chan struct{}) error {
s := http.Server{
Addr: addr,
Handler: handler,
}
go func() {
<-stop // wait for stop signal
s.Shutdown(context.Background())
}()
return s.ListenAndServe()
}
func serveApp(stop <-chan struct{}) error {
mux := http.NewServeMux()
mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
fmt.Fprintln(resp, "Hello, QCon!")
})
return serve("0.0.0.0:8080", mux, stop)
}
func serveDebug(stop <-chan struct{}) error {
return serve("127.0.0.1:8001", http.DefaultServeMux, stop)
}
func main() {
done := make(chan error, 2)
stop := make(chan struct{})
go func() {
done <- serveDebug(stop)
}()
go func() {
done <- serveApp(stop)
}()
var stopped bool
for i := 0; i < cap(done); i++ {
if err := <-done; err != nil {
fmt.Println("error: %v", err)
}
if !stopped {
stopped = true
close(stop)
}
}
}
- 记录goroutine退出时的error
- 某个goroutine退出,通知其他goroutine退出,进而停止应用
- go-workgroup
Go 平滑重启 endless
复制父进程占有的文件描述符
netListener.File
会通过系统调用 dup 复制一份 file descriptor 文件描述符。返回的新文件描述符和参数 oldfd 指向同一个文件,共享所有的索性、读写指针、各项权限或标志位等。但是不共享关闭标志位,也就是说 oldfd 已经关闭了,也不影响写入新的数据到 newfd 中。
Fork 子进程
在Go语言中 exec 包为我们很好的封装好了 Fork 调用,并且使用 ExtraFiles
可以很好的继承父进程已打开的文件。
平滑重启流程
- 监听 SIGHUP 信号;
- 收到信号时 fork 子进程(使用相同的启动命令),将服务监听的 socket 文件描述符传递给子进程;
- 子进程监听父进程的 socket,这个时候父进程和子进程都可以接收请求;
- 子进程启动成功之后发送 SIGTERM 信号给父进程,父进程停止接收新的连接(调用http的shutdown),等待旧连接处理完成(或超时);
- 父进程退出,升级完成;
goroutine 泄露
func leak(){
ch make(chan int)
go func()
val :=<-ch
fmt.Println("We received a value:"val)
}()
}
保证异步工作完成
func (t *Tracker)Event(data string){
time.Sleep(time.Millisecond)
log.Println(data)
}
- 使用服务端埋点来跟踪记录一些事件。
// Handle represents an example handler for the web service.
func (a *App)Handle(w http.ResponseWriter,r *http.Request){
//Do some actual work.
//Respond to the client.
w.WriteHeader(http.StatusCreated)
//Fire and Hope.
//BUG:We are not managing this goroutine.
go a.track.Event("this event")
}
- 无法保证创建的 goroutine 生命周期管理,会导致在服务关闭时候,有一些事件丢失。
func (t *Tracker)Event(data string){
//Increment counter so Shutdown knows to wait for this event.
t.wg.Add(1)
//Track event in a goroutine so caller is not blocked.
go func(){
/Decrement counter to tell Shutdown this goroutine finished.
defer t.wg.Done()
time.Sleep(time.Millisecond)//Simulate network write latency.
log.Println(data)
}()
//Shutdown waits for all tracked events to finish processing.
func (t *Tracker)Shutdown(){
t.wg.Wait()
}
func main(){
//Start a server.
var a App
//Shut the server down.
//Wait for all event goroutines to finish.
a.track.Shutdown()
}
- 等待所有记录事件的goroutine完成
func main(){
//Start a server.
var a App
//Shut the server down.
//Wait up to 5 seconds for all event goroutines to finish.
const timeout 5 time.Second
ctx,cancel context.WithTimeout(context.Background(),timeout)
defer cancel()
a.track.Shutdown(ctx)
}
//Shutdown waits for all tracked events to finish processing
//or for the provided context to be canceled.
func (t *Tracker)Shutdown(ctx context.Context)error
//Create a channel to signal when the waitgroup is finished
ch make(chan struct{})
//Create a goroutine to wait for all other goroutines to
//be done then close the channel to unblock the select.
go func(){
t.wg.Wait()
close(ch)
}()
//Block this function from returning.Wait for either the
//waitgroup to finish or the context to expire.
select{
case <-ch:
return nil
case <-ctx.Done():
return errors.New("timeout")
}
}
- 增加超时控制