Worker Pools in Dart

Our example demonstrates how to implement a worker pool using Dart’s isolates and streams.

import 'dart:isolate';
import 'dart:async';

// This is our worker function, which will run in separate isolates.
// It receives jobs through the jobs ReceivePort and sends results
// back through the results SendPort.
void worker(List<dynamic> args) {
  int id = args[0];
  ReceivePort jobs = args[1];
  SendPort results = args[2];

  jobs.listen((job) {
    print('worker $id started  job $job');
    sleep(Duration(seconds: 1)); // Simulate an expensive task
    print('worker $id finished job $job');
    results.send(job * 2);
  });
}

Future<void> main() async {
  const numJobs = 5;
  final jobs = StreamController<int>();
  final results = ReceivePort();

  // Start 3 worker isolates
  for (var w = 1; w <= 3; w++) {
    ReceivePort workerJobs = ReceivePort();
    await Isolate.spawn(worker, [w, workerJobs.sendPort, results.sendPort]);
    jobs.stream.listen((job) {
      workerJobs.sendPort.send(job);
    });
  }

  // Send 5 jobs
  for (var j = 1; j <= numJobs; j++) {
    jobs.add(j);
  }

  // Wait for all results
  await for (var _ in results.take(numJobs)) {}

  // Clean up
  jobs.close();
  results.close();
}

In this example, we use Dart’s Isolates to create a worker pool. Here’s how it works:

  1. We define a worker function that will run in separate isolates. It receives jobs through a ReceivePort and sends results back through a SendPort.

  2. In the main function, we create a StreamController for jobs and a ReceivePort for results.

  3. We spawn three worker isolates using Isolate.spawn. Each worker is given its ID, a SendPort to receive jobs, and a SendPort to send results.

  4. We send 5 jobs by adding them to the jobs StreamController.

  5. We wait for all results using await for on the results ReceivePort.

To run this program, save it as worker_pools.dart and use the dart command:

$ dart worker_pools.dart
worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 2 finished job 2
worker 3 finished job 3
worker 1 started  job 4
worker 2 started  job 5
worker 1 finished job 4
worker 2 finished job 5

The program demonstrates how the 5 jobs are executed by various workers. It takes about 2 seconds to complete despite doing about 5 seconds of total work because there are 3 workers operating concurrently.

Note that Dart’s isolates are similar to threads but do not share memory, which is different from the original example’s goroutines. However, they serve a similar purpose in enabling concurrent execution.

查看推荐产品