Worker Pools in Pascal
Our example demonstrates how to implement a worker pool using threads and queues.
program WorkerPools;
uses
SysUtils, Classes, SyncObjs;
const
NumJobs = 5;
type
TJobQueue = class(TThreadList)
public
procedure Enqueue(Value: Integer);
function Dequeue: Integer;
end;
TResultQueue = class(TThreadList)
public
procedure Enqueue(Value: Integer);
function Dequeue: Integer;
end;
TWorker = class(TThread)
private
FId: Integer;
FJobs: TJobQueue;
FResults: TResultQueue;
protected
procedure Execute; override;
public
constructor Create(Id: Integer; Jobs: TJobQueue; Results: TResultQueue);
end;
{ TJobQueue }
procedure TJobQueue.Enqueue(Value: Integer);
var
List: TList;
begin
List := LockList;
try
List.Add(Pointer(Value));
finally
UnlockList;
end;
end;
function TJobQueue.Dequeue: Integer;
var
List: TList;
begin
Result := -1;
List := LockList;
try
if List.Count > 0 then
begin
Result := Integer(List[0]);
List.Delete(0);
end;
finally
UnlockList;
end;
end;
{ TResultQueue }
procedure TResultQueue.Enqueue(Value: Integer);
var
List: TList;
begin
List := LockList;
try
List.Add(Pointer(Value));
finally
UnlockList;
end;
end;
function TResultQueue.Dequeue: Integer;
var
List: TList;
begin
Result := -1;
List := LockList;
try
if List.Count > 0 then
begin
Result := Integer(List[0]);
List.Delete(0);
end;
finally
UnlockList;
end;
end;
{ TWorker }
constructor TWorker.Create(Id: Integer; Jobs: TJobQueue; Results: TResultQueue);
begin
inherited Create(False);
FId := Id;
FJobs := Jobs;
FResults := Results;
FreeOnTerminate := True;
end;
procedure TWorker.Execute;
var
Job: Integer;
begin
while not Terminated do
begin
Job := FJobs.Dequeue;
if Job = -1 then
Break;
WriteLn(Format('worker %d started job %d', [FId, Job]));
Sleep(1000); // Simulate an expensive task
WriteLn(Format('worker %d finished job %d', [FId, Job]));
FResults.Enqueue(Job * 2);
end;
end;
var
Jobs: TJobQueue;
Results: TResultQueue;
i: Integer;
begin
Jobs := TJobQueue.Create;
Results := TResultQueue.Create;
try
// Start 3 worker threads
for i := 1 to 3 do
TWorker.Create(i, Jobs, Results);
// Add jobs to the queue
for i := 1 to NumJobs do
Jobs.Enqueue(i);
// Wait for all jobs to complete
for i := 1 to NumJobs do
begin
while Results.Dequeue = -1 do
Sleep(100);
end;
finally
Jobs.Free;
Results.Free;
end;
end.
This Pascal program implements a worker pool using threads and thread-safe queues. Here’s a breakdown of the implementation:
We define
TJobQueue
andTResultQueue
classes to manage the jobs and results. These are thread-safe wrappers aroundTList
.The
TWorker
class represents a worker thread. It continuously dequeues jobs, processes them (simulated by a 1-second sleep), and enqueues the results.In the main program, we create job and result queues, start 3 worker threads, enqueue 5 jobs, and then wait for all results to be processed.
The program uses
WriteLn
to output status messages, similar to the original example.Instead of channels, we use thread-safe queues for communication between the main thread and worker threads.
The
Sleep
function is used to simulate time-consuming tasks and for polling the result queue.
This implementation demonstrates concurrent processing in Pascal, although the syntax and mechanisms differ from the original. Pascal doesn’t have built-in goroutines or channels, so we use threads and thread-safe queues to achieve similar functionality.
To run this program, save it as worker_pools.pas
, compile it with a Pascal compiler (like Free Pascal), and then execute the resulting binary. The output will show jobs being processed by different workers concurrently.
$ fpc worker_pools.pas
$ ./worker_pools
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 1 started job 4
worker 2 finished job 2
worker 2 started job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
The program should complete in about 2 seconds, despite doing about 5 seconds of total work, because of the concurrent processing by multiple worker threads.