Worker Pools in Fortress

Based on the provided input, here’s the translation of the Go “Worker Pools” example to Java, formatted in Markdown suitable for Hugo:

In this example we’ll look at how to implement a worker pool using threads and concurrent queues.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WorkerPools {

    // Here's the worker, of which we'll run several
    // concurrent instances. These workers will receive
    // work on the `jobs` queue and send the corresponding
    // results on `results`. We'll sleep a second per job to
    // simulate an expensive task.
    static class Worker implements Runnable {
        private int id;
        private BlockingQueue<Integer> jobs;
        private BlockingQueue<Integer> results;

        Worker(int id, BlockingQueue<Integer> jobs, BlockingQueue<Integer> results) {
            this.id = id;
            this.jobs = jobs;
            this.results = results;
        }

        public void run() {
            try {
                while (true) {
                    Integer j = jobs.take();
                    System.out.println("worker " + id + " started  job " + j);
                    Thread.sleep(1000);
                    System.out.println("worker " + id + " finished job " + j);
                    results.put(j * 2);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // In order to use our pool of workers we need to send
        // them work and collect their results. We make 2
        // queues for this.
        final int numJobs = 5;
        BlockingQueue<Integer> jobs = new LinkedBlockingQueue<>(numJobs);
        BlockingQueue<Integer> results = new LinkedBlockingQueue<>(numJobs);

        // This starts up 3 workers, initially blocked
        // because there are no jobs yet.
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for (int w = 1; w <= 3; w++) {
            executor.submit(new Worker(w, jobs, results));
        }

        // Here we send 5 `jobs` and then `close` that
        // queue to indicate that's all the work we have.
        for (int j = 1; j <= numJobs; j++) {
            jobs.put(j);
        }

        // Finally we collect all the results of the work.
        // This also ensures that the worker goroutines have
        // finished.
        for (int a = 1; a <= numJobs; a++) {
            results.take();
        }

        executor.shutdownNow();
        executor.awaitTermination(1, TimeUnit.SECONDS);
    }
}

Our running program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.

$ javac WorkerPools.java && java WorkerPools
worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5

This Java implementation uses ExecutorService to manage a fixed thread pool, and BlockingQueue for job and result queues. The Worker class implements Runnable to define the worker’s behavior. The main differences from the original example are:

  1. We use Java’s built-in concurrency utilities instead of goroutines and channels.
  2. Workers run in an infinite loop and are interrupted to stop, rather than stopping when the jobs channel is closed.
  3. We use executor.shutdownNow() and awaitTermination() to stop the worker threads at the end.

Despite these differences, the overall behavior and output of the program remain similar to the original example.

查看推荐产品