Redisson源码解读-分布式锁( 三 )

value.getLatch().release() 也就是Semaphore#release  , 会唤醒Semaphore#tryAcquire阻塞的线程
解锁上面我们聊了加锁 , 本小节来聊下解锁 。调用路径如下
// RedissonLock#unlock// RedissonBaseLock#unlockAsync(long threadId)public RFuture<Void> unlockAsync(long threadId) {// 调用lua解锁RFuture<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((opStatus, e) -> {// 取消看门狗cancelExpirationRenewal(threadId);if (e != null) {throw new CompletionException(e);}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);}解锁的逻辑不复杂,调用lua脚本解锁以及取消看门狗 。看门狗晚点说,先说下lua解锁
// RedissonLock#unlockInnerAsyncprotected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +"return nil;",Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}老规矩,分析下这段lua:

  1. 如果锁不存在 , 返回null
  2. 锁的值减1,如果锁的值大于0(也就是可重入锁仍然有加锁次数),则重新设置过期时间
  3. 如果锁的值小于等于0,这说明可以真正解锁了,删除锁并通过发布订阅机制发布解锁消息
从 lua 中可以看到,解锁时会发布消息到 channel 中,加锁方法RedissonLock#tryLock中有相对应的订阅操作 。
看门狗试想一个场景:程序执行需要10秒 , 程序执行完成才去解锁,而锁的存活时间只有5秒,也就是程序执行到一半的时候锁就可以被其他程序获取了 , 这显然不合适 。那么怎么解决呢?
  1. 方式一:锁永远存在,直到解锁 。不设置存活时间 。
    这种方法的弊端在于,如果程序没解锁就挂了,锁就成了死锁
  2. 方式二:依然设置锁存活时间,但是监控程序的执行,如果程序还没有执行完成 , 则定期给锁续期 。
方式二就是Redisson的看门狗机制 。看门狗只有在没有显示指定锁的持有时间(leaseTime)时才会生效 。
// RedissonLock#tryAcquireAsync// RedissonBaseLock#scheduleExpirationRenewalprotected void scheduleExpirationRenewal(long threadId) {// 创建ExpirationEntry,并放入EXPIRATION_RENEWAL_MAP中,下面的renewExpiration()方法会从map中再拿出来用ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);try {// 看门狗的具体逻辑renewExpiration();} finally {// 如果线程被中断了 , 就取消看门狗if (Thread.currentThread().isInterrupted()) {// 取消看门狗cancelExpirationRenewal(threadId);}}}}scheduleExpirationRenewal 方法处理了ExpirationEntry和如果出现异常则取消看门狗,具体看门狗逻辑在 renewExpiration 方法中
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}// 创建延时任务,延时时间是internalLockLeaseTime / 3Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}// lua脚本判断 , 如果锁存在,则续期并返回true,不存在则返回falseCompletionStage<Boolean> future = renewExpirationAsync(threadId);future.whenComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getRawName() + " expiration", e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if (res) {// 锁续期成功 , 则再启动一个延时任务,继续监测renewExpiration();} else {// 取消看门狗cancelExpirationRenewal(null);}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);}Timeout 是一个延时任务 , 延时 internalLockLeaseTime / 3 时间执行 。任务的内容主要是通过 renewExpirationAsync 方法对锁进行续期,如果续期失败(解锁了、锁到期等),则取消看门狗,如果续期成功,则递归 renewExpiration 方法,继续创建延时任务 。

推荐阅读