CI: Add silent cmake SonarCloud scan
[ric-plt/lib/rmr.git] / test / app_test / mt_listener.c
1 // :vim ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       mt_listener.c
23         Abstract:       This simple application runs multiple "listener" threads. Each thread
24                                 receives from a single RMR context to validate the ability spin
25                                 several listening threads in an application.
26
27                                 Message format is:
28                                         ck1 ck2|<msg-txt> @ tid<nil>
29
30                                 Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
31                                 Ck2 is the simple check sum of the trace data which is a nil terminated
32                                 series of bytes.
33                                 tid is the thread id assigned by the main thread.
34
35                                 Parms:  argv[1] == number of msgs to send (10)
36                                                 argv[2] == delay                (mu-seconds, 1000000 default)
37                                                 argv[3] == number of threads (3)
38                                                 argv[4] == listen port
39
40                                 Sender will send for at most 20 seconds, so if nmsgs and delay extend
41                                 beyond that period the total number of messages sent will be less
42                                 than n.
43
44         Date:           18 April 2019
45         Author:         E. Scott Daniels
46 */
47
48 #include <unistd.h>
49 #include <errno.h>
50 #include <string.h>
51 #include <stdio.h>
52 #include <stdlib.h>
53 #include <sys/epoll.h>
54 #include <time.h>
55 #include <pthread.h>
56
57
58 #include <rmr/rmr.h>
59 #include "time_tools.c"         // our time based test tools
60
61 #define TRACE_SIZE 40           // bytes in header to provide for trace junk
62 #define WBUF_SIZE 2048
63
64 /*
65         Thread data
66 */
67 typedef struct tdata {
68         int     id;                                     // the id we'll pass to RMr mt-call function NOT the thread id
69         int n2get;                              // number of messages to expect
70         int delay;                              // max delay waiting for n2get messages
71         void* mrc;                              // RMr context
72         int     state;
73 } tdata_t;
74
75
76
77 // --------------------------------------------------------------------------------
78
79
80 static int sum( char* str ) {
81         int sum = 0;
82         int i = 0;
83
84         while( *str ) {
85                 sum += *(str++) + i++;
86         }
87
88         return sum % 255;
89 }
90
91 /*
92         Split the message at the first sep and return a pointer to the first
93         character after.
94 */
95 static char* split( char* str, char sep ) {
96         char*   s;
97
98         s = strchr( str, sep );
99
100         if( s ) {
101                 return s+1;
102         }
103
104         //fprintf( stderr, "<RCVR> no pipe in message: (%s)\n", str );
105         return NULL;
106 }
107
108 /*
109         Executed as a thread, this puppy will listen for messages and report
110         what it receives.
111 */
112 static void* mk_calls( void* data ) {
113         tdata_t*        control;
114         rmr_mbuf_t*             msg = NULL;                                     // message
115         int*    count_bins = NULL;
116         char*   wbuf = NULL;
117         char    buf2[128];
118         int             i;
119         int             state = 0;
120         char*   msg_data;                                               // bits after checksum info in payload
121         long    good = 0;                                               // counters
122         long    bad = 0;
123         long    bad_tr = 0;
124         long    count = 0;                                              // total msgs received
125         struct timespec start_ts;
126         struct timespec end_ts;
127         int             elap;                                                   // elapsed time to receive messages
128         time_t  timeout;
129
130         count_bins = (int *) malloc( sizeof( int ) * 11 );
131
132         wbuf = (char *) malloc( sizeof( char ) * WBUF_SIZE );
133
134         if( (control  = (tdata_t *) data) == NULL ) {
135                 fprintf( stderr, "thread data was nil; bailing out\n" );
136         }
137         fprintf( stderr, "<THRD> id=%d thread receiver started expecting=%d messages timeout=%d seconds\n", 
138                 control->id, control->n2get, control->delay );
139
140         timeout = time( NULL ) + control->delay;                        // max time to wait for a good message
141         while( count < control->n2get ) {               // wait for n messages -- no timeout
142                 msg = rmr_torcv_msg( control->mrc, msg, 1000 );                         //  pop after ~1 second
143
144                 if( msg ) {
145                         //fprintf( stderr, "<THRD> id=%d got type=%d state=%s msg=(%s)\n", control->id, msg->mtype, msg->state == RMR_OK ? "OK" : "timeout", msg->payload );
146                         if( msg->state == RMR_OK ) {
147                                 if( good == 0 ) {                               // mark time of first good message
148                                         set_time( &start_ts );
149                                 }
150                                 set_time( &end_ts );            // mark the time of last good message
151
152                                 if( (msg_data = split( msg->payload, '|'  )) != NULL ) {
153                                         if( sum( msg_data ) == atoi( (char *) msg->payload ) ) {
154                                                 good++;
155                                         } else {
156                                                 fprintf( stderr, "<RCVR> chk sum bad: computed=%d expected;%d (%s)\n", sum( msg_data ), 
157                                                         atoi( msg->payload ), msg_data );
158                                                 bad++;
159                                         }
160
161                                         if( (msg_data = split( msg->payload, ' ' )) != NULL ) {                 // data will point to the chksum for the trace data
162                                                 state = rmr_get_trace( msg, wbuf, 1024 );                               // should only copy upto the trace size; we'll check that
163                                                 if( state > 128 || state < 0 ) {
164                                                         fprintf( stderr, "trace data size listed unexpectedly long: %d\n", state );
165                                                 } else {
166                                                         if( state  &&  sum( wbuf ) != atoi( msg_data ) ) {
167                                                                 fprintf( stderr, "<RCVR> trace chk sum bad: computed=%d expected;%d len=%d (%s)\n", sum( wbuf ), 
168                                                                                 atoi( msg_data ), state, wbuf );
169                                                                 bad_tr++;
170                                                         }
171                                                 }
172                                         }
173                                 } else {
174                                         good++;         // nothing to check, assume good
175                                 }
176                                 count++;
177
178                                 if( msg->mtype >= 0 && msg->mtype <= 10 ) {
179                                         count_bins[msg->mtype]++;
180                                 }
181                         }
182                 } else {
183                         fprintf( stderr, "<THRD> id=%d timeout with nil msg\n", control->id );
184                 }
185
186                 if( time( NULL ) > timeout ) {
187                         fprintf( stderr, "<THRD> id=%d timeout before receiving %d messages\n", control->id, control->n2get );
188                         break;
189                 }
190         }
191         elap = elapsed( &start_ts, &end_ts, ELAP_MS );
192         if( elap > 0 ) {
193                 fprintf( stderr, "<THRD> id=%d received %ld messages in %d ms rate = %ld msg/sec\n", control->id, count, elap, (count/elap)*1000 );
194         } else {
195                 fprintf( stderr, "<THRD> id=%d runtime too short to compute received rate\n", control->id );
196         }
197
198         snprintf( wbuf, WBUF_SIZE, "<THRD> id=%d histogram: ", control->id );           // build histogram so we can write with one fprintf call
199         for( i = 0; i < 11; i++ ) {
200                 snprintf( buf2, sizeof( buf2 ), "%5d ", count_bins[i] );
201                 strcat( wbuf, buf2 );
202         }
203         fprintf( stderr, "%s\n", wbuf );
204
205         fprintf( stderr, "<THRD> id=%d %ld messages %ld good %ld bad\n", control->id, count, good, bad );
206
207         control->state = bad > 0 ? -1 : 0;                                              // set to indicate done and <0 to indicate some failure
208         control->state += count < control->n2get ? -2 : 0;
209         return NULL;
210 }
211
212 int main( int argc, char** argv ) {
213         void* mrc;                                                      // msg router context
214         rmr_mbuf_t*     rbuf = NULL;                            // received on 'normal' flow
215         struct  epoll_event events[1];                  // list of events to give to epoll
216         struct  epoll_event epe;                                // event definition for event to listen to
217         int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
218         char*   listen_port = "43086";
219         long    timeout = 0;                                    // time the main thread will pop if listeners have not returned
220         int             delay = 30;                                             // max time to wait for n messages
221         int             nmsgs = 10;                                             // number of messages to expect
222         int             nthreads = 3;                                   // default number of listener threads
223         tdata_t*        cvs;                                            // vector of control blocks
224         int                     i;
225         pthread_t*      pt_info;                                        // thread stuff
226         int     failures = 0;
227
228         if( argc > 1 ) {
229                 nmsgs = atoi( argv[1] );
230         }
231         if( argc > 2 ) {
232                 delay = atoi( argv[2] );
233         }
234         if( argc > 3 ) {
235                 nthreads = atoi( argv[3] );
236         }
237         if( argc > 4 ) {
238                 listen_port = argv[4];
239         }
240
241         fprintf( stderr, "<MTL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
242
243         if( (mrc = rmr_init( listen_port, 1400, 0 )) == NULL ) {
244                 fprintf( stderr, "<MTL> unable to initialise RMr\n" );
245                 exit( 1 );
246         }
247
248         rmr_init_trace( mrc, TRACE_SIZE );
249
250         cvs = malloc( sizeof( tdata_t ) * nthreads );
251         pt_info = malloc( sizeof( pthread_t ) * nthreads );
252         if( cvs == NULL ) {
253                 fprintf( stderr, "<MTL> unable to allocate control vector\n" );
254                 exit( 1 );      
255         }
256
257         timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
258         while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
259                 fprintf( stderr, "<MTL> waiting for rmr to show ready\n" );
260                 sleep( 1 );
261
262                 if( time( NULL ) > timeout ) {
263                         fprintf( stderr, "<MTL> giving up\n" );
264                         exit( 1 );
265                 }
266         }
267         fprintf( stderr, "<MTL> rmr is ready; starting threads\n" );
268
269         for( i = 0; i < nthreads; i++ ) {
270                 cvs[i].mrc = mrc;
271                 cvs[i].id = i + 2;                              // we pass this as the call-id to rmr, so must be >1
272                 cvs[i].delay = delay;
273                 cvs[i].n2get = nmsgs;
274                 cvs[i].state = 1;
275
276                 fprintf( stderr, "kicking %d i=%d\n", i+2, i );
277                 pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] );         // kick a thread
278         }
279
280         timeout = time( NULL ) + 300;   // wait up to 5 minutes
281         i = 0;
282         while( nthreads > 0 ) {
283                 if( cvs[i].state < 1 ) {                        // states 0 or below indicate done. 0 == good; <0 is failure
284                         nthreads--;
285                         if( cvs[i].state < 0 ) {
286                                 failures++;
287                         }
288                         i++;
289                 }
290                 
291                 if( time( NULL ) > timeout ) {
292                         failures += nthreads;
293                         fprintf( stderr, "<MTL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
294                         break;
295                 }
296
297                 sleep( 1 );
298         }
299
300         fprintf( stderr, "<MTL> [%s] failing threads=%d\n", failures == 0 ? "PASS" : "FAIL",  failures );
301         rmr_close( mrc );
302
303         return failures > 0;
304 }
305