Stateful Goroutines in C++
Our example demonstrates the use of stateful goroutines for managing shared state. In C++, we’ll use threads and a mutex to achieve similar functionality. This approach aligns with C++’s thread-based concurrency model.
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <unordered_map>
#include <vector>
#include <random>
#include <chrono>
// These structs encapsulate read and write operations
struct ReadOp {
int key;
std::promise<int> resp;
};
struct WriteOp {
int key;
int val;
std::promise<bool> resp;
};
int main() {
// We'll count how many operations we perform
std::atomic<uint64_t> readOps(0);
std::atomic<uint64_t> writeOps(0);
// Channels are replaced with thread-safe queues
std::queue<ReadOp> reads;
std::queue<WriteOp> writes;
std::mutex queueMutex;
// This lambda function represents our stateful goroutine
auto stateManager = [&]() {
std::unordered_map<int, int> state;
while (true) {
std::unique_lock<std::mutex> lock(queueMutex);
if (!reads.empty()) {
auto read = std::move(reads.front());
reads.pop();
lock.unlock();
read.resp.set_value(state[read.key]);
} else if (!writes.empty()) {
auto write = std::move(writes.front());
writes.pop();
lock.unlock();
state[write.key] = write.val;
write.resp.set_value(true);
} else {
lock.unlock();
std::this_thread::yield();
}
}
};
// Start the state manager thread
std::thread stateThread(stateManager);
// Start 100 read threads
std::vector<std::thread> readThreads;
for (int r = 0; r < 100; ++r) {
readThreads.emplace_back([&]() {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(0, 4);
for (int i = 0; i < 100; ++i) {
ReadOp read{dis(gen), std::promise<int>()};
auto future = read.resp.get_future();
{
std::lock_guard<std::mutex> lock(queueMutex);
reads.push(std::move(read));
}
future.get();
readOps++;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
});
}
// Start 10 write threads
std::vector<std::thread> writeThreads;
for (int w = 0; w < 10; ++w) {
writeThreads.emplace_back([&]() {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> keyDis(0, 4);
std::uniform_int_distribution<> valDis(0, 99);
for (int i = 0; i < 10; ++i) {
WriteOp write{keyDis(gen), valDis(gen), std::promise<bool>()};
auto future = write.resp.get_future();
{
std::lock_guard<std::mutex> lock(queueMutex);
writes.push(std::move(write));
}
future.get();
writeOps++;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
});
}
// Let the threads work for a second
std::this_thread::sleep_for(std::chrono::seconds(1));
// Wait for all threads to finish
for (auto& t : readThreads) t.join();
for (auto& t : writeThreads) t.join();
// Report the operation counts
std::cout << "readOps: " << readOps << std::endl;
std::cout << "writeOps: " << writeOps << std::endl;
return 0;
}
This C++ version uses threads instead of goroutines, and a mutex-protected queue instead of channels. The state is managed by a single thread, similar to the goroutine in the original example.
To run the program, compile it with C++11 or later support:
$ g++ -std=c++11 -pthread stateful_threads.cpp -o stateful_threads
$ ./stateful_threads
readOps: 10000
writeOps: 100
The program demonstrates a thread-based approach to state management. While it’s more complex than a simple mutex-based solution, it can be useful in scenarios involving multiple synchronization primitives or when direct mutex management would be error-prone. Choose the approach that makes your program’s correctness easiest to understand and verify.