对不起,网上找的Redis分布式锁都有漏洞!
基于 Redis 的分布式锁对大家来说并不陌生,可是你的分布式锁有失败的时候吗?在失败的时候可曾怀疑过你在用的分布式锁真的靠谱吗?以下是结合自己的踩坑经验总结的一些经验之谈。
图片来自 Pexels
你真的需要分布式锁吗?
用到分布式锁说明遇到了多个进程共同访问同一个资源的问题。
一般是在两个场景下会防止对同一个资源的重复访问:
提高效率。比如多个节点计算同一批任务,如果某个任务已经有节点在计算了,那其他节点就不用重复计算了,以免浪费计算资源。不过重复计算也没事,不会造成其他更大的损失。也就是允许偶尔的失败。
保证正确性。这种情况对锁的要求就很高了,如果重复计算,会对正确性造成影响。这种不允许失败。
从一个简单的分布式锁实现说起
最简单的实现
加锁和解锁的锁必须是同一个,常见的解决方案是给每个锁一个钥匙(唯一 ID),加锁时生成,解锁时判断。
不能让一个资源永久加锁。常见的解决方案是给一个锁的过期时间。当然了还有其他方案,后面再说。
public static boolean tryLock(String key, String uniqueId, int seconds) {
return "OK".equals(jedis.set(key, uniqueId, "NX", "EX", seconds));
}
这里调用了 SET key value PX milliseoncds NX,不明白这个命令的可以参考 SET key value [EX seconds|PX milliseconds] [NX|XX] [KEEPTTL]:
https://redis.io/commands/set
解锁:
public static boolean releaseLock(String key, String uniqueId) {
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('del', KEYS[1]) else return 0 end";
return jedis.eval(
luaScript,
Collections.singletonList(key),
Collections.singletonList(uniqueId)
).equals(1L);
}
靠谱吗?
单点问题。上面的实现只要一个 Master 节点就能搞定,这里的单点指的是单 Master,就算是个集群,如果加锁成功后,锁从 Master 复制到 Slave 的时候挂了,也是会出现同一资源被多个 Client 加锁的。
执行时间超过了锁的过期时间。上面写到为了不出现一直上锁的情况,加了一个兜底的过期时间,时间到了锁自动释放,但是,如果在这期间任务并没有做完怎么办?由于 GC 或者网络延迟导致的任务时间变长,很难保证任务一定能在锁的过期时间内完成。
Redlock 算法
①获取当前时间。
②依次获取 N 个节点的锁。每个节点加锁的实现方式同上。这里有个细节,就是每次获取锁的时候的过期时间都不同,需要减去之前获取锁的操作的耗时,
比如传入的锁的过期时间为 500ms,获取第一个节点的锁花了 1ms,那么第一个节点的锁的过期时间就是 499ms;获取第二个节点的锁花了 2ms,那么第二个节点的锁的过期时间就是 497ms。
如果锁的过期时间小于等于 0 了,说明整个获取锁的操作超时了,整个操作失败。
③判断是否获取锁成功。如果 Client 在上述步骤中获取到了(N/2+1)个节点锁,并且每个锁的过期时间都是大于 0 的,则获取锁成功,否则失败。失败时释放锁。
④释放锁。对所有节点发送释放锁的指令,每个节点的实现逻辑和上面的简单实现一样。
为什么要对所有节点操作?因为分布式场景下从一个节点获取锁失败不代表在那个节点上加速失败,可能实际上加锁已经成功了,但是返回时因为网络抖动超时了。
分布式锁的坑
高并发场景下的问题
①获取锁的时间上。如果 Redlock 运用在高并发的场景下,存在 N 个 Master 节点,一个一个去请求,耗时会比较长,从而影响性能。
这个好解决,通过上面描述不难发现,从多个节点获取锁的操作并不是一个同步操作,可以是异步操作,这样可以多个节点同时获取。
即使是并行处理的,还是得预估好获取锁的时间,保证锁的 TTL>获取锁的时间+任务处理时间。
②被加锁的资源太大。加锁的方案本身就是会为了正确性而牺牲并发的,牺牲和资源大小成正比,这个时候可以考虑对资源做拆分。
拆分的方式有如下两种:
①从业务上将锁住的资源拆分成多段,每段分开加锁。比如,我要对一个商户做若干个操作,操作前要锁住这个商户,这时我可以将若干个操作拆成多个独立的步骤分开加锁,提高并发。
②用分桶的思想,将一个资源拆分成多个桶,一个加锁失败立即尝试下一个。比如批量任务处理的场景,要处理 200w 个商户的任务,为了提高处理速度,用多个线程,每个线程取 100 个商户处理,就得给这 100 个商户加锁。
如果不加处理,很难保证同一时刻两个线程加锁的商户没有重叠,这时可以按一个维度。
比如某个标签,对商户进行分桶,然后一个任务处理一个分桶,处理完这个分桶再处理下一个分桶,减少竞争。
节点宕机
假设有 5 个 Redis 的节点:A、B、C、D、E,没有做持久化。
Client1 从 A、B、C 这3 个节点获取锁成功,那么 client1 获取锁成功。
节点 C 挂了。
Client2 从 C、D、E 获取锁成功,client2 也获取锁成功,那么在同一时刻 Client1 和 Client2 同时获取锁,Redlock 被玩坏了。
任务执行时间超过锁的 TTL
虽然大多数程序员都会乐观的认为这种情况不可能发生,我也曾经这么认为,直到被现实一次又一次的打脸。
Client1 获取到锁。
Client1 开始任务,然后发生了 STW 的 GC,时间超过了锁的过期时间。
Client2 获取到锁,开始了任务。
Client1 的 GC 结束,继续任务,这个时候 Client1 和 Client2 都认为自己获取了锁,都会处理任务,从而发生错误。
这样说有些抽象,下面结合 Redisson 源码说下:
public class RedissonLock extends RedissonExpirable implements RLock {
...
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
...
}
接下来分析下不传参的方法的加锁逻辑:
public class RedissonLock extends RedissonExpirable implements RLock {
...
public static final long LOCK_EXPIRATION_INTERVAL_SECONDS = 30;
protected long internalLockLeaseTime = TimeUnit.SECONDS.toMillis(LOCK_EXPIRATION_INTERVAL_SECONDS);
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
...
}
他的方案是让加锁的资源自己维护一套保证不会因加锁失败而导致多个 Client 在同一时刻访问同一个资源的情况。
①无法保证事务。示意图中画的只有 34 访问了 Storage,但是在实际场景中,可能出现在一个任务内多次访问 Storage 的情况,而且必须是原子的。
如果 Client1 带着 33 的 Token 在 GC 前访问过一次 Storage,然后发生了 GC。
Client2 获取到锁,带着 34 的 Token 也访问了 Storage,这时两个 Client 写入的数据是否还能保证数据正确?
如果不能,那么这种方案就有缺陷,除非 Storage 自己有其他机制可以保证,比如事务机制;如果能,那么这里的 Token 就是多余的,fencing 的方案就是多此一举。
②高并发场景不实用。因为每次只有最大的 Token 能写,这样 Storage 的访问就是线性的,在高并发场景下,这种方式会极大的限制吞吐量,而分布式锁也大多是在这种场景下用的,很矛盾的设计。
③这是所有分布式锁的问题。这个方案是一个通用的方案,可以和 Redlock 用,也可以和其他的 lock 用。所以我理解仅仅是一个和 Redlock 无关的解决方案。
系统时钟漂移
系统的时钟和 NTP 服务器不同步。这个目前没有特别好的解决方案,只能相信运维同学了。
clock realtime 被人为修改。在实现分布式锁时,不要使用 clock realtime。
不过很可惜,Redis 使用的就是这个时间,我看了下 Redis 5.0 源码,使用的还是 clock realtime。
Antirez 说过改成 clock monotonic 的,不过大佬还没有改。也就是说,人为修改 Redis 服务器的时间,就能让 Redis 出问题了。
总结
作者:陈寒立
简介:一个不误正业的程序员。先后在物流金融组、物流末端业务组和压力平衡组打过杂,技术栈从 Python 玩到了 Java,依然没学会好好写业务代码,梦想着用抽象的模型拯救业务于水火之中。
编辑:陶家龙
出处:饿了么物流技术团队
精彩文章推荐: