Worker Pools in Haskell
Our example demonstrates how to implement a worker pool using Haskell’s concurrency primitives.
import Control.Concurrent
import Control.Monad
import Text.Printf

-- This is our worker function. It will receive jobs from the jobs channel
-- and send results to the results channel. We'll use threadDelay to
-- simulate an expensive task.
worker :: Int -> Chan Int -> Chan Int -> IO ()
worker wid jobs results = forever $ do
    job <- readChan jobs
    printf "worker %d started job %d\n" wid job
    threadDelay 1000000  -- sleep for 1 second
    printf "worker %d finished job %d\n" wid job
    writeChan results (job * 2)

main :: IO ()
main = do
    -- Create channels for jobs and results
    let numJobs = 5
    jobs <- newChan
    results <- newChan

    -- Start 3 worker threads
    forM_ [1..3] $ \w ->
        forkIO $ worker w jobs results

    -- Send 5 jobs. A Chan cannot be closed, so the workers simply
    -- block on readChan once the jobs run out.
    forM_ [1..numJobs] $ \j ->
        writeChan jobs j

    -- Collect all the results
    replicateM_ numJobs $ do
        _ <- readChan results
        return ()
In this example, we use Haskell’s Chan type to create channels for communication between threads. The worker function represents our worker, which receives jobs from the jobs channel and sends results to the results channel.
In the main function:
- We create channels for jobs and results.
- We start 3 worker threads using forkIO.
- We send 5 jobs to the jobs channel.
- Finally, we collect all the results from the results channel.
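The main function above reads each result and discards it. As a minimal sketch of how the results could be kept instead, the variant below wraps the pool in a helper and gathers the values with replicateM. The name runPool is hypothetical and not part of the original program, and the worker omits the delay and logging to keep the sketch short.

```haskell
import Control.Concurrent
import Control.Monad
import Data.List (sort)

-- Same worker shape as in the example, minus the delay and logging.
worker :: Chan Int -> Chan Int -> IO ()
worker jobs results = forever $ do
    job <- readChan jobs
    writeChan results (job * 2)

-- Hypothetical helper: run the given jobs on n workers and return
-- the results in the order they arrive.
runPool :: Int -> [Int] -> IO [Int]
runPool n js = do
    jobs    <- newChan
    results <- newChan
    tids    <- replicateM n (forkIO (worker jobs results))
    mapM_ (writeChan jobs) js
    rs <- replicateM (length js) (readChan results)
    mapM_ killThread tids  -- the workers loop forever, so stop them explicitly
    return rs

main :: IO ()
main = do
    rs <- runPool 3 [1..5]
    -- Arrival order is nondeterministic, so sort before printing.
    print (sort rs)
```

Because the workers race for jobs, the order of the collected list can differ between runs; sorting (or tagging each result with its job number) gives a stable view.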
This program demonstrates concurrent execution of tasks. Despite doing about 5 seconds of total work, the program should complete in about 2 seconds because the 3 workers operate concurrently: the first three jobs finish after roughly 1 second, and the remaining two after roughly 2.
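One way to check the timing claim is to measure the wall-clock time of the pool directly. The sketch below assumes a reasonably recent GHC, since it uses getMonotonicTime from GHC.Clock in base. The helper name timed is hypothetical, and the worker is inlined without logging.

```haskell
import Control.Concurrent
import Control.Monad
import GHC.Clock (getMonotonicTime)
import Text.Printf

-- Hypothetical helper: run an action and return how long it took,
-- in seconds of wall-clock time.
timed :: IO a -> IO Double
timed action = do
    start <- getMonotonicTime
    _     <- action
    end   <- getMonotonicTime
    return (end - start)

main :: IO ()
main = do
    elapsed <- timed $ do
        jobs    <- newChan :: IO (Chan Int)
        results <- newChan :: IO (Chan Int)
        -- Three workers, each taking one second per job.
        forM_ [1..3] $ \_ -> forkIO $ forever $ do
            job <- readChan jobs
            threadDelay 1000000
            writeChan results (job * 2)
        mapM_ (writeChan jobs) [1..5]
        replicateM_ 5 (readChan results)
    printf "elapsed: %.1f s\n" elapsed
```

With 3 workers the reported time should be close to 2 seconds; changing the worker count to 1 should bring it to about 5.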
To run the program:
$ ghc -threaded worker_pools.hs
$ ./worker_pools
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 1 started job 4
worker 2 finished job 2
worker 2 started job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
This output shows the 5 jobs being executed by various workers. Note that the exact order of execution may vary due to the concurrent nature of the program.
In Haskell, we use forkIO to create lightweight threads, which are similar in concept to goroutines. The Chan type provides a way for these threads to communicate, similar to channels in other languages.
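One difference from Go channels is that a Chan has no close operation, so the workers in the example never terminate on their own. A common workaround, sketched here as an assumption rather than as part of the original program, is to wrap each job in Maybe and send one Nothing per worker as a shutdown signal. The helper name runToCompletion and the use of an MVar per worker to signal completion are illustrative choices.

```haskell
import Control.Concurrent
import Control.Monad

-- Worker that processes jobs until it reads Nothing, then signals
-- that it has finished by filling its MVar.
worker :: Chan (Maybe Int) -> Chan Int -> MVar () -> IO ()
worker jobs results done = loop
  where
    loop = do
        next <- readChan jobs
        case next of
            Nothing  -> putMVar done ()
            Just job -> writeChan results (job * 2) >> loop

-- Hypothetical helper: run all jobs on n workers, wait for every
-- worker to exit, then return the results.
runToCompletion :: Int -> [Int] -> IO [Int]
runToCompletion n js = do
    jobs    <- newChan
    results <- newChan
    dones   <- replicateM n newEmptyMVar
    forM_ dones $ \d -> forkIO (worker jobs results d)
    mapM_ (writeChan jobs . Just) js
    replicateM_ n (writeChan jobs Nothing)  -- one sentinel per worker
    mapM_ takeMVar dones                    -- block until all workers exit
    replicateM (length js) (readChan results)

main :: IO ()
main = do
    rs <- runToCompletion 3 [1..5]
    print (sum rs)
```

Since Chan is unbounded, writes never block, so every result is already in the results channel by the time the last worker has signalled completion.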