Worker Pools in Scala

In this example, we’ll look at how to implement a worker pool using Scala’s actors and futures.

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._

// Here's the worker, of which we'll run several concurrent instances.
// These workers will receive work on the `jobs` channel and send the
// corresponding results on `results`. We'll sleep a second per job to
// simulate an expensive task.
class Worker(id: Int) extends Actor {
  def receive = {
    case job: Int =>
      println(s"worker $id started  job $job")
      Thread.sleep(1000)
      println(s"worker $id finished job $job")
      sender() ! (job * 2)
  }
}

object WorkerPool extends App {
  val system = ActorSystem("WorkerSystem")

  // In order to use our pool of workers we need to send them work and
  // collect their results. We make 2 channels for this.
  val numJobs = 5
  val jobs = (1 to numJobs).toList
  val results = scala.collection.mutable.ListBuffer.empty[Int]

  // This starts up 3 workers, initially blocked because there are no jobs yet.
  val workers = (1 to 3).map(id => system.actorOf(Props(new Worker(id))))

  // Here we send 5 `jobs` and then collect the results
  val futures = jobs.map { job =>
    val worker = workers(job % workers.length)
    (worker ? job)(5.seconds).mapTo[Int]
  }

  // Finally we collect all the results of the work.
  // This also ensures that the worker actors have finished.
  val resultsFuture = Future.sequence(futures)

  resultsFuture.onComplete {
    case scala.util.Success(res) =>
      results ++= res
      println(s"All jobs completed. Results: $results")
      system.terminate()
    case scala.util.Failure(e) =>
      println(s"An error occurred: ${e.getMessage}")
      system.terminate()
  }

  Await.result(system.whenTerminated, Duration.Inf)
}

Our program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.

To run the program, you would typically use SBT (Scala Build Tool) or similar:

$ sbt "runMain WorkerPool"
worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
All jobs completed. Results: ArrayBuffer(2, 4, 6, 8, 10)

This Scala version uses Akka actors to represent workers, and Scala’s Future API to handle asynchronous operations. The structure is similar to the original example, with workers processing jobs concurrently. The main differences are:

  1. We use Akka’s actor system instead of goroutines.
  2. Communication is done through actor messages instead of channels.
  3. We use Scala’s Future API to collect results asynchronously.

This approach showcases Scala’s capabilities for concurrent programming, providing a similar level of concurrency to the original example.

查看推荐产品