文章持续更新,微信搜一搜「 吴亲强的深夜食堂 」
上一篇etcd 实战基础篇(一)我们主要介绍了 etcd 使用场景以及最基础性的一些操作(put、get、watch)。 这一篇我们接着实战etcd其他业务场景。
基于 etcd 的分布式锁
基于 etcd 实现一个分布式锁特别简单。etcd 提供了开箱即用的包 concurrency,几行代码就实现一个分布式锁。
package src
import ("context""flag""fmt""github.com/coreos/etcd/clientv3""github.com/coreos/etcd/clientv3/concurrency""log""strings""time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")
// 初始化etcd客户端
func initEtcdClient() *clientv3.Client {var client *clientv3.Client
var err error// 解析etcd的地址,编程[]string
endpoints := strings.Split(*addr, ",")// 创建一个 etcd 的客户端
client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
DialTimeout: 5 * time.Second})if err != nil {
fmt.Printf("初始化客户端失败:%v\\n", err)
log.Fatal(err)}return client
}
func Lock(id int, lockName string) {
client := initEtcdClient()defer client.Close()
// 创建一个 session,如果程序宕机奔溃,etcd可以知道
s, err := concurrency.NewSession(client)if err != nil {
log.Fatal(err)}defer s.Close()
// 创建一个etcd locker
locker := concurrency.NewLocker(s, lockName)
log.Printf("id:%v 尝试获取锁%v", id, lockName)
locker.Lock()
log.Printf("id:%v取得锁%v", id, lockName)
// 模拟业务耗时
time.Sleep(time.Millisecond * 300)
locker.Unlock()
log.Printf("id:%v释放锁%v", id, lockName)
}
我们再写个脚本运行,看看结果。
package main
import ("etcd-test/src""sync"
)
func main() {var lockName = "locker-test"var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)go func(item int) {defer wg.Done()
src.Lock(item, lockName)}(i)}
wg.Wait()
}
我们发起了10个并发抢同一个 key 锁的命令。运行结果如下,
从图片可以看到,同一时刻一定只有一个 G 得到锁,一个 G 获取到一个锁的前提一定是当前 key 未被锁。
有人要问了,当一个锁解开时,之前未获取到锁而发生等待的客户端谁先获取到这把锁? 这个问题,我们后续分析原理的时候再揭晓。
说到分布式锁,不得不提起 redis。它有一个看似安全实际一点都不安全的分布式锁。它的命令模式是,
set key value [EX seconds] [PX milliseconds] [NX|XX]
这其中,介绍两个关键的属性:
- EX 标示设置过期时间,单位是秒。
- NX 表示 当对应的 key 不存在时,才创建。
我们在使用 redis 做分布式锁的时候会这么写。(代码用了包 https://github.com/go-redis/redis
)
func RedisLock(item int) {
rdb = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",DB: 0,})
fmt.Printf("item:%v 尝试获取锁,时间:%v\\n", item, time.Now().String())
res, _ := rdb.SetNX(ctx, "key", "value", 2*time.Second).Result()if !res {
fmt.Printf("item:%v 尝试获取锁失败\\n", item)return}
fmt.Printf("item:%v 获取到锁,时间:%v\\n", item, time.Now().String())
time.Sleep(1 * time.Second) //模拟业务耗时
fmt.Printf("item:%v 释放锁,时间:%v\\n", item, time.Now().String())
rdb.Del(ctx, "key")
}
rdb.SetNX(ctx, "key", "value", 2*time.Second)
我们规定锁的过期时间是2秒,下面有一句 time.Sleep(1 * time.Second)
用来模拟处理业务的耗时。业务处理结束,我们删除 key rdb.Del(ctx, "key")
。
我们写个简单的脚本,
func main() {var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(item int) {
defer wg.Done()RedisLock(item)}(i)}
wg.Wait()
}
我们开启十个 G 并发的调用 RedisLock
函数。每次调用,函数内部都会新建一个 redis 客户端,本质上是10个客户端。
运行这段程序,
从图中看出,同一时刻只有一个客户端获取到锁,并且在一秒的任务处理后,释放了锁,好像没太大的问题。
那么,我再写一个简单的例子。
import ("context""fmt""github.com/go-redis/redis/v8""sync""time"
)
var ctx = context.Background()
var rdb *redis.Client
func main() {var wg sync.WaitGroup
wg.Add(2)go func() {defer wg.Done()ExampleLock(1, 0)}()
go func() {defer wg.Done()ExampleLock(2, 5)}()
wg.Wait()
}
func ExampleLock(item int, timeSleep time.Duration) {
rdb = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,})if timeSleep > 0 {
time.Sleep(time.Second * timeSleep)}
fmt.Printf("item:%v 尝试获取锁,时间:%v\\n", item, time.Now().String())
res, _ := rdb.SetNX(ctx, "key", "value", 3*time.Second).Result()if !res {
fmt.Printf("item:尝试获取锁失败:%v\\n", item)return}
fmt.Printf("item:%v 获取到锁,时间:%v\\n", item, time.Now().String())
time.Sleep(7 * time.Second)
fmt.Printf("item:%v 释放锁,时间:%v\\n", item, time.Now().String())
rdb.Del(ctx, "key")
}
我们设置锁的过期时间是 3 秒,而获取锁之后的任务处理时间为 7 秒。
然后我们开启两个 G。
ExampleLock(1, 0)ExampleLock(2, 5)
其中第二行数字5,从代码中可以看出,是指启动 G 后过5秒去获取锁。
这段代码整体流程是这样的:G(1) 获取到锁后,设置的锁持有时间是3秒,由于任务执行需要7秒的时间,因此在3秒过后锁会自动释放。G(2) 可以在第5秒的时候获取到锁,然后它执行任务也得7秒。
最后,G(1)在获取锁后7秒执行释放锁的操作,G(2)同理。
发现问题了吗?
G(1) 的锁在3秒后已经自动释放了。但是在任务处理结束后又执行了解锁的操作,可此时这个锁是 G(2) 的呀。
那么接下来由于 G(1) 误解了 G(2) 的锁,如果此时有其他的 G,那么就可以获取到锁。
等 G(2) 任务执行结束,同理又会误解其他 G 的锁,这是一个恶性循环。 这也是掘金一篇由 redis 分布式锁造成茅台超卖重大事故的原因之一。
至于其他的,可以自行查看这篇文章Redis——由分布式锁造成的重大事故。
基于 etcd 的分布式队列
对队列更多的理论知识就不加以介绍了。我们都知道,队列是一种先进先出的数据结构,一般也只有入队和出队两种操作。 我们常常在单机的应用中使用到队列。
那么,如何实现一个分布式的队列呢?。
我们可以使用 etcd 开箱即用的工具,在 etcd
底层 recipe
包里结构 Queue
,实现了一个多读多写的分布式队列。
type Queue struct {
client *v3.Client
ctx context.Context
keyPrefix string
}
func NewQueue(client *v3.Client, keyPrefix string) *Queue
func (q *Queue) Dequeue() (string, error)
func (q *Queue) Enqueue(val string)
我们基于此包可以很方便的实现。
package src
import ("github.com/coreos/etcd/clientv3"
recipe "github.com/coreos/etcd/contrib/recipes""log""strconv""strings""sync""time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")
// 初始化etcd客户端
func initEtcdClient() *clientv3.Client {var client *clientv3.Client
var err error// 解析etcd的地址,编程[]string
endpoints := strings.Split(*addr, ",")// 创建一个 etcd 的客户端
client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
DialTimeout: 5 * time.Second})if err != nil {
log.Printf("初始化客户端失败:%v\\n", err)
log.Fatal(err)}return client
}
func Push(keyName string) {
client := initEtcdClient()defer client.Close()
q := recipe.NewQueue(client, keyName)var wg sync.WaitGroup
for i := 0; i < 3; i++ {for j := 0; j < 10; j++ {
wg.Add(1)go func(item int) {defer wg.Done()
err := q.Enqueue(strconv.Itoa(item))if err != nil {
log.Printf("push err:%v\\n", err)}}(j)}
time.Sleep(2 * time.Second)}
wg.Wait()
}
func Pop(keyName string) {
client := initEtcdClient()defer client.Close()
q := recipe.NewQueue(client, keyName)for {
res, err := q.Dequeue()if err != nil {
log.Fatal(err)return}
log.Printf("接收值:%v\\n", res)}
}
在 push
中,我们开启3轮发送值入队,每次发送10个,发送一轮休息2秒。 在 pop
中,通过死循环获取队列中的值。
运行脚本程序如下。
package main
import ("etcd-test/src""time"
)
func main() {
key := "test-queue"go src.Pop(key)
time.Sleep(1 * time.Second)go src.Push(key)
time.Sleep(20 * time.Second)
}
我们使用两个 G
代表 分别运行 push
和 pop
操作。 同时为了达到运行效果,我们先运行 pop 等待有入队的元素。 运行结果动画如下,
etcd
还提供了优先级的分布式的队列。和上面的用法相似。只是在入队的时候,不仅仅需要提供一个值,还需要提供一个整数,来表示当前 push
值的优先级。数值越小,优先级越高。
我们改动一下上述的代码。
package src
import ("github.com/coreos/etcd/clientv3"
recipe "github.com/coreos/etcd/contrib/recipes""log""strconv""strings""sync""time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")
// 初始化etcd客户端
func initEtcdClient() *clientv3.Client {var client *clientv3.Client
var err error// 解析etcd的地址,编程[]string
endpoints := strings.Split(*addr, ",")// 创建一个 etcd 的客户端
client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
DialTimeout: 5 * time.Second})if err != nil {
log.Printf("初始化客户端失败:%v\\n", err)
log.Fatal(err)}return client
}
func PriorityPush(keyName string) {
client := initEtcdClient()defer client.Close()
q := recipe.NewPriorityQueue(client, keyName)var wg sync.WaitGroup
for j := 0; j < 10; j++ {
wg.Add(1)go func(item int) {defer wg.Done()
err := q.Enqueue(strconv.Itoa(item), uint16(item))if err != nil {
log.Printf("push err:%v\\n", err)}}(j)}
wg.Wait()
}
func PriorityPop(keyName string) {
client := initEtcdClient()defer client.Close()
q := recipe.NewPriorityQueue(client, keyName)for {
res, err := q.Dequeue()if err != nil {
log.Fatal(err)return}
log.Printf("接收值:%v\\n", res)}
}
然后以下是我们的测试代码:
package main
import ("etcd-test/src""sync""time"
)
func main() {
key := "test-queue"var wg sync.WaitGroup
wg.Add(1)go func() {defer wg.Done()
src.PriorityPush(key)}()
wg.Wait()go src.PriorityPop(key)
time.Sleep(20 * time.Second)
}
我们把0到9的数并发的 push
到队列中,对应的优先级整数值就是它本身,push
完毕,我们运行 PriorityPop
函数,看最终结果显示就是从0到9。
总结
这篇文章主要介绍了如何使用 etcd 实现分布式锁以及分布式队列。其他etcd的场景,可以自行实践。