固定窗口
临界突发问题:在窗口的边界,可能有大量流量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class FixedWindowRateLimiter {
private final int maxRequests; private final long windowSizeInMillis; private long windowStart; private final AtomicInteger requestCount;
public FixedWindowRateLimiter(int maxRequests, long windowSizeInMillis) { this.maxRequests = maxRequests; this.windowSizeInMillis = windowSizeInMillis; this.windowStart = System.currentTimeMillis(); this.requestCount = new AtomicInteger(0); }
public synchronized boolean allowRequest() { long currentTime = System.currentTimeMillis();
if (currentTime - windowStart >= windowSizeInMillis) { windowStart = currentTime; requestCount.set(0); }
if (requestCount.incrementAndGet() <= maxRequests) { return true; } else { return false; } } }
|
滑动窗口
简单实现:内存占用可能较大,每次请求都需要清理过期数据,可能影响性能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import java.util.LinkedList; import java.util.Queue;
public class SlidingWindowRateLimiter { private final long windowSizeInMillis; private final int maxRequests; private final Queue<Long> requestTimestamps;
public SlidingWindowRateLimiter(int windowSizeInSeconds, int maxRequests) { this.windowSizeInMillis = windowSizeInSeconds * 1000L; this.maxRequests = maxRequests; this.requestTimestamps = new LinkedList<>(); }
public synchronized boolean allowRequest() { long currentTime = System.currentTimeMillis(); while (!requestTimestamps.isEmpty() && currentTime - requestTimestamps.peek() >= windowSizeInMillis) { requestTimestamps.poll(); }
if (requestTimestamps.size() < maxRequests) { requestTimestamps.offer(currentTime); return true; }
return false; }
public static void main(String[] args) throws InterruptedException { SlidingWindowRateLimiter rateLimiter = new SlidingWindowRateLimiter(1, 3);
for (int i = 0; i < 5; i++) { System.out.println("Request " + (i + 1) + " allowed: " + rateLimiter.allowRequest()); Thread.sleep(300); } } }
|
平滑滑动窗口(划分多个小窗口)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock;
public class SmoothSlidingWindowRateLimiter { private final long windowSizeInMillis; private final int maxRequests; private final int subWindowCount; private final AtomicInteger[] subWindows; private final ReentrantLock lock = new ReentrantLock(); private long currentWindowStart;
public SmoothSlidingWindowRateLimiter(int windowSizeInSeconds, int maxRequests, int subWindowCount) { this.windowSizeInMillis = windowSizeInSeconds * 1000L; this.maxRequests = maxRequests; this.subWindowCount = subWindowCount; this.subWindows = new AtomicInteger[subWindowCount]; for (int i = 0; i < subWindowCount; i++) { subWindows[i] = new AtomicInteger(0); } this.currentWindowStart = System.currentTimeMillis(); }
public boolean allowRequest() { long now = System.currentTimeMillis(); lock.lock(); try { if (now - currentWindowStart > windowSizeInMillis) { slideWindow(now); } long totalRequests = getTotalRequests(); if (totalRequests < maxRequests) { int index = (int) (((now - currentWindowStart) * subWindowCount) / windowSizeInMillis); subWindows[index].incrementAndGet(); return true; } return false; } finally { lock.unlock(); } }
private void slideWindow(long now) { long newWindowStart = now - windowSizeInMillis; long windowDiff = newWindowStart - currentWindowStart; int resetCount = (int) ((windowDiff * subWindowCount) / windowSizeInMillis); if (resetCount >= subWindowCount) { for (AtomicInteger subWindow : subWindows) { subWindow.set(0); } } else { for (int i = 0; i < resetCount; i++) { int index = (int) (((currentWindowStart - newWindowStart) * subWindowCount) / windowSizeInMillis + i) % subWindowCount; subWindows[index].set(0); } } currentWindowStart = newWindowStart; }
private long getTotalRequests() { long total = 0; for (AtomicInteger subWindow : subWindows) { total += subWindow.get(); } return total; }
public static void main(String[] args) throws InterruptedException { SmoothSlidingWindowRateLimiter rateLimiter = new SmoothSlidingWindowRateLimiter(1, 10, 10);
for (int i = 0; i < 15; i++) { System.out.println("Request " + (i + 1) + " allowed: " + rateLimiter.allowRequest()); Thread.sleep(100); } } }
|
也常用于热点key的检测
漏桶算法
固定速率处理请求,处理任意流量更加平滑,可以实现流量整形;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class LeakyBucket { private int capacity; private int rate; private int water; private Instant timestamp;
public LeakyBucket(int capacity, int rate) { this.capacity = capacity; this.rate = rate; this.water = 0; this.timestamp = Instant.now(); }
public synchronized boolean allow() { Instant now = Instant.now(); long duration = now.toEpochMilli() - timestamp.toEpochMilli(); int outflow = (int) (duration * rate / 1000); water = Math.max(0, water - outflow); if (water < capacity) { water++; timestamp = now; return true; } return false; } }
|
令牌桶算法
通过控制桶中的令牌实现限流,可以处理一定的突发流量,比如处理一次洪峰。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class TokenBucket { private long lastTime; private double rate; private long capacity; private long tokens; public TokenBucket(double rate, long capacity) { this.lastTime = System.currentTimeMillis(); this.rate = rate; this.capacity = capacity; this.tokens = capacity; } public synchronized boolean getToken() { long now = System.currentTimeMillis(); long timeElapsed = now - lastTime; tokens += timeElapsed * rate; if (tokens > capacity) { tokens = capacity; } lastTime = now; if (tokens >= 1) { tokens--; return true; } else { return false; } } }
|
参考