Stateful Goroutines in D Programming Language

In this example, we’ll explore how to manage state using message passing and threads in D. This approach follows the idea of sharing memory by communicating: each piece of data is owned by exactly one thread, which fits D’s default of thread-local data.

import std.stdio;
import std.random;
import std.concurrency;
import std.datetime;
import core.atomic;
import core.thread : Thread; // needed for Thread.sleep

struct ReadOp {
    int key;
    Tid resp;
}

struct WriteOp {
    int key;
    int val;
    Tid resp;
}

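// The counters live at module scope so the spawned function literals can
// reach them without capturing main's stack frame; spawn rejects delegates
// that alias thread-local data.
shared uint readOps = 0;
shared uint writeOps = 0;

void main() {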

    auto reads = new Tid[](100);
    auto writes = new Tid[](10);

    // This is the thread that owns the state
    auto stateThread = spawn((Tid parent) {
        int[int] state;
        while (true) {
            receive(
                (ReadOp read) {
                    send(read.resp, state.get(read.key, 0));
                },
                (WriteOp write) {
                    state[write.key] = write.val;
                    send(write.resp, true);
                }
            );
        }
    }, thisTid);

    // Start 100 read threads
    foreach (i; 0..100) {
        reads[i] = spawn((Tid parent, Tid stateTid) {
            while (true) {
                // Ask the state thread for a random key and wait for its reply
                auto read = ReadOp(uniform(0, 5), thisTid);
                send(stateTid, read);
                receiveOnly!int();
                atomicOp!"+="(readOps, 1);
                Thread.sleep(1.msecs);
            }
        }, thisTid, stateThread);
    }

    // Start 10 write threads
    foreach (i; 0..10) {
        writes[i] = spawn((Tid parent, Tid stateTid) {
            while (true) {
                // Store a random value under a random key and wait for the ack
                auto write = WriteOp(uniform(0, 5), uniform(0, 100), thisTid);
                send(stateTid, write);
                receiveOnly!bool();
                atomicOp!"+="(writeOps, 1);
                Thread.sleep(1.msecs);
            }
        }, thisTid, stateThread);
    }

    // Let the threads work for a second
    Thread.sleep(1.seconds);

    // Capture and report the op counts
    writeln("readOps: ", atomicLoad(readOps));
    writeln("writeOps: ", atomicLoad(writeOps));
}

In this D version, we use std.concurrency to create threads and pass messages between them. The spawn function starts a new thread and returns its Tid, send delivers a message to a thread’s mailbox, and receive (or receiveOnly) blocks until a matching message arrives.
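The basic round trip can be seen in isolation in the following minimal sketch. The names `doubler` and `askDouble` are hypothetical and are not part of the example above; the worker handles a single request and then exits.

```d
import std.concurrency;
import std.stdio;

// Hypothetical worker: waits for one (Tid, int) message, replies with the
// doubled value, then returns, which ends the thread.
void doubler()
{
    receive((Tid replyTo, int n) { send(replyTo, n * 2); });
}

// Hypothetical helper: spawns a worker, sends it a request that carries our
// own Tid as the reply address, and blocks until the int reply arrives.
int askDouble(int n)
{
    auto worker = spawn(&doubler);
    send(worker, thisTid, n);
    return receiveOnly!int();
}

void main()
{
    writeln(askDouble(21)); // prints 42
}
```

Passing `thisTid` along with the request plays the same role as the `resp` field in ReadOp and WriteOp: it tells the worker where to send the answer.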

The state is managed by a dedicated thread (stateThread) that owns a private state associative array. Other threads send read and write requests to this thread using ReadOp and WriteOp structs.
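One way to make this protocol easier to use is to hide each send and matching receive behind a small function. The sketch below assumes the same ReadOp and WriteOp layout as above; `Stop`, `stateOwner`, `readKey`, and `writeKey` are hypothetical names introduced only for illustration, and the `Stop` message is an added assumption so the owner thread can exit cleanly.

```d
import std.concurrency;
import std.stdio;

struct ReadOp  { int key; Tid resp; }
struct WriteOp { int key; int val; Tid resp; }
struct Stop {} // hypothetical shutdown message, not in the original example

// The only thread that ever touches `state`.
void stateOwner()
{
    int[int] state;
    bool running = true;
    while (running)
    {
        receive(
            (ReadOp r)  { send(r.resp, state.get(r.key, 0)); },
            (WriteOp w) { state[w.key] = w.val; send(w.resp, true); },
            (Stop _)    { running = false; }
        );
    }
}

// Hypothetical helper: request a value and block until the reply arrives.
int readKey(Tid owner, int key)
{
    send(owner, ReadOp(key, thisTid));
    return receiveOnly!int();
}

// Hypothetical helper: store a value and block until the owner acknowledges.
bool writeKey(Tid owner, int key, int val)
{
    send(owner, WriteOp(key, val, thisTid));
    return receiveOnly!bool();
}

void main()
{
    auto owner = spawn(&stateOwner);
    writeKey(owner, 3, 42);
    writeln(readKey(owner, 3)); // prints 42
    writeln(readKey(owner, 4)); // prints 0, the default for a missing key
    send(owner, Stop());
}
```

Because every request is answered before the caller continues, a read issued after a write from the same thread always observes that write.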

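We create 100 read threads and 10 write threads. Each one loops forever: it sends a request to the state thread, waits for the reply, and then sleeps for a millisecond. After every completed request the thread increments a shared counter with atomicOp, so the totals stay correct without any locks.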
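The counting technique can be tried on its own. The following is a minimal sketch, assuming a hypothetical helper `countWithThreads` and using `ThreadGroup` from core.thread instead of spawn, purely so the threads can be joined before the total is read.

```d
import core.atomic;
import core.thread;
import std.stdio;

shared uint ops; // module-level shared counter, visible to every thread

// Hypothetical helper: starts `threads` threads that each increment the
// counter `perThread` times, waits for all of them, and returns the total.
uint countWithThreads(uint threads, uint perThread)
{
    atomicStore(ops, 0u);
    auto group = new ThreadGroup;
    foreach (i; 0 .. threads)
    {
        group.create({
            foreach (j; 0 .. perThread)
                atomicOp!"+="(ops, 1); // atomic read-modify-write
        });
    }
    group.joinAll();
    return atomicLoad(ops);
}

void main()
{
    writeln(countWithThreads(8, 1000)); // prints 8000
}
```

With a plain `ops += 1` on an unsynchronized variable, concurrent increments could overwrite each other and the total could come out lower than expected.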

After letting the threads run for a second, we print out the total number of read and write operations performed.

To run this program:

$ dmd -run stateful_threads.d
readOps: 71708
writeOps: 7177

This approach demonstrates how to use message passing and dedicated threads for state management in D. It’s particularly useful when you need to manage complex shared state, or when coordinating several locks or other synchronization primitives would be error-prone. Whether you pass messages or use explicit synchronization, choose the approach that feels most natural and makes it easiest to reason about the correctness of your program.