Title here
Summary here
Our example demonstrates how to implement a worker pool using threads and queues in Groovy.
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.CountDownLatch
// Here's the worker, of which we'll run several
// concurrent instances. These workers will receive
// work on the `jobs` queue and send the corresponding
// results on `results`. We'll sleep a second per job to
// simulate an expensive task.
def worker(int id, BlockingQueue jobs, BlockingQueue results) {
while (true) {
def j = jobs.take()
if (j == -1) break // Signal to stop the worker
println "worker $id started job $j"
sleep(1000)
println "worker $id finished job $j"
results.put(j * 2)
}
}
// In the main function, we'll coordinate our worker pool
def main() {
// In order to use our pool of workers we need to send
// them work and collect their results. We make 2
// queues for this.
def numJobs = 5
def jobs = new LinkedBlockingQueue()
def results = new LinkedBlockingQueue()
// This starts up 3 workers, initially blocked
// because there are no jobs yet.
def numWorkers = 3
def latch = new CountDownLatch(numWorkers)
(1..numWorkers).each { w ->
Thread.start {
worker(w, jobs, results)
latch.countDown()
}
}
// Here we send 5 `jobs` and then signal the workers to stop
(1..numJobs).each { jobs.put(it) }
(1..numWorkers).each { jobs.put(-1) }
// Finally we collect all the results of the work.
// This also ensures that the worker threads have
// finished.
latch.await()
numJobs.times { results.take() }
}
main()
Our running program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.
$ groovy worker_pools.groovy
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 1 started job 4
worker 2 finished job 2
worker 2 started job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
real 0m2.358s
In this Groovy version:
java.util.concurrent.BlockingQueue
instead of channels.CountDownLatch
to wait for all workers to finish.main
function is defined as a closure and called at the end of the script.This implementation maintains the core concepts of the original example while adapting to Groovy’s syntax and Java’s concurrency utilities.