technology from back to front

Testing the Reactor pattern

A good while ago I wrote a SIP stack. Like many network things, a SIP stack needs to keep track of multiple tasks – reading or writing from sockets, asking the user to respond to events, and so on. And so I na├»vely added a bunch of threads. And then I spent a few weeks trying to fix nasty bugs. And then I got fed up and rewrote all the logic. It turns out that I accidentally stumbled upon the Reactor pattern… but with something that I’ve not seen elsewhere.

At the heart of the SIP stack is the unwieldily named TIdTimerQueue. (“T” because class names in (Object) Pascal traditionally start with “T”, and “Id” because at one point I’d aimed to work within the Indy Project namespace, and never bothered to remove/change the prefix.) So forgetting about some of the details, the basic interface of a TIdTimeQueue looks like this (with non-essentials removed):

TIdTimerQueue = class(TIdRegisteredObject)
public
  procedure AddEvent(MillisecsWait: Cardinal;
                     Event: TIdWait); virtual;
  procedure AddListener(Listener: IIdTimerQueueListener);
  procedure RemoveListener(Listener: IIdTimerQueueListener);
end

where a TIdWait is

TIdWait = class(TIdRegisteredObject)
public
  function  Due: Boolean;
  procedure Trigger; virtual;
end

Essentially a TIdWait is a closure, representing some chunk of work we want to perform at some point in the future.

Normally when I see a framework that uses the Reactor pattern – like in eventmachine – I see the clock hidden behind the curtain. You schedule something, and it just runs. Only, how do you test this? Writing tests that pause makes for seriously slow test, while virtually speeding up the clock still leaves you with fragiletests. Since I was in the first flush of love with test driven development, not being able to test the heartbeat of the whole stack worried me a great deal. And then I thought, but if time makes testing things hard, why not turn the whole construct inside out? Separate the queueing/scheduling of events from the running of events. And thus were born the brothers TIdDebugTimerQueue and TIdThreadedTimerQueue. A TIdDebugTimerQueue exposes a bunch of introspective tools for tests to check whether things correctly scheduled certain events, examine the order of scheduled events, and so on:

TIdDebugTimerQueue = class(TIdTimerQueue)
public
    procedure AddEvent(MillisecsWait: Cardinal;
                       Event: TIdWait); override;
    function  EventAt(Index: Integer): TIdWait; override;
    function  EventCount: Integer;
    function  EventCountFor(WaitType: TIdWaitClass; CountSubclasses: Boolean = false): Integer;
    function  FirstEventScheduledFor(Event: Pointer): TIdWait;
    function  LastEventScheduled: TIdWait; overload;
    function  LastEventScheduled(WaitType: TIdWaitClass): TIdWait; overload;
    function  LastEventScheduledFor(Event: Pointer): TIdWait;
    procedure RemoveAllEvents;
    function  ScheduledEvent(Event: TIdWait): Boolean; overload;
    function  SecondLastEventScheduled: TIdWait;
    procedure Terminate; override;
    procedure TriggerAllEventsOfType(WaitType: TIdWaitClass);
    procedure TriggerAllEventsUpToFirst(WaitType: TIdWaitClass);
    procedure TriggerEarliestEvent; override;

    property TriggerImmediateEvents: Boolean read fTriggerImmediateEvents write fTriggerImmediateEvents;
end;

TIdThreadedTimeQueue is a TIdTimerQueue “enlivened” by a thread that breathes life into it:

// The name comes from Smalltalk - a block is a chunk of code, possibly
// together with some variables. A closure, in other words. Well, this is as
// close as we get in Delphi.
TIdBlockRunnerThread = class(TIdBaseThread)
private
  Block: TIdThreadProc;
protected
  procedure Run; override;
public
  constructor Create(Block: TIdThreadProc;
                     CreateSuspended: Boolean = True); reintroduce;
end;

TIdTimerEmptyProc = procedure(Sender: TIdTimerQueue) of object;

// I provide a thread in which to execute my events. Obviously, all
// TIdWaits execute in BlockRunner's context.
TIdThreadedTimerQueue = class(TIdTimerQueue)
private
  BlockRunner:     TIdBlockRunnerThread;
  fOnEmpty:        TIdTimerEmptyProc;
  TerminatedEvent: TEvent;

  procedure NotifyOfTermination;
  procedure PossiblyNotifyOfEmpty;
  procedure Run;
public
  constructor Create(CreateSuspended: Boolean); override;

  procedure Resume; override;
  procedure Terminate; override;
  procedure TerminateAndWaitFor(WaitEvent: TEvent);

  property OnEmpty: TIdTimerEmptyProc read fOnEmpty write fOnEmpty;
end;
procedure TIdBlockRunnerThread.Run;
begin
  if Assigned(Self.Block) then
    Self.Block;
end;

Nice and simple… but where’d that Block come from?

constructor TIdThreadedTimerQueue.Create(CreateSuspended: Boolean);
begin
  Self.TerminatedEvent := nil;

  inherited Create(CreateSuspended);
end;

procedure TIdThreadedTimerQueue.Resume;
begin
  inherited Resume;

  if not Assigned(Self.BlockRunner) then
    Self.BlockRunner := TIdBlockRunnerThread.Create(Self.Run, Self.CreateSuspended);

  Self.BlockRunner.Resume;
end;

And finally the core of actually running the events, the event loop:

procedure TIdThreadedTimerQueue.Run;
begin
  try
    while not Self.Terminated do begin
      Self.WaitEvent.WaitFor(Self.ShortestWait);

      if not Self.Terminated then
        Self.TriggerEarliestEvent;

      Self.PossiblyNotifyOfEmpty;
    end;

    Self.NotifyOfTermination;
  finally
    Self.Free;
  end;
end;

And there you have it: separating the concerns of scheduling events from the event loop allows you to easily test code that produces the events.

by
Frank Shearar
on
30/09/13
 
 


nine + 8 =

2000-14 LShift Ltd, 1st Floor, Hoxton Point, 6 Rufus Street, London, N1 6PE, UK+44 (0)20 7729 7060   Contact us