Worker Pools in Python

Our example demonstrates how to implement a worker pool using threads and queues in Python.

import threading
import queue
import time

# Here's the worker function, of which we'll run several
# concurrent instances. These workers will receive
# work on the `jobs` queue and send the corresponding
# results on `results`. We'll sleep a second per job to
# simulate an expensive task.
def worker(id, jobs, results):
    while True:
        j = jobs.get()
        if j is None:
            break
        print(f"worker {id} started  job {j}")
        time.sleep(1)
        print(f"worker {id} finished job {j}")
        results.put(j * 2)
    print(f"worker {id} exiting")

def main():
    # In order to use our pool of workers we need to send
    # them work and collect their results. We make 2
    # queues for this.
    num_jobs = 5
    jobs = queue.Queue()
    results = queue.Queue()

    # This starts up 3 workers, initially blocked
    # because there are no jobs yet.
    workers = []
    for w in range(1, 4):
        t = threading.Thread(target=worker, args=(w, jobs, results))
        t.start()
        workers.append(t)

    # Here we send 5 `jobs` and then send a `None` value
    # for each worker to indicate that's all the work we have.
    for j in range(1, num_jobs + 1):
        jobs.put(j)
    for _ in workers:
        jobs.put(None)

    # Finally we collect all the results of the work.
    # This also ensures that the worker threads have
    # finished.
    for _ in range(num_jobs):
        results.get()

    # Wait for all worker threads to finish
    for w in workers:
        w.join()

if __name__ == "__main__":
    main()

Our running program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.

$ time python worker_pools.py
worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
worker 1 exiting
worker 2 exiting
worker 3 exiting

real    0m2.058s

This example demonstrates how to use Python’s threading and queue modules to create a worker pool. The worker function represents each worker thread, which continuously pulls jobs from the jobs queue, processes them, and puts the results into the results queue. The main function sets up the worker threads, distributes the jobs, and collects the results.

Note that Python’s Global Interpreter Lock (GIL) can limit the performance benefits of threading for CPU-bound tasks. For CPU-intensive operations, you might want to consider using the multiprocessing module instead, which uses separate processes to achieve true parallelism.

查看推荐产品