Worker Pools in Java

Our example demonstrates how to implement a worker pool using threads and concurrent queues in Java.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Worker implements Runnable {
    private final int id;
    private final BlockingQueue<Integer> jobs;
    private final BlockingQueue<Integer> results;

    public Worker(int id, BlockingQueue<Integer> jobs, BlockingQueue<Integer> results) {
        this.id = id;
        this.jobs = jobs;
        this.results = results;
    }

    @Override
    public void run() {
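        try {
            // Loop until interrupted: take() blocks while the jobs queue is empty.
            while (true) {
                Integer j = jobs.take();
                System.out.println("worker " + id + " started  job " + j);
                TimeUnit.SECONDS.sleep(1); // simulate an expensive task
                System.out.println("worker " + id + " finished job " + j);
                results.put(j * 2);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and let the worker exit.
            Thread.currentThread().interrupt();
        }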
    }
}

public class WorkerPool {
    public static void main(String[] args) throws InterruptedException {
        final int numJobs = 5;
        BlockingQueue<Integer> jobs = new LinkedBlockingQueue<>(numJobs);
        BlockingQueue<Integer> results = new LinkedBlockingQueue<>(numJobs);

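        // Start 3 workers; they block in take() until jobs arrive.
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for (int w = 1; w <= 3; w++) {
            executor.submit(new Worker(w, jobs, results));
        }

        // Send the jobs.
        for (int j = 1; j <= numJobs; j++) {
            jobs.put(j);
        }

        // Take one result per job, which also waits for every job to finish.
        for (int a = 1; a <= numJobs; a++) {
            results.take();
        }

        // Interrupt the idle workers and wait briefly for them to stop.
        executor.shutdownNow();
        executor.awaitTermination(1, TimeUnit.SECONDS);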
    }
}

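In this Java version, BlockingQueue stands in for channels: put blocks while the queue is full and take blocks while it is empty, so the queues both carry data and synchronize the threads. ExecutorService manages the worker threads. The Worker class implements the Runnable interface, so each instance can be submitted to the executor and run on one of its threads.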
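To make the channel-like behaviour of BlockingQueue concrete, here is a minimal sketch, separate from the program above. The class QueueHandoff and its helper doubleThrough are hypothetical names introduced only for illustration. A consumer thread blocks in take() until the caller puts a value, doubles it, and hands the reply back on a second queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueHandoff {
    // Hypothetical helper: sends each input through a consumer thread that
    // doubles it, using one queue per direction, and returns the replies in order.
    static List<Integer> doubleThrough(List<Integer> inputs) {
        BlockingQueue<Integer> in = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> out = new LinkedBlockingQueue<>();
        int count = inputs.size();

        Thread doubler = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    // take() blocks here until the caller puts a value.
                    out.put(in.take() * 2);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        doubler.start();

        List<Integer> replies = new ArrayList<>();
        try {
            for (int v : inputs) {
                in.put(v);
                replies.add(out.take()); // blocks until the doubler replies
            }
            doubler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(doubleThrough(Arrays.asList(1, 2, 3))); // prints [2, 4, 6]
    }
}
```

Because each put is followed by a take on the reply queue, the two threads proceed in lockstep here. The worker pool relaxes this by letting several consumers share one jobs queue.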

Here’s a breakdown of the main components:

  1. We create a Worker class that takes jobs from a queue, processes them (simulated by sleeping for a second), and puts the results in another queue.

  2. In the main method, we create two BlockingQueue instances for jobs and results.

  3. We call Executors.newFixedThreadPool(3) to create an ExecutorService backed by 3 threads and submit one Worker instance per thread.

  4. We submit 5 jobs to the jobs queue.

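  5. We then wait for all 5 jobs to finish by taking 5 results from the results queue.

  6. Finally, we shut down the executor service with shutdownNow(), which interrupts the workers blocked in take() so that their loops exit, and wait up to one second for the threads to terminate.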
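The program above discards the values it takes from the results queue. As a variation on the same six steps, the following sketch collects them instead. CollectingPool and runPool are hypothetical names, the job duration is a parameter so that the example runs quickly, and the results are sorted because the order in which workers finish is not deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class CollectingPool {
    // Hypothetical helper: runs jobs 1..numJobs on numWorkers threads and
    // returns the doubled results in ascending order.
    static List<Integer> runPool(int numWorkers, int numJobs, long jobMillis) {
        BlockingQueue<Integer> jobs = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> results = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newFixedThreadPool(numWorkers);

        // Steps 1 and 3: each worker loops, taking a job and posting its result.
        Runnable worker = () -> {
            try {
                while (true) {
                    int j = jobs.take();
                    TimeUnit.MILLISECONDS.sleep(jobMillis); // simulated work
                    results.put(j * 2);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit after shutdownNow()
            }
        };

        List<Integer> collected = new ArrayList<>();
        try {
            for (int w = 0; w < numWorkers; w++) {
                executor.submit(worker);
            }
            for (int j = 1; j <= numJobs; j++) {
                jobs.put(j); // step 4: send the jobs
            }
            for (int a = 0; a < numJobs; a++) {
                collected.add(results.take()); // step 5: one result per job
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } finally {
            executor.shutdownNow(); // step 6: interrupt workers blocked in take()
        }
        Collections.sort(collected);
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(runPool(3, 5, 100)); // prints [2, 4, 6, 8, 10]
    }
}
```

Placing shutdownNow() in a finally block is a design choice of this sketch, not of the original program. It ensures the pool threads are stopped even if the calling thread is interrupted while waiting.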

This program demonstrates concurrent processing of jobs using a fixed number of worker threads. Although the jobs add up to about 5 seconds of work, the program should complete in about 2 seconds: the 3 workers process the first three jobs in parallel during the first second and the remaining two during the next.
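The timing estimate can be checked with a small measurement. The sketch below is an illustration under stated assumptions, not part of the original program: TimedPool, rounds and timePool are hypothetical names, and the job duration is scaled down from 1 second to 200 ms so that the run is short. The estimate is the number of sequential rounds, ceil(numJobs / numWorkers), multiplied by the duration of one job.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedPool {
    // Number of sequential rounds needed: ceil(numJobs / numWorkers).
    static int rounds(int numJobs, int numWorkers) {
        return (numJobs + numWorkers - 1) / numWorkers;
    }

    // Hypothetical helper: runs the pool and returns the elapsed wall-clock
    // time in milliseconds, measured from sending the first job to receiving
    // the last result.
    static long timePool(int numWorkers, int numJobs, long jobMillis) {
        BlockingQueue<Integer> jobs = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> results = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newFixedThreadPool(numWorkers);

        Runnable worker = () -> {
            try {
                while (true) {
                    int j = jobs.take();
                    TimeUnit.MILLISECONDS.sleep(jobMillis); // simulated work
                    results.put(j * 2);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        try {
            for (int w = 0; w < numWorkers; w++) {
                executor.submit(worker);
            }
            long start = System.nanoTime();
            for (int j = 1; j <= numJobs; j++) {
                jobs.put(j);
            }
            for (int a = 0; a < numJobs; a++) {
                results.take();
            }
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while timing the pool", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        long jobMillis = 200; // assumption: scaled down from the 1-second jobs
        long estimate = rounds(5, 3) * jobMillis;
        long measured = timePool(3, 5, jobMillis);
        System.out.println("estimated " + estimate + " ms, measured " + measured + " ms");
    }
}
```

The measured value should land slightly above the estimate because of scheduling overhead, and well below the 1000 ms that five sequential 200 ms jobs would take. Exact figures depend on the machine.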

To run this program, save it as WorkerPool.java, compile it with javac WorkerPool.java, and then run it with java WorkerPool. The output shows the jobs being started and finished by different workers concurrently; which worker picks up which job, and the order of the lines, can vary from run to run.