Worker Pools in F#
Our example demonstrates how to implement a worker pool using F# asynchronous workflows and MailboxProcessor.
open System
open System.Threading

// Define a job as a simple integer
type Job = int

// Define a message type for our worker mailbox
type WorkerMessage =
    | Job of Job
    | Stop

// Worker function that processes jobs
let worker (id: int) (inbox: MailboxProcessor<WorkerMessage>) =
    let rec loop() = async {
        let! msg = inbox.Receive()
        match msg with
        | Job j ->
            printfn "worker %d started job %d" id j
            do! Async.Sleep 1000 // Simulate an expensive task
            printfn "worker %d finished job %d" id j
            return! loop()
        | Stop ->
            printfn "worker %d stopping" id
            return ()
    }
    loop()

// Create a worker pool
let createWorkerPool numWorkers =
    [1..numWorkers]
    |> List.map (fun id -> MailboxProcessor.Start(worker id))

[<EntryPoint>]
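let main argv =
    let numJobs = 5
    let numWorkers = 3
    let workers = createWorkerPool numWorkers

    // Create jobs and send them to workers round-robin.
    // Lists are 0-indexed, so job 1 goes to worker 1 at index 0.
    for j in 1..numJobs do
        workers.[(j - 1) % numWorkers].Post(Job j)

    // Wait for all jobs to complete
    Thread.Sleep(numJobs * 1000)

    // Stop all workers
    for w in workers do
        w.Post Stop

    // Give the workers a moment to print their stop messages before exiting
    Thread.Sleep 100

    0 // Return an integer exit code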
In this F# version:
- We define a Job type and a WorkerMessage discriminated union to represent the messages our workers can receive.
- The worker function uses a MailboxProcessor (also known as an agent) to receive and process jobs. It prints messages when starting and finishing jobs, and simulates work with a 1-second delay.
- createWorkerPool creates a list of MailboxProcessor instances, each running a worker.
- In the main function, we create a pool of 3 workers and send 5 jobs to them. We distribute the jobs among the workers using a simple round-robin approach.
- After sending all jobs, we wait for them to complete (using a simple Thread.Sleep, though in a real application you’d want a more robust synchronization mechanism).
- Finally, we send a Stop message to all workers to shut them down.
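One way to avoid the fixed Thread.Sleep is to have each worker acknowledge its Stop message. The following is a minimal sketch, not the example's own code: it assumes a modified WorkerMessage whose Stop case carries an AsyncReplyChannel<unit>, and the names runPool, delayMs, and completed are hypothetical. Because a MailboxProcessor receives messages in the order they were posted, a worker replies to Stop only after it has finished every job queued before it.

```fsharp
open System.Collections.Concurrent

// Assumed variant of the message type: Stop carries a reply channel
// so the caller can wait until the worker has drained its queue.
type WorkerMessage =
    | Job of int
    | Stop of AsyncReplyChannel<unit>

// Worker loop; records each finished job in a thread-safe queue.
let worker (id: int) (delayMs: int) (completed: ConcurrentQueue<int>)
           (inbox: MailboxProcessor<WorkerMessage>) =
    let rec loop () = async {
        let! msg = inbox.Receive()
        match msg with
        | Job j ->
            printfn "worker %d started job %d" id j
            do! Async.Sleep delayMs // Simulate an expensive task
            printfn "worker %d finished job %d" id j
            completed.Enqueue j
            return! loop ()
        | Stop reply ->
            printfn "worker %d stopping" id
            reply.Reply () // Tell the caller this worker is done
            return ()
    }
    loop ()

// Hypothetical helper: runs the jobs and returns the completed job ids, sorted.
let runPool (numWorkers: int) (numJobs: int) (delayMs: int) : int list =
    let completed = ConcurrentQueue<int>()
    let workers =
        [ 1 .. numWorkers ]
        |> List.map (fun id -> MailboxProcessor.Start(worker id delayMs completed))

    // Round-robin distribution: job 1 goes to the worker at index 0.
    for j in 1 .. numJobs do
        workers.[(j - 1) % numWorkers].Post(Job j)

    // Post Stop to every worker and block until all of them have replied.
    workers
    |> List.map (fun w -> w.PostAndAsyncReply(fun reply -> Stop reply))
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore

    completed |> Seq.sort |> List.ofSeq
```

Calling runPool 3 5 1000 would reproduce the example's workload and return as soon as the last worker has acknowledged Stop, rather than after a fixed wait.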
When you run this program, you’ll see output similar to the following:
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 1 started job 4
worker 2 finished job 2
worker 2 started job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
worker 1 stopping
worker 2 stopping
worker 3 stopping
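This demonstrates the concurrent execution of jobs by multiple workers. The jobs themselves finish in about 2 seconds despite representing about 5 seconds of total work, because the 3 workers operate concurrently. The program as a whole still runs for about 5 seconds, but only because main waits with a fixed Thread.Sleep(numJobs * 1000) before stopping the workers.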
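As a rough check on the job timing, assuming round-robin distribution and a fixed cost per job as in the example, the busiest worker determines when the last job finishes. The helper name expectedSeconds below is hypothetical and only illustrates the arithmetic.

```fsharp
// Estimated time until the last job finishes, assuming jobs are spread
// round-robin and each takes secondsPerJob (hypothetical helper).
let expectedSeconds (numJobs: int) (numWorkers: int) (secondsPerJob: int) : int =
    // Ceiling division: the busiest worker gets this many jobs.
    let jobsOnBusiestWorker = (numJobs + numWorkers - 1) / numWorkers
    jobsOnBusiestWorker * secondsPerJob

printfn "%d" (expectedSeconds 5 3 1) // prints 2: three workers share five jobs
printfn "%d" (expectedSeconds 5 1 1) // prints 5: a single worker runs them in sequence
```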