Stateful Goroutines in Minitab

This example demonstrates how to use stateful goroutines in Java using threads and concurrent data structures.

import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

class ReadOp {
    int key;
    CompletableFuture<Integer> resp;

    ReadOp(int key) {
        this.key = key;
        this.resp = new CompletableFuture<>();
    }
}

class WriteOp {
    int key;
    int val;
    CompletableFuture<Boolean> resp;

    WriteOp(int key, int val) {
        this.key = key;
        this.val = val;
        this.resp = new CompletableFuture<>();
    }
}

public class StatefulThreads {
    public static void main(String[] args) throws InterruptedException {
        // We'll count how many operations we perform.
        AtomicLong readOps = new AtomicLong();
        AtomicLong writeOps = new AtomicLong();

        // The reads and writes channels will be used by other threads to issue read and write requests.
        BlockingQueue<ReadOp> reads = new LinkedBlockingQueue<>();
        BlockingQueue<WriteOp> writes = new LinkedBlockingQueue<>();

        // Here is the thread that owns the state, which is a map as in the previous example but now private to the stateful thread.
        Thread stateManager = new Thread(() -> {
            Map<Integer, Integer> state = new ConcurrentHashMap<>();
            while (true) {
                try {
                    ReadOp read = reads.poll();
                    if (read != null) {
                        read.resp.complete(state.getOrDefault(read.key, 0));
                    }

                    WriteOp write = writes.poll();
                    if (write != null) {
                        state.put(write.key, write.val);
                        write.resp.complete(true);
                    }

                    if (read == null && write == null) {
                        Thread.sleep(1);
                    }
                } catch (InterruptedException e) {
                    break;
                }
            }
        });
        stateManager.start();

        // This starts 100 threads to issue reads to the state-owning thread via the reads channel.
        for (int r = 0; r < 100; r++) {
            new Thread(() -> {
                while (true) {
                    ReadOp read = new ReadOp(ThreadLocalRandom.current().nextInt(5));
                    try {
                        reads.put(read);
                        read.resp.get();
                        readOps.incrementAndGet();
                        Thread.sleep(1);
                    } catch (InterruptedException | ExecutionException e) {
                        break;
                    }
                }
            }).start();
        }

        // We start 10 writes as well, using a similar approach.
        for (int w = 0; w < 10; w++) {
            new Thread(() -> {
                while (true) {
                    WriteOp write = new WriteOp(
                            ThreadLocalRandom.current().nextInt(5),
                            ThreadLocalRandom.current().nextInt(100));
                    try {
                        writes.put(write);
                        write.resp.get();
                        writeOps.incrementAndGet();
                        Thread.sleep(1);
                    } catch (InterruptedException | ExecutionException e) {
                        break;
                    }
                }
            }).start();
        }

        // Let the threads work for a second.
        Thread.sleep(1000);

        // Finally, capture and report the op counts.
        System.out.println("readOps: " + readOps.get());
        System.out.println("writeOps: " + writeOps.get());

        System.exit(0);
    }
}

Running our program shows that the thread-based state management example completes about 80,000 total operations.

$ javac StatefulThreads.java
$ java StatefulThreads
readOps: 71708
writeOps: 7177

For this particular case, the thread-based approach was a bit more involved than a mutex-based one might be. It might be useful in certain cases though, for example where you have other channels involved or when managing multiple such locks would be error-prone. You should use whichever approach feels most natural, especially with respect to understanding the correctness of your program.

In Java, we’ve used Threads to represent the concept of goroutines, BlockingQueues for channels, and CompletableFutures for the response channels. The ConcurrentHashMap is used as a thread-safe alternative to the regular Map in the stateful thread.

查看推荐产品