Stateful Goroutines in Java

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

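// A read request: the key to look up, plus a queue on which the
// state-owning thread sends back the value.
class ReadOp {
    final int key;
    final BlockingQueue<Integer> resp;

    ReadOp(int key) {
        this.key = key;
        this.resp = new LinkedBlockingQueue<>();
    }
}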

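// A write request: the key and value to store, plus a queue on which
// the state-owning thread acknowledges the write.
class WriteOp {
    final int key;
    final int val;
    final BlockingQueue<Boolean> resp;

    WriteOp(int key, int val) {
        this.key = key;
        this.val = val;
        this.resp = new LinkedBlockingQueue<>();
    }
}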

public class StatefulThreads {
    public static void main(String[] args) throws InterruptedException {
        AtomicLong readOps = new AtomicLong();
        AtomicLong writeOps = new AtomicLong();

        BlockingQueue<ReadOp> reads = new LinkedBlockingQueue<>();
        BlockingQueue<WriteOp> writes = new LinkedBlockingQueue<>();

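        // The only thread that ever touches `state`, so the map needs no lock.
        Thread stateManager = new Thread(() -> {
            Map<Integer, Integer> state = new HashMap<>();
            while (true) {
                try {
                    // poll() returns null immediately when the queue is empty,
                    // so this loop busy-waits while both queues are idle.
                    ReadOp read = reads.poll();
                    if (read != null) {
                        read.resp.put(state.getOrDefault(read.key, 0));
                        continue; // pending reads are served before writes
                    }

                    WriteOp write = writes.poll();
                    if (write != null) {
                        state.put(write.key, write.val);
                        write.resp.put(true); // acknowledge the write
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        stateManager.start();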

        for (int r = 0; r < 100; r++) {
            new Thread(() -> {
                Random rand = new Random();
                while (true) {
                    try {
                        ReadOp read = new ReadOp(rand.nextInt(5));
                        reads.put(read);
                        read.resp.take();
                        readOps.incrementAndGet();
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }).start();
        }

        for (int w = 0; w < 10; w++) {
            new Thread(() -> {
                Random rand = new Random();
                while (true) {
                    try {
                        WriteOp write = new WriteOp(rand.nextInt(5), rand.nextInt(100));
                        writes.put(write);
                        write.resp.take();
                        writeOps.incrementAndGet();
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }).start();
        }

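        // Let the readers and writers run for one second.
        Thread.sleep(1000);

        System.out.println("readOps: " + readOps.get());
        System.out.println("writeOps: " + writeOps.get());

        // The worker threads loop forever and are not daemon threads,
        // so exit explicitly; otherwise the JVM would keep running.
        System.exit(0);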
    }
}

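Java has no goroutines or channels, so this example uses threads and BlockingQueues in their place to manage shared state across multiple threads. This approach aligns with the idea of sharing memory by communicating: each piece of data is owned by exactly one thread, and other threads send that thread messages instead of touching the data directly.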

The ReadOp and WriteOp classes encapsulate read and write requests, along with a way for the owning thread to respond.
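The reply channel does not have to be a queue. As a minimal sketch of one alternative, the request below carries a CompletableFuture that the owning thread completes exactly once. The names FutureReadOp, FutureReplyDemo, and askOnce are hypothetical and not part of the program above, and the "lookup" is a stand-in that just multiplies the key by 10.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical variant of ReadOp: the reply travels through a
// CompletableFuture instead of a one-element BlockingQueue.
class FutureReadOp {
    final int key;
    final CompletableFuture<Integer> resp = new CompletableFuture<>();

    FutureReadOp(int key) {
        this.key = key;
    }
}

class FutureReplyDemo {
    // Sends one request to a short-lived owner thread and returns its reply.
    static int askOnce(int key) {
        BlockingQueue<FutureReadOp> reads = new LinkedBlockingQueue<>();

        Thread owner = new Thread(() -> {
            try {
                FutureReadOp op = reads.take();  // wait for the request
                op.resp.complete(op.key * 10);   // stand-in for a state lookup
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        owner.setDaemon(true);
        owner.start();

        FutureReadOp op = new FutureReadOp(key);
        reads.add(op);          // the queue is unbounded, so add cannot fail
        return op.resp.join();  // block until the owner completes the future
    }

    public static void main(String[] args) {
        System.out.println(askOnce(4)); // prints 40
    }
}
```

A future makes the "one request, one reply" contract explicit, while a queue (as in the original classes) stays closer to the channel-based design this example mirrors.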

We create a single thread that owns the state, which is a Map private to this thread. This thread repeatedly polls the reads and writes queues, serving any pending read before a write, and responds to requests as they arrive.
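Because the owner checks two queues, it cannot block on either one and has to poll. One possible alternative, sketched here under the assumption that reads and writes can share a single queue, lets the owner sleep in take() until work arrives. The Op and BlockingOwner classes and their methods are hypothetical names introduced for this sketch; writes are acknowledged with the stored value rather than a Boolean.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical combined request; isWrite tells the two kinds apart.
class Op {
    final boolean isWrite;
    final int key;
    final int val; // ignored for reads
    final BlockingQueue<Integer> resp = new LinkedBlockingQueue<>();

    private Op(boolean isWrite, int key, int val) {
        this.isWrite = isWrite;
        this.key = key;
        this.val = val;
    }

    static Op read(int key) { return new Op(false, key, 0); }
    static Op write(int key, int val) { return new Op(true, key, val); }
}

class BlockingOwner {
    // Starts the state-owning thread; it exits when interrupted.
    static Thread start(BlockingQueue<Op> ops) {
        Thread owner = new Thread(() -> {
            Map<Integer, Integer> state = new HashMap<>();
            try {
                while (true) {
                    Op op = ops.take(); // sleeps until a request arrives
                    if (op.isWrite) {
                        state.put(op.key, op.val);
                        op.resp.put(op.val); // acknowledge with the stored value
                    } else {
                        op.resp.put(state.getOrDefault(op.key, 0));
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        owner.setDaemon(true);
        owner.start();
        return owner;
    }

    // Sends one request and waits for its reply.
    static int send(BlockingQueue<Op> ops, Op op) {
        try {
            ops.put(op);
            return op.resp.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Op> ops = new LinkedBlockingQueue<>();
        Thread owner = start(ops);
        System.out.println(send(ops, Op.read(3)));  // prints 0 (key not set yet)
        send(ops, Op.write(3, 42));
        System.out.println(send(ops, Op.read(3)));  // prints 42
        owner.interrupt();
    }
}
```

With one queue, requests are handled in arrival order, so this sketch drops the read-before-write priority of the two-queue loop in exchange for an owner that uses no CPU while idle.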

We start 100 threads to issue reads to the state-owning thread via the reads queue. Each read requires constructing a ReadOp, sending it over the reads queue, and then receiving the result over the provided resp queue.

Similarly, we start 10 threads for writes, using a similar approach.

After letting the threads work for a second, we capture and report the operation counts.

Running this program shows that the thread-based state management example completes about 80,000 total operations in one second. Exact counts vary from run to run and from machine to machine:

$ java StatefulThreads
readOps: 71708
writeOps: 7177

For this particular case, the thread-based approach in Java is a bit more involved than a mutex-based one would be. It can still be useful in certain cases, for example when other queues are already involved or when managing multiple locks would be error-prone. You should use whichever approach feels most natural, especially with respect to understanding the correctness of your program.
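For comparison, a lock-based version might look roughly like the sketch below, which guards the same kind of map with synchronized methods instead of giving it to a dedicated thread. LockedState, LockedStateDemo, and sumAfterWrites are hypothetical names, and the workload (four writers, each storing key * 10 under its own key) is chosen only so that the result is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Shared state guarded by the object's intrinsic lock.
class LockedState {
    private final Map<Integer, Integer> state = new HashMap<>();

    synchronized int read(int key) {
        return state.getOrDefault(key, 0);
    }

    synchronized void write(int key, int val) {
        state.put(key, val);
    }
}

class LockedStateDemo {
    // Four threads each write one key; the caller then sums all values.
    static int sumAfterWrites() {
        LockedState shared = new LockedState();
        List<Thread> writers = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            final int key = i;
            Thread t = new Thread(() -> shared.write(key, key * 10));
            writers.add(t);
            t.start();
        }
        try {
            for (Thread t : writers) {
                t.join(); // wait for every writer to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int sum = 0;
        for (int k = 0; k < 4; k++) {
            sum += shared.read(k);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumAfterWrites()); // prints 60
    }
}
```

Here callers touch the map directly under a lock, so there are no request objects or reply queues; the trade-off is that every access path must remember to take the lock.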
