重新说说Redis实现分布式公平可重入锁的实现,这次具体说说异步唤醒机制,这次带上QPS检测
在之前的文章中,我们对Redisson的lock进行了刨析
如下:
- 看门狗续期
- 使用hash+lua实现可重入锁
但是还有一点,我们之前使用的自旋+线程休眠来达到线程互斥阻塞的效果。
但是这样做会有一点问题,我们每次休眠的时间都是固定的,仍然会有一大部分空窗期,我设置30s的锁过期,那么好,20个线程足足跑了两三分钟,这个效率绝对不行,对吧。
那么我们还有没有其他办法?
Redssion源码中使用了异步消息订阅
呃呃呃,之前的朋友说我的代码run不起来,没错,我故意改了一些地方,毕竟天下没有免费的午餐对吧,我主要还是提供一下思路,和大概的代码,具体的大家自行去运行了,理解会更深(bushi,好吧,我承认之前我刨析的时候自己实现的代码没有跑 ,所以今天来带大家跑一下)
那么我们也来实现一个
我们使用的是RedisTemplate,然后线程阻塞和唤醒的话我们需要一个同步锁,但是感觉有点问题
因为Java的线程阻塞和唤醒,必须要同步锁,所以我们需要一个变量,那么每个线程都要一个变量,啊啊啊,然后又要使用hashmap来存储变量对应的线程???太他妈浪费内存了
其实在JDK6之后给我提供了一个api=>LockSupport
这里面封装了一些本地方法来进行线程操作
LockSupport
简单介绍下api吧
LockSupport.park(); //阻塞当前线程 | |
LockSupport.parkNanos(long nanos); // 让当前线程休眠 nancos ms | |
LockSupport.unpark(Thread thread); //唤醒指定线程 |
看到这里,我相信大家应该都有思路了。
我们实现一个FIFO队列来保证锁竞争的公平性
锁竞争公平性的保证
public static ConcurrentHashMap<String, ConcurrentLinkedQueue<Thread>> threadWaitQueue=new ConcurrentHashMap<>();;
每当锁释放的时候,我们使用消息订阅通知一下,然后unpark不就行了。
但是还有一个问题,如果说某个线程长时间占用,或者消息丢失怎么办?
所以说哦我们使用parkNanos
好了直接上源代码,这次我放能够运行的
Pub/Sub
第一个是消息监听的
package cn.katool.lock; | |
import cn.hutool.core.util.ObjectUtil; | |
import cn.katool.util.ScheduledTaskUtil; | |
import cn.katool.util.db.nosql.RedisUtils; | |
import lombok.SneakyThrows; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.data.redis.connection.Message; | |
import org.springframework.data.redis.connection.MessageListener; | |
import org.springframework.data.redis.connection.RedisConnectionFactory; | |
import org.springframework.data.redis.listener.ChannelTopic; | |
import org.springframework.data.redis.listener.RedisMessageListenerContainer; | |
import org.springframework.stereotype.Component; | |
import javax.annotation.Resource; | |
import java.util.HashMap; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import java.util.concurrent.locks.LockSupport; | |
public class LockMessageWatchDog implements MessageListener { | |
public static ConcurrentHashMap<String, ConcurrentLinkedQueue<Thread>> threadWaitQueue=new ConcurrentHashMap<>();; | |
public static final String LOCK_MQ_NAME = "LOCK:RELASE:QUEUE"; | |
public void onMessage(Message message, byte[] pattern) { | |
log.info("从消息通道={}监听到消息",new String(pattern)); | |
log.info("从消息通道={}监听到消息",new String(message.getChannel())); | |
String lockName = new String(message.getBody()).substring(1, message.getBody().length - 1); | |
log.info("元消息={}", lockName); | |
log.info("threadWaitQueue:{}",threadWaitQueue);; | |
ConcurrentLinkedQueue<Thread> threads = threadWaitQueue.get(lockName); | |
if (ObjectUtil.isEmpty(threads)||threads.isEmpty()){ | |
log.info("没有线程需要lock:{}在等待", lockName); | |
threadWaitQueue.remove(lockName); | |
return ; | |
} | |
Thread poll = threads.poll(); | |
log.info("唤醒线程={}",poll.getName()); | |
LockSupport.unpark(poll); | |
} | |
//表示监听一个频道 | |
public RedisMessageListenerContainer container(RedisConnectionFactory factory) { | |
RedisMessageListenerContainer container = new RedisMessageListenerContainer(); | |
container.setConnectionFactory(factory); | |
container.addMessageListener(this, new ChannelTopic(LOCK_MQ_NAME)); | |
return container; | |
} | |
} |
下面是锁的工具类
LockUtil
/** | |
* Title | |
* | |
* @ClassName: LockUtil | |
* @Description:锁工具类,通过内部枚举类实现单例,防止反射攻击 | |
* @author: Karos | |
* @date: 2023/1/4 0:17 | |
* @Blog: https://www.wzl1.top/ | |
*/ | |
package cn.katool.lock; | |
import cn.hutool.core.util.BooleanUtil; | |
import cn.hutool.core.util.ObjectUtil; | |
import cn.hutool.core.util.RandomUtil; | |
import cn.katool.Config.LockConfig; | |
import cn.katool.Exception.ErrorCode; | |
import cn.katool.Exception.KaToolException; | |
import cn.katool.util.ScheduledTaskUtil; | |
import lombok.SneakyThrows; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Scope; | |
import org.springframework.data.redis.core.RedisTemplate; | |
import org.springframework.data.redis.core.TimeoutUtils; | |
import org.springframework.data.redis.core.script.DefaultRedisScript; | |
import org.springframework.stereotype.Component; | |
import org.springframework.transaction.annotation.Transactional; | |
import org.springframework.util.DigestUtils; | |
import org.springframework.util.ObjectUtils; | |
import javax.annotation.Resource; | |
import java.util.ArrayList; | |
import java.util.concurrent.*; | |
import java.util.concurrent.locks.LockSupport; | |
import static cn.katool.lock.LockMessageWatchDog.LOCK_MQ_NAME; | |
import static cn.katool.lock.LockMessageWatchDog.threadWaitQueue; | |
public class LockUtil { | |
RedisTemplate redisTemplate; | |
// lockName:uID:numbers | |
// key[1] 锁名 | |
// ARGV[1] UID | |
// ARGV[2] 时间 | |
//返回 | |
private String lockScript; | |
private String unLockScript; | |
private String delayLockScript; | |
public String serviceUUid=RandomUtil.randomString(16); | |
private volatile static boolean isOpenCorn=false; | |
/** | |
* 带看门狗机制上锁 | |
* @param lockObj | |
* @return | |
*/ | |
public boolean DistributedLock(Object lockObj){ | |
try { | |
return DistributedLock(lockObj,null,null,true); | |
} catch (KaToolException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
LockConfig lockConfig; | |
//加锁 | |
public Long luaToRedisByLock(String lockName,Long expTime,TimeUnit timeUnit,String[] hashkey){ | |
long id = Thread.currentThread().getId(); | |
//生成hashkey | |
String hashKey=hashkey[0]= DigestUtils.md5DigestAsHex(new String(id+serviceUUid).getBytes()); | |
log.info("serviceUUid:{},id:{},hashKey:{}",serviceUUid,id,hashKey); | |
DefaultRedisScript defaultRedisScript = new DefaultRedisScript(); | |
ArrayList<String> keys = new ArrayList<>(); | |
ArrayList<Object> args = new ArrayList(); | |
keys.add(lockName); | |
args.add(hashKey); | |
args.add(TimeoutUtils.toMillis(expTime,timeUnit)); | |
defaultRedisScript.setScriptText(lockScript); | |
defaultRedisScript.setResultType(Long.class); | |
// log.info("lockLua -> {}",lockScript); | |
Long execute = (Long) redisTemplate.execute(defaultRedisScript, keys, args.toArray()); | |
if (ObjectUtil.isEmpty(execute)) { | |
log.info("katool=> {}成功抢到锁,锁名:{},过期时间:{},单位:{}",hashKey,lockName,expTime,timeUnit); | |
} | |
return execute; | |
} | |
//释放锁 | |
public Long luaToRedisByUnLock(String lockName,Thread thread){ | |
long id = thread==null?Thread.currentThread().getId():thread.getId(); | |
//生成value1 | |
String hashKey= DigestUtils.md5DigestAsHex(new String(id+serviceUUid).getBytes()); | |
DefaultRedisScript defaultRedisScript = new DefaultRedisScript(); | |
ArrayList<String> keys = new ArrayList<>(); | |
ArrayList<String> args = new ArrayList(); | |
keys.add(lockName); | |
args.add(hashKey); | |
defaultRedisScript.setScriptText(unLockScript); | |
defaultRedisScript.setResultType(Long.class); | |
// log.info("unlockLua -> {}",unLockScript); | |
Long remainLocks = (Long) redisTemplate.execute(defaultRedisScript, keys, args.toArray()); | |
if (ObjectUtil.isEmpty(remainLocks)) { | |
log.error("katool=> {}释放锁失败,请释放自己的锁,锁名:{}",hashKey,lockName); | |
} | |
else if(remainLocks==0){ | |
log.info("katool=> {}成功释放锁,锁名:{}",hashKey,lockName); | |
redisTemplate.convertAndSend(LOCK_MQ_NAME,lockName); | |
} | |
return remainLocks; | |
} | |
public Boolean luaToRedisByDelay(String lockName,Long expTimeInc,TimeUnit timeUnit){ | |
long id = Thread.currentThread().getId(); | |
DefaultRedisScript defaultRedisScript = new DefaultRedisScript(); | |
ArrayList<String> keys = new ArrayList<>(); | |
ArrayList<Object> args = new ArrayList(); | |
keys.add(lockName); | |
args.add(TimeoutUtils.toMillis(expTimeInc,timeUnit)+3000); | |
defaultRedisScript.setScriptText(delayLockScript); | |
defaultRedisScript.setResultType(Long.class); | |
Long expire = redisTemplate.getExpire(lockName); | |
Long execute = (Long) redisTemplate.execute(defaultRedisScript, keys, args.toArray()); | |
return execute>expire; | |
} | |
/** | |
* @param obj | |
* @param exptime | |
* @param timeUnit | |
* @return | |
* @throws KaToolException | |
*/ | |
public boolean DistributedLock(Object obj,Long exptime,TimeUnit timeUnit,Boolean isDelay) throws KaToolException { | |
if (ObjectUtil.isEmpty(obj)){ | |
throw new KaToolException(ErrorCode.PARAMS_ERROR," Lock=> 传入obj为空"); | |
} | |
if(ObjectUtil.isEmpty(exptime)){ | |
exptime= lockConfig.getInternalLockLeaseTime();; | |
} | |
if (ObjectUtils.isEmpty(timeUnit)){ | |
timeUnit=lockConfig.getTimeUnit(); | |
} | |
String lockName="Lock:"+obj.toString(); | |
Long aLong = -1L; | |
// 进入互斥等待 | |
Thread currentThread = Thread.currentThread(); | |
while(true){ | |
String[] hashkey = new String[1]; | |
aLong=luaToRedisByLock(lockName, exptime, timeUnit, hashkey); | |
if (aLong==null) { | |
break; | |
} | |
log.debug("katool=> {}未抢到锁,线程等待通知唤醒,最多等待时间:{},锁名:{},过期时间:{},单位:{}",hashkey[0],aLong,lockName,exptime,timeUnit); | |
// Thread.sleep(aLong/3); // 初步改进:使用线程休眠,采用自旋锁+线程互斥 | |
ConcurrentLinkedQueue<Thread> threads = threadWaitQueue.get(lockName); | |
if (ObjectUtil.isEmpty(threads)){ | |
threads=new ConcurrentLinkedQueue<>(); | |
log.info("新增线程进入等待lock:{}队列",lockName); | |
threadWaitQueue.putIfAbsent(lockName,threads); | |
log.debug("threadWaitQueue:{}",threadWaitQueue);; | |
} | |
if (!threads.contains(currentThread)) { | |
threads.add(currentThread); | |
} | |
log.debug("threadWaitQueue:{}",threadWaitQueue);; | |
LockSupport.parkNanos(aLong); // 等待通知唤醒 或者 超时自行处理 | |
log.debug("katool=> {}未抢到锁,线程被唤醒,重新抢锁,锁名:{},过期时间:{},单位:{}",hashkey[0],lockName,exptime,timeUnit); | |
// 线程被唤醒后删除自身的thread队列,避免死锁 | |
if (threads.contains(currentThread)) { | |
threads.remove(currentThread); | |
} | |
} | |
//实现看门狗 | |
if (isDelay){ | |
Thread thread = currentThread; | |
TimeUnit finalTimeUnit = timeUnit; | |
Long finalExptime = exptime; | |
ScheduledFuture<?> scheduledFuture = ScheduledTaskUtil.submitTask(new Runnable() { | |
public void run() { | |
boolean alive = thread.isAlive()||thread.isInterrupted(); | |
ScheduledFuture future = threadWatchDog.get(thread.getId()); | |
if (alive) { | |
log.info("Thread ID:{} 线程仍然存活,看门狗续期中...", thread.getId()); | |
delayDistributedLock(obj, finalExptime >= 3 ? (finalExptime) / 3 : finalExptime, finalTimeUnit); | |
return; | |
} else { | |
if (future.isCancelled()||future.isDone()) { | |
log.error("Thread ID:{} 线程已经死亡,但是没有对应的scheduleId", thread.getId()); | |
return; | |
} | |
log.info("Thread ID:{} 线程死亡,看门狗自动解锁", thread.getId()); | |
DistributedUnLock(obj,thread); | |
future.cancel(true); | |
return; | |
} | |
} | |
},finalExptime, finalExptime, finalTimeUnit); | |
threadWatchDog.put(thread.getId(),scheduledFuture); | |
} | |
return BooleanUtil.isTrue(aLong!=null); | |
} | |
ConcurrentHashMap<Long,ScheduledFuture> threadWatchDog=new ConcurrentHashMap<>(); | |
//延期 | |
public boolean delayDistributedLock(Object obj,Long exptime,TimeUnit timeUnit) throws KaToolException { | |
if (ObjectUtils.isEmpty(obj)){ | |
throw new KaToolException(ErrorCode.PARAMS_ERROR," Lock=> 传入obj为空"); | |
} | |
//本身就是一句话,具备原子性,没有必要使用lua脚本 | |
Boolean expire = luaToRedisByDelay("Lock:"+obj.toString(),exptime,timeUnit); | |
log.info("katool=> LockUntil => delayDistributedLock:{} value:{} extime:{} timeUnit:{}",obj.toString(), "1", exptime, timeUnit); | |
return BooleanUtil.isTrue(expire); | |
} | |
//释放锁 | |
public Long DistributedUnLock(Object obj) throws KaToolException { | |
Long remainLocks = DistributedUnLock(obj, null); | |
return remainLocks; | |
} | |
public Long DistributedUnLock(Object obj,Thread thread) throws KaToolException { | |
if (ObjectUtils.isEmpty(obj)){ | |
throw new KaToolException(ErrorCode.PARAMS_ERROR," Lock=> 传入obj为空"); | |
} | |
// 由于这里有了可重入锁,不应该直接删除Boolean aBoolean = redisTemplate.delete("Lock:" + obj.toString()); | |
Long remainLocks = luaToRedisByUnLock("Lock:" + obj.toString(),thread); | |
threadWatchDog.get(Thread.currentThread().getId()).cancel(true); | |
threadWatchDog.remove(Thread.currentThread().getId()); | |
log.info("katool=> LockUntil => unDistributedLock:{} isdelete:{} watchDog is cancel and drop",obj.toString(),true); | |
return remainLocks; | |
} | |
//利用枚举类实现单例模式,枚举类属性为静态的 | |
private static enum SingletonFactory{ | |
Singleton; | |
LockUtil lockUtil; | |
private SingletonFactory(){ | |
lockUtil=new LockUtil(); | |
} | |
public LockUtil getInstance(){ | |
return lockUtil; | |
} | |
} | |
public static LockUtil getInstance(){ | |
return SingletonFactory.Singleton.lockUtil; | |
} | |
private LockUtil(){ | |
} | |
} |
在消息发布那里,我本来打算放在Lua脚本里面运行的,但是凡在Lua脚本里面不知道为什么执行失败,但是单独的lua,使用eval来执行又没问题,所以我放在外面,加了一层事务来运行。
其中有一个定时任务工具,在之前我是使用的HuTool的CronUtil来实现的定时任务,但是对于动态修改执行周期,不太方便,需要生成Corn表达式,所以我改成了使用ScheduledThreadPoolExecutor
线程池的方式
package cn.katool.util; | |
import cn.hutool.cron.CronUtil; | |
import java.sql.Time; | |
import java.util.concurrent.*; | |
public class ScheduledTaskUtil { | |
private static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(500, | |
Executors.defaultThreadFactory(), | |
new ThreadPoolExecutor.CallerRunsPolicy()); | |
public static ScheduledFuture<?> submitTask(Runnable run, long delay, TimeUnit timeUnit){ | |
ScheduledFuture<?> schedule = executor.scheduleWithFixedDelay(run, 0,delay, timeUnit); | |
return schedule; | |
} | |
public static ScheduledFuture<?> submitTask(Callable run, long delay, TimeUnit timeUnit){ | |
ScheduledFuture<?> schedule = executor.scheduleWithFixedDelay(()->{ | |
try { | |
run.call(); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
}, 0,delay, timeUnit); | |
return schedule; | |
} | |
public static ScheduledFuture<?> submitTask(Runnable run,long initdeay, long delay, TimeUnit timeUnit){ | |
ScheduledFuture<?> schedule = executor.scheduleWithFixedDelay(run, initdeay,delay, timeUnit); | |
return schedule; | |
} | |
public static ScheduledFuture<?> submitTask(Callable run,long initdeay, long delay, TimeUnit timeUnit){ | |
ScheduledFuture<?> schedule = executor.scheduleWithFixedDelay(()->{ | |
try { | |
run.call(); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
}, initdeay,delay, timeUnit); | |
return schedule; | |
} | |
} |
测试代码
下面我们来看看试验吧,我已经将代码上传到了Maven中央仓库,大家引入依赖即可
<dependency> | |
<groupId>cn.katool</groupId> | |
<artifactId>katool</artifactId> | |
<version>1.9.0</version> | |
</dependency> |
然后我们在yml里面进行配置
server: | |
port: 8081 | |
katool: | |
lock: | |
internalLockLeaseTime: 30 # 分布式锁默认租约时间 | |
timeUnit: seconds # 租约时间单位 | |
util: | |
redis: | |
policy: "caffeine" # 选择内存缓存策略,caffeine | |
exptime: {5*60*1000} # LFU过期时间 | |
time-unit: milliseconds # 过期时间单位 | |
spring: | |
redis: | |
database: 1 | |
host: 127.0.0.1 | |
port: 6379 | |
timeout: 5000 | |
lettuce: | |
pool: | |
max-active: 100 # 连接池最大连接数(使用负值表示没有限制) | |
max-idle: 100 # 连接池中的最大空闲连接 | |
min-idle: 3 # 连接池中的最小空闲连接 | |
max-wait: -50000ms # 连接池最大阻塞等待时间(使用负值表示没有限制) | |
task: | |
execution: | |
pool: | |
core-size: 300 | |
max-size: 700 | |
queue-capacity: 500 | |
keep-alive: 300s |
Controller层
package cn.katool.katooltest; | |
import cn.hutool.core.util.ObjectUtil; | |
import cn.hutool.core.util.RandomUtil; | |
import cn.katool.util.db.nosql.RedisUtils; | |
import org.springframework.web.bind.annotation.GetMapping; | |
import org.springframework.web.bind.annotation.RequestMapping; | |
import org.springframework.web.bind.annotation.RestController; | |
import javax.annotation.Resource; | |
import java.util.concurrent.TimeUnit; | |
public class TestController { | |
RedisUtils redisUtils; | |
public String test() throws InterruptedException { | |
redisUtils.lock("this"); | |
//synchronized ("this".intern()){ | |
String value = (String) redisUtils.getValue("1234"); | |
if (ObjectUtil.isEmpty(value)){ | |
redisUtils.setValue("1234", RandomUtil.randomString(10),5L, TimeUnit.MINUTES); | |
value = (String) redisUtils.getValue("1234"); | |
} | |
//} | |
redisUtils.unlock("this"); | |
return value.toString(); | |
} | |
} |
使用ApiFox进行API测试
100个线程同时并发来测试
第一次测验
第二次测验
第三次测验
第四次测验
第五次测验
总结
出去最高和最低的,QPS的均值在1700左右
用jmetter来测试下
大概是1600
1000个线程同时并发测试
ApiFox,并发量上去了贼卡,还是用Jmeter吧
不过,这有点低啊,为了看是不是我们分布式锁的问题,我们用同步锁来试试
还真是
我再来试试Redisson
package cn.katool.katooltest; | |
import cn.hutool.core.util.ObjectUtil; | |
import cn.hutool.core.util.RandomUtil; | |
import cn.katool.util.db.nosql.RedisUtils; | |
import lombok.extern.slf4j.Slf4j; | |
import org.redisson.Redisson; | |
import org.redisson.api.RLock; | |
import org.springframework.web.bind.annotation.GetMapping; | |
import org.springframework.web.bind.annotation.RequestMapping; | |
import org.springframework.web.bind.annotation.RestController; | |
import javax.annotation.Resource; | |
import java.util.concurrent.TimeUnit; | |
public class TestController { | |
RedisUtils redisUtils; | |
public int i=0; | |
Redisson redisson; | |
Integer q=100; | |
public String test() throws InterruptedException { | |
// redisUtils.lock("this"); | |
RLock aThis = redisson.getLock("this"); | |
aThis.lock(); | |
//synchronized ("this".intern()){ | |
String value = (String) redisUtils.getValue("1234"); | |
log.info("i={},q={}",i++,q); | |
if (ObjectUtil.isEmpty(value)){ | |
redisUtils.setValue("1234", RandomUtil.randomString(10),5L, TimeUnit.MINUTES); | |
value = (String) redisUtils.getValue("1234"); | |
} | |
// redisUtils.unlock("this"); | |
aThis.unlock(); | |
return value.toString(); | |
//} | |
} | |
} |
人家的QPS将近1000
好吧我是fw
会不会是因为Redisson没有使用公平锁的问题?
我们来试一试公平锁
事实证明,我确实是fw
思考优化
我的思路和Redisson底层思路差不多,但是为什么会出现这么大的差错?
数据结构的选择?
我们的公平锁队列的实现方式还记得吗?
public static ConcurrentHashMap<String, ConcurrentLinkedQueue<Thread>> threadWaitQueue=new ConcurrentHashMap<>();;
我们为了并发安全使用了ConcurrentHashMap
和ConcurrentLinkedQueue
使用ConcurrentHashMap
是为了避免扩容时rehash,导致出现循环链表
那么我们使用ConcurrentLinkedQueue
的目的呢?
我们多个线程加入的时候,是看谁先加入,Queue也是单机的
在删除的时候,如果有多个订阅者同时监听,那么可能会造成任务丢失,但是我们这里,一个服务内只有一个消费者,貌似没必要。
那么我们使用最基础的,现在改改代码
public void onMessage(Message message, byte[] pattern) { | |
log.info("从消息通道={}监听到消息",new String(pattern)); | |
log.info("从消息通道={}监听到消息",new String(message.getChannel())); | |
String lockName = new String(message.getBody()).substring(1, message.getBody().length - 1); | |
log.info("元消息={}", lockName); | |
log.info("threadWaitQueue:{}",threadWaitQueue);; | |
LinkedList<Thread> threads = threadWaitQueue.get(lockName); | |
if (ObjectUtil.isEmpty(threads)||threads.isEmpty()){ | |
log.info("没有线程需要lock:{}在等待", lockName); | |
threadWaitQueue.remove(lockName); | |
return ; | |
} | |
Thread peek = threads.peek(); | |
log.info("唤醒线程={}",peek.getName()); | |
LockSupport.unpark(peek); // 竞争到后会自行删除 | |
} | |
LinkedList<Thread> threads = threadWaitQueue.get(lockName); | |
while(true){ | |
String[] hashkey = new String[1]; | |
aLong=luaToRedisByLock(lockName, exptime, timeUnit, hashkey); | |
if (aLong==null) { | |
break; | |
} | |
log.debug("katool=> {}未抢到锁,线程等待通知唤醒,最多等待时间:{},锁名:{},过期时间:{},单位:{}",hashkey[0],aLong,lockName,exptime,timeUnit); | |
// Thread.sleep(aLong/3); // 初步改进:使用线程休眠,采用自旋锁+线程互斥 | |
if (ObjectUtil.isEmpty(threads)){ | |
threads=new LinkedList<Thread>(); | |
log.info("新增线程进入等待lock:{}队列",lockName); | |
threadWaitQueue.putIfAbsent(lockName,threads); | |
log.debug("threadWaitQueue:{}",threadWaitQueue);; | |
} | |
if (!threads.contains(currentThread)) { | |
threads.push(currentThread); | |
} | |
log.debug("threadWaitQueue:{}",threadWaitQueue);; | |
LockSupport.parkNanos(((aLong<<1)+aLong)>>1); // 自行争取一次 | |
log.debug("katool=> {}未抢到锁,线程被唤醒,重新抢锁,锁名:{},过期时间:{},单位:{}",hashkey[0],lockName,exptime,timeUnit); | |
} | |
// 线程被唤醒后删除自身的thread队列,避免死锁 | |
if (threads.contains(currentThread)) { | |
threads.remove(currentThread); | |
} |
看看优化后的效果?
和原来差不多?难道是log的问题?
我把log改到debug级别
woc???
那我试试用线程安全的queue
好好好,都差不多,还是改回去,免得出现一些bug,hhh
后来查了一下,log.info采用的是异步输出,也就是说会占用线程,从而大大减小了QPS,我的电脑目前单机项目最高的QPS在1k左右,最高能够并发5000个线程,超过就得报错。
3000并发测试
下面这个是3000并发的QPS
另外还有,这篇文章的代码我采用了掠夺式,也就是线程被锁一定时间后还没有被通知唤醒的话,自行回去争取一次拿锁,文章只是实现一些思路,具体的使用可以更具自己的开发环境进行修改,目前KaTool中支持掠夺和非掠夺两种方式。