Worker Pools in Perl

Our example demonstrates how to implement a worker pool using threads and queues in Perl.

use strict;
use warnings;
use threads;
use Thread::Queue;
use Time::HiRes qw(sleep);

# This is the worker subroutine. We'll run several concurrent instances of it.
# These workers will receive work on the $jobs queue and send the corresponding
# results on $results. We'll sleep a second per job to simulate an expensive task.
sub worker {
    my ($id, $jobs, $results) = @_;
    while (defined(my $job = $jobs->dequeue())) {
        print "worker $id started  job $job\n";
        sleep(1);
        print "worker $id finished job $job\n";
        $results->enqueue($job * 2);
    }
}

# In the main subroutine, we set up our worker pool.
sub main {
    my $num_jobs = 5;
    my $jobs = Thread::Queue->new();
    my $results = Thread::Queue->new();

    # This starts up 3 workers, initially blocked because there are no jobs yet.
    my @workers;
    for my $w (1..3) {
        push @workers, threads->create(\&worker, $w, $jobs, $results);
    }

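    # Here we send 5 jobs and then call end() to signal that's all the work we
    # have. Once the queue is ended and empty, dequeue() returns undef and each
    # worker's loop exits. (end() requires Thread::Queue 3.01 or later.)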
    for my $j (1..$num_jobs) {
        $jobs->enqueue($j);
    }
    $jobs->end();

    # Finally we collect all the results of the work. Each dequeue() blocks
    # until a result is available, so this loop returns only after every job
    # has been processed. The workers themselves are joined below.
    for (1..$num_jobs) {
        my $result = $results->dequeue();
    }

    # Wait for all worker threads to finish
    $_->join() for @workers;
}

main();

Our running program shows the 5 jobs being executed by various workers. The program only takes about 2 seconds despite doing about 5 seconds of total work because there are 3 workers operating concurrently.

$ perl worker_pools.pl
worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5

real    0m2.052s
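
The 2-second figure follows from simple arithmetic: with 3 workers and 5 equal-length jobs, the work runs in two rounds of about one second each. A minimal sketch of that estimate, assuming every job takes the same time and a free worker picks up the next job immediately (the name `estimated_seconds` is made up for this illustration):

```perl
use strict;
use warnings;
use POSIX qw(ceil);

# Rough estimate of wall-clock time for a pool of identical jobs:
# the number of "rounds" needed, times the duration of one job.
# Ignores thread start-up and queue overhead.
sub estimated_seconds {
    my ($num_jobs, $num_workers, $secs_per_job) = @_;
    return ceil($num_jobs / $num_workers) * $secs_per_job;
}

printf "%d jobs on %d workers: about %d s\n", 5, 3, estimated_seconds(5, 3, 1);
```

The measured time is slightly higher than the estimate because creating and joining the threads takes time too.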

This example demonstrates how to use Perl’s threading capabilities to create a worker pool. It uses the threads module for creating threads and the Thread::Queue module for inter-thread communication.

The worker subroutine represents each worker in the pool. It dequeues jobs from the $jobs queue until the queue has been ended and drained, processes each one (simulated by a 1-second sleep), and enqueues the result on the $results queue.

In the main subroutine, we create the job and result queues, spawn three worker threads, enqueue five jobs for the workers to pull, and then collect the results.
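
The example dequeues each result and then discards it. Because results can arrive in any order, one way to keep them is to tag each result with its job number. The following is a sketch of that idea, not part of the original program: `tagging_worker` and `run_pool` are hypothetical names, and the one-second sleep is left out so the sketch runs quickly.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue 3.01;    # end() needs Thread::Queue 3.01 or later

# Hypothetical variant of worker(): enqueues a [job, result] pair so the
# caller can tell which job each result belongs to.
sub tagging_worker {
    my ($jobs, $results) = @_;
    while (defined(my $job = $jobs->dequeue())) {
        $results->enqueue([$job, $job * 2]);
    }
}

# Hypothetical helper: runs @job_list on $num_workers threads and returns
# a hash reference mapping each job to its result.
sub run_pool {
    my ($num_workers, @job_list) = @_;
    my $jobs    = Thread::Queue->new();
    my $results = Thread::Queue->new();

    my @workers;
    for (1 .. $num_workers) {
        push @workers, threads->create(\&tagging_worker, $jobs, $results);
    }

    $jobs->enqueue($_) for @job_list;
    $jobs->end();

    # Exactly one result is produced per job, so dequeue that many times.
    my %result_for;
    for (1 .. scalar @job_list) {
        my ($job, $res) = @{ $results->dequeue() };
        $result_for{$job} = $res;
    }

    $_->join() for @workers;
    return \%result_for;
}

my $doubled = run_pool(3, 1 .. 5);
print "job $_ -> $doubled->{$_}\n" for sort { $a <=> $b } keys %$doubled;
```

Thread::Queue makes a shared copy of the array reference when it is enqueued, so the pair can be passed between threads without any explicit sharing.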

Note that Perl's threading model differs from that of some other languages. Perl threads are relatively heavyweight: each new thread gets its own copy of the interpreter's data, so creating one is expensive. For many concurrent tasks, other solutions such as event loops or forking might be more appropriate. However, this example serves to illustrate the concept of worker pools in a Perl context.
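
For comparison, here is a rough sketch of the forking alternative mentioned above. It is a deliberately simplified stand-in rather than a drop-in replacement: instead of a shared job queue, it assigns jobs to child processes round-robin up front, and the children report results as text lines over a single pipe. The name `run_forked` and the line format are assumptions made for this illustration, and it assumes a Unix-like system where fork() creates real processes.

```perl
use strict;
use warnings;
use POSIX ();

# Hypothetical helper: doubles each job in @jobs using $n forked children.
# Jobs are split round-robin by index; results come back over one pipe.
sub run_forked {
    my ($n, @jobs) = @_;
    pipe(my $reader, my $writer) or die "pipe failed: $!";

    my @pids;
    for my $w (0 .. $n - 1) {
        my $pid = fork();
        die "fork failed: $!" unless defined $pid;
        if ($pid == 0) {
            # Child: handle every $n-th job, starting at index $w.
            close $reader;
            for my $i (grep { $_ % $n == $w } 0 .. $#jobs) {
                my $job = $jobs[$i];
                # One short unbuffered write per result keeps lines intact.
                syswrite($writer, "$job " . ($job * 2) . "\n");
            }
            close $writer;
            POSIX::_exit(0);    # leave without running the parent's cleanup
        }
        push @pids, $pid;
    }

    # Parent: close its write end so the read loop sees EOF once all
    # children have exited, then gather the results.
    close $writer;
    my %result_for;
    while (my $line = <$reader>) {
        my ($job, $res) = split ' ', $line;
        $result_for{$job} = $res;
    }
    close $reader;
    waitpid($_, 0) for @pids;
    return \%result_for;
}

my $doubled = run_forked(3, 1 .. 5);
print "job $_ -> $doubled->{$_}\n" for sort { $a <=> $b } keys %$doubled;
```

Static assignment is simpler than a queue but balances load poorly when jobs vary in length, which is the case a real worker pool is meant to handle.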
