Worker Pools in Crystal

Our example demonstrates how to implement a worker pool using fibers and channels.

require "fiber"

# Here's the worker, of which we'll run several
# concurrent instances. These workers will receive
# work on the `jobs` channel and send the corresponding
# results on `results`. We'll sleep a second per job to
# simulate an expensive task.
def worker(id : Int32, jobs : Channel(Int32), results : Channel(Int32))
  jobs.each do |j|
    puts "worker #{id} started  job #{j}"
    sleep 1
    puts "worker #{id} finished job #{j}"
    results.send(j * 2)
  end
end

def main
  # In order to use our pool of workers we need to send
  # them work and collect their results. We make 2
  # channels for this.
  num_jobs = 5
  jobs = Channel(Int32).new(num_jobs)
  results = Channel(Int32).new(num_jobs)

  # This starts up 3 workers, initially blocked
  # because there are no jobs yet.
  3.times do |w|
    spawn worker(w + 1, jobs, results)
  end

  # Here we send 5 `jobs` and then `close` that
  # channel to indicate that's all the work we have.
  num_jobs.times do |j|
    jobs.send(j + 1)
  end
  jobs.close

  # Finally we collect all the results of the work.
  # This also ensures that the worker fibers have
  # finished.
  num_jobs.times do
    results.receive
  end
end

main

Our running program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.

$ time crystal run worker_pools.cr
worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5

real    0m2.358s

In this Crystal version, we use fibers instead of goroutines, but the concept is similar. Channels are used for communication between fibers, just like in the original example. The spawn keyword is used to create new fibers, which is analogous to using go in the original code.

The structure and functionality of the program remain the same, demonstrating how to create a pool of workers that can process jobs concurrently.

查看推荐产品