Worker Pools in Clojure
(ns worker-pools
(:require [clojure.core.async :as async]))
;; Here's the worker, of which we'll run several
;; concurrent instances. These workers will receive
;; work on the `jobs` channel and send the corresponding
;; results on `results`. We'll sleep a second per job to
;; simulate an expensive task.
(defn worker [id jobs results]
(async/go-loop []
(when-let [j (async/<! jobs)]
(println "worker" id "started job" j)
(Thread/sleep 1000)
(println "worker" id "finished job" j)
(async/>! results (* j 2))
(recur))))
(defn -main []
;; In order to use our pool of workers we need to send
;; them work and collect their results. We make 2
;; channels for this.
(let [num-jobs 5
jobs (async/chan num-jobs)
results (async/chan num-jobs)]
;; This starts up 3 workers, initially blocked
;; because there are no jobs yet.
(dotimes [w 3]
(worker (inc w) jobs results))
;; Here we send 5 `jobs` and then `close` that
;; channel to indicate that's all the work we have.
(doseq [j (range 1 (inc num-jobs))]
(async/>!! jobs j))
(async/close! jobs)
;; Finally we collect all the results of the work.
;; This also ensures that the worker processes have
;; finished.
(dotimes [_ num-jobs]
(async/<!! results))))
;; To run the program:
;; $ lein run
;;
;; Our running program shows the 5 jobs being executed by
;; various workers. The program only takes about 2 seconds
;; despite doing about 5 seconds of total work because
;; there are 3 workers operating concurrently.
;;
;; worker 1 started job 1
;; worker 2 started job 2
;; worker 3 started job 3
;; worker 1 finished job 1
;; worker 1 started job 4
;; worker 2 finished job 2
;; worker 2 started job 5
;; worker 3 finished job 3
;; worker 1 finished job 4
;; worker 2 finished job 5
;;
;; real 0m2.358s
This Clojure implementation uses the core.async
library to create a worker pool similar to the original example. Here’s a breakdown of the changes:
We use
ns
to define our namespace and require thecore.async
library.The
worker
function is defined usingdefn
and usesgo-loop
to create a process that continuously reads from thejobs
channel and writes to theresults
channel.Instead of goroutines, we use
core.async
’sgo
blocks, which provide similar concurrency semantics in Clojure.Channels are created using
async/chan
instead ofmake(chan)
.We use
dotimes
to start the workers and send jobs, which is similar to thefor
loops in the original example.Channel operations use
async/<!!
andasync/>!!
for blocking reads and writes, andasync/<!
andasync/>!
for non-blocking operations insidego
blocks.We use
Thread/sleep
instead oftime.Sleep
to simulate work.
The overall structure and behavior of the program remain the same, demonstrating how to implement a worker pool using Clojure’s concurrency primitives.