Rate Limiting in Kotlin

Rate limiting is an important mechanism for controlling resource utilization and maintaining quality of service. In Kotlin we can implement rate limiting with coroutines, channels, and flows.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import java.time.Instant
import kotlin.time.Duration.Companion.milliseconds

suspend fun main() = coroutineScope {
    // First we'll look at basic rate limiting. Suppose
    // we want to limit our handling of incoming requests.
    // We'll serve these requests off a channel of the
    // same name.
    val requests = Channel<Int>(5)
    for (i in 1..5) {
        requests.send(i)
    }
    requests.close()

    // Each collection of this limiter flow waits 200 milliseconds
    // before emitting a value. Because the flow is cold, every call
    // to first() on it suspends for 200 milliseconds. This is the
    // regulator in our rate limiting scheme.
    val limiter = flow {
        while (true) {
            delay(200.milliseconds)
            emit(Unit)
        }
    }

    // By collecting a value from the limiter flow before serving
    // each request, we limit ourselves to 1 request every
    // 200 milliseconds. We collect directly (not inside launch),
    // so this batch finishes before the next one starts.
    requests.consumeAsFlow().collect { req ->
        limiter.first()
        println("request $req ${Instant.now()}")
    }

    // We may want to allow short bursts of requests in
    // our rate limiting scheme while preserving the
    // overall rate limit. We can accomplish this by
    // buffering our limiter channel. This burstyLimiter
    // channel will allow bursts of up to 3 events.
    val burstyLimiter = Channel<Unit>(3)

    // Fill up the channel to represent allowed bursting.
    repeat(3) {
        burstyLimiter.send(Unit)
    }

    // Every 200 milliseconds we'll try to add a new
    // value to burstyLimiter, up to its limit of 3. We keep a
    // reference to the job so we can cancel it when we're done.
    val refill = launch {
        while (true) {
            delay(200.milliseconds)
            burstyLimiter.trySend(Unit)
        }
    }

    // Now simulate 5 more incoming requests. The first
    // 3 of these will benefit from the burst capability
    // of burstyLimiter.
    val burstyRequests = Channel<Int>(5)
    for (i in 1..5) {
        burstyRequests.send(i)
    }
    burstyRequests.close()

    burstyRequests.consumeAsFlow().collect { req ->
        burstyLimiter.receive()
        println("request $req ${Instant.now()}")
    }

    // Cancel the refill job so coroutineScope can complete.
    refill.cancel()
}

Running our program we see the first batch of requests handled once every ~200 milliseconds as desired.

request 1 2023-06-01T10:00:00.000Z
request 2 2023-06-01T10:00:00.200Z
request 3 2023-06-01T10:00:00.400Z
request 4 2023-06-01T10:00:00.600Z
request 5 2023-06-01T10:00:00.800Z

For the second batch of requests we serve the first 3 immediately because of the burstable rate limiting, then serve the remaining 2 with ~200ms delays each.

request 1 2023-06-01T10:00:01.000Z
request 2 2023-06-01T10:00:01.000Z
request 3 2023-06-01T10:00:01.000Z
request 4 2023-06-01T10:00:01.200Z
request 5 2023-06-01T10:00:01.400Z

This Kotlin implementation uses coroutines and channels to achieve the same behavior as a ticker-based rate limiter. A cold flow provides the timer-like regulator, and channels manage the stream of requests and implement the bursty limiter. The overall structure and logic follow the classic pattern, with adaptations to fit Kotlin's structured concurrency model.
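The bursty limiter above can also be wrapped in a small reusable class, so the refill machinery and cancellation are handled in one place. Below is one possible sketch: the `TokenBucket` name and its `acquire`/`close` API are our own invention for illustration, not part of kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// A simple token bucket: acquire() suspends until a token is
// available; tokens are refilled at a fixed interval, up to `burst`.
class TokenBucket(scope: CoroutineScope, interval: Duration, burst: Int) {
    private val tokens = Channel<Unit>(burst)

    init {
        // Start full so callers can burst immediately.
        repeat(burst) { tokens.trySend(Unit) }
    }

    private val refill = scope.launch {
        while (true) {
            delay(interval)
            tokens.trySend(Unit) // dropped if the bucket is already full
        }
    }

    suspend fun acquire() = tokens.receive()

    fun close() = refill.cancel()
}

suspend fun main() = coroutineScope {
    val bucket = TokenBucket(this, 200.milliseconds, burst = 3)
    val start = System.nanoTime()
    for (i in 1..5) {
        bucket.acquire()
        println("request $i at ${(System.nanoTime() - start) / 1_000_000} ms")
    }
    bucket.close()
}
```

With these parameters the first 3 acquisitions succeed immediately from the pre-filled bucket, and the remaining 2 each wait roughly 200 milliseconds for a refill.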