Worker Pools in C++

#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <vector>

// Define a thread-safe queue for jobs
class SafeQueue {
private:
    std::queue<int> queue;
    std::mutex mutex;
    std::condition_variable cond;

public:
    void push(int value) {
        std::unique_lock<std::mutex> lock(mutex);
        queue.push(value);
        cond.notify_one();
    }

    bool pop(int& value) {
        std::unique_lock<std::mutex> lock(mutex);
        if (queue.empty()) {
            return false;
        }
        value = queue.front();
        queue.pop();
        return true;
    }

    bool empty() {
        std::unique_lock<std::mutex> lock(mutex);
        return queue.empty();
    }
};

// Worker function
void worker(int id, SafeQueue& jobs, SafeQueue& results) {
    while (true) {
        int job;
        if (jobs.pop(job)) {
            std::cout << "worker " << id << " started  job " << job << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::cout << "worker " << id << " finished job " << job << std::endl;
            results.push(job * 2);
        } else {
            break;
        }
    }
}

int main() {
    const int numJobs = 5;
    SafeQueue jobs;
    SafeQueue results;

    // Start 3 worker threads
    std::vector<std::thread> workers;
    for (int w = 1; w <= 3; w++) {
        workers.emplace_back(worker, w, std::ref(jobs), std::ref(results));
    }

    // Add jobs to the queue
    for (int j = 1; j <= numJobs; j++) {
        jobs.push(j);
    }

    // Wait for all jobs to be processed
    while (true) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        if (jobs.empty() && results.empty()) {
            break;
        }
    }

    // Signal workers to finish
    for (int i = 0; i < 3; i++) {
        jobs.push(-1);
    }

    // Wait for all worker threads to finish
    for (auto& w : workers) {
        w.join();
    }

    return 0;
}

In this example, we’ll look at how to implement a worker pool using threads and a thread-safe queue.

Here’s the worker function, of which we’ll run several concurrent instances. These workers will receive work on the jobs queue and send the corresponding results on the results queue. We’ll sleep a second per job to simulate an expensive task.

In order to use our pool of workers, we need to send them work and collect their results. We make 2 thread-safe queues for this.

This starts up 3 worker threads, initially waiting because there are no jobs yet.

Here we send 5 jobs to the jobs queue.

Finally, we wait for all the jobs to be processed and results to be collected. This also ensures that the worker threads have finished. An alternative way to wait for multiple threads is to use std::future.

Our running program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.

To compile and run the program:

$ g++ -std=c++11 -pthread worker_pools.cpp -o worker_pools
$ ./worker_pools
worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5

This C++ implementation uses std::thread for concurrency and custom thread-safe queues for job distribution and result collection. The overall structure and behavior are similar to the original example, demonstrating the concept of worker pools in C++.