Worker Pools in Kotlin

Our example demonstrates how to implement a worker pool using coroutines and channels in Kotlin.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// This is our worker function. We'll run several concurrent instances of it.
// Each worker receives work on the `jobs` channel and sends the corresponding
// result on `results`. We call `delay` for a second per job to simulate an
// expensive task; `delay` suspends the coroutine without blocking its thread.
suspend fun worker(id: Int, jobs: ReceiveChannel<Int>, results: SendChannel<Int>) {
    for (j in jobs) {
        println("worker $id started  job $j")
        delay(1000) // simulate a second of work
        println("worker $id finished job $j")
        results.send(j * 2)
    }
}

suspend fun main() = coroutineScope {
    // In order to use our pool of workers we need to send them work and collect their results. 
    // We make 2 channels for this.
    val numJobs = 5
    val jobs = Channel<Int>(numJobs)
    val results = Channel<Int>(numJobs)

    // This launches 3 workers, initially suspended because there are no jobs yet.
    repeat(3) { workerId ->
        launch {
            worker(workerId + 1, jobs, results)
        }
    }

    // Here we send 5 jobs and then close that channel to indicate that's all the work we have.
    launch {
        for (j in 1..numJobs) {
            jobs.send(j)
        }
        jobs.close()
    }

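    // Finally we collect all the results of the work.
    // Receiving numJobs results guarantees that every job has been processed.
    // The enclosing coroutineScope then waits for the workers, which exit once
    // `jobs` is closed and drained.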
    repeat(numJobs) {
        val result = results.receive()
        println("Received result: $result")
    }
}
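
The collection loop above relies on knowing the number of jobs in advance. When that count is not known, one possible variation is to close `results` once every worker has returned, so the consumer can simply iterate the channel. The following is a minimal sketch of that idea, not part of the example above; `runPool` is a hypothetical helper, and the 100 ms delay and `Channel.BUFFERED` capacity are arbitrary choices for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper (not part of the example above): runs `workerCount`
// workers over `inputs` and returns the doubled values once all are done.
suspend fun runPool(inputs: List<Int>, workerCount: Int): List<Int> = coroutineScope {
    val jobs = Channel<Int>(Channel.BUFFERED)
    val results = Channel<Int>(Channel.BUFFERED)

    // Start the workers and keep their Job handles so we can wait for them.
    val workers = List(workerCount) {
        launch {
            for (j in jobs) {
                delay(100) // stand-in for real work
                results.send(j * 2)
            }
        }
    }

    // Feed the jobs, then close the channel so the workers' loops terminate.
    launch {
        inputs.forEach { jobs.send(it) }
        jobs.close()
    }

    // Once every worker has returned, no more results can arrive.
    launch {
        workers.joinAll()
        results.close()
    }

    // Iterating the channel ends when `results` is closed.
    val collected = mutableListOf<Int>()
    for (r in results) collected += r
    collected
}

fun main() = runBlocking {
    // Results arrive in completion order, so sort before printing.
    println(runPool(listOf(1, 2, 3, 4, 5), workerCount = 3).sorted()) // [2, 4, 6, 8, 10]
}
```

Closing `results` from a separate coroutine keeps the workers themselves unaware of how many peers exist; each one only needs to stop when `jobs` is exhausted.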

Our program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.
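
The two-second figure follows from the job count and the pool size: 5 equal jobs spread over 3 workers complete in two rounds of one second each. A rough sketch of that arithmetic is shown here; `estimatedSeconds` is a hypothetical helper, and the estimate assumes every job takes the same time and ignores scheduling overhead.

```kotlin
// Hypothetical helper: estimates wall-clock time for equal-length jobs
// spread across a fixed number of workers.
fun estimatedSeconds(numJobs: Int, numWorkers: Int, secondsPerJob: Int = 1): Int {
    require(numWorkers > 0) { "need at least one worker" }
    val rounds = (numJobs + numWorkers - 1) / numWorkers // ceiling division
    return rounds * secondsPerJob
}

fun main() {
    println(estimatedSeconds(numJobs = 5, numWorkers = 3)) // 2
    println(estimatedSeconds(numJobs = 5, numWorkers = 1)) // 5
    println(estimatedSeconds(numJobs = 5, numWorkers = 5)) // 1
}
```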

Here's an example of what the output might look like. The exact order varies from run to run. Because the results are received while the workers are still running, the "Received result" lines are interleaved with the worker lines:

worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
Received result: 2
worker 1 started  job 4
worker 2 finished job 2
Received result: 4
worker 2 started  job 5
worker 3 finished job 3
Received result: 6
worker 1 finished job 4
Received result: 8
worker 2 finished job 5
Received result: 10

This example demonstrates how to use coroutines and channels in Kotlin to create a worker pool. Coroutines let us write asynchronous, non-blocking code in a sequential style, which makes concurrent operations easier to manage. Channels carry values between coroutines: here `jobs` distributes the work to the workers and `results` returns their output to the collector.
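
To illustrate what "non-blocking" means here, the sketch below launches many coroutines on a single thread and measures how long they take in total. It is an illustration under stated assumptions rather than part of the worker pool: `elapsedForConcurrentDelays` is a hypothetical helper, and the count and delay values are arbitrary.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical helper: launches `count` coroutines on a single thread, each
// suspending for `delayMs`, and returns the total elapsed time in milliseconds.
fun elapsedForConcurrentDelays(count: Int, delayMs: Long): Long = measureTimeMillis {
    runBlocking {
        repeat(count) {
            launch { delay(delayMs) } // suspends without blocking the thread
        }
        // runBlocking returns only after all child coroutines complete.
    }
}

fun main() {
    val elapsed = elapsedForConcurrentDelays(count = 100, delayMs = 200)
    // Expect something close to 200 ms rather than 100 * 200 ms = 20 s.
    println("100 coroutines finished in about $elapsed ms")
}
```

Since `delay` releases the thread while waiting, all the coroutines wait at the same time. The same property lets the three workers in the pool overlap their simulated work.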
