A Look at Guava's RateLimiter
Published: 2019-06-23

This article walks through the implementation of Guava's RateLimiter.

RateLimiter

guava-26.0-jre-sources.jar!/com/google/common/util/concurrent/RateLimiter.java

    @Beta
    @GwtIncompatible
    public abstract class RateLimiter {
      //......

      /**
       * Acquires the given number of permits from this {@code RateLimiter}, blocking until the request
       * can be granted. Tells the amount of time slept, if any.
       *
       * @param permits the number of permits to acquire
       * @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited
       * @throws IllegalArgumentException if the requested number of permits is negative or zero
       * @since 16.0 (present in 13.0 with {@code void} return type})
       */
      @CanIgnoreReturnValue
      public double acquire(int permits) {
        long microsToWait = reserve(permits);
        stopwatch.sleepMicrosUninterruptibly(microsToWait);
        return 1.0 * microsToWait / SECONDS.toMicros(1L);
      }

      /**
       * Reserves the given number of permits from this {@code RateLimiter} for future use, returning
       * the number of microseconds until the reservation can be consumed.
       *
       * @return time in microseconds to wait until the resource can be acquired, never negative
       */
      final long reserve(int permits) {
        checkPermits(permits);
        synchronized (mutex()) {
          return reserveAndGetWaitLength(permits, stopwatch.readMicros());
        }
      }

      private static void checkPermits(int permits) {
        checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
      }

      /**
       * Reserves next ticket and returns the wait time that the caller must wait for.
       *
       * @return the required wait time, never negative
       */
      final long reserveAndGetWaitLength(int permits, long nowMicros) {
        long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
        return max(momentAvailable - nowMicros, 0);
      }

      public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
        long timeoutMicros = max(unit.toMicros(timeout), 0);
        checkPermits(permits);
        long microsToWait;
        synchronized (mutex()) {
          long nowMicros = stopwatch.readMicros();
          if (!canAcquire(nowMicros, timeoutMicros)) {
            return false;
          } else {
            microsToWait = reserveAndGetWaitLength(permits, nowMicros);
          }
        }
        stopwatch.sleepMicrosUninterruptibly(microsToWait);
        return true;
      }

      private boolean canAcquire(long nowMicros, long timeoutMicros) {
        return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
      }

      /**
       * Returns the earliest time that permits are available (with one caveat).
       *
       * @return the time that permits are available, or, if permits are available immediately, an
       *     arbitrary past or present time
       */
      abstract long queryEarliestAvailable(long nowMicros);

      /**
       * Reserves the requested number of permits and returns the time that those permits can be used
       * (with one caveat).
       *
       * @return the time that the permits may be used, or, if the permits may be used immediately, an
       *     arbitrary past or present time
       */
      abstract long reserveEarliestAvailable(int permits, long nowMicros);

      //......
    }
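  • The methods to focus on here are acquire and tryAcquire
  • acquire delegates to reserve, which calls reserveAndGetWaitLength, which in turn calls reserveEarliestAvailable; acquire then sleeps for the returned wait time
  • tryAcquire first checks canAcquire (backed by queryEarliestAvailable) and returns false if the permits would not be available within the timeout; otherwise it also calls reserveAndGetWaitLength and, ultimately, reserveEarliestAvailable
  • queryEarliestAvailable and reserveEarliestAvailable are abstract methods implemented by subclasses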
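
The call chain described above can be modelled in a few lines of plain Java. This is a minimal sketch, not Guava code: the class WaitLengthSketch, its fixed intervalMicros, and the tryReserve helper (which returns -1 where tryAcquire would return false) are hypothetical names introduced for illustration. Time is passed in explicitly instead of being read from a stopwatch, and there is no locking or sleeping.

```java
/** Simplified, single-threaded model of the wait-length arithmetic (illustrative only). */
final class WaitLengthSketch {
  // Hypothetical stand-in for subclass state: the earliest time the next permit is free.
  private long nextFreeMicros;
  // Assumed fixed cost of one permit, e.g. 500_000 micros for 2 permits per second.
  private final long intervalMicros;

  WaitLengthSketch(long intervalMicros) {
    this.intervalMicros = intervalMicros;
  }

  // Plays the role of queryEarliestAvailable: read-only, reserves nothing.
  long queryEarliestAvailable(long nowMicros) {
    return nextFreeMicros;
  }

  // Plays the role of reserveEarliestAvailable for a limiter without stored permits.
  long reserveEarliestAvailable(int permits, long nowMicros) {
    long momentAvailable = Math.max(nextFreeMicros, nowMicros);
    nextFreeMicros = momentAvailable + permits * intervalMicros;
    return momentAvailable;
  }

  // Same formula as RateLimiter.reserveAndGetWaitLength in the listing above.
  long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return Math.max(momentAvailable - nowMicros, 0);
  }

  // Models tryAcquire: same check as canAcquire, then reserve. Returns -1 on rejection.
  long tryReserve(int permits, long nowMicros, long timeoutMicros) {
    boolean canAcquire = queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
    if (!canAcquire) {
      return -1L; // nothing is reserved when the timeout is too short
    }
    return reserveAndGetWaitLength(permits, nowMicros);
  }
}
```

With an interval of 500,000 microseconds, two back-to-back calls to reserveAndGetWaitLength(1, 0) give waits of 0 and 500,000 microseconds. A following tryReserve with a 100,000 microsecond timeout is rejected without changing any state, which is the main behavioural difference between tryAcquire and acquire.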

SmoothRateLimiter

guava-26.0-jre-sources.jar!/com/google/common/util/concurrent/SmoothRateLimiter.java

    @GwtIncompatible
    abstract class SmoothRateLimiter extends RateLimiter {
      //......

      @Override
      final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
        resync(nowMicros);
        long returnValue = nextFreeTicketMicros;
        double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
        double freshPermits = requiredPermits - storedPermitsToSpend;
        long waitMicros =
            storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
                + (long) (freshPermits * stableIntervalMicros);

        this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
        this.storedPermits -= storedPermitsToSpend;
        return returnValue;
      }

      /** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
      void resync(long nowMicros) {
        // if nextFreeTicket is in the past, resync to now
        if (nowMicros > nextFreeTicketMicros) {
          double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
          storedPermits = min(maxPermits, storedPermits + newPermits);
          nextFreeTicketMicros = nowMicros;
        }
      }

      /**
       * Translates a specified portion of our currently stored permits which we want to spend/acquire,
       * into a throttling time. Conceptually, this evaluates the integral of the underlying function we
       * use, for the range of [(storedPermits - permitsToTake), storedPermits].
       *
       * <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
       */
      abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);

      /**
       * Returns the number of microseconds during cool down that we have to wait to get a new permit.
       */
      abstract double coolDownIntervalMicros();

      //......
    }

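  • SmoothRateLimiter is an abstract subclass of RateLimiter and the abstract parent of the smooth rate-limiting implementations
  • reserveEarliestAvailable first calls resync, which credits storedPermits according to the time elapsed since nextFreeTicketMicros, and then deducts the requested permits and computes the wait time. Note that it returns the old nextFreeTicketMicros, so the cost of the current request is paid by the next caller
  • Two abstract methods are involved here: coolDownIntervalMicros and storedPermitsToWaitTime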
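
To see how resync and the permit deduction interact, here is a minimal stand-alone sketch, not Guava code. The class SmoothReserveSketch is a hypothetical name, and two simplifying assumptions are hard-coded: stored permits cost no wait time and the cool-down interval equals the stable interval (the SmoothBursty flavour). Plain addition also replaces LongMath.saturatedAdd.

```java
/** Illustrative model of SmoothRateLimiter.resync and reserveEarliestAvailable. */
final class SmoothReserveSketch {
  double storedPermits;            // permits saved up while idle
  final double maxPermits;         // cap on storedPermits
  final double stableIntervalMicros;
  long nextFreeTicketMicros;       // time at which the next request can be granted

  SmoothReserveSketch(double permitsPerSecond, double maxBurstSeconds) {
    this.stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
    this.maxPermits = maxBurstSeconds * permitsPerSecond;
  }

  // Credit permits for the idle time since nextFreeTicketMicros, capped at maxPermits.
  void resync(long nowMicros) {
    if (nowMicros > nextFreeTicketMicros) {
      // Assumption: coolDownIntervalMicros() == stableIntervalMicros.
      double newPermits = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros;
      storedPermits = Math.min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
  }

  long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros; // the current caller waits only until here
    double storedPermitsToSpend = Math.min(requiredPermits, storedPermits);
    double freshPermits = requiredPermits - storedPermitsToSpend;
    // Assumption: storedPermitsToWaitTime(...) == 0, so only fresh permits cost time.
    long waitMicros = (long) (freshPermits * stableIntervalMicros);
    nextFreeTicketMicros += waitMicros;      // the cost is pushed onto the next caller
    storedPermits -= storedPermitsToSpend;
    return returnValue;
  }
}
```

For a limiter at 2 permits per second with a 1 second burst window, a request for 3 permits after 2 seconds of idleness is granted immediately (it returns 2,000,000), spending 2 stored permits and 1 fresh one. The next single-permit request at the same instant gets 2,500,000, so it absorbs the 500,000 microseconds owed by the previous request.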

The two subclasses of SmoothRateLimiter

SmoothRateLimiter has two static nested subclasses: SmoothBursty and SmoothWarmingUp

SmoothBursty

      /**
       * This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling.
       * The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in
       * terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10
       * seconds, we can save up to 2 * 10 = 20 permits.
       */
      static final class SmoothBursty extends SmoothRateLimiter {
        /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
        final double maxBurstSeconds;

        SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
          super(stopwatch);
          this.maxBurstSeconds = maxBurstSeconds;
        }

        @Override
        void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
          double oldMaxPermits = this.maxPermits;
          maxPermits = maxBurstSeconds * permitsPerSecond;
          if (oldMaxPermits == Double.POSITIVE_INFINITY) {
            // if we don't special-case this, we would get storedPermits == NaN, below
            storedPermits = maxPermits;
          } else {
            storedPermits =
                (oldMaxPermits == 0.0)
                    ? 0.0 // initial state
                    : storedPermits * maxPermits / oldMaxPermits;
          }
        }

        @Override
        long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
          return 0L;
        }

        @Override
        double coolDownIntervalMicros() {
          return stableIntervalMicros;
        }
      }
  • SmoothBursty is a "bursty" RateLimiter in which stored permits translate to zero throttling
  • coolDownIntervalMicros returns stableIntervalMicros, and storedPermitsToWaitTime always returns 0
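
The doSetRate logic is easy to overlook, so here is a small sketch of what it does to the bucket when the rate changes. This is illustrative plain Java, not Guava code: BurstyRateChange is a hypothetical class that copies only the maxPermits and storedPermits bookkeeping from the listing above.

```java
/** Illustrative copy of the SmoothBursty.doSetRate bookkeeping. */
final class BurstyRateChange {
  final double maxBurstSeconds;
  double maxPermits;     // 0.0 until the first rate is set
  double storedPermits;

  BurstyRateChange(double maxBurstSeconds) {
    this.maxBurstSeconds = maxBurstSeconds;
  }

  void doSetRate(double permitsPerSecond) {
    double oldMaxPermits = maxPermits;
    // The bucket size is expressed in time: N seconds' worth of permits.
    maxPermits = maxBurstSeconds * permitsPerSecond;
    if (oldMaxPermits == Double.POSITIVE_INFINITY) {
      storedPermits = maxPermits;
    } else {
      storedPermits =
          (oldMaxPermits == 0.0)
              ? 0.0 // initial state: the bucket starts empty
              : storedPermits * maxPermits / oldMaxPermits; // keep the same fill ratio
    }
  }
}
```

Using the numbers from the Javadoc (2 permits per second, 10 seconds), the first doSetRate(2.0) gives maxPermits = 20 with an empty bucket. If the bucket is later half full (storedPermits = 10) and the rate is doubled, maxPermits becomes 40 and storedPermits is rescaled to 20, so the fill ratio is preserved.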

SmoothWarmingUp

      static final class SmoothWarmingUp extends SmoothRateLimiter {
        private final long warmupPeriodMicros;
        /**
         * The slope of the line from the stable interval (when permits == 0), to the cold interval
         * (when permits == maxPermits)
         */
        private double slope;

        private double thresholdPermits;
        private double coldFactor;

        SmoothWarmingUp(
            SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
          super(stopwatch);
          this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
          this.coldFactor = coldFactor;
        }

        @Override
        void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
          double oldMaxPermits = maxPermits;
          double coldIntervalMicros = stableIntervalMicros * coldFactor;
          thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
          maxPermits =
              thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
          slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
          if (oldMaxPermits == Double.POSITIVE_INFINITY) {
            // if we don't special-case this, we would get storedPermits == NaN, below
            storedPermits = 0.0;
          } else {
            storedPermits =
                (oldMaxPermits == 0.0)
                    ? maxPermits // initial state is cold
                    : storedPermits * maxPermits / oldMaxPermits;
          }
        }

        @Override
        long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
          double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
          long micros = 0;
          // measuring the integral on the right part of the function (the climbing line)
          if (availablePermitsAboveThreshold > 0.0) {
            double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
            // TODO(cpovirk): Figure out a good name for this variable.
            double length =
                permitsToTime(availablePermitsAboveThreshold)
                    + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
            micros = (long) (permitsAboveThresholdToTake * length / 2.0);
            permitsToTake -= permitsAboveThresholdToTake;
          }
          // measuring the integral on the left part of the function (the horizontal line)
          micros += (long) (stableIntervalMicros * permitsToTake);
          return micros;
        }

        private double permitsToTime(double permits) {
          return stableIntervalMicros + permits * slope;
        }

        @Override
        double coolDownIntervalMicros() {
          return warmupPeriodMicros / maxPermits;
        }
      }
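  • coolDownIntervalMicros returns warmupPeriodMicros / maxPermits, while storedPermitsToWaitTime is more involved: it integrates the throttling curve over the permits being taken
  • SmoothBursty is based on the token bucket algorithm and allows a certain amount of bursty traffic; some scenarios need bursts to be smoothed out, and that is what SmoothWarmingUp is for
  • SmoothWarmingUp has a warmup period, which corresponds to the area of the trapezoid between thresholdPermits and maxPermits in the following diagram from the Guava source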
       *          ^ throttling
       *          |
       *    cold  +                  /
       * interval |                 /.
       *          |                / .
       *          |               /  .   ← "warmup period" is the area of the trapezoid between
       *          |              /   .     thresholdPermits and maxPermits
       *          |             /    .
       *          |            /     .
       *          |           /      .
       *   stable +----------/  WARM .
       * interval |          .   UP  .
       *          |          . PERIOD.
       *          |          .       .
       *        0 +----------+-------+--------------→ storedPermits
       *          0 thresholdPermits maxPermits
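
The area under this curve is what storedPermitsToWaitTime integrates. The following stand-alone sketch recomputes it outside Guava so the numbers can be checked by hand. WarmingUpMath is a hypothetical class name; the formulas are copied from the SmoothWarmingUp listing, and the sample values (5 permits per second, a 2 second warmup, coldFactor 3) are assumptions chosen to give round numbers.

```java
/** Stand-alone re-computation of the SmoothWarmingUp arithmetic (illustrative only). */
final class WarmingUpMath {
  final double stableIntervalMicros;
  final double thresholdPermits;
  final double maxPermits;
  final double slope;

  WarmingUpMath(double permitsPerSecond, long warmupPeriodMicros, double coldFactor) {
    stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
    double coldIntervalMicros = stableIntervalMicros * coldFactor;
    thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
    maxPermits =
        thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
    slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
  }

  // Height of the curve at a given number of permits above the threshold.
  double permitsToTime(double permitsAboveThreshold) {
    return stableIntervalMicros + permitsAboveThreshold * slope;
  }

  // Integral of the curve over [storedPermits - permitsToTake, storedPermits].
  long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
    double availableAboveThreshold = storedPermits - thresholdPermits;
    long micros = 0;
    if (availableAboveThreshold > 0.0) {
      // Trapezoid on the climbing line: width * (left height + right height) / 2.
      double toTakeAbove = Math.min(availableAboveThreshold, permitsToTake);
      double length =
          permitsToTime(availableAboveThreshold)
              + permitsToTime(availableAboveThreshold - toTakeAbove);
      micros = (long) (toTakeAbove * length / 2.0);
      permitsToTake -= toTakeAbove;
    }
    // Rectangle on the horizontal line: width * stable interval.
    micros += (long) (stableIntervalMicros * permitsToTake);
    return micros;
  }
}
```

With these sample values, thresholdPermits is 5 and maxPermits is 10. Taking one permit from a fully cold limiter (storedPermits = 10) costs 560,000 microseconds, whereas one permit at the threshold costs the stable 200,000 microseconds. Draining all 10 permits costs 3,000,000 microseconds: 2 seconds of warmup plus 1 second at the stable rate.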

The main formulas involved are:

    coldInterval = coldFactor * stableInterval
    thresholdPermits = 0.5 * warmupPeriod / stableInterval
    maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval)
  • coldFactor defaults to 3
  • In the code, stableInterval is measured in microseconds (not milliseconds): stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond
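
These formulas are chosen so that the areas in the throttling diagram come out in terms of the warmup period. A short derivation, as a sketch, using the shorthand s = stableInterval, c = coldInterval and W = warmupPeriod (the shorthand symbols are introduced here and are not in the Guava source):

```latex
% s = stableInterval, c = coldInterval = coldFactor * s, W = warmupPeriod
\begin{aligned}
\text{thresholdPermits} &= \frac{0.5\,W}{s}, \qquad
\text{maxPermits} - \text{thresholdPermits} = \frac{2W}{s + c} \\[4pt]
\text{area}_{\text{trapezoid}} &= \frac{s + c}{2}\,\bigl(\text{maxPermits} - \text{thresholdPermits}\bigr)
  = \frac{s + c}{2}\cdot\frac{2W}{s + c} = W \\[4pt]
\text{area}_{\text{rectangle}} &= s \cdot \text{thresholdPermits}
  = s \cdot \frac{0.5\,W}{s} = \frac{W}{2} \\[4pt]
\text{coldFactor} = 3:\quad c = 3s \;&\Rightarrow\;
  \text{maxPermits} = \frac{W}{2s} + \frac{2W}{4s} = \frac{W}{s} = 2\,\text{thresholdPermits}
\end{aligned}
```

So going from maxPermits down to thresholdPermits takes exactly the warmup period, and going from thresholdPermits down to 0 takes half of it. For example, at 5 permits per second (s = 0.2 seconds) with W = 2 seconds, thresholdPermits = 5 and maxPermits = 10.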

Summary

  • Guava's RateLimiter (SmoothRateLimiter) is based on the token bucket algorithm and has two concrete implementations: SmoothBursty and SmoothWarmingUp
  • SmoothBursty starts with storedPermits at 0 and supports bursts of up to maxPermits
  • SmoothWarmingUp starts with storedPermits at maxPermits (thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros)); it also supports bursts, but its output is smoother overall

Reposted from: http://zkxqa.baihongyu.com/
