Stateful Goroutines in OCaml

Our example demonstrates how to use OCaml’s built-in concurrency features to manage shared state across multiple threads. This approach aligns with OCaml’s functional programming paradigms and thread-safe communication.

open Unix
open Thread

(* Define types for read and write operations *)
type read_op = {
  key: int;
  resp: int Event.channel
}

type write_op = {
  key: int;
  value: int;
  resp: bool Event.channel
}

(* Function to create a new atomic reference *)
let atomic_ref initial =
  let cell = ref initial in
  let mutex = Mutex.create () in
  let get () = Mutex.lock mutex; let result = !cell in Mutex.unlock mutex; result in
  let set v = Mutex.lock mutex; cell := v; Mutex.unlock mutex in
  (get, set)

(* Main function *)
let main () =
  let (get_read_ops, set_read_ops) = atomic_ref 0 in
  let (get_write_ops, set_write_ops) = atomic_ref 0 in

  (* Channels for read and write operations *)
  let reads = Event.new_channel () in
  let writes = Event.new_channel () in

  (* Thread that manages the state *)
  let _ = Thread.create (fun () ->
    let state = Hashtbl.create 10 in
    while true do
      Event.sync (
        Event.choose [
          Event.receive reads (fun read ->
            let value = try Hashtbl.find state read.key with Not_found -> 0 in
            Event.sync (Event.send read.resp value)
          );
          Event.receive writes (fun write ->
            Hashtbl.replace state write.key write.value;
            Event.sync (Event.send write.resp true)
          )
        ]
      )
    done
  ) () in

  (* Create 100 threads for read operations *)
  for _ = 1 to 100 do
    let _ = Thread.create (fun () ->
      while true do
        let read = {
          key = Random.int 5;
          resp = Event.new_channel ()
        } in
        Event.sync (Event.send reads read);
        let _ = Event.sync (Event.receive read.resp) in
        set_read_ops (get_read_ops () + 1);
        Thread.delay 0.001
      done
    ) () in
    ()
  done;

  (* Create 10 threads for write operations *)
  for _ = 1 to 10 do
    let _ = Thread.create (fun () ->
      while true do
        let write = {
          key = Random.int 5;
          value = Random.int 100;
          resp = Event.new_channel ()
        } in
        Event.sync (Event.send writes write);
        let _ = Event.sync (Event.receive write.resp) in
        set_write_ops (get_write_ops () + 1);
        Thread.delay 0.001
      done
    ) () in
    ()
  done;

  (* Let the threads work for a second *)
  Thread.delay 1.0;

  (* Report the operation counts *)
  Printf.printf "readOps: %d\n" (get_read_ops ());
  Printf.printf "writeOps: %d\n" (get_write_ops ())

(* Run the main function *)
let _ = main ()

This OCaml program demonstrates the use of threads and channels to manage shared state. Here’s a breakdown of the key components:

  1. We define read_op and write_op types to encapsulate read and write requests.

  2. The atomic_ref function creates a thread-safe reference, simulating atomic operations.

  3. The main thread creates channels for read and write operations.

  4. A dedicated thread manages the state (a hash table) and responds to read and write requests.

  5. We spawn 100 threads for read operations and 10 threads for write operations. Each thread continuously sends requests to the state-managing thread.

  6. After letting the threads run for a second, we report the total number of read and write operations performed.

To run this program, save it as stateful_threads.ml and compile it with OCaml:

$ ocamlc -thread unix.cma threads.cma stateful_threads.ml -o stateful_threads
$ ./stateful_threads
readOps: 71708
writeOps: 7177

This example showcases OCaml’s concurrency features, using threads and channels instead of goroutines. It demonstrates how to manage shared state safely in a concurrent environment, adhering to OCaml’s functional programming principles.