灵活运用分布式锁解决数据重复插入问题

Java
292
0
0
2023-05-29

一、业务背景

许多面向用户的互联网业务都会在系统后端维护一份用户数据,快应用中心业务也同样做了这件事。快应用中心允许用户对快应用进行收藏,并在服务端记录了用户的收藏列表,通过用户账号标识OpenID来关联收藏的快应用包名。

为了使用户在快应用中心的收藏列表能够与快应用Menubar的收藏状态打通,我们同时也记录了用户账号标识OpenID与客户端本地标识local_identifier的绑定关系。因为快应用Manubar由快应用引擎持有,独立于快应用中心外,无法通过账号体系获取到用户账号标识,只能获取到客户端本地标识local_identifier,所以我们只能通过二者的映射关系来保持状态同步。

在具体实现上,我们是在用户启动快应用中心的时候触发一次同步操作,由客户端将OpenID和客户端本地标识提交到服务端进行绑定。服务端的绑定逻辑是:判断OpenID是否已经存在,如果不存在则插入数据库,否则更新对应数据行的local_identifier字段(因为用户可能先后在两个不同的手机上登录同一个vivo账号)。在后续的业务流程中,我们就可以根据OpenID查询对应的local_identifier,反之亦可。

但是代码上线一段时间后,我们发现t_account数据表中居然存在许多重复的OpenID记录。根据如上所述的绑定逻辑,这种情况理论上是不应该发生的。所幸这些重复数据并没有对更新和查询的场景造成影响,因为在查询的SQL中我们加入了LIMIT 1的限制,因此针对一个OpenID的更新和查询操作实际上都只作用于ID最小的那条记录。

二、问题分析与定位

虽然冗余数据没有对实际业务造成影响,但是这种明显的数据问题也肯定是不能容忍的。因此我们开始着手排查问题。

首先想到的就是从数据本身入手。先通过对t_account表数据进行粗略观察,发现大约有3%的OpenID会存在重复的情况。也就是说重复插入的情况是偶现的,大多数请求的处理都是按照预期被正确处理了。我们对代码重新进行了走读,确认了代码在实现上确实不存在什么明显的逻辑错误。

我们进一步对数据进行细致观察。我们挑选了几个出现重复情况的OpenID,将相关的数据记录查询出来,发现这些OpenID重复的次数也不尽相同,有的只重复一次,有的则更多。但是,这时候我们发现了一个更有价值的信息——这些相同OpenID的数据行的创建时间都是完全相同的,而且自增ID是连续的。

于是,我们猜测问题的产生应该是由于并发请求造成的!我们模拟了客户端对接口的并发调用,确实出现了重复插入数据的现象,进一步证实了这个猜测的合理性。但是,明明客户端的逻辑是每个用户在启动的时候进行一次同步,为什么会出现同一个OpenID并发请求呢?

事实上,代码的实际运行并不如我们想象中的那么理想,计算机的运行过程中往往存在一些不稳定的因素,比如网络环境、服务器的负载情况。而这些不稳定因素就可能导致客户端发送请求失败,这里的“失败”可能并不意味着真正的失败,而是可能整个请求时间过长,超过了客户端设定的超时时间,从而被人为地判定为失败,于是通过重试机制再次发送请求。那么最终就可能导致同样的请求被提交了多次,而且这些请求也许在中间某个环节被阻塞了(比如当服务器的处理线程负载过大,来不及处理请求,请求进入了缓冲队列),当阻塞缓解后这几个请求就可能在很短的时间内被并发处理了。

这其实是一个典型的并发冲突问题,可以把这个问题简单抽象为:如何避免并发情况下写入重复数据。事实上,有很多常见的业务场景都可能面临这个问题,比如用户注册时不允许使用相同的用户名。

一般来说,我们在处理这类问题时,最直观的方式就是先进行一次查询,当判断数据库中不存在当前数据时才允许插入。

