Worker Pools in Dart
Our example demonstrates how to implement a worker pool using Dart’s isolates and streams.
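import 'dart:async';
import 'dart:io';
import 'dart:isolate';

// This is our worker function, which will run in separate isolates.
// A ReceivePort cannot be sent to another isolate, so each worker
// creates its own jobs ReceivePort, hands the matching SendPort back
// through ready, and sends results back through the results SendPort.
void worker(List<dynamic> args) {
  int id = args[0];
  SendPort ready = args[1];
  SendPort results = args[2];
  final jobs = ReceivePort();
  ready.send(jobs.sendPort);
  jobs.listen((job) {
    print('worker $id started job $job');
    sleep(Duration(seconds: 1)); // Simulate an expensive task
    print('worker $id finished job $job');
    results.send(job * 2);
  });
}

Future<void> main() async {
  const numJobs = 5;
  final jobs = StreamController<int>();
  final results = ReceivePort();
  final isolates = <Isolate>[];
  final jobPorts = <SendPort>[];
  // Start 3 worker isolates
  for (var w = 1; w <= 3; w++) {
    final ready = ReceivePort();
    isolates.add(
        await Isolate.spawn(worker, [w, ready.sendPort, results.sendPort]));
    // The first message from each worker is the SendPort for its jobs.
    jobPorts.add(await ready.first as SendPort);
  }
  // A single-subscription stream allows only one listener, so one
  // dispatcher hands the jobs to the workers in round-robin order.
  var next = 0;
  jobs.stream.listen((job) {
    jobPorts[next++ % jobPorts.length].send(job);
  });
  // Send 5 jobs
  for (var j = 1; j <= numJobs; j++) {
    jobs.add(j);
  }
  // Wait for all results
  await for (var _ in results.take(numJobs)) {}
  // Clean up
  await jobs.close();
  results.close();
  for (final isolate in isolates) {
    isolate.kill();
  }
}

In this example, we use Dart’s isolates to create a worker pool. Here’s how it works:
We define a worker function that runs in separate isolates. Each worker creates its own ReceivePort for jobs, because a ReceivePort cannot be sent between isolates, and sends results back through a SendPort.
In the main function, we create a StreamController for jobs and a ReceivePort for results.
We spawn three worker isolates using Isolate.spawn. Each worker is given its ID, a SendPort on which to hand back the SendPort for its jobs, and a SendPort to send results.
A single listener on the jobs stream distributes the jobs to the workers in round-robin order.
We send 5 jobs by adding them to the jobs StreamController.
We wait for all results using await for on the results ReceivePort.
Finally, we close the controller and the results port and kill the worker isolates.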
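The port handshake is the part of this pattern that is easiest to get wrong, so here is a minimal sketch of it with a single worker. The names doublingWorker and doubleAll are hypothetical and chosen only for illustration. The sketch also assumes a null message as a shutdown signal, which is one possible convention rather than anything Dart prescribes.

```dart
import 'dart:isolate';

// Hypothetical worker: doubles each int it receives and stops on null.
void doublingWorker(List<dynamic> args) {
  final SendPort ready = args[0];
  final SendPort results = args[1];
  final jobs = ReceivePort();
  ready.send(jobs.sendPort); // Handshake: tell the spawner where to send jobs.
  jobs.listen((job) {
    if (job == null) {
      jobs.close(); // Sentinel received: close the port so the isolate can exit.
    } else {
      results.send((job as int) * 2);
    }
  });
}

// Hypothetical helper: runs the given jobs on one worker isolate.
Future<List<int>> doubleAll(List<int> jobs) async {
  final ready = ReceivePort();
  final results = ReceivePort();
  await Isolate.spawn(doublingWorker, [ready.sendPort, results.sendPort]);
  // The worker's first message is the SendPort for its job queue.
  final jobPort = await ready.first as SendPort;
  for (final job in jobs) {
    jobPort.send(job);
  }
  final doubled = await results.take(jobs.length).cast<int>().toList();
  jobPort.send(null); // Ask the worker to shut down.
  return doubled;
}

Future<void> main() async {
  print(await doubleAll([1, 2, 3])); // [2, 4, 6]
}
```

Messages sent from one isolate to a given port arrive in the order they were sent, which is why the results line up with the jobs when there is a single worker. With several workers, results may arrive in any order.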
To run this program, save it as worker_pools.dart and use the dart command:
$ dart worker_pools.dart
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 2 finished job 2
worker 3 finished job 3
worker 1 started job 4
worker 2 started job 5
worker 1 finished job 4
worker 2 finished job 5
The program demonstrates how the 5 jobs are executed by various workers. It takes about 2 seconds to complete despite doing about 5 seconds of total work because there are 3 workers operating concurrently.
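The 2-second figure follows from simple arithmetic: with equal-length jobs, the run lasts as long as the busiest worker, which handles ceil(5 / 3) = 2 jobs. The sketch below works this out. The function estimatedWallTime is a hypothetical helper, and the estimate assumes that every job takes the same time and ignores isolate startup and messaging overhead.

```dart
// Hypothetical helper: wall-clock estimate when jobs of equal length are
// spread evenly over workers that each run one job at a time.
Duration estimatedWallTime(int numJobs, int numWorkers, Duration perJob) {
  // Number of jobs handled by the busiest worker.
  final rounds = (numJobs / numWorkers).ceil();
  return perJob * rounds;
}

void main() {
  const perJob = Duration(seconds: 1);
  print(estimatedWallTime(5, 3, perJob).inSeconds); // 2
  print(estimatedWallTime(5, 1, perJob).inSeconds); // 5
}
```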
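Note that Dart’s isolates are similar to threads but do not share memory: each isolate has its own heap and communicates with others only by passing messages through ports. This differs from the goroutines of the original example, which can share memory. However, isolates serve a similar purpose in enabling concurrent execution.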
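The lack of shared memory can be observed directly. In the sketch below, a top-level variable is incremented inside a second isolate, yet the main isolate still sees the original value. The names counter, increment, and observeCounters are hypothetical. The sketch assumes Dart 2.19 or later, where Isolate.run is available.

```dart
import 'dart:isolate';

int counter = 0; // Top-level state: every isolate gets its own copy.

int increment() {
  counter++;
  return counter;
}

// Hypothetical helper: returns the counter as seen by the worker isolate
// and by the calling isolate after the worker has incremented its copy.
Future<List<int>> observeCounters() async {
  final inWorker = await Isolate.run(increment);
  return [inWorker, counter];
}

Future<void> main() async {
  final seen = await observeCounters();
  print('worker isolate sees ${seen[0]}'); // worker isolate sees 1
  print('main isolate sees ${seen[1]}'); // main isolate sees 0
}
```

This is why the worker pool has to send each result back through a SendPort instead of writing it into a shared list.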