目录
- 动手实现一个分布式注册中心
- 日志服务
- log/Server.go
- log/Client.go
- 主启动程序LogService
- 服务启动与注册
- service/service.go
- 服务注册与发现
- registry/client.go
- registry/registration.go
- registry/server.go
动手实现一个分布式注册中心
以一个日志微服务为例,将日志服务注册到注册中心展开!
日志服务
log/Server.go
其实这一个日志类的功能就是有基本的写文件功能,然后就是注册一个http的接口去写日志进去
package log
import (
"io/ioutil"
stlog "log"
"net/http"
"os"
)
var log *stlog.Logger
type fileLog string
// 编写日志的方法
func (fl fileLog) Write(data []byte) (int, error) {
f, err := os.OpenFile(string(fl), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
if err != nil {
return 0, err
}
defer f.Close()
return f.Write(data)
}
// 启动一个日志对象 参数为日志文件名
func Run(destination string) {
log = stlog.New(fileLog(destination), "[go] - ", stlog.LstdFlags)
}
// 自身注册的一个服务方法
func RegisterHandlers() {
http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodPost:
msg, err := ioutil.ReadAll(r.Body)
if err != nil || len(msg) == 0 {
w.WriteHeader(http.StatusBadRequest)
return
}
write(string(msg))
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
})
}
func write(message string) {
log.Printf("%v\n", message)
}
log/Client.go
提供给外部服务的接口,定义好日志的命名格式,来显示调用接口去使用已经注册好的日志接口并且返回状态
package log
import (
"bytes"
"distributed/registry"
"fmt"
"net/http"
stlog "log"
)
func SetClientLogger(serviceURL string, clientService registry.ServiceName) {
stlog.SetPrefix(fmt.Sprintf("[%v] - ", clientService))
stlog.SetFlags(0)
stlog.SetOutput(&clientLogger{url: serviceURL})
}
type clientLogger struct {
url string
}
func (cl clientLogger) Write(data []byte) (int, error) {
b := bytes.NewBuffer([]byte(data))
res, err := http.Post(cl.url+"/log", "text/plain", b)
if err != nil {
return 0, err
}
if res.StatusCode != http.StatusOK {
return 0, fmt.Errorf("Failed to send log message. Service responded with %d - %s", res.StatusCode, res.Status)
}
return len(data), nil
}
主启动程序LogService
启动服务Logservice
,主要执行start方法,里面有细节实现服务注册与服务发现
package main
import (
"context"
"distributed/log"
"distributed/registry"
"distributed/service"
"fmt"
stlog "log"
)
func main() {
// 初始化启动一个日志文件对象
log.Run("./distributed.log")
// 日志服务注册的端口和地址
host, port := "localhost", "4000"
serviceAddress := fmt.Sprintf("http://%s:%s", host, port)
// 初始化注册对象
r := registry.Registration{
ServiceName: registry.LogService, // 自身服务名
ServiceURL: serviceAddress, // 自身服务地址
RequiredServices: make([]registry.ServiceName, 0),// 依赖服务
ServiceUpdateURL: serviceAddress + "/services", // 服务列表
HeartbeatURL: serviceAddress + "/heartbeat", // 心跳
}
// 启动日志服务包含服务注册,发现等细节
ctx, err := service.Start(
context.Background(),
host,
port,
r,
log.RegisterHandlers,
)
// 异常写入到日志中
if err != nil {
stlog.Fatalln(err)
}
// 超时停止退出服务
<-ctx.Done()
fmt.Println("Shutting down log service.")
}
服务启动与注册
service/service.go
Start 启动服务的主方法
/*
host: 地址
port: 端口号
reg: 注册的服务对象
registerHandlersFunc: 注册方法
*/
func Start(ctx context.Context, host, port string,
reg registry.Registration,
registerHandlersFunc func()) (context.Context, error) {
registerHandlersFunc() // 启动注册方法
// 启动服务
ctx = startService(ctx, reg.ServiceName, host, port)
// 注册服务
err := registry.RegisterService(reg)
if err != nil {
return ctx, err
}
return ctx, nil
}
startService
func startService(ctx context.Context, serviceName registry.ServiceName,
host, port string) context.Context {
ctx, cancel := context.WithCancel(ctx)
var srv http.Server
srv.Addr = ":" + port
// 该协程为监听http服务,并且停止服务的时候cancel
go func() {
log.Println(srv.ListenAndServe())
// 删除对应的服务
err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port))
if err != nil {
log.Println(err)
}
cancel()
}()
// 该协程为监听手动停止服务的信号
go func() {
fmt.Printf("%v started. Press any key to stop. \n", serviceName)
var s string
fmt.Scanln(&s)
err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port))
if err != nil {
log.Println(err)
}
srv.Shutdown(ctx)
cancel()
}()
return ctx
}
服务注册与发现
registry/client.go
注册服务的时候会连着心跳以及服务更新的方法一起注册!
而服务更新里面的细节就是自己自定义了一个Handler然后ServeHttp方法里面去update
全局的服务提供对象,
update主要是更新服务和删除服务的最新消息
然后就是提供一个注销服务的方法
package registry
import (
"bytes"
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"net/url"
"sync"
)
// 注册服务
func RegisterService(r Registration) error {
// 获得心跳地址并注册
heartbeatURL, err := url.Parse(r.HeartbeatURL)
if err != nil {
return err
}
http.HandleFunc(heartbeatURL.Path, func (w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
// 获得服务更新地址,并且自定义http服务的handler,因为每次更新服务的时候,可以在ServeHttp方法里面去维护
serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL)
if err != nil {
return err
}
http.Handle(serviceUpdateURL.Path, &serviceUpdateHanlder{})
// 写入buf值将服务对象发送给注册中心的services地址
buf := new(bytes.Buffer)
enc := json.NewEncoder(buf)
err = enc.Encode(r)
if err != nil {
return err
}
res, err := http.Post(ServicesURL, "application/json", buf)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("Failed to register service. Registry service "+
"responded with code %v", res.StatusCode)
}
return nil
}
type serviceUpdateHanlder struct{}
func (suh serviceUpdateHanlder) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
dec := json.NewDecoder(r.Body)
var p patch
err := dec.Decode(&p)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
fmt.Printf("Updated received %v\n", p)
prov.Update(p) // 更新服务提供对象
}
// 删除对应注册中心的服务地址
func ShutdownService(url string) error {
req, err := http.NewRequest(http.MethodDelete, ServicesURL,
bytes.NewBuffer([]byte(url)))
if err != nil {
return err
}
req.Header.Add("Content-Type", "text/plain")
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("Failed to deregister service. Registry "+
"service responded with code %v", res.StatusCode)
}
return nil
}
// 更新服务列表
func (p *providers) Update(pat patch) {
p.mutex.Lock()
defer p.mutex.Unlock()
// 将patch中有新增的进行添加
for _, patchEntry := range pat.Added {
if _, ok := p.services[patchEntry.Name]; !ok {
p.services[patchEntry.Name] = make([]string, 0)
}
p.services[patchEntry.Name] = append(p.services[patchEntry.Name],
patchEntry.URL)
}
// 将patch中被标记删除的
for _, patchEntry := range pat.Removed {
if providerURLs, ok := p.services[patchEntry.Name]; ok {
for i := range providerURLs {
if providerURLs[i] == patchEntry.URL {
p.services[patchEntry.Name] = append(providerURLs[:i],
providerURLs[i+1:]...)
}
}
}
}
}
// 根据服务名负载均衡随机获取服务地址
func (p providers) get(name ServiceName) (string, error) {
providers, ok := p.services[name]
if !ok {
return "", fmt.Errorf("No providers available for service %v", name)
}
idx := int(rand.Float32() * float32(len(providers)))
return providers[idx], nil
}
// 对外暴露生产者的方法
func GetProvider(name ServiceName) (string, error) {
return prov.get(name)
}
type providers struct {
services map[ServiceName][]string
mutex *sync.RWMutex
}
// 服务提供对象
var prov = providers{
services: make(map[ServiceName][]string), // 服务列表 服务名->集群地址集合
mutex: new(sync.RWMutex), // 锁 防止服务注册更新时的并发情况
}
registry/registration.go
主要是一些关于服务使用到的参数以及对象!
package registry
type Registration struct {
ServiceName ServiceName // 服务名
ServiceURL string // 服务地址
RequiredServices []ServiceName // 依赖的服务
ServiceUpdateURL string // 服务更新的地址
HeartbeatURL string // 心跳地址
}
type ServiceName string
// 服务名集合
const (
LogService = ServiceName("LogService")
GradingService = ServiceName("GradingService")
PortalService = ServiceName("Portald")
)
// 服务对象参数
type patchEntry struct {
Name ServiceName
URL string
}
// 更新的服务对象参数
type patch struct {
Added []patchEntry
Removed []patchEntry
}
registry/server.go
服务端的注册中心服务的增删改查管理以及心跳检测,及时将最新的更新的服务消息通知回给客户端
package registry
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
"time"
)
const ServerPort = ":3000"
const ServicesURL = "http://localhost" + ServerPort + "/services" // 注册中心地址
// 服务对象集合
type registry struct {
registrations []Registration
mutex *sync.RWMutex
}
// 添加服务
func (r *registry) add(reg Registration) error {
r.mutex.Lock()
r.registrations = append(r.registrations, reg)
r.mutex.Unlock()
err := r.sendRequiredServices(reg)
r.notify(patch{
Added: []patchEntry{
patchEntry{
Name: reg.ServiceName,
URL: reg.ServiceURL,
},
},
})
return err
}
// 通知服务接口请求去刷新改变后到服务
func (r registry) notify(fullPatch patch) {
r.mutex.RLock()
defer r.mutex.RUnlock()
for _, reg := range r.registrations {
go func(reg Registration) {
for _, reqService := range reg.RequiredServices {
p := patch{Added: []patchEntry{}, Removed: []patchEntry{}}
sendUpdate := false
for _, added := range fullPatch.Added {
if added.Name == reqService {
p.Added = append(p.Added, added)
sendUpdate = true
}
}
for _, removed := range fullPatch.Removed {
if removed.Name == reqService {
p.Removed = append(p.Removed, removed)
sendUpdate = true
}
}
if sendUpdate {
err := r.sendPatch(p, reg.ServiceUpdateURL)
if err != nil {
log.Println(err)
return
}
}
}
}(reg)
}
}
// 更新每个服务的依赖服务
func (r registry) sendRequiredServices(reg Registration) error {
r.mutex.RLock()
defer r.mutex.RUnlock()
var p patch
for _, serviceReg := range r.registrations {
for _, reqService := range reg.RequiredServices {
if serviceReg.ServiceName == reqService {
p.Added = append(p.Added, patchEntry{
Name: serviceReg.ServiceName,
URL: serviceReg.ServiceURL,
})
}
}
}
err := r.sendPatch(p, reg.ServiceUpdateURL)
if err != nil {
return err
}
return nil
}
// 告诉客户端更新,最新的服务列表是这个
func (r registry) sendPatch(p patch, url string) error {
d, err := json.Marshal(p)
if err != nil {
return err
}
_, err = http.Post(url, "application/json", bytes.NewBuffer(d))
if err != nil {
return err
}
return nil
}
// 注册中心删除服务对象
func (r *registry) remove(url string) error {
for i := range reg.registrations {
if reg.registrations[i].ServiceURL == url {
// 通知客户端更新对象信息
r.notify(patch{
Removed: []patchEntry{
{
Name: r.registrations[i].ServiceName,
URL: r.registrations[i].ServiceURL,
},
},
})
r.mutex.Lock()
reg.registrations = append(reg.registrations[:i], reg.registrations[i+1:]...)
r.mutex.Unlock()
return nil
}
}
return fmt.Errorf("Service at URL %s not found", url)
}
// 心跳检测
func (r *registry) heartbeat(freq time.Duration) {
for {
var wg sync.WaitGroup
for _, reg := range r.registrations {
wg.Add(1)
go func(reg Registration) {
defer wg.Done()
success := true
for attemps := 0; attemps < 3; attemps++ {
res, err := http.Get(reg.HeartbeatURL)
if err != nil {
log.Println(err)
} else if res.StatusCode == http.StatusOK {
log.Printf("Heartbeat check passed for %v", reg.ServiceName)
// 如果心跳恢复了,把服务重新注册回来
if !success {
r.add(reg)
}
break;
}
// 如果执行到这就代表着心跳没有响应,那就代表着需要回收注销该服务了
log.Printf("Heartbeat check failed for %v", reg.ServiceName)
if success {
success = false
r.remove(reg.ServiceURL)
}
time.Sleep(1 * time.Second)
}
}(reg)
wg.Wait()
time.Sleep(freq)
}
}
}
var once sync.Once
func SetupRegistryService() {
// 保证执行一次进行服务到心跳 每三秒循环一遍
once.Do(func() {
go reg.heartbeat(3 * time.Second)
})
}
var reg = registry{
registrations: make([]Registration, 0),
mutex: new(sync.RWMutex),
}
type RegistryService struct{}
func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Println("Request received")
switch r.Method {
case http.MethodPost:
dec := json.NewDecoder(r.Body)
var r Registration
err := dec.Decode(&r)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
log.Printf("Adding service: %v with URL: %s\n", r.ServiceName,
r.ServiceURL)
err = reg.add(r)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
case http.MethodDelete:
payload, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
url := string(payload)
log.Printf("Removing service at URL: %s", url)
err = reg.remove(url)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
}