http client
重要结构
1.Request, 保存http request数据的结构,包括header,body信息
2.Client, 保存Transport,cookie信息
3.Transport,管理一个连接池
client.go
| // main sends a single GET request through a custom http.Transport and prints the response. |
| // It relies on proxyURL and config, which are defined outside this snippet. |
| func main() { |
|     // Build the request. HTTP methods are case-sensitive, so use the http.MethodGet constant |
|     // rather than a lowercase "get". |
|     request, err := http.NewRequest(http.MethodGet, "https://xtz.getblock.io/mainnet/chains", http.NoBody) |
|     if err != nil { |
|         // request is nil on error; stop here instead of dereferencing it below. |
|         fmt.Println(err) |
|         return |
|     } |
|     request.Header.Add("x-api-key", "0ee52bce-e6c2-4e3e-987b-629a3e08") |
|     request.Header.Add("Content-type", "application/json") |
| |
|     // Build the transport (connection pool, proxy, TLS and timeouts). |
|     proxyFunc := http.ProxyURL(proxyURL) |
|     var tlsConfig *tls.Config // nil: the transport falls back to its default TLS configuration |
|     transport := &http.Transport{ |
|         // DialContext replaces the deprecated Dial field. |
|         DialContext: (&net.Dialer{ |
|             Timeout:   config.GetDuration("rpc.dial_timeout"), |
|             KeepAlive: config.GetDuration("rpc.keepalive"), |
|         }).DialContext, |
|         Proxy:           proxyFunc, |
|         TLSClientConfig: tlsConfig, |
|         // These fields are time.Duration values; a bare 60 means 60 nanoseconds. |
|         // NOTE(review): 60 seconds is assumed to be the intended value - confirm. |
|         IdleConnTimeout:       60 * time.Second, |
|         ResponseHeaderTimeout: 60 * time.Second, |
|         ExpectContinueTimeout: 60 * time.Second, |
|         MaxIdleConns:          5, |
|         MaxIdleConnsPerHost:   5, |
|     } |
| |
|     // Build the client on top of the custom transport and send the request. |
|     client := http.Client{ |
|         Transport: transport, |
|     } |
|     response, err := client.Do(request) |
|     if err != nil { |
|         // response is nil when err is non-nil. |
|         fmt.Println(err) |
|         return |
|     } |
|     // Close the body so the transport's readLoop goroutine can finish and the connection is released. |
|     defer response.Body.Close() |
|     fmt.Println(response) |
| } |
请求的大致流程
1.根据请求条件,构建request对象
2.所有的client请求,都会经过client.do()处理
func (c *Client) do(req *Request) (retres *Response, reterr error)
2.1 request请求使用client.send()处理
| func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) |
| |
| resp, didTimeout, err = send(req, c.transport(), deadline) |
3.send函数
| func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) { |
| resp, err = rt.RoundTrip(req) |
| } |
4.DefaultTransport的RoundTrip方法,实际就是Transport的RoundTrip方法(内部调用下面的roundTrip)
| func (t *Transport) roundTrip(req *Request) (*Response, error) { |
| treq := &transportRequest{Request: req, trace: trace} |
| cm, err := t.connectMethodForRequest(treq) |
| pconn, err := t.getConn(treq, cm) |
| resp, err = pconn.roundTrip(treq) |
| } |
5.使用连接池技术,获取连接对象*persistConn
| func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) { |
| w := &wantConn{ |
| cm: cm, |
| key: cm.key(), |
| ctx: ctx, |
| ready: make(chan struct{}, 1), |
| beforeDial: testHookPrePendingDial, |
| afterDial: testHookPostPendingDial,}if delivered := t.queueForIdleConn(w); delivered { |
| pc := w.pc |
| return pc, nil} |
| t.queueForDial(w) |
| |
| select {case <-w.ready: |
| |
| } |
5.1 Transport.queueForDial发起连接
| func (t *Transport) queueForDial(w *wantConn) {go t.dialConnFor(w) |
| } |
5.2 发起拨号dialConnFor
| func (t *Transport) dialConnFor(w *wantConn) { |
| pc, err := t.dialConn(w.ctx, w.cm) |
| delivered := w.tryDeliver(pc, err) |
| } |
5.3 发起拨号
| func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) { |
| pconn = &persistConn{ |
| t: t, |
| cacheKey: cm.key(), |
| reqch: make(chan requestAndChan, 1), |
| writech: make(chan writeRequest, 1), |
| closech: make(chan struct{}), |
| writeErrCh: make(chan error, 1), |
| writeLoopDone: make(chan struct{}),} |
| conn, err := t.dial(ctx, "tcp", cm.addr()) |
| |
| pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize()) |
| pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize()) |
| |
| go pconn.readLoop() |
| } |
5.4 读协程:虽然是for循环,但每一轮只处理一个请求,一次性把该请求的response读出来;如果调用方没有读完或关闭response.Body(请求也没有被取消),协程会一直阻塞在下面等待body读取结束的select上,造成协程泄露
| func (pc *persistConn) readLoop() { |
| alive := true |
| for alive { |
| rc := <-pc.reqch //读取request,写入的地方在步骤6 |
| |
| resp, err = pc.readResponse(rc, trace) //返回response//response的body是否可写,服务器code101才可写,所以正常这个是false |
| bodyWritable := resp.bodyIsWritable() |
| |
| //response.Close设置循环结束,退出协程 |
| if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable { |
| alive = false |
| } |
| |
| //把response写入通道,在步骤6会读取这个通道 |
| select { |
| case rc.ch <- responseAndError{res: resp}: |
| case <-rc.callerGone: |
| return |
| } |
| //循环结束的一些情况 |
| select { |
| case bodyEOF := <-waitForBodyRead: //读完body也会自动结束 |
| case <-rc.req.Cancel: |
| case <-rc.req.Context().Done(): |
| case <-pc.closech: |
| alive = false |
| pc.t.CancelRequest(rc.req) |
| } |
| } |
5.4.1 pc.readResponse 获取response
| func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {for{ |
| resp, err = ReadResponse(pc.br, rc.req) |
| } |
5.4.2 ReadResponse读取response
| func ReadResponse(r *bufio.Reader, req *Request) (*Response, error) { |
| tp := textproto.NewReader(r) |
| resp := &Response{ |
| Request: req,} |
| line, err := tp.ReadLine() |
| resp.Proto = line[:i] |
| resp.Status = strings.TrimLeft(line[i+1:], " ") |
| |
| mimeHeader, err := tp.ReadMIMEHeader() |
| resp.Header = Header(mimeHeader) |
| } |
5.5 写协程
| func (pc *persistConn) writeLoop() {for {select {case wr := <-pc.writech: |
| startBytesWritten := pc.nwrite |
| err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) |
| } |
| } |
6.使用连接对象*persistConn获取response
| func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {var continueCh chan struct{} |
| resc := make(chan responseAndError) |
| |
| pc.writech <- writeRequest{req, writeErrCh, continueCh} |
| |
| pc.reqch <- requestAndChan{ |
| req: req.Request, |
| ch: resc, |
| addedGzip: requestedGzip, |
| continueCh: continueCh, |
| callerGone: gone,}for { |
| case <-pc.closech: |
| case re := <-resc: |
| } |