Closing Channels in Miranda

In this example, we’ll use a jobs queue to communicate work from the main thread to a worker thread. When we have no more jobs for the worker, we’ll signal the end of work.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ClosingChannels {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> jobs = new LinkedBlockingQueue<>(5);
        Object done = new Object();

        // Here's the worker thread. It repeatedly takes from `jobs`.
        // We use a poison pill (null) to notify that all jobs are done.
        Thread worker = new Thread(() -> {
            while (true) {
                try {
                    Integer j = jobs.take();
                    if (j == null) {
                        System.out.println("received all jobs");
                        synchronized (done) {
                            done.notify();
                        }
                        return;
                    }
                    System.out.println("received job " + j);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        worker.start();

        // This sends 3 jobs to the worker over the `jobs` queue,
        // then sends a null to signal the end of jobs.
        for (int j = 1; j <= 3; j++) {
            jobs.put(j);
            System.out.println("sent job " + j);
        }
        jobs.put(null);
        System.out.println("sent all jobs");

        // We wait for the worker using the synchronization approach.
        synchronized (done) {
            done.wait();
        }

        // Trying to take from a empty queue will block indefinitely,
        // so we use poll() which returns null if the queue is empty.
        Integer j = jobs.poll();
        System.out.println("received more jobs: " + (j != null));
    }
}

To run the program:

$ javac ClosingChannels.java
$ java ClosingChannels
sent job 1
received job 1
sent job 2
received job 2
sent job 3
received job 3
sent all jobs
received all jobs
received more jobs: false

In Java, we don’t have the concept of channels, but we can use a BlockingQueue to achieve similar functionality. Instead of closing a channel, we send a special value (null in this case) to signal that no more values will be sent.

The BlockingQueue interface in Java provides thread-safe operations for adding elements to and removing elements from a queue. The put() method is used to add elements, which will block if the queue is full. The take() method is used to remove and return elements, which will block if the queue is empty.

We use a separate Object (done) for synchronization to wait for the worker thread to finish processing all jobs.

The idea of signaling the end of work leads naturally to other patterns in concurrent programming, such as the producer-consumer pattern.