显然,这个流程从单个请求的角度来看是没有问题的。但是当多个请求并发时,请求A和请求B都先发起一次查询,并且都得到结果是不存在,于是两者都又执行了数据插入,最终导致并发冲突。

三、探索可行的方案

既然问题定位到了,接下来就要开始寻求解决方案了。面对这种情况,我们通常有两种选择,一种是让数据库来解决,另一种是由应用程序来解决。

3.1 数据库层面处理——唯一索引

当使用 MySQL 数据库及 InnoDB 存储引擎时,我们可以利用唯一索引来保障同一个列的值具有唯一性。显然,在t_account这张表中,我们最开始是没有为open_id列创建唯一索引的。如果我们想要此时加上唯一索引的话,可以利用下列的ALTER TABLE语句。

 ALTER TABLE t_account ADD UNIQUE uk_open_id( open_id ); 

一旦为open_id列加上唯一索引后,当上述并发情况发生时,请求A和请求B中必然有一者会优先完成数据的插入操作,而另一者则会得到类似错误。因此,最终保证t_account表中只有一条openid=xxx的记录存在。

 Error Code: 1062. Duplicate entry 'xxx' for key 'uk_open_id' 

3.2 应用程序层面处理——分布式锁

另一种解决的思路是我们不依赖底层的数据库来为我们提供唯一性的保障,而是靠应用程序自身的代码逻辑来避免并发冲突。应用层的保障其实是一种更具通用性的方案,毕竟我们不能假设所有系统使用的数据持久化组件都具备数据唯一性检测的能力。

那具体怎么做呢?简单来说,就是化并行为串行。之所以我们会遇到重复插入数据的问题,是因为“检测数据是否已经存在”和“插入数据”两个动作被分割开来。由于这两个步骤不具备原子性,才导致两个不同的请求可以同时通过第一步的检测。如果我们能够把这两个动作合并为一个原子操作,就可以避免数据冲突了。这时候我们就需要通过加锁,来实现这个代码块的原子性。

对于Java语言,大家最熟悉的锁机制就是 synchronized 关键字了。

 public synchronized void submit(String openId, String localIdentifier){
    Account account = accountDao.find(openId);
    if (account == null) {
        // insert
    }
    else {
        // update
    }
} 

但是,事情可没这么简单。要知道,我们的程序可不是只部署在一台服务器上,而是部署了多个节点。也就是说这里的并发不仅仅是线程间的并发,而是进程间的并发。因此,我们无法通过java语言层面的锁机制来解决这个同步问题,我们这里需要的应该是分布式锁。

3.3 两种解决方案的权衡

基于以上的分析,看上去两种方案都是可行的,但最终我们选择了分布式锁的方案。为什么明明第一种方案只需要简单地加个索引,我们却不采用呢?

因为现有的线上数据已然在open_id列上存在重复数据,如果此时直接去加唯一索引是无法成功的。为了加上唯一索引,我们必须首先将已有的重复数据先进行清理。但是问题又来了,线上的程序一直持续运行着,重复数据可能会源源不断地产生。那我们能不能找一个用户请求不活跃的时间段去进行清理,并在新的重复数据插入之前完成唯一索引的建立?答案当然是肯定的,只不过这种方案需要运维、DBA、开发多方协同处理,而且由于业务特性,最合适的处理时间段应该是凌晨这种夜深人静的时候。即便是采取这么苛刻的修复措施,也不能百分之百完全保证数据清理完成到索引建立之间不会有新的重复数据插入。因此,基于唯一索引的修复方案乍看之下非常合适,但是具体操作起来还是略为麻烦。

事实上,建立唯一索引最合适的契机应该是在系统最初的设计阶段,这样就能有效避免重复数据的问题。然而木已成舟,在当前这个情景下,我们还是选择了可操作性更强的分布式锁方案。因为选择这个方案的话,我们可以先上线加入了分布式锁修复的新代码,阻断新的重复数据插入,然后再对原有的重复数据执行清理操作,这样一来只需要修改代码并一次上线即可。当然,待问题彻底解决之后,我们可以重新再考虑为数据表加上唯一索引。

