Beef up tests to the extended header functions
[ric-plt/lib/rmr.git] / test / app_test / ex_rts_receiver.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       ex_rts_receiver.c
23         Abstract:       This receiver is a bit more specitalised with respect to 
24                                 expclicitly testing the abilty to expand a message payload
25                                 length when it is necessary for an appliction to send a response
26                                 to a message originator where the rts payload is larger than the
27                                 received message.
28
29                                 This specific test is accomplished by responding to all messages
30                                 with a response which is 1024 bytes in length. This means that
31                                 any message received which is smaller than 1K bytes will have to
32                                 be expanded.  Further, all 1024 bytes will be generated into the
33                                 response, with a checksum, and the expectation is that the message
34                                 originator (assumed to be the v_sender) will verify that the 
35                                 message size is as expected, and that the checksum matches.
36
37                                 This test is concerned only with functionality, and not latency
38                                 or speed, and as such there as not been any attempt to make the 
39                                 building of responses etc. efficent.
40                                 
41                                 Compile time options:
42                                                 DEBUG: write extra output about bad messages etc.
43                                                 MTC:    enable the multi-thraded call support in RMR
44
45         Date:           28 October 2019
46         Author:         E. Scott Daniels
47 */
48
49 #include <unistd.h>
50 #include <errno.h>
51 #include <stdio.h>
52 #include <stdlib.h>
53 #include <time.h>
54 #include <string.h>
55
56 #include <rmr/rmr.h>
57 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone );
58
59
60 #define HDR_SIZE 64                                                     // the size of the header we place into the message
61 #define MSG_SIZE 1024                                           // the size of the message that will be sent (hdr+payload)
62 #define DATA_SIZE (MSG_SIZE-HDR_SIZE)           // the actual 'data' length returned in the ack msg
63
64
65 #ifndef DEBUG
66 #define DEBUG 0
67 #endif
68
69 #include "test_support.c"                                       // checksum, header gen, etc.
70
71 int main( int argc, char** argv ) {
72         void* mrc;                                              // msg router context
73         rmr_mbuf_t* msg = NULL;                         // message received
74         rmr_mbuf_t* omsg = NULL;                        // original message if cloning
75         int     i;
76         int             j;
77         int             state;
78         int             errors = 0;
79         char*   listen_port = "4560";
80         long    count = 0;                                      // total received
81         long    good = 0;                                       // good palyload buffers
82         long    bad = 0;                                        // payload buffers which were not correct
83         long    bad_sid = 0;                            // bad subscription ids
84         long    resized = 0;                            // number of messages we had to resize before replying
85         long    timeout = 0;
86         long    rpt_timeout = 0;                        // next stats message
87         char*   data;
88         int             nmsgs = 10;                                     // number of messages to stop after (argv[1] overrides)
89         int             rt_count = 0;                           // retry count
90         long    ack_count = 0;                          // number of acks sent
91         int             count_bins[11];                         // histogram bins based on msg type (0-10)
92         char    wbuf[1024];                                     // we'll pull trace data into here, and use as general working buffer
93         char    sbuf[128];                                      // short buffer
94         char    ack_header[64];                         // we'll put checksum and maybe other stuff in the header of the response for validation
95         char    ack_data[DATA_SIZE];            // data randomly generated for each response
96         int             need;                                           // amount of something that we need
97         int             sv;                                                     // checksum valu
98         int             clone = 0;
99         int             copy = 0;
100         char    osrc[128];                                      // src strings to test when cloning
101         char    nsrc[128];
102         int             verbose = 0;
103
104         data = getenv( "RMR_RTG_SVC" );
105         if( data == NULL ) {
106                 setenv( "RMR_RTG_SVC", "19289", 1 );            // set one that won't collide with the sender if on same host
107         }
108
109         i = 1;
110         for( i=1;  i < argc && argv[i] != NULL && *(argv[i]) == '-'; i++ ) {
111                 data = argv[i];
112                 switch( *(data+1) ) {
113                         case 'c':                                       //  copy clone values as xy value: 00 01 10 11
114                                 if( i < argc-1 ) {
115                                         j = atoi( argv[i+1] );
116                                         clone = j % 10 != 0;
117                                         copy = (j/10) != 0;
118
119                                         i++;
120                                 }
121                                 break;
122
123                         case 'p':
124                                 if( i < argc-1 ) {
125                                         listen_port = argv[i+1];
126                                         i++;
127                                 }
128                                 break;
129
130                         case 'v':
131                                 verbose = 1;
132                                 break;
133                 }
134         }
135
136         memset( count_bins, 0, sizeof( count_bins ) );
137
138         fprintf( stderr, "<EXRCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
139         fprintf( stderr, "<EXRCVR> copy=%d clone=%d\n", copy, clone );
140
141 #ifdef MTC
142         fprintf( stderr, "<EXRCVR> starting in multi-threaded mode\n" );
143         mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
144 #else
145         fprintf( stderr, "<EXRCVR> starting in direct receive mode\n" );
146         mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );   // start your engines!
147 #endif
148         if( mrc == NULL ) {
149                 fprintf( stderr, "<EXRCVR> ABORT:  unable to initialise RMr\n" );
150                 exit( 1 );
151         }
152
153         timeout = time( NULL ) + 20;
154         while( ! rmr_ready( mrc ) ) {                                                           // wait for RMr to load a route table
155                 fprintf( stderr, "<EXRCVR> waiting for RMr to show ready\n" );
156                 sleep( 1 );
157
158                 if( time( NULL ) > timeout ) {
159                         fprintf( stderr, "<EXRCVR> giving up\n" );
160                         exit( 1 );
161                 }
162         }
163         fprintf( stderr, "<EXRCVR> rmr now shows ready, listening begins\n" );
164
165         timeout = time( NULL ) + 20;
166         while( 1 ) {
167                 msg = rmr_torcv_msg( mrc, msg, 1000 );                          // break about every 1s so that if sender never starts we eventually escape
168
169                 if( msg ) {
170                         if( msg->state == RMR_OK ) {
171                                 if( validate_msg( msg->payload, msg->len ) ) {          // defrock the header, then verify lengths and chksum
172                                         good++;
173                                 } else {
174                                         bad++;
175                                 }
176                                 count++;                                                                                // total messages received for stats output
177
178                                 if( msg->mtype >= 0 && msg->mtype <= 10 ) {
179                                         count_bins[msg->mtype]++;
180                                 }
181
182                                 if( clone ) {
183                                         need = rmr_payload_size( msg );
184                                 } else {
185                                         need = generate_payload( ack_header, ack_data, 0, 0 );          // create an ack w/ random payload in payload, and set data in header
186                                 }
187
188                                 if( clone || rmr_payload_size( msg ) < need ) {                                 // received message too small or we want to clone the original for test
189                                         resized++;
190                                         omsg = msg;
191                                         rmr_get_src( omsg, osrc );
192                                         msg = rmr_realloc_payload( msg, need, copy, clone );            // reallocate the message with a payload big enough or clone if set
193                                         rmr_get_src( msg, nsrc );
194
195                                         if( strcmp( osrc, nsrc ) != 0 ) {
196                                                 fprintf( stderr, "[ERR] realloc source strings don't match (%s) new=(%s)\n", osrc, nsrc );
197                                         } else {
198                                                 if( verbose ) {
199                                                         fprintf( stderr, "[OK]  realloc source strings match (%s) new=(%s)\n", osrc, nsrc );
200                                                 }
201                                         }
202
203                                         if( copy ) {
204                                                 if( memcmp( omsg->payload, msg->payload, rmr_payload_size( omsg ) ) != 0 ) {
205                                                         fprintf( stderr, "[ERR] realloc payload contents don't match %d bytes\n", rmr_payload_size( omsg ) );
206                                                         //spew( omsg->payload, rmr_payload_size( omsg ) );
207                                                         //fprintf( stderr, "\n---\n\n" );
208                                                         //spew( msg->payload, rmr_payload_size( omsg ) );
209                                                         exit( 1 );
210                                                 }
211                                         }
212
213                                         if( clone ) {
214                                                 rmr_free_msg( omsg );                           // we're done with the old msg, and must free b/c we cloned it
215                                         }
216                                         if( msg == NULL ) {
217                                                 fprintf( stderr, "[ERR] realloc returned a nil pointer\n" );
218                                                 continue;
219                                         }
220                                 }
221
222                                 if( ! clone ) {
223                                         fill_payload( msg, ack_header, 0, ack_data, 0 );                // push headers (with default lengths) into message
224                                 }
225                                 msg->mtype = 99;
226                                 msg->sub_id = -1;
227                                 msg->len = need;
228                                 msg = rmr_rts_msg( mrc, msg );                                                  // return our ack message
229                                 rt_count = 1000;
230                                 while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {           // to work right in nano we need this :(
231                                         if( ack_count < 1 ) {                                                                   // 1st ack, so we need to connect, and we'll wait for that
232                                                 sleep( 1 );
233                                         }
234                                         if( rt_count > 5 ) {                                                                    // but only pause for max 5sec not 1000s!
235                                                 rt_count = 5;
236                                         }
237                                         rt_count--;
238                                         msg = rmr_rts_msg( mrc, msg );                                                  // we don't try to resend if this returns retry
239                                 }
240                                 if( msg && msg->state == RMR_OK ) {                                                     // if it eventually worked
241                                         ack_count++;
242                                 }
243
244                                 timeout = time( NULL ) +20;
245                         }
246                 }
247
248                 if( time( NULL ) > timeout ) {
249                         fprintf( stderr, "<EXRCVR> stopping, no recent messages received\n" );
250                         break;
251                 } else {
252                         if( time( NULL ) > rpt_timeout ) {
253                                 fprintf( stderr, "<EXRCVR> %ld msgs=%ld good=%ld  acked=%ld bad=%ld resized=%ld\n", (long) time( NULL ), count, good, ack_count, bad, resized );
254
255                                 rpt_timeout = time( NULL ) + 5;
256                         }
257                 }
258         }
259
260         wbuf[0] = 0;
261         for( i = 0; i < 11; i++ ) {
262                 snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
263                 strcat( wbuf, sbuf );
264         }
265
266         fprintf( stderr, "<EXRCVR> mtype histogram: %s\n", wbuf );
267         fprintf( stderr, "<EXRCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  resized=%ld bad-sub_id=%ld\n", 
268                 !!(errors + bad) ? "FAIL" : "PASS", count, good, ack_count, bad, resized,  bad_sid );
269
270         sleep( 2 );                                                                     // let any outbound acks flow before closing
271
272         rmr_close( mrc );
273         return !!(errors + bad);                        // bad rc if any are !0
274 }
275