Worker Pools in Groovy

Our example demonstrates how to implement a worker pool using threads and queues in Groovy.

import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.CountDownLatch

// Here's the worker, of which we'll run several
// concurrent instances. These workers will receive
// work on the `jobs` queue and send the corresponding
// results on `results`. We'll sleep a second per job to
// simulate an expensive task.
def worker(int id, BlockingQueue jobs, BlockingQueue results) {
    while (true) {
        def j = jobs.take()
        if (j == -1) break  // Signal to stop the worker
        println "worker $id started  job $j"
        sleep(1000)
        println "worker $id finished job $j"
        results.put(j * 2)
    }
}

// In the main function, we'll coordinate our worker pool
def main() {
    // In order to use our pool of workers we need to send
    // them work and collect their results. We make 2
    // queues for this.
    def numJobs = 5
    def jobs = new LinkedBlockingQueue()
    def results = new LinkedBlockingQueue()

    // This starts up 3 workers, initially blocked
    // because there are no jobs yet.
    def numWorkers = 3
    def latch = new CountDownLatch(numWorkers)
    (1..numWorkers).each { w ->
        Thread.start {
            try {
                worker(w, jobs, results)
            } finally {
                // Count down even if the worker throws, so await() cannot hang.
                latch.countDown()
            }
        }
    }

    // Here we send 5 `jobs`, followed by one stop signal (-1) per worker.
    (1..numJobs).each { jobs.put(it) }
    (1..numWorkers).each { jobs.put(-1) }

    // Finally we wait on the latch until every worker thread
    // has finished, then collect all the results of the work.
    latch.await()
    numJobs.times { results.take() }
}

main()

Our running program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.

$ groovy worker_pools.groovy
worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5

real    0m2.358s
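
The roughly 2-second runtime follows from simple arithmetic: 5 one-second jobs shared among 3 workers need 2 rounds. The sketch below works that out, assuming every job takes the same time and ignoring thread start-up overhead. The helper name `estimatedSeconds` is hypothetical and not part of the example.

```groovy
// Hypothetical helper: estimate wall-clock time for equally sized jobs
// spread across a fixed number of workers.
def estimatedSeconds(int numJobs, int numWorkers, int secondsPerJob = 1) {
    // Integer ceiling division: the number of "rounds" the pool needs.
    int rounds = (numJobs + numWorkers - 1).intdiv(numWorkers)
    return rounds * secondsPerJob
}

println estimatedSeconds(5, 3)   // 3 workers: 2 rounds of 1 second
println estimatedSeconds(5, 1)   // 1 worker: all 5 jobs run back to back
```

The measured 2.358s is a little above the estimate, which is consistent with the estimate leaving out JVM and thread start-up time.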

In this Groovy version:

  1. We use java.util.concurrent.BlockingQueue instead of channels.
  2. Each worker is a method, called from a closure that runs in its own thread.
  3. We use a CountDownLatch to wait for all workers to finish.
  4. Instead of closing the jobs channel, we send a special value (-1) to signal workers to stop.
  5. The main logic is defined as a method named main and called at the end of the script.
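
Points 3 and 4 can be varied slightly. The sketch below is not part of the original example. It assumes a dedicated sentinel object (given the hypothetical name `STOP`) in place of -1, so that negative numbers remain valid jobs, and it counts the latch down in a `finally` block so that `await()` returns even if a job throws. The helper name `runPool` is also hypothetical, and the one-second sleep is omitted for brevity.

```groovy
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical helper: doubles each input value using numWorkers threads.
def runPool(List<Integer> input, int numWorkers) {
    def STOP = new Object()                    // sentinel, compared by identity
    def jobs = new LinkedBlockingQueue()
    def results = new LinkedBlockingQueue()
    def latch = new CountDownLatch(numWorkers)

    numWorkers.times {
        Thread.start {
            try {
                while (true) {
                    def j = jobs.take()
                    if (j.is(STOP)) break      // identity check, not ==
                    results.put(j * 2)
                }
            } finally {
                latch.countDown()              // runs even if a job throws
            }
        }
    }

    input.each { jobs.put(it) }
    numWorkers.times { jobs.put(STOP) }        // one sentinel per worker
    latch.await()                              // all workers have exited

    def out = []
    results.drainTo(out)                       // move every result into the list
    return out.sort()                          // completion order is not deterministic
}

println runPool([-1, 2, 3], 2)
```

Because the workers finish in no guaranteed order, the sketch sorts the results before returning them.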

The worker_pools.groovy script keeps the core concepts of the original example while adapting them to Groovy’s syntax and Java’s concurrency utilities.
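
Java's concurrency utilities also include a ready-made pool, `ExecutorService`, which manages the worker threads and the job queue internally. The following is a sketch of that alternative, not the approach used in the script. The helper name `doubleAll` is hypothetical, and the one-second sleep is left out for brevity.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical helper: doubles each input value on a fixed-size thread pool.
def doubleAll(List<Integer> input, int numWorkers) {
    def pool = Executors.newFixedThreadPool(numWorkers)
    try {
        // One Callable per job; the closure is coerced to the interface.
        def tasks = input.collect { j -> ({ j * 2 } as Callable) }
        // invokeAll blocks until every task has completed and returns
        // the futures in the same order as the submitted tasks.
        return pool.invokeAll(tasks).collect { it.get() }
    } finally {
        pool.shutdown()   // let the worker threads exit
    }
}

println doubleAll([1, 2, 3, 4, 5], 3)
```

With this approach no stop signal or latch is needed: `invokeAll` does the waiting and `shutdown()` ends the workers.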