Closing Channels in Dart

Closing a stream in Dart, which is done through its StreamController, indicates that no more values will be sent on it. Listeners are notified with a done event, which makes closing a useful way to communicate completion.

In this example, we’ll use a jobs stream to communicate work to be done from the main() function to a worker function. When we have no more jobs for the worker, we’ll close the jobs stream.

import 'dart:async';

void main() async {
  final jobs = StreamController<int>();
  final done = Completer<bool>();

  // Here's the worker. It listens to the `jobs` stream and
  // handles each job as it arrives. When the controller is
  // closed and all values have been delivered, the `onDone`
  // callback runs. We use it to complete the `done` future
  // once we've worked all our jobs.
  jobs.stream.listen((j) {
    print('received job $j');
  }, onDone: () {
    print('received all jobs');
    done.complete(true);
  });

  // This sends 3 jobs to the worker over the `jobs`
  // stream, then closes it.
  for (var j = 1; j <= 3; j++) {
    jobs.add(j);
    print('sent job $j');
  }
  await jobs.close();
  print('sent all jobs');

  // We await the worker using the `done` future.
  await done.future;

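  // A single-subscription stream can be listened to only
  // once, so we can't read from `jobs.stream` again. Instead
  // we ask the controller whether it is still open; once it
  // is closed, no more jobs can arrive.
  final moreJobs = !jobs.isClosed;
  print('received more jobs: $moreJobs');
}

When you run this program, you should see output similar to this:

sent job 1
sent job 2
sent job 3
received job 1
received job 2
received job 3
received all jobs
sent all jobs
received more jobs: false

All three jobs are sent before any is received because a StreamController delivers events asynchronously by default, and main() does not yield until it awaits close(). The future returned by close() completes only after the listener has received the done event, which is why "sent all jobs" is printed last among the job messages.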

In Dart, streams play a role similar to channels in other languages. A StreamController creates a stream, lets you add events to it, and lets you close it, while listeners consume those events through its stream property. A Completer creates a future that can be completed later; here it takes the place of a channel used purely for synchronization.
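As a variation on the example above, the worker can also be written with an `await for` loop, which ends on its own when the controller is closed. The following is a minimal sketch, not the only way to structure this; the `worker` helper and its return value are hypothetical names introduced for illustration. The future returned by `worker` plays the part of the `done` Completer.

```dart
import 'dart:async';

// Hypothetical helper: consumes jobs until the stream is done
// and returns the jobs it handled, in order.
Future<List<int>> worker(Stream<int> jobs) async {
  final handled = <int>[];
  // The loop body runs once per event. The loop exits when
  // the controller behind `jobs` is closed.
  await for (final j in jobs) {
    print('received job $j');
    handled.add(j);
  }
  print('received all jobs');
  return handled;
}

Future<void> main() async {
  final jobs = StreamController<int>();

  // Start the worker. Its returned future stands in for the
  // `done` Completer from the example above.
  final done = worker(jobs.stream);

  for (var j = 1; j <= 3; j++) {
    jobs.add(j);
  }
  await jobs.close();

  final handled = await done;
  print('handled ${handled.length} jobs');
}
```

With this shape there is no separate `onDone` callback to wire up: code placed after the loop runs only once the stream has been closed and drained.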

The concept of closing a stream leads naturally to our next example: listening to streams.