rpc原理很简单,客户端把请求方法,参数等信息编码后传给服务端,服务端反解码后,找到对应的方法执行,并且把结果编码后返回给客户端。
服务端
重要结构
type Server struct {
serviceMap sync.Map // map[string]*service 注册的服务存放
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
}
type gobServerCodec struct { //使用gob实现编码器
rwc io.ReadWriteCloser
dec *gob.Decoder
enc *gob.Encoder
encBuf *bufio.Writer
closed bool
}
type ServerCodec interface { //编码器接口
ReadRequestHeader(*Request) error
ReadRequestBody(interface{}) error
WriteResponse(*Response, interface{}) error
// Close can be called multiple times and must be idempotent.
Close() error
}
整体来说,Server处理ServerCodec,其他编解码封装conn实现ServerCodec,也是一样的。
server.go
type HelloService struct{}
func (p *HelloService) Hello(request common.Request, response *common.Response) error {
return nil
}
func main() {
//1.注册服务,写到serviceMap中
rpc.RegisterName("HelloService", new(HelloService))//把服务注册到rpc.Server.serviceMap
listener, _ := net.Listen("tcp", ":1234")
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
go rpc.ServeConn(conn)
}
}
1.封装ServerCodec
conn会用gob包封装,标准库gob是golang提供的“私有”的编解码方式,它的效率会比json,xml等更高,特别适合在Go语言程序间传递数据。
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),//封装成解码器,从conn读取数据并解码
enc: gob.NewEncoder(buf),//封装成编码器,编码后写入conn
encBuf: buf,
}
server.ServeCodec(srv)
}
2.读取请求并调用对应的service
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)//读取request,service
wg.Add(1)
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)//调用service
}
wg.Wait()
codec.Close()
}
客户端
重要结构
type Client struct {
codec ClientCodec //编解码封装的conn
reqMutex sync.Mutex // protects following
request Request
mutex sync.Mutex // protects following
seq uint64
pending map[uint64]*Call
closing bool // user has called Close
shutdown bool // server has told us to stop
}
type ClientCodec interface {
WriteRequest(*Request, interface{}) error
ReadResponseHeader(*Response) error
ReadResponseBody(interface{}) error
Close() error
}
type gobClientCodec struct { //gob实现的ClientCodec
rwc io.ReadWriteCloser
dec *gob.Decoder
enc *gob.Encoder
encBuf *bufio.Writer
}
client.go
func main() {
rpcClient, err := rpc.Dial("tcp", "localhost:1234")
if err != nil {
fmt.Println(err)
}
var reply common.Response
var req common.Request
req.UserId = 11
//向服务端发送"HelloService.Hello",req, 从服务端读到reply
err = rpcClient.Call("HelloService.Hello", req, &reply)
if err != nil {
fmt.Println(err)
}
fmt.Print(reply)
}
1.生成client
使用conn即可生成client,所有如果使用tls连接,可以自定义client
func Dial(network, address string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn), nil
}
func NewClient(conn io.ReadWriteCloser) *Client {
encBuf := bufio.NewWriter(conn)
client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}//使用gob编解码
return NewClientWithCodec(client)
}
2.发起调用
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
done = make(chan *Call, 10) // buffered.
call.Done = done
client.send(call)
return call
}
编解码
官方库提供了encoding/gob,encoding/json两种编码模式,把io.ReadWrite分别封装为编码器,解码器。
//gob包,需要一个io.ReadWrite参数,分别包装一层实现编码后写入,解码后读出
enc := gob.NewEncoder(conn)
enc.Encode(data) //相当于编码后conn.write(data)
dec := gob.NewDecoder(conn)
dec.Decode(&m) //解码到变量m
//也可以使用json编解码
enc := json.NewEncoder(conn)
json.Encode(data) //相当于编码后conn.write(data)
dec := json.NewDecoder(conn)
json.Decode(&m) //解码到变量m, 从conn读取数据后解码到变量m
gob包使用案例
底层使用了go的反射功能,所以这种方式编解码只能在go语言中使用
func main() {
info := map[string]string{
"name": "C语言中文网",
"website": "http://c.biancheng.net/golang/",
}
name := "demo.gob"
EncodeToByte(name, info)
DecodeFromFile(name)
}
func EncodeToByte(name string, data interface{}) {
fd, _ := os.OpenFile(name, os.O_RDWR|os.O_CREATE, 0777)
defer fd.Close()
enc := gob.NewEncoder(fd)//生成编码器
if err := enc.Encode(data); err != nil {//把数据编码后写入到文件中
fmt.Println(err)
}
}
func DecodeFromFile(name string) {
var m map[string]string
fd, _ := os.OpenFile(name, os.O_RDWR|os.O_CREATE, 0777)
defer fd.Close()
D := gob.NewDecoder(fd) //生成解码器
D.Decode(&m)//从文件读取数据,解码到变量中
fmt.Println(m)
}
使用json做编解码
服务端
使用的也是go标准库net/rpc/jsonrpc
func jsonServer() {
rpc.RegisterName("HelloService", new(HelloService)) //把服务注册到rpc.Server.serviceMap
listener, _ := net.Listen("tcp", ":1234")
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
}
}
客户端
func jsonClient() {
conn, err := net.Dial("tcp", "localhost:1234")
if err != nil {
log.Fatal("net.Dial:", err)
}
client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
var reply string
err = client.Call("HelloService.Hello", "hello", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}
http协议实现rpc
service.go
func HttpRpcServer() {
rpc.RegisterName("HelloService", new(HelloService)) //把服务注册到rpc.Server.serviceMap
rpc.HandleHTTP()//会注册一个默认路径到http,Handle注册为rpcServer
if err := http.ListenAndServe(":1234", nil); err != nil {
log.Fatal("Error serving: ", err)
}
}
client.go
func HttpRpcClient() {
rpcClient, _ := rpc.DialHTTP("tcp", ":1234") //访问服务端注册的默认路径
var reply common.Response
var req common.Request
req.UserId = 11
rpcClient.Call("HelloService.Hello", req, &reply)
fmt.Print(reply)
}
客户端rpc异步调用
client.go
func HttpRpcClient() {
rpcClient, err := rpc.DialHTTP("tcp", ":1234")
if err != nil {
panic(err)
}
var reply common.Response
var req common.Request
req.UserId = 11
async := rpcClient.Go("HelloService.Hello", req, &reply, nil)
<-async.Done //等待异步返回结果,可以放到一个单独协程等待,不用阻塞当前协程
fmt.Print(reply)
}
有位大佬用go重新实现了net/rpc并增加了功能