SMG Comms Chapter 12: Thread-Safe Queues

December 13th, 2018 by Diana Coman

~ This is a work in progress towards an Ada implementation of Eulora's communication protocol. Start with Chapter 1.~

The various message types of Eulora's communication protocol are mostly implemented by now1 in chapters 1 - 11 of this series. What remains is the actual UDP communication and for this part I'm importing into SMG Comms Stanislav's neat UDP lib. Since Eulora effectively uses the size of the message to cheaply distinguish between RSA and UDP, it follows that any implementation of the protocol will have to support those two sizes (rather than one single fixed size as the original UDP lib imposes). There are several ways to do this2 but the main thing is that "any arbitrary size" is neither needed by SMG Comms as such nor worth the trouble. At the other extreme, copy/pasting the UDP lib code to have two packages that differ only by a constant is rather idiotic. So the middle ground is to use Ada's generic mechanism that allows one to parametrize code: the UDP lib package will expose the length as a parameter of the package as a whole and the client/server code will then simply create a UDP_RSA and a UDP_Serpent package types with their corresponding, fixed sizes. The only downside to this approach is that it breaks some of the existing restrictions but as previously stated, the point was not in having restrictions for the sake of having them but simply in having a good reason for any removal of any restriction. The need for UDP lib to handle those 2 different packet sizes counts in my eyes as a good reason to allow the generic mechanism.

With the UDP lib code, the most basic implementation of the protocol specification could be considered complete3. Basically the code provides everything that is needed to send and receive the messages in the format and through the communication means specified. However, in practice, Eulora will likely require a type of thread-safe queues for any processing of messages after they came in through the UDP socket and/or before they are sent. The reason for this is two-fold: first, it makes sense to decouple the send/receive via UDP from message processing; second, the encryption/decryption part required by the protocol can be relatively slow (especially for RSA) and as a result it's not the sort of thing that makes sense handled directly by the Sender/Receiver immediately on top of the UDP lib. Ideally, the Receiver would simply pass on a new message as soon as it gets it from UDP so that there is no queue forming at the lower level (since messages can then get lost/discarded simply). Arguably the Sender side can just fire the messages as soon as it receives a request to send them, since there is no additional delay there - although it could end up then either firing too many too quickly resulting in more losses than usual or otherwise becoming a bottleneck for the whole application on top of it. At any rate, the exact designs of Senders and Receivers are not part of the protocol itself - any Eulora client can implement those as it sees fit. Still, since I'm quite sure I'll need multiple tasks on the server, I'll include at least the means to decouple send/receive of rsa/serpent messages from any processing, namely a queue that can safely handle put/get calls from any number of tasks (e.g. the receiver will put messages into this queue while any number of worker tasks can then be created to process those messages in any way).

A thread-safe queue is of course nothing new and as such Ada seems to provide different mechanisms for implementation but the one that seems the most straightforward and adequate for this taks is the "protected variable": according to my Ada reference book4, the whole point of a protected variable is precisely to control the access to some protected resource. The resource itself (in this case the actual queue) is declared as private to the protected object and access to it is provided only through two procedures, Get and Put, with the important specific characteristic that calls to those procedures (and in general to any of the procedures of a protected object) are mutually exclusive. So no matter how many tasks call Get and Put and in what order, at any given time, only one task will proceed with one of Get or Put and moreover, no other task will get to enter either of those before the first task has finished. Moreover, the entry to both Get and Put can be effectively guarded by additional conditions: Get should proceed only when there is at least one item in the queue while Put should only proceed when the queue is NOT full. Using all this together with modular types for the index makes it quite easy to define a circular queue of fixed size that allows only mutually exclusive calls of Put and Get:

 -- FIFO queue of UDP messages with protected read/write (thread-safe).
 -- S.MG, 2018

with Raw_Types;
with Interfaces;

  -- exact length of payload of items in the queue (1470 or 1472 in Eulora)
  Payload_Len: in Positive;

package Msg_Queue is

  -- payload type
  subtype Payload_Type is Raw_Types.Octets(1..Payload_Len);

  -- maximum length of the message queue
  Max_Q_Len: constant := 1024;

  -- index in the queue will be modular type
  type Index is mod Max_Q_Len;

  -- an item in the queue; IP/port stored only for the "other" (variable) end.
  type Msg is
      Payload    : Raw_Types.Octets(1..Payload_Len);
      IP_Address : Interfaces.Unsigned_32;
      IP_Port    : Interfaces.Unsigned_16;
    end record;

  -- an array of messages
  type Msg_Array is array(Index) of Msg;

  -- the actual queue of messages, as a protected type
  protected type Queue is
    -- adds the given entry to the queue if there is space; BLOCKS if no space
    entry Put(Payload : in Payload_Type;
              Address : in Interfaces.Unsigned_32;
              Port    : in Interfaces.Unsigned_16);

    -- reads next entry from queue when available; BLOCKS if no entries
    entry Get(Payload : out Payload_Type;
              Address : out Interfaces.Unsigned_32;
              Port    : out Interfaces.Unsigned_16);


    Q: Msg_Array;
    Read_Pos, Write_Pos: Index := Index'First;
    Count: Natural range 0..Max_Q_Len := 0;
  end Queue;

