漏桶(Leaky Bucket)
令牌桶(Token Bucket)
Guava RateLimiter
SmoothBursty:
兼容突发流量的令牌桶实现,也就是上一节描述的经典令牌桶算法。
SmoothWarmingUp:
带预热过程的令牌桶实现,即在桶较满时产生令牌的速度较慢,随着令牌的消耗慢慢增长到恒定速率。
如下图所示,x轴为令牌数,y轴为延迟,从右向左看的梯形区域就是令牌消耗的预热过程。
storedPermits:
当前桶里有的令牌数量。
maxPermits:
桶装满时的令牌数量,storedPermits不会比它大。
stableIntervalMicros:
产生令牌的频率(时间间隔),单位为微秒。
举个栗子,如果我们想限制系统的QPS为10,亦即每秒有10个令牌放入桶中,那么stableIntervalMicros的值就是100000。
nextFreeTicketMicros:
下一个令牌可用的时间戳,也就是下一个请求能够被处理的时间戳,单位为微秒。
该值会随着当前请求获得令牌而增大(因为时间是自然流动的)。
若当前请求的令牌数超出可用令牌数,这个时间就被推后(需要时间产生新的令牌)。
当然,如果有一段时间没有请求进入的话,它就会保持在上次请求的过去时间戳。
void resync(long nowMicros) {
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
@Override
double coolDownIntervalMicros() {
return stableIntervalMicros;
}
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
return 0L;
}
调用上述resync()方法产生令牌。
计算实际能提供的令牌数storedPermitsToSpend,它其实就是本次请求需要的令牌数requiredPermits和桶中有的令牌数storedPermits的较小值。
计算需要新产生的令牌数freshPermits。
当上一步桶中有的令牌不够用时,该值就大于0。
根据freshPermits计算新产生这批令牌需要多长时间,记为waitMicros。
由于SmoothBursty始终以恒定速率产生令牌,只需要将它与令牌产生的速率简单相乘就行。
SmoothWarmingUp需要考虑预热的延时,所以storedPermitsToWaitTime()方法实现要复杂得多。
更新nextFreeTicketMicros和storedPermits的值。
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 );
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
maxPermits = maxBurstSeconds * permitsPerSecond;
@CanIgnoreReturnValuepublic double acquire() {return acquire(1); }@CanIgnoreReturnValuepublic double acquire(int permits) {long microsToWait = reserve(permits); stopwatch.sleepMicrosUninterruptibly(microsToWait);return 1.0 * microsToWait / SECONDS.toMicros(1L); }final long reserve(int permits) { checkPermits(permits);synchronized (mutex()) {return reserveAndGetWaitLength(permits, stopwatch.readMicros()); }final long reserveAndGetWaitLength(int permits, long nowMicros) {long momentAvailable = reserveEarliestAvailable(permits, nowMicros);return max(momentAvailable - nowMicros, 0); }
获取令牌的过程是加锁的。
reserveAndGetWaitLength()返回的是获取令牌需要等待的时间,在acquire()方法中会借助Stopwatch阻塞(睡眠)直到获取成功。
Stopwatch是Guava自行实现的一个高精度计时器。
acquire()方法会将上述等待时间返回,但不需要用户再处理了,所以该返回值可以忽略(即@CanIgnoreReturnValue注解的含义)。
public class RateLimiterExample {
public static void main(String[] args) throws Exception {
RateLimiter rateLimiter = RateLimiter.create(10);
Random random = new Random();
for (int i = 0; i < 20; i++) {
int numPermits = random.nextInt(20);
System.out.println(numPermits + "\t" + rateLimiter.acquire(numPermits));
}
}
}
2 0.0
13 0.198792
4 1.294088
6 0.39622
18 0.59516
12 1.797798
14 1.198664
14 1.397321
13 1.39626
16 1.299534
3 1.595453
9 0.299325
4 0.898857
18 0.394973
2 1.795835
13 0.195926
11 1.294851
2 1.09551
3 0.194857
6 0.297884