Worker Pools in D Programming Language

Our example demonstrates how to implement a worker pool using threads and channels in D.

import std.stdio;
import std.concurrency;
import core.thread;

// Here's the worker, of which we'll run several
// concurrent instances. These workers will receive
// work on the `jobs` channel and send the corresponding
// results on `results`. We'll sleep a second per job to
// simulate an expensive task.
void worker(Tid owner, int id)
{
    while (true)
    {
        receive(
            (int job) {
                writefln("worker %d started  job %d", id, job);
                Thread.sleep(dur!"seconds"(1));
                writefln("worker %d finished job %d", id, job);
                owner.send(job * 2);
            },
            (OwnerTerminated) {
                return;
            }
        );
    }
}

void main()
{
    // In order to use our pool of workers we need to send
    // them work and collect their results. We make 2
    // channels for this.
    const numJobs = 5;
    auto jobs = new Tid[3];
    
    // This starts up 3 workers, initially blocked
    // because there are no jobs yet.
    for (int w = 1; w <= 3; w++)
    {
        jobs[w-1] = spawn(&worker, thisTid, w);
    }

    // Here we send 5 `jobs` and then `close` that
    // channel to indicate that's all the work we have.
    for (int j = 1; j <= numJobs; j++)
    {
        jobs[j % 3].send(j);
    }

    // Finally we collect all the results of the work.
    // This also ensures that the worker threads have
    // finished.
    for (int a = 1; a <= numJobs; a++)
    {
        receive((int result) {
            // We're not using the result, but we could do something with it here.
        });
    }
}

Our running program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.

$ dmd -run worker_pools.d
worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5

This example demonstrates how to use D’s concurrency features to implement a worker pool pattern. We use std.concurrency for message passing between threads, which is similar to channels in other languages. The spawn function is used to create new threads, and receive is used to handle incoming messages.

Note that D’s approach to concurrency is somewhat different from some other languages. Instead of using explicit channels, D uses message passing between threads. The OwnerTerminated message is a built-in message type that’s sent when the owner thread terminates, allowing for clean shutdown of worker threads.

查看推荐产品