end Msg_Queue;

You might have noticed in the code above that I made the Msg_Queue type generic, with a single parameter that represents the length of the payload of a message that is stored in the queue: this is needed for the same reason as the generic at UDP lib previously - to allow one to store Serpent messages or RSA messages while maintaining them clearly differentiated5. Note that the package defines internally types for the items stored in the queue but the Put/Get methods use 3 parameters rather than the "Msg" record structure - the reason for this is that it makes it easier for callers to use the queue with existing types rather than having to create a new structure just for reading from the queue/writing to the queue. The corresponding implementation in msg_queue.adb:

 -- S.MG, 2018

package body Msg_Queue is
  protected body Queue is
    entry Put(Payload : in Payload_Type;
              Address : in Interfaces.Unsigned_32;
              Port    : in Interfaces.Unsigned_16)
          when Count < Max_Q_len is
      M: Msg;
      -- fill the Msg item structure
      M.Payload    := Payload;
      M.IP_Address := Address;
      M.IP_Port    := Port;
      -- add it to queue and update counter + pos
      Q( Write_Pos ) := M;
      Write_Pos := Write_Pos + 1;
      Count := Count + 1;
    end Put;

    entry Get(Payload : out Payload_Type;
              Address : out Interfaces.Unsigned_32;
              Port    : out Interfaces.Unsigned_16)
            when Count > 0 is
      M: Msg;
      M := Q( Read_Pos );
      Payload := M.Payload;
      Address := M.IP_Address;
      Port    := M.IP_Port;
      Read_Pos := Read_Pos + 1;
      Count := Count - 1;
    end Get;

  end Queue;
end Msg_Queue;

As you can see above, a call to Put may proceed only when Count < Max_Q_Len, meaning that the queue is NOT full. Symmetrically, a call to Get may proceed only when Count > 0, meaning that the queue is not empty. The Count variable is updated by both Put and Get to reflect at any given time the exact number of elements in the queue - note that this is needed as a separate variable because the queue is circular and so the indices in the queue are modular types. The modular type is very handy here as the indices will simply roll over when they get to the maximum value and so there is no possibility of attempting to read/write out of the queue boundaries. Combined with the guards for Get/Put on Count, there is no possibility of overwriting items in the queue either. Win.

The only potential downside to choosing this approach to implement the queue as a protected object is that there are yet more restrictions to relax: max_protected_entries can't be 0 anymore, since we are introducing... a protected object, yes; no_protected_types can't remain either for the obvious reason; simple_barriers can't remain because the code uses the variable Count to guard Put/Get (i.e. not a constant) - however, pure_barriers remains in place since the variable is local to Msg_Queue (and I think that how it should actually be at any rate). There are also the task-related restrictions that need to be relaxed when the Msg_Queue is used since there will be presumably some... tasks defined: max_task_entries, no_task_hierarchy, max_tasks, max_select_alternatives. Obviously, the exact list of task-related restrictions that you can keep depends on what tasks you define. The previous list is what I had to relax for some testing.

To see the above in action and at the same time to get a bit more practice with Ada tasks (aka threads), I wrote a basic test that just "reads" and "writes" stuff to a Msg_Queue. The test makes use of worker tasks but it doesn't throw delays in as well. At any rate, any "testing" for multi-threaded code is a rather dubious proposition since one can't even quite control what code gets called when exactly. So as I was saying: more of an exercise/example than a test but anyway, here's the definition of the Worker task type in tests/

 --S.MG, 2018

with Msg_Queue;
with Ada.Text_IO; use Ada.Text_IO;
with Interfaces; use Interfaces;
with Raw_Types;
with RNG;

package Q_Pkg is
  procedure Start;
  package MQ is new Msg_Queue(Payload_Len => 10);
  Q: MQ.Queue;

  task type Worker is
    Entry Start(A: in Unsigned_32;
                P: in Unsigned_16;
            Times: in Positive;
            Write: in Boolean);
  end Worker;

  W: array(1..10) of Worker;

end Q_Pkg;

The corresponding implementation in tests/q_pkg.adb:

