常见限流算法

固定窗口

临界突发问题:在窗口的边界,可能有大量流量

image-20241009180923079

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class FixedWindowRateLimiter {

// 每个窗口允许的最大请求数
private final int maxRequests;
// 窗口大小,单位为毫秒
private final long windowSizeInMillis;
// 记录当前窗口开始的时间
private long windowStart;
// 当前窗口内的请求计数
private final AtomicInteger requestCount;

public FixedWindowRateLimiter(int maxRequests, long windowSizeInMillis) {
this.maxRequests = maxRequests;
this.windowSizeInMillis = windowSizeInMillis;
this.windowStart = System.currentTimeMillis();
this.requestCount = new AtomicInteger(0);
}

public synchronized boolean allowRequest() {
long currentTime = System.currentTimeMillis();

// 判断当前时间是否已经进入新的窗口
if (currentTime - windowStart >= windowSizeInMillis) {
// 重置窗口
windowStart = currentTime;
requestCount.set(0);
}

// 判断是否可以在当前窗口内处理请求
if (requestCount.incrementAndGet() <= maxRequests) {
return true; // 允许请求
} else {
return false; // 拒绝请求
}
}
}

滑动窗口

简单实现:内存占用可能较大,每次请求都需要清理过期数据,可能影响性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.LinkedList;
import java.util.Queue;

public class SlidingWindowRateLimiter {
private final long windowSizeInMillis;
private final int maxRequests;
private final Queue<Long> requestTimestamps;

public SlidingWindowRateLimiter(int windowSizeInSeconds, int maxRequests) {
this.windowSizeInMillis = windowSizeInSeconds * 1000L;
this.maxRequests = maxRequests;
this.requestTimestamps = new LinkedList<>();
}

public synchronized boolean allowRequest() {
long currentTime = System.currentTimeMillis();

// 移除窗口外的请求时间戳
while (!requestTimestamps.isEmpty() && currentTime - requestTimestamps.peek() >= windowSizeInMillis) {
requestTimestamps.poll();
}

// 检查是否超过限制
if (requestTimestamps.size() < maxRequests) {
requestTimestamps.offer(currentTime);
return true;
}

return false;
}

public static void main(String[] args) throws InterruptedException {
SlidingWindowRateLimiter rateLimiter = new SlidingWindowRateLimiter(1, 3); // 1秒内最多3个请求

for (int i = 0; i < 5; i++) {
System.out.println("Request " + (i + 1) + " allowed: " + rateLimiter.allowRequest());
Thread.sleep(300); // 每次请求间隔300毫秒
}
}
}

平滑滑动窗口(划分多个小窗口)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class SmoothSlidingWindowRateLimiter {
private final long windowSizeInMillis;
private final int maxRequests;
private final int subWindowCount;
private final AtomicInteger[] subWindows;
private final ReentrantLock lock = new ReentrantLock();
private long currentWindowStart;

public SmoothSlidingWindowRateLimiter(int windowSizeInSeconds, int maxRequests, int subWindowCount) {
this.windowSizeInMillis = windowSizeInSeconds * 1000L;
this.maxRequests = maxRequests;
this.subWindowCount = subWindowCount;
this.subWindows = new AtomicInteger[subWindowCount];
for (int i = 0; i < subWindowCount; i++) {
subWindows[i] = new AtomicInteger(0);
}
this.currentWindowStart = System.currentTimeMillis();
}

public boolean allowRequest() {
long now = System.currentTimeMillis();
lock.lock();
try {
if (now - currentWindowStart > windowSizeInMillis) {
slideWindow(now);
}

long totalRequests = getTotalRequests();
if (totalRequests < maxRequests) {
int index = (int) (((now - currentWindowStart) * subWindowCount) / windowSizeInMillis);
subWindows[index].incrementAndGet();
return true;
}
return false;
} finally {
lock.unlock();
}
}

private void slideWindow(long now) {
long newWindowStart = now - windowSizeInMillis;
long windowDiff = newWindowStart - currentWindowStart;
int resetCount = (int) ((windowDiff * subWindowCount) / windowSizeInMillis);

if (resetCount >= subWindowCount) {
for (AtomicInteger subWindow : subWindows) {
subWindow.set(0);
}
} else {
for (int i = 0; i < resetCount; i++) {
int index = (int) (((currentWindowStart - newWindowStart) * subWindowCount) / windowSizeInMillis + i) % subWindowCount;
subWindows[index].set(0);
}
}
currentWindowStart = newWindowStart;
}

private long getTotalRequests() {
long total = 0;
for (AtomicInteger subWindow : subWindows) {
total += subWindow.get();
}
return total;
}

public static void main(String[] args) throws InterruptedException {
SmoothSlidingWindowRateLimiter rateLimiter = new SmoothSlidingWindowRateLimiter(1, 10, 10); // 1秒内最多10个请求,分成10个小窗口

for (int i = 0; i < 15; i++) {
System.out.println("Request " + (i + 1) + " allowed: " + rateLimiter.allowRequest());
Thread.sleep(100); // 每次请求间隔100毫秒
}
}
}

也常用于热点key的检测

漏桶算法

固定速率处理请求,处理任意流量更加平滑,可以实现流量整形;

image-20241009182620292

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class LeakyBucket {
private int capacity; //漏桶容量
private int rate; //漏水速率
private int water; //当前水量
private Instant timestamp; //上次漏水时间

public LeakyBucket(int capacity, int rate) {
this.capacity = capacity;
this.rate = rate;
this.water = 0;
this.timestamp = Instant.now();
}

public synchronized boolean allow() { //判断是否允许通过
Instant now = Instant.now();
long duration = now.toEpochMilli() - timestamp.toEpochMilli(); //计算距上次漏水过去了多久
int outflow = (int) (duration * rate / 1000); //计算过去的时间内漏出的水量
water = Math.max(0, water - outflow); //更新当前水量,不能小于0
if (water < capacity) { //如果漏桶还没满,放行
water++;
timestamp = now;
return true;
}
return false; //否则拒绝通过
}
}

// 感觉这个代码只做到了 对超过桶容量的请求拒绝对接受的请求,好像并没有按照漏桶的速度进行处理,而是小于capacity就立马处理了?

令牌桶算法

通过控制桶中的令牌实现限流,可以处理一定的突发流量,比如处理一次洪峰。

image-20241009182632661

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TokenBucket {

private long lastTime; // 上次请求时间
private double rate; // 令牌放入速率
private long capacity; // 令牌桶容量
private long tokens; // 当前令牌数量

public TokenBucket(double rate, long capacity) {
this.lastTime = System.currentTimeMillis();
this.rate = rate;
this.capacity = capacity;
this.tokens = capacity;
}

public synchronized boolean getToken() {
long now = System.currentTimeMillis();
long timeElapsed = now - lastTime;
tokens += timeElapsed * rate;
if (tokens > capacity) {
tokens = capacity;
}
lastTime = now;
if (tokens >= 1) {
tokens--;
return true;
} else {
return false;
}
}
}

参考