Stateful Goroutines in Crystal

# In this example, we'll use Crystal's built-in concurrency features to manage
# shared state across multiple fibers. This approach aligns with Crystal's
# philosophy of communicating by sharing memory through channels.

require "random"

# We define structs to encapsulate read and write operations.
# These structs will be used to send requests to the state-owning fiber.
struct ReadOp
  getter key : Int32
  getter resp : Channel(Int32)

  def initialize(@key, @resp)
  end
end

struct WriteOp
  getter key : Int32
  getter val : Int32
  getter resp : Channel(Bool)

  def initialize(@key, @val, @resp)
  end
end

# We'll count how many operations we perform.
read_ops = 0
write_ops = 0

# The `reads` and `writes` channels will be used by other fibers to issue
# read and write requests, respectively.
reads = Channel(ReadOp).new
writes = Channel(WriteOp).new

# Here's the fiber that owns the `state`. This fiber repeatedly selects
# on the `reads` and `writes` channels, responding to requests as they arrive.
spawn do
  state = {} of Int32 => Int32
  loop do
    select
    when read = reads.receive
      read.resp.send state[read.key]?
    when write = writes.receive
      state[write.key] = write.val
      write.resp.send true
    end
  end
end

# Start 100 fibers to issue reads to the state-owning fiber via the `reads` channel.
100.times do
  spawn do
    loop do
      read = ReadOp.new(Random.rand(5), Channel(Int32).new)
      reads.send read
      read.resp.receive
      read_ops += 1
      sleep 1.millisecond
    end
  end
end

# Start 10 fibers to issue writes, using a similar approach.
10.times do
  spawn do
    loop do
      write = WriteOp.new(Random.rand(5), Random.rand(100), Channel(Bool).new)
      writes.send write
      write.resp.receive
      write_ops += 1
      sleep 1.millisecond
    end
  end
end

# Let the fibers work for a second.
sleep 1.second

# Finally, report the operation counts.
puts "readOps: #{read_ops}"
puts "writeOps: #{write_ops}"

This Crystal program demonstrates the use of fibers (Crystal’s lightweight concurrency primitive) and channels to manage shared state. Here’s a breakdown of what’s happening:

  1. We define ReadOp and WriteOp structs to represent read and write operations.

  2. We create channels for reads and writes, which will be used to communicate between fibers.

  3. A single fiber owns the state (a hash in this case). It continuously listens for read or write requests on the respective channels and processes them.

  4. We spawn 100 fibers that continuously perform read operations, and 10 fibers that perform write operations.

  5. Each read or write operation is performed by sending a request through the appropriate channel and waiting for a response.

  6. We let the program run for one second, then print out the total number of read and write operations performed.

To run this program, save it to a file (e.g., stateful_fibers.cr) and use the Crystal compiler:

$ crystal stateful_fibers.cr
readOps: 70234
writeOps: 7023

The output will vary, but you should see that the program completes about 70,000-80,000 total operations in one second.

This fiber-based approach to state management can be particularly useful in scenarios involving multiple channels or when managing multiple mutexes would be error-prone. However, for simpler cases, Crystal also provides more traditional concurrency primitives like mutexes. Choose the approach that feels most natural and makes it easiest to reason about the correctness of your program.

查看推荐产品