package body Q_Pkg is
  procedure Start is
    U32: Unsigned_32 := 100;
    U16: Unsigned_16 := 1;
    Write: Boolean;
    for I in 1..W'Length loop
      if I mod 2 = 0 then
        Write := False;
        Write := True;
      end if;
      W(I).Start(U32, U16, W'Length-I+10, Write);
      U32 := U32 + 1;
      U16 := U16 + 1;
    end loop;
  end Start;

  task body Worker is
    Pay: MQ.Payload_Type := (others => 0);
    Address: Unsigned_32;
    Port : Unsigned_16;
    accept Start(A: in Unsigned_32;
                 P: in Unsigned_16;
             Times: in Positive;
             Write: in Boolean) do
      if Write then
        for I in 1 .. Times loop
          Pay(Pay'First) := Unsigned_8( I mod 256 );
          Pay(Pay'First+1) := Unsigned_8( (I / 256) mod 256);
          Q.Put( Pay, A, P );
          Put_Line(Integer'Image(I) & "." &
                   Unsigned_32'Image(A) & "." & Unsigned_16'Image(P) &
                   " WROTE: " & Unsigned_8'Image(Pay(Pay'First)) &
        end loop;
        for I in 1 .. Times loop
          Q.Get( Pay, Address, Port );
          Put_Line(Integer'Image(I) & "." &
                   Unsigned_32'Image(A) & "." & Unsigned_16'Image(P) &
                   " read: " & Unsigned_8'Image(Pay(Pay'First)) &
                   Unsigned_8'Image(Pay(Pay'First+1)) &
                   " from " & Unsigned_32'Image(Address) & "." &
        end loop;
      end if;
    end Start;
  end Worker;

end Q_Pkg;

The "test" itself simply calls Q_Pkg.Start that will iterate through a set of Worker tasks and ask those on even positions to read and those on odd positions to write to the queue (the address/ip parts of "messages" are used here to identify the workers so that one can follow the whole output):

 -- Basic example of using a message Queue with multiple readers/writers
 -- S.MG, 2018

with Q_Pkg;
procedure Test_Queue is
end Test_Queue;

The next step would now be to actually implement an example of sender/receiver. I've been thinking on this for a while but it's still not very clear to me what would be most useful here or how the sender and receiver should exactly look like. For starters, I'm not even sure whether a sender/receiver should be in fact part of smg_comms or whether it's better to leave them out, as they are strictly speaking outside of the communication protocol itself and a matter for the client/server to decide. The argument for including them (or at least some version/example of them) is that some version of them will be needed at any rate so having an example to start from is likely to help anyone who wants to implement a client. However, what is "basic" version can be a thorny issue and moreover, once some "basic" version is there, there's also a great pull towards using that one as "default" rather than basic. As to more specific questions: should both the sender and the receiver use their own Msg_Queue (i.e. outbound queue for sender to read from and inbound queue for receiver to write into) and otherwise share a socket? Or should just the receiver use a queue since sending is a much thinner layer here (messages are already fully prepared so it's just a call to the UDP lib with the already bound socket)? What should sender/receiver even do with exceptions thrown by the UDP lib, should they handle them all/ just some/none? How should sender/receiver handle potential blocks on writing to/reading from the queues (do they wait a certain amount of time, do they check and signal when queue is fuller than some percentage, so they do something else - what)?

In a word, the sender/receiver part still requires some discussion it would seem since some choices need to be made even if just for now. At any rate, here's the .vpatch for this chapter, together with my signature for it, as usual:

  1. The few that are not yet implemented are also not yet fully specified - some practical experience with an implementation, even partial, can help to flesh those out better too. 

  2. The way it's usually done in C being the least beneficial one - it forces upon the code all the potential issues of "any size *could* be fine" without much advantage to claim for it since no program is really likely to use all possible sizes at the same time anyway. 

  3. And it's about time, too! Looking at the date for Chapter 1, it's been 3 months almost to the day already. 

  4. John Barnes, "Programming in Ada 2012", Cambridge University Press, 2014, p. 515 

  5. Since sizes are quite similar, one could otherwise simply use the larger size when defining the item type and then just fill in only as much as needed. The unpleasant effect of this however is that one then needs to also store somewhere else the actual used length. And given that I'm already using generic for the UDP lib, there's really no point in going to such lengths to avoid it here. So the Msg_Queue is a generic type - any code using it will first have to create a specific type out of it by providing a concrete length of payloads stored. 

Comments feed: RSS 2.0

2 Responses to “SMG Comms Chapter 12: Thread-Safe Queues”

  1. [...] their outbound packages into out_q and /or get incoming packages from in_q. Note that both those queues are thread-safe so there is no concern here over who tries to read/write and when - at most, a task may end up [...]

  2. [...] Coman publishes Chapter 12 of SMG Comms containing a thread-safe Ada implementation of simple queues that are specific to the needs of [...]

Leave a Reply