Worker Pools in Racket

#lang racket

(require racket/async-channel)

; Here's the worker, of which we'll run several
; concurrent instances. These workers will receive
; work on the `jobs` channel and send the corresponding
; results on `results`. We'll sleep a second per job to
; simulate an expensive task.
(define (worker id jobs results)
  (let loop ()
    (match (async-channel-get jobs)
      ['done (void)]
      [j
       (printf "worker ~a started  job ~a\n" id j)
       (sleep 1)
       (printf "worker ~a finished job ~a\n" id j)
       (async-channel-put results (* j 2))
       (loop)])))

(define (main)
  ; In order to use our pool of workers we need to send
  ; them work and collect their results. We make 2
  ; channels for this.
  (define num-jobs 5)
  (define jobs (make-async-channel))
  (define results (make-async-channel))

  ; This starts up 3 workers, initially blocked
  ; because there are no jobs yet.
  (for ([w (in-range 1 4)])
    (thread (lambda () (worker w jobs results))))

  ; Here we send 5 `jobs` and then `close` that
  ; channel to indicate that's all the work we have.
  (for ([j (in-range 1 (add1 num-jobs))])
    (async-channel-put jobs j))
  (for ([_ (in-range 3)])
    (async-channel-put jobs 'done))

  ; Finally we collect all the results of the work.
  ; This also ensures that the worker threads have
  ; finished. An alternative way to wait for multiple
  ; threads is to use a semaphore.
  (for ([_ (in-range num-jobs)])
    (async-channel-get results)))

(main)

Our running program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.

$ time racket worker-pools.rkt
worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5

real    0m2.358s

In this Racket implementation:

  1. We use async-channels instead of channels to simulate the concurrent behavior.
  2. The worker function is defined as a recursive loop that processes jobs until it receives a ‘done signal.
  3. We use thread to create new threads instead of goroutines.
  4. The main function sets up the worker pool and distributes the jobs.
  5. We use sleep to simulate time-consuming tasks.

This example demonstrates how to implement a worker pool pattern in Racket, showing how to distribute work across multiple threads and collect results.

查看推荐产品