Issue-ID: SIM-18
[sim/e2-interface.git] / e2sim / e2apv1sim / e2sim / test / rmr_interface / tests / receiver / rmr_rcvr.c
diff --git a/e2sim/e2apv1sim/e2sim/test/rmr_interface/tests/receiver/rmr_rcvr.c b/e2sim/e2apv1sim/e2sim/test/rmr_interface/tests/receiver/rmr_rcvr.c
new file mode 100644 (file)
index 0000000..2d8e7fd
--- /dev/null
@@ -0,0 +1,163 @@
+/*
+ *
+ * Copyright 2019 AT&T Intellectual Property
+ * Copyright 2019 Nokia
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// :vim ts=4 sw=4 noet:
+/*
+       Mnemonic:       rmr_rcvr2.c
+       Abstract:       Very simple test listener built on RMr libraries. It does nothing
+                               but return the message it recevied back to the sender.
+
+                               Define these environment variables to have some control:
+                                       RMR_SEED_RT -- path to the static routing table
+                                       RMR_RTG_SVC -- host:port of the route table generator
+
+                               One command line parm is accepted: stats frequency.  This is a number, n,
+                               which causes stats to be generated after every n messages. If set to 0
+                               each message is written when received and no stats (msg rate) is generated.
+
+       Date:           11 February 2018
+       Author:         E. Scott Daniels
+
+       Mods:           18 Mar 2019 -- simplified for demo base.
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+
+#include <rmr/rmr.h>
+
+typedef struct {
+    int32_t mtype;                      // message type  ("long" network integer)
+    int32_t plen;                       // payload length
+    unsigned char xid[RMR_MAX_XID];     // space for user transaction id or somesuch
+    unsigned char sid[RMR_MAX_SID];     // sender ID for return to sender needs
+    unsigned char src[RMR_MAX_SRC];     // name of the sender (source)
+    struct timespec ts;                 // timestamp ???
+} mhdr_t;
+
+
+int main( int argc, char** argv ) {
+    void* mrc;                                         // msg router context
+    rmr_mbuf_t* msg = NULL;                            // message received
+       int i;
+       char*   listen_port;
+       char*   tok;
+       int             must_ack = 1;                           // flag -- if set we rts all messages
+       mhdr_t* hdr;
+       int last_seq = 0;                                       // sequence number from last message
+       int this_seq;                                           // sequence number on this message
+       int count = 0;                                          // count of msg since last status
+       long long tcount = 0;                           // total count of messages
+       time_t ts;
+       time_t lts;
+       int stat_freq = 20000;                          // write stats after reciving this many messages
+       int     first_seq = -1;                                 // first sequence number we got to report total received
+       int     max_rt = 1000;                                  // max times we'll retry an ack
+
+       if( (tok = getenv( "RMR_RCV_ACK" )) != NULL ) {
+               must_ack = atoi( tok );
+       }
+
+       if( (listen_port = getenv( "PENDULUM_XAPP_RMR_RCV_PORT" )) == NULL ) {
+               listen_port = "4560";
+       }
+
+       if( argc > 1 ) {
+               stat_freq = atoi( argv[1] );
+       }
+       fprintf( stderr, "<TEST> stats will be reported every %d messages\n", stat_freq );
+
+    mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );      // start your engines!
+       if( mrc == NULL ) {
+               fprintf( stderr, "<TEST> ABORT:  unable to initialise RMr\n" );
+               exit( 1 );
+       }
+
+       while( ! rmr_ready( mrc ) ) {
+               fprintf( stderr, "<TEST> waiting for RMr to show ready\n" );
+               sleep( 1 );
+       }
+       fprintf( stderr, "<TEST> RMr now shows ready\n" );
+
+       lts = time( NULL );
+       fprintf( stderr, "<TEST> listening on %s acking %s\n", listen_port, must_ack != 0 ? "on" : "off"  );
+
+       //rmr_set_stimeout( mrc, 50 );
+    while( 1 ) {
+               sleep (2 );
+               msg = rmr_rcv_msg( mrc, msg );                                          // block until one arrives
+               if( msg == NULL ) {
+                       continue;                               // shouldn't happen, but don't crash if we get nothing
+               }
+               if( msg->mtype < 0 || msg->state != RMR_OK ) {
+                       fprintf( stderr, "[WRN] bad msg:  state=%d  errno=%d\n", msg->state, errno );
+                       continue;                       // just loop to receive another
+               }
+
+               if( stat_freq == 0 ) {                          // mechanism to dump all received messages for quick testing
+                       fprintf( stdout, "<TEST> msg received: type = %d len = %d (%s)\n", msg->mtype, msg->len, msg->payload );        // assume a nil term string in payload
+               }
+
+               count++;                // messages received for stats output
+               tcount++;
+
+               //if( stat_freq >= 1000 ) {
+    if(1) {
+                       //if( (count % stat_freq) == 0  ) {
+      if(1) {
+                               ts = time( NULL );
+                               if( ts - lts ) {
+
+                                       fprintf( stderr, "<TEST> %7lld received %5lld msg/s over the last %3lld seconds  mrt=%d, with content=%s\n",
+                                                       (long long) last_seq - first_seq, (long long) (count / (ts-lts)), (long long) ts-lts, max_rt,msg->payload );
+                                       lts = ts;
+                                       count = 0;
+                               }
+                       }
+               }
+
+               if( must_ack ) {                                // send back a response
+                       //fprintf( stdout, "<TEST> msg: type = %d len = %d; acking\n", msg->mtype, msg->len );
+                       //msg->len = snprintf( msg->payload, 1024, "bar %lld", tcount );                                // ack with bar and counter
+      msg->len = snprintf( msg->payload, 1024, "Reply hello back to Arduino!\n");
+      // msg->len = snprintf( msg->payload, 1024, "OK\n");
+
+
+      //msg->mtype = 999; //only to be used if rts is not possible
+
+                       //msg = rmr_send_msg (mrc, msg); //only to be used if rts is not possible
+
+       msg = rmr_rts_msg( mrc, msg );                                                          // this is a retur to sender; preferred
+                       //if( (msg = rmr_send_msg( mrc, msg )) != NULL ) {                      // this is a routed send; not preferred, but possible
+                        if( (msg = rmr_rts_msg( mrc, msg )) != NULL ) {
+                               //----- checking too many times here has been problematic and causes what appears to be race condidtions in NNG threads; for now max_rt should be small
+                               max_rt = 2;
+                               while( max_rt > 0 && msg->state != RMR_OK && errno == EAGAIN ) {                // NNG likes to refuse sends, just keep trying on eagain
+                                       max_rt--;
+                                       rmr_rts_msg( mrc, msg );
+          //rmr_send_msg (mrc, msg);
+                               }
+                       }
+               }
+
+    }
+}