Stateful Goroutines in C++ Our example demonstrates the use of stateful goroutines for managing shared state. In C++, we’ll use threads and a mutex to achieve similar functionality. This approach aligns with C++’s thread-based concurrency model.
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <unordered_map>
#include <vector>
#include <random>
#include <chrono>
// These structs encapsulate read and write operations
struct ReadOp {
int key ;
std :: promise < int > resp ;
};
struct WriteOp {
int key ;
int val ;
std :: promise < bool > resp ;
};
int main () {
// We'll count how many operations we perform
std :: atomic < uint64_t > readOps ( 0 );
std :: atomic < uint64_t > writeOps ( 0 );
// Channels are replaced with thread-safe queues
std :: queue < ReadOp > reads ;
std :: queue < WriteOp > writes ;
std :: mutex queueMutex ;
// This lambda function represents our stateful goroutine
auto stateManager = [ & ]() {
std :: unordered_map < int , int > state ;
while ( true ) {
std :: unique_lock < std :: mutex > lock ( queueMutex );
if ( ! reads . empty ()) {
auto read = std :: move ( reads . front ());
reads . pop ();
lock . unlock ();
read . resp . set_value ( state [ read . key ]);
} else if ( ! writes . empty ()) {
auto write = std :: move ( writes . front ());
writes . pop ();
lock . unlock ();
state [ write . key ] = write . val ;
write . resp . set_value ( true );
} else {
lock . unlock ();
std :: this_thread :: yield ();
}
}
};
// Start the state manager thread
std :: thread stateThread ( stateManager );
// Start 100 read threads
std :: vector < std :: thread > readThreads ;
for ( int r = 0 ; r < 100 ; ++ r ) {
readThreads . emplace_back ([ & ]() {
std :: random_device rd ;
std :: mt19937 gen ( rd ());
std :: uniform_int_distribution <> dis ( 0 , 4 );
for ( int i = 0 ; i < 100 ; ++ i ) {
ReadOp read { dis ( gen ), std :: promise < int > ()};
auto future = read . resp . get_future ();
{
std :: lock_guard < std :: mutex > lock ( queueMutex );
reads . push ( std :: move ( read ));
}
future . get ();
readOps ++ ;
std :: this_thread :: sleep_for ( std :: chrono :: milliseconds ( 1 ));
}
});
}
// Start 10 write threads
std :: vector < std :: thread > writeThreads ;
for ( int w = 0 ; w < 10 ; ++ w ) {
writeThreads . emplace_back ([ & ]() {
std :: random_device rd ;
std :: mt19937 gen ( rd ());
std :: uniform_int_distribution <> keyDis ( 0 , 4 );
std :: uniform_int_distribution <> valDis ( 0 , 99 );
for ( int i = 0 ; i < 10 ; ++ i ) {
WriteOp write { keyDis ( gen ), valDis ( gen ), std :: promise < bool > ()};
auto future = write . resp . get_future ();
{
std :: lock_guard < std :: mutex > lock ( queueMutex );
writes . push ( std :: move ( write ));
}
future . get ();
writeOps ++ ;
std :: this_thread :: sleep_for ( std :: chrono :: milliseconds ( 1 ));
}
});
}
// Let the threads work for a second
std :: this_thread :: sleep_for ( std :: chrono :: seconds ( 1 ));
// Wait for all threads to finish
for ( auto & t : readThreads ) t . join ();
for ( auto & t : writeThreads ) t . join ();
// Report the operation counts
std :: cout << "readOps: " << readOps << std :: endl ;
std :: cout << "writeOps: " << writeOps << std :: endl ;
return 0 ;
}
This C++ version uses threads instead of goroutines, and a mutex-protected queue instead of channels. The state is managed by a single thread, similar to the goroutine in the original example.
To run the program, compile it with C++11 or later support:
$ g++ -std= c++11 -pthread stateful_threads.cpp -o stateful_threads
$ ./stateful_threads
readOps: 10000
writeOps: 100
The program demonstrates a thread-based approach to state management. While it’s more complex than a simple mutex-based solution, it can be useful in scenarios involving multiple synchronization primitives or when direct mutex management would be error-prone. Choose the approach that makes your program’s correctness easiest to understand and verify.