Worker Pools in Cilk

#include <cilk/cilk.h>
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Guards the shared indices below. Cilk does not synchronize
// access to shared data on its own.
std::mutex index_mutex;

// Here's the worker function, of which we'll run several
// concurrent instances. These workers will receive
// work from the `jobs` vector and send the corresponding
// results to the `results` vector. We'll sleep a second per job to
// simulate an expensive task.
void worker(int id, std::vector<int>& jobs, std::vector<int>& results, int& job_index, int& result_index) {
    while (true) {
        int j;
        {
            // Claim the next job while holding the lock
            std::lock_guard<std::mutex> lock(index_mutex);
            if (job_index >= static_cast<int>(jobs.size())) break;
            j = jobs[job_index++];
        }

        std::cout << "worker " << id << " started  job " << j << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::cout << "worker " << id << " finished job " << j << std::endl;

        {
            // Store the result while holding the lock
            std::lock_guard<std::mutex> lock(index_mutex);
            results[result_index++] = j * 2;
        }
    }
}

int main() {
    const int numJobs = 5;
    std::vector<int> jobs(numJobs);
    std::vector<int> results(numJobs);
    int job_index = 0;
    int result_index = 0;

    // Prepare the jobs
    for (int j = 1; j <= numJobs; j++) {
        jobs[j-1] = j;
    }

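    // Run 3 worker instances as parallel loop iterations. The Cilk
    // runtime decides how many of them actually execute at the same time.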
    cilk_for (int w = 1; w <= 3; w++) {
        worker(w, jobs, results, job_index, result_index);
    }

    // Print the results
    for (int a = 0; a < numJobs; a++) {
        std::cout << "Result: " << results[a] << std::endl;
    }

    return 0;
}

In this example, we’ve implemented a worker pool using Cilk, a language extension for C and C++ that supports parallel programming. Here are the key differences from the Go version:

  1. Instead of using channels, we use shared vectors jobs and results to pass work and collect results.

  2. The worker function now takes additional parameters for the shared vectors and indices.

  3. We use cilk_for to run the worker instances as parallel loop iterations; the Cilk runtime schedules them onto its worker threads.

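  4. cilk_for waits for all of its iterations before the program continues, but Cilk does not protect shared data. Access to the shared indices needs explicit protection such as std::mutex or std::atomic; calling __cilkrts_get_tls_worker() only returns the current runtime worker and provides no mutual exclusion.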

  5. Instead of using Go’s time.Sleep, we use std::this_thread::sleep_for from the C++ standard library.
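
The thread-safety point in item 4 can be made concrete with a lock-free variant. The following is a minimal sketch, not the program above: run_pool is a hypothetical helper name, std::thread stands in for cilk_for so that the code builds without a Cilk toolchain, and the one-second sleep is left out. It assumes each result may be stored in the slot of its job rather than in completion order.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper (not part of the original program): runs
// `num_workers` threads that pull jobs from a shared vector and
// store doubled results. std::thread is used here in place of cilk_for.
std::vector<int> run_pool(const std::vector<int>& jobs, int num_workers) {
    assert(num_workers > 0);
    std::vector<int> results(jobs.size());
    std::atomic<std::size_t> next_job{0};  // index of the next unclaimed job

    auto worker = [&]() {
        while (true) {
            // fetch_add returns the old value and increments atomically,
            // so no two workers can claim the same index.
            std::size_t i = next_job.fetch_add(1);
            if (i >= jobs.size()) break;
            // Each job writes only to its own slot, so no lock is needed.
            results[i] = jobs[i] * 2;
        }
    };

    std::vector<std::thread> threads;
    for (int w = 0; w < num_workers; ++w) threads.emplace_back(worker);
    for (auto& t : threads) t.join();  // wait for every worker to finish
    return results;
}
```

Calling run_pool({1, 2, 3, 4, 5}, 3) returns the doubled values in job order. Giving every job its own result slot removes the need for a second shared index, which is why a single atomic counter is enough in this sketch.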

To compile and run this program, you need a Cilk-enabled compiler. The exact command varies by toolchain; with OpenCilk, for example, it could look something like this:

$ clang++ -fopencilk -O3 worker_pools.cpp -o worker_pools
$ ./worker_pools

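This program demonstrates how to implement a worker pool pattern in Cilk, showcasing parallel execution of tasks. The output shows the 5 jobs being executed by various workers, in an order that can change from run to run. The program should complete in about 2 seconds despite doing about 5 seconds of total work, provided the Cilk runtime has at least 3 worker threads available to run the 3 worker instances concurrently. The number of runtime workers can be set with the CILK_NWORKERS environment variable.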
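
The 2-second figure follows from sharing 5 one-second jobs among 3 workers: the jobs complete in ceil(5 / 3) = 2 rounds. Here is a minimal sketch of that arithmetic, where estimated_seconds is a hypothetical helper name. It assumes all workers really run concurrently and every job takes the same time.

```cpp
#include <cassert>

// Hypothetical helper: wall-clock estimate for equal-length jobs
// when `workers` of them can run at the same time.
int estimated_seconds(int jobs, int workers, int seconds_per_job) {
    assert(jobs >= 0 && workers > 0 && seconds_per_job >= 0);
    int rounds = (jobs + workers - 1) / workers;  // integer ceil(jobs / workers)
    return rounds * seconds_per_job;
}
```

For the example above, estimated_seconds(5, 3, 1) gives 2, while a single worker would need estimated_seconds(5, 1, 1), that is, 5 seconds.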
