Revisiting the Redis-based distributed fair reentrant lock: this time a closer look at the async wake-up mechanism, plus QPS testing
In the previous article we took Redisson's lock apart, covering:
- watchdog renewal
- a reentrant lock built on a hash plus Lua
One thing was left over, though: we used spinning plus thread sleep to make competing threads block.
The problem is that the sleep interval is fixed, so a lot of time is still wasted idle. With a 30s lock expiry, 20 threads took a good two or three minutes to get through; that kind of efficiency clearly won't do.
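For reference, the old wait loop looked roughly like this (a sketch only; tryAcquire is a placeholder for the Lua-based lock call from the last article, returning the remaining TTL in milliseconds, or null when the lock was acquired):
import java.util.concurrent.TimeUnit;
public class SleepWaitSketch {
    // Placeholder: stands in for the Lua lock call (null means "lock acquired").
    static Long tryAcquire(String lockName, Long expTime, TimeUnit unit) {
        return null;
    }
    static void lockWithSleep(String lockName, Long expTime, TimeUnit unit) throws InterruptedException {
        while (true) {
            Long remainingTtl = tryAcquire(lockName, expTime, unit);
            if (remainingTtl == null) {
                break; // got the lock
            }
            // Fixed sleep: even if the lock is freed immediately, we stay asleep,
            // which is exactly the idle window described above.
            Thread.sleep(remainingTtl / 3);
        }
    }
}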
So, is there a better way?
Redisson's source uses asynchronous message subscription (Redis Pub/Sub).
By the way, some readers told me the code from last time wouldn't run. True, I deliberately changed a few spots; there's no free lunch, after all. The point was to share the idea and a rough implementation, and running it yourself gives a deeper understanding (fine, I admit I never actually ran my own code while writing that analysis, so today we'll run it together).
So let's implement one of our own.
We'll use RedisTemplate. For blocking and waking threads the obvious route is a monitor lock, but that feels wrong: Java's wait/notify requires holding a monitor, so every thread would need its own monitor object, plus a HashMap mapping each object to its thread... what a waste of memory.
Fortunately, since JDK 5 the JDK has shipped an API for exactly this: LockSupport.
It wraps a handful of native methods for parking and unparking threads.
LockSupport
A quick rundown of the API:
LockSupport.park(); // block the current thread until it is unparked
LockSupport.parkNanos(long nanos); // block the current thread for at most nanos nanoseconds
LockSupport.unpark(Thread thread); // wake up the given thread
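A tiny standalone demo of the mechanics (nothing Redis-related yet; the class name is just for the demo):
import java.util.concurrent.locks.LockSupport;
public class ParkDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            System.out.println("worker: parking...");
            // No monitor object needed; returns once someone unparks us (spurious wake-ups are possible).
            LockSupport.park();
            System.out.println("worker: woken up");
        });
        worker.start();

        Thread.sleep(1000);
        System.out.println("main: unparking worker");
        LockSupport.unpark(worker); // wake that specific thread
        worker.join();
    }
}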
Seeing this, I'm sure you already have an idea.
We keep a FIFO queue per lock so that competition for the lock stays fair.
Guaranteeing fairness
public static ConcurrentHashMap<String, ConcurrentLinkedQueue<Thread>> threadWaitQueue = new ConcurrentHashMap<>();
Whenever a lock is released, we publish a message, and the subscriber unparks the thread at the head of the queue. Done.
One problem remains: what if the holder keeps the lock for a very long time, or the release message gets lost?
That's why we use parkNanos rather than park: the timeout acts as a safety net.
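Put together, the waiting side and the notifying side look roughly like this (a stripped-down sketch of the idea, not the final code below; WaitQueueSketch and its method names are made up for illustration):
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class WaitQueueSketch {

    static final ConcurrentHashMap<String, ConcurrentLinkedQueue<Thread>> WAITERS = new ConcurrentHashMap<>();

    // Waiting side: enqueue ourselves, then park with a timeout as a safety net
    // against lost messages or a holder that never releases in time.
    static void awaitRelease(String lockName, long remainingTtlMillis) {
        ConcurrentLinkedQueue<Thread> queue = WAITERS.computeIfAbsent(lockName, k -> new ConcurrentLinkedQueue<>());
        Thread current = Thread.currentThread();
        queue.offer(current);
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(remainingTtlMillis));
        queue.remove(current); // clean up whether we were unparked or simply timed out
    }

    // Notifying side: called when the Pub/Sub "lock released" message arrives;
    // wake only the head of the queue so the FIFO order (fairness) is preserved.
    static void onLockReleased(String lockName) {
        ConcurrentLinkedQueue<Thread> queue = WAITERS.get(lockName);
        if (queue == null) {
            return;
        }
        Thread head = queue.poll();
        if (head != null) {
            LockSupport.unpark(head);
        }
    }
}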
Alright, straight to the source code. This time I'm posting a version that actually runs.
Pub/Sub
First, the message listener:
package cn.katool.lock;
import cn.hutool.core.util.ObjectUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;
@Component
@Slf4j
public class LockMessageWatchDog implements MessageListener {
public static ConcurrentHashMap<String, ConcurrentLinkedQueue<Thread>> threadWaitQueue = new ConcurrentHashMap<>();
public static final String LOCK_MQ_NAME = "LOCK:RELASE:QUEUE";
@SneakyThrows
@Override
public void onMessage(Message message, byte[] pattern) {
log.info("从消息通道={}监听到消息",new String(pattern));
log.info("从消息通道={}监听到消息",new String(message.getChannel()));
// RedisTemplate's serializer wraps the payload in quotes, so strip the first and last character
String body = new String(message.getBody());
String lockName = body.substring(1, body.length() - 1);
log.info("元消息={}", lockName);
log.info("threadWaitQueue:{}",threadWaitQueue);
ConcurrentLinkedQueue<Thread> threads = threadWaitQueue.get(lockName);
if (ObjectUtil.isEmpty(threads)||threads.isEmpty()){
log.info("没有线程需要lock:{}在等待", lockName);
threadWaitQueue.remove(lockName);
return ;
}
Thread poll = threads.poll();
log.info("唤醒线程={}",poll.getName());
LockSupport.unpark(poll);
}
// Register this listener on the lock-release channel
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(this, new ChannelTopic(LOCK_MQ_NAME));
return container;
}
}
Next, the lock utility class:
LockUtil
/**
* Title
*
* @ClassName: LockUtil
* @Description: Lock utility class; the singleton is implemented via an inner enum to guard against reflection attacks
* @author: Karos
* @date: 2023/1/4 0:17
* @Blog: https://www.wzl1.top/
*/
package cn.katool.lock;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.RandomUtil;
import cn.katool.Config.LockConfig;
import cn.katool.Exception.ErrorCode;
import cn.katool.Exception.KaToolException;
import cn.katool.util.ScheduledTaskUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.TimeoutUtils;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.DigestUtils;
import org.springframework.util.ObjectUtils;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;
import static cn.katool.lock.LockMessageWatchDog.LOCK_MQ_NAME;
import static cn.katool.lock.LockMessageWatchDog.threadWaitQueue;
@Component
@Scope("prototype")
@Slf4j
public class LockUtil {
@Resource
RedisTemplate redisTemplate;
// lockName:uID:numbers
// KEYS[1] lock name
// ARGV[1] UID (hash field identifying the owner thread)
// ARGV[2] expiry time in ms
@Value("if redis.call('exists',KEYS[1]) ~= 0 then\n" + // 这个锁是否存在
" if redis.call('hexists',KEYS[1],ARGV[1] ) == 0 then\n" + // 存在但不是自己的锁
" return redis.call('pttl',KEYS[1]);\n" + // 返回剩余时间
" end\n" +
// " -- 如果是自己的锁就记录次数\n" +
" redis.call('hincrby',KEYS[1],ARGV[1],1);\n" + // 是自己的锁重入
"else\n" +
" redis.call('hset',KEYS[1],ARGV[1],1);\n" + // 没有的话就加上
"end\n" +
"redis.call('pexpire',KEYS[1],ARGV[2]);\n" + //延期
"return nil") //返回
private String lockScript;
@Value(" if redis.call('exists',KEYS[1]) ~= 0 then\n" + //如果锁存在
" if redis.call('hexists',KEYS[1],ARGV[1]) ~= 0 then\n" + // 如果是自己的锁
" local inc= redis.call('hincrby',KEYS[1],ARGV[1],-1);\n" +
" if inc == 0 then" +
" redis.call('hdel',KEYS[1],ARGV[1]);\n" +
" end" +
" return inc;\n" +
" end\n" +
" end\n" +
" return nil\n")
private String unLockScript;
@Value(" if redis.call('exists',KEYS[1]) ~= 0 then\n" +
" local lessTime = redis.call('pttl',KEYS[1]);\n" +
" redis.call('pexpire',KEYS[1],ARGV[1]);\n" + //如果锁存在
" end\n" +
" return redis.call('pttl',KEYS[1]);" )
private String delayLockScript;
public String serviceUUid=RandomUtil.randomString(16);
private volatile static boolean isOpenCorn=false;
/**
* Acquire the lock with the watchdog renewal enabled
* @param lockObj
* @return
*/
public boolean DistributedLock(Object lockObj){
try {
return DistributedLock(lockObj,null,null,true);
} catch (KaToolException e) {
throw new RuntimeException(e);
}
}
@Resource
LockConfig lockConfig;
// Acquire the lock via the Lua script
public Long luaToRedisByLock(String lockName,Long expTime,TimeUnit timeUnit,String[] hashkey){
long id = Thread.currentThread().getId();
// build the hash field key from the thread id and the service UUID
String hashKey=hashkey[0]= DigestUtils.md5DigestAsHex(new String(id+serviceUUid).getBytes());
log.info("serviceUUid:{},id:{},hashKey:{}",serviceUUid,id,hashKey);
DefaultRedisScript defaultRedisScript = new DefaultRedisScript();
ArrayList<String> keys = new ArrayList<>();
ArrayList<Object> args = new ArrayList();
keys.add(lockName);
args.add(hashKey);
args.add(TimeoutUtils.toMillis(expTime,timeUnit));
defaultRedisScript.setScriptText(lockScript);
defaultRedisScript.setResultType(Long.class);
// log.info("lockLua -> {}",lockScript);
Long execute = (Long) redisTemplate.execute(defaultRedisScript, keys, args.toArray());
if (ObjectUtil.isEmpty(execute)) {
log.info("katool=> {}成功抢到锁,锁名:{},过期时间:{},单位:{}",hashKey,lockName,expTime,timeUnit);
}
return execute;
}
// Release the lock via the Lua script
@Transactional
public Long luaToRedisByUnLock(String lockName,Thread thread){
long id = thread==null?Thread.currentThread().getId():thread.getId();
// build the same hash field key as when locking
String hashKey= DigestUtils.md5DigestAsHex(new String(id+serviceUUid).getBytes());
DefaultRedisScript defaultRedisScript = new DefaultRedisScript();
ArrayList<String> keys = new ArrayList<>();
ArrayList<String> args = new ArrayList();
keys.add(lockName);
args.add(hashKey);
defaultRedisScript.setScriptText(unLockScript);
defaultRedisScript.setResultType(Long.class);
// log.info("unlockLua -> {}",unLockScript);
Long remainLocks = (Long) redisTemplate.execute(defaultRedisScript, keys, args.toArray());
if (ObjectUtil.isEmpty(remainLocks)) {
log.error("katool=> {}释放锁失败,请释放自己的锁,锁名:{}",hashKey,lockName);
}
else if(remainLocks==0){
log.info("katool=> {}成功释放锁,锁名:{}",hashKey,lockName);
redisTemplate.convertAndSend(LOCK_MQ_NAME,lockName);
}
return remainLocks;
}
public Boolean luaToRedisByDelay(String lockName,Long expTimeInc,TimeUnit timeUnit){
long id = Thread.currentThread().getId();
DefaultRedisScript defaultRedisScript = new DefaultRedisScript();
ArrayList<String> keys = new ArrayList<>();
ArrayList<Object> args = new ArrayList();
keys.add(lockName);
args.add(TimeoutUtils.toMillis(expTimeInc,timeUnit)+3000);
defaultRedisScript.setScriptText(delayLockScript);
defaultRedisScript.setResultType(Long.class);
Long expire = redisTemplate.getExpire(lockName);
Long execute = (Long) redisTemplate.execute(defaultRedisScript, keys, args.toArray());
return execute>expire;
}
/**
* @param obj
* @param exptime
* @param timeUnit
* @return
* @throws KaToolException
*/
public boolean DistributedLock(Object obj,Long exptime,TimeUnit timeUnit,Boolean isDelay) throws KaToolException {
if (ObjectUtil.isEmpty(obj)){
throw new KaToolException(ErrorCode.PARAMS_ERROR," Lock=> 传入obj为空");
}
if(ObjectUtil.isEmpty(exptime)){
exptime= lockConfig.getInternalLockLeaseTime();
}
if (ObjectUtils.isEmpty(timeUnit)){
timeUnit=lockConfig.getTimeUnit();
}
String lockName="Lock:"+obj.toString();
Long aLong = -1L;
// enter the mutual-exclusion wait loop
Thread currentThread = Thread.currentThread();
while(true){
String[] hashkey = new String[1];
aLong=luaToRedisByLock(lockName, exptime, timeUnit, hashkey);
if (aLong==null) {
break;
}
log.debug("katool=> {}未抢到锁,线程等待通知唤醒,最多等待时间:{},锁名:{},过期时间:{},单位:{}",hashkey[0],aLong,lockName,exptime,timeUnit);
// Thread.sleep(aLong/3); // first attempt: spin + sleep for mutual exclusion
ConcurrentLinkedQueue<Thread> threads = threadWaitQueue.computeIfAbsent(lockName, k -> {
log.info("新增线程进入等待lock:{}队列",lockName);
return new ConcurrentLinkedQueue<>();
});
log.debug("threadWaitQueue:{}",threadWaitQueue);
if (!threads.contains(currentThread)) {
threads.add(currentThread);
}
log.debug("threadWaitQueue:{}",threadWaitQueue);
// wait for the release notification, or time out after the remaining TTL and retry on our own
// (pttl returns milliseconds, parkNanos expects nanoseconds)
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(aLong));
log.debug("katool=> {}未抢到锁,线程被唤醒,重新抢锁,锁名:{},过期时间:{},单位:{}",hashkey[0],lockName,exptime,timeUnit);
// after waking up, remove ourselves from the wait queue to avoid stale entries
if (threads.contains(currentThread)) {
threads.remove(currentThread);
}
}
// watchdog renewal
if (isDelay){
Thread thread = currentThread;
TimeUnit finalTimeUnit = timeUnit;
Long finalExptime = exptime;
ScheduledFuture<?> scheduledFuture = ScheduledTaskUtil.submitTask(new Runnable() {
@SneakyThrows
@Override
public void run() {
boolean alive = thread.isAlive()||thread.isInterrupted();
ScheduledFuture future = threadWatchDog.get(thread.getId());
if (alive) {
log.info("Thread ID:{} 线程仍然存活,看门狗续期中...", thread.getId());
delayDistributedLock(obj, finalExptime >= 3 ? (finalExptime) / 3 : finalExptime, finalTimeUnit);
return;
} else {
if (future.isCancelled()||future.isDone()) {
log.error("Thread ID:{} 线程已经死亡,但是没有对应的scheduleId", thread.getId());
return;
}
log.info("Thread ID:{} 线程死亡,看门狗自动解锁", thread.getId());
DistributedUnLock(obj,thread);
future.cancel(true);
return;
}
}
},finalExptime, finalExptime, finalTimeUnit);
threadWatchDog.put(thread.getId(),scheduledFuture);
}
return aLong == null; // nil from the Lua script means the lock was acquired
}
ConcurrentHashMap<Long,ScheduledFuture> threadWatchDog=new ConcurrentHashMap<>();
// Renew (extend) the lock
public boolean delayDistributedLock(Object obj,Long exptime,TimeUnit timeUnit) throws KaToolException {
if (ObjectUtils.isEmpty(obj)){
throw new KaToolException(ErrorCode.PARAMS_ERROR," Lock=> 传入obj为空");
}
// renewal itself is a single command and therefore atomic; strictly speaking the Lua script isn't required here
Boolean expire = luaToRedisByDelay("Lock:"+obj.toString(),exptime,timeUnit);
log.info("katool=> LockUntil => delayDistributedLock:{} value:{} extime:{} timeUnit:{}",obj.toString(), "1", exptime, timeUnit);
return BooleanUtil.isTrue(expire);
}
// Release the lock
public Long DistributedUnLock(Object obj) throws KaToolException {
Long remainLocks = DistributedUnLock(obj, null);
return remainLocks;
}
public Long DistributedUnLock(Object obj,Thread thread) throws KaToolException {
if (ObjectUtils.isEmpty(obj)){
throw new KaToolException(ErrorCode.PARAMS_ERROR," Lock=> 传入obj为空");
}
// with re-entrancy in place we must not simply delete the key: Boolean aBoolean = redisTemplate.delete("Lock:" + obj.toString());
Long remainLocks = luaToRedisByUnLock("Lock:" + obj.toString(),thread);
// cancel the watchdog of the thread that owns the lock (not necessarily the calling thread)
long ownerId = (thread == null ? Thread.currentThread() : thread).getId();
ScheduledFuture watchFuture = threadWatchDog.get(ownerId);
if (watchFuture != null) {
watchFuture.cancel(true);
threadWatchDog.remove(ownerId);
}
log.info("katool=> LockUntil => unDistributedLock:{} isdelete:{} watchDog is cancel and drop",obj.toString(),true);
return remainLocks;
}
// enum-based singleton: enum constants are static and immune to reflection attacks
private static enum SingletonFactory{
Singleton;
LockUtil lockUtil;
private SingletonFactory(){
lockUtil=new LockUtil();
}
public LockUtil getInstance(){
return lockUtil;
}
}
@Bean("LockUtil")
public static LockUtil getInstance(){
return SingletonFactory.Singleton.lockUtil;
}
private LockUtil(){
}
}
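Before moving on, here is a quick sketch of how business code would call this util (StockService and deductStock are made up for illustration; the lock/unlock calls match the class above, assuming Spring injects the bean):
import cn.katool.Exception.KaToolException;
import cn.katool.lock.LockUtil;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class StockService {

    @Resource
    private LockUtil lockUtil;

    public void deductStock(String itemId) {
        // single-argument overload: default lease time, watchdog renewal enabled
        lockUtil.DistributedLock(itemId);
        try {
            // ... critical section: read, check and update the stock ...
        } finally {
            try {
                // returns the remaining re-entry count; 0 means the lock was fully released
                lockUtil.DistributedUnLock(itemId);
            } catch (KaToolException e) {
                throw new RuntimeException(e);
            }
        }
    }
}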
About publishing the release message: I originally planned to do it inside the Lua script, but for some reason it kept failing there, even though the same Lua ran fine on its own via EVAL. So I moved the publish out of the script and wrapped the method in a transaction instead.
There is also a scheduled-task helper in there. I used to build scheduled jobs with Hutool's CronUtil, but dynamically changing the execution period is awkward because you have to generate a cron expression, so I switched to a ScheduledThreadPoolExecutor-based thread pool:
package cn.katool.util;
import java.util.concurrent.*;
public class ScheduledTaskUtil {
private static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(500,
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
public static ScheduledFuture<?> submitTask(Runnable run, long delay, TimeUnit timeUnit){
ScheduledFuture<?> schedule = executor.scheduleWithFixedDelay(run, 0,delay, timeUnit);
return schedule;
}
public static ScheduledFuture<?> submitTask(Callable run, long delay, TimeUnit timeUnit){
ScheduledFuture<?> schedule = executor.scheduleWithFixedDelay(()->{
try {
run.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 0,delay, timeUnit);
return schedule;
}
public static ScheduledFuture<?> submitTask(Runnable run,long initdeay, long delay, TimeUnit timeUnit){
ScheduledFuture<?> schedule = executor.scheduleWithFixedDelay(run, initdeay,delay, timeUnit);
return schedule;
}
public static ScheduledFuture<?> submitTask(Callable run,long initdeay, long delay, TimeUnit timeUnit){
ScheduledFuture<?> schedule = executor.scheduleWithFixedDelay(()->{
try {
run.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, initdeay,delay, timeUnit);
return schedule;
}
}
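A quick usage sketch for this helper, mirroring how the watchdog above schedules and later cancels its renewal task (the demo class and the printed message are made up):
import cn.katool.util.ScheduledTaskUtil;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class ScheduledTaskUtilDemo {
    public static void main(String[] args) throws InterruptedException {
        // run a (fake) renewal task every 10 seconds, starting 10 seconds from now
        ScheduledFuture<?> future = ScheduledTaskUtil.submitTask(
                () -> System.out.println("renewing lock lease..."),
                10, 10, TimeUnit.SECONDS);

        Thread.sleep(35_000);
        // cancelling the future is exactly how the watchdog is stopped on unlock
        future.cancel(true);
    }
}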
Test code
Now for the experiment. I've published the code to Maven Central, so you only need to add the dependency:
<dependency>
<groupId>cn.katool</groupId>
<artifactId>katool</artifactId>
<version>1.9.0</version>
</dependency>
Then configure it in the yml:
server:
  port: 8081
katool:
  lock:
    internalLockLeaseTime: 30 # default lease time for the distributed lock
    timeUnit: seconds # lease time unit
  util:
    redis:
      policy: "caffeine" # in-memory cache policy: caffeine
      exptime: 300000 # LFU expiry time (5 * 60 * 1000 ms)
      time-unit: milliseconds # expiry time unit
spring:
  redis:
    database: 1
    host: 127.0.0.1
    port: 6379
    timeout: 5000
    lettuce:
      pool:
        max-active: 100 # maximum number of connections in the pool (negative means no limit)
        max-idle: 100 # maximum number of idle connections in the pool
        min-idle: 3 # minimum number of idle connections in the pool
        max-wait: -50000ms # maximum blocking wait time of the pool (negative means no limit)
  task:
    execution:
      pool:
        core-size: 300
        max-size: 700
        queue-capacity: 500
        keep-alive: 300s
The controller layer
package cn.katool.katooltest;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.RandomUtil;
import cn.katool.util.db.nosql.RedisUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@RestController
@RequestMapping("/test")
public class TestController {
@Resource
RedisUtils redisUtils;
@GetMapping
public String test() throws InterruptedException {
redisUtils.lock("this");
//synchronized ("this".intern()){
String value = (String) redisUtils.getValue("1234");
if (ObjectUtil.isEmpty(value)){
redisUtils.setValue("1234", RandomUtil.randomString(10),5L, TimeUnit.MINUTES);
value = (String) redisUtils.getValue("1234");
}
//}
redisUtils.unlock("this");
return value.toString();
}
}
API testing with ApiFox
100 concurrent threads
First run
Second run
Third run
Fourth run
Fifth run
Summary
Dropping the highest and the lowest run, the average QPS is around 1700.
Let's double-check with JMeter:
Roughly 1600.
1000 concurrent threads
ApiFox gets really laggy at this concurrency, so let's stick with JMeter.
Still, that's a bit low. To see whether our distributed lock is the culprit, let's try a plain synchronized lock, as sketched below.
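Swapping the distributed lock for a JVM-local monitor is a small change to the controller above, roughly the commented-out block (the /sync mapping is just so both endpoints can coexist):
// Drop-in variant of the test() method from the TestController above:
// the only change is that the Redis lock is replaced by a JVM-local monitor.
@GetMapping("/sync")
public String testWithSynchronized() {
    synchronized ("this".intern()) {
        String value = (String) redisUtils.getValue("1234");
        if (ObjectUtil.isEmpty(value)) {
            redisUtils.setValue("1234", RandomUtil.randomString(10), 5L, TimeUnit.MINUTES);
            value = (String) redisUtils.getValue("1234");
        }
        return value;
    }
}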
And yep, it really is.
Let me also give Redisson a try:
package cn.katool.katooltest;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.RandomUtil;
import cn.katool.util.db.nosql.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {
@Resource
RedisUtils redisUtils;
public int i=0;
@Resource
Redisson redisson;
Integer q=100;
@GetMapping
public String test() throws InterruptedException {
// redisUtils.lock("this");
RLock aThis = redisson.getLock("this");
aThis.lock();
//synchronized ("this".intern()){
String value = (String) redisUtils.getValue("1234");
log.info("i={},q={}",i++,q);
if (ObjectUtil.isEmpty(value)){
redisUtils.setValue("1234", RandomUtil.randomString(10),5L, TimeUnit.MINUTES);
value = (String) redisUtils.getValue("1234");
}
// redisUtils.unlock("this");
aThis.unlock();
return value.toString();
//}
}
}
Their QPS is close to 1000.
Fine, I'm the noob here.
Could it be because Redisson isn't using a fair lock there?
Let's try its fair lock, as sketched below.
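The fair-lock variant only swaps the lock handle; Redisson's getFairLock queues waiters in arrival order (same critical section as the controller above):
// Fair-lock variant of the critical section in the controller above.
RLock fairLock = redisson.getFairLock("this");
fairLock.lock();
try {
    String value = (String) redisUtils.getValue("1234");
    if (ObjectUtil.isEmpty(value)) {
        redisUtils.setValue("1234", RandomUtil.randomString(10), 5L, TimeUnit.MINUTES);
        value = (String) redisUtils.getValue("1234");
    }
    return value;
} finally {
    fairLock.unlock();
}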
The verdict: I really am the noob.
Thinking about optimizations
My approach is roughly the same as what Redisson does under the hood, so why is the gap this big?
The choice of data structures?
Remember how our fair-lock queue is implemented?
public static ConcurrentHashMap<String, ConcurrentLinkedQueue<Thread>> threadWaitQueue = new ConcurrentHashMap<>();
For thread safety we used a ConcurrentHashMap together with a ConcurrentLinkedQueue.
The ConcurrentHashMap is there to avoid the problems a plain HashMap has under concurrent resize (the infamous circular linked list during rehash).
But what is the ConcurrentLinkedQueue actually buying us?
When several threads enqueue, it's simply first come, first served, and the queue only lives inside one JVM anyway.
On the dequeue side, concurrent consumers could lose wake-ups, but here each service instance has exactly one consumer, so that protection seems unnecessary.
So let's switch to the most basic structure and adjust the code:
@SneakyThrows
@Override
public void onMessage(Message message, byte[] pattern) {
log.info("从消息通道={}监听到消息",new String(pattern));
log.info("从消息通道={}监听到消息",new String(message.getChannel()));
// strip the serializer's surrounding quotes
String body = new String(message.getBody());
String lockName = body.substring(1, body.length() - 1);
log.info("元消息={}", lockName);
log.info("threadWaitQueue:{}",threadWaitQueue);
LinkedList<Thread> threads = threadWaitQueue.get(lockName);
if (ObjectUtil.isEmpty(threads)||threads.isEmpty()){
log.info("没有线程需要lock:{}在等待", lockName);
threadWaitQueue.remove(lockName);
return ;
}
Thread peek = threads.peek();
log.info("唤醒线程={}",peek.getName());
LockSupport.unpark(peek); // the winner removes itself from the queue once it grabs the lock
}
LinkedList<Thread> threads = threadWaitQueue.get(lockName);
while(true){
String[] hashkey = new String[1];
aLong=luaToRedisByLock(lockName, exptime, timeUnit, hashkey);
if (aLong==null) {
break;
}
log.debug("katool=> {}未抢到锁,线程等待通知唤醒,最多等待时间:{},锁名:{},过期时间:{},单位:{}",hashkey[0],aLong,lockName,exptime,timeUnit);
// Thread.sleep(aLong/3); // first attempt: spin + sleep for mutual exclusion
if (ObjectUtil.isEmpty(threads)){
threads=new LinkedList<Thread>();
log.info("新增线程进入等待lock:{}队列",lockName);
threadWaitQueue.putIfAbsent(lockName,threads);
log.debug("threadWaitQueue:{}",threadWaitQueue);
}
if (!threads.contains(currentThread)) {
threads.add(currentThread); // append to the tail so the queue stays FIFO
}
log.debug("threadWaitQueue:{}",threadWaitQueue);
// park for roughly 1.5x the remaining TTL (pttl is in ms, parkNanos wants ns), then grab the lock on our own
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(aLong + (aLong >> 1)));
log.debug("katool=> {}未抢到锁,线程被唤醒,重新抢锁,锁名:{},过期时间:{},单位:{}",hashkey[0],lockName,exptime,timeUnit);
}
// after being woken up (or timing out) remove ourselves from the wait queue to avoid stale entries
if (threads.contains(currentThread)) {
threads.remove(currentThread);
}
So, how does the optimized version do?
About the same as before? Could the logging be the problem?
I switched the logs down to debug level.
Whoa???
Then let me try it again with the thread-safe queue.
Alright, alright, they're all about the same, so I'll just switch back to the thread-safe version to avoid weird bugs, hhh.
I looked into it afterwards: log.info does its output asynchronously, which ends up occupying threads and drags the QPS way down. On my machine this single-node project tops out at roughly 1k QPS, and it can take at most about 5000 concurrent threads before errors start.
3000 concurrent threads
Below is the QPS at 3000 concurrent threads.
One more note: the code in this article uses a "grabbing" strategy: if a parked thread hasn't been notified within its wait window, it wakes up on its own and takes another shot at the lock. The article only demonstrates the idea; adapt the details to your own environment. KaTool currently supports both the grabbing and non-grabbing modes.