CI: Add silent cmake SonarCloud scan
[ric-plt/lib/rmr.git] / test / app_test / ex_rts_receiver.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       ex_rts_receiver.c
23         Abstract:       This receiver is a bit more specitalised with respect to 
24                                 expclicitly testing the abilty to expand a message payload
25                                 length when it is necessary for an appliction to send a response
26                                 to a message originator where the rts payload is larger than the
27                                 received message.
28
29                                 This specific test is accomplished by responding to all messages
30                                 with a response which is 1024 bytes in length. This means that
31                                 any message received which is smaller than 1K bytes will have to
32                                 be expanded.  Further, all 1024 bytes will be generated into the
33                                 response, with a checksum, and the expectation is that the message
34                                 originator (assumed to be the v_sender) will verify that the 
35                                 message size is as expected, and that the checksum matches.
36
37                                 This test is concerned only with functionality, and not latency
38                                 or speed, and as such there as not been any attempt to make the 
39                                 building of responses etc. efficent.
40                                 
41                                 Compile time options:
42                                                 DEBUG: write extra output about bad messages etc.
43                                                 MTC:    enable the multi-thraded call support in RMR
44
45         Date:           28 October 2019
46         Author:         E. Scott Daniels
47 */
48
49 #include <unistd.h>
50 #include <errno.h>
51 #include <stdio.h>
52 #include <stdlib.h>
53 #include <time.h>
54 #include <string.h>
55
56 #include <rmr/rmr.h>
57 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone );
58
59
60 #define HDR_SIZE 64                                                     // the size of the header we place into the message
61 #define MSG_SIZE 1024                                           // the size of the message that will be sent (hdr+payload)
62 #define DATA_SIZE (MSG_SIZE-HDR_SIZE)           // the actual 'data' length returned in the ack msg
63
64
65 #ifndef DEBUG
66 #define DEBUG 0
67 #endif
68
69 #include "test_support.c"                                       // checksum, header gen, etc.
70
71 int main( int argc, char** argv ) {
72         void* mrc;                                              // msg router context
73         rmr_mbuf_t* msg = NULL;                         // message received
74         rmr_mbuf_t* omsg = NULL;                        // original message if cloning
75         int     i;
76         int             j;
77         int             state;
78         int             errors = 0;
79         char*   listen_port = "4560";
80         long    count = 0;                                      // total received
81         long    good = 0;                                       // good palyload buffers
82         long    bad = 0;                                        // payload buffers which were not correct
83         long    bad_sid = 0;                            // bad subscription ids
84         long    resized = 0;                            // number of messages we had to resize before replying
85         long    timeout = 0;
86         long    rpt_timeout = 0;                        // next stats message
87         char*   data;
88         int             nmsgs = 10;                                     // number of messages to stop after (argv[1] overrides)
89         int             rt_count = 0;                           // retry count
90         long    ack_count = 0;                          // number of acks sent
91         int             count_bins[11];                         // histogram bins based on msg type (0-10)
92         char    wbuf[1024];                                     // we'll pull trace data into here, and use as general working buffer
93         char    sbuf[128];                                      // short buffer
94         char    ack_header[64];                         // we'll put checksum and maybe other stuff in the header of the response for validation
95         char    ack_data[DATA_SIZE];            // data randomly generated for each response
96         int             need;                                           // amount of something that we need
97         int             sv;                                                     // checksum valu
98         int             clone = 0;
99         int             copy = 0;
100         char    osrc[128];                                      // src strings to test when cloning
101         char    nsrc[128];
102         int             verbose = 0;
103         int             old_len;
104         int             dmb_size = 2048;                        // default message buffer size
105
106         data = getenv( "RMR_RTG_SVC" );
107         if( data == NULL ) {
108                 setenv( "RMR_RTG_SVC", "19289", 1 );            // set one that won't collide with the sender if on same host
109         }
110
111         i = 1;
112         for( i=1;  i < argc && argv[i] != NULL && *(argv[i]) == '-'; i++ ) {
113                 data = argv[i];
114                 switch( *(data+1) ) {
115                         case 'c':                                       //  copy clone values as xy value: 00 01 10 11
116                                 if( i < argc-1 ) {
117                                         j = atoi( argv[i+1] );
118                                         clone = j % 10 != 0;
119                                         copy = (j/10) != 0;
120
121                                         i++;
122                                 }
123                                 break;
124
125                         case 'p':
126                                 if( i < argc-1 ) {
127                                         listen_port = argv[i+1];
128                                         i++;
129                                 }
130                                 break;
131
132                         case 'v':
133                                 verbose = 1;
134                                 break;
135                 }
136         }
137
138         memset( count_bins, 0, sizeof( count_bins ) );
139
140         fprintf( stderr, "<EXRCVR> listening on port %s for a max of 20 seconds\n", listen_port );
141         fprintf( stderr, "<EXRCVR> copy=%d clone=%d\n", copy, clone );
142
143         if( ! clone ) {
144                 dmb_size = 128;                 // not cloning, force small buffer to test larger buffer receives
145         }
146
147         fprintf( stderr, "<EXRCVR> default receive message buffer size: %d\n", dmb_size );
148 #ifdef MTC
149         fprintf( stderr, "<EXRCVR> starting in multi-threaded mode\n" );
150         mrc = rmr_init( listen_port, dmb_size, RMRFL_MTCALL ); // start RMr in mt-receive mode
151 #else
152         fprintf( stderr, "<EXRCVR> starting in direct receive mode\n" );
153         mrc = rmr_init( listen_port, dmb_size, RMRFL_NONE );    // start your engines!
154 #endif
155         if( mrc == NULL ) {
156                 fprintf( stderr, "<EXRCVR> ABORT:  unable to initialise RMr\n" );
157                 exit( 1 );
158         }
159
160         timeout = time( NULL ) + 20;
161         while( ! rmr_ready( mrc ) ) {                                                           // wait for RMr to load a route table
162                 fprintf( stderr, "<EXRCVR> waiting for RMr to show ready\n" );
163                 sleep( 1 );
164
165                 if( time( NULL ) > timeout ) {
166                         fprintf( stderr, "<EXRCVR> giving up\n" );
167                         exit( 1 );
168                 }
169         }
170         fprintf( stderr, "<EXRCVR> rmr now shows ready, listening begins\n" );
171
172         timeout = time( NULL ) + 20;
173         while( 1 ) {
174                 msg = rmr_torcv_msg( mrc, msg, 1000 );                          // break about every 1s so that if sender never starts we eventually escape
175
176                 if( msg ) {
177                         if( msg->state == RMR_OK ) {
178                                 if( validate_msg( msg->payload, msg->len ) ) {          // defrock the header, then verify lengths and chksum
179                                         good++;
180                                 } else {
181                                         bad++;
182                                 }
183                                 count++;                                                                                // total messages received for stats output
184
185                                 if( msg->mtype >= 0 && msg->mtype <= 10 ) {
186                                         count_bins[msg->mtype]++;
187                                 }
188
189                                 if( clone ) {
190                                         need = msg->len;                                                        // clone what was sent, not entire payload
191                                 } else {
192                                         need = generate_payload( ack_header, ack_data, 0, 0 );          // create an ack w/ random payload in payload, and set data in header
193                                 }
194
195                                 if( clone || rmr_payload_size( msg ) < need ) {                                 // received message too small or we want to clone the original for test
196                                         old_len = msg->len;
197                                         resized++;
198                                         omsg = msg;
199                                         rmr_get_src( omsg, osrc );
200                                         msg = rmr_realloc_payload( msg, need, copy, clone );            // reallocate the message with a payload big enough or clone if set
201                                         rmr_get_src( msg, nsrc );
202
203                                         if( msg->len != old_len ) {
204                                                 fprintf( stderr, "[ERR] after realloc len=%d didn't match old len=%d\n", msg->len, old_len );
205                                         }
206                                         if( strcmp( osrc, nsrc ) != 0 ) {
207                                                 fprintf( stderr, "[ERR] realloc source strings don't match (%s) new=(%s)\n", osrc, nsrc );
208                                         } else {
209                                                 if( verbose ) {
210                                                         fprintf( stderr, "[OK]  realloc source strings match (%s) new=(%s)\n", osrc, nsrc );
211                                                 }
212                                         }
213
214                                         if( copy ) {
215                                                 if( memcmp( omsg->payload, msg->payload, rmr_payload_size( omsg ) ) != 0 ) {
216                                                         fprintf( stderr, "[ERR] realloc payload contents don't match %d bytes\n", rmr_payload_size( omsg ) );
217                                                         //spew( omsg->payload, rmr_payload_size( omsg ) );
218                                                         //fprintf( stderr, "\n---\n\n" );
219                                                         //spew( msg->payload, rmr_payload_size( omsg ) );
220                                                         exit( 1 );
221                                                 }
222                                         }
223
224                                         if( clone ) {
225                                                 rmr_free_msg( omsg );                           // we're done with the old msg, and must free b/c we cloned it
226                                         }
227                                         if( msg == NULL ) {
228                                                 fprintf( stderr, "[ERR] realloc returned a nil pointer\n" );
229                                                 continue;
230                                         }
231                                 }
232
233                                 if( ! clone ) {
234                                         fill_payload( msg, ack_header, 0, ack_data, 0 );                // push headers (with default lengths) into message
235                                 }
236                                 msg->mtype = 99;
237                                 msg->sub_id = -1;
238                                 msg->len = need;
239                                 msg = rmr_rts_msg( mrc, msg );                                                  // return our ack message
240                                 rt_count = 1000;
241                                 while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {           // to work right in nano we need this :(
242                                         if( ack_count < 1 ) {                                                                   // 1st ack, so we need to connect, and we'll wait for that
243                                                 sleep( 1 );
244                                         }
245                                         if( rt_count > 5 ) {                                                                    // but only pause for max 5sec not 1000s!
246                                                 rt_count = 5;
247                                         }
248                                         rt_count--;
249                                         msg = rmr_rts_msg( mrc, msg );                                                  // we don't try to resend if this returns retry
250                                 }
251                                 if( msg && msg->state == RMR_OK ) {                                                     // if it eventually worked
252                                         ack_count++;
253                                 }
254
255                                 timeout = time( NULL ) +20;
256                         }
257                 }
258
259                 if( time( NULL ) > timeout ) {
260                         fprintf( stderr, "<EXRCVR> stopping, no recent messages received\n" );
261                         break;
262                 } else {
263                         if( time( NULL ) > rpt_timeout ) {
264                                 fprintf( stderr, "<EXRCVR> %ld msgs=%ld good=%ld  acked=%ld bad=%ld resized=%ld\n", (long) time( NULL ), count, good, ack_count, bad, resized );
265
266                                 rpt_timeout = time( NULL ) + 5;
267                         }
268                 }
269         }
270
271         wbuf[0] = 0;
272         for( i = 0; i < 11; i++ ) {
273                 snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
274                 strcat( wbuf, sbuf );
275         }
276
277         fprintf( stderr, "<EXRCVR> mtype histogram: %s\n", wbuf );
278         fprintf( stderr, "<EXRCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  resized=%ld bad-sub_id=%ld\n", 
279                 !!(errors + bad) ? "FAIL" : "PASS", count, good, ack_count, bad, resized,  bad_sid );
280
281         sleep( 2 );                                                                     // let any outbound acks flow before closing
282
283         rmr_close( mrc );
284         return !!(errors + bad);                        // bad rc if any are !0
285 }
286