Stateful Goroutines in Dart
This example shows how to manage state in Dart using isolates and message passing. Isolates do not share memory, so a single isolate owns the state and the rest of the program reads and writes it by sending messages. This approach fits Dart’s asynchronous programming model.
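import 'dart:async';
import 'dart:isolate';
import 'dart:math';

// A request to read the value stored under [key].
// The reply is sent to [responsePort].
class ReadOp {
  final int key;
  final SendPort responsePort;
  ReadOp(this.key, this.responsePort);
}

// A request to store [value] under [key].
// An acknowledgement is sent to [responsePort].
class WriteOp {
  final int key;
  final int value;
  final SendPort responsePort;
  WriteOp(this.key, this.value, this.responsePort);
}
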
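void main() async {
  final reads = ReceivePort();
  final writes = ReceivePort();

  // Start the isolate that owns the state
  final manager =
      await Isolate.spawn(stateManager, [reads.sendPort, writes.sendPort]);

  // The manager answers on each port with the SendPort it listens on.
  // Taking `first` cancels the subscription, which closes the local port.
  final readPort = await reads.first as SendPort;
  final writePort = await writes.first as SendPort;

  int readOps = 0;
  int writeOps = 0;
  var running = true;
  final workers = <Future<void>>[];

  // Start 100 read workers
  for (int r = 0; r < 100; r++) {
    workers.add(readWorker(readPort, () => running, () => readOps++));
  }

  // Start 10 write workers
  for (int w = 0; w < 10; w++) {
    workers.add(writeWorker(writePort, () => running, () => writeOps++));
  }

  // Let the operations run for a second
  await Future.delayed(Duration(seconds: 1));

  // Stop the workers and wait for their in-flight requests to finish
  running = false;
  await Future.wait(workers);

  print('readOps: $readOps');
  print('writeOps: $writeOps');

  // Shut down the state-owning isolate so the program can exit
  manager.kill(priority: Isolate.immediate);
}

void stateManager(List<SendPort> ports) {
  final readPort = ReceivePort();
  final writePort = ReceivePort();

  // Tell the spawning isolate where to send read and write requests
  ports[0].send(readPort.sendPort);
  ports[1].send(writePort.sendPort);

  // The state lives only in this isolate
  final state = <int, int>{};

  readPort.listen((message) {
    if (message is ReadOp) {
      final value = state[message.key] ?? 0;
      message.responsePort.send(value);
    }
  });

  writePort.listen((message) {
    if (message is WriteOp) {
      state[message.key] = message.value;
      message.responsePort.send(true);
    }
  });
}

Future<void> readWorker(SendPort sendPort, bool Function() running,
    void Function() incrementOps) async {
  final responsePort = ReceivePort();
  // A ReceivePort can be listened to only once, so `first` cannot be
  // awaited repeatedly. A StreamIterator reads one reply per request.
  final responses = StreamIterator(responsePort);
  final random = Random();
  while (running()) {
    final read = ReadOp(random.nextInt(5), responsePort.sendPort);
    sendPort.send(read);
    await responses.moveNext();
    incrementOps();
    await Future.delayed(Duration(milliseconds: 1));
  }
  responsePort.close();
}

Future<void> writeWorker(SendPort sendPort, bool Function() running,
    void Function() incrementOps) async {
  final responsePort = ReceivePort();
  final responses = StreamIterator(responsePort);
  final random = Random();
  while (running()) {
    final write = WriteOp(
        random.nextInt(5), random.nextInt(100), responsePort.sendPort);
    sendPort.send(write);
    await responses.moveNext();
    incrementOps();
    await Future.delayed(Duration(milliseconds: 1));
  }
  responsePort.close();
}
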
In this Dart example, we use isolates to manage shared state. The stateManager function runs in a separate isolate and owns the state (a map). Other parts of the program communicate with this isolate through message passing.
We create ReadOp and WriteOp classes to encapsulate read and write requests. These are sent to the state-owning isolate via SendPorts.
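The two request classes can also share a common supertype, so that one handler covers both kinds of message. The following is a minimal sketch, assuming Dart 3 or later for sealed classes and patterns. StateOp, Read, Write, and applyOp are hypothetical names that do not appear in the example above, and the reply port is left out to keep the sketch short.

```dart
// Hypothetical sketch: one sealed request type instead of two unrelated
// classes. Requires Dart 3 (sealed classes and object patterns).
sealed class StateOp {
  final int key;
  const StateOp(this.key);
}

class Read extends StateOp {
  const Read(super.key);
}

class Write extends StateOp {
  final int value;
  const Write(super.key, this.value);
}

// Applies one request to the map and returns the reply that a
// state-owning isolate would send back: the stored value for a read
// (0 if the key is absent), or true for a write.
Object applyOp(Map<int, int> state, StateOp op) {
  // The switch is checked for exhaustiveness because StateOp is sealed.
  switch (op) {
    case Read(:final key):
      return state[key] ?? 0;
    case Write(:final key, :final value):
      state[key] = value;
      return true;
  }
}

void main() {
  final state = <int, int>{};
  print(applyOp(state, const Read(3))); // 0
  print(applyOp(state, const Write(3, 42))); // true
  print(applyOp(state, const Read(3))); // 42
}
```

With a sealed hierarchy, adding a third request type makes the compiler flag every switch that does not yet handle it.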
The main function spawns the state-owning isolate and starts 100 read workers and 10 write workers. The workers are async functions that run in the main isolate. Each one loops, sending a read or write request to the state manager, waiting for the reply, and incrementing a counter.
The state manager listens for incoming messages on its ports and responds to read and write requests accordingly.
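Because each request carries its own reply port, a round trip to the state-owning isolate can be wrapped in a Future. The sketch below illustrates this under some assumptions: AddOp, counterManager, add, and runDemo are hypothetical names, and a single request type stands in for the separate read and write requests.

```dart
import 'dart:isolate';

// Hypothetical request: add [delta] to the counter stored under [key]
// and reply with the new total.
class AddOp {
  final int key;
  final int delta;
  final SendPort replyTo;
  AddOp(this.key, this.delta, this.replyTo);
}

// Entry point of the state-owning isolate. It hands its request port
// back to the spawner, then serves requests one at a time.
void counterManager(SendPort handshake) {
  final requests = ReceivePort();
  handshake.send(requests.sendPort);
  final state = <int, int>{};
  requests.listen((message) {
    if (message is AddOp) {
      final total = (state[message.key] ?? 0) + message.delta;
      state[message.key] = total;
      message.replyTo.send(total);
    }
  });
}

// Sends one request and completes with the reply. A fresh ReceivePort is
// used per call. Taking `first` cancels the subscription, which closes it.
Future<int> add(SendPort manager, int key, int delta) async {
  final reply = ReceivePort();
  manager.send(AddOp(key, delta, reply.sendPort));
  return await reply.first as int;
}

Future<int> runDemo() async {
  final handshake = ReceivePort();
  final isolate = await Isolate.spawn(counterManager, handshake.sendPort);
  final manager = await handshake.first as SendPort;
  await add(manager, 1, 5);
  final total = await add(manager, 1, 7);
  isolate.kill(priority: Isolate.immediate);
  return total;
}

void main() async {
  print(await runDemo()); // 12
}
```

A port per call is simple but allocates on every request. A long-lived worker that issues many requests may prefer to keep one reply port open and read from it repeatedly.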
After letting the operations run for a second, we print out the total number of read and write operations performed.
This approach ensures that the state is accessed safely in a concurrent environment, because all access is serialized through the state-owning isolate. This example demonstrates the concept, but for simple cases in Dart you might prefer other concurrency primitives such as Futures or Streams, depending on your needs.
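For comparison, when every caller lives in the same isolate, no message passing is needed. An isolate runs one piece of code at a time, so async functions interleave only at await points and a plain map can be updated without a data race. The following is a minimal sketch under that assumption. runWorkers and its parameters are hypothetical names.

```dart
import 'dart:async';

// Hypothetical sketch: many async workers updating one map inside a
// single isolate. Each read-modify-write below contains no await, so it
// finishes before any other worker can resume and no update is lost.
Future<Map<int, int>> runWorkers(
    {int workers = 10, int writesEach = 100}) async {
  final state = <int, int>{};

  Future<void> worker(int id) async {
    for (var i = 0; i < writesEach; i++) {
      final key = id % 5;
      state[key] = (state[key] ?? 0) + 1;
      // Yield to the event loop so the other workers get a turn
      await Future<void>.delayed(Duration.zero);
    }
  }

  await Future.wait([for (var id = 0; id < workers; id++) worker(id)]);
  return state;
}

void main() async {
  final state = await runWorkers();
  final total = state.values.fold<int>(0, (sum, v) => sum + v);
  print(total); // 1000
}
```

This only helps while the work is asynchronous rather than CPU-bound. Once callers run in different isolates, a state-owning isolate with message passing is needed again.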
Running our program might show output similar to this:
readOps: 71708
writeOps: 7177
The exact numbers will vary depending on the system and runtime conditions.