Stateful Goroutines in AngelScript

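Our first example shows how to manage shared state with a stateful goroutine, which here takes the form of a single dedicated thread that owns the data. Other threads never touch the state directly; they send read and write requests to the owning thread and wait for its reply. This approach follows the idea of sharing memory by communicating, with each piece of data owned by exactly one thread.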

#include <cstdint>
#include <array>
#include <random>
#include <atomic>
#include <thread>
#include <chrono>
#include <iostream>
#include <unordered_map>
#include <queue>
#include <mutex>
#include <condition_variable>

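// A read request. The state manager fills in `value`, sets `ready`,
// and signals `cv` to wake the requesting thread.
class ReadOp {
public:
    int key = 0;
    int value = 0;
    std::condition_variable cv;
    std::mutex mutex;
    bool ready = false;
};

// A write request. `ready` and `cv` acknowledge that the write was applied.
class WriteOp {
public:
    int key = 0;
    int value = 0;
    std::condition_variable cv;
    std::mutex mutex;
    bool ready = false;
};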

int main() {
    std::atomic<uint64_t> readOps(0);
    std::atomic<uint64_t> writeOps(0);

    // Workers stop issuing requests once stopWorkers is set. The state
    // manager keeps running until stopManager is set, so every request
    // that was queued still gets an answer.
    std::atomic<bool> stopWorkers(false);
    std::atomic<bool> stopManager(false);

    // Request queues, each guarded by its own mutex.
    std::queue<ReadOp*> reads;
    std::queue<WriteOp*> writes;
    std::mutex readMutex;
    std::mutex writeMutex;

    // The state manager owns `state`; no other thread touches it.
    std::thread stateManager([&]() {
        std::unordered_map<int, int> state;
        while (!stopManager) {
            bool processed = false;

            {
                std::unique_lock<std::mutex> lock(readMutex);
                if (!reads.empty()) {
                    auto read = reads.front();
                    reads.pop();
                    lock.unlock();

                    read->value = state[read->key];

                    // Publish the result and wake the waiting reader.
                    std::unique_lock<std::mutex> opLock(read->mutex);
                    read->ready = true;
                    read->cv.notify_one();
                    processed = true;
                }
            }

            {
                std::unique_lock<std::mutex> lock(writeMutex);
                if (!writes.empty()) {
                    auto write = writes.front();
                    writes.pop();
                    lock.unlock();

                    state[write->key] = write->value;

                    // Acknowledge the write and wake the waiting writer.
                    std::unique_lock<std::mutex> opLock(write->mutex);
                    write->ready = true;
                    write->cv.notify_one();
                    processed = true;
                }
            }

            // Nothing queued: give other threads a chance to run.
            if (!processed) {
                std::this_thread::yield();
            }
        }
    });

    // 100 reader threads, each requesting a random key in a loop.
    std::array<std::thread, 100> readThreads;
    for (int r = 0; r < 100; r++) {
        readThreads[r] = std::thread([&]() {
            std::random_device rd;
            std::mt19937 gen(rd());
            std::uniform_int_distribution<> dis(0, 4);

            while (!stopWorkers) {
                ReadOp read;
                read.key = dis(gen);

                {
                    std::unique_lock<std::mutex> lock(readMutex);
                    reads.push(&read);
                }

                // Block until the state manager has answered.
                {
                    std::unique_lock<std::mutex> lock(read.mutex);
                    read.cv.wait(lock, [&]{ return read.ready; });
                }

                readOps++;
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        });
    }

    // 10 writer threads, each storing a random value under a random key.
    std::array<std::thread, 10> writeThreads;
    for (int w = 0; w < 10; w++) {
        writeThreads[w] = std::thread([&]() {
            std::random_device rd;
            std::mt19937 gen(rd());
            std::uniform_int_distribution<> keyDis(0, 4);
            std::uniform_int_distribution<> valDis(0, 99);

            while (!stopWorkers) {
                WriteOp write;
                write.key = keyDis(gen);
                write.value = valDis(gen);

                {
                    std::unique_lock<std::mutex> lock(writeMutex);
                    writes.push(&write);
                }

                // Block until the state manager has applied the write.
                {
                    std::unique_lock<std::mutex> lock(write.mutex);
                    write.cv.wait(lock, [&]{ return write.ready; });
                }

                writeOps++;
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        });
    }

    // Let the workers run for one second, then report the counts.
    std::this_thread::sleep_for(std::chrono::seconds(1));

    std::cout << "readOps: " << readOps << std::endl;
    std::cout << "writeOps: " << writeOps << std::endl;

    // Shut down: let the workers finish their current request first,
    // then stop the state manager. A std::thread that is still joinable
    // when it is destroyed terminates the program.
    stopWorkers = true;
    for (auto& t : readThreads) t.join();
    for (auto& t : writeThreads) t.join();
    stopManager = true;
    stateManager.join();
    return 0;
}

This example demonstrates how to use threads and synchronization primitives in AngelScript to manage shared state. The main concepts are:

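  1. The ReadOp and WriteOp classes bundle a request with the condition variable, mutex, and ready flag used to deliver its reply.
  2. A dedicated state manager thread owns the map and performs every read and write on it.
  3. Each request's condition variable and mutex let the requesting thread block until the state manager marks the request as ready.
  4. 100 reader threads and 10 writer threads issue requests in a loop, sleeping for one millisecond after each operation.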
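
The same request and reply handshake can also be packaged behind a small interface. The following is a minimal sketch, not the listing's own design: it uses the same C++ standard library as the listing above, and the names Channel, Request, and StateManager are hypothetical helpers introduced only for illustration. It swaps the polled queues for a blocking queue and replaces the per-request condition variable with a std::promise, so the manager thread sleeps while idle instead of yielding in a loop.

```cpp
#include <condition_variable>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include <utility>

// Hypothetical helper: a minimal unbounded blocking queue.
template <typename T>
class Channel {
public:
    void send(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(item));
        }
        cv_.notify_one();
    }

    // Blocks until an item is available, then removes and returns it.
    T receive() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        T item = std::move(queue_.front());
        queue_.pop();
        return item;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<T> queue_;
};

// Hypothetical request type covering reads, writes, and shutdown.
struct Request {
    enum class Kind { Read, Write, Stop };
    Kind kind = Kind::Read;
    int key = 0;
    int value = 0;
    std::promise<int> reply;  // fulfilled by the manager thread
};

// Hypothetical wrapper that owns the manager thread and its request channel.
class StateManager {
public:
    StateManager() : worker_([this] { run(); }) {}

    ~StateManager() {
        call(Request::Kind::Stop, 0, 0);  // ask the thread to exit, then join it
        worker_.join();
    }

    int read(int key) { return call(Request::Kind::Read, key, 0); }
    void write(int key, int value) { call(Request::Kind::Write, key, value); }

private:
    // Sends one request and blocks until the manager thread replies.
    int call(Request::Kind kind, int key, int value) {
        Request req;
        req.kind = kind;
        req.key = key;
        req.value = value;
        std::future<int> result = req.reply.get_future();
        requests_.send(std::move(req));
        return result.get();
    }

    void run() {
        std::unordered_map<int, int> state;  // touched only by this thread
        for (;;) {
            Request req = requests_.receive();
            if (req.kind == Request::Kind::Stop) {
                req.reply.set_value(0);
                return;
            }
            if (req.kind == Request::Kind::Write) {
                state[req.key] = req.value;
            }
            req.reply.set_value(state[req.key]);
        }
    }

    Channel<Request> requests_;  // declared before worker_ so it is constructed first
    std::thread worker_;
};
```

With this sketch, any thread can call write(3, 42) and then read(3) on a shared StateManager object. A key that was never written reads as 0, matching the behavior of the map lookup in the listing.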

To run the program:

$ angelscript stateful_threads.as
readOps: 71708
writeOps: 7177

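The output shows the number of read and write operations completed in one second. Because every worker sleeps for one millisecond after each operation, 100 readers can complete at most about 100,000 reads per second and 10 writers at most about 10,000 writes; the exact numbers vary with system load and scheduling.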
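
Each worker in the listing sleeps for one millisecond after every operation, which caps how many operations a single thread can complete. The following is a rough sketch of that ceiling; countOpsWithSleep is a hypothetical helper, not part of the program above, and it assumes that sleep_for and steady_clock measure time consistently.

```cpp
#include <chrono>
#include <thread>

// Hypothetical helper: counts loop iterations started within `window`
// when every iteration sleeps for one millisecond, as the workers do.
int countOpsWithSleep(std::chrono::milliseconds window) {
    const auto deadline = std::chrono::steady_clock::now() + window;
    int ops = 0;
    while (std::chrono::steady_clock::now() < deadline) {
        ++ops;  // stands in for one read or write request
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    return ops;
}
```

Since sleep_for blocks for at least the requested duration, a 50 ms window yields at most 50 iterations, and usually fewer because of wake-up latency. The same effect keeps the reported counts below the theoretical maximum.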

For this example, the thread-based approach is more complex than guarding the map with a single mutex. However, it can be useful when one owner has to coordinate several resources, or when managing multiple mutexes directly would be error-prone. Choose the approach that makes your program’s correctness easiest to understand and verify.
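
For comparison, here is a minimal sketch of the direct mutex approach, written against the same C++ standard library as the listing. LockedState and fillConcurrently are hypothetical names used only for illustration. Every caller takes the lock itself, so no manager thread or request queue is needed.

```cpp
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

// Hypothetical class: the same key/value state guarded by a single mutex.
class LockedState {
public:
    // Returns the stored value, or 0 if the key was never written.
    int read(int key) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = state_.find(key);
        return it == state_.end() ? 0 : it->second;
    }

    void write(int key, int value) {
        std::lock_guard<std::mutex> lock(mutex_);
        state_[key] = value;
    }

private:
    std::mutex mutex_;
    std::unordered_map<int, int> state_;
};

// Hypothetical demo: `count` threads each write key i -> i * 10 concurrently.
void fillConcurrently(LockedState& state, int count) {
    std::vector<std::thread> writers;
    for (int i = 0; i < count; ++i) {
        writers.emplace_back([&state, i] { state.write(i, i * 10); });
    }
    for (auto& t : writers) {
        t.join();  // wait for every writer before returning
    }
}
```

This version is shorter, but every access path has to remember to take the lock. The state-owning thread trades that for extra machinery while confining all access to one place.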