Stateful Coroutines in Python

In this example, we’ll demonstrate how to manage state using coroutines and queues in Python. A single coroutine owns the state, and every other coroutine reads or writes it by sending a request through an asyncio.Queue. Because only the owner ever touches the data, access stays safe even when many coroutines run concurrently.

import asyncio
import random
from collections import defaultdict

class ReadOp:
    def __init__(self, key):
        self.key = key
        self.response = asyncio.Future()

class WriteOp:
    def __init__(self, key, val):
        self.key = key
        self.val = val
        self.response = asyncio.Future()

async def state_manager(reads, writes):
    state = defaultdict(int)
    while True:
        read_task = asyncio.create_task(reads.get())
        write_task = asyncio.create_task(writes.get())
        done, pending = await asyncio.wait(
            [read_task, write_task],
            return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
        
        if read_task in done:
            read = read_task.result()
            read.response.set_result(state[read.key])
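        # Use a separate "if" rather than "elif": both tasks can finish in the
        # same iteration, and a skipped write would leave its client waiting forever.
        if write_task in done: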
            write = write_task.result()
            state[write.key] = write.val
            write.response.set_result(True)

async def read_ops(reads, count):
    for _ in range(count):
        read = ReadOp(random.randint(0, 4))
        await reads.put(read)
        await read.response
        await asyncio.sleep(0.001)

async def write_ops(writes, count):
    for _ in range(count):
        write = WriteOp(random.randint(0, 4), random.randint(0, 99))
        await writes.put(write)
        await write.response
        await asyncio.sleep(0.001)

async def main():
    reads = asyncio.Queue()
    writes = asyncio.Queue()
    
    manager = asyncio.create_task(state_manager(reads, writes))
    
    read_tasks = [asyncio.create_task(read_ops(reads, 1000)) for _ in range(100)]
    write_tasks = [asyncio.create_task(write_ops(writes, 100)) for _ in range(10)]
    
    await asyncio.gather(*read_tasks, *write_tasks)
    
    manager.cancel()
    try:
        await manager
    except asyncio.CancelledError:
        pass
    
    print(f"readOps: {len(read_tasks) * 1000}")
    print(f"writeOps: {len(write_tasks) * 100}")

if __name__ == "__main__":
    asyncio.run(main())

In this Python version, we use asyncio to manage concurrency. The state_manager coroutine owns the state (a defaultdict) and processes read and write operations received through queues.
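As a point of comparison, the same ownership idea can be sketched with a single queue that carries both kinds of request, which avoids waiting on two get() calls at once. This is a simplified variant rather than the program above: the names single_queue_manager, request, and demo_single_queue, and the tuple layout of each message, are assumptions made for illustration.

```python
import asyncio

async def single_queue_manager(ops):
    # The only coroutine that ever touches `state`.
    state = {}
    while True:
        kind, key, val, response = await ops.get()
        if kind == "read":
            response.set_result(state.get(key, 0))
        else:
            state[key] = val
            response.set_result(True)

async def request(ops, kind, key, val=None):
    # Send one request to the manager and wait for its reply.
    response = asyncio.get_running_loop().create_future()
    await ops.put((kind, key, val, response))
    return await response

async def demo_single_queue():
    ops = asyncio.Queue()
    manager = asyncio.create_task(single_queue_manager(ops))
    before = await request(ops, "read", 1)
    ok = await request(ops, "write", 1, 7)
    after = await request(ops, "read", 1)
    # Shut the manager down once the clients are finished.
    manager.cancel()
    try:
        await manager
    except asyncio.CancelledError:
        pass
    return before, ok, after

print(asyncio.run(demo_single_queue()))  # (0, True, 7)
```

With one queue, requests are handled strictly in arrival order, at the cost of no longer being able to treat reads and writes as separate streams.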

We define ReadOp and WriteOp classes to encapsulate the operations and their responses. The response is implemented using asyncio.Future, which allows the requester to wait for the operation to complete.
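The role of the future as a one-shot reply channel can be seen in isolation in the following minimal sketch. The names responder and demo_future are hypothetical and the value 42 is arbitrary; the sketch only shows that one coroutine can await a future while another fills it in.

```python
import asyncio

async def responder(fut):
    # Stand-in for the state manager: it answers the request later.
    await asyncio.sleep(0)
    fut.set_result(42)

async def demo_future():
    # Create the future on the running loop, hand it to the responder,
    # then suspend until a result is set.
    fut = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(responder(fut))
    value = await fut
    await task
    return value

print(asyncio.run(demo_future()))  # 42
```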

The read_ops and write_ops coroutines simulate multiple clients performing read and write operations. They create the appropriate operation objects, send them through the queues, and await the responses.

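In the main function, we start the state manager and create multiple read and write tasks. asyncio.create_task schedules each of them to run concurrently, and asyncio.gather waits until all of the client tasks have finished. The manager loops forever, so we cancel it once the clients are done.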
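The interplay between create_task and gather can be sketched on its own. The worker coroutine and its delays below are made up for illustration; the point is that the tasks overlap in time and that gather returns results in the order the tasks were passed in, not the order in which they finished.

```python
import asyncio

async def worker(name, delay):
    # Pretend to do some I/O, then report who we are.
    await asyncio.sleep(delay)
    return name

async def demo_gather():
    # w0 sleeps longest and finishes last, yet still comes first in the result.
    tasks = [asyncio.create_task(worker(f"w{i}", 0.01 * (3 - i))) for i in range(3)]
    return await asyncio.gather(*tasks)

print(asyncio.run(demo_gather()))  # ['w0', 'w1', 'w2']
```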

Running this program will show the total number of read and write operations performed:

$ python stateful_coroutines.py
readOps: 100000
writeOps: 1000

This coroutine-based approach in Python provides a way to manage shared state without explicit locks. It ensures that the state is only accessed by a single coroutine (the state manager), which helps prevent race conditions and other concurrency-related issues.
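To make the kind of race being avoided concrete, here is a hypothetical sketch of coroutines that update a shared dict directly. The names unsafe_increment and demo_lost_updates are illustrative, and asyncio.sleep(0) stands in for any await that might occur between reading and writing a value.

```python
import asyncio

async def unsafe_increment(state, key, times):
    for _ in range(times):
        current = state.get(key, 0)
        await asyncio.sleep(0)     # another coroutine may run here
        state[key] = current + 1   # may overwrite a newer value

async def demo_lost_updates():
    state = {}
    # Four coroutines each try to add 50, so 200 would be the correct total.
    await asyncio.gather(*(unsafe_increment(state, "hits", 50) for _ in range(4)))
    return state["hits"]

print(asyncio.run(demo_lost_updates()))  # less than 200: updates were lost
```

Routing every update through a single owning coroutine removes the gap between the read and the write, because no other coroutine can touch the state in between.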

While this approach might be more complex than using locks or other synchronization primitives, it can be beneficial in scenarios where you’re already using asyncio for other parts of your application, or when managing multiple locks would be error-prone. As always, choose the approach that makes your program easiest to understand and reason about.
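For comparison, a lock-based version of a shared counter might look like the sketch below. It uses asyncio.Lock from the standard library; the function names locked_increment and demo_lock and the counter workload are assumptions chosen for illustration, not part of the program above.

```python
import asyncio

async def locked_increment(state, lock, key, times):
    for _ in range(times):
        # Hold the lock across the whole read-modify-write sequence.
        async with lock:
            current = state.get(key, 0)
            await asyncio.sleep(0)  # a yield point that is now protected
            state[key] = current + 1

async def demo_lock():
    state = {}
    lock = asyncio.Lock()
    await asyncio.gather(*(locked_increment(state, lock, "hits", 50) for _ in range(4)))
    return state["hits"]

print(asyncio.run(demo_lock()))  # 200
```

This is shorter than the message-passing version, but every piece of code that touches the state has to remember to take the lock, which is the trade-off the paragraph above describes.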
