Go标准包——net/rpc包的使用

Golang
384
0
0
2022-07-13
标签   RPC框架

rpc原理很简单,客户端把请求方法,参数等信息编码后传给服务端,服务端反解码后,找到对应的方法执行,并且把结果编码后返回给客户端。

服务端

重要结构
type Server struct {
  serviceMap sync.Map   // map[string]*service 注册的服务存放
  reqLock    sync.Mutex // protects freeReq
  freeReq    *Request
  respLock   sync.Mutex // protects freeResp
  freeResp   *Response
}
type gobServerCodec struct { //使用gob实现编码器
  rwc    io.ReadWriteCloser
  dec    *gob.Decoder
  enc    *gob.Encoder
  encBuf *bufio.Writer
  closed bool
}
type ServerCodec interface { //编码器接口
  ReadRequestHeader(*Request) error
  ReadRequestBody(interface{}) error
  WriteResponse(*Response, interface{}) error 
  // Close can be called multiple times and must be idempotent.
  Close() error
}

整体来说,Server处理ServerCodec,其他编解码封装conn实现ServerCodec,也是一样的。

server.go

type HelloService struct{}
func (p *HelloService) Hello(request common.Request, response *common.Response) error {
  return nil
}
​
func main() {
 //1.注册服务,写到serviceMap中
  rpc.RegisterName("HelloService", new(HelloService))//把服务注册到rpc.Server.serviceMap
  listener, _ := net.Listen("tcp", ":1234")
  for {
  conn, err := listener.Accept()
  if err != nil {
      log.Fatal("Accept error:", err)
  }
  go rpc.ServeConn(conn)
  }
}
1.封装ServerCodec

conn会用gob包封装,标准库gob是golang提供的“私有”的编解码方式,它的效率会比json,xml等更高,特别适合在Go语言程序间传递数据。

func (server *Server) ServeConn(conn io.ReadWriteCloser) {
  buf := bufio.NewWriter(conn)
  srv := &gobServerCodec{
  rwc:    conn,
  dec:    gob.NewDecoder(conn),//封装成解码器,从conn读取数据并解码
  enc:    gob.NewEncoder(buf),//封装成编码器,编码后写入conn
  encBuf: buf,
  }
  server.ServeCodec(srv)
}
2.读取请求并调用对应的service
func (server *Server) ServeCodec(codec ServerCodec) {
  sending := new(sync.Mutex)
  wg := new(sync.WaitGroup)
  for {
  service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)//读取request,service
  wg.Add(1)
  go service.call(server, sending, wg, mtype, req, argv, replyv, codec)//调用service
  }
  wg.Wait()
  codec.Close()
}

客户端

重要结构
type Client struct {
  codec ClientCodec //编解码封装的conn
  reqMutex sync.Mutex // protects following
  request  Request
  mutex    sync.Mutex // protects following
  seq      uint64
  pending  map[uint64]*Call
  closing  bool // user has called Close
  shutdown bool // server has told us to stop
}
type ClientCodec interface {
  WriteRequest(*Request, interface{}) error
  ReadResponseHeader(*Response) error
  ReadResponseBody(interface{}) error
  Close() error
}
type gobClientCodec struct { //gob实现的ClientCodec
  rwc    io.ReadWriteCloser
  dec    *gob.Decoder
  enc    *gob.Encoder
  encBuf *bufio.Writer
}

client.go

func main() {
  rpcClient, err := rpc.Dial("tcp", "localhost:1234")
  if err != nil {
  fmt.Println(err)
  }
  var reply common.Response
  var req common.Request
  req.UserId = 11 
 //向服务端发送"HelloService.Hello",req, 从服务端读到reply
  err = rpcClient.Call("HelloService.Hello", req, &reply)
  if err != nil {
  fmt.Println(err)
  }
  fmt.Print(reply)
}
1.生成client

使用conn即可生成client,所有如果使用tls连接,可以自定义client

