Title here
Summary here
Our example demonstrates how to implement a worker pool using threads and queues in Groovy.
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.CountDownLatch
// Here's the worker, of which we'll run several
// concurrent instances. These workers will receive
// work on the `jobs` queue and send the corresponding
// results on `results`. We'll sleep a second per job to
// simulate an expensive task.
def worker(int id, BlockingQueue jobs, BlockingQueue results) {
while (true) {
def j = jobs.take()
if (j == -1) break // Signal to stop the worker
println "worker $id started job $j"
sleep(1000)
println "worker $id finished job $j"
results.put(j * 2)
}
}
// In the main function, we'll coordinate our worker pool
def main() {
// In order to use our pool of workers we need to send
// them work and collect their results. We make 2
// queues for this.
def numJobs = 5
def jobs = new LinkedBlockingQueue()
def results = new LinkedBlockingQueue()
// This starts up 3 workers, initially blocked
// because there are no jobs yet.
def numWorkers = 3
def latch = new CountDownLatch(numWorkers)
(1..numWorkers).each { w ->
Thread.start {
worker(w, jobs, results)
latch.countDown()
}
}
// Here we send 5 `jobs` and then signal the workers to stop
(1..numJobs).each { jobs.put(it) }
(1..numWorkers).each { jobs.put(-1) }
// Finally we collect all the results of the work.
// This also ensures that the worker threads have
// finished.
latch.await()
numJobs.times { results.take() }
}
main()Our running program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.
$ groovy worker_pools.groovy
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 1 started job 4
worker 2 finished job 2
worker 2 started job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
real 0m2.358sIn this Groovy version:
java.util.concurrent.BlockingQueue instead of channels.CountDownLatch to wait for all workers to finish.main function is defined as a closure and called at the end of the script.This implementation maintains the core concepts of the original example while adapting to Groovy’s syntax and Java’s concurrency utilities.
Comments powered by Disqus