Worker Pools in R Programming Language

Our example demonstrates how to implement a worker pool using R’s parallel processing capabilities.

library(parallel)

# Worker function
worker <- function(id, job) {
  cat("worker", id, "started  job", job, "\n")
  Sys.sleep(1)  # Simulate an expensive task
  cat("worker", id, "finished job", job, "\n")
  return(job * 2)
}

# Main function
main <- function() {
  num_jobs <- 5
  num_workers <- 3
  
  # Create a cluster of worker processes
  cl <- makeCluster(num_workers)
  
  # Prepare the jobs
  jobs <- 1:num_jobs
  
  # Distribute jobs to workers
  results <- parLapply(cl, jobs, function(job) {
    worker_id <- Sys.getpid() %% 1000  # Get a unique worker ID
    worker(worker_id, job)
  })
  
  # Stop the cluster
  stopCluster(cl)
  
  # Print results
  print(unlist(results))
}

# Run the main function
main()

In this R implementation:

  1. We use the parallel package to create a cluster of worker processes.

  2. The worker function simulates an expensive task by sleeping for 1 second and then returns the job number multiplied by 2.

  3. In the main function, we create a cluster with 3 worker processes using makeCluster.

  4. We use parLapply to distribute the jobs across the worker processes. This is similar to sending jobs through a channel in the original example.

  5. Each worker process executes the worker function for its assigned jobs.

  6. After all jobs are completed, we stop the cluster and print the results.

When you run this program, you’ll see output similar to this:

worker 123 started  job 1
worker 456 started  job 2
worker 789 started  job 3
worker 123 finished job 1
worker 123 started  job 4
worker 456 finished job 2
worker 456 started  job 5
worker 789 finished job 3
worker 123 finished job 4
worker 456 finished job 5
[1]  2  4  6  8 10

The program takes about 2 seconds to complete despite doing about 5 seconds of total work because there are 3 workers operating concurrently.

Note that R’s parallel processing model is different from the original example’s goroutines and channels. However, this implementation achieves a similar result of distributing work across multiple workers for concurrent processing.

查看推荐产品