Ossasepia

December 17, 2018

SMG Comms Chapter 13: Sender and Receiver

Filed under: Coding, SMG_Comms — Diana Coman @ 1:39 p.m.

~ This is a work in progress towards an Ada implementation of Eulora's communication protocol. Start with Chapter 1.~

This chapter adds to SMG Comms a thin wrapper package that effectively rescues the queue of UDP messages from the IP stack (where it's relatively small) into memory (where it can be, by comparison, large). Once the decision had been clearly made as to what the sender/receiver should do, and since I finally seem to have gotten my head around using Ada's threads [1], the implementation turned out deliciously straightforward. Who could have predicted this?! Let's look directly at the new Snd_Rcv package, as it's very easy to read indeed:

 --Sender and Receiver task types for Eulora's Communication Protocol
 --This is a THIN layer on top of UDP lib, mainly to move messages out
 -- of the small queue of the IP stack onto a bigger, in-memory queue.
 --There is NO processing of messages here: just read/write from/to UDP.
 --S.MG, 2018

with Interfaces;
with Msg_Queue;
with UDP;

generic
  -- exact length of payload aka whether RSA or Serpent
  Len: in Positive;

package Snd_Rcv is
  -- queue package with specified payload length
  package M_Q is new Msg_Queue( Payload_Len => Len);

  -- outbound and inbound messages queues
  -- those are meant to be accessed from outside the package too!
  out_q : M_Q.Queue;
  in_q  : M_Q.Queue;

  -- sender type of task: takes msgs out of out_q and sends them via UDP
  task type Sender( Port: Interfaces.Unsigned_16);

  -- receiver type of tasks: reads incoming msgs from UDP and puts them in in_q
  task type Receiver( Port: Interfaces.Unsigned_16);

private
  -- udp lib package with specified payload length
  package M_UDP is new UDP( Payload_Size => Len);

end Snd_Rcv;

As can be seen above, the package simply packs in one place an outbound message queue (out_q), an inbound message queue (in_q) and the definitions of two types of tasks: the Sender and the Receiver. The two queues act effectively as mailboxes: any and all tasks from anywhere else are invited to just drop their outbound packages into out_q and/or get incoming packages from in_q. Note that both queues are thread-safe, so there is no concern here over who tries to read/write and when: at most, a task may end up blocked waiting on an empty queue (when trying to read a message) or on a full queue (when trying to write a message).
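To make the "mailbox" behaviour concrete, here is a minimal sketch of a blocking queue built on an Ada protected type. It is NOT the actual Msg_Queue package: the names Mailbox_Sketch, Mailbox and Capacity are made up for illustration, and it stores plain Integers rather than a payload with its address and port. It only shows how entry barriers make a writer wait on a full queue and a reader wait on an empty one.

```ada
-- A minimal sketch of a blocking "mailbox"; NOT the actual Msg_Queue package.
with Ada.Text_IO; use Ada.Text_IO;

procedure Mailbox_Sketch is
  Capacity : constant := 4;                   -- hypothetical queue size
  type Index is mod Capacity;
  type Slots is array (Index) of Integer;

  protected type Mailbox is
    entry Put (Item : in Integer);            -- blocks while the queue is full
    entry Get (Item : out Integer);           -- blocks while the queue is empty
  private
    Data  : Slots := (others => 0);
    Head  : Index := 0;                       -- next slot to read
    Tail  : Index := 0;                       -- next slot to write
    Count : Natural := 0;                     -- number of stored items
  end Mailbox;

  protected body Mailbox is
    entry Put (Item : in Integer) when Count < Capacity is
    begin
      Data (Tail) := Item;
      Tail  := Tail + 1;                      -- modular type wraps around
      Count := Count + 1;
    end Put;

    entry Get (Item : out Integer) when Count > 0 is
    begin
      Item  := Data (Head);
      Head  := Head + 1;
      Count := Count - 1;
    end Get;
  end Mailbox;

  Box : Mailbox;

  -- producer writes more items than the queue can hold at once
  task Producer;
  task body Producer is
  begin
    for I in 1 .. 10 loop
      Box.Put (I);
    end loop;
  end Producer;

  Value : Integer;
begin
  -- the main task acts as the consumer
  for I in 1 .. 10 loop
    Box.Get (Value);
    Put_Line ("got" & Integer'Image (Value));
  end loop;
end Mailbox_Sketch;
```

With one producer and one consumer the items come out in the order they went in; neither side needs any explicit locking, since the barriers on Put and Get do all the waiting.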

If out_q and in_q are mailboxes, then the two types of tasks, Sender and Receiver, are postmen. They share the same underlying UDP package that is private here (only postmen are allowed to use the UDP post van!) and has a fixed size of messages. Note that this fixed size is given as a parameter to the Snd_Rcv package itself and is then used both for the queues and for the UDP package. Essentially the Snd_Rcv package is a postal service that handles just one pre-defined length of messages. An application may of course use as many different message lengths as it requires: all it needs to do is to create a different Snd_Rcv package (i.e. "postal service") for each of those lengths.

Note also that the actual ports used by the Sender and Receiver are given as parameters: an application can create as many Sender/Receiver tasks as it wants and bind them to different ports or to the same port, as desired. This gives maximum flexibility: an application can listen for messages on one port and send them out on a different port, while still having everything in one single queue; or it can listen and send through the same port via any number of Sender/Receiver tasks. Each Sender and Receiver task simply binds its own local socket and then enters an endless loop: the Sender picks up messages from out_q and sends them through its socket via the UDP lib, while the Receiver picks up messages from its socket via the UDP lib and writes them into in_q. The corresponding code is short (and made slightly longer by my choice of having each Sender/Receiver use its own local socket):

 -- S.MG, 2018

package body snd_rcv is
  -- sender
  task body Sender is
    E       : M_UDP.Endpoint;
    S       : M_UDP.Socket;
    Payload : M_Q.Payload_Type;
    Dest    : M_UDP.Endpoint;
  begin
    -- open the socket on local interface, specified port
    E.Address := M_UDP.INADDR_ANY;
    E.Port := Port;
    M_UDP.Open_Socket( S, E );

    -- infinite loop reading from out queue and sending via udp
    -- caller will have to call abort to stop this!
    loop
      out_q.Get( Payload, Dest.Address, Dest.Port);
      M_UDP.Transmit( S, Dest, Payload);
    end loop;
  end Sender;

  -- receiver
  task body Receiver is
    E      : M_UDP.Endpoint;
    Source : M_UDP.Endpoint;
    S      : M_UDP.Socket;
    Payload: M_Q.Payload_Type;
    Valid  : Boolean;
  begin
    -- open the socket on local interface, specified port
    E.Address := M_UDP.INADDR_ANY;
    E.Port := Port;
    M_UDP.Open_Socket( S, E );

    -- infinite loop reading from udp and writing to inbound queue
    -- caller will have to call abort to stop this!
    loop
      M_UDP.Receive( S, Source, Payload, Valid);
      -- store ONLY if valid, otherwise discard
      if Valid then
        in_q.Put( Payload, Source.Address, Source.Port);
      end if;
    end loop;

  end Receiver;

end snd_rcv;

An alternative approach to the above (and one that I implemented at first) was to have a single task Snd_Rcv that bound one single socket and then started on it its own sub-tasks for the actual sender and receiver. However, I find such an approach needlessly complicated and inflexible, for three reasons. First, it creates an additional layer in the hierarchy of tasks for no clear benefit (perhaps it would make sense if one added some sort of additional management of the sender/receiver tasks in there, but at the moment it's unclear that any such thing is actually needed, or needed here of all places). Second, it is harder to read, with the single and so far unconvincing benefit of a shared socket (so no repeated binding code). Third, it forces some choices on any application using this package: the sender and receiver come only as a pair, so there is no more option of just listening on a port and/or just sending on it; there is also no option of listening on one port and sending on another, or indeed of creating, if needed, more senders than receivers or the other way around.

Sure, it can be argued that several senders and receivers are anyway not likely to be required, or that binding too many is likely to just increase packet loss or cause other trouble. This is however up to higher levels of the application rather than the concern of this thin sender/receiver, and since this implementation offers both the highest flexibility AND the highest clarity, I think it's the best option so far. As usual, feel free to let me know in the comments your reasons for disagreeing with this and your better solution for implementing a sender/receiver layer.

The above tiny amount of code would be all for this chapter if it weren't for 3 things: the need to relax yet another few restrictions; an example/test of using the above sender/receiver package; and my decision to include the UDP lib as just another package of SMG Comms rather than keeping it as a separate lib. This last part accounts for most lines in the .vpatch and is essentially noise at this level (since vdiff is not yet bright enough to figure out a move of files). The reason for the move is mainly that the UDP code is really meant to be used from this Snd_Rcv package and from nowhere else, so I can't quite see the justification for keeping it entirely separate, with a .gpr file and everything else of its own and, perhaps more importantly from a practical point of view, unable to directly use the basic types of SMG Comms in Raw_Types. Note that this move does *not* make it in any significant way more difficult to replace this UDP code with another at a later time, if one becomes available: it's still one package and those pesky C files, nothing else.

Going back to the need to relax a few restrictions: those are mainly restrictions related to the use of tasks. As both Sender and Receiver work in infinite loops [2], the caller has to ruthlessly abort them when it needs them to finish (in Ada a task has to wait for all its sub-tasks to finish before it can finish itself). So the "No_Abort_Statements" restriction needs to go. The use of abort is illustrated in the test/example code I wrote, aka test_client and test_server. Similarly, because of the queues that use protected objects, the "No_Local_Protected_Objects" restriction had to go too. Here I must say that I am not sure I fully grasp why it would be better to have protected objects only as global rather than local. They are of course meant to be accessed from many places and are therefore in a sense "global", but this doesn't mean that they don't still belong somewhere and/or that "access from several places" has to mean "access from ALL places". Finally, the restriction "No_Nested_Finalization" also had to go, to allow the testing code to create the Snd_Rcv packages with different lengths of messages.
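As a stand-alone illustration of why abort is needed at all, here is a minimal sketch in which a hypothetical Worker task type stands in for Sender/Receiver (the delay stands in for a blocking queue or UDP call). Nothing in it comes from the SMG Comms code itself. Note that it would be rejected at compile time under pragma Restrictions (No_Abort_Statements), which is precisely why that restriction had to be lifted.

```ada
-- A minimal sketch; Worker is a hypothetical stand-in for Sender/Receiver.
with Ada.Text_IO; use Ada.Text_IO;

procedure Abort_Sketch is
  task type Worker;
  task body Worker is
  begin
    -- endless loop, like the Sender/Receiver bodies
    loop
      delay 0.1;   -- stands in for a blocking queue or UDP call
    end loop;
  end Worker;

  W1, W2 : Worker;
begin
  Put_Line ("main work done");
  -- without this abort, the main procedure would wait forever for W1 and W2
  abort W1, W2;
  Put_Line ("workers aborted");
end Abort_Sketch;
```

Commenting out the abort line is an easy way to see the problem: the program prints its first message and then never exits, because the main procedure keeps waiting for its two dependent tasks.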

The testing code itself provides more of an example of using the Snd_Rcv package than a test as such, since UDP communications are unreliable and therefore one can't really say in advance what one should get on the other side of the connection. At any rate, the test_server procedure provides an example of a basic "echo server" end of the connection: there are 2 Sender and 2 Receiver tasks working with Serpent-length and RSA-length packages on 2 different ports, respectively; there is also a "consumer" task for each type of package, simply taking it out of the inbound queue, printing it at the console and then echoing it back to the source, aka writing it into the outbound queue for the Sender to send. The example waits for a pre-defined total number of packages, so it may remain waiting if the other end sends fewer packages or fewer packages make it all the way. At any rate, once all the expected messages are received, the whole application (aka the main task) simply aborts all the tasks it created and then finishes itself:

 -- S.MG, 2018
with Ada.Text_IO; use Ada.Text_IO;
with Interfaces;
with Snd_Rcv;
with Raw_Types;

procedure Test_Server is
  PortRSA: Interfaces.Unsigned_16 := 44340;
  PortS  : Interfaces.Unsigned_16 := 44341;
  N_S    : Interfaces.Unsigned_8 := 105;
  N_RSA  : Interfaces.Unsigned_8 := 82;
  package Snd_Rcv_RSA is new Snd_Rcv(Raw_Types.RSA_Pkt'Length);
  package Snd_Rcv_S is new Snd_Rcv(Raw_Types.Serpent_Pkt'Length);

  -- sender/receiver tasks --
  -- sender RSA and Serpent
  Sender_RSA: Snd_Rcv_RSA.Sender( PortRSA );
  Sender_S  : Snd_Rcv_S.Sender( PortS );
  -- receiver RSA and Serpent
  Receiver_RSA: Snd_Rcv_RSA.Receiver( PortRSA );
  Receiver_S: Snd_Rcv_S.Receiver( PortS );

  -- Serpent Consumer
  task s_cons is
    Entry Finish;
  end s_cons;
  task body s_cons is
    Payload: Raw_Types.Serpent_Pkt;
    A: Interfaces.Unsigned_32;
    P: Interfaces.Unsigned_16;
  begin
    for I in 1..N_S loop
      -- consume one message and echo it back
      Snd_Rcv_S.in_q.Get(Payload, A, P);
      Put_Line("S msg " &
               Interfaces.Unsigned_8'Image(Payload(Payload'First)) &
               " from " & Interfaces.Unsigned_32'Image(A) &
               ":" & Interfaces.Unsigned_16'Image(P));
      -- echo it back
      Snd_Rcv_S.out_q.Put(Payload, A, P);
    end loop;

    accept Finish;
    Put_Line("S Cons got the finish.");
  end s_cons;

  -- RSA Consumer
  task rsa_cons is
    Entry Finish;
  end rsa_cons;
  task body rsa_cons is
    Payload: Raw_Types.RSA_Pkt;
    A: Interfaces.Unsigned_32;
    P: Interfaces.Unsigned_16;
  begin
    for I in 1..N_RSA loop
      -- consume one message and echo it back
      Snd_Rcv_RSA.in_q.Get(Payload, A, P);
      Put_Line("RSA msg " &
               Interfaces.Unsigned_8'Image(Payload(Payload'First)) &
               " from " & Interfaces.Unsigned_32'Image(A) &
               ":" & Interfaces.Unsigned_16'Image(P));
      -- echo it back
      Snd_Rcv_RSA.out_q.Put(Payload, A, P);
    end loop;

    accept Finish;
    Put_Line("RSA Cons got the finish.");
  end rsa_cons;

begin
  Put_Line("Test server");
  -- wait for consumers to finish
  rsa_cons.Finish;
  s_cons.Finish;

  -- abort the sender & receiver to be able to finish
  abort Sender_S, Receiver_S, Sender_RSA, Receiver_RSA;
end Test_Server;

Similarly to the server example code above, an example client sends both RSA and Serpent packages and has consumer and producer tasks for both:

 -- S.MG, 2018
with Snd_Rcv;
with Interfaces;
with Ada.Text_IO; use Ada.Text_IO;
with Raw_Types;
with UDP;

procedure Test_Client is
  PortRSA   : Interfaces.Unsigned_16 := 34340;
  PortS     : Interfaces.Unsigned_16 := 34341;
  N_S       : Interfaces.Unsigned_8 := 105;
  N_RSA     : Interfaces.Unsigned_8 := 82;

  Server    : String := "127.0.0.1";
  package test_udp is new UDP(10);
  ServerA   : Interfaces.Unsigned_32 :=
                test_udp.IP_From_String(Server);
  ServerRSA : Interfaces.Unsigned_16 := 44340;
  ServerS   : Interfaces.Unsigned_16 := 44341;
  package Snd_Rcv_RSA is new Snd_Rcv(Raw_Types.RSA_Pkt'Length);
  package Snd_Rcv_S is new Snd_Rcv(Raw_Types.Serpent_Pkt'Length);
  -- sender RSA and Serpent
  Sender_RSA: Snd_Rcv_RSA.Sender( PortRSA );
  Sender_S  : Snd_Rcv_S.Sender( PortS );
  -- receiver RSA and Serpent
  Receiver_RSA: Snd_Rcv_RSA.Receiver( PortRSA );
  Receiver_S: Snd_Rcv_S.Receiver( PortS );

  -- producer of serpent messages
  task s_prod is
    entry Finish;
  end s_prod;
  task body s_prod is
    Payload : Raw_Types.Serpent_Pkt := (others => 10);
  begin
    Put_Line("S Producer with " &
             Interfaces.Unsigned_8'Image(N_S) & " messages.");
    -- send the messages with first octet the number
    for I in 1..N_S loop
      Payload(Payload'First) := I;
      Snd_Rcv_S.out_q.Put( Payload, ServerA, ServerS);
      Put_Line("Sent S message " &
                Interfaces.Unsigned_8'Image(I));
    end loop;

    -- signal it's done
    accept Finish;
    Put_Line("S prod got the finish.");

  end s_prod;

  -- producer of RSA messages
  task rsa_prod is
    Entry Finish;
  end rsa_prod;
  task body rsa_prod is
    Payload : Raw_Types.RSA_Pkt := (others => 20);
  begin
    Put_Line("RSA Producer with " &
             Interfaces.Unsigned_8'Image(N_RSA) & " messages.");

    -- send the messages with first octet the number
    for I in 1..N_RSA loop
      Payload(Payload'First) := I;
      Snd_Rcv_RSA.out_q.Put( Payload, ServerA, ServerRSA);
      Put_Line("Sent RSA message " &
                Interfaces.Unsigned_8'Image(I));
    end loop;

    -- signal it's done
    accept Finish;
    Put_Line("RSA prod got the finish.");

  end rsa_prod;

  -- Serpent Consumer
  task s_cons is
    Entry Finish;
  end s_cons;
  task body s_cons is
    Payload: Raw_Types.Serpent_Pkt;
    A: Interfaces.Unsigned_32;
    P: Interfaces.Unsigned_16;
  begin
    for I in 1..N_S loop
      -- consume one message
      Snd_Rcv_S.in_q.Get(Payload, A, P);
      Put_Line("S msg " &
               Interfaces.Unsigned_8'Image(Payload(Payload'First)) &
               " from " & Interfaces.Unsigned_32'Image(A) &
               ":" & Interfaces.Unsigned_16'Image(P));
      -- do NOT echo it back
    end loop;

    accept Finish;
    Put_Line("S Cons got the finish.");
  end s_cons;

  -- RSA Consumer
  task rsa_cons is
    Entry Finish;
  end rsa_cons;
  task body rsa_cons is
    Payload: Raw_Types.RSA_Pkt;
    A: Interfaces.Unsigned_32;
    P: Interfaces.Unsigned_16;
  begin
    for I in 1..N_RSA loop
      -- consume one message
      Snd_Rcv_RSA.in_q.Get(Payload, A, P);
      Put_Line("RSA msg " &
               Interfaces.Unsigned_8'Image(Payload(Payload'First)) &
               " from " & Interfaces.Unsigned_32'Image(A) &
               ":" & Interfaces.Unsigned_16'Image(P));
      -- do NOT echo back
    end loop;

    accept Finish;
    Put_Line("RSA Cons got the finish.");
  end rsa_cons;
begin
  Put_Line("Test client");
  -- wait for producers/consumers to finish
  rsa_prod.Finish;
  s_prod.Finish;
  rsa_cons.Finish;
  s_cons.Finish;

  -- abort the sender & receiver to be able to finish
  abort Sender_S, Receiver_S, Sender_RSA, Receiver_RSA;
end Test_Client;

One important issue to note here is the way in which exceptions (hence: potential issues) are handled in this specific implementation of the Snd_Rcv package: since the Sender and Receiver are tasks and don't handle any exceptions themselves, a UDP "eggog" aka exception [3] results in the silent death of the Sender/Receiver in which it happens [4]. I did consider ways of handling such exceptions rather than letting them kill the task silently, but so far at least I don't quite see what the task itself can do other than re-trying whatever it was trying to do when it failed. While this could perhaps be considered a better option than not handling exceptions at all, it's been pointed out to me that UDP errors almost always mean some hardware failure and as such a re-try is not going to help at all. Moreover, re-trying also means that the failure remains hidden from the calling task, since there is no way to tell whether a task is just stuck re-trying or actually proceeding with its work just fine. Considering all this, I decided to leave it for now to the higher level task to monitor its subtasks if/when desired and take action accordingly (e.g. check periodically whether a Sender/Receiver is Terminated or in some other abnormal state). This doesn't mean of course that the code can't be changed at a later date to provide a different approach: all it means is that currently this is the best decision I can see given what I know so far.
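The kind of monitoring mentioned above could look roughly like the following minimal sketch. It assumes nothing about the real UDP lib: Fragile is a hypothetical task type and UDP_Failed a hypothetical exception standing in for one of the UDP exceptions. The task raises without a handler and simply ends, while the parent finds out only by polling the 'Terminated attribute.

```ada
-- A minimal sketch; Fragile and UDP_Failed are hypothetical stand-ins.
with Ada.Text_IO; use Ada.Text_IO;

procedure Monitor_Sketch is
  UDP_Failed : exception;   -- hypothetical, stands in for a UDP lib exception

  task type Fragile;
  task body Fragile is
  begin
    delay 0.1;
    -- no handler here: the task completes silently, nothing reaches the parent
    raise UDP_Failed;
  end Fragile;

  T : Fragile;
begin
  -- the parent polls the task state instead of relying on propagation
  for Attempt in 1 .. 50 loop
    exit when T'Terminated;
    delay 0.1;
  end loop;

  if T'Terminated then
    Put_Line ("worker died; parent can now decide what to do");
  else
    Put_Line ("worker still running");
  end if;
end Monitor_Sketch;
```

What the parent does once it notices the death (restart the task, report, give up) is left open here, in line with the decision to keep such policy out of the thin sender/receiver layer.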

With this chapter, the SMG Comms code provides everything that is needed to build on top of it a basic client for Eulora that is compliant with the published communication protocol. So I'd suggest that anyone interested in this give it a go, since starting now means having some time to tinker with it before everything else is in place! At any rate, the SMG Comms series takes at least a break for now (at a later stage there should be a few bits and pieces to add) as I'll focus for a while more on the server-side moving parts that need to be done before Eulora can finally fully work on a sane protocol. The full .vpatch for this chapter and my signature for it:


  1. It's a pleasure to use Ada's implementation of threads aka tasks. But it still can take me a while to be able to say I get something to some reasonable degree - even when the something in question is actually a good implementation of threads, what can I do. 

  2. And note that even if I implemented this with a select to allow some sort of "Shutdown" option, the task could still be stuck waiting on a queue or on UDP, since the calls to those are blocking when there is nothing to retrieve yet/no place to write to; so the caller would STILL have to do an abort, or at least be prepared to abort if the shutdown is not obeyed within some interval. In a nutshell, a shutdown option would therefore still not work at *all times* and as a result I can't quite see why bother with it really.

  3. There are currently 7 of those with clear names: UDP_Invalid_Text_IP, UDP_Failed_Open, UDP_Failed_SetOpt, UDP_Bind, UDP_Failed_Transmit, UDP_Truncated_Send, UDP_Failed_Receive. 

  4. In Ada the exceptions are not propagated to the parent task since they would be potentially too disruptive and I can see that quite clearly. 
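The point of footnote 2 can be sketched as follows, under loud assumptions: Worker and its Shutdown entry are hypothetical and are not part of Snd_Rcv, and the long delay stands in for a blocking queue or UDP call. The caller asks politely through a timed entry call and, since the worker is stuck in its blocking call, ends up aborting it anyway.

```ada
-- A minimal sketch; Worker and its Shutdown entry are hypothetical.
with Ada.Text_IO; use Ada.Text_IO;

procedure Shutdown_Sketch is
  task Worker is
    entry Shutdown;
  end Worker;

  task body Worker is
  begin
    loop
      delay 5.0;            -- stands in for a long blocking queue/UDP call
      select
        accept Shutdown;    -- only noticed between two blocking calls
        exit;
      else
        null;
      end select;
    end loop;
  end Worker;
begin
  select
    Worker.Shutdown;        -- polite request
    Put_Line ("worker shut down cleanly");
  or
    delay 0.5;              -- request not honoured in time
    abort Worker;
    Put_Line ("worker had to be aborted");
  end select;
end Shutdown_Sketch;
```

With these timings the polite request always times out, so the abort branch is the one that runs: the Shutdown entry adds code without removing the need for abort.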

Theme and content by Diana Coman