Stateful Goroutines in Crystal
# In this example, we'll use Crystal's built-in concurrency features to manage
# shared state across multiple fibers. This approach aligns with Crystal's
# philosophy of communicating by sharing memory through channels.
require "random"
# We define structs to encapsulate read and write operations.
# These structs will be used to send requests to the state-owning fiber.
struct ReadOp
getter key : Int32
getter resp : Channel(Int32)
def initialize(@key, @resp)
end
end
struct WriteOp
getter key : Int32
getter val : Int32
getter resp : Channel(Bool)
def initialize(@key, @val, @resp)
end
end
# We'll count how many operations we perform.
read_ops = 0
write_ops = 0
# The `reads` and `writes` channels will be used by other fibers to issue
# read and write requests, respectively.
reads = Channel(ReadOp).new
writes = Channel(WriteOp).new
# Here's the fiber that owns the `state`. This fiber repeatedly selects
# on the `reads` and `writes` channels, responding to requests as they arrive.
spawn do
state = {} of Int32 => Int32
loop do
select
when read = reads.receive
read.resp.send state[read.key]?
when write = writes.receive
state[write.key] = write.val
write.resp.send true
end
end
end
# Start 100 fibers to issue reads to the state-owning fiber via the `reads` channel.
100.times do
spawn do
loop do
read = ReadOp.new(Random.rand(5), Channel(Int32).new)
reads.send read
read.resp.receive
read_ops += 1
sleep 1.millisecond
end
end
end
# Start 10 fibers to issue writes, using a similar approach.
10.times do
spawn do
loop do
write = WriteOp.new(Random.rand(5), Random.rand(100), Channel(Bool).new)
writes.send write
write.resp.receive
write_ops += 1
sleep 1.millisecond
end
end
end
# Let the fibers work for a second.
sleep 1.second
# Finally, report the operation counts.
puts "readOps: #{read_ops}"
puts "writeOps: #{write_ops}"
This Crystal program demonstrates the use of fibers (Crystal’s lightweight concurrency primitive) and channels to manage shared state. Here’s a breakdown of what’s happening:
We define
ReadOp
andWriteOp
structs to represent read and write operations.We create channels for reads and writes, which will be used to communicate between fibers.
A single fiber owns the
state
(a hash in this case). It continuously listens for read or write requests on the respective channels and processes them.We spawn 100 fibers that continuously perform read operations, and 10 fibers that perform write operations.
Each read or write operation is performed by sending a request through the appropriate channel and waiting for a response.
We let the program run for one second, then print out the total number of read and write operations performed.
To run this program, save it to a file (e.g., stateful_fibers.cr
) and use the Crystal compiler:
$ crystal stateful_fibers.cr
readOps: 70234
writeOps: 7023
The output will vary, but you should see that the program completes about 70,000-80,000 total operations in one second.
This fiber-based approach to state management can be particularly useful in scenarios involving multiple channels or when managing multiple mutexes would be error-prone. However, for simpler cases, Crystal also provides more traditional concurrency primitives like mutexes. Choose the approach that feels most natural and makes it easiest to reason about the correctness of your program.