Stateful Goroutines in Dart

Our example demonstrates how to manage state using isolates and message passing in Dart. This approach aligns with Dart’s asynchronous programming model and provides a way to share data between isolates through communication.

import 'dart:async';
import 'dart:isolate';
import 'dart:math';

class ReadOp {
  final int key;
  final SendPort responsePort;

  ReadOp(this.key, this.responsePort);
}

class WriteOp {
  final int key;
  final int value;
  final SendPort responsePort;

  WriteOp(this.key, this.value, this.responsePort);
}

void main() async {
  final reads = ReceivePort();
  final writes = ReceivePort();
  
  // Start the isolate that owns the state
  await Isolate.spawn(stateManager, [reads.sendPort, writes.sendPort]);

  int readOps = 0;
  int writeOps = 0;

  // Start 100 read operations
  for (int r = 0; r < 100; r++) {
    readWorker(reads.sendPort, () => readOps++);
  }

  // Start 10 write operations
  for (int w = 0; w < 10; w++) {
    writeWorker(writes.sendPort, () => writeOps++);
  }

  // Let the operations run for a second
  await Future.delayed(Duration(seconds: 1));

  print('readOps: $readOps');
  print('writeOps: $writeOps');

  // Close the ports
  reads.close();
  writes.close();
}

void stateManager(List<SendPort> ports) {
  final readPort = ReceivePort();
  final writePort = ReceivePort();
  ports[0].send(readPort.sendPort);
  ports[1].send(writePort.sendPort);

  final state = <int, int>{};

  readPort.listen((message) {
    if (message is ReadOp) {
      final value = state[message.key] ?? 0;
      message.responsePort.send(value);
    }
  });

  writePort.listen((message) {
    if (message is WriteOp) {
      state[message.key] = message.value;
      message.responsePort.send(true);
    }
  });
}

void readWorker(SendPort sendPort, Function() incrementOps) async {
  final responsePort = ReceivePort();
  final random = Random();

  while (true) {
    final read = ReadOp(random.nextInt(5), responsePort.sendPort);
    sendPort.send(read);
    await responsePort.first;
    incrementOps();
    await Future.delayed(Duration(milliseconds: 1));
  }
}

void writeWorker(SendPort sendPort, Function() incrementOps) async {
  final responsePort = ReceivePort();
  final random = Random();

  while (true) {
    final write = WriteOp(random.nextInt(5), random.nextInt(100), responsePort.sendPort);
    sendPort.send(write);
    await responsePort.first;
    incrementOps();
    await Future.delayed(Duration(milliseconds: 1));
  }
}

In this Dart example, we use isolates to manage shared state. The stateManager function runs in a separate isolate and owns the state (a map). Other parts of the program communicate with this isolate through message passing.

We create ReadOp and WriteOp classes to encapsulate read and write requests. These are sent to the state-owning isolate via SendPorts.

The main function sets up the isolates and starts 100 read workers and 10 write workers. Each worker runs in a loop, sending read or write requests to the state manager and incrementing a counter.

The state manager listens for incoming messages on its ports and responds to read and write requests accordingly.

After letting the operations run for a second, we print out the total number of read and write operations performed.

This approach ensures that the state is accessed safely in a concurrent environment, as all access is serialized through the state-owning isolate. It’s worth noting that while this example demonstrates the concept, for simple cases in Dart, you might prefer using other concurrency primitives like Futures or Streams, depending on your specific needs.

Running our program might show output similar to this:

readOps: 71708
writeOps: 7177

The exact numbers will vary depending on the system and runtime conditions.