分布式锁

MySQL
591
0
0
2022-11-14
标签   MySQL锁

基于mysql分布式悲观锁原理

悲观锁在库存服务中的应用:

在这个过程中我们是使用gorm来完成mysql的分布式悲观锁的

核心的代码在这里,该方法就能完成悲观锁了

更多关于gorm的学习

DB.Clauses(clause.Locking{Strength: "UPDATE"})

完整代码:

//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {

    //并发情况下可能会出现超买,需要使用锁来将并发串行化 
    //事务开始
    tx := global.DB.Begin()

    //悲观锁 对数据库进行上锁,会降低一定性能 
    var inventory model.Inventory
    if result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
     //失败进行事务回滚
        tx.Rollback()
        return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
    }
  if inventory.Stocks < goodsInfo.Num {
        //失败进行事务回滚
        tx.Rollback()
        return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
    }
    inventory.Stocks -= goodsInfo.Num
  tx.Save(&inventory)
  //提交事务
  tx.Commit()
    return &empty.Empty{}, nil
}

2. 分布式乐观锁:

原理:

如图:

分布式锁

乐观锁的应用
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {
    //并发情况下可能会出现超买,需要使用锁来将并发串行化 
    //事务开始
    tx := global.DB.Begin()
    for _, goodsInfo := range req.GoodsInfo {

        //分布式乐观锁 
        var inventory model.Inventory
        for {
            if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
                //失败进行事务回滚
                tx.Rollback()
                return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
            }

            if inventory.Stocks < goodsInfo.Num {
                //失败进行事务回滚
                tx.Rollback()
                return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
            }
            inventory.Stocks -= goodsInfo.Num
            //注意这里gorm在处理零值时,他会自动忽略零值的更新,这里需要使用select强制更新某些字段 
            if result := tx.Model(&model.Inventory{}).Select("Stocks", "Version").Where("goods = ? and version = ?",
                goodsInfo.GoodsId, inventory.Version).Updates(model.Inventory{Stocks: inventory.Stocks, Version: inventory.Version + 1}); result.RowsAffected == 0 {
                zap.S().Info("库存扣减失败")
            } else {
                break
            }
        }
    }
    tx.Commit()
    return &empty.Empty{}, nil
}

基于redis的分布式锁

原理:

分布式锁

setnx命令

Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。

语法

redis Setnx 命令基本语法如下:

redis 127.0.0.1:6379> SETNX KEY_NAME VALUE
返回值

设置成功,返回 1 。 设置失败,返回 0 。

分布式锁

  1. setnx的作用
  2. 将获取和设置值变成原子性的操作
  3. 如果我的服务挂掉了- 死锁
  4. 设置过期时间
  5. 如果你设置了过期时间,那么如果过期时间到了我的业务逻辑没有执行完怎么办?
  6. 在过期之前刷新一下
  7. 需要自己去启动协程完成延时的工作
  8. 延时的接口可能会带来负面影响 - 如果其中某一个服务hung住了, 2s就能执行完,但是你hung住那么你就会一直去申请延长锁,导致别人永远获取不到锁,这个很要命
  9. 分布锁需要解决的问题 - lua脚本去做
  10. 互斥性 - setnx
  11. 死锁
  12. 安全性
  13. 锁只能被持有该锁的用户删除,不能被其他用户删除
  14. 当时设置的value值是多少只有当时的g才能知道
  15. 在删除的时取出redis中的值和当前自己保存下来的值对比一下

这样我们使用setnx就可以完成原子操作了

下面来看看如何使用redisync

redisync
package main

import (
    "fmt" 
    "sync" 
    "time"

    goredislib "github.com/go-redis/redis/v8" 
    "github.com/go-redsync/redsync/v4" 
    "github.com/go-redsync/redsync/v4/redis/goredis/v8"
)

func main() {
    // Create a pool with go-redis (or redigo) which is the pool redisync will 
    // use while communicating with Redis. This can also be any pool that 
    // implements the `redis.Pool` interface.
    client := goredislib.NewClient(&goredislib.Options{
        Addr: "localhost:6379",
    })
    pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)

    // Create an instance of redisync to be used to obtain a mutual exclusion 
    // lock.
    rs := redsync.New(pool)

    var wg sync.WaitGroup
    wg.Add(3)
    for i := 0; i < 3; i++ {
        go func() {
            defer wg.Done()
            mutexname := fmt.Sprintf("mytest_%s", i)
            mutex := rs.NewMutex(mutexname)
            if err := mutex.Lock(); err != nil {
                panic(err)
            }
            fmt.Printf("获取锁成功\n")

            time.Sleep(time.Second * 1)
            fmt.Printf("执行结束\n")

            if ok, err := mutex.Unlock(); !ok || err != nil {
                panic("unlock failed")
            }
            fmt.Printf("释放锁成功\n")
        }()
    }
    wg.Wait()
}
在库存服务中的应用:
//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {

    //并发情况下可能会出现超买,需要使用锁来将并发串行化 
  //将数据库作为事务性
    tx := global.DB.Begin()
    var mutexs []*redsync.Mutex
    for _, goodsInfo := range req.GoodsInfo {

        var inventory model.Inventory
        mutex := global.Rs.NewMutex(fmt.Sprintf("goods_%d", goodsInfo.GoodsId))

        if err := mutex.Lock(); err != nil {
            return nil, status.Errorf(codes.Internal, "获取redis分布式锁异常")
        }

        if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
            //失败进行事务回滚
            tx.Rollback()
            return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
        }

        if inventory.Stocks < goodsInfo.Num {
            //失败进行事务回滚
            tx.Rollback()
            return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
        }
        inventory.Stocks -= goodsInfo.Num
        tx.Save(&inventory)

        mutexs = append(mutexs, mutex)

        //if ok, err := mutex.Unlock(); !ok || err != nil { 
        //    return nil, status.Errorf(codes.Internal, "释放redis分布式锁异常") 
        //}

    }
    tx.Commit()

    for _, mutex := range mutexs {
        if ok, err := mutex.Unlock(); !ok || err != nil {
            return nil, status.Errorf(codes.Internal, "释放redis分布式锁异常")
        }
    }
    return &empty.Empty{}, nil
}