Stateful Goroutines in F#
open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

// A read request: the key to look up, plus a slot for the agent's reply.
type ReadOp = {
    Key: int
    Resp: TaskCompletionSource<int>
}

// A write request: the key, the new value, and a slot for the acknowledgement.
type WriteOp = {
    Key: int
    Val: int
    Resp: TaskCompletionSource<bool>
}

// Reads and writes share one mailbox so that a single agent owns the state.
type Op =
    | Read of ReadOp
    | Write of WriteOp

let statefulAgent () =
    MailboxProcessor<Op>.Start(fun inbox ->
        // Only this agent touches the dictionary, one message at a time.
        let state = Dictionary<int, int>()
        let rec loop () = async {
            let! msg = inbox.Receive()
            match msg with
            | Read read ->
                // A missing key reads as 0 instead of throwing.
                let found, value = state.TryGetValue(read.Key)
                read.Resp.SetResult(if found then value else 0)
            | Write write ->
                state.[write.Key] <- write.Val
                write.Resp.SetResult(true)
            return! loop ()
        }
        loop ())

[<EntryPoint>]
let main argv =
    // Ref cells, because closures cannot capture `let mutable` locals.
    let readOps = ref 0L
    let writeOps = ref 0L
    let agent = statefulAgent ()

    // Each read worker loops forever: request a random key, await the reply, count it.
    let readWorker = async {
        let rng = Random()
        while true do
            let read = {
                ReadOp.Key = rng.Next(5)
                Resp = TaskCompletionSource<int>()
            }
            agent.Post(Read read)
            do! Async.AwaitTask(read.Resp.Task) |> Async.Ignore
            Interlocked.Increment(readOps) |> ignore
            do! Async.Sleep 1
    }

    // Write workers do the same with a random key and value.
    let writeWorker = async {
        let rng = Random()
        while true do
            let write = {
                WriteOp.Key = rng.Next(5)
                Val = rng.Next(100)
                Resp = TaskCompletionSource<bool>()
            }
            agent.Post(Write write)
            do! Async.AwaitTask(write.Resp.Task) |> Async.Ignore
            Interlocked.Increment(writeOps) |> ignore
            do! Async.Sleep 1
    }

    // Start 100 readers and 10 writers in the background.
    [1..100] |> List.map (fun _ -> readWorker) |> Async.Parallel |> Async.Ignore |> Async.Start
    [1..10] |> List.map (fun _ -> writeWorker) |> Async.Parallel |> Async.Ignore |> Async.Start

    // Let the workers run for a second, then report the counts.
    Thread.Sleep(1000)
    printfn "readOps: %d" readOps.Value
    printfn "writeOps: %d" writeOps.Value
    0

In this example, we use F#’s MailboxProcessor to manage state in a thread-safe manner, similar to the channel-based approach in the original Go example. The statefulAgent function starts a single MailboxProcessor that owns the dictionary. Read and write requests arrive as the two cases of the Op union and are handled one message at a time, so the state is never accessed from two threads at once.
The ReadOp and WriteOp types are defined as records, with TaskCompletionSource used to provide a way for the agent to respond to requests.
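TaskCompletionSource is one way to carry a reply back to the requester. For comparison, here is a minimal sketch of the same request/reply idea using MailboxProcessor's built-in AsyncReplyChannel and PostAndReply. The names StoreMsg and startStore are hypothetical, introduced only for illustration, and missing keys are assumed to read as 0.

```fsharp
open System.Collections.Generic

// Hypothetical message type: each case carries the channel used for the reply.
type StoreMsg =
    | Get of key: int * reply: AsyncReplyChannel<int>
    | Set of key: int * value: int * reply: AsyncReplyChannel<bool>

// Hypothetical helper: start an agent that owns a private dictionary.
let startStore () =
    MailboxProcessor<StoreMsg>.Start(fun inbox ->
        let state = Dictionary<int, int>()
        let rec loop () = async {
            let! msg = inbox.Receive()
            match msg with
            | Get (key, reply) ->
                // Assumption: a missing key reads as 0.
                let found, value = state.TryGetValue(key)
                reply.Reply(if found then value else 0)
            | Set (key, value, reply) ->
                state.[key] <- value
                reply.Reply(true)
            return! loop ()
        }
        loop ())

let store = startStore ()
// PostAndReply builds the message around a fresh reply channel and blocks for the answer.
let ack = store.PostAndReply(fun ch -> Set (1, 42, ch))
let hit = store.PostAndReply(fun ch -> Get (1, ch))
let miss = store.PostAndReply(fun ch -> Get (2, ch))
printfn "ack=%b hit=%d miss=%d" ack hit miss
// prints: ack=true hit=42 miss=0
```

Inside an async workflow, PostAndAsyncReply would be the non-blocking counterpart of PostAndReply.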
In the main function:
- We start the stateful agent.
- We create 100 read workers and 10 write workers, each running as its own async computation.
- Each worker continuously posts read or write requests and waits for each reply.
- We use Interlocked.Increment to safely increment the operation counters.
- After letting the workers run for a second, we print the final operation counts.
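The counting step can be looked at in isolation. The sketch below is not part of the program above: countInParallel is a hypothetical helper that runs a fixed number of finite workers, so the final count is predictable and shows that no increments are lost.

```fsharp
open System.Threading

// Hypothetical helper: run `workers` asyncs in parallel, each bumping a shared counter.
let countInParallel (workers: int) (perWorker: int) : int64 =
    // A ref cell can be captured by closures and passed where a byref is expected.
    let counter = ref 0L
    let worker = async {
        for _ in 1 .. perWorker do
            // Atomic increment: concurrent updates do not overwrite each other.
            Interlocked.Increment(counter) |> ignore
    }
    List.replicate workers worker
    |> Async.Parallel
    |> Async.Ignore
    |> Async.RunSynchronously
    counter.Value

printfn "%d" (countInParallel 10 1000)
// prints: 10000
```

Replacing the atomic increment with a plain read-modify-write on the same cell could lose updates under contention, which is why the counters go through Interlocked.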
This F# version maintains the core concept of the original Go example: using message-passing concurrency to manage shared state. The MailboxProcessor in F# serves a similar purpose to goroutines and channels in Go, providing a way to safely communicate between concurrent processes.
Running this program prints the number of read and write operations completed in about one second, demonstrating the concurrent nature of the operations. The exact counts vary from run to run and from machine to machine.
$ dotnet run
readOps: 71708
writeOps: 7177
This approach using MailboxProcessor provides a thread-safe way to manage state in F#, similar to the goroutine-based approach in Go. It allows for concurrent access to shared state without explicit locking, aligning with the principle of sharing memory by communicating.
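As a small illustration of state shared without locks, the sketch below keeps a running total inside an agent's loop and lets many workers post increments concurrently. CounterMsg, startCounter, and runDemo are hypothetical names used only here. The sketch relies on the mailbox delivering messages in the order they were queued.

```fsharp
// Hypothetical message type for an agent that owns a running total.
type CounterMsg =
    | Increment
    | Total of AsyncReplyChannel<int>

let startCounter () =
    MailboxProcessor<CounterMsg>.Start(fun inbox ->
        // The total lives only in the loop's argument; nothing else can reach it.
        let rec loop total = async {
            let! msg = inbox.Receive()
            match msg with
            | Increment -> return! loop (total + 1)
            | Total reply ->
                reply.Reply(total)
                return! loop total
        }
        loop 0)

// Hypothetical driver: post increments from several parallel workers, then ask for the total.
let runDemo (workers: int) (perWorker: int) : int =
    let counter = startCounter ()
    let worker = async {
        for _ in 1 .. perWorker do
            counter.Post(Increment)
    }
    List.replicate workers worker
    |> Async.Parallel
    |> Async.Ignore
    |> Async.RunSynchronously
    // Every increment was queued before this request, so it is answered last.
    counter.PostAndReply(fun ch -> Total ch)

printfn "%d" (runDemo 10 1000)
// prints: 10000
```

No lock or atomic operation appears in the sketch; the agent serializes the updates simply by handling one message at a time.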