func Dial(network, address string) (*Client, error) {
conn, err := net.Dial(network, address) 
  if err != nil {
  return nil, err
  }
  return NewClient(conn), nil
}
func NewClient(conn io.ReadWriteCloser) *Client {
  encBuf := bufio.NewWriter(conn)
  client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}//使用gob编解码 
  return NewClientWithCodec(client)
}
2.发起调用
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
  call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
  return call.Error
}
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
  call := new(Call)
  call.ServiceMethod = serviceMethod
  call.Args = args
  call.Reply = reply
 done = make(chan *Call, 10) // buffered.
  call.Done = done
  client.send(call)
  return call
}

编解码

官方库提供了encoding/gob,encoding/json两种编码模式,把io.ReadWrite分别封装为编码器,解码器。

//gob包,需要一个io.ReadWrite参数,分别包装一层实现编码后写入,解码后读出

enc := gob.NewEncoder(conn)
enc.Encode(data) //相当于编码后conn.write(data)
​
dec := gob.NewDecoder(conn)
dec.Decode(&m) //解码到变量m
​
//也可以使用json编解码
enc := json.NewEncoder(conn)
json.Encode(data) //相当于编码后conn.write(data)
dec := json.NewDecoder(conn)
json.Decode(&m) //解码到变量m, 从conn读取数据后解码到变量m
gob包使用案例

底层使用了go的反射功能,所以这种方式编解码只能在go语言中使用

func main() {
  info := map[string]string{
  "name":    "C语言中文网",
  "website": "http://c.biancheng.net/golang/",
  }
  name := "demo.gob"
  EncodeToByte(name, info)
  DecodeFromFile(name)
}
func EncodeToByte(name string, data interface{}) {
  fd, _ := os.OpenFile(name, os.O_RDWR|os.O_CREATE, 0777)
  defer fd.Close()
  enc := gob.NewEncoder(fd)//生成编码器 
  if err := enc.Encode(data); err != nil {//把数据编码后写入到文件中
  fmt.Println(err)
  }
}
func DecodeFromFile(name string) {
  var m map[string]string
  fd, _ := os.OpenFile(name, os.O_RDWR|os.O_CREATE, 0777)
  defer fd.Close()
  D := gob.NewDecoder(fd) //生成解码器
  D.Decode(&m)//从文件读取数据,解码到变量中
  fmt.Println(m)
}
使用json做编解码

服务端

使用的也是go标准库net/rpc/jsonrpc

func jsonServer() {
  rpc.RegisterName("HelloService", new(HelloService)) //把服务注册到rpc.Server.serviceMap
  listener, _ := net.Listen("tcp", ":1234")
  for {
  conn, err := listener.Accept()
  if err != nil {
      log.Fatal("Accept error:", err)
  }
  go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
  }
}

客户端

func jsonClient() {
  conn, err := net.Dial("tcp", "localhost:1234")
  if err != nil {
  log.Fatal("net.Dial:", err)
  }
  client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
  var reply string
  err = client.Call("HelloService.Hello", "hello", &reply)
  if err != nil {
  log.Fatal(err)
  }
  fmt.Println(reply)
}

http协议实现rpc

service.go

func HttpRpcServer() {
  rpc.RegisterName("HelloService", new(HelloService)) //把服务注册到rpc.Server.serviceMap
  rpc.HandleHTTP()//会注册一个默认路径到http,Handle注册为rpcServer 
  if err := http.ListenAndServe(":1234", nil); err != nil {
  log.Fatal("Error serving: ", err)
  }
}

client.go

func HttpRpcClient() {
 rpcClient, _ := rpc.DialHTTP("tcp", ":1234") //访问服务端注册的默认路径 
  var reply common.Response
  var req common.Request
  req.UserId = 11
  rpcClient.Call("HelloService.Hello", req, &reply)  
  fmt.Print(reply)
}

客户端rpc异步调用

client.go

func HttpRpcClient() {
  rpcClient, err := rpc.DialHTTP("tcp", ":1234")
  if err != nil {
  panic(err)
  }
  var reply common.Response
  var req common.Request
  req.UserId = 11  
  async := rpcClient.Go("HelloService.Hello", req, &reply, nil)
  <-async.Done //等待异步返回结果,可以放到一个单独协程等待,不用阻塞当前协程
  fmt.Print(reply)
}

有位大佬用go重新实现了net/rpc并增加了功能

7天用Go从零实现RPC框架GeeRPC