Worker Pools in C#
Our example demonstrates how to implement a worker pool using threads and channels in C#.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class WorkerPool
{
// This is our worker, of which we'll run several concurrent instances.
// These workers will receive work on the jobs channel and send the corresponding
// results on results. We'll sleep a second per job to simulate an expensive task.
static void Worker(int id, BlockingCollection<int> jobs, BlockingCollection<int> results)
{
foreach (var j in jobs.GetConsumingEnumerable())
{
Console.WriteLine($"worker {id} started job {j}");
Thread.Sleep(1000);
Console.WriteLine($"worker {id} finished job {j}");
results.Add(j * 2);
}
}
static async Task Main()
{
// In order to use our pool of workers we need to send
// them work and collect their results. We make 2
// channels for this.
const int numJobs = 5;
var jobs = new BlockingCollection<int>(numJobs);
var results = new BlockingCollection<int>(numJobs);
// This starts up 3 workers, initially blocked
// because there are no jobs yet.
for (int w = 1; w <= 3; w++)
{
int workerId = w;
Task.Run(() => Worker(workerId, jobs, results));
}
// Here we send 5 jobs and then close that
// channel to indicate that's all the work we have.
for (int j = 1; j <= numJobs; j++)
{
jobs.Add(j);
}
jobs.CompleteAdding();
// Finally we collect all the results of the work.
// This also ensures that the worker tasks have finished.
for (int a = 1; a <= numJobs; a++)
{
results.Take();
}
results.CompleteAdding();
}
}
Our program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.
To run the program, save it as WorkerPool.cs
and use the dotnet
command:
$ dotnet run WorkerPool.cs
worker 1 started job 1
worker 3 started job 3
worker 2 started job 2
worker 1 finished job 1
worker 1 started job 4
worker 3 finished job 3
worker 2 finished job 2
worker 2 started job 5
worker 1 finished job 4
worker 2 finished job 5
The execution time will be around 2 seconds, demonstrating the concurrent nature of the worker pool.
In this C# implementation, we use BlockingCollection<T>
to represent channels, Task.Run()
to start worker threads, and GetConsumingEnumerable()
to consume items from the collection in a blocking manner. The CompleteAdding()
method is used to signal that no more items will be added to the collection, similar to closing a channel in the original example.