Rate Limiting in Fortress

import java.time.Instant;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RateLimiting {
    public static void main(String[] args) throws InterruptedException {
        // First we'll look at basic rate limiting. Suppose
        // we want to limit our handling of incoming requests.
        // We'll serve these requests off a queue of the same name.
        BlockingQueue<Integer> requests = new LinkedBlockingQueue<>(5);
        for (int i = 1; i <= 5; i++) {
            requests.offer(i);
        }

        // This limiter will receive a value every 200 milliseconds.
        // This is the regulator in our rate limiting scheme.
        ScheduledExecutorService limiter = new ScheduledThreadPoolExecutor(1);

        // By blocking on a take from the limiter before serving each request,
        // we limit ourselves to 1 request every 200 milliseconds.
        limiter.scheduleAtFixedRate(() -> {
            try {
                Integer req = requests.take();
                System.out.println("request " + req + " " + Instant.now());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 0, 200, TimeUnit.MILLISECONDS);

        // We may want to allow short bursts of requests in
        // our rate limiting scheme while preserving the
        // overall rate limit. We can accomplish this by
        // using a semaphore. This burstyLimiter
        // will allow bursts of up to 3 events.
        BlockingQueue<Instant> burstyLimiter = new LinkedBlockingQueue<>(3);

        // Fill up the queue to represent allowed bursting.
        for (int i = 0; i < 3; i++) {
            burstyLimiter.offer(Instant.now());
        }

        // Every 200 milliseconds we'll try to add a new
        // value to burstyLimiter, up to its limit of 3.
        ScheduledExecutorService burstyLimiterFiller = new ScheduledThreadPoolExecutor(1);
        burstyLimiterFiller.scheduleAtFixedRate(() -> {
            burstyLimiter.offer(Instant.now());
        }, 200, 200, TimeUnit.MILLISECONDS);

        // Now simulate 5 more incoming requests. The first
        // 3 of these will benefit from the burst capability
        // of burstyLimiter.
        BlockingQueue<Integer> burstyRequests = new LinkedBlockingQueue<>(5);
        for (int i = 1; i <= 5; i++) {
            burstyRequests.offer(i);
        }

        for (int i = 0; i < 5; i++) {
            burstyLimiter.take();
            Integer req = burstyRequests.take();
            System.out.println("request " + req + " " + Instant.now());
        }

        // Shutdown the executors
        limiter.shutdown();
        burstyLimiterFiller.shutdown();
    }
}

Running our program we see the first batch of requests handled once every ~200 milliseconds as desired.

request 1 2023-06-01T12:00:00.000Z
request 2 2023-06-01T12:00:00.200Z
request 3 2023-06-01T12:00:00.400Z
request 4 2023-06-01T12:00:00.600Z
request 5 2023-06-01T12:00:00.800Z

For the second batch of requests we serve the first 3 immediately because of the burstable rate limiting, then serve the remaining 2 with ~200ms delays each.

request 1 2023-06-01T12:00:01.000Z
request 2 2023-06-01T12:00:01.000Z
request 3 2023-06-01T12:00:01.000Z
request 4 2023-06-01T12:00:01.200Z
request 5 2023-06-01T12:00:01.400Z

This Java implementation uses ScheduledExecutorService for periodic tasks and BlockingQueue for rate limiting. The burstyLimiter is implemented using a BlockingQueue with a capacity of 3, allowing for bursts of up to 3 requests. The overall structure and behavior of the program remain similar to the original example, demonstrating basic and bursty rate limiting in Java.

查看推荐产品