Stateful Coroutines in Python
In this example, we’ll demonstrate how to manage state using coroutines and queues in Python. This approach aligns with Python’s asynchronous programming model and provides a way to ensure that data is accessed safely in a concurrent environment.
import asyncio
import random
from collections import defaultdict


class ReadOp:
    """A read request; the manager delivers the value through `response`."""

    def __init__(self, key):
        self.key = key
        # Must be constructed while an event loop is running.
        self.response = asyncio.get_running_loop().create_future()


class WriteOp:
    """A write request; `response` resolves once the write has been applied."""

    def __init__(self, key, val):
        self.key = key
        self.val = val
        self.response = asyncio.get_running_loop().create_future()


async def state_manager(reads, writes):
    # The state is local to this coroutine, so nothing else can touch it.
    state = defaultdict(int)
    while True:
        # Wait on both queues at once and handle whichever yields first.
        read_task = asyncio.create_task(reads.get())
        write_task = asyncio.create_task(writes.get())
        done, pending = await asyncio.wait(
            [read_task, write_task],
            return_when=asyncio.FIRST_COMPLETED
        )
        # A cancelled get() leaves its item in the queue for the next pass.
        for task in pending:
            task.cancel()
        if read_task in done:
            read = read_task.result()
            read.response.set_result(state[read.key])
        # Both tasks can finish in the same pass, so this must be `if`,
        # not `elif`; otherwise the write is dropped and its sender hangs.
        if write_task in done:
            write = write_task.result()
            state[write.key] = write.val
            write.response.set_result(True)


async def read_ops(reads, count):
    # Each client sends a read and waits for its reply before the next one.
    for _ in range(count):
        read = ReadOp(random.randint(0, 4))
        await reads.put(read)
        await read.response
        await asyncio.sleep(0.001)


async def write_ops(writes, count):
    for _ in range(count):
        write = WriteOp(random.randint(0, 4), random.randint(0, 99))
        await writes.put(write)
        await write.response
        await asyncio.sleep(0.001)


async def main():
    reads = asyncio.Queue()
    writes = asyncio.Queue()
    manager = asyncio.create_task(state_manager(reads, writes))
    read_tasks = [asyncio.create_task(read_ops(reads, 1000)) for _ in range(100)]
    write_tasks = [asyncio.create_task(write_ops(writes, 100)) for _ in range(10)]
    # Wait for every client to finish, then shut the manager down.
    await asyncio.gather(*read_tasks, *write_tasks)
    manager.cancel()
    try:
        await manager
    except asyncio.CancelledError:
        pass
    print(f"readOps: {len(read_tasks) * 1000}")
    print(f"writeOps: {len(write_tasks) * 100}")


if __name__ == "__main__":
    asyncio.run(main())
In this Python version, we use asyncio to manage concurrency. The state_manager coroutine owns the state (a defaultdict) and processes read and write operations received through queues.
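One way to see the ownership idea in isolation is a variant that receives every operation on a single queue, so the manager needs no wait-and-cancel step. This is a minimal sketch under assumptions, not the design used above: the name single_queue_manager and the tuple layout of the operations are hypothetical.

```python
import asyncio
from collections import defaultdict


async def single_queue_manager(ops):
    """Serve ("read", key, fut) and ("write", key, val, fut) tuples in order."""
    state = defaultdict(int)  # owned by this coroutine only
    while True:
        op = await ops.get()
        if op[0] == "read":
            _, key, fut = op
            fut.set_result(state[key])
        else:
            _, key, val, fut = op
            state[key] = val
            fut.set_result(True)


async def demo():
    ops = asyncio.Queue()
    manager = asyncio.create_task(single_queue_manager(ops))
    loop = asyncio.get_running_loop()

    # Write 42 under key 3 and wait for the acknowledgement.
    write_done = loop.create_future()
    await ops.put(("write", 3, 42, write_done))
    await write_done

    # Read the same key back through the manager.
    read_done = loop.create_future()
    await ops.put(("read", 3, read_done))
    value = await read_done

    manager.cancel()
    return value
```

With one queue, operations are applied strictly in arrival order, whereas two queues leave the relative order of a read and a write up to the manager.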
We define ReadOp and WriteOp classes to encapsulate the operations and their responses. The response is implemented using asyncio.Future, which allows the requester to wait for the operation to complete.
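The request/response handshake can be shown on its own. The following is a small sketch, separate from the program above; Request, server, and ask are hypothetical names chosen for illustration.

```python
import asyncio


class Request:
    """A hypothetical request that carries its own one-shot reply slot."""

    def __init__(self, payload):
        self.payload = payload
        # Created on the running loop, so instantiate inside a coroutine.
        self.response = asyncio.get_running_loop().create_future()


async def server(inbox):
    # Reply to each request by resolving the future attached to it.
    while True:
        req = await inbox.get()
        req.response.set_result(req.payload * 2)


async def ask(inbox, payload):
    req = Request(payload)
    await inbox.put(req)
    # Suspends until the server calls set_result on this request's future.
    return await req.response


async def demo():
    inbox = asyncio.Queue()
    task = asyncio.create_task(server(inbox))
    try:
        # gather returns results in the order the awaitables were passed.
        return await asyncio.gather(*(ask(inbox, n) for n in range(3)))
    finally:
        task.cancel()
```

Because each request owns its future, replies cannot be delivered to the wrong caller even when many requests are in flight.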
The read_ops and write_ops coroutines simulate multiple clients performing read and write operations. They create the appropriate operation objects, send them through the queues, and await the responses.
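In the main function, we set up the state manager and create multiple read and write tasks, which start running concurrently as soon as they are created. We then use asyncio.gather to wait for all of them to finish, and cancel the state manager once they are done.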
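The shutdown sequence used in main (wait for the workers, cancel the long-running task, then await it so the cancellation is observed) can be sketched separately. The coroutines background and worker below are hypothetical stand-ins, and contextlib.suppress is used here in place of the try/except from the program.

```python
import asyncio
import contextlib


async def background(log):
    # Stand-in for a service that runs until it is cancelled.
    try:
        while True:
            await asyncio.sleep(3600)
    finally:
        log.append("cleaned up")


async def worker(n):
    await asyncio.sleep(0)
    return n * n


async def demo():
    log = []
    service = asyncio.create_task(background(log))
    results = await asyncio.gather(*(worker(n) for n in range(4)))
    service.cancel()
    # Awaiting the cancelled task lets its finally block run before we return.
    with contextlib.suppress(asyncio.CancelledError):
        await service
    return results, log
```

Awaiting the task after cancel() is what guarantees that any cleanup inside it has finished by the time the caller continues.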
Running this program will show the total number of read and write operations performed:
$ python stateful_coroutines.py
readOps: 100000
writeOps: 1000
This coroutine-based approach in Python provides a way to manage shared state without explicit locks. It ensures that the state is only accessed by a single coroutine (the state manager), which helps prevent race conditions and other concurrency-related issues.
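To make the race concrete, here is a small sketch of a lost update and of the same update routed through a single owner. It is an illustration under assumptions rather than part of the program: the counter dictionary and the names unsafe_increment, owner, and owned_increment are hypothetical, and asyncio.sleep(0) stands in for any await that happens in the middle of an update.

```python
import asyncio


async def unsafe_increment(counter):
    current = counter["n"]
    await asyncio.sleep(0)  # other tasks run here and read the same value
    counter["n"] = current + 1


async def owner(inbox, counter):
    # Only this coroutine modifies the counter, one request at a time.
    while True:
        done = await inbox.get()
        current = counter["n"]
        await asyncio.sleep(0)  # same mid-update pause, now harmless
        counter["n"] = current + 1
        done.set_result(None)


async def owned_increment(inbox):
    done = asyncio.get_running_loop().create_future()
    await inbox.put(done)
    await done


async def demo():
    racy = {"n": 0}
    await asyncio.gather(*(unsafe_increment(racy) for _ in range(10)))

    safe = {"n": 0}
    inbox = asyncio.Queue()
    task = asyncio.create_task(owner(inbox, safe))
    await asyncio.gather(*(owned_increment(inbox) for _ in range(10)))
    task.cancel()
    return racy["n"], safe["n"]
```

In the unsafe version all ten tasks read the counter before any of them writes it back, so nine increments are lost; the owner applies them one after another.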
While this approach might be more complex than using locks or other synchronization primitives, it can be beneficial in scenarios where you’re already using asyncio for other parts of your application, or when managing multiple locks would be error-prone. As always, choose the approach that makes your program easiest to understand and reason about.
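For comparison, a lock-based version of the same store might look like the sketch below. LockedState is a hypothetical class, not something defined in the program above, and it assumes all callers run on the same event loop.

```python
import asyncio
from collections import defaultdict


class LockedState:
    """Hypothetical lock-guarded counterpart to the state manager."""

    def __init__(self):
        self._state = defaultdict(int)
        self._lock = asyncio.Lock()

    async def read(self, key):
        async with self._lock:
            return self._state[key]

    async def write(self, key, val):
        async with self._lock:
            self._state[key] = val


async def demo():
    store = LockedState()
    await asyncio.gather(*(store.write(k, k * 10) for k in range(5)))
    return [await store.read(k) for k in range(5)]
```

Since asyncio tasks only switch at await points, the lock starts to matter once a critical section itself contains an await; for plain dictionary access like this it mostly documents intent.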