Stateful Goroutines in Java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// A read request: the key to look up, plus a queue on which the
// state-owning thread sends back the value.
class ReadOp {
    int key;
    BlockingQueue<Integer> resp;

    ReadOp(int key) {
        this.key = key;
        this.resp = new LinkedBlockingQueue<>();
    }
}

// A write request: the key and value to store, plus a queue on which
// the state-owning thread acknowledges the write.
class WriteOp {
    int key;
    int val;
    BlockingQueue<Boolean> resp;

    WriteOp(int key, int val) {
        this.key = key;
        this.val = val;
        this.resp = new LinkedBlockingQueue<>();
    }
}

public class StatefulThreads {
    public static void main(String[] args) throws InterruptedException {
        // Counters for completed operations, updated from many threads.
        AtomicLong readOps = new AtomicLong();
        AtomicLong writeOps = new AtomicLong();

        // Request queues shared by the workers and the state-owning thread.
        BlockingQueue<ReadOp> reads = new LinkedBlockingQueue<>();
        BlockingQueue<WriteOp> writes = new LinkedBlockingQueue<>();

        // The only thread that ever touches `state`.
        Thread stateManager = new Thread(() -> {
            Map<Integer, Integer> state = new HashMap<>();
            while (true) {
                try {
                    ReadOp read = reads.poll();
                    if (read != null) {
                        read.resp.put(state.getOrDefault(read.key, 0));
                        continue;
                    }
                    WriteOp write = writes.poll();
                    if (write != null) {
                        state.put(write.key, write.val);
                        write.resp.put(true);
                        continue;
                    }
                    // Both queues are empty: hint that we are busy-waiting (Java 9+).
                    Thread.onSpinWait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        // Daemon threads do not keep the JVM alive, so the program
        // exits once main has printed the counts.
        stateManager.setDaemon(true);
        stateManager.start();

        // 100 reader threads, each issuing one read roughly every millisecond.
        for (int r = 0; r < 100; r++) {
            Thread reader = new Thread(() -> {
                Random rand = new Random();
                while (true) {
                    try {
                        ReadOp read = new ReadOp(rand.nextInt(5));
                        reads.put(read);
                        read.resp.take(); // wait for the owner's reply
                        readOps.incrementAndGet();
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
            reader.setDaemon(true);
            reader.start();
        }

        // 10 writer threads, following the same request/response pattern.
        for (int w = 0; w < 10; w++) {
            Thread writer = new Thread(() -> {
                Random rand = new Random();
                while (true) {
                    try {
                        WriteOp write = new WriteOp(rand.nextInt(5), rand.nextInt(100));
                        writes.put(write);
                        write.resp.take(); // wait for the acknowledgement
                        writeOps.incrementAndGet();
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
            writer.setDaemon(true);
            writer.start();
        }

        // Let the workers run for a second, then report the totals.
        Thread.sleep(1000);
        System.out.println("readOps: " + readOps.get());
        System.out.println("writeOps: " + writeOps.get());
    }
}
In this example, we use Java threads and blocking queues to manage shared state across multiple threads. This approach follows the idea of sharing memory by communicating: each piece of data is owned by exactly one thread, and every other thread reaches it only by sending that thread a request.
The ReadOp and WriteOp classes encapsulate read and write requests, along with a way for the owning thread to respond.
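The "way to respond" is a queue carried inside the request itself. As a minimal sketch of that idea in isolation, the following performs a single round trip. The names ReplyChannelDemo, Lookup, and roundTrip are hypothetical, and the reply value (the key times ten) is made up purely so the result is easy to check.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ReplyChannelDemo {
    // Hypothetical request type: a key plus a one-slot queue for the answer.
    static final class Lookup {
        final int key;
        final BlockingQueue<Integer> resp = new ArrayBlockingQueue<>(1);

        Lookup(int key) {
            this.key = key;
        }
    }

    // Sends one request to a short-lived owner thread and returns its reply.
    static int roundTrip(int key) {
        BlockingQueue<Lookup> requests = new LinkedBlockingQueue<>();
        Thread owner = new Thread(() -> {
            try {
                Lookup req = requests.take(); // wait for the request
                req.resp.put(req.key * 10);   // reply on the queue the request carries
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        owner.setDaemon(true);
        owner.start();
        try {
            Lookup req = new Lookup(key);
            requests.put(req);
            return req.resp.take(); // block until the owner answers
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reply", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(4)); // prints 40
    }
}
```

Because each request owns its reply queue, the owner never needs to know which thread sent it.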
We create a single thread that owns the state, which is a Map private to this thread. This thread repeatedly checks for read and write requests, responding to them as they arrive.
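Checking two queues in a loop keeps the owner thread spinning while both are empty. One possible alternative, sketched here as an assumption rather than as part of the example above, merges reads and writes into one queue so the owner can block in take(). The SingleQueueOwner and Request names, and the convention that a null val means "read", are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SingleQueueOwner {
    // Hypothetical merged request: a write when val is non-null, otherwise a read.
    static final class Request {
        final int key;
        final Integer val;
        final BlockingQueue<Integer> resp = new LinkedBlockingQueue<>();

        Request(int key, Integer val) {
            this.key = key;
            this.val = val;
        }
    }

    private final BlockingQueue<Request> requests = new LinkedBlockingQueue<>();

    SingleQueueOwner() {
        Thread owner = new Thread(() -> {
            Map<Integer, Integer> state = new HashMap<>(); // touched only by this thread
            try {
                while (true) {
                    Request req = requests.take(); // blocks until a request arrives
                    if (req.val != null) {
                        state.put(req.key, req.val);
                    }
                    // Reply with the current value (0 if the key is unset).
                    req.resp.put(state.getOrDefault(req.key, 0));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        owner.setDaemon(true);
        owner.start();
    }

    // Sends one request and waits for the owner's reply.
    int call(int key, Integer val) {
        Request req = new Request(key, val);
        try {
            requests.put(req);
            return req.resp.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reply", e);
        }
    }

    public static void main(String[] args) {
        SingleQueueOwner store = new SingleQueueOwner();
        store.call(1, 42);                       // write
        System.out.println(store.call(1, null)); // read back: prints 42
        System.out.println(store.call(2, null)); // unset key: prints 0
    }
}
```

A single queue also processes requests in arrival order, whereas polling reads first gives reads priority over writes.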
We start 100 threads to issue reads to the state-owning thread via the reads queue. Each read requires constructing a ReadOp, sending it over the reads queue, and then receiving the result over the provided resp queue.
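A reader that calls take() on its resp queue waits indefinitely if the owner never answers. As a hedged sketch, a caller could bound the wait with the timed form of poll. The TimedRead class and awaitReply helper are hypothetical names and are not part of the example above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedRead {
    // Waits up to timeoutMillis for a reply; returns null if none arrives in time.
    static Integer awaitReply(BlockingQueue<Integer> resp, long timeoutMillis) {
        try {
            return resp.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> resp = new LinkedBlockingQueue<>();
        System.out.println(awaitReply(resp, 20)); // nothing was sent: prints null
        resp.offer(7);
        System.out.println(awaitReply(resp, 20)); // prints 7
    }
}
```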
We also start 10 threads that issue writes in the same way, through the writes queue.
After letting the threads work for a second, we capture and report the operation counts.
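If the counts need to be final rather than a snapshot taken while workers are still running, one option is to interrupt the workers and join them before reading the counter. The sketch below assumes simplified workers that only count, and the TimedRun and runFor names and both parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class TimedRun {
    // Runs `workers` counting threads for roughly runMillis, stops them,
    // and returns the count once every worker has finished.
    static long runFor(int workers, long runMillis) {
        AtomicLong ops = new AtomicLong();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        ops.incrementAndGet(); // stands in for one completed request
                        Thread.sleep(1);
                    }
                } catch (InterruptedException e) {
                    // Interrupted by runFor: leave the loop and let the thread end.
                }
            });
            t.start();
            threads.add(t);
        }

        boolean interrupted = false;
        try {
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            interrupted = true;
        }
        for (Thread t : threads) {
            t.interrupt(); // ask each worker to stop
        }
        for (Thread t : threads) {
            try {
                t.join(); // wait until it has actually stopped
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the caller's interrupt status
        }
        return ops.get();
    }

    public static void main(String[] args) {
        System.out.println("ops: " + runFor(4, 100));
    }
}
```

Each worker increments before it first sleeps, so the returned count is at least the number of workers.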
Running this program shows that the thread-based state management example completes about 80,000 total operations:
$ java StatefulThreads
readOps: 71708
writeOps: 7177
For this particular case, the thread-based approach in Java is a bit more involved than a mutex-based one might be. It might be useful in certain cases though, for example where you have other queues involved or when managing multiple locks would be error-prone. You should use whichever approach feels most natural, especially with respect to understanding the correctness of your program.
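For comparison, a mutex-based version of the same state might look like the following minimal sketch. It assumes a ReentrantLock guarding the map, and the LockedState class and its read and write methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

public class LockedState {
    private final Map<Integer, Integer> state = new HashMap<>();
    private final ReentrantLock lock = new ReentrantLock();

    // Returns the value for key, or 0 if it has never been written.
    int read(int key) {
        lock.lock();
        try {
            return state.getOrDefault(key, 0);
        } finally {
            lock.unlock();
        }
    }

    // Stores val under key while holding the lock.
    void write(int key, int val) {
        lock.lock();
        try {
            state.put(key, val);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        LockedState s = new LockedState();
        s.write(3, 7);
        System.out.println(s.read(3)); // prints 7
        System.out.println(s.read(4)); // prints 0
    }
}
```

Here any thread may touch the map as long as it holds the lock, so there are no request objects or reply queues to manage.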