Worker Pools in Dart
Our example demonstrates how to implement a worker pool using Dart’s isolates and streams.
import 'dart:isolate';
import 'dart:async';
// This is our worker function, which will run in separate isolates.
// It receives jobs through the jobs ReceivePort and sends results
// back through the results SendPort.
void worker(List<dynamic> args) {
int id = args[0];
ReceivePort jobs = args[1];
SendPort results = args[2];
jobs.listen((job) {
print('worker $id started job $job');
sleep(Duration(seconds: 1)); // Simulate an expensive task
print('worker $id finished job $job');
results.send(job * 2);
});
}
Future<void> main() async {
const numJobs = 5;
final jobs = StreamController<int>();
final results = ReceivePort();
// Start 3 worker isolates
for (var w = 1; w <= 3; w++) {
ReceivePort workerJobs = ReceivePort();
await Isolate.spawn(worker, [w, workerJobs.sendPort, results.sendPort]);
jobs.stream.listen((job) {
workerJobs.sendPort.send(job);
});
}
// Send 5 jobs
for (var j = 1; j <= numJobs; j++) {
jobs.add(j);
}
// Wait for all results
await for (var _ in results.take(numJobs)) {}
// Clean up
jobs.close();
results.close();
}
In this example, we use Dart’s Isolate
s to create a worker pool. Here’s how it works:
We define a
worker
function that will run in separate isolates. It receives jobs through aReceivePort
and sends results back through aSendPort
.In the
main
function, we create aStreamController
for jobs and aReceivePort
for results.We spawn three worker isolates using
Isolate.spawn
. Each worker is given its ID, aSendPort
to receive jobs, and aSendPort
to send results.We send 5 jobs by adding them to the
jobs
StreamController.We wait for all results using
await for
on the results ReceivePort.
To run this program, save it as worker_pools.dart
and use the dart
command:
$ dart worker_pools.dart
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 2 finished job 2
worker 3 finished job 3
worker 1 started job 4
worker 2 started job 5
worker 1 finished job 4
worker 2 finished job 5
The program demonstrates how the 5 jobs are executed by various workers. It takes about 2 seconds to complete despite doing about 5 seconds of total work because there are 3 workers operating concurrently.
Note that Dart’s isolates are similar to threads but do not share memory, which is different from the original example’s goroutines. However, they serve a similar purpose in enabling concurrent execution.