Go-kratos 框架商城微服务实战之用户服务 (五)
这篇主要给服务加入链路追踪,完善 consul,并测试 shop 的 HTTP API 接口。文章写得不清晰的地方可通过 GitHub 源码进行查看,也感谢您指出不足之处,欢迎大佬指教。
注:竖排 … 代码省略,为了保持文章的篇幅简洁,我会将一些不必要的代码使用竖排的 . 来代替,你在复制本文代码块的时候,切记不要将 . 也一同复制进去。
准备工作
安装 consul
# 这里使用的是 docker 工具进行创建的 | |
docker run -d -p 8500:8500 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8600:8600/udp consul consul agent -dev -client=0.0.0.0 |
- 浏览器访问 127.0.0.1:8500/ui/dc1/services 验证是否安装成功
安装 jaeger
# 这里使用的是 docker 工具进行创建的 | |
docker run --rm --name jaeger -p14268:14268 -p16686:16686 jaegertracing/all-in-one | |
// 执行完毕之后,切记别退出服务 |
- 浏览器访问
http://127.0.0.1:16686/
验证是否安装成功
user 服务添加配置代码
consul 的配置前几篇文章都已经添加过了,这里就不重复添加了
- user 项目中添加
# user/configs/config.yaml 配置文件新增 | |
... | |
trace: | |
endpoint: http://127.0.0.1:14268/api/traces |
- 修改 user 的 conf.proto 文件(配置结构定义)
... | |
message Bootstrap { | |
Server server = 1; | |
Data data = 2; | |
Trace trace = 3; // 此处为新增的配置 | |
} | |
... | |
message Trace { | |
string endpoint = 1; | |
} |
- 生成 user 的 conf 文件
user 根目录执行命令,生成新的配置文件 | |
make config |
- 修改
grpc.go
文件
package server | |
import ( | |
. | |
. | |
. | |
"github.com/go-kratos/kratos/v2/middleware/tracing" // 新增引入 | |
) | |
// NewGRPCServer new a gRPC server. | |
func NewGRPCServer(c *conf.Server, u *service.UserService, logger log.Logger) *grpc.Server { | |
var opts = []grpc.ServerOption{ | |
grpc.Middleware( | |
recovery.Recovery(), | |
tracing.Server(), // 新增 tracing | |
), | |
} | |
. | |
. | |
. | |
v1.RegisterUserServer(srv, u) | |
return srv | |
} |
- 修改
main.go
文件
package main | |
import ( | |
"flag" | |
"github.com/go-kratos/kratos/v2/registry" | |
"go.opentelemetry.io/otel" | |
"go.opentelemetry.io/otel/attribute" | |
"os" | |
"github.com/go-kratos/kratos/v2" | |
"github.com/go-kratos/kratos/v2/config" | |
"github.com/go-kratos/kratos/v2/config/file" | |
"github.com/go-kratos/kratos/v2/log" | |
"github.com/go-kratos/kratos/v2/middleware/tracing" | |
"github.com/go-kratos/kratos/v2/transport/grpc" | |
"go.opentelemetry.io/otel/exporters/jaeger" | |
"go.opentelemetry.io/otel/sdk/resource" | |
tracesdk "go.opentelemetry.io/otel/sdk/trace" | |
semconv "go.opentelemetry.io/otel/semconv/v1.4.0" | |
"user/internal/conf" | |
) | |
// Package-level settings for the user service.
//
// Version can be overridden at link time:
//
//	go build -ldflags "-X main.Version=x.y.z"
var (
	// Name is the name of the compiled software.
	Name = "shop.user.service"
	// Version is the version of the compiled software.
	Version = "user.v1"
	// flagconf is the config flag; it is filled in by the -conf flag.
	flagconf string

	// id is the host name reported by os.Hostname. The error is discarded,
	// so id is whatever os.Hostname returned alongside it.
	id, _ = os.Hostname()
)
func init() { | |
flag.StringVar(&flagconf, "conf", "../../configs", "config path, eg: -conf config.yaml") | |
} | |
func newApp(logger log.Logger, gs *grpc.Server, rr registry.Registrar) *kratos.App { | |
return kratos.New( | |
kratos.ID(id+"user service"), | |
kratos.Name(Name), | |
kratos.Version(Version), | |
kratos.Metadata(map[string]string{}), | |
kratos.Logger(logger), | |
kratos.Server( | |
gs, | |
), | |
kratos.Registrar(rr), // 服务注册与发现 | |
) | |
} | |
// Set global trace provider 设置链路追逐的方法 | |
func setTracerProvider(url string) error { | |
// Create the Jaeger exporter | |
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) | |
if err != nil { | |
return err | |
} | |
tp := tracesdk.NewTracerProvider( | |
// Set the sampling rate based on the parent span to 100% | |
tracesdk.WithSampler(tracesdk.ParentBased(tracesdk.TraceIDRatioBased(1.0))), | |
// Always be sure to batch in production. | |
tracesdk.WithBatcher(exp), | |
// Record information about this application in an Resource. | |
tracesdk.WithResource(resource.NewSchemaless( | |
semconv.ServiceNameKey.String(Name), | |
attribute.String("env", "dev"), | |
)), | |
) | |
otel.SetTracerProvider(tp) | |
return nil | |
} | |
func main() { | |
flag.Parse() | |
logger := log.With(log.NewStdLogger(os.Stdout), | |
"ts", log.DefaultTimestamp, | |
"caller", log.DefaultCaller, | |
"service.id", id, | |
"service.name", Name, | |
"service.version", Version, | |
"trace_id", tracing.TraceID(), | |
"span_id", tracing.SpanID(), | |
) | |
c := config.New( | |
config.WithSource( | |
file.NewSource(flagconf), | |
), | |
) | |
defer c.Close() | |
if err := c.Load(); err != nil { | |
panic(err) | |
} | |
var bc conf.Bootstrap | |
if err := c.Scan(&bc); err != nil { | |
panic(err) | |
} | |
// 加入链路追踪的配置 | |
if err := setTracerProvider(bc.Trace.Endpoint); err != nil { | |
panic(err) | |
} | |
var rc conf.Registry | |
if err := c.Scan(&rc); err != nil { | |
panic(err) | |
} | |
app, cleanup, err := initApp(bc.Server, &rc, bc.Data, logger) | |
if err != nil { | |
panic(err) | |
} | |
defer cleanup() | |
// start and wait for stop signal | |
if err := app.Run(); err != nil { | |
panic(err) | |
} | |
} |
修改 wire.go 文件
根目录执行命令,生成新的 wire_gen.go 文件 | |
make wire |
shop 项目中添加配置代码
前几篇已经把 consul service 的一些配置加入到了 config 文件中,这里就不重复添加了
- 修改 config.yaml 文件
考虑到这个配置文件的重要性,这里贴出来了全部的配置
name: shop.api | |
server: | |
http: | |
addr: 0.0.0.0:8097 | |
timeout: 1s | |
grpc: | |
addr: 0.0.0.0:9001 | |
timeout: 1s | |
data: | |
database: | |
driver: mysql | |
source: root:root@tcp(127.0.0.1:3306)/test | |
redis: | |
addr: 127.0.0.1:6379 | |
read_timeout: 0.2s | |
write_timeout: 0.2s | |
trace: | |
endpoint: http://127.0.0.1:14268/api/traces | |
auth: | |
jwt_key: hqFr%3ddt32DGlSTOI5cO6@TH#fFwYnP$S | |
service: | |
user: | |
endpoint: discovery:///shop.user.service | |
goods: | |
endpoint: discovery:///shop.goods.service |
- 修改 conf.proto 文件
考虑到这个配置文件的重要性,这里贴出来了全部的配置
syntax = "proto3"; | |
package shop.api; | |
option go_package = "shop/internal/conf;conf"; | |
import "google/protobuf/duration.proto"; | |
message Bootstrap { | |
Server server = 1; | |
Data data = 2; | |
Trace trace = 3; | |
Auth auth = 4; | |
Service service = 5; | |
} | |
message Server { | |
message HTTP { | |
string network = 1; | |
string addr = 2; | |
google.protobuf.Duration timeout = 3; | |
} | |
message GRPC { | |
string network = 1; | |
string addr = 2; | |
google.protobuf.Duration timeout = 3; | |
} | |
HTTP http = 1; | |
GRPC grpc = 2; | |
} | |
message Data { | |
message Database { | |
string driver = 1; | |
string source = 2; | |
} | |
message Redis { | |
string network = 1; | |
string addr = 2; | |
google.protobuf.Duration read_timeout = 3; | |
google.protobuf.Duration write_timeout = 4; | |
} | |
Database database = 1; | |
Redis redis = 2; | |
} | |
message Service { | |
message User { | |
string endpoint = 1; | |
} | |
message Goods { | |
string endpoint = 1; | |
} | |
User user = 1; | |
Goods goods = 2; | |
} | |
message Trace { | |
string endpoint = 1; | |
} | |
message Registry { | |
message Consul { | |
string address = 1; | |
string scheme = 2; | |
} | |
Consul consul = 1; | |
} | |
message Auth { | |
string jwt_key = 1; | |
} |
- 生成新的配置
shop 根目录执行命令,生成新的配置文件 | |
make config |
- 修改连接用户服务的代码
shop/internal/data/data.go
package data | |
import ( | |
"context" | |
consul "github.com/go-kratos/kratos/contrib/registry/consul/v2" | |
"github.com/go-kratos/kratos/v2/log" | |
"github.com/go-kratos/kratos/v2/middleware/recovery" | |
"github.com/go-kratos/kratos/v2/middleware/tracing" | |
"github.com/go-kratos/kratos/v2/registry" | |
"github.com/go-kratos/kratos/v2/transport/grpc" | |
"github.com/google/wire" | |
consulAPI "github.com/hashicorp/consul/api" | |
grpcx "google.golang.org/grpc" | |
userV1 "shop/api/service/user/v1" | |
"shop/internal/conf" | |
"time" | |
) | |
// ProviderSet is data providers. | |
var ProviderSet = wire.NewSet(NewData, NewUserRepo, NewUserServiceClient, NewRegistrar, NewDiscovery) | |
// Data . | |
type Data struct { | |
log *log.Helper | |
uc userV1.UserClient | |
} | |
// NewData . | |
func NewData(c *conf.Data, uc userV1.UserClient, logger log.Logger) (*Data, error) { | |
l := log.NewHelper(log.With(logger, "module", "data")) | |
return &Data{log: l, uc: uc}, nil | |
} | |
// NewUserServiceClient 链接用户服务 grpc | |
func NewUserServiceClient(ac *conf.Auth, sr *conf.Service, rr registry.Discovery) userV1.UserClient { | |
conn, err := grpc.DialInsecure( | |
context.Background(), | |
grpc.WithEndpoint(sr.User.Endpoint), | |
grpc.WithDiscovery(rr), | |
grpc.WithMiddleware( | |
tracing.Client(), // 链路追踪 | |
recovery.Recovery(), | |
), | |
grpc.WithTimeout(2*time.Second), | |
grpc.WithOptions(grpcx.WithStatsHandler(&tracing.ClientHandler{})), | |
) | |
if err != nil { | |
panic(err) | |
} | |
c := userV1.NewUserClient(conn) | |
return c | |
} | |
// NewRegistrar add consul | |
func NewRegistrar(conf *conf.Registry) registry.Registrar { | |
c := consulAPI.DefaultConfig() | |
c.Address = conf.Consul.Address | |
c.Scheme = conf.Consul.Scheme | |
cli, err := consulAPI.NewClient(c) | |
if err != nil { | |
panic(err) | |
} | |
r := consul.New(cli, consul.WithHealthCheck(false)) | |
return r | |
} | |
func NewDiscovery(conf *conf.Registry) registry.Discovery { | |
c := consulAPI.DefaultConfig() | |
c.Address = conf.Consul.Address | |
c.Scheme = conf.Consul.Scheme | |
cli, err := consulAPI.NewClient(c) | |
if err != nil { | |
panic(err) | |
} | |
r := consul.New(cli, consul.WithHealthCheck(false)) | |
return r | |
} |
- 修改 server http 服务
package server | |
import ( | |
"context" | |
"github.com/go-kratos/kratos/v2/log" | |
"github.com/go-kratos/kratos/v2/middleware/auth/jwt" | |
"github.com/go-kratos/kratos/v2/middleware/logging" | |
"github.com/go-kratos/kratos/v2/middleware/recovery" | |
"github.com/go-kratos/kratos/v2/middleware/selector" | |
"github.com/go-kratos/kratos/v2/middleware/tracing" | |
"github.com/go-kratos/kratos/v2/middleware/validate" | |
"github.com/go-kratos/kratos/v2/transport/http" | |
jwt2 "github.com/golang-jwt/jwt/v4" | |
"github.com/gorilla/handlers" | |
v1 "shop/api/shop/v1" | |
"shop/internal/conf" | |
"shop/internal/service" | |
) | |
// NewHTTPServer new an HTTP server. | |
func NewHTTPServer(c *conf.Server, ac *conf.Auth, s *service.ShopService, logger log.Logger) *http.Server { | |
var opts = []http.ServerOption{ | |
http.Middleware( | |
recovery.Recovery(), | |
validate.Validator(), | |
tracing.Server(), // 这里是本篇新增的 | |
selector.Server( | |
jwt.Server(func(token *jwt2.Token) (interface{}, error) { | |
return []byte(ac.JwtKey), nil | |
}, jwt.WithSigningMethod(jwt2.SigningMethodHS256)), | |
).Match(NewWhiteListMatcher()).Build(), | |
logging.Server(logger), | |
), | |
http.Filter(handlers.CORS( | |
handlers.AllowedHeaders([]string{"X-Requested-With", "Content-Type", "Authorization"}), | |
handlers.AllowedMethods([]string{"GET", "POST", "PUT", "HEAD", "OPTIONS"}), | |
handlers.AllowedOrigins([]string{"*"}), | |
)), | |
} | |
... | |
return srv | |
} | |
... |
- 修改启动文件
package main | |
import ( | |
"flag" | |
"os" | |
"github.com/go-kratos/kratos/v2" | |
"github.com/go-kratos/kratos/v2/config" | |
"github.com/go-kratos/kratos/v2/config/file" | |
"github.com/go-kratos/kratos/v2/log" | |
"github.com/go-kratos/kratos/v2/middleware/tracing" | |
"github.com/go-kratos/kratos/v2/registry" | |
"github.com/go-kratos/kratos/v2/transport/grpc" | |
"github.com/go-kratos/kratos/v2/transport/http" | |
"go.opentelemetry.io/otel" | |
"go.opentelemetry.io/otel/attribute" | |
"go.opentelemetry.io/otel/exporters/jaeger" | |
"go.opentelemetry.io/otel/sdk/resource" | |
tracesdk "go.opentelemetry.io/otel/sdk/trace" | |
semconv "go.opentelemetry.io/otel/semconv/v1.7.0" | |
"shop/internal/conf" | |
) | |
// Name and Version identify the shop API binary. Version can be overridden
// at link time:
//
//	go build -ldflags "-X main.Version=x.y.z"
var (
	// Name is the name of the compiled software.
	Name = "shop.api"
	// Version is the version of the compiled software.
	Version = "shop.api.v1"
)

