Worker Pools in Pascal

Our example demonstrates how to implement a worker pool using threads and queues.

program WorkerPools;

uses
  SysUtils, Classes, SyncObjs;

const
  NumJobs = 5;

type
  TJobQueue = class(TThreadList)
  public
    procedure Enqueue(Value: Integer);
    function Dequeue: Integer;
  end;

  TResultQueue = class(TThreadList)
  public
    procedure Enqueue(Value: Integer);
    function Dequeue: Integer;
  end;

  TWorker = class(TThread)
  private
    FId: Integer;
    FJobs: TJobQueue;
    FResults: TResultQueue;
  protected
    procedure Execute; override;
  public
    constructor Create(Id: Integer; Jobs: TJobQueue; Results: TResultQueue);
  end;

{ TJobQueue }

procedure TJobQueue.Enqueue(Value: Integer);
var
  List: TList;
begin
  List := LockList;
  try
    List.Add(Pointer(Value));
  finally
    UnlockList;
  end;
end;

function TJobQueue.Dequeue: Integer;
var
  List: TList;
begin
  Result := -1;
  List := LockList;
  try
    if List.Count > 0 then
    begin
      Result := Integer(List[0]);
      List.Delete(0);
    end;
  finally
    UnlockList;
  end;
end;

{ TResultQueue }

procedure TResultQueue.Enqueue(Value: Integer);
var
  List: TList;
begin
  List := LockList;
  try
    List.Add(Pointer(Value));
  finally
    UnlockList;
  end;
end;

function TResultQueue.Dequeue: Integer;
var
  List: TList;
begin
  Result := -1;
  List := LockList;
  try
    if List.Count > 0 then
    begin
      Result := Integer(List[0]);
      List.Delete(0);
    end;
  finally
    UnlockList;
  end;
end;

{ TWorker }

constructor TWorker.Create(Id: Integer; Jobs: TJobQueue; Results: TResultQueue);
begin
  inherited Create(False);
  FId := Id;
  FJobs := Jobs;
  FResults := Results;
  FreeOnTerminate := True;
end;

procedure TWorker.Execute;
var
  Job: Integer;
begin
  while not Terminated do
  begin
    Job := FJobs.Dequeue;
    if Job = -1 then
      Break;

    WriteLn(Format('worker %d started  job %d', [FId, Job]));
    Sleep(1000); // Simulate an expensive task
    WriteLn(Format('worker %d finished job %d', [FId, Job]));
    FResults.Enqueue(Job * 2);
  end;
end;

var
  Jobs: TJobQueue;
  Results: TResultQueue;
  i: Integer;

begin
  Jobs := TJobQueue.Create;
  Results := TResultQueue.Create;
  try
    // Start 3 worker threads
    for i := 1 to 3 do
      TWorker.Create(i, Jobs, Results);

    // Add jobs to the queue
    for i := 1 to NumJobs do
      Jobs.Enqueue(i);

    // Wait for all jobs to complete
    for i := 1 to NumJobs do
    begin
      while Results.Dequeue = -1 do
        Sleep(100);
    end;
  finally
    Jobs.Free;
    Results.Free;
  end;
end.

This Pascal program implements a worker pool using threads and thread-safe queues. Here’s a breakdown of the implementation:

  1. We define TJobQueue and TResultQueue classes to manage the jobs and results. These are thread-safe wrappers around TList.

  2. The TWorker class represents a worker thread. It continuously dequeues jobs, processes them (simulated by a 1-second sleep), and enqueues the results.

  3. In the main program, we create job and result queues, start 3 worker threads, enqueue 5 jobs, and then wait for all results to be processed.

  4. The program uses WriteLn to output status messages, similar to the original example.

  5. Instead of channels, we use thread-safe queues for communication between the main thread and worker threads.

  6. The Sleep function is used to simulate time-consuming tasks and for polling the result queue.

This implementation demonstrates concurrent processing in Pascal, although the syntax and mechanisms differ from the original. Pascal doesn’t have built-in goroutines or channels, so we use threads and thread-safe queues to achieve similar functionality.

To run this program, save it as worker_pools.pas, compile it with a Pascal compiler (like Free Pascal), and then execute the resulting binary. The output will show jobs being processed by different workers concurrently.

$ fpc worker_pools.pas
$ ./worker_pools
worker 1 started  job 1
worker 2 started  job 2
worker 3 started  job 3
worker 1 finished job 1
worker 1 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5

The program should complete in about 2 seconds, despite doing about 5 seconds of total work, because of the concurrent processing by multiple worker threads.