基于mysql分布式悲观锁原理
悲观锁在库存服务中的应用:
在这个过程中我们是使用gorm来完成mysql的分布式悲观锁的
核心的代码在这里,该方法就能完成悲观锁了
DB.Clauses(clause.Locking{Strength: "UPDATE"})
完整代码:
//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {
//并发情况下可能会出现超买,需要使用锁来将并发串行化
//事务开始
tx := global.DB.Begin()
//悲观锁 对数据库进行上锁,会降低一定性能
var inventory model.Inventory
if result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
}
if inventory.Stocks < goodsInfo.Num {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
}
inventory.Stocks -= goodsInfo.Num
tx.Save(&inventory)
//提交事务
tx.Commit()
return &empty.Empty{}, nil
}
2. 分布式乐观锁:
原理:
如图:
乐观锁的应用
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {
//并发情况下可能会出现超买,需要使用锁来将并发串行化
//事务开始
tx := global.DB.Begin()
for _, goodsInfo := range req.GoodsInfo {
//分布式乐观锁
var inventory model.Inventory
for {
if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
}
if inventory.Stocks < goodsInfo.Num {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
}
inventory.Stocks -= goodsInfo.Num
//注意这里gorm在处理零值时,他会自动忽略零值的更新,这里需要使用select强制更新某些字段
if result := tx.Model(&model.Inventory{}).Select("Stocks", "Version").Where("goods = ? and version = ?",
goodsInfo.GoodsId, inventory.Version).Updates(model.Inventory{Stocks: inventory.Stocks, Version: inventory.Version + 1}); result.RowsAffected == 0 {
zap.S().Info("库存扣减失败")
} else {
break
}
}
}
tx.Commit()
return &empty.Empty{}, nil
}
基于redis的分布式锁
原理:
setnx命令
Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。
语法
redis Setnx 命令基本语法如下:
redis 127.0.0.1:6379> SETNX KEY_NAME VALUE
返回值
设置成功,返回 1 。 设置失败,返回 0 。
- setnx的作用
- 将获取和设置值变成原子性的操作
- 如果我的服务挂掉了- 死锁
- 设置过期时间
- 如果你设置了过期时间,那么如果过期时间到了我的业务逻辑没有执行完怎么办?
- 在过期之前刷新一下
- 需要自己去启动协程完成延时的工作
- 延时的接口可能会带来负面影响 - 如果其中某一个服务hung住了, 2s就能执行完,但是你hung住那么你就会一直去申请延长锁,导致别人永远获取不到锁,这个很要命
- 分布锁需要解决的问题 - lua脚本去做
- 互斥性 - setnx
- 死锁
- 安全性
- 锁只能被持有该锁的用户删除,不能被其他用户删除
- 当时设置的value值是多少只有当时的g才能知道
- 在删除的时取出redis中的值和当前自己保存下来的值对比一下
这样我们使用setnx就可以完成原子操作了
下面来看看如何使用redisync
redisync
package main
import (
"fmt"
"sync"
"time"
goredislib "github.com/go-redis/redis/v8"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)
func main() {
// Create a pool with go-redis (or redigo) which is the pool redisync will
// use while communicating with Redis. This can also be any pool that
// implements the `redis.Pool` interface.
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:6379",
})
pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
// Create an instance of redisync to be used to obtain a mutual exclusion
// lock.
rs := redsync.New(pool)
var wg sync.WaitGroup
wg.Add(3)
for i := 0; i < 3; i++ {
go func() {
defer wg.Done()
mutexname := fmt.Sprintf("mytest_%s", i)
mutex := rs.NewMutex(mutexname)
if err := mutex.Lock(); err != nil {
panic(err)
}
fmt.Printf("获取锁成功\n")
time.Sleep(time.Second * 1)
fmt.Printf("执行结束\n")
if ok, err := mutex.Unlock(); !ok || err != nil {
panic("unlock failed")
}
fmt.Printf("释放锁成功\n")
}()
}
wg.Wait()
}
在库存服务中的应用:
//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {
//并发情况下可能会出现超买,需要使用锁来将并发串行化
//将数据库作为事务性
tx := global.DB.Begin()
var mutexs []*redsync.Mutex
for _, goodsInfo := range req.GoodsInfo {
var inventory model.Inventory
mutex := global.Rs.NewMutex(fmt.Sprintf("goods_%d", goodsInfo.GoodsId))
if err := mutex.Lock(); err != nil {
return nil, status.Errorf(codes.Internal, "获取redis分布式锁异常")
}
if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
}
if inventory.Stocks < goodsInfo.Num {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
}
inventory.Stocks -= goodsInfo.Num
tx.Save(&inventory)
mutexs = append(mutexs, mutex)
//if ok, err := mutex.Unlock(); !ok || err != nil {
// return nil, status.Errorf(codes.Internal, "释放redis分布式锁异常")
//}
}
tx.Commit()
for _, mutex := range mutexs {
if ok, err := mutex.Unlock(); !ok || err != nil {
return nil, status.Errorf(codes.Internal, "释放redis分布式锁异常")
}
}
return &empty.Empty{}, nil
}