那么接下来,我们就来看看基于分布式锁的方案如何实现。首先我们先来回顾一下分布式锁的相关知识。

四、分布式锁概述

4.1 分布式锁需要具备哪些特性?

  • 在分布式系统环境下,同一时间只有一台机器的一个线程可以获取到锁;
  • 高可用的获取锁与释放锁;
  • 高性能的获取锁与释放锁;
  • 具备可重入特性;
  • 具备锁失效机制,防止 死锁 ;
  • 具备阻塞/非阻塞锁特性。

4.2 分布式锁有哪些实现方式?

分布式锁实现主要有如下三种:

  • 基于数据库实现分布式锁;
  • 基于 ZooKeeper 实现分布式锁;
  • 基于 Redis 实现分布式锁;

4.2.1 基于数据库的实现方式

基于数据库的实现方式就是直接创建一张锁表,通过操作表数据来实现加锁、解锁。以MySQL数据库为例,我们可以创建这样一张表,并且对method_name进行加上唯一索引的约束:

 CREATE TABLE `myLock` (
 `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
 `method_name` varchar(100) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
 `value` varchar(1024) NOT NULL DEFAULT '锁信息',
 PRIMARY KEY (`id`),
 UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法'; 

然后,我们就可以通过插入数据和删除数据的方式来实现加锁和解锁:

 #加锁
insert into myLock(method_name, value) values ('m1', '1');
 
#解锁
delete from myLock where method_name ='m1'; 

基于数据库实现的方式虽然简单,但是存在一些明显的问题:

  • 没有锁失效时间,如果解锁失败,就会导致锁记录永远留在数据库中,造成死锁。
  • 该锁不可重入,因为它不认识请求方是不是当前占用锁的线程。
  • 当前数据库是单点,一旦宕机,锁机制就会完全崩坏。

4.2.2 基于Zookeeper的实现方式

ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下的节点名称都是唯一的。

ZooKeeper的节点(Znode)有4种类型:

  • 持久化节点(会话断开后节点还存在)
  • 持久化顺序节点
  • 临时节点(会话断开后节点就删除了)
  • 临时顺序节点

当一个新的Znode被创建为一个顺序节点时,ZooKeeper通过将10位的序列号附加到原始名称来设置Znode的路径。例如,如果将具有路径/mynode的Znode创建为顺序节点,则ZooKeeper会将路径更改为/mynode0000000001,并将下一个序列号设置为0000000002,这个序列号由父节点维护。如果两个顺序节点是同时创建的,那么ZooKeeper不会对每个Znode使用相同的数字。

基于ZooKeeper的特性,可以按照如下方式来实现分布式锁:

  • 创建一个目录mylock;
  • 线程A想获取锁就在mylock目录下创建临时顺序节点;
  • 获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;
  • 线程B获取所有节点,判断自己不是最小节点,设置监听比自己更小的节点;
  • 线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。

由于创建的是临时节点,当持有锁的线程意外宕机时,锁依然可以得到释放,因此可以避免死锁的问题。另外,我们也可以通过节点排队监听机制实现阻塞特性,也可以通过在Znode中携带线程标识来实现可重入锁。同时,由于ZooKeeper集群的高可用特性,分布式锁的可用性也能够得到保障。不过,因为需要频繁的创建和删除节点,Zookeeper方式在性能上不如 redis 方式。

4.2.3 基于Redis的实现方式

Redis是一个开源的键值对(Key-Value)存储数据库,其基于内存实现,性能非常高,常常被用作缓存。

基于Redis实现分布式锁的核心原理是:尝试对特定key进行set操作,如果设置成功(key之前不存在)了,则相当于获取到锁,同时对该key设置一个过期时间,避免线程在释放锁之前退出造成死锁。线程执行完同步任务后主动释放锁则通过delete命令来完成。

这里需要特别注意的一点是如何加锁并设置过期时间。有的人会使用setnx + expire这两个命令来实现,但这是有问题的。假设当前线程执行setnx获得了锁,但是在执行expire之前宕机了,就会造成锁无法被释放。当然,我们可以将两个命令合并在一段lua脚本里,实现两条命令的原子提交。

其实,我们简单利用set命令可以直接在一条命令中实现setnx和设置过期时间,从而完成加锁操作:

 SET key value [EX seconds] [PX milliseconds] NX 

解锁操作只需要:

 DEL key 

五、基于Redis分布式锁的解决方案

在本案例中,我们采用了基于Redis实现分布式锁的方式。

5.1 分布式锁的Java实现

由于项目采用了Jedis框架,而且线上Redis部署为集群模式,因此我们基于redis.clients.jedis.JedisCluster封装了一个RedisLock类,提供加锁与解锁接口。

 public class RedisLock {
 
    private static final String LOCK_SUCCESS = "OK";
    private static final String LOCK_VALUE = "lock";
    private static final int EXPIRE_SECONDS = 3;
 
    @Autowired
    protected JedisCluster jedisCluster;
 
    public boolean lock(String openId) {
        String redisKey = this.formatRedisKey(openId);
        String ok = jedisCluster.set(redisKey, LOCK_VALUE, "NX", "EX", EXPIRE_SECONDS);
        return LOCK_SUCCESS.equals(ok);
    }
 
    public void unlock(String openId) {
        String redisKey = this.formatRedisKey(openId);
        jedisCluster.del(redisKey);
    }
 
    private String formatRedisKey(String openId){
        return "keyPrefix:" + openId;
    }
} 

在具体实现上,我们设置了3秒钟的过期时间,因为被加锁的任务是简单的数据库查询和插入,而且服务器与数据库部署在同个机房,正常情况下3秒钟已经完全能够足够满足代码的执行。

事实上,以上的实现是一个简陋版本的Redis分布式锁,我们在实现中并没有考虑线程的可重入性,也没有考虑锁被其他进程误释放的问题,但是它在这个业务场景下已经能够满足我们的需求了。假设推广到更为通用的业务场景,我们可以考虑在value中加入当前进程的特定标识,并在上锁和释放锁的阶段做相对应的匹配检测,就可以得到一个更为安全可靠的Redis分布式锁的实现了。

当然,像Redission之类的框架也提供了相当完备的Redis分布式锁的封装实现,在一些要求相对严苛的业务场景下,我建议直接使用这类框架。由于本文侧重于介绍排查及解决问题的思路,因此没有对 Redisson 分布式的具体实现原理做更多介绍,感兴趣的小伙伴可以在网上找到非常丰富的资料。

5.2 改进后的代码逻辑

现在,我们可以利用封装好的RedisLock来改进原来的代码了。

 public class AccountService {
 
    @Autowired
    private RedisLock redisLock;
 
    public void submit(String openId, String localIdentifier) {
        if (!redisLock.lock(openId)) {
            // 如果相同openId并发情况下,线程没有抢到锁,则直接丢弃请求
            return;
        }
 
        // 获取到锁,开始执行用户数据同步逻辑
        try {
            Account account = accountDao.find(openId);
            if (account == null) {
                // insert
            } else {
                // update
            }
        } finally {
            // 释放锁
            redisLock.unlock(openId);
        }
    }
} 

5.3 数据清理

最后再简单说一下收尾工作。由于重复数据的数据量较大,不太可能手工去慢慢处理。于是我们编写了一个定时任务类,每隔一分钟执行一次清理操作,每次清理1000个重复的OpenID,避免短时间内大量查询和删除操作对数据库性能造成影响。当确认重复数据已经完全清理完毕后就停掉定时任务的调度,并在下一次版本迭代中将此代码移除。

六、总结

在日常开发过程中难免会各种各样的问题,我们要学会顺藤摸瓜逐步分析,找到问题的根因;然后在自己的认知范围内尽量去寻找可行的解决方案,并且仔细权衡各种方案的利弊,才能最终高效地解决问题。