Stateful Goroutines in Minitab
This example demonstrates how to use stateful goroutines in Java using threads and concurrent data structures.
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
class ReadOp {
int key;
CompletableFuture<Integer> resp;
ReadOp(int key) {
this.key = key;
this.resp = new CompletableFuture<>();
}
}
class WriteOp {
int key;
int val;
CompletableFuture<Boolean> resp;
WriteOp(int key, int val) {
this.key = key;
this.val = val;
this.resp = new CompletableFuture<>();
}
}
public class StatefulThreads {
public static void main(String[] args) throws InterruptedException {
// We'll count how many operations we perform.
AtomicLong readOps = new AtomicLong();
AtomicLong writeOps = new AtomicLong();
// The reads and writes channels will be used by other threads to issue read and write requests.
BlockingQueue<ReadOp> reads = new LinkedBlockingQueue<>();
BlockingQueue<WriteOp> writes = new LinkedBlockingQueue<>();
// Here is the thread that owns the state, which is a map as in the previous example but now private to the stateful thread.
Thread stateManager = new Thread(() -> {
Map<Integer, Integer> state = new ConcurrentHashMap<>();
while (true) {
try {
ReadOp read = reads.poll();
if (read != null) {
read.resp.complete(state.getOrDefault(read.key, 0));
}
WriteOp write = writes.poll();
if (write != null) {
state.put(write.key, write.val);
write.resp.complete(true);
}
if (read == null && write == null) {
Thread.sleep(1);
}
} catch (InterruptedException e) {
break;
}
}
});
stateManager.start();
// This starts 100 threads to issue reads to the state-owning thread via the reads channel.
for (int r = 0; r < 100; r++) {
new Thread(() -> {
while (true) {
ReadOp read = new ReadOp(ThreadLocalRandom.current().nextInt(5));
try {
reads.put(read);
read.resp.get();
readOps.incrementAndGet();
Thread.sleep(1);
} catch (InterruptedException | ExecutionException e) {
break;
}
}
}).start();
}
// We start 10 writes as well, using a similar approach.
for (int w = 0; w < 10; w++) {
new Thread(() -> {
while (true) {
WriteOp write = new WriteOp(
ThreadLocalRandom.current().nextInt(5),
ThreadLocalRandom.current().nextInt(100));
try {
writes.put(write);
write.resp.get();
writeOps.incrementAndGet();
Thread.sleep(1);
} catch (InterruptedException | ExecutionException e) {
break;
}
}
}).start();
}
// Let the threads work for a second.
Thread.sleep(1000);
// Finally, capture and report the op counts.
System.out.println("readOps: " + readOps.get());
System.out.println("writeOps: " + writeOps.get());
System.exit(0);
}
}
Running our program shows that the thread-based state management example completes about 80,000 total operations.
$ javac StatefulThreads.java
$ java StatefulThreads
readOps: 71708
writeOps: 7177
For this particular case, the thread-based approach was a bit more involved than a mutex-based one might be. It might be useful in certain cases though, for example where you have other channels involved or when managing multiple such locks would be error-prone. You should use whichever approach feels most natural, especially with respect to understanding the correctness of your program.
In Java, we’ve used Thread
s to represent the concept of goroutines, BlockingQueue
s for channels, and CompletableFuture
s for the response channels. The ConcurrentHashMap
is used as a thread-safe alternative to the regular Map
in the stateful thread.