Worker Pools in Fortress
Based on the provided input, here’s the translation of the Go “Worker Pools” example to Java, formatted in Markdown suitable for Hugo:
In this example we’ll look at how to implement a worker pool using threads and concurrent queues.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class WorkerPools {
// Here's the worker, of which we'll run several
// concurrent instances. These workers will receive
// work on the `jobs` queue and send the corresponding
// results on `results`. We'll sleep a second per job to
// simulate an expensive task.
static class Worker implements Runnable {
private int id;
private BlockingQueue<Integer> jobs;
private BlockingQueue<Integer> results;
Worker(int id, BlockingQueue<Integer> jobs, BlockingQueue<Integer> results) {
this.id = id;
this.jobs = jobs;
this.results = results;
}
public void run() {
try {
while (true) {
Integer j = jobs.take();
System.out.println("worker " + id + " started job " + j);
Thread.sleep(1000);
System.out.println("worker " + id + " finished job " + j);
results.put(j * 2);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws InterruptedException {
// In order to use our pool of workers we need to send
// them work and collect their results. We make 2
// queues for this.
final int numJobs = 5;
BlockingQueue<Integer> jobs = new LinkedBlockingQueue<>(numJobs);
BlockingQueue<Integer> results = new LinkedBlockingQueue<>(numJobs);
// This starts up 3 workers, initially blocked
// because there are no jobs yet.
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int w = 1; w <= 3; w++) {
executor.submit(new Worker(w, jobs, results));
}
// Here we send 5 `jobs` and then `close` that
// queue to indicate that's all the work we have.
for (int j = 1; j <= numJobs; j++) {
jobs.put(j);
}
// Finally we collect all the results of the work.
// This also ensures that the worker goroutines have
// finished.
for (int a = 1; a <= numJobs; a++) {
results.take();
}
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.SECONDS);
}
}
Our running program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.
$ javac WorkerPools.java && java WorkerPools
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 1 started job 4
worker 2 finished job 2
worker 2 started job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
This Java implementation uses ExecutorService
to manage a fixed thread pool, and BlockingQueue
for job and result queues. The Worker
class implements Runnable
to define the worker’s behavior. The main differences from the original example are:
- We use Java’s built-in concurrency utilities instead of goroutines and channels.
- Workers run in an infinite loop and are interrupted to stop, rather than stopping when the jobs channel is closed.
- We use
executor.shutdownNow()
andawaitTermination()
to stop the worker threads at the end.
Despite these differences, the overall behavior and output of the program remain similar to the original example.