Worker Pools in OCaml

This example shows how to implement a worker pool in OCaml using threads and the synchronous channels of the Event module.

(* Here's the worker function, of which we'll run several
   concurrent instances. These workers receive work on the
   `jobs` channel and send the corresponding results on
   `results`. We sleep a second per job to simulate an
   expensive task. Event channels cannot be closed, so a
   negative job number serves as the stop signal. *)
let rec worker id jobs results =
  let j = Event.sync (Event.receive jobs) in
  if j >= 0 then begin
    Printf.printf "worker %d started  job %d\n%!" id j;
    Unix.sleep 1;
    Printf.printf "worker %d finished job %d\n%!" id j;
    Event.sync (Event.send results (j * 2));
    worker id jobs results
  end

let main () =
  (* In order to use our pool of workers we need to send
     them work and collect their results. We make 2
     channels for this. *)
  let num_jobs = 5 in
  let num_workers = 3 in
  let jobs = Event.new_channel () in
  let results = Event.new_channel () in

  (* This starts up 3 workers, initially blocked
     because there are no jobs yet. *)
  let workers =
    List.init num_workers (fun i ->
      Thread.create (fun () -> worker (i + 1) jobs results) ())
  in

  (* Event channels are unbuffered: each send blocks until
     a worker receives it. We therefore send the 5 `jobs`
     from a separate thread so that `main` is free to
     collect results; sending from `main` itself would
     deadlock once every worker is waiting to deliver a
     result. After the jobs, one stop signal (-1) is sent
     per worker. *)
  let producer =
    Thread.create (fun () ->
      for j = 1 to num_jobs do
        Event.sync (Event.send jobs j)
      done;
      for _ = 1 to num_workers do
        Event.sync (Event.send jobs (-1))
      done) ()
  in

  (* Finally we collect all the results of the work, then
     join the threads to make sure they have finished. *)
  for _ = 1 to num_jobs do
    ignore (Event.sync (Event.receive results))
  done;
  Thread.join producer;
  List.iter Thread.join workers

let () = main ()

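Our program shows the 5 jobs being executed by various workers. It takes about 2 seconds rather than 5, because the 3 workers operate concurrently: three jobs run during the first second and the remaining two during the next. Which worker picks up which job, and the exact order of the output lines, can differ from run to run.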
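The 2-second figure can be checked with a little arithmetic. The sketch below is only an illustration: `estimated_seconds` is a hypothetical helper, not part of the example program, and it assumes that every job costs the same whole number of seconds and that scheduling overhead is negligible.

```ocaml
(* Hypothetical helper: rough wall-clock estimate for a worker pool.
   Assumes equal-cost jobs and ignores thread and channel overhead. *)
let estimated_seconds ~jobs ~workers ~secs_per_job =
  (* Ceiling division: number of "rounds" needed to drain all jobs. *)
  let rounds = (jobs + workers - 1) / workers in
  rounds * secs_per_job

let () =
  (* 5 jobs on 3 workers at 1 second each: 2 rounds. *)
  Printf.printf "%d\n" (estimated_seconds ~jobs:5 ~workers:3 ~secs_per_job:1)
```

With 5 jobs, 3 workers and 1 second per job, this prints 2, matching the observed run time.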

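To run the program, save it as worker_pools.ml and compile it together with the unix and threads libraries:

$ ocamlc -thread unix.cma threads.cma worker_pools.ml -o worker_pools

On OCaml 5, where these libraries are installed in their own subdirectories, the following should work instead:

$ ocamlc -I +unix -I +threads unix.cma threads.cma worker_pools.ml -o worker_pools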

Then execute it:

$ ./worker_pools
worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5

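This example combines two modules from OCaml's threads library. The Thread module creates the worker threads, and the Event module provides synchronous channels: a send completes only when another thread receives the value. Because these channels have no buffer, a worker pool built on them must make sure that some thread is ready to receive results while jobs are still being sent. Threads within a single domain do not run OCaml code in parallel, but a blocking call such as Unix.sleep lets the other threads proceed, which is why the simulated jobs overlap.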
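The synchronous behaviour of Event channels can be observed directly. The following is a minimal sketch, separate from the worker pool program; the names `ch`, `first_attempt`, `sender` and `received` are chosen only for illustration. It uses `Event.poll`, the non-blocking counterpart of `Event.sync`, to show that a send does not go through while no thread is receiving.

```ocaml
(* Sketch: an Event channel is a rendezvous point, not a queue. *)
let ch : int Event.channel = Event.new_channel ()

(* No thread is receiving yet, so a non-blocking send attempt
   is not performed and Event.poll returns None. *)
let first_attempt : unit option = Event.poll (Event.send ch 42)

(* A sender thread and the receiving main thread meet on the channel. *)
let sender = Thread.create (fun () -> Event.sync (Event.send ch 7)) ()
let received = Event.sync (Event.receive ch)
let () = Thread.join sender

let () =
  Printf.printf "first attempt delivered: %b, received: %d\n"
    (first_attempt <> None) received
```

The value 42 is never stored in the channel, so the only value the main thread can receive is the 7 offered by the sender thread. It needs the same threads library as the worker pool program when compiled.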
