Worker Pools in Groovy

Our example demonstrates how to implement a worker pool using threads and queues in Groovy.

import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.CountDownLatch

// Here's the worker, of which we'll run several
// concurrent instances. These workers will receive
// work on the `jobs` queue and send the corresponding
// results on `results`. We'll sleep a second per job to
// simulate an expensive task.
def worker(int id, BlockingQueue jobs, BlockingQueue results) {
    while (true) {
        def j = jobs.take()
        if (j == -1) break  // Signal to stop the worker
        println "worker $id started  job $j"
        sleep(1000)
        println "worker $id finished job $j"
        results.put(j * 2)
    }
}

// In the main function, we'll coordinate our worker pool
def main() {
    // In order to use our pool of workers we need to send
    // them work and collect their results. We make 2
    // queues for this.
    def numJobs = 5
    def jobs = new LinkedBlockingQueue()
    def results = new LinkedBlockingQueue()

    // This starts up 3 workers, initially blocked
    // because there are no jobs yet.
    def numWorkers = 3
    def latch = new CountDownLatch(numWorkers)
    (1..numWorkers).each { w ->
        Thread.start {
            worker(w, jobs, results)
            latch.countDown()
        }
    }

    // Here we send 5 `jobs`, then put one -1 stop signal per
    // worker so that every worker eventually leaves its loop.
    (1..numJobs).each { jobs.put(it) }
    (1..numWorkers).each { jobs.put(-1) }

    // Finally we wait on the latch until every worker thread
    // has finished, then take the results off the queue. All
    // 5 results are already queued by then, so `take()`
    // returns immediately.
    latch.await()
    numJobs.times { results.take() }
}

main()
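
The example above takes each value off `results` and discards it. As a minimal sketch of keeping the values instead, the snippet below waits on a latch and then drains the queue into a list with `drainTo`. The three stand-in producer threads and the `collected` list are assumptions made for illustration; they are not part of the original program.

```groovy
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue

def results = new LinkedBlockingQueue()
def latch = new CountDownLatch(3)

// Three stand-in producers (hypothetical), each contributing one doubled value.
(1..3).each { n ->
    Thread.start {
        results.put(n * 2)
        latch.countDown()
    }
}

latch.await()                 // every producer has finished
def collected = []
results.drainTo(collected)    // move all queued elements into the list
println collected.sort()      // arrival order is not deterministic, so sort first
```

Sorting before printing matters because the threads may finish in any order.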

Our running program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.

$ groovy worker_pools.groovy
worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5

real    0m2.358s
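
The roughly 2-second wall time can be reasoned about as rounds of work: with 3 workers, 5 one-second jobs need 2 rounds. The sketch below captures that arithmetic, assuming every job takes the same time and ignoring thread and JVM startup overhead. `estimateSeconds` is a hypothetical helper, not part of the example.

```groovy
// Rough wall-clock estimate for a pool whose jobs all take the same time.
// estimateSeconds is a hypothetical helper introduced for illustration.
int estimateSeconds(int numJobs, int numWorkers, int secondsPerJob) {
    // At most numWorkers jobs run at once, so count rounds with ceiling division.
    int rounds = (numJobs + numWorkers - 1).intdiv(numWorkers)
    return rounds * secondsPerJob
}

println estimateSeconds(5, 3, 1)   // 3 workers: 2 rounds
println estimateSeconds(5, 1, 1)   // 1 worker: 5 rounds
```

The measured time is slightly higher than the estimate because starting the JVM and the threads also takes time.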

In this Groovy version:

  1. We use java.util.concurrent.BlockingQueue instead of channels.
  2. Each worker runs the `worker` method inside a closure passed to `Thread.start`, so every worker has its own thread.
  3. We use a CountDownLatch to wait for all workers to finish.
  4. Instead of closing the jobs channel, we send a special value (-1) to signal workers to stop.
  5. The main logic is defined as a `main()` method and called at the end of the script.
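
The -1 stop signal from item 4 works here because job numbers are always positive. If -1 could be a legitimate job, one possible variation is a dedicated sentinel object compared by identity. The following is a sketch under that assumption; `STOP` and the sample job values are hypothetical.

```groovy
import java.util.concurrent.LinkedBlockingQueue

def STOP = new Object()              // hypothetical sentinel, unique by identity
def jobs = new LinkedBlockingQueue()
def results = new LinkedBlockingQueue()
def numWorkers = 2

def threads = (1..numWorkers).collect {
    Thread.start {
        while (true) {
            def j = jobs.take()
            if (j.is(STOP)) break    // identity check, never equal to a real job
            results.put(j * 2)
        }
    }
}

[-1, 0, 7].each { jobs.put(it) }     // -1 is now an ordinary job
numWorkers.times { jobs.put(STOP) }  // one sentinel per worker
threads*.join()                      // wait for both workers to exit

def doubled = results.toList().sort()
println doubled
```

Because the queue is first-in, first-out and the sentinels are added last, every real job is taken before any worker sees `STOP`.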

This implementation maintains the core concepts of the original example while adapting to Groovy’s syntax and Java’s concurrency utilities.
