Worker Pools in Kotlin

Our example demonstrates how to implement a worker pool using coroutines and channels in Kotlin.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// This is our worker function. We'll run several concurrent instances of it.
// These workers will receive work on the `jobs` channel and send the corresponding
// results on `results`. We'll sleep a second per job to simulate an expensive task.
suspend fun worker(id: Int, jobs: ReceiveChannel<Int>, results: SendChannel<Int>) {
    for (j in jobs) {
        println("worker $id started  job $j")
        delay(1000) // simulate a second of work
        println("worker $id finished job $j")
        results.send(j * 2)
    }
}

suspend fun main() = coroutineScope {
    // In order to use our pool of workers we need to send them work and collect their results. 
    // We make 2 channels for this.
    val numJobs = 5
    val jobs = Channel<Int>(numJobs)
    val results = Channel<Int>(numJobs)

    // This launches 3 workers, initially blocked because there are no jobs yet.
    repeat(3) { workerId ->
        launch {
            worker(workerId + 1, jobs, results)
        }
    }

    // Here we send 5 jobs and then close that channel to indicate that's all the work we have.
    launch {
        for (j in 1..numJobs) {
            jobs.send(j)
        }
        jobs.close()
    }

    // Finally we collect all the results of the work.
    // This also ensures that the worker coroutines have finished.
    repeat(numJobs) {
        val result = results.receive()
        println("Received result: $result")
    }
}

Our program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.

Here’s an example of what the output might look like:

worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
Received result: 2
Received result: 4
Received result: 6
Received result: 8
Received result: 10

This example demonstrates how to use coroutines and channels in Kotlin to create a worker pool. Coroutines provide a way to write asynchronous, non-blocking code in a sequential manner, making it easier to manage concurrent operations. Channels are used for communication between coroutines, similar to how channels are used in the original example.

查看推荐产品

Comments powered by Disqus