Worker Pools in F#
Our example demonstrates how to implement a worker pool using F# asynchronous workflows and MailboxProcessor.

open System
open System.Threading

// Define a job as a simple integer
type Job = int

// Define a message type for our worker mailbox
type WorkerMessage =
    | Job of Job
    | Stop

// Worker function that processes jobs
let worker (id: int) (inbox: MailboxProcessor<WorkerMessage>) =
    let rec loop() = async {
        let! msg = inbox.Receive()
        match msg with
        | Job j ->
            printfn "worker %d started job %d" id j
            do! Async.Sleep 1000 // Simulate an expensive task
            printfn "worker %d finished job %d" id j
            return! loop()
        | Stop ->
            printfn "worker %d stopping" id
            return ()
    }
    loop()

// Create a worker pool
let createWorkerPool numWorkers =
    [1..numWorkers]
    |> List.map (fun id -> MailboxProcessor.Start(worker id))

[<EntryPoint>]
let main argv =
    let numJobs = 5
    let workers = createWorkerPool 3

    // Create jobs and send them to workers round-robin.
    // The list is 0-indexed, so (j - 1) sends job 1 to worker 1.
    for j in 1..numJobs do
        workers.[(j - 1) % workers.Length].Post(Job j)

    // Wait for all jobs to complete
    Thread.Sleep(numJobs * 1000)

    // Stop all workers
    for w in workers do
        w.Post Stop

    0 // Return an integer exit code

In this F# version:
We define a Job type and a WorkerMessage discriminated union to represent the messages our workers can receive.
The worker function uses a MailboxProcessor (also known as an agent) to receive and process jobs. It prints messages when starting and finishing jobs, and simulates work with a 1-second delay.
createWorkerPool creates a list of MailboxProcessor instances, each running a worker.
In the main function, we create a pool of 3 workers and send 5 jobs to them. We distribute the jobs among the workers using a simple round-robin approach.
After sending all jobs, we wait for them to complete (using a simple Thread.Sleep, though in a real application you'd want a more robust synchronization mechanism).
Finally, we send a Stop message to all workers to shut them down.
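
One possible way to replace the fixed Thread.Sleep with real synchronization is sketched below, as an assumption rather than part of the original example. It counts outstanding jobs with System.Threading.CountdownEvent and shuts workers down with PostAndReply, so the caller only continues once each worker has acknowledged. The names Msg, Work, Quit, startWorker and runPool are hypothetical, and the delay is shortened to 100 ms to keep the sketch quick.

```fsharp
open System.Threading

// Hypothetical message type for this sketch (not the article's WorkerMessage).
type Msg =
    | Work of int
    | Quit of AsyncReplyChannel<unit>

// Start one agent that signals `pending` after each finished job.
let startWorker (id: int) (pending: CountdownEvent) =
    MailboxProcessor<Msg>.Start(fun inbox ->
        let rec loop () = async {
            let! msg = inbox.Receive()
            match msg with
            | Work j ->
                do! Async.Sleep 100 // Stand-in for real work
                printfn "worker %d finished job %d" id j
                pending.Signal() |> ignore
                return! loop ()
            | Quit reply ->
                reply.Reply() // Acknowledge shutdown, then exit the loop
                return ()
        }
        loop ())

// Run numJobs jobs on numWorkers agents; returns the count of unfinished jobs.
let runPool (numWorkers: int) (numJobs: int) =
    use pending = new CountdownEvent(numJobs)
    let workers = [ for id in 1..numWorkers -> startWorker id pending ]
    for j in 1..numJobs do
        workers.[(j - 1) % numWorkers].Post(Work j)
    pending.Wait() // Blocks until every job has signalled completion
    for w in workers do
        w.PostAndReply(fun ch -> Quit ch) // Returns after the worker replies
    pending.CurrentCount
```

Calling runPool 3 5 returns only after all five jobs have finished and every worker has acknowledged the shutdown, so the wait no longer depends on guessing how long the jobs take.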
When you run this program, you’ll see output similar to the following:
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 1 started job 4
worker 2 finished job 2
worker 2 started job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
worker 1 stopping
worker 2 stopping
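worker 3 stopping

This demonstrates the concurrent execution of jobs by multiple workers, similar to the original example. The jobs finish in about 2 seconds despite representing about 5 seconds of total work, because the 3 workers operate concurrently and no worker handles more than 2 jobs. The program as a whole still runs for about 5 seconds, but only because of the fixed Thread.Sleep in main. Since main returns right after posting Stop, the final "stopping" lines may be cut off if the process exits before the workers handle the message.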
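
The timing can be checked with a small calculation. The sketch below assumes jobs are dealt out round-robin so that job 1 goes to worker 1, as in the sample output, and that every job takes 1 second. The helpers assign and makespanSeconds are hypothetical names introduced here for illustration.

```fsharp
// Group job numbers by the 1-based worker that receives them (round-robin).
let assign (numWorkers: int) (numJobs: int) =
    [1..numJobs]
    |> List.groupBy (fun j -> (j - 1) % numWorkers + 1)

// With 1-second jobs, the pool is done when its busiest worker is done.
// Assumes numJobs >= 1, since List.max fails on an empty list.
let makespanSeconds (numWorkers: int) (numJobs: int) =
    assign numWorkers numJobs
    |> List.map (fun (_, jobs) -> List.length jobs)
    |> List.max

// assign 3 5 evaluates to [(1, [1; 4]); (2, [2; 5]); (3, [3])]
// makespanSeconds 3 5 evaluates to 2
```

Workers 1 and 2 each receive two jobs and worker 3 receives one, which is why the last "finished" line appears after roughly 2 seconds rather than 5.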