Worker Pools in OCaml
Our example demonstrates how to implement a worker pool using OCaml’s concurrent programming features.
(* Here's the worker function, of which we'll run several
   concurrent instances. These workers will receive
   work on the `jobs` channel and send the corresponding
   results on `results`. We'll sleep a second per job to
   simulate an expensive task. Event channels cannot be
   closed, so a `None` job tells the worker to stop. *)
let rec worker id jobs results =
  match Event.sync (Event.receive jobs) with
  | None -> ()
  | Some j ->
    (* `%!` flushes stdout so the lines appear as they happen. *)
    Printf.printf "worker %d started job %d\n%!" id j;
    Unix.sleep 1;
    Printf.printf "worker %d finished job %d\n%!" id j;
    Event.sync (Event.send results (j * 2));
    worker id jobs results

let main () =
  (* In order to use our pool of workers we need to send
     them work and collect their results. We make 2
     channels for this. *)
  let num_jobs = 5 in
  let num_workers = 3 in
  let jobs = Event.new_channel () in
  let results = Event.new_channel () in
  (* This starts up 3 workers, initially blocked
     because there are no jobs yet. *)
  let workers =
    List.init num_workers (fun w ->
      Thread.create (fun () -> worker (w + 1) jobs results) ())
  in
  (* Here we send 5 `jobs` from a separate thread. Event
     channels are unbuffered, so sending them all from the
     main thread would deadlock once every worker is waiting
     to hand back a result. One `None` per worker then
     signals that there is no more work. *)
  let producer =
    Thread.create
      (fun () ->
        for j = 1 to num_jobs do
          Event.sync (Event.send jobs (Some j))
        done;
        for _ = 1 to num_workers do
          Event.sync (Event.send jobs None)
        done)
      ()
  in
  (* Finally we collect all the results of the work. *)
  for _ = 1 to num_jobs do
    ignore (Event.sync (Event.receive results))
  done;
  (* Joining ensures that the producer and the worker
     threads have finished. *)
  Thread.join producer;
  List.iter Thread.join workers

let () = main ()
Running the program shows the 5 jobs being executed by various workers; which worker picks up which job can vary from run to run. The program takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.
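The 2-second figure follows from a ceiling division: 5 one-second jobs spread over 3 workers need 2 rounds. As a small worked example, the helper below (a hypothetical name, not part of the program) computes that estimate, assuming all jobs take the same time and ignoring scheduling overhead.

```ocaml
(* Hypothetical helper: estimated wall-clock seconds for [jobs]
   equal-length jobs spread over [workers] threads. This is only an
   idealised estimate; real runs add scheduling overhead. *)
let estimated_seconds ~jobs ~workers ~secs_per_job =
  if workers <= 0 then
    invalid_arg "estimated_seconds: workers must be positive";
  (* Ceiling division: number of rounds needed to drain the queue. *)
  let rounds = (jobs + workers - 1) / workers in
  rounds * secs_per_job

let () =
  (* 5 jobs, 3 workers, 1 second each: 2 rounds, so this prints 2. *)
  Printf.printf "%d\n" (estimated_seconds ~jobs:5 ~workers:3 ~secs_per_job:1)
```

With a single worker the same call would give 5, which is the "5 seconds of total work" mentioned above.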
To run the program, save it as worker_pools.ml and compile it against the unix and threads libraries (with OCaml 5 you may also need to add -I +unix):
$ ocamlc -thread unix.cma threads.cma worker_pools.ml -o worker_pools
Then execute it:
$ ./worker_pools
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 1 started job 4
worker 2 finished job 2
worker 2 started job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
This example demonstrates OCaml's event-based concurrency with channels and threads. The Event module provides synchronous (unbuffered) communication channels, while the Thread module creates the threads that run the workers. Together they are enough to implement a worker pool similar to the original example.
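As a rough sketch of how the same pattern might be packaged for reuse, the helper below wraps the channels and threads behind a single function. The name `pool_map` and its signature are assumptions made for illustration; they are not part of the Event or Thread modules. Because Event channels cannot be closed, the sketch sends one `None` per worker to signal the end of work, and it feeds jobs from a separate thread so the caller is free to receive results. It assumes `workers` is at least 1 and that `f` does not raise.

```ocaml
(* Hypothetical helper, not a library function: apply [f] to every
   element of [xs] using [workers] threads. Results come back in
   completion order, not input order. *)
let pool_map ~workers f xs =
  let jobs = Event.new_channel () in
  let results = Event.new_channel () in
  (* Each worker loops until it receives [None]. *)
  let rec worker () =
    match Event.sync (Event.receive jobs) with
    | None -> ()
    | Some x ->
      Event.sync (Event.send results (f x));
      worker ()
  in
  let pool = List.init workers (fun _ -> Thread.create worker ()) in
  (* Send the jobs, then one stop signal per worker, from a separate
     thread: Event.send blocks until a receiver is ready. *)
  let producer =
    Thread.create
      (fun () ->
        List.iter (fun x -> Event.sync (Event.send jobs (Some x))) xs;
        List.iter (fun _ -> Event.sync (Event.send jobs None)) pool)
      ()
  in
  (* Receive exactly one result per input element. *)
  let collected =
    List.map (fun _ -> Event.sync (Event.receive results)) xs
  in
  Thread.join producer;
  List.iter Thread.join pool;
  collected
```

A call such as `pool_map ~workers:3 (fun j -> j * 2) [1; 2; 3; 4; 5]` returns the five doubled values; sort the result if a stable order matters. The block needs the same unix and threads libraries as the main program.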