Worker Pools in Java

Our example demonstrates how to implement a worker pool using threads and blocking queues in Java.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class WorkerPools {

    // This is our worker, of which we'll run several concurrent instances.
    // These workers will receive work on the 'jobs' queue and send the
    // corresponding results on 'results'. We'll sleep a second per job to
    // simulate an expensive task.
    static class Worker implements Runnable {
        private final int id;
        private final BlockingQueue<Integer> jobs;
        private final BlockingQueue<Integer> results;

        Worker(int id, BlockingQueue<Integer> jobs, BlockingQueue<Integer> results) {
            this.id = id;
            this.jobs = jobs;
            this.results = results;
        }

        public void run() {
            try {
                while (true) {
                    Integer j = jobs.take();
                    System.out.println("worker " + id + " started  job " + j);
                    Thread.sleep(1000); // Sleep for a second
                    System.out.println("worker " + id + " finished job " + j);
                    results.put(j * 2);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // In order to use our pool of workers we need to send them work
        // and collect their results. We create 2 queues for this.
        final int numJobs = 5;
        BlockingQueue<Integer> jobs = new LinkedBlockingQueue<>(numJobs);
        BlockingQueue<Integer> results = new LinkedBlockingQueue<>(numJobs);

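        // This starts up 3 workers, initially blocked because there are no jobs yet.
        // They are daemon threads, so the JVM can exit once main returns even
        // though each worker is still blocked in jobs.take().
        for (int w = 1; w <= 3; w++) {
            Thread worker = new Thread(new Worker(w, jobs, results));
            worker.setDaemon(true);
            worker.start();
        }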

        // Here we send 5 jobs. A BlockingQueue cannot be closed, so nothing
        // here tells the workers that this is all the work we have.
        for (int j = 1; j <= numJobs; j++) {
            jobs.put(j);
        }

        // Finally we collect all the results of the work.
        // Taking numJobs results guarantees that every job has finished, but the
        // worker threads themselves are still blocked waiting for more jobs.
        for (int a = 1; a <= numJobs; a++) {
            results.take();
        }
    }
}
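
BlockingQueue has no close operation, so the workers above never learn that the work has run out. One common substitute is a sentinel value (a "poison pill") that tells a worker to stop. The following is a minimal sketch of that idea, not part of the example above: the class name PoisonPillSketch, the POISON constant, and the runJobs helper are all hypothetical, and the sentinel only works because real job numbers here are positive.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillSketch {

    // Hypothetical sentinel: real jobs are positive, so -1 can mean "no more work".
    static final int POISON = -1;

    // Runs numJobs jobs on numWorkers threads and returns the sorted results.
    static List<Integer> runJobs(int numJobs, int numWorkers) throws InterruptedException {
        BlockingQueue<Integer> jobs = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> results = new LinkedBlockingQueue<>();

        List<Thread> workers = new ArrayList<>();
        for (int w = 0; w < numWorkers; w++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        int j = jobs.take();
                        if (j == POISON) {
                            return; // sentinel seen: leave the loop so the thread ends
                        }
                        results.put(j * 2);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            workers.add(t);
        }

        for (int j = 1; j <= numJobs; j++) {
            jobs.put(j);
        }
        // One sentinel per worker, queued behind the real jobs.
        for (int w = 0; w < numWorkers; w++) {
            jobs.put(POISON);
        }

        // join() returns only after each worker thread has actually exited.
        for (Thread t : workers) {
            t.join();
        }

        List<Integer> out = new ArrayList<>();
        results.drainTo(out);
        Collections.sort(out); // completion order varies, so sort for a stable result
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runJobs(5, 3)); // [2, 4, 6, 8, 10]
    }
}
```

Because each worker stops after consuming exactly one sentinel, queuing one sentinel per worker is enough to shut the whole pool down, and joining the threads then confirms that they have really finished.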

Our running program shows the 5 jobs being executed by various workers. The jobs finish in about 2 seconds despite amounting to about 5 seconds of total work, because 3 workers operate concurrently. The exact interleaving of the output lines can differ from run to run.

$ java WorkerPools
worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
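
The 2-second figure follows from simple arithmetic: with 3 workers and 5 equal jobs, the work proceeds in two rounds of one second each. A small sketch of that calculation, assuming every job takes the same time and each worker handles one job at a time (the class PoolTiming and the method rounds are hypothetical names used only for illustration):

```java
class PoolTiming {

    // Number of sequential rounds needed when each worker runs one job at a time.
    static int rounds(int numJobs, int numWorkers) {
        // Integer ceiling of numJobs / numWorkers.
        return (numJobs + numWorkers - 1) / numWorkers;
    }

    public static void main(String[] args) {
        int secondsPerJob = 1;
        System.out.println(rounds(5, 3) * secondsPerJob); // 2: three jobs, then two
        System.out.println(rounds(5, 1) * secondsPerJob); // 5: a single worker runs them serially
    }
}
```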

This example demonstrates the use of Java’s BlockingQueue interface and the LinkedBlockingQueue implementation to create a worker pool. The Worker class implements Runnable, allowing it to be executed in separate threads. The main differences from the original Go example are:

  1. We use BlockingQueue instead of channels.
  2. Workers run in an infinite loop, taking jobs from the queue until interrupted.
  3. We start worker threads explicitly instead of using goroutines.
  4. We use Thread.sleep() to simulate work instead of time.Sleep().

This approach provides similar functionality to the original Go example, showcasing concurrent processing in Java with plain threads and queues.
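
For comparison, the standard library also ships a ready-made pool in java.util.concurrent. The sketch below swaps the hand-written threads and queues for Executors.newFixedThreadPool and Future; it is an alternative technique, not the approach used above. The class name ExecutorPoolSketch and the runJobs helper are hypothetical, and the sleep is shortened to 100 ms only to keep the sketch quick.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorPoolSketch {

    // Submits numJobs tasks to a pool of numWorkers threads and returns
    // the results in submission order.
    static List<Integer> runJobs(int numJobs, int numWorkers)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(numWorkers);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int j = 1; j <= numJobs; j++) {
                final int job = j;
                futures.add(pool.submit(() -> {
                    Thread.sleep(100); // simulated work, shorter than the 1 s used above
                    return job * 2;
                }));
            }

            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                results.add(f.get()); // blocks until that particular job is done
            }
            return results;
        } finally {
            pool.shutdown(); // lets the pool threads exit once submitted tasks are done
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runJobs(5, 3)); // [2, 4, 6, 8, 10]
    }
}
```

Here the pool owns the job queue and the worker threads, and each Future plays the role of one entry on the results queue.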
