Worker Pools in Clojure

(ns worker-pools
  (:require [clojure.core.async :as async]))

;; Here's the worker, of which we'll run several
;; concurrent instances. These workers will receive
;; work on the `jobs` channel and send the corresponding
;; results on `results`. We'll sleep a second per job to
;; simulate an expensive task.
(defn worker [id jobs results]
  (async/go-loop []
    (when-let [j (async/<! jobs)]
      (println "worker" id "started  job" j)
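      ;; Note: Thread/sleep blocks one of the go pool's threads.
      ;; That is acceptable for three workers in a demo, but
      ;; long blocking calls are best kept out of go blocks.
      (Thread/sleep 1000)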
      (println "worker" id "finished job" j)
      (async/>! results (* j 2))
      (recur))))

(defn -main []
  ;; In order to use our pool of workers we need to send
  ;; them work and collect their results. We make 2
  ;; channels for this.
  (let [num-jobs 5
        jobs (async/chan num-jobs)
        results (async/chan num-jobs)]

    ;; This starts up 3 workers, initially parked on
    ;; `jobs` because there is no work yet.
    (dotimes [w 3]
      (worker (inc w) jobs results))

    ;; Here we send 5 jobs on `jobs` and then `close!` that
    ;; channel to indicate that's all the work we have.
    (doseq [j (range 1 (inc num-jobs))]
      (async/>!! jobs j))
    (async/close! jobs)

    ;; Finally we collect all the results of the work.
    ;; This also ensures that the worker processes have
    ;; finished.
    (dotimes [_ num-jobs]
      (async/<!! results))))

;; To run the program:
;; $ lein run
;;
;; Our running program shows the 5 jobs being executed by
;; various workers. The program only takes about 2 seconds
;; despite doing about 5 seconds of total work because
;; there are 3 workers operating concurrently.
;;
;; worker 1 started  job 1
;; worker 2 started  job 2
;; worker 3 started  job 3
;; worker 1 finished job 1
;; worker 1 started  job 4
;; worker 2 finished job 2
;; worker 2 started  job 5
;; worker 3 finished job 3
;; worker 1 finished job 4
;; worker 2 finished job 5
;;
;; real    0m2.358s
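
The `-main` above takes each value from `results` but throws it away. As a minimal sketch of how the doubled values could be gathered instead, the variant below returns them in a vector. The names `worker-pools-collect`, `quiet-worker`, and `collect-results` are hypothetical and not part of the original program, and the pause is shortened to 100 ms and done with a timeout channel rather than `Thread/sleep`.

```clojure
(ns worker-pools-collect
  (:require [clojure.core.async :as async]))

;; Hypothetical worker: same shape as `worker`, minus the printing.
;; Taking from a timeout channel parks the go block for ~100 ms.
(defn quiet-worker [jobs results]
  (async/go-loop []
    (when-some [j (async/<! jobs)]
      (async/<! (async/timeout 100))
      (async/>! results (* j 2))
      (recur))))

;; Hypothetical helper: runs `inputs` through `num-workers` workers
;; and returns the results as a vector.
(defn collect-results [num-workers inputs]
  (let [n       (count inputs)
        jobs    (async/chan n)
        results (async/chan n)]
    (dotimes [_ num-workers]
      (quiet-worker jobs results))
    (doseq [j inputs]
      (async/>!! jobs j))
    (async/close! jobs)
    ;; async/take yields at most n values and then closes;
    ;; async/into gathers them into a single vector.
    (async/<!! (async/into [] (async/take n results)))))
```

Calling `(collect-results 3 [1 2 3 4 5])` should return the five doubled values, though not necessarily in input order, since whichever worker finishes first writes first. Sorting the result gives a stable view.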

This Clojure implementation uses the core.async library to build a worker pool equivalent to the original Go example. Here’s a breakdown of the changes:

  1. We use ns to define our namespace and require the core.async library.

  2. The worker function is defined using defn and uses go-loop to create a process that continuously reads from the jobs channel and writes to the results channel.

  3. Instead of goroutines, we use core.async’s go blocks: lightweight processes that are multiplexed onto a small thread pool and park, rather than block a thread, while waiting on a channel.

  4. Buffered channels are created using (async/chan n) instead of Go’s make(chan T, n).

  5. We use dotimes to start the workers and to collect the results, and doseq to send the jobs; these take the place of the for loops in the original example.

  6. Outside go blocks, channel operations use async/<!! and async/>!!, which block the calling thread. Inside go blocks they use async/<! and async/>!, which park the go block and release its thread until the operation can complete.

  7. We use Thread/sleep instead of time.Sleep to simulate work.
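
One caveat on the last two points: go blocks share a small fixed-size thread pool, so a blocking call such as Thread/sleep inside a go block holds one of those threads for its whole duration. The sketch below shows one possible alternative, assuming the work is genuinely blocking: each worker runs on its own thread via async/thread and uses the blocking channel operations. The names `worker-pools-blocking`, `blocking-worker`, and `run-pool` are hypothetical, and the sleep is shortened to 100 ms.

```clojure
(ns worker-pools-blocking
  (:require [clojure.core.async :as async]))

;; Hypothetical worker: async/thread runs the body on a separate
;; thread, so blocking here does not occupy the go-block pool.
(defn blocking-worker [id jobs results]
  (async/thread
    (loop []
      (when-some [j (async/<!! jobs)]   ; blocking take
        (Thread/sleep 100)              ; simulated blocking work
        (async/>!! results (* j 2))     ; blocking put
        (recur)))))

;; Hypothetical driver mirroring -main: start workers, send jobs,
;; close the jobs channel, then take one result per job.
(defn run-pool [num-workers inputs]
  (let [n       (count inputs)
        jobs    (async/chan n)
        results (async/chan n)]
    (dotimes [w num-workers]
      (blocking-worker (inc w) jobs results))
    (doseq [j inputs]
      (async/>!! jobs j))
    (async/close! jobs)
    (vec (repeatedly n #(async/<!! results)))))
```

The trade-off is one real thread per worker, which suits a handful of workers doing blocking I/O or sleeps. Go blocks remain the lighter choice when workers spend most of their time waiting on channels.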

The overall structure and behavior of the program remain the same, demonstrating how to implement a worker pool using Clojure’s concurrency primitives.
