Worker Pools in F#

Our example demonstrates how to implement a worker pool using F# asynchronous workflows and MailboxProcessor.

open System
open System.Threading

// Define a job as a simple integer
type Job = int

// Define a message type for our worker mailbox
type WorkerMessage =
    | Job of Job
    | Stop

// Worker function that processes jobs
let worker (id: int) (inbox: MailboxProcessor<WorkerMessage>) = 
    let rec loop() = async {
        let! msg = inbox.Receive()
        match msg with
        | Job j ->
            printfn "worker %d started  job %d" id j
            do! Async.Sleep 1000 // Simulate an expensive task
            printfn "worker %d finished job %d" id j
            return! loop()
        | Stop -> 
            printfn "worker %d stopping" id
            return ()
    }
    loop()

// Create a worker pool
let createWorkerPool numWorkers =
    [1..numWorkers]
    |> List.map (fun id -> MailboxProcessor.Start(worker id))

[<EntryPoint>]
let main argv =
    let numJobs = 5
    let workers = createWorkerPool 3

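    // Create jobs and send them to workers, round-robin.
    // List indices are zero-based, so job j goes to index (j - 1) % workers.Length.
    for j in 1..numJobs do
        workers.[(j - 1) % workers.Length].Post(Job j)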

    // Crude wait: sleep long enough for every job to complete
    // (one second per job is a safe upper bound here)
    Thread.Sleep(numJobs * 1000)

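    // Stop all workers
    for w in workers do
        w.Post Stop

    // Post returns immediately, so pause briefly to let the workers
    // print their "stopping" messages before the process exits
    Thread.Sleep(100)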

    0 // Return an integer exit code

In this F# version:

  1. We define a Job type and a WorkerMessage discriminated union to represent the messages our workers can receive.

  2. The worker function uses a MailboxProcessor (also known as an agent) to receive and process jobs. It prints messages when starting and finishing jobs, and simulates work with a 1-second delay.

  3. createWorkerPool creates a list of MailboxProcessor instances, each running a worker.

  4. In the main function, we create a pool of 3 workers and send 5 jobs to them. We distribute the jobs among the workers using a simple round-robin approach.

  5. After sending all jobs, we wait for them to complete (using a simple Thread.Sleep, though in a real application you’d want a more robust synchronization mechanism).

  6. Finally, we send a Stop message to all workers to shut them down.
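
The fixed sleep in step 5 can be replaced by having each worker report back when a job is done. The following is a minimal sketch rather than the example's own code: it assumes a modified message type in which every message carries an AsyncReplyChannel, and the name runJobs, the shortened 100 ms delay, and the doubled result value are hypothetical choices made for illustration. PostAndAsyncReply and PostAndReply are standard MailboxProcessor members.

```fsharp
// Assumed variant of WorkerMessage: each message carries a reply channel
type WorkerMessage =
    | Job of int * AsyncReplyChannel<int>
    | Stop of AsyncReplyChannel<unit>

let worker (inbox: MailboxProcessor<WorkerMessage>) =
    let rec loop () = async {
        let! msg = inbox.Receive()
        match msg with
        | Job (j, reply) ->
            do! Async.Sleep 100   // Simulated work (shortened for the sketch)
            reply.Reply(j * 2)    // Signal completion and hand back a result
            return! loop ()
        | Stop reply ->
            reply.Reply()         // Acknowledge shutdown, then leave the loop
            return ()
    }
    loop ()

// Hypothetical helper: runs numJobs jobs on numWorkers agents and
// returns the results in job order
let runJobs (numWorkers: int) (numJobs: int) : int[] =
    let workers =
        [ for _ in 1 .. numWorkers -> MailboxProcessor.Start(worker) ]
    // Build one Async per job; each posts its message when started
    // and completes when the worker replies
    let results =
        [ for j in 1 .. numJobs ->
            workers.[(j - 1) % numWorkers].PostAndAsyncReply(fun ch -> Job (j, ch)) ]
        |> Async.Parallel
        |> Async.RunSynchronously
    // Block until every worker has acknowledged Stop
    for w in workers do
        w.PostAndReply(fun ch -> Stop ch)
    results

printfn "%A" (runJobs 3 5)   // prints [|2; 4; 6; 8; 10|]
```

With this shape, the caller returns as soon as the last reply arrives instead of waiting for a worst-case timeout, and the shutdown acknowledgement guarantees that every worker has handled Stop before the program exits.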

When you run this program, you’ll see output similar to the following. The exact interleaving of lines from different workers can vary between runs:

worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
worker 1 stopping
worker 2 stopping
worker 3 stopping
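
The pairing of workers and jobs in this output (worker 1 takes jobs 1 and 4, worker 2 takes jobs 2 and 5, worker 3 takes job 3) corresponds to a zero-based round-robin index of (j - 1) % n. As a small check, assuming a hypothetical helper named workerFor that is not part of the program above:

```fsharp
// Hypothetical helper: the 1-based worker that receives 1-based job j
let workerFor (numWorkers: int) (j: int) = (j - 1) % numWorkers + 1

for j in 1 .. 5 do
    printfn "job %d -> worker %d" j (workerFor 3 j)
// prints:
// job 1 -> worker 1
// job 2 -> worker 2
// job 3 -> worker 3
// job 4 -> worker 1
// job 5 -> worker 2
```

Indexing a zero-based list with j % 3 directly would instead shift every job one worker to the right, so job 1 would land on worker 2.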

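This demonstrates the concurrent execution of jobs by multiple workers, similar to the original example. The five jobs finish in about 2 seconds even though they represent about 5 seconds of total work, because the 3 workers operate concurrently. The program as a whole still runs for about 5 seconds, because main waits in a fixed Thread.Sleep(numJobs * 1000) before stopping the workers.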
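
One way to observe the effect of concurrency on elapsed time is to measure it with a Stopwatch. The sketch below is an alternative formulation, not the agent-based pool above: it swaps in Async.Parallel with the optional maxDegreeOfParallelism argument (assumed available, which requires FSharp.Core 4.7 or later) to cap the number of jobs in flight. The names runJob and timeJobs and the 200 ms delay are hypothetical.

```fsharp
open System.Diagnostics

// Hypothetical job: sleeps for delayMs to simulate work, then returns its id
let runJob (delayMs: int) (j: int) = async {
    do! Async.Sleep delayMs
    return j
}

// Run the jobs with at most maxWorkers in flight at once and
// return the results together with the elapsed milliseconds
let timeJobs (maxWorkers: int) (delayMs: int) (numJobs: int) =
    let sw = Stopwatch.StartNew()
    let results =
        Async.Parallel(
            [ for j in 1 .. numJobs -> runJob delayMs j ],
            maxDegreeOfParallelism = maxWorkers)
        |> Async.RunSynchronously
    sw.Stop()
    results, sw.ElapsedMilliseconds

let results, elapsedMs = timeJobs 3 200 5
printfn "results %A finished in about %d ms" results elapsedMs
```

With 5 jobs of 200 ms each and at most 3 running at a time, the jobs complete in two rounds, so the elapsed time should be closer to 400 ms than to the 1000 ms a sequential run would need.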
