Worker Pools in Kotlin
Our example demonstrates how to implement a worker pool using coroutines and channels in Kotlin.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
// This is our worker function. We'll run several concurrent instances of it.
// These workers will receive work on the `jobs` channel and send the corresponding
// results on `results`. We'll delay a second per job to simulate an expensive task.
suspend fun worker(id: Int, jobs: ReceiveChannel<Int>, results: SendChannel<Int>) {
    // The loop ends once `jobs` is closed and every queued job has been received.
    for (j in jobs) {
        println("worker $id started job $j")
        delay(1000) // suspends this coroutine for a second without blocking its thread
        println("worker $id finished job $j")
        results.send(j * 2)
    }
}
suspend fun main() = coroutineScope {
    // In order to use our pool of workers we need to send them work and collect their results.
    // We make 2 channels for this, each buffered to hold all 5 items.
    val numJobs = 5
    val jobs = Channel<Int>(numJobs)
    val results = Channel<Int>(numJobs)
    // This launches 3 workers, initially suspended because there are no jobs yet.
    repeat(3) { workerId ->
        launch {
            worker(workerId + 1, jobs, results)
        }
    }
    // Here we send 5 jobs and then close that channel to indicate that's all the work we have.
    launch {
        for (j in 1..numJobs) {
            jobs.send(j)
        }
        jobs.close()
    }
    // Finally we collect all the results of the work. Receiving `numJobs` results
    // ensures that every job has been processed; `coroutineScope` then waits for the
    // worker coroutines themselves to finish before `main` returns.
    repeat(numJobs) {
        val result = results.receive()
        println("Received result: $result")
    }
}
Our program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.
Here’s an example of what the output might look like. The exact ordering varies from run to run, and because results are received while the workers are still running, the "Received result" lines are interleaved with the worker output:
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
Received result: 2
worker 1 started job 4
worker 2 finished job 2
Received result: 4
worker 2 started job 5
worker 3 finished job 3
Received result: 6
worker 1 finished job 4
Received result: 8
worker 2 finished job 5
Received result: 10
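The 2-second figure quoted for the worker pool follows from simple arithmetic: with equal-length jobs, 3 workers need two rounds to get through 5 jobs. The sketch below spells that out. The function name `estimatedSeconds` and its parameters are hypothetical, and it assumes every job takes the same time and that scheduling overhead is negligible.

```kotlin
// Hypothetical helper: estimates wall-clock time when `jobCount` equal-length
// jobs are shared by `workerCount` workers. The jobs complete in
// ceil(jobCount / workerCount) rounds, each lasting `secondsPerJob`.
fun estimatedSeconds(jobCount: Int, workerCount: Int, secondsPerJob: Int = 1): Int {
    require(jobCount >= 0) { "jobCount must not be negative" }
    require(workerCount > 0) { "workerCount must be positive" }
    val rounds = (jobCount + workerCount - 1) / workerCount // integer ceiling
    return rounds * secondsPerJob
}

fun main() {
    println(estimatedSeconds(jobCount = 5, workerCount = 3)) // 2
    println(estimatedSeconds(jobCount = 5, workerCount = 1)) // 5
}
```

With a single worker the same 5 jobs would take the full 5 seconds, which is the "total work" the pool spreads across its workers.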
This example demonstrates how to use coroutines and channels in Kotlin to create a worker pool. Coroutines let you write asynchronous, non-blocking code in a sequential style, which makes concurrent operations easier to manage. Channels carry values between coroutines: here, `jobs` delivers work to the workers and `results` carries their output back to `main`.
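The program above knows how many results to expect, so it simply counts them. When the count is not known in advance, one possible variation is to close the results channel once every worker has finished and iterate over it until it is closed. The following is a minimal sketch of that idea, not a definitive implementation: the helper `doubleAll` and its `workerCount` parameter are hypothetical names, the 10 ms delay is a stand-in for real work, and the results are sorted only because arrival order is not deterministic.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: doubles each input using `workerCount` concurrent
// workers and returns the results in ascending order.
suspend fun doubleAll(inputs: List<Int>, workerCount: Int): List<Int> = coroutineScope {
    require(workerCount > 0) { "at least one worker is needed to drain the jobs" }
    val jobs = Channel<Int>()    // rendezvous channel: send suspends until a worker receives
    val results = Channel<Int>()

    // Start the workers and keep their Job handles so we can wait for them.
    val workers = List(workerCount) {
        launch {
            for (j in jobs) {
                delay(10)        // stand-in for real work
                results.send(j * 2)
            }
        }
    }

    // Feed the jobs, then close the channel so the workers' loops end.
    launch {
        for (j in inputs) jobs.send(j)
        jobs.close()
    }

    // Once every worker has returned, no more results can arrive.
    launch {
        workers.joinAll()
        results.close()
    }

    // Iterate until `results` is closed, then sort for a stable order.
    val collected = mutableListOf<Int>()
    for (r in results) collected += r
    collected.sorted()
}

fun main() = runBlocking {
    println(doubleAll(listOf(1, 2, 3, 4, 5), workerCount = 3)) // [2, 4, 6, 8, 10]
}
```

Closing `results` from a separate coroutine keeps the collecting loop free of bookkeeping: it ends on its own when the channel is closed, in the same way the workers' loops end when `jobs` is closed.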