With a service registration and discovery mechanism in place, a consumer can make calls without knowing the real physical address of the service provider, and without knowing how many provider instances are available. The provider, in turn, only needs to register itself with the registry to serve traffic, and it does not need to know which services are calling it.
## RPC Configuration
```yaml
Etcd:
  Hosts:
  - 127.0.0.1:2379
  Key: user.rpc
```
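For context, the goctl-generated config type that this YAML is loaded into usually just embeds `zrpc.RpcServerConf`. A sketch of what `mall/user/rpc/internal/config/config.go` typically looks like (verify against your generated code):

```go
package config

import "github.com/zeromicro/go-zero/zrpc"

// Config embeds zrpc.RpcServerConf, so the Etcd block above (Hosts, Key)
// maps onto c.Etcd, and ListenOn/Mode are available on c as well.
type Config struct {
	zrpc.RpcServerConf
}
```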
## Callee: Service Registration
The source of `mall/user/rpc/user.go` is as follows:
```go
package main

import (
	"flag"
	"fmt"

	"go-zero-demo-rpc/mall/user/rpc/internal/config"
	"go-zero-demo-rpc/mall/user/rpc/internal/server"
	"go-zero-demo-rpc/mall/user/rpc/internal/svc"
	"go-zero-demo-rpc/mall/user/rpc/types/user"

	"github.com/zeromicro/go-zero/core/conf"
	"github.com/zeromicro/go-zero/core/service"
	"github.com/zeromicro/go-zero/zrpc"
	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
)

var configFile = flag.String("f", "etc/user.yaml", "the config file")

func main() {
	flag.Parse()

	var c config.Config
	conf.MustLoad(*configFile, &c)
	ctx := svc.NewServiceContext(c)
	svr := server.NewUserServer(ctx)

	s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
		user.RegisterUserServer(grpcServer, svr)

		if c.Mode == service.DevMode || c.Mode == service.TestMode {
			reflection.Register(grpcServer)
		}
	})
	defer s.Stop()

	fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
	s.Start()
}
```
`MustNewServer` internally calls `NewServer`; here we focus on `NewServer`, which instantiates an `internal.Server` through `internal.NewRpcPubServer`:
```go
	if c.HasEtcd() {
		server, err = internal.NewRpcPubServer(c.Etcd, c.ListenOn, serverOptions...)
		if err != nil {
			return nil, err
		}
	}
```
The `registerEtcd` function inside `internal.NewRpcPubServer` ends up calling the `Publisher.KeepAlive` method:
```go
func (p *Publisher) KeepAlive() error {
	// get a connection to the etcd cluster
	cli, err := internal.GetRegistry().GetConn(p.endpoints)
	if err != nil {
		return err
	}

	// register the service key with a lease
	p.lease, err = p.register(cli)
	if err != nil {
		return err
	}

	// deregister when the process shuts down gracefully
	proc.AddWrapUpListener(func() {
		p.Stop()
	})

	// renew the lease asynchronously
	return p.keepAliveAsync(cli)
}

func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
	// grant a lease with the configured TTL
	resp, err := client.Grant(client.Ctx(), TimeToLive)
	if err != nil {
		return clientv3.NoLease, err
	}

	lease := resp.ID

	// build the full key: use the explicit id if set, otherwise the lease id
	if p.id > 0 {
		p.fullKey = makeEtcdKey(p.key, p.id)
	} else {
		p.fullKey = makeEtcdKey(p.key, int64(lease))
	}

	// write the key/value bound to the lease
	_, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))

	return lease, err
}
```
- After registration, `keepAliveAsync` starts a goroutine that keeps the service's lease alive.
- If the service crashes unexpectedly, it stops renewing the lease, so `etcd` deletes the key once the lease expires.
- The registered service looks like the figure below; a standalone sketch of the lease mechanism follows the figure.

## Caller: Service Discovery
```go
package main

import (
	"flag"
	"fmt"

	"go-zero-demo-rpc/order/api/internal/config"
	"go-zero-demo-rpc/order/api/internal/handler"
	"go-zero-demo-rpc/order/api/internal/svc"

	"github.com/zeromicro/go-zero/core/conf"
	"github.com/zeromicro/go-zero/rest"
)

var configFile = flag.String("f", "etc/order.yaml", "the config file")

func main() {
	flag.Parse()

	var c config.Config
	conf.MustLoad(*configFile, &c)

	server := rest.MustNewServer(c.RestConf)
	defer server.Stop()

	ctx := svc.NewServiceContext(c)
	handler.RegisterHandlers(server, ctx)

	fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
	server.Start()
}
```
- Inside `svc.NewServiceContext`, `zrpc.MustNewClient` is called; its main implementation lives in `zrpc.NewClient`.
```go
func NewServiceContext(c config.Config) *ServiceContext {
	return &ServiceContext{
		Config:  c,
		UserRpc: user.NewUser(zrpc.MustNewClient(c.UserRpc)),
	}
}
```
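The `c.UserRpc` passed to `zrpc.MustNewClient` comes from the caller's configuration, with the config struct declaring a matching `UserRpc zrpc.RpcClientConf` field. A typical `etc/order.yaml` for this setup would look roughly like this (values are illustrative):

```yaml
Name: order-api
Host: 0.0.0.0
Port: 8888
UserRpc:
  Etcd:
    Hosts:
    - 127.0.0.1:2379
    Key: user.rpc
```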
- Finally, `internal.NewClient` is called to instantiate the RPC client.
```go
func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
	var opts []ClientOption
	if c.HasCredential() {
		opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{
			App:   c.App,
			Token: c.Token,
		})))
	}
	if c.NonBlock {
		opts = append(opts, WithNonBlock())
	}
	if c.Timeout > 0 {
		opts = append(opts, WithTimeout(time.Duration(c.Timeout)*time.Millisecond))
	}

	opts = append(opts, options...)

	target, err := c.BuildTarget()
	if err != nil {
		return nil, err
	}

	client, err := internal.NewClient(target, opts...)
	if err != nil {
		return nil, err
	}

	return &RpcClient{
		client: client,
	}, nil
}
```
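The same client can also be built in code instead of from YAML; a minimal sketch (assuming a local etcd at 127.0.0.1:2379 and the goctl-generated `user.NewUser` wrapper):

```go
package main

import (
	"github.com/zeromicro/go-zero/core/discov"
	"github.com/zeromicro/go-zero/zrpc"
)

func main() {
	// an etcd-backed client config, equivalent to the UserRpc YAML block
	c := zrpc.RpcClientConf{
		Etcd: discov.EtcdConf{
			Hosts: []string{"127.0.0.1:2379"},
			Key:   "user.rpc",
		},
	}

	// zrpc.WithNonBlock corresponds to the NonBlock branch in NewClient above
	client := zrpc.MustNewClient(c, zrpc.WithNonBlock())
	_ = client // e.g. userRpc := user.NewUser(client)
}
```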
- The `zrpc/internal/client.go` file contains an `init` function; this is where the service discoverers are actually registered:
```go
func init() {
	resolver.Register()
}
```
```go
package resolver

import (
	"github.com/zeromicro/go-zero/zrpc/resolver/internal"
)

// Register registers the resolver builders used by zrpc.
func Register() {
	internal.RegisterResolver()
}
```
- This leads back to the `internal` package's `RegisterResolver` function; here we focus on `etcdResolverBuilder`:
```go
func RegisterResolver() {
	resolver.Register(&directResolverBuilder)
	resolver.Register(&discovResolverBuilder)
	resolver.Register(&etcdResolverBuilder)
	resolver.Register(&k8sResolverBuilder)
}
```
`etcdBuilder` embeds the `discovBuilder` struct. The call chain that reaches the `Build` method:

- Client instantiation: `internal.NewClient` -> `client.dial` -> `grpc.DialContext`
- Since the `etcd` target is generated by `resolver.BuildDiscovTarget`, it looks like `discov://127.0.0.1:2379/user.rpc`
- Resolver lookup: `ClientConn.parseTargetAndFindResolver` -> `grpc.parseTarget` -> `ClientConn.getResolver`
- `grpc.newCCResolverWrapper` then calls the `resolver.Builder.Build` method to perform service discovery
- We focus on the `discovBuilder.Build` method:
```go
// etcdBuilder reuses discovBuilder's Build via embedding
type etcdBuilder struct {
	discovBuilder
}

type discovBuilder struct{}

func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (
	resolver.Resolver, error) {
	// parse the etcd hosts from the target authority, e.g. "127.0.0.1:2379"
	hosts := strings.FieldsFunc(targets.GetAuthority(target), func(r rune) bool {
		return r == EndpointSepChar
	})
	// subscribe to changes of the service key in etcd
	sub, err := discov.NewSubscriber(hosts, targets.GetEndpoints(target))
	if err != nil {
		return nil, err
	}

	// push a subset of the discovered addresses to the gRPC ClientConn
	update := func() {
		var addrs []resolver.Address
		for _, val := range subset(sub.Values(), subsetSize) {
			addrs = append(addrs, resolver.Address{
				Addr: val,
			})
		}
		if err := cc.UpdateState(resolver.State{
			Addresses: addrs,
		}); err != nil {
			logx.Error(err)
		}
	}
	// re-run update whenever the registered endpoints change, and once immediately
	sub.AddListener(update)
	update()

	return &nopResolver{cc: cc}, nil
}

func (b *discovBuilder) Scheme() string {
	return DiscovScheme
}
```
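The `discovBuilder` above follows the standard gRPC resolver contract: a `Builder` creates a `Resolver` and pushes addresses via `cc.UpdateState`. A minimal static builder (illustrative scheme and addresses, unrelated to go-zero) shows the same contract:

```go
package main

import "google.golang.org/grpc/resolver"

// staticBuilder resolves "static:///whatever" to a fixed address list,
// the same way discovBuilder resolves a discov:// target from etcd.
type staticBuilder struct {
	addrs []string
}

func (b *staticBuilder) Build(target resolver.Target, cc resolver.ClientConn,
	_ resolver.BuildOptions) (resolver.Resolver, error) {
	var addrs []resolver.Address
	for _, a := range b.addrs {
		addrs = append(addrs, resolver.Address{Addr: a})
	}
	// push the initial address set to the ClientConn, like update() above
	if err := cc.UpdateState(resolver.State{Addresses: addrs}); err != nil {
		return nil, err
	}
	return &staticResolver{}, nil
}

func (b *staticBuilder) Scheme() string { return "static" }

// staticResolver never re-resolves; go-zero instead re-runs update()
// whenever the etcd subscriber reports a change.
type staticResolver struct{}

func (r *staticResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (r *staticResolver) Close()                                {}

func main() {
	// registering the builder is what RegisterResolver does for the
	// direct/discov/etcd/k8s schemes
	resolver.Register(&staticBuilder{addrs: []string{"127.0.0.1:8080", "127.0.0.1:8081"}})
}
```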
`discov.NewSubscriber` calls `internal.GetRegistry().Monitor`, which finally calls the `Registry.monitor` method to start watching:

- `cluster.getClient` obtains the `etcd` connection
- `cluster.load` performs the initial data load
- `cluster.watch` watches for changes to the `etcd` prefix key
```go
func (c *cluster) monitor(key string, l UpdateListener) error {
	// register the listener for this key
	c.lock.Lock()
	c.listeners[key] = append(c.listeners[key], l)
	c.lock.Unlock()

	// get the etcd connection
	cli, err := c.getClient()
	if err != nil {
		return err
	}

	// initial load, then watch for subsequent changes
	c.load(cli, key)
	c.watchGroup.Run(func() {
		c.watch(cli, key)
	})

	return nil
}
```
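Under the hood, `cluster.load` and `cluster.watch` boil down to a prefix Get followed by a prefix Watch on the etcd client. A minimal sketch of that pattern (hypothetical prefix, not the go-zero implementation):

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	const prefix = "user.rpc/" // hypothetical prefix

	// initial load: fetch every registered instance under the prefix
	resp, err := cli.Get(context.Background(), prefix, clientv3.WithPrefix())
	if err != nil {
		log.Fatal(err)
	}
	for _, kv := range resp.Kvs {
		fmt.Printf("loaded %s -> %s\n", kv.Key, kv.Value)
	}

	// watch: react to instances registering (PUT) or expiring (DELETE)
	for wresp := range cli.Watch(context.Background(), prefix, clientv3.WithPrefix()) {
		for _, ev := range wresp.Events {
			fmt.Printf("%s %s -> %s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}
```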
- The figure below shows what `cluster.load` does: it fetches all registered addresses of the `user.rpc` service by prefix.
服务注册的所有地址 
## Q&A
- Why not use `Redis` as the registry (after all, it only needs to store the callees' addresses, and `Redis` can handle expiration too)? After searching for a while, I found the following explanation:
A brief rundown of why Redis cannot replace etcd in microservices:

1. Redis has no notion of revisions. Historical versions of data are essential in large-scale microservice systems, for state rollback, troubleshooting, and even assigning responsibility.
2. Registration and discovery with Redis can currently only be built on pub/sub, which falls far short of production requirements; search the web or read the source for the details.
3. With etcd 2.x, the official docs recommended issuing another get after a watch event because the data could lag; 3.x no longer needs this. Consider whether Redis pub/sub can reach that level of latency.
4. The microservice architectures the original poster has seen all expose etcd directly to clients and servers. Given etcd's performance, how many direct client/server connections can it sustain? A better approach is to put a protective layer in front of etcd, even though that sacrifices some functionality.
5. Redis and etcd have different clustering designs: etcd uses the Raft protocol, one leader with followers where only the leader accepts writes, and uses boltdb as its underlying k/v store, persisting directly to disk.
6. Redis persistence options are AOF and RDB, and both can lose some data on a crash.
Original link: www.shiguopeng.cn/posts/2022061518...