Add initial codes
[it/test.git] / simulators / e2sim / src / rmr_interface / tests / receiver / rmr_rcvr.c
1 /*
2  *
3  * Copyright 2019 AT&T Intellectual Property
4  * Copyright 2019 Nokia
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  */
19
20 // :vim ts=4 sw=4 noet:
21 /*
22         Mnemonic:       rmr_rcvr2.c
23         Abstract:       Very simple test listener built on RMr libraries. It does nothing
24                                 but return the message it recevied back to the sender.
25
26                                 Define these environment variables to have some control:
27                                         RMR_SEED_RT -- path to the static routing table
28                                         RMR_RTG_SVC -- host:port of the route table generator
29
30                                 One command line parm is accepted: stats frequency.  This is a number, n,
31                                 which causes stats to be generated after every n messages. If set to 0
32                                 each message is written when received and no stats (msg rate) is generated.
33
34         Date:           11 February 2018
35         Author:         E. Scott Daniels
36
37         Mods:           18 Mar 2019 -- simplified for demo base.
38 */
39
40 #include <unistd.h>
41 #include <errno.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45
46 #include <rmr/rmr.h>
47
48 typedef struct {
49     int32_t mtype;                      // message type  ("long" network integer)
50     int32_t plen;                       // payload length
51     unsigned char xid[RMR_MAX_XID];     // space for user transaction id or somesuch
52     unsigned char sid[RMR_MAX_SID];     // sender ID for return to sender needs
53     unsigned char src[RMR_MAX_SRC];     // name of the sender (source)
54     struct timespec ts;                 // timestamp ???
55 } mhdr_t;
56
57
58 int main( int argc, char** argv ) {
59     void* mrc;                                          // msg router context
60     rmr_mbuf_t* msg = NULL;                             // message received
61         int i;
62         char*   listen_port;
63         char*   tok;
64         int             must_ack = 1;                           // flag -- if set we rts all messages
65         mhdr_t* hdr;
66         int last_seq = 0;                                       // sequence number from last message
67         int this_seq;                                           // sequence number on this message
68         int count = 0;                                          // count of msg since last status
69         long long tcount = 0;                           // total count of messages
70         time_t ts;
71         time_t lts;
72         int stat_freq = 20000;                          // write stats after reciving this many messages
73         int     first_seq = -1;                                 // first sequence number we got to report total received
74         int     max_rt = 1000;                                  // max times we'll retry an ack
75
76         if( (tok = getenv( "RMR_RCV_ACK" )) != NULL ) {
77                 must_ack = atoi( tok );
78         }
79
80         if( (listen_port = getenv( "PENDULUM_XAPP_RMR_RCV_PORT" )) == NULL ) {
81                 listen_port = "4560";
82         }
83
84         if( argc > 1 ) {
85                 stat_freq = atoi( argv[1] );
86         }
87         fprintf( stderr, "<TEST> stats will be reported every %d messages\n", stat_freq );
88
89     mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );       // start your engines!
90         if( mrc == NULL ) {
91                 fprintf( stderr, "<TEST> ABORT:  unable to initialise RMr\n" );
92                 exit( 1 );
93         }
94
95         while( ! rmr_ready( mrc ) ) {
96                 fprintf( stderr, "<TEST> waiting for RMr to show ready\n" );
97                 sleep( 1 );
98         }
99         fprintf( stderr, "<TEST> RMr now shows ready\n" );
100
101         lts = time( NULL );
102         fprintf( stderr, "<TEST> listening on %s acking %s\n", listen_port, must_ack != 0 ? "on" : "off"  );
103
104         //rmr_set_stimeout( mrc, 50 );
105     while( 1 ) {
106                 sleep (2 );
107                 msg = rmr_rcv_msg( mrc, msg );                                          // block until one arrives
108                 if( msg == NULL ) {
109                         continue;                               // shouldn't happen, but don't crash if we get nothing
110                 }
111                 if( msg->mtype < 0 || msg->state != RMR_OK ) {
112                         fprintf( stderr, "[WRN] bad msg:  state=%d  errno=%d\n", msg->state, errno );
113                         continue;                       // just loop to receive another
114                 }
115
116                 if( stat_freq == 0 ) {                          // mechanism to dump all received messages for quick testing
117                         fprintf( stdout, "<TEST> msg received: type = %d len = %d (%s)\n", msg->mtype, msg->len, msg->payload );        // assume a nil term string in payload
118                 }
119
120                 count++;                // messages received for stats output
121                 tcount++;
122
123                 //if( stat_freq >= 1000 ) {
124     if(1) {
125                         //if( (count % stat_freq) == 0  ) {
126       if(1) {
127                                 ts = time( NULL );
128                                 if( ts - lts ) {
129
130                                         fprintf( stderr, "<TEST> %7lld received %5lld msg/s over the last %3lld seconds  mrt=%d, with content=%s\n",
131                                                         (long long) last_seq - first_seq, (long long) (count / (ts-lts)), (long long) ts-lts, max_rt,msg->payload );
132                                         lts = ts;
133                                         count = 0;
134                                 }
135                         }
136                 }
137
138                 if( must_ack ) {                                // send back a response
139                         //fprintf( stdout, "<TEST> msg: type = %d len = %d; acking\n", msg->mtype, msg->len );
140                         //msg->len = snprintf( msg->payload, 1024, "bar %lld", tcount );                                // ack with bar and counter
141       msg->len = snprintf( msg->payload, 1024, "Reply hello back to Arduino!\n");
142       // msg->len = snprintf( msg->payload, 1024, "OK\n");
143
144
145       //msg->mtype = 999; //only to be used if rts is not possible
146
147                         //msg = rmr_send_msg (mrc, msg); //only to be used if rts is not possible
148
149        msg = rmr_rts_msg( mrc, msg );                                                           // this is a retur to sender; preferred
150                         //if( (msg = rmr_send_msg( mrc, msg )) != NULL ) {                      // this is a routed send; not preferred, but possible
151                          if( (msg = rmr_rts_msg( mrc, msg )) != NULL ) {
152                                 //----- checking too many times here has been problematic and causes what appears to be race condidtions in NNG threads; for now max_rt should be small
153                                 max_rt = 2;
154                                 while( max_rt > 0 && msg->state != RMR_OK && errno == EAGAIN ) {                // NNG likes to refuse sends, just keep trying on eagain
155                                         max_rt--;
156                                         rmr_rts_msg( mrc, msg );
157           //rmr_send_msg (mrc, msg);
158                                 }
159                         }
160                 }
161
162     }
163 }