Worker Pools in C#

Our example demonstrates how to implement a worker pool in C# using tasks and BlockingCollection<T>, which plays the role of a channel.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class WorkerPool
{
    // This is our worker, of which we'll run several concurrent instances.
    // These workers will receive work on the jobs channel and send the corresponding
    // results on results. We'll sleep a second per job to simulate an expensive task.
    static void Worker(int id, BlockingCollection<int> jobs, BlockingCollection<int> results)
    {
        foreach (var j in jobs.GetConsumingEnumerable())
        {
            Console.WriteLine($"worker {id} started  job {j}");
            Thread.Sleep(1000);
            Console.WriteLine($"worker {id} finished job {j}");
            results.Add(j * 2);
        }
    }

    static async Task Main()
    {
        // In order to use our pool of workers we need to send
        // them work and collect their results. We make 2
        // channels for this.
        const int numJobs = 5;
        var jobs = new BlockingCollection<int>(numJobs);
        var results = new BlockingCollection<int>(numJobs);

        // This starts up 3 workers, initially blocked
        // because there are no jobs yet. We keep the tasks
        // so that we can wait for them at the end.
        var workers = new Task[3];
        for (int w = 1; w <= 3; w++)
        {
            int workerId = w; // copy so each lambda captures its own id
            workers[w - 1] = Task.Run(() => Worker(workerId, jobs, results));
        }

        // Here we send 5 jobs and then mark the collection as
        // complete to indicate that's all the work we have.
        for (int j = 1; j <= numJobs; j++)
        {
            jobs.Add(j);
        }
        jobs.CompleteAdding();

        // Finally we collect all the results of the work.
        for (int a = 1; a <= numJobs; a++)
        {
            results.Take();
        }

        // Awaiting the worker tasks ensures that they have all finished.
        await Task.WhenAll(workers);
    }
}

Our program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.

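To run the program, save it as WorkerPool.cs and use the dotnet command. Running a single .cs file directly requires the .NET 10 SDK or later; with an older SDK, put the code in a console project created with dotnet new console and call dotnet run from that directory.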

$ dotnet run WorkerPool.cs
worker 1 started  job 1
worker 3 started  job 3
worker 2 started  job 2
worker 1 finished job 1
worker 1 started  job 4
worker 3 finished job 3
worker 2 finished job 2
worker 2 started  job 5
worker 1 finished job 4
worker 2 finished job 5

The exact interleaving of lines varies from run to run because the workers are scheduled concurrently, but the total execution time stays at around 2 seconds.
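To check the timing rather than estimate it from the output, one option is to wrap a pool in a Stopwatch. The sketch below is a separate illustration, not part of the program above: the PoolTimer class, the RunPool helper, and its parameters are hypothetical names, and it uses dedicated threads instead of Task.Run so the measurement does not depend on thread-pool warm-up. With 5 jobs and 3 workers the jobs run in two rounds, so the elapsed time should be roughly twice the per-job delay.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;

static class PoolTimer
{
    // Hypothetical helper: runs numJobs jobs of jobMillis each on numWorkers
    // dedicated threads and returns the elapsed wall-clock time.
    public static TimeSpan RunPool(int numWorkers, int numJobs, int jobMillis)
    {
        using var jobs = new BlockingCollection<int>(numJobs);
        var threads = new Thread[numWorkers];
        var clock = Stopwatch.StartNew();

        for (int w = 0; w < numWorkers; w++)
        {
            threads[w] = new Thread(() =>
            {
                foreach (var job in jobs.GetConsumingEnumerable())
                {
                    Thread.Sleep(jobMillis); // simulated expensive work
                }
            });
            threads[w].Start();
        }

        // Queue all jobs, then signal that no more are coming.
        for (int j = 1; j <= numJobs; j++)
        {
            jobs.Add(j);
        }
        jobs.CompleteAdding();

        // Wait for every worker to drain the queue and exit.
        foreach (var t in threads)
        {
            t.Join();
        }
        clock.Stop();
        return clock.Elapsed;
    }

    static void Main()
    {
        var elapsed = RunPool(numWorkers: 3, numJobs: 5, jobMillis: 1000);
        Console.WriteLine($"elapsed: {elapsed.TotalSeconds:F1}s");
    }
}
```

Calling RunPool with numWorkers set to 1 should take about 5 seconds instead, which makes the benefit of the pool visible.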

The WorkerPool program uses BlockingCollection<T> to stand in for channels, Task.Run() to start each worker on the thread pool, and GetConsumingEnumerable() to consume items from the collection in a blocking manner. The CompleteAdding() method signals that no more items will be added to the collection, similar to closing a channel in the original example.
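The close-like behavior of CompleteAdding() can be seen in isolation with a small sketch. The ChannelSemantics class and its two helpers are hypothetical names used only for illustration. It shows that GetConsumingEnumerable() ends once the collection is marked complete and drained, and that a later Add() is rejected with an InvalidOperationException.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

static class ChannelSemantics
{
    // Hypothetical helper: adds the items, marks the collection complete,
    // then consumes everything that was queued.
    public static List<int> Drain(params int[] items)
    {
        using var queue = new BlockingCollection<int>();
        foreach (var item in items)
        {
            queue.Add(item);
        }
        queue.CompleteAdding();

        var seen = new List<int>();
        // The loop ends instead of blocking because adding is complete.
        foreach (var item in queue.GetConsumingEnumerable())
        {
            seen.Add(item);
        }
        return seen;
    }

    // Returns true if Add is rejected after CompleteAdding.
    public static bool AddAfterCompleteThrows()
    {
        using var queue = new BlockingCollection<int>();
        queue.CompleteAdding();
        try
        {
            queue.Add(1);
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Drain(1, 2, 3))); // prints "1, 2, 3"
        Console.WriteLine(AddAfterCompleteThrows());           // prints "True"
    }
}
```

Items come out in insertion order here because a BlockingCollection<T> created without an explicit backing store uses a ConcurrentQueue<T>.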