Closing Channels in Dart
Closing a stream in Dart, by calling close() on its StreamController, indicates that no more values will be sent on it. This is useful for communicating completion to the stream’s listeners.
In this example, we’ll use a jobs stream to communicate work to be done from the main() function to a worker function. When we have no more jobs for the worker, we’ll close the jobs stream.
import 'dart:async';
void main() async {
  final jobs = StreamController<int>();
  final done = Completer<bool>();
  // Here's the worker. It subscribes to the `jobs` stream
  // and handles each job as it arrives. When the stream is
  // closed and all values have been delivered, the `onDone`
  // callback runs. We use it to complete the `done` future
  // once we've worked all our jobs.
  jobs.stream.listen((j) {
    print('received job $j');
  }, onDone: () {
    print('received all jobs');
    done.complete(true);
  });
  // This sends 3 jobs to the worker over the `jobs`
  // stream, then closes it. `add` only queues the event;
  // the listener receives it asynchronously.
  for (var j = 1; j <= 3; j++) {
    jobs.add(j);
    print('sent job $j');
  }
  await jobs.close();
  print('sent all jobs');
  // We await the worker using the `done` future.
  await done.future;
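  // Once the controller is closed it accepts no more events;
  // calling `add` again would throw a StateError. The stream
  // is single-subscription, so it can't be listened to a
  // second time either. We check `isClosed` instead.
  final moreJobs = !jobs.isClosed;
  print('received more jobs: $moreJobs');
}
When you run this program, you should see output similar to this. The controller delivers events asynchronously, so all three sends are printed before the first job is received, and the future returned by close() completes only after the listener has been notified that the stream is done:
sent job 1
sent job 2
sent job 3
received job 1
received job 2
received job 3
received all jobs
sent all jobs
received more jobs: false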
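To make the behavior of a closed controller concrete, here is a minimal sketch that is not part of the example above. It assumes a broadcast controller, since only broadcast streams can be listened to more than once, and it uses a hypothetical helper named addIsRejected. It illustrates two things: adding to a closed controller throws a StateError, and a listener that subscribes to a closed broadcast stream sees it end without receiving any values.

```dart
import 'dart:async';

/// Hypothetical helper for this sketch: tries to add a value and
/// reports whether the controller rejected it with a StateError.
bool addIsRejected(StreamController<int> controller) {
  try {
    controller.add(42);
    return false;
  } on StateError {
    return true;
  }
}

Future<void> main() async {
  // Assumption: a broadcast controller, so that the stream may be
  // listened to after it has been closed.
  final jobs = StreamController<int>.broadcast();
  await jobs.close();

  // A closed controller accepts no further events.
  print('add rejected: ${addIsRejected(jobs)}');

  // A late listener on a closed broadcast stream gets no values,
  // so the stream looks empty to it.
  final empty = await jobs.stream.isEmpty;
  print('late listener sees empty stream: $empty');
}
```

With a single-subscription controller, as used in the jobs example, a second attempt to listen would throw instead, so checking isClosed on the controller is the safer test there.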
In Dart, streams are similar to channels in other languages. The StreamController class is used to create a stream that can be written to and listened to. The Completer class is used to create a future that can be completed later, which is similar to how we used channels for synchronization in the original example.
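As an alternative to a Completer, the worker can be written as an async function whose returned future serves as the completion signal. The sketch below is one possible variation, not the approach used above; the worker function and its job count return value are illustrative assumptions. It relies on the fact that an `await for` loop ends once the stream is closed.

```dart
import 'dart:async';

/// Hypothetical worker: consumes jobs until the stream is closed,
/// then returns how many jobs it processed.
Future<int> worker(Stream<int> jobs) async {
  var count = 0;
  // The loop exits normally once the stream emits its done event.
  await for (final j in jobs) {
    print('received job $j');
    count++;
  }
  print('received all jobs');
  return count;
}

Future<void> main() async {
  final jobs = StreamController<int>();

  // Start the worker. Its future plays the role of `done`.
  final done = worker(jobs.stream);

  for (var j = 1; j <= 3; j++) {
    jobs.add(j);
  }
  await jobs.close();

  final processed = await done;
  print('worker processed $processed jobs');
}
```

This keeps the synchronization inside the worker itself, so the caller only needs to await one future.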
The concept of closing a stream leads naturally to our next example: listening to streams.