Worker Pools in Mercury

In this example we'll look at how to implement a worker pool using threads and concurrent queues.
import java.util.concurrent.*;

public class WorkerPools {

    // Here's the worker, of which we'll run several
    // concurrent instances. These workers will receive
    // work on the `jobs` queue and send the corresponding
    // results on `results`. We'll sleep a second per job to
    // simulate an expensive task.
    static class Worker implements Runnable {
        private final int id;
        private final BlockingQueue<Integer> jobs;
        private final BlockingQueue<Integer> results;

        Worker(int id, BlockingQueue<Integer> jobs, BlockingQueue<Integer> results) {
            this.id = id;
            this.jobs = jobs;
            this.results = results;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Integer j = jobs.take();
                    System.out.println("worker " + id + " started job " + j);
                    Thread.sleep(1000);
                    System.out.println("worker " + id + " finished job " + j);
                    results.put(j * 2);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the thread exit.
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // In order to use our pool of workers we need to send
        // them work and collect their results. We make 2
        // queues for this.
        final int numJobs = 5;
        BlockingQueue<Integer> jobs = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> results = new LinkedBlockingQueue<>();
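
        // This starts up 3 workers, initially blocked
        // because there are no jobs yet. They are marked as
        // daemon threads so that the JVM can exit once `main`
        // returns, even though the workers are still blocked
        // waiting for more jobs.
        for (int w = 1; w <= 3; w++) {
            Thread worker = new Thread(new Worker(w, jobs, results));
            worker.setDaemon(true);
            worker.start();
        }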

        // Here we send 5 jobs. A `BlockingQueue` cannot be
        // closed, so nothing further is done to signal the end
        // of the work; idle workers simply block in `take()`.
        for (int j = 1; j <= numJobs; j++) {
            jobs.put(j);
        }

        // Finally we collect all the results of the work.
        // Receiving 5 results ensures that every job has
        // been processed. An alternative way to wait for
        // multiple threads is to use a `CountDownLatch`.
        for (int a = 1; a <= numJobs; a++) {
            results.take();
        }
    }
}
Our running program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.
$ javac WorkerPools.java && java WorkerPools
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 1 started job 4
worker 2 finished job 2
worker 2 started job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
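The code comments mention `CountDownLatch` as an alternative way to wait for the work to complete. The following is a minimal sketch of that idea, not part of the original example: the class name `LatchWorkerPool` and the helper `runPool` are hypothetical, the simulated work is shortened to 100 ms, and results are gathered in a synchronized list instead of a second queue. The latch is counted down once per finished job, and the workers are interrupted once the latch reaches zero.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class LatchWorkerPool {

    // Hypothetical helper: runs numJobs jobs on numWorkers threads and
    // returns the doubled job values, sorted for a deterministic result.
    static List<Integer> runPool(int numWorkers, int numJobs) throws InterruptedException {
        BlockingQueue<Integer> jobs = new LinkedBlockingQueue<>();
        List<Integer> results = Collections.synchronizedList(new ArrayList<>());
        // One count per job: the latch opens when every job has finished.
        CountDownLatch done = new CountDownLatch(numJobs);

        List<Thread> workers = new ArrayList<>();
        for (int w = 1; w <= numWorkers; w++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        int j = jobs.take();
                        Thread.sleep(100); // simulated work (assumed duration)
                        results.add(j * 2);
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    // Interrupted by the caller once all jobs are done.
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            workers.add(t);
        }

        for (int j = 1; j <= numJobs; j++) {
            jobs.put(j);
        }

        // Block until every job has been counted down, then stop the workers.
        done.await();
        for (Thread t : workers) {
            t.interrupt();
        }
        for (Thread t : workers) {
            t.join();
        }

        List<Integer> sorted = new ArrayList<>(results);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runPool(3, 5)); // prints [2, 4, 6, 8, 10]
    }
}
```

Because the workers are interrupted and joined explicitly, this variant does not need daemon threads for the program to exit.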
Note that in Java, we use `Thread`s and `BlockingQueue`s to achieve similar functionality to goroutines and channels in Go. The `Worker` class implements `Runnable`, which allows it to be executed in a separate thread. The `BlockingQueue` interface provides thread-safe operations for adding and removing elements, similar to Go's channels.
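One difference from Go's channels is that a `BlockingQueue` has no `close` operation. A common workaround is to enqueue a sentinel value (often called a "poison pill") for each worker after the real jobs. The sketch below illustrates this under stated assumptions: the class name `PoisonPillPool`, the helper `runPool`, and the sentinel `POISON = -1` are all hypothetical, and the sentinel only works because no real job uses that value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PoisonPillPool {

    // Hypothetical sentinel: must never collide with a real job value.
    static final int POISON = -1;

    // Hypothetical helper: processes jobs 1..numJobs and returns the
    // doubled values in sorted order.
    static List<Integer> runPool(int numWorkers, int numJobs) throws InterruptedException {
        BlockingQueue<Integer> jobs = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> results = new LinkedBlockingQueue<>();

        List<Thread> workers = new ArrayList<>();
        for (int w = 1; w <= numWorkers; w++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        int j = jobs.take();
                        if (j == POISON) {
                            return; // no more work for this worker
                        }
                        results.put(j * 2);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            workers.add(t);
        }

        for (int j = 1; j <= numJobs; j++) {
            jobs.put(j);
        }
        // The queue is FIFO, so the pills are only reached after all real
        // jobs have been taken. Each worker consumes exactly one pill.
        for (int w = 0; w < numWorkers; w++) {
            jobs.put(POISON);
        }

        // Unlike the daemon-thread approach, every worker terminates on
        // its own, so join() returns once all jobs have been processed.
        for (Thread t : workers) {
            t.join();
        }

        List<Integer> out = new ArrayList<>();
        results.drainTo(out);
        Collections.sort(out);
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runPool(3, 5)); // prints [2, 4, 6, 8, 10]
    }
}
```

The trade-off is that the sentinel occupies a value in the job domain; for job types where no spare value exists, a dedicated marker object is the usual substitute.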