一、Redisson概述
什么是Redisson?—— Redisson Wiki
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
Lettuce是一种可扩展的线程安全的 Redis 客户端,通讯框架基于Netty,支持高级的 Redis 特性,比如哨兵,集群,管道,自动重新连接和Redis数据模型。Spring Boot 2.x 开始 Lettuce 已取代 Jedis 成为首选 Redis 的客户端。
Redisson是架设在Redis基础上,通讯基于Netty的综合的、新型的中间件,企业级开发中使用Redis的最佳范本
Jedis把Redis命令封装好,Lettuce则进一步有了更丰富的Api,也支持集群等模式。但是两者也都点到为止,只给了你操作Redis数据库的脚手架,而Redisson则是基于Redis、Lua和Netty建立起了成熟的分布式解决方案,甚至redis官方都推荐的一种工具集。
二、分布式锁
// 加锁
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
}
// 解锁,防止删错别人的锁,以uuid为value校验是否自己的锁
public void unlock(String lockName, String uuid) {
if(uuid.equals(redisTemplate.opsForValue().get(lockName)){ redisTemplate.opsForValue().del(lockName); }
}
// 结构
if(tryLock){
// todo
}finally{
unlock;
}
Lua脚本是什么?
Lua脚本是redis已经内置的一种轻量小巧语言,其执行是通过redis的eval/evalsha命令来运行,把操作封装成一个Lua脚本,如论如何都是一次执行的原子操作。
if redis.call('get', KEYS[1]) == ARGV[1]
then
-- 执行删除操作
return redis.call('del', KEYS[1])
else
-- 不成功,返回0
return 0
end
// 解锁脚本
DefaultRedisScript<Object> unlockScript = new DefaultRedisScript();
unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lockDel.lua")));
// 执行lua脚本解锁
redisTemplate.execute(unlockScript, Collections.singletonList(keyName), value);
2.0似乎更像一把锁,但好像又缺少了什么,小张一拍脑袋,synchronized和ReentrantLock都很丝滑,因为他们都是可重入锁,一个线程多次拿锁也不会死锁,我们需要可重入。
当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储偏向的线程ID,以后该线程在进入和退出同步块时不需要进行CAS操作来加锁和解锁,只需简单测试一下对象头的Mark Word里是否存储着指向当前线程的偏向锁。如果测试成功,表示线程已经获得了锁。如果测试失败,则需要再测试一下Mark Word中偏向锁标志是否设置成1:没有则CAS竞争;设置了,则CAS将对象头偏向锁指向当前线程。 再维护一个计数器,同个线程进入则自增1,离开再减1,直到为0才能释放
需要存储 锁名称lockName、获得该锁的线程id和对应线程的进入次数count
2.加锁
每次线程获取锁时,判断是否已存在该锁
不存在
设置hash的key为线程id,value初始化为1
设置过期时间
返回获取锁成功true
存在
继续判断是否存在当前线程id的hash key
存在,线程key的value + 1,重入次数增加1,设置过期时间
不存在,返回加锁失败
3.解锁
每次线程来解锁时,判断是否已存在该锁
存在
是否有该线程的id的hash key,有则减1,无则返回解锁失败
减1后,判断剩余count是否为0,为0则说明不再需要这把锁,执行del命令删除
1.存储结构
hget lockname1 threadIdlockname 锁名称
key1:threadId 唯一键,线程id
value1: count 计数器,记录该线程获取锁的次数
2.计数器的加减
当同一个线程获取同一把锁时,我们需要对对应线程的计数器count做加减
exists,而判断一个hash的key是否存在,可以用hexistshincrbyhincrby lockname1 threadId 1,自减1时 hincrby lockname1 threadId -13.解锁的判断
当一把锁不再被需要了,每次解锁一次,count减1,直到为0时,执行删除
local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];
-- lockname不存在
if(redis.call('exists', key) == 0) then
redis.call('hset', key, threadId, '1');
redis.call('expire', key, releaseTime);
return 1;
end;
-- 当前线程已id存在
if(redis.call('hexists', key, threadId) == 1) then
redis.call('hincrby', key, threadId, '1');
redis.call('expire', key, releaseTime);
return 1;
end;
return 0;
local key = KEYS[1];
local threadId = ARGV[1];
-- lockname、threadId不存在
if (redis.call('hexists', key, threadId) == 0) then
return nil;
end;
-- 计数器-1
local count = redis.call('hincrby', key, threadId, -1);
-- 删除lock
if (count == 0) then
redis.call('del', key);
return nil;
end;
/**
* @description 原生redis实现分布式锁
* @date 2021/2/6 10:51 下午
**/
@Getter
@Setter
public class RedisLock {
private RedisTemplate redisTemplate;
private DefaultRedisScript<Long> lockScript;
private DefaultRedisScript<Object> unlockScript;
public RedisLock(RedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
// 加载加锁的脚本
lockScript = new DefaultRedisScript<>();
this.lockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua")));
this.lockScript.setResultType(Long.class);
// 加载释放锁的脚本
unlockScript = new DefaultRedisScript<>();
this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua")));
}
/**
* 获取锁
*/
public String tryLock(String lockName, long releaseTime) {
// 存入的线程信息的前缀
String key = UUID.randomUUID().toString();
// 执行脚本
Long result = (Long) redisTemplate.execute(
lockScript,
Collections.singletonList(lockName),
key + Thread.currentThread().getId(),
releaseTime);
if (result != null && result.intValue() == 1) {
return key;
} else {
return null;
}
}
/**
* 解锁
* @param lockName
* @param key
*/
public void unlock(String lockName, String key) {
redisTemplate.execute(unlockScript,
Collections.singletonList(lockName),
key + Thread.currentThread().getId()
);
}
}
Redisson就有这把你要的锁。
三、Redisson分布式锁
1.依赖
<!-- 原生,本章使用-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
<!-- 另一种Spring集成starter,本章未使用 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>2.配置
@Configuration
public class RedissionConfig {
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.password}")
private String password;
private int port = 6379;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
config.useSingleServer().
setAddress("redis://" + redisHost + ":" + port).
setPassword(password);
config.setCodec(new JsonJacksonCodec());
return Redisson.create(config);
}
}
3.启用分布式锁
@Resource
private RedissonClient redissonClient;
RLock rLock = redissonClient.getLock(lockName);
try {
boolean isLocked = rLock.tryLock(expireTime, TimeUnit.MILLISECONDS);
if (isLocked) {
// TODO
}
} catch (Exception e) {
rLock.unlock();
}
四、RLock
从RLock进入,找到RedissonLock类,找到tryLock方法再递进到干事的tryAcquireOnceAsync方法,这是加锁的主要代码(版本不一此处实现有差别,和最新3.15.x有一定出入,但是核心逻辑依然未变。此处以3.13.6为例)
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
-- 不存在该key时
if (redis.call('exists', KEYS[1]) == 0) then
-- 新增该锁并且hash中该线程id对应的count置1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 设置过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 存在该key 并且 hash中线程id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 线程重入次数++
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
// keyName
KEYS[1] = Collections.singletonList(this.getName())
// leaseTime
ARGV[1] = this.internalLockLeaseTime
// uuid+threadId组合的唯一值
ARGV[2] = this.getLockName(threadId)
判断该锁是否已经有对应hash表存在,
• 没有对应的hash表:则set该hash表中一个entry的key为锁名称,value为1,之后设置该hash表失效时间为leaseTime
• 存在对应的hash表:则将该lockName的value执行+1操作,也就是计算进入次数,再设置失效时间leaseTime
• 最后返回这把锁的ttl剩余时间
也和上述自定义锁没有区别
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
}
-- 不存在key
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- 计数器 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
-- 过期时间重设
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 删除并发布解锁消息
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
该Lua KEYS有2个 Arrays.asList(getName(), getChannelName())
name 锁名称
channelName,用于pubSub发布消息的channel名称
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)LockPubSub.UNLOCK_MESSAGE,channel发送消息的类别,此处解锁为0
internalLockLeaseTime,watchDog配置的超时时间,默认为30s
lockName 这里的lockName指的是uuid和threadId组合的唯一值
1.如果该锁不存在则返回nil;
2.如果该锁存在则将其线程的hash key计数器-1,
3.计数器counter>0,重置下失效时间,返回0;否则,删除该锁,发布解锁消息unlockMessage,返回1;
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl != null) {
// 订阅
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
if (interruptibly) {
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
this.commandExecutor.syncSubscription(future);
}
// 省略
1.解锁消息
protected void onMessage(RedissonLockEntry value, Long message) {
Runnable runnableToExecute;
if (message.equals(unlockMessage)) {
// 从监听器队列取监听线程执行监听回调
runnableToExecute = (Runnable)value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// getLatch()返回的是Semaphore,信号量,此处是释放信号量
// 释放信号量后会唤醒等待的entry.getLatch().tryAcquire去再次尝试申请锁
value.getLatch().release();
} else if (message.equals(readUnlockMessage)) {
while(true) {
runnableToExecute = (Runnable)value.getListeners().poll();
if (runnableToExecute == null) {
value.getLatch().release(value.getLatch().getQueueLength());
break;
}
runnableToExecute.run();
}
}
}
这时再回来看tryAcquireOnceAsync另一分支
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
})
在 Java 的 Future 中,业务逻辑为一个 Callable 或 Runnable 实现类,该类的 call()或 run()执行完毕意味着业务逻辑的完结,在 Promise 机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败,这样更加方便的监控自己的业务逻辑。
2.锁续约
private void scheduleExpirationRenewal(long threadId) {
RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
this.renewExpiration();
}
}
private void renewExpiration() {
RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
• 添加一个netty的Timeout回调任务,每(internalLockLeaseTime / 3)毫秒执行一次,执行的方法是renewExpirationAsync
• renewExpirationAsync重置了锁超时时间,又注册一个监听器,监听回调又执行了renewExpiration
renewExpirationAsync 的Lua如下
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;3.流程概括
A、B线程争抢一把锁,A获取到后,B阻塞 B线程阻塞时并非主动CAS,而是PubSub方式订阅该锁的广播消息 A操作完成释放了锁,B线程收到订阅消息通知 B被唤醒开始继续抢锁,拿到锁
五、公平锁
以上介绍的可重入锁是非公平锁,Redisson还基于Redis的队列(List)和ZSet实现了公平锁
公平就是按照客户端的请求先来后到排队来获取锁,先到先得,也就是FIFO,所以队列和容器顺序编排必不可少
回顾JUC的ReentrantLock公平锁的实现
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
AQS已经提供了整个实现,是否公平取决于实现类取出节点逻辑是否顺序取
AQS维护一个同步队列,获取状态失败的线程都会加入到队列中进行自旋,移出队列或停止自旋的条件是前驱节点为头节点切成功获取了同步状态。
NonfairSync可以发现,控制公平和非公平的关键代码,在于hasQueuedPredecessors方法。static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
因为当一个线程请求锁时,只要获取来同步状态即成功获取。在此前提下,刚释放的线程再次获取同步状态的几率会非常大,使得其他线程只能在同步队列中等待。但这样带来的好处是,非公平锁大大减少了系统线程上下文的切换开销。
RLock fairLock = redissonClient.getFairLock(lockName); fairLock.lock();
这里有2段冗长的Lua,但是Debug发现,公平锁的入口在 command == RedisCommands.EVAL_LONG 之后,此段Lua较长,参数也多,我们着重分析Lua的实现规则
-- lua中的几个参数
KEYS = Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName)
KEYS[1]: lock_name, 锁名称
KEYS[2]: "redisson_lock_queue:{xxx}" 线程队列
KEYS[3]: "redisson_lock_timeout:{xxx}" 线程id对应的超时集合
ARGV = internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime
ARGV[1]: "{leaseTime}" 过期时间
ARGV[2]: "{Redisson.UUID}:{threadId}"
ARGV[3] = 当前时间 + 线程等待时间:(10:00:00) + 5000毫秒 = 10:00:05
ARGV[4] = 当前时间(10:00:00) 部署服务器时间,非redis-server服务器时间
-- 1.死循环清除过期key
while true do
-- 获取头节点
local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
-- 首次获取必空跳出循环
if firstThreadId2 == false then
break;
end;
-- 清除过期key
local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
if timeout <= tonumber(ARGV[4]) then
redis.call('zrem', KEYS[3], firstThreadId2);
redis.call('lpop', KEYS[2]);
else
break;
end;
end;
-- 2.不存在该锁 && (不存在线程等待队列 || 存在线程等待队列而且第一个节点就是此线程ID),加锁部分主要逻辑
if (redis.call('exists', KEYS[1]) == 0) and
((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then
-- 弹出队列中线程id元素,删除Zset中该线程id对应的元素
redis.call('lpop', KEYS[2]);
redis.call('zrem', KEYS[3], ARGV[2]);
local keys = redis.call('zrange', KEYS[3], 0, -1);
-- 遍历zSet所有key,将key的超时时间(score) - 当前时间ms
for i = 1, #keys, 1 do
redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);
end;
-- 加锁设置锁过期时间
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 3.线程存在,重入判断
if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then
redis.call('hincrby', KEYS[1], ARGV[2],1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 4.返回当前线程剩余存活时间
local timeout = redis.call('zscore', KEYS[3], ARGV[2]);
if timeout ~= false then
-- 过期时间timeout的值在下方设置,此处的减法算出的依旧是当前线程的ttl
return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);
end;
-- 5.尾节点剩余存活时间
local lastThreadId = redis.call('lindex', KEYS[2], -1);
local ttl;
-- 尾节点不空 && 尾节点非当前线程
if lastThreadId ~= false and lastThreadId ~= ARGV[2] then
-- 计算队尾节点剩余存活时间
ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);
else
-- 获取lock_name剩余存活时间
ttl = redis.call('pttl', KEYS[1]);
end;
-- 6.末尾排队
-- zSet 超时时间(score),尾节点ttl + 当前时间 + 5000ms + 当前时间,无则新增,有则更新
-- 线程id放入队列尾部排队,无则插入,有则不再插入
local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);
if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then
redis.call('rpush', KEYS[2], ARGV[2]);
end;
return ttl;
1.公平锁加锁步骤
1. 队列清理
保证队列中只有未过期的等待线程
2. 首次加锁
hset加锁,pexpire过期时间
3. 重入判断
此处同可重入锁lua
4. 返回ttl
5. 计算尾节点ttl
5. 初始值为锁的剩余过期时间
6. 末尾排队
ttl + 2 * currentTime + waitTime是score的默认值计算公式
2.模拟
如果模拟以下顺序,就会明了redisson公平锁整个加锁流程
假设 t1 10:00:00 < t2 10:00:10 < t3 10:00:20
t1:当线程1初次获取锁
1.等待队列无头节点,跳出死循环->2
2.不存在该锁 && 不存在线程等待队列 成立
2.1 lpop和zerm、zincrby都是无效操作,只有加锁生效,说明是首次加锁,加锁后返回nil
加锁成功,线程1获取到锁,结束
t2:线程2尝试获取锁(线程1未释放锁)
1.等待队列无头节点,跳出死循环->2
2.不存在该锁 不成立->3
3.非重入线程 ->4
4.score无值 ->5
5.尾节点为空,设置ttl初始值为lock_name的ttl -> 6
6.按照ttl + waitTime + currentTime + currentTime 来设置zSet超时时间score,并且加入等待队列,线程2为头节点
score = 20S + 5000ms + 10:00:10 + 10:00:10 = 10:00:35 + 10:00:10
t3:线程3尝试获取锁(线程1未释放锁)
1.等待队列有头节点
1.1未过期->2
2.不存在该锁不成立->3
3.非重入线程->4
4.score无值 ->5
5.尾节点不为空 && 尾节点线程为2,非当前线程
5.1取出之前设置的score,减去当前时间:ttl = score - currentTime ->6
6.按照ttl + waitTime + currentTime + currentTime 来设置zSet超时时间score,并且加入等待队列
score = 10S + 5000ms + 10:00:20 + 10:00:20 = 10:00:35 + 10:00:20
如此一来,三个需要抢夺一把锁的线程,完成了一次排队,在list中排列他们等待线程id,在zSet中存放过期时间(便于排列优先级)。其中返回ttl的线程2客户端、线程3客户端将会一直按一定间隔自旋重复执行该段Lua,尝试加锁,如此一来便和AQS有了异曲同工之处。
而当线程1释放锁之后(这里依旧有通过Pub/Sub发布解锁消息,通知其他线程获取)
10:00:30 线程2尝试获取锁(线程1已释放锁)
1.等待队列有头节点,未过期->2 2.不存在该锁 & 等待队列头节点是当前线程 成立 2.1删除当前线程的队列信息和zSet信息,超时时间为: 线程2 10:00:35 + 10:00:10 - 10:00:30 = 10:00:15 线程3 10:00:35 + 10:00:20 - 10:00:30 = 10:00:25 2.2线程2获取到锁,重新设置过期时间 加锁成功,线程2获取到锁,结束
六、总结
所以,当你真的需要分布式锁时,不妨先来Redisson里找找。
来源:https://juejin.cn/post/6961380552519712798
PS:防止找不到本篇文章,可以收藏点赞,方便翻阅查找哦。
从阿里跳槽来的工程师,写个Controller都这么优雅!
公司40k招的架构师写的API网关选型总结,就是牛逼!
Spring Boot + Netty + WebSocket 实现消息推送
为什么建议大家使用 Linux 开发?