Stateful Goroutines in F#

open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

type ReadOp = {
    Key: int
    Resp: TaskCompletionSource<int>
}

type WriteOp = {
    Key: int
    Val: int
    Resp: TaskCompletionSource<bool>
}

let random = Random()

let statefulAgent () =
    let state = Dictionary<int, int>()
    let reads = MailboxProcessor<ReadOp>.Start(fun inbox ->
        let rec loop() = async {
            let! msg = inbox.Receive()
            state.TryGetValue(msg.Key, ref 0) |> ignore
            msg.Resp.SetResult(state.[msg.Key])
            return! loop()
        }
        loop())
    
    let writes = MailboxProcessor<WriteOp>.Start(fun inbox ->
        let rec loop() = async {
            let! msg = inbox.Receive()
            state.[msg.Key] <- msg.Val
            msg.Resp.SetResult(true)
            return! loop()
        }
        loop())
    
    (reads, writes)

[<EntryPoint>]
let main argv =
    let mutable readOps = 0L
    let mutable writeOps = 0L
    
    let (reads, writes) = statefulAgent()
    
    let readWorker = async {
        for _ in 1..100 do
            while true do
                let read = {
                    Key = random.Next(5)
                    Resp = TaskCompletionSource<int>()
                }
                reads.Post(read)
                read.Resp.Task.Result |> ignore
                Interlocked.Increment(&readOps) |> ignore
                do! Async.Sleep 1
    }
    
    let writeWorker = async {
        for _ in 1..10 do
            while true do
                let write = {
                    Key = random.Next(5)
                    Val = random.Next(100)
                    Resp = TaskCompletionSource<bool>()
                }
                writes.Post(write)
                write.Resp.Task.Result |> ignore
                Interlocked.Increment(&writeOps) |> ignore
                do! Async.Sleep 1
    }
    
    [1..100] |> List.map (fun _ -> readWorker) |> Async.Parallel |> Async.Ignore |> Async.Start
    [1..10] |> List.map (fun _ -> writeWorker) |> Async.Parallel |> Async.Ignore |> Async.Start
    
    Thread.Sleep(1000)
    
    printfn "readOps: %d" readOps
    printfn "writeOps: %d" writeOps
    
    0

In this example, we use F#’s MailboxProcessor to manage state in a thread-safe manner, which is similar to the channel-based approach in the original example. The statefulAgent function creates two MailboxProcessors: one for reads and one for writes.

The ReadOp and WriteOp types are defined as records, with TaskCompletionSource used to provide a way for the agent to respond to requests.

In the main function:

  1. We start the stateful agent.
  2. We create 100 read workers and 10 write workers, each running in its own task.
  3. Each worker continuously sends read or write requests to the appropriate agent.
  4. We use Interlocked.Increment to safely increment the operation counters.
  5. After letting the workers run for a second, we print the final operation counts.

This F# version maintains the core concept of the original Go example: using message-passing concurrency to manage shared state. The MailboxProcessor in F# serves a similar purpose to goroutines and channels in Go, providing a way to safely communicate between concurrent processes.

Running this program will show the number of read and write operations completed in one second, demonstrating the concurrent nature of the operations.

$ dotnet run
readOps: 71708
writeOps: 7177

This approach using MailboxProcessor provides a thread-safe way to manage state in F#, similar to the goroutine-based approach in Go. It allows for concurrent access to shared state without explicit locking, aligning with the principle of sharing memory by communicating.

查看推荐产品