限流算法

引言:限流,对流量加以控制

限流算法

服务总有一个承载上限,如果不加以控制,很有可能导致服务过大,打爆服务。

本节介绍四种经典的限流算法(图片来自于博客

固定窗口

固定窗口限流:固定时间处理阈值以内的流量,如果超过该时间的请求就丢弃

固定窗口限流

优缺点:

  • 优点:实现简单
  • 缺点:存在明显的临界问题,假设当前每秒限流10qps,第8s的最后100ms有8qps,第9s的前100ms有5qps,那么在这不到200ms的时间,流量有8+5=13qps,我们的限流失败了。

具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class FixedWindows implements RateLimiter {

public AtomicInteger counter = new AtomicInteger(0); //统计请求数
public long lastAcquireTime = System.currentTimeMillis();
public final Long windowUnit = 1000L ; // 假设固定时间窗口是1000ms
public final Integer threshold = 10; // 窗口阀值是10

@Override
public synchronized boolean allowReq() {
long currentTime = System.currentTimeMillis(); //获取系统当前时间
if (currentTime - lastAcquireTime > windowUnit) {
// 如果进入了新窗口期,清空计数器,并且开启新窗口
counter.set(0);
lastAcquireTime = currentTime;
}
if (counter.get() < threshold) {
// 如果计数还在阈值,计数器+1
counter.incrementAndGet();
return true;
}
return false;
}
}

滑动窗口

为了解决固定窗口的临界问题,引入了滑动窗口:

滑动窗口限流

优缺点:

  • 优点:简单,解决了临界问题
  • 缺点:突发流量无法处理(意思是:达到阈值,后续请求都会被拒绝,而不是排队等待处理)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class SlideWindows implements RateLimiter {
// 单位时间划分的小周期(单位时间是1分钟,10s一个小格子窗口,一共6个格子)
private int SUB_CYCLE = 10;

// 每分钟限流请求数
private final int threshold = 10;

// 计数器, key为当前窗口的开始时间值秒,value为当前窗口的计数
private final Map<Long, Integer> counters = new TreeMap<>();

@Override
public synchronized boolean allowReq() {
//获取当前时间在哪个小周期窗口
long epochSecond = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
long currentWindowTime = epochSecond / SUB_CYCLE * SUB_CYCLE;
// 这里先除再乘的意思:比如 100、102、109都是属于100这个段的,/10去掉个位数

//当前窗口总请求数
int currentWindowNum = countCurrentWindow(currentWindowTime);

//超过阀值限流
if (currentWindowNum >= threshold) {
return false;
}

counters.put(currentWindowTime, counters.getOrDefault(currentWindowTime, 0) + 1);
return true;
}

// 统计当前窗口的请求数
private int countCurrentWindow(long currentWindowTime) {
// 计算窗口开始位置
long startTime = currentWindowTime - SUB_CYCLE * (60 / SUB_CYCLE);
int count = 0;

// 遍历存储的计数器
Iterator<Map.Entry<Long, Integer>> iterator = counters.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, Integer> entry = iterator.next();
// 删除无效过期的子窗口计数器
if (entry.getKey() < startTime) {
iterator.remove();
} else {
//累加当前窗口的所有计数器之和
count += entry.getValue();
}
}
return count;
}
}

注意此处的计算窗口的开始位置:

1
2
// 计算窗口开始位置
long startTime = currentWindowTime - SUB_CYCLE * (60 / SUB_CYCLE);

(60 / SUB_CYCLE) 计算了一个分钟内有多少个小周期,我们设置的是10,因此一分钟有6个小周期

假设当前时间为 15:23:35,我们要计算的是在当前时间之前的窗口的起始时间戳。那么 currentWindowTime 就是当前时间的整十秒:currentWindowTime = 15:23:30

startTime 计算为:

1
2
3
4
5
startTime = currentWindowTime - SUB_CYCLE * (60 / SUB_CYCLE)
= 15:23:30 - 10 * (60 / 10)
= 15:23:30 - 10 * 6
= 15:23:30 - 60
= 15:22:30

漏桶算法

所谓漏桶算法如图所示,有一定的处理量,而且有一定的储存量,但是如果超出储存,请求还是会被拒绝:

img

优点:可以应对突增流量(输入可以是随机速率(就像是现实中的流量),输出是恒定速率(方便服务匀速处理请求))

缺点:

  • 桶存储请求,增大服务器消耗
  • 桶的参数调整不能随时变化
  • 不能遇强则强:无论当前流量如何,都会按固定速率处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class LeakyBucket implements RateLimiter {

private final int capacity = 10; // 桶的容量
private final long interval = 1000; // 漏水速率时间间隔(毫秒)
private int water; // 桶中的水量
private long lastLeakTime; // 上次漏水时间


@Override
public synchronized boolean allowReq() {
long currentTime = System.currentTimeMillis();
// 计算当前时间和上次漏水时间之间的时间差,即桶中应该漏出的水量
int leakAmount = (int) ((currentTime - lastLeakTime) / interval);
lastLeakTime = currentTime;
// 桶中的水量减去漏出的水量
water = Math.max(0, water - leakAmount);

// 如果桶中的水量加上新请求的水量不超过容量,则允许请求通过
if (water < capacity) {
water++;
return true;
}
// 否则拒绝请求
return false;
}
}

令牌桶算法

该算法的模型是:

现在有一个存放了很多令牌的桶(所谓令牌就是,拿到令牌的请求可以执行),每秒会向桶中存放令牌,一个请求消耗一个令牌。

img

相较于漏桶算法,令牌桶算法主要的区别是:允许流量短时间内突增

但是这个特点也带来的一个问题:短时间内可能用完令牌,导致一段时间内的其他请求都无法响应。

因此如果我们的应用场景就是要求削峰填谷,那么使用漏桶反而是更好的选择。

如果是为了应对短时间内的流量突增,那么可以考虑使用令牌桶。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class TokenBucket implements RateLimiter {
private final int capacity = 10; // 桶的容量
private final long interval = 100; // 令牌生成速率时间间隔(毫秒)
private final BlockingQueue<Object> tokenBucket = new ArrayBlockingQueue<>(capacity); // 令牌桶队列

public TokenBucket() {
// 启动定时任务,定时向令牌桶中添加令牌
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
if (tokenBucket.size() < capacity) {
tokenBucket.offer(new Object());
}
}, 0, interval, TimeUnit.MILLISECONDS);
}

@Override
public boolean allowReq() {
// 从令牌桶中尝试获取令牌
return tokenBucket.poll() != null;
}
}