Preface
This article takes a look at Guava's RateLimiter.
RateLimiter
guava-26.0-jre-sources.jar!/com/google/common/util/concurrent/RateLimiter.java
    @Beta
    @GwtIncompatible
    public abstract class RateLimiter {
      //......

      /**
       * Acquires the given number of permits from this {@code RateLimiter}, blocking until the request
       * can be granted. Tells the amount of time slept, if any.
       *
       * @param permits the number of permits to acquire
       * @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited
       * @throws IllegalArgumentException if the requested number of permits is negative or zero
       * @since 16.0 (present in 13.0 with {@code void} return type})
       */
      @CanIgnoreReturnValue
      public double acquire(int permits) {
        long microsToWait = reserve(permits);
        stopwatch.sleepMicrosUninterruptibly(microsToWait);
        return 1.0 * microsToWait / SECONDS.toMicros(1L);
      }

      /**
       * Reserves the given number of permits from this {@code RateLimiter} for future use, returning
       * the number of microseconds until the reservation can be consumed.
       *
       * @return time in microseconds to wait until the resource can be acquired, never negative
       */
      final long reserve(int permits) {
        checkPermits(permits);
        synchronized (mutex()) {
          return reserveAndGetWaitLength(permits, stopwatch.readMicros());
        }
      }

      private static void checkPermits(int permits) {
        checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
      }

      /**
       * Reserves next ticket and returns the wait time that the caller must wait for.
       *
       * @return the required wait time, never negative
       */
      final long reserveAndGetWaitLength(int permits, long nowMicros) {
        long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
        return max(momentAvailable - nowMicros, 0);
      }

      public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
        long timeoutMicros = max(unit.toMicros(timeout), 0);
        checkPermits(permits);
        long microsToWait;
        synchronized (mutex()) {
          long nowMicros = stopwatch.readMicros();
          if (!canAcquire(nowMicros, timeoutMicros)) {
            return false;
          } else {
            microsToWait = reserveAndGetWaitLength(permits, nowMicros);
          }
        }
        stopwatch.sleepMicrosUninterruptibly(microsToWait);
        return true;
      }

      private boolean canAcquire(long nowMicros, long timeoutMicros) {
        return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
      }

      /**
       * Returns the earliest time that permits are available (with one caveat).
       *
       * @return the time that permits are available, or, if permits are available immediately, an
       *     arbitrary past or present time
       */
      abstract long queryEarliestAvailable(long nowMicros);

      /**
       * Reserves the requested number of permits and returns the time that those permits can be used
       * (with one caveat).
       *
       * @return the time that the permits may be used, or, if the permits may be used immediately, an
       *     arbitrary past or present time
       */
      abstract long reserveEarliestAvailable(int permits, long nowMicros);

      //......
    }
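- The methods to focus on here are acquire and tryAcquire.
- acquire relies on reserve, which calls reserveAndGetWaitLength, which in turn calls reserveEarliestAvailable.
- tryAcquire first uses canAcquire to check whether the permits can be granted within the timeout; if so, it also calls reserveAndGetWaitLength and therefore also ends up in reserveEarliestAvailable.
- reserveEarliestAvailable is an abstract method implemented by subclasses.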
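The timing arithmetic behind these two methods can be isolated in a few lines. The following is a minimal sketch, not Guava code: the class name ReserveSketch and the sample timestamps are made up for illustration, and only the two expressions are taken from the source above.

```java
// Minimal sketch of the timing arithmetic in acquire/tryAcquire.
// ReserveSketch is a hypothetical name; it is not part of Guava.
final class ReserveSketch {
    // Mirrors reserveAndGetWaitLength: how long the caller must sleep, never negative.
    static long waitLength(long momentAvailableMicros, long nowMicros) {
        return Math.max(momentAvailableMicros - nowMicros, 0);
    }

    // Mirrors canAcquire: true if the permits become available within the timeout.
    static boolean canAcquire(long earliestAvailableMicros, long nowMicros, long timeoutMicros) {
        return earliestAvailableMicros - timeoutMicros <= nowMicros;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;       // assumed stopwatch reading, in microseconds
        long earliest = 1_300_000L;  // assumed moment the next permit is free
        System.out.println(waitLength(earliest, now));            // 300000
        System.out.println(canAcquire(earliest, now, 200_000L));  // false
        System.out.println(canAcquire(earliest, now, 300_000L));  // true
    }
}
```

With a 200 ms timeout the permit arrives too late, so tryAcquire would return false without reserving anything; with 300 ms it would reserve and then sleep.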
SmoothRateLimiter
guava-26.0-jre-sources.jar!/com/google/common/util/concurrent/SmoothRateLimiter.java
    @GwtIncompatible
    abstract class SmoothRateLimiter extends RateLimiter {
      //......

      @Override
      final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
        resync(nowMicros);
        long returnValue = nextFreeTicketMicros;
        double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
        double freshPermits = requiredPermits - storedPermitsToSpend;
        long waitMicros =
            storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
                + (long) (freshPermits * stableIntervalMicros);

        this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
        this.storedPermits -= storedPermitsToSpend;
        return returnValue;
      }

      /** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
      void resync(long nowMicros) {
        // if nextFreeTicket is in the past, resync to now
        if (nowMicros > nextFreeTicketMicros) {
          double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
          storedPermits = min(maxPermits, storedPermits + newPermits);
          nextFreeTicketMicros = nowMicros;
        }
      }

      /**
       * Translates a specified portion of our currently stored permits which we want to spend/acquire,
       * into a throttling time. Conceptually, this evaluates the integral of the underlying function we
       * use, for the range of [(storedPermits - permitsToTake), storedPermits].
       *
       * This always holds: {@code 0 <= permitsToTake <= storedPermits}
       */
      abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);

      /**
       * Returns the number of microseconds during cool down that we have to wait to get a new permit.
       */
      abstract double coolDownIntervalMicros();

      //......
    }
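- SmoothRateLimiter is an abstract subclass of RateLimiter and the abstract parent of the smooth rate-limiting implementations.
- reserveEarliestAvailable first calls resync (which adds tokens according to the rate), then computes how many stored permits to deduct and how long the wait is.
- It calls two abstract methods, coolDownIntervalMicros and storedPermitsToWaitTime.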
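One detail that is easy to miss in reserveEarliestAvailable is that it returns the old nextFreeTicketMicros, so the cost of a request is charged to the next caller. The sketch below replays that bookkeeping with explicit timestamps. It is an illustration under assumptions, not Guava's implementation: SmoothSketch is a hypothetical class, the two abstract hooks are filled in with bursty-style behaviour, and plain addition replaces LongMath.saturatedAdd.

```java
// Hypothetical stand-in for SmoothRateLimiter's bookkeeping; not Guava code.
final class SmoothSketch {
    final double stableIntervalMicros;
    final double maxPermits;
    double storedPermits = 0.0;
    long nextFreeTicketMicros = 0L;

    SmoothSketch(double permitsPerSecond, double maxPermits) {
        this.stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
        this.maxPermits = maxPermits;
    }

    // Assumption: permits refill at the stable rate while the limiter is idle.
    double coolDownIntervalMicros() { return stableIntervalMicros; }

    // Assumption: spending stored permits costs no wait time.
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { return 0L; }

    // Adds the permits accrued since nextFreeTicketMicros, capped at maxPermits.
    void resync(long nowMicros) {
        if (nowMicros > nextFreeTicketMicros) {
            double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
            storedPermits = Math.min(maxPermits, storedPermits + newPermits);
            nextFreeTicketMicros = nowMicros;
        }
    }

    // Returns the moment this request may proceed and pushes the cost onto the next request.
    long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
        resync(nowMicros);
        long returnValue = nextFreeTicketMicros;
        double storedPermitsToSpend = Math.min(requiredPermits, storedPermits);
        double freshPermits = requiredPermits - storedPermitsToSpend;
        long waitMicros = storedPermitsToWaitTime(storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);
        nextFreeTicketMicros += waitMicros; // Guava uses LongMath.saturatedAdd here
        storedPermits -= storedPermitsToSpend;
        return returnValue;
    }

    public static void main(String[] args) {
        SmoothSketch s = new SmoothSketch(2.0, 2.0); // 2 permits/s, at most 2 stored
        System.out.println(s.reserveEarliestAvailable(1, 0L));          // 0
        System.out.println(s.reserveEarliestAvailable(1, 0L));          // 500000
        System.out.println(s.reserveEarliestAvailable(3, 3_000_000L));  // 3000000
        System.out.println(s.reserveEarliestAvailable(1, 3_000_000L));  // 3500000
    }
}
```

The third call asks for 3 permits while only 2 are stored, yet it proceeds immediately; the one fresh permit it borrowed delays the fourth call by 500 ms.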
The two subclasses of SmoothRateLimiter
SmoothRateLimiter has two static nested subclasses, SmoothBursty and SmoothWarmingUp.
SmoothBursty
    /**
     * This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling.
     * The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in
     * terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10
     * seconds, we can save up to 2 * 10 = 20 permits.
     */
    static final class SmoothBursty extends SmoothRateLimiter {
      /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
      final double maxBurstSeconds;

      SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
        super(stopwatch);
        this.maxBurstSeconds = maxBurstSeconds;
      }

      @Override
      void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
        double oldMaxPermits = this.maxPermits;
        maxPermits = maxBurstSeconds * permitsPerSecond;
        if (oldMaxPermits == Double.POSITIVE_INFINITY) {
          // if we don't special-case this, we would get storedPermits == NaN, below
          storedPermits = maxPermits;
        } else {
          storedPermits =
              (oldMaxPermits == 0.0)
                  ? 0.0 // initial state
                  : storedPermits * maxPermits / oldMaxPermits;
        }
      }

      @Override
      long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
        return 0L;
      }

      @Override
      double coolDownIntervalMicros() {
        return stableIntervalMicros;
      }
    }
- SmoothBursty is a "bursty" RateLimiter in which stored permits incur zero throttling.
- coolDownIntervalMicros returns stableIntervalMicros, and storedPermitsToWaitTime always returns 0.
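To make the doSetRate logic concrete, here is a small sketch of how maxPermits is derived and how storedPermits is rescaled when the rate changes. BurstySetRateSketch is a hypothetical class that copies only the arithmetic from the source above; the 2 qps / 10 second figures come from the Javadoc, while the half-full bucket and the new rate of 4 are assumed values.

```java
// Hypothetical sketch of SmoothBursty.doSetRate's arithmetic; not Guava code.
final class BurstySetRateSketch {
    final double maxBurstSeconds;
    double maxPermits = 0.0;     // 0.0 marks the initial, unconfigured state
    double storedPermits = 0.0;

    BurstySetRateSketch(double maxBurstSeconds) {
        this.maxBurstSeconds = maxBurstSeconds;
    }

    void setRate(double permitsPerSecond) {
        double oldMaxPermits = maxPermits;
        maxPermits = maxBurstSeconds * permitsPerSecond;
        if (oldMaxPermits == Double.POSITIVE_INFINITY) {
            storedPermits = maxPermits; // avoids NaN from infinity / infinity
        } else {
            storedPermits = (oldMaxPermits == 0.0)
                ? 0.0                                          // initial state: empty bucket
                : storedPermits * maxPermits / oldMaxPermits;  // keep the same fill ratio
        }
    }

    public static void main(String[] args) {
        BurstySetRateSketch b = new BurstySetRateSketch(10.0);
        b.setRate(2.0);
        System.out.println(b.maxPermits);     // 20.0
        System.out.println(b.storedPermits);  // 0.0

        b.storedPermits = 10.0;               // assume the bucket has become half full
        b.setRate(4.0);
        System.out.println(b.maxPermits);     // 40.0
        System.out.println(b.storedPermits);  // 20.0
    }
}
```

Changing the rate preserves the fraction of the bucket that is filled (half of 20 becomes half of 40) rather than the absolute permit count.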
SmoothWarmingUp
    static final class SmoothWarmingUp extends SmoothRateLimiter {
      private final long warmupPeriodMicros;
      /**
       * The slope of the line from the stable interval (when permits == 0), to the cold interval
       * (when permits == maxPermits)
       */
      private double slope;

      private double thresholdPermits;
      private double coldFactor;

      SmoothWarmingUp(
          SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
        super(stopwatch);
        this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
        this.coldFactor = coldFactor;
      }

      @Override
      void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
        double oldMaxPermits = maxPermits;
        double coldIntervalMicros = stableIntervalMicros * coldFactor;
        thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
        maxPermits =
            thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
        slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
        if (oldMaxPermits == Double.POSITIVE_INFINITY) {
          // if we don't special-case this, we would get storedPermits == NaN, below
          storedPermits = 0.0;
        } else {
          storedPermits =
              (oldMaxPermits == 0.0)
                  ? maxPermits // initial state is cold
                  : storedPermits * maxPermits / oldMaxPermits;
        }
      }

      @Override
      long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
        double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
        long micros = 0;
        // measuring the integral on the right part of the function (the climbing line)
        if (availablePermitsAboveThreshold > 0.0) {
          double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
          // TODO(cpovirk): Figure out a good name for this variable.
          double length =
              permitsToTime(availablePermitsAboveThreshold)
                  + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
          micros = (long) (permitsAboveThresholdToTake * length / 2.0);
          permitsToTake -= permitsAboveThresholdToTake;
        }
        // measuring the integral on the left part of the function (the horizontal line)
        micros += (long) (stableIntervalMicros * permitsToTake);
        return micros;
      }

      private double permitsToTime(double permits) {
        return stableIntervalMicros + permits * slope;
      }

      @Override
      double coolDownIntervalMicros() {
        return warmupPeriodMicros / maxPermits;
      }
    }
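- coolDownIntervalMicros returns warmupPeriodMicros / maxPermits, while storedPermitsToWaitTime is more involved: it integrates the throttling curve over the permits being taken, a trapezoid above thresholdPermits plus a rectangle below it.
- SmoothBursty is based on the token bucket algorithm and allows a certain amount of bursty traffic; some scenarios need that burst to be smoothed out, which is what SmoothWarmingUp is for.
- SmoothWarmingUp has a warmup period, which corresponds to the area of the trapezoid between thresholdPermits and maxPermits in the diagram from the source comments: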
             ^ throttling
             |
       cold  +                  /
    interval |                 /.
             |                / .
             |               /  .   ← "warmup period" is the area of the trapezoid between
             |              /   .     thresholdPermits and maxPermits
             |             /    .
             |            /     .
             |           /      .
      stable +----------/  WARM .
    interval |          .   UP  .
             |          . PERIOD.
             |          .       .
           0 +----------+-------+--------------→ storedPermits
             0 thresholdPermits maxPermits
The main formulas involved are:

    coldInterval = coldFactor * stableInterval
    thresholdPermits = 0.5 * warmupPeriod / stableInterval
    maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval)
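The maxPermits formula can be read off the diagram: the warmup period is the area of a trapezoid whose parallel sides are stableInterval and coldInterval and whose width is maxPermits - thresholdPermits. A short derivation, using the same symbols as the formulas above:

```latex
\begin{aligned}
\text{warmupPeriod}
  &= \tfrac{1}{2}\,(\text{stableInterval} + \text{coldInterval})\,
     (\text{maxPermits} - \text{thresholdPermits}) \\
\Rightarrow\quad \text{maxPermits}
  &= \text{thresholdPermits}
     + \frac{2 \cdot \text{warmupPeriod}}{\text{stableInterval} + \text{coldInterval}}
\end{aligned}
```

Likewise, thresholdPermits = 0.5 * warmupPeriod / stableInterval says that the rectangle to the left of thresholdPermits has area thresholdPermits * stableInterval = warmupPeriod / 2.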
- coldFactor defaults to 3.
- In the code, stableInterval is measured in microseconds (not milliseconds): stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond.
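A worked example helps check these formulas against storedPermitsToWaitTime. The sketch below plugs in assumed parameters (5 permits per second, a 2 second warmup, coldFactor 3) and evaluates the wait time for a few requests. WarmupSketch is a hypothetical class that copies only the arithmetic from SmoothWarmingUp; it is not the Guava class itself.

```java
// Hypothetical re-creation of SmoothWarmingUp's arithmetic for a worked example; not Guava code.
final class WarmupSketch {
    final double stableIntervalMicros;
    final double thresholdPermits;
    final double maxPermits;
    final double slope;

    WarmupSketch(double permitsPerSecond, long warmupPeriodMicros, double coldFactor) {
        stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
        double coldIntervalMicros = stableIntervalMicros * coldFactor;
        thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
        maxPermits = thresholdPermits
            + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
        slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
    }

    // Throttling interval when `permits` permits are stored above the threshold.
    double permitsToTime(double permits) {
        return stableIntervalMicros + permits * slope;
    }

    // Area under the throttling curve for the permits being taken.
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
        double above = storedPermits - thresholdPermits;
        long micros = 0;
        if (above > 0.0) { // trapezoid part (the climbing line)
            double take = Math.min(above, permitsToTake);
            double length = permitsToTime(above) + permitsToTime(above - take);
            micros = (long) (take * length / 2.0);
            permitsToTake -= take;
        }
        micros += (long) (stableIntervalMicros * permitsToTake); // rectangle part
        return micros;
    }

    public static void main(String[] args) {
        WarmupSketch w = new WarmupSketch(5.0, 2_000_000L, 3.0);
        System.out.println(w.thresholdPermits);                    // 5.0
        System.out.println(w.maxPermits);                          // 10.0
        System.out.println(w.slope);                               // 80000.0
        System.out.println(w.storedPermitsToWaitTime(10.0, 1.0));  // 560000
        System.out.println(w.storedPermitsToWaitTime(10.0, 5.0));  // 2000000
        System.out.println(w.storedPermitsToWaitTime(10.0, 10.0)); // 3000000
    }
}
```

Draining the 5 permits above the threshold from a cold limiter costs exactly the 2 second warmup period, and the remaining 5 cost another second at the stable interval of 200 ms each.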
Summary
- Guava's RateLimiter (SmoothRateLimiter) is based on the token bucket algorithm and has two concrete implementations, SmoothBursty and SmoothWarmingUp.
- SmoothBursty starts with storedPermits at 0 and supports bursts of up to maxPermits.
- SmoothWarmingUp starts with storedPermits at maxPermits (thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros)); it also supports bursts, but its output is smoother overall.