Worker Pools in Java
Our example demonstrates how to implement a worker pool using threads and concurrent queues in Java.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// A worker repeatedly takes a job from the jobs queue, simulates one
// second of work, and puts the doubled job number on the results queue.
class Worker implements Runnable {
    private final int id;
    private final BlockingQueue<Integer> jobs;
    private final BlockingQueue<Integer> results;

    public Worker(int id, BlockingQueue<Integer> jobs, BlockingQueue<Integer> results) {
        this.id = id;
        this.jobs = jobs;
        this.results = results;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // take() blocks until a job is available.
                Integer j = jobs.take();
                System.out.println("worker " + id + " started job " + j);
                TimeUnit.SECONDS.sleep(1);
                System.out.println("worker " + id + " finished job " + j);
                results.put(j * 2);
            }
        } catch (InterruptedException e) {
            // Interrupted by shutdownNow(): restore the flag and exit the loop.
            Thread.currentThread().interrupt();
        }
    }
}

public class WorkerPool {
    public static void main(String[] args) throws InterruptedException {
        final int numJobs = 5;
        BlockingQueue<Integer> jobs = new LinkedBlockingQueue<>(numJobs);
        BlockingQueue<Integer> results = new LinkedBlockingQueue<>(numJobs);

        // Start 3 workers; they block at first because there are no jobs yet.
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for (int w = 1; w <= 3; w++) {
            executor.submit(new Worker(w, jobs, results));
        }

        // Send the 5 jobs.
        for (int j = 1; j <= numJobs; j++) {
            jobs.put(j);
        }

        // Collect all 5 results, which also ensures every job has finished.
        for (int a = 1; a <= numJobs; a++) {
            results.take();
        }

        // Interrupt the workers, which are now blocked in take(), and wait for them to exit.
        executor.shutdownNow();
        executor.awaitTermination(1, TimeUnit.SECONDS);
    }
}
In this Java version, we use BlockingQueue to represent channels, and ExecutorService to manage our worker threads. The Worker class implements the Runnable interface, which is similar to a function that can be run as a separate thread.
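To make the channel analogy concrete, here is a minimal sketch of a BlockingQueue handing values from one thread to another. The class name QueueAsChannel and the method transfer are hypothetical names chosen for this illustration, and the capacity of 1 is an assumption made only to show put() blocking while the queue is full.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration class, not part of the program above.
class QueueAsChannel {
    // Sends 1..count through a bounded queue from a producer thread and
    // returns the values in the order the consumer received them.
    static List<Integer> transfer(int count) throws InterruptedException {
        // Capacity 1 (an assumption for this demo): put() blocks while the queue is full.
        BlockingQueue<Integer> channel = new LinkedBlockingQueue<>(1);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    channel.put(i); // waits until the consumer has made room
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            received.add(channel.take()); // waits until a value is available
        }
        producer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(3)); // prints [1, 2, 3]
    }
}
```

With a single producer and a single consumer, the queue's FIFO ordering means the values arrive in the order they were sent.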
Here’s a breakdown of the main components:
We create a Worker class that takes jobs from a queue, processes them (simulated by sleeping for a second), and puts the results in another queue.
In the main method, we create two BlockingQueue instances for jobs and results.
We use ExecutorService to create a fixed thread pool with 3 threads, each running a Worker instance.
We submit 5 jobs to the jobs queue.
We then wait for all 5 results to be processed by taking them from the results queue.
Finally, we shut down the executor service.
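The program above stops its workers by interrupting them with shutdownNow(). One alternative, sketched here under stated assumptions, is to send a sentinel value (sometimes called a poison pill) so that each worker leaves its loop on its own. The class SentinelShutdown, the method runAndStop, and the STOP value are hypothetical, and the sketch assumes real job numbers are never negative. The one-second sleep is omitted to keep the example short.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of the program above.
class SentinelShutdown {
    // Hypothetical sentinel; assumes real jobs are never negative.
    static final int STOP = -1;

    // Runs numJobs jobs on numWorkers threads. Returns true if every worker
    // exited after reading a sentinel and all results were produced.
    static boolean runAndStop(int numWorkers, int numJobs) throws InterruptedException {
        BlockingQueue<Integer> jobs = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> results = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newFixedThreadPool(numWorkers);

        for (int w = 0; w < numWorkers; w++) {
            executor.execute(() -> {
                try {
                    while (true) {
                        int j = jobs.take();
                        if (j == STOP) {
                            return; // sentinel seen: leave the loop cleanly
                        }
                        results.put(j * 2);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        for (int j = 1; j <= numJobs; j++) {
            jobs.put(j);
        }
        // One sentinel per worker, queued after all real jobs.
        for (int w = 0; w < numWorkers; w++) {
            jobs.put(STOP);
        }

        executor.shutdown(); // no interrupts; workers finish on their own
        return executor.awaitTermination(5, TimeUnit.SECONDS)
                && results.size() == numJobs;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndStop(3, 5)); // prints true
    }
}
```

Because the queue is FIFO and the sentinels are queued last, every real job is taken before any worker sees STOP.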
This program demonstrates concurrent processing of jobs using a fixed number of worker threads. The five jobs amount to about 5 seconds of total work, but the program should complete in about 2 seconds: the 3 workers handle the first three jobs in parallel during the first second and the remaining two during the next.
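The 2-second figure follows from a simple estimate: the number of rounds is the job count divided by the worker count, rounded up, and each round takes one job's duration. The sketch below assumes every job takes the same time and ignores thread start-up and scheduling overhead. PoolTiming and estimateSeconds are hypothetical names used only for this illustration.

```java
// Hypothetical illustration class, not part of the program above.
class PoolTiming {
    // Estimated wall-clock seconds, assuming equal-length jobs and no overhead.
    static long estimateSeconds(int numJobs, int numWorkers, long secondsPerJob) {
        long rounds = (numJobs + numWorkers - 1) / numWorkers; // ceiling of numJobs / numWorkers
        return rounds * secondsPerJob;
    }

    public static void main(String[] args) {
        System.out.println(estimateSeconds(5, 3, 1)); // prints 2
        System.out.println(estimateSeconds(5, 1, 1)); // prints 5
    }
}
```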
To run this program, save it as WorkerPool.java, compile it with javac WorkerPool.java, and then run it with java WorkerPool. The output will show the jobs being executed by various workers concurrently.
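As a variation on the program above, the executor's own internal task queue can take the place of the hand-written jobs and results queues. The following is a sketch of that different technique, not the approach used above: each job becomes a Callable, and invokeAll returns one Future per job in submission order. The class FuturePool, the method runJobs, and the workMillis parameter are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of the program above.
class FuturePool {
    // Runs jobs 1..numJobs on a fixed pool and returns each job's doubled value.
    static List<Integer> runJobs(int numWorkers, int numJobs, long workMillis)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int j = 1; j <= numJobs; j++) {
                final int job = j;
                tasks.add(() -> {
                    TimeUnit.MILLISECONDS.sleep(workMillis); // simulated work
                    return job * 2;
                });
            }

            // invokeAll blocks until all tasks complete and preserves task order.
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdown(); // all tasks are done, so no interrupt is needed
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runJobs(3, 5, 1000)); // prints [2, 4, 6, 8, 10]
    }
}
```

The trade-off is that explicit queues keep the worker loop visible, while this form leaves job distribution to the executor and returns results in job order.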