Worker Pools in Haskell

This example shows how to implement a worker pool with Haskell’s concurrency primitives: lightweight threads created with forkIO and channels of type Chan.

import Control.Concurrent
import Control.Monad
import Text.Printf

-- This is our worker function. It receives jobs from the jobs channel
-- and sends results to the results channel. We use threadDelay to
-- simulate an expensive task. The first argument is named wid rather
-- than id so that it does not shadow the Prelude's id function.
worker :: Int -> Chan Int -> Chan Int -> IO ()
worker wid jobs results = forever $ do
    job <- readChan jobs
    printf "worker %d started  job %d\n" wid job
    threadDelay 1000000  -- sleep for 1 second (the argument is in microseconds)
    printf "worker %d finished job %d\n" wid job
    writeChan results (job * 2)

main :: IO ()
main = do
    -- Create channels for jobs and results
    let numJobs = 5
    jobs <- newChan
    results <- newChan

    -- Start 3 worker threads
    forM_ [1..3] $ \w ->
        forkIO $ worker w jobs results

    -- Send numJobs jobs. A Chan has no close operation, so there is
    -- nothing to close afterwards.
    forM_ [1..numJobs] $ \j ->
        writeChan jobs j

    -- Collect all the results. Once main returns, the program exits and
    -- the workers still blocked in readChan are terminated with it.
    replicateM_ numJobs (readChan results)

In this example, we use Haskell’s Chan type to create channels for communication between threads. Each worker thread runs the worker function, which reads jobs from the jobs channel and writes results to the results channel.

In the main function:

  1. We create channels for jobs and results.
  2. We start 3 worker threads using forkIO.
  3. We send 5 jobs to the jobs channel.
  4. Finally, we collect all the results from the results channel.

This program demonstrates concurrent execution of tasks. The 5 jobs take about 5 seconds of total work, but the program should complete in about 2 seconds: the 3 workers handle the first three jobs during the first second and the remaining two jobs during the second.
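To check the timing claim, we can wrap the pool in a function and measure how long it takes. The following is a minimal sketch, not part of the original program: runPool and timed are hypothetical helper names introduced here, the worker omits the logging, and getMonotonicTime comes from GHC.Clock in base (4.11 or later).

```haskell
import Control.Concurrent (Chan, forkIO, newChan, readChan, threadDelay, writeChan)
import Control.Monad (forM_, forever, replicateM, replicateM_)
import GHC.Clock (getMonotonicTime)
import Text.Printf (printf)

-- Same worker as in the main example, minus the logging:
-- one second of simulated work per job.
worker :: Chan Int -> Chan Int -> IO ()
worker jobs results = forever $ do
    job <- readChan jobs
    threadDelay 1000000
    writeChan results (job * 2)

-- Hypothetical helper: run the given jobs on n workers and
-- return the results in the order they arrive.
runPool :: Int -> [Int] -> IO [Int]
runPool n js = do
    jobs <- newChan
    results <- newChan
    replicateM_ n (forkIO (worker jobs results))
    forM_ js (writeChan jobs)
    replicateM (length js) (readChan results)

-- Hypothetical helper: run an action and report the elapsed seconds.
timed :: IO a -> IO (a, Double)
timed act = do
    t0 <- getMonotonicTime
    x <- act
    t1 <- getMonotonicTime
    return (x, t1 - t0)

main :: IO ()
main = do
    (rs, secs) <- timed (runPool 3 [1..5])
    printf "%d results in %.2f seconds\n" (length rs) secs
```

With 3 workers the reported time should be close to 2 seconds. Calling runPool 1 [1..5] instead should take about 5 seconds, and runPool 5 [1..5] about 1 second.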

To run the program:

$ ghc -threaded worker_pools.hs
$ ./worker_pools
worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5

This output shows the 5 jobs being executed by various workers. Note that the exact order of execution may vary due to the concurrent nature of the program.

In Haskell, we use forkIO to create lightweight threads, which are similar in concept to goroutines in Go. The Chan type gives these threads an unbounded FIFO channel to communicate over, similar to channels in other languages, although a Chan cannot be closed.
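Because a Chan cannot be closed, workers that should stop cleanly need some other signal. One common approach, sketched here under our own assumptions rather than taken from the program above, is to send Maybe values: Just carries a job and Nothing tells a worker to exit. The name runToCompletion and the done MVar are hypothetical, and the simulated delay is left out to keep the sketch short.

```haskell
import Control.Concurrent (Chan, forkIO, newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_, replicateM, replicateM_)

-- Worker that processes Just jobs and exits on Nothing.
-- Before exiting it signals completion through the done MVar.
worker :: Chan (Maybe Int) -> Chan Int -> MVar () -> IO ()
worker jobs results done = loop
  where
    loop = do
        next <- readChan jobs
        case next of
            Nothing  -> putMVar done ()
            Just job -> writeChan results (job * 2) >> loop

-- Hypothetical helper: run the jobs on n workers, then wait
-- until every worker has exited before returning the results.
runToCompletion :: Int -> [Int] -> IO [Int]
runToCompletion n js = do
    jobs <- newChan
    results <- newChan
    done <- newEmptyMVar
    replicateM_ n (forkIO (worker jobs results done))
    forM_ js (writeChan jobs . Just)
    replicateM_ n (writeChan jobs Nothing)  -- one stop signal per worker
    rs <- replicateM (length js) (readChan results)
    replicateM_ n (takeMVar done)           -- wait for each worker to finish
    return rs

main :: IO ()
main = do
    rs <- runToCompletion 3 [1..5]
    print (sum rs)  -- prints 30; the order of rs itself may vary
```

Each worker consumes exactly one Nothing, so the number of stop signals has to match the number of workers. The sum is printed rather than the list because the results arrive in whatever order the workers finish.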