// flagconf is the config flag; it is filled in by the -conf flag.
var flagconf string

// id is the host name reported by os.Hostname; the error is discarded.
var id, _ = os.Hostname()
func init() { | |
flag.StringVar(&flagconf, "conf", "../../configs", "config path, eg: -conf config.yaml") | |
} | |
func newApp(logger log.Logger, hs *http.Server, gs *grpc.Server, rr registry.Registrar) *kratos.App { | |
return kratos.New( | |
kratos.ID(id+"shop.api"), | |
kratos.Name(Name), | |
kratos.Version(Version), | |
kratos.Metadata(map[string]string{}), | |
kratos.Logger(logger), | |
kratos.Server( | |
hs, | |
//gs, | |
), | |
kratos.Registrar(rr), | |
) | |
} | |
func main() { | |
flag.Parse() | |
logger := log.With(log.NewStdLogger(os.Stdout), | |
"ts", log.DefaultTimestamp, | |
"caller", log.DefaultCaller, | |
"service.id", id, | |
"service.name", Name, | |
"service.version", Version, | |
"trace_id", tracing.TraceID(), | |
"span_id", tracing.SpanID(), | |
) | |
c := config.New( | |
config.WithSource( | |
file.NewSource(flagconf), | |
), | |
) | |
defer c.Close() | |
if err := c.Load(); err != nil { | |
panic(err) | |
} | |
var bc conf.Bootstrap | |
if err := c.Scan(&bc); err != nil { | |
panic(err) | |
} | |
var rc conf.Registry | |
if err := c.Scan(&rc); err != nil { | |
panic(err) | |
} | |
err := setTracerProvider(bc.Trace.Endpoint) | |
if err != nil { | |
panic(err) | |
} | |
app, cleanup, err := initApp(bc.Server, bc.Data, bc.Auth, bc.Service, &rc, logger) | |
if err != nil { | |
panic(err) | |
} | |
defer cleanup() | |
// start and wait for stop signal | |
if err := app.Run(); err != nil { | |
panic(err) | |
} | |
} | |
func setTracerProvider(url string) error { | |
// Create the Jaeger exporter | |
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) | |
if err != nil { | |
return err | |
} | |
tp := tracesdk.NewTracerProvider( | |
// Set the sampling rate based on the parent span to 100% | |
tracesdk.WithSampler(tracesdk.ParentBased(tracesdk.TraceIDRatioBased(1.0))), | |
// Always be sure to batch in production. | |
tracesdk.WithBatcher(exp), | |
// Record information about this application in an Resource. | |
tracesdk.WithResource(resource.NewSchemaless( | |
semconv.ServiceNameKey.String(Name), | |
attribute.String("env", "dev"), | |
)), | |
) | |
otel.SetTracerProvider(tp) | |
return nil | |
} |
- 修改 wire.go 文件
根目录执行命令,生成新的 wire_gen.go 文件 | |
make wire |
完整流程测试
- 启动 user 服务
user 目录下执行命令 | |
kratos run |
- 启动 shop 服务
shop 目录下执行命令 | |
kratos run |
- consul 验证是不是两个服务都启动了
浏览器访问 127.0.0.1:8500/
访问用户创建接口
这里使用的是 apipost 接口测试工具, 具体操作看图示,注意传入的参数类型为 json
如图正确返回了,就证明接口访问成功,可以去数据库表中验证,是否是同样的数据插入
- 验证链路追踪
浏览器访问 127.0.0.1:16686
如图所示选择 shop.api 然后点击下方的 Find Traces
搜索之后点击进去,看到如图
这里需要注意,在咱们的代码中并未设置成统一的 SpanId,只有 TraceId 是一样的。
结束语
整个服务流程已经通了,这里需要注意的点是,访问用户登录接口的时候,需要先请求获取验证码的接口,然后拿到验证码返回的 ID 和 code 进行登录请求。code 是个 url,需要通过浏览器访问才能看到具体 code 是什么。还有查询用户详细信息的时候,需要携带注册或登录返回的 token,携带的方式是 Bearer Auth。
接下来会开始完善用户服务的其他信息,如:用户的收货地址之类的。感谢您的耐心阅读,动动手指点个赞吧。