Stateful Goroutines in Java

In the previous example, we used explicit locking with mutexes to synchronize access to shared state across multiple threads. Another option is to use Java's concurrent collections and executors to achieve the same result. This approach follows the Go-style idea of sharing memory by communicating: each piece of data is owned by a single thread, and other threads reach it only by sending requests to that owner.
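
The core handshake can be sketched in isolation before the full program. The following is a minimal illustration, not part of the program below: the `HandshakeSketch` class, the `Request` type, and the `ask` helper are hypothetical names, and doubling the key is only a stand-in for a real state lookup.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class HandshakeSketch {
    // Hypothetical request type: a key plus a future that the owner completes.
    static final class Request {
        final int key;
        final CompletableFuture<Integer> reply = new CompletableFuture<>();

        Request(int key) {
            this.key = key;
        }
    }

    // Sends one request to a freshly started owner thread and returns its reply.
    static int ask(int key) {
        BlockingQueue<Request> inbox = new LinkedBlockingQueue<>();

        // The owner thread is the only code that computes the answer.
        Thread owner = new Thread(() -> {
            try {
                Request req = inbox.take();      // block until a request arrives
                req.reply.complete(req.key * 2); // stand-in for a state lookup
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        owner.start();

        Request req = new Request(key);
        inbox.add(req);          // the queue is unbounded, so add() does not block
        return req.reply.join(); // wait until the owner responds
    }

    public static void main(String[] args) {
        System.out.println(ask(21)); // prints 42
    }
}
```

The caller never touches the owner's data directly; it only enqueues a request and waits on the future attached to it. The full program below applies the same pattern with separate read and write queues.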

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Random;

class ReadOp {
    int key;
    CompletableFuture<Integer> response;

    ReadOp(int key) {
        this.key = key;
        this.response = new CompletableFuture<>();
    }
}

class WriteOp {
    int key;
    int val;
    CompletableFuture<Boolean> response;

    WriteOp(int key, int val) {
        this.key = key;
        this.val = val;
        this.response = new CompletableFuture<>();
    }
}

public class StatefulConcurrency {
    public static void main(String[] args) throws InterruptedException {
        // As before, we'll count how many operations we perform.
        AtomicLong readOps = new AtomicLong();
        AtomicLong writeOps = new AtomicLong();

        // The reads and writes queues will be used by other threads to issue read and write requests, respectively.
        BlockingQueue<ReadOp> reads = new LinkedBlockingQueue<>();
        BlockingQueue<WriteOp> writes = new LinkedBlockingQueue<>();

        // Here is the thread that owns the state. Only this thread touches the map, so a plain
        // HashMap would also work; ConcurrentHashMap is not required for correctness here.
        // The thread repeatedly polls the reads and writes queues, responding to requests as they arrive.
        ExecutorService stateManager = Executors.newSingleThreadExecutor();
        stateManager.submit(() -> {
            ConcurrentHashMap<Integer, Integer> state = new ConcurrentHashMap<>();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    ReadOp read = reads.poll();
                    if (read != null) {
                        read.response.complete(state.getOrDefault(read.key, 0));
                        continue;
                    }

                    WriteOp write = writes.poll();
                    if (write != null) {
                        state.put(write.key, write.val);
                        write.response.complete(true);
                    } else {
                        // Both queues are empty: yield rather than spin at full speed.
                        Thread.yield();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // This starts 100 threads to issue reads to the state-managing thread via the reads queue.
        ExecutorService readers = Executors.newFixedThreadPool(100);
        for (int r = 0; r < 100; r++) {
            readers.submit(() -> {
                Random rand = new Random();
                while (!Thread.currentThread().isInterrupted()) {
                    ReadOp read = new ReadOp(rand.nextInt(5));
                    reads.offer(read);
                    try {
                        read.response.get();
                        readOps.incrementAndGet();
                        Thread.sleep(1);
                    } catch (InterruptedException | ExecutionException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }

        // We start 10 writer threads as well, using a similar approach.
        ExecutorService writers = Executors.newFixedThreadPool(10);
        for (int w = 0; w < 10; w++) {
            writers.submit(() -> {
                Random rand = new Random();
                while (!Thread.currentThread().isInterrupted()) {
                    WriteOp write = new WriteOp(rand.nextInt(5), rand.nextInt(100));
                    writes.offer(write);
                    try {
                        write.response.get();
                        writeOps.incrementAndGet();
                        Thread.sleep(1);
                    } catch (InterruptedException | ExecutionException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }

        // Let the threads work for a second.
        Thread.sleep(1000);

        // Finally, capture and report the op counts.
        System.out.println("readOps: " + readOps.get());
        System.out.println("writeOps: " + writeOps.get());

        // Shut down the executors. shutdownNow() interrupts the worker threads so their loops exit.
        stateManager.shutdownNow();
        readers.shutdownNow();
        writers.shutdownNow();
    }
}

Running our program shows that the thread-based state management example completes about 80,000 total operations in one second. Exact counts vary between runs and machines.

$ javac StatefulConcurrency.java
$ java StatefulConcurrency
readOps: 71708
writeOps: 7177

For this particular case, the thread-based approach is a bit more involved than the mutex-based one. It can be useful in certain cases, though: for example, when other queues or concurrent collections are involved, or when managing several mutexes would be error-prone. Use whichever approach feels most natural, especially with respect to reasoning about the correctness of your program.
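
The state-managing loop above checks both queues with non-blocking `poll()` calls, so it keeps running even when no requests are pending. One possible alternative, sketched here under the assumption that reads and writes can share a single queue, blocks on `take()` instead. The `SingleQueueState` and `Op` names are hypothetical and not part of the original example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class SingleQueueState {
    // Hypothetical combined request: a write when val is non-null, otherwise a read.
    static final class Op {
        final int key;
        final Integer val;
        final CompletableFuture<Integer> response = new CompletableFuture<>();

        Op(int key, Integer val) {
            this.key = key;
            this.val = val;
        }
    }

    private final BlockingQueue<Op> ops = new LinkedBlockingQueue<>();
    private final Thread owner;

    SingleQueueState() {
        owner = new Thread(() -> {
            // Only this thread touches the map, so a plain HashMap is enough.
            Map<Integer, Integer> state = new HashMap<>();
            try {
                while (true) {
                    Op op = ops.take(); // blocks while idle; throws when interrupted
                    if (op.val != null) {
                        state.put(op.key, op.val);
                    }
                    // Reply with the current value for the key (0 if absent).
                    op.response.complete(state.getOrDefault(op.key, 0));
                }
            } catch (InterruptedException e) {
                // Interrupted by stop(): fall through and let the thread end.
            }
        });
        owner.setDaemon(true);
        owner.start();
    }

    int read(int key) {
        return submit(new Op(key, null));
    }

    int write(int key, int val) {
        return submit(new Op(key, val));
    }

    // Enqueues the request and waits for the owner thread's reply.
    private int submit(Op op) {
        ops.add(op);
        return op.response.join();
    }

    void stop() {
        owner.interrupt();
    }

    public static void main(String[] args) {
        SingleQueueState s = new SingleQueueState();
        System.out.println(s.read(1));     // prints 0
        System.out.println(s.write(1, 7)); // prints 7
        System.out.println(s.read(1));     // prints 7
        s.stop();
    }
}
```

With a single queue, requests are handled strictly in arrival order, so reads can no longer be served ahead of pending writes as they can in the two-queue loop. Whether that ordering is preferable depends on the workload.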