CI: Add silent cmake SonarCloud scan
[ric-plt/lib/rmr.git] / test / app_test / sender.c
1 // :vim ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sender.c
23         Abstract:       This is a simple sender which will send a series of messages.
24                                 It is expected that the first attempt(s) will fail if the receiver
25                                 is not up and this does not start decrementing the number to
26                                 send until it has a good send.
27
28                                 The process will check the receive queue and list received messages
29                                 but pass/fail is not dependent on what comes back.
30
31                                 If the receiver(s) do not become connectable in 20 sec this process
32                                 will give up and fail.
33
34
35                                 Message types will vary between 0 and 9, so the route table must
36                                 be set up to support those message types. Further, for message types
37                                 0, 1 and 2, the subscription ID will be set to type x 10, so the route
38                                 table must be set to include the sub-id for those types in order for
39                                 the messages to reach their destination.
40
41                                 Message format is:
42                                         ck1 ck2|<msg-txt><nil>
43
44                                 Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
45                                 Ck2 is the simple check sum of the trace data which is a nil terminated
46                                 series of bytes.
47
48                                 Parms:  argv[1] == number of msgs to send (10)
49                                                 argv[2] == delay                (mu-seconds, 1000000 default)
50                                                 argv[3] == max msg type (not inclusive; default 10)
51                                                 argv[4] == listen port
52
53                                 Sender will send for at most 20 seconds, so if nmsgs and delay extend
54                                 beyond that period the total number of messages sent will be less
55                                 than n.
56
57         Date:           18 April 2019
58         Author:         E. Scott Daniels
59 */
60
61 #include <unistd.h>
62 #include <errno.h>
63 #include <string.h>
64 #include <stdio.h>
65 #include <stdlib.h>
66 #include <sys/epoll.h>
67 #include <time.h>
68
69 #include <rmr/rmr.h>
70
71 #define WBUF_SIZE       1024
72 #define TRACE_SIZE      1024
73
74 static int sum( char* str ) {
75         int sum = 0;
76         int     i = 0;
77
78         while( *str ) {
79                 sum += *(str++) + i++;
80         }
81
82         return sum % 255;
83 }
84
85 /*
86         See if my id string is in the buffer immediately after the first >.
87         Return 1 if so, 0 if not.
88 */
89 static int vet_received( char* me, char* buf ) {
90         char*   ch;
91
92         if( (ch = strchr( buf, '>' )) == NULL ) {
93                 return 0;
94         }
95
96         return strcmp( me, ch+1 ) == 0;
97 }
98
99 int main( int argc, char** argv ) {
100         void* mrc;                                                      // msg router context
101         struct  epoll_event events[1];                  // list of events to give to epoll
102         struct  epoll_event epe;                                // event definition for event to listen to
103         int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
104         int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
105         int             nready;                                                 // number of events ready for receive
106         rmr_mbuf_t*             sbuf;                                   // send buffer
107         rmr_mbuf_t*             rbuf;                                   // received buffer
108         char*   ch;
109         int             count = 0;
110         int             rt_count = 0;                                   // number of messages requiring a spin retry
111         int             rcvd_count = 0;
112         int             rts_ok = 0;                                             // number received with our tag
113         int             fail_count = 0;                                 // # of failure sends after first successful send
114         char*   listen_port = "43086";
115         int             mtype = 0;
116         int             stats_freq = 100;
117         int             successful = 0;                                 // set to true after we have a successful send
118         char*   wbuf = NULL;                                    // working buffer
119         char    me[128];                                                // who I am to vet rts was actually from me
120         char*   trace = NULL;                                   // area to build trace data in
121         long    timeout = 0;
122         int             delay = 100000;                                 // usec between send attempts
123         int             nmsgs = 10;                                             // number of messages to send
124         int             max_mt = 10;                                    // reset point for message type
125         int             start_mt = 0;
126         int             pass = 1;
127
128         wbuf = (char *) malloc( sizeof( char ) * WBUF_SIZE );
129         trace = (char *) malloc( sizeof( char ) * TRACE_SIZE );
130
131         if( argc > 1 ) {
132                 nmsgs = atoi( argv[1] );
133         }
134         if( argc > 2 ) {
135                 delay = atoi( argv[2] );
136         }
137         if( argc > 3 ) {
138                 if( (ch = strchr( argv[3], ':' )) != NULL ) {
139                         max_mt = atoi( ch+1 );
140                         start_mt = atoi( argv[3] );
141                 } else {
142                         max_mt = atoi( argv[3] );
143                 }
144         }
145         if( argc > 4 ) {
146                 listen_port = argv[4];
147         }
148
149         mtype = start_mt;
150
151         fprintf( stderr, "<SNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
152
153         if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) {
154                 fprintf( stderr, "<SNDR> unable to initialise RMr\n" );
155                 exit( 1 );
156         }
157
158         if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {                    // epoll only available from NNG -- skip receive later if not NNG
159                 if( rcv_fd < 0 ) {
160                         fprintf( stderr, "<SNDR> unable to set up polling fd\n" );
161                         exit( 1 );
162                 }
163                 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
164                         fprintf( stderr, "<SNDR> [FAIL] unable to create epoll fd: %d\n", errno );
165                         exit( 1 );
166                 }
167                 epe.events = EPOLLIN;
168                 epe.data.fd = rcv_fd;
169
170                 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
171                         fprintf( stderr, "<SNDR> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
172                         exit( 1 );
173                 }
174         } else {
175                 rmr_set_rtimeout( mrc, 0 );                     // for nano we must set the receive timeout to 0; non-blocking receive
176         }
177
178         sbuf = rmr_alloc_msg( mrc, 1024 );      // alloc first send buffer; subsequent buffers allcoated on send
179         //sbuf = rmr_tralloc_msg( mrc, 1024, 11, "xxxxxxxxxx" );        // alloc first send buffer; subsequent buffers allcoated on send
180         rbuf = NULL;                                            // don't need to alloc receive buffer
181
182         timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
183         while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
184                 fprintf( stderr, "<SNDR> waiting for rmr to show ready\n" );
185                 sleep( 1 );
186
187                 if( time( NULL ) > timeout ) {
188                         fprintf( stderr, "<SNDR> giving up\n" );
189                         exit( 1 );
190                 }
191         }
192         fprintf( stderr, "<SNDR> rmr is ready; starting to send\n" );
193
194         timeout = time( NULL ) + 20;
195
196         gethostname( wbuf, WBUF_SIZE );
197         snprintf( me, sizeof( me ), "%s-%d", wbuf, getpid( ) );
198
199         while( count < nmsgs ) {                                                                // we send n messages after the first message is successful
200                 snprintf( trace, 100, "%lld", (long long) time( NULL ) );
201                 rmr_set_trace( sbuf, trace, strlen( trace ) + 1 );
202                 snprintf( wbuf, 512, "count=%d tr=%s %d stand up and cheer!>%s", count, trace, rand(), me );
203                 snprintf( sbuf->payload, 1024, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
204
205                 sbuf->mtype = mtype;                                                    // fill in the message bits
206                 if( mtype < 3 ) {
207                         sbuf->sub_id = mtype * 10;
208                 } else {
209                         sbuf->sub_id = -1;
210                 }
211
212                 sbuf->len =  strlen( sbuf->payload ) + 1;               // our receiver likely wants a nice acsii-z string
213                 sbuf->state = 0;
214
215                 fprintf( stderr, "<SNDR> sending msg type %d\n", sbuf->mtype );
216
217                 sbuf = rmr_send_msg( mrc, sbuf );                               // send it (send returns an empty payload on success, or the original payload on fail/retry)
218
219                 switch( sbuf->state ) {
220                         case RMR_ERR_RETRY:
221                                 rt_count++;
222                                 while( time( NULL ) < timeout && sbuf->state == RMR_ERR_RETRY ) {                       // soft failure (device busy?) retry
223                                         sbuf = rmr_send_msg( mrc, sbuf );                       // retry send until it's good (simple test; real programmes should do better)
224                                 }
225                                 if( sbuf->state == RMR_OK ) {
226                                         if( successful == 0 ) {
227                                                 fail_count = 0;                                                 // count only after first message goes through
228                                         }
229                                         successful = 1;                                                         // indicates only that we sent one successful message, not the current state
230                                 } else {
231                                         fail_count++;                                                   // count failures after first successful message
232                                         if( !successful && fail_count > 30 ) {
233                                                 fprintf( stderr, "[FAIL] too many send errors for this test\n" );
234                                                 exit( 1 );
235                                         }
236                                 }
237                                 break;
238
239                         case RMR_OK:
240                                 successful = 1;
241                                 break;
242
243                         default:
244                                 if( successful ) {
245                                         fail_count++;                                                   // count failures after first successful message
246                                 }
247                                 // some error (not connected likely), don't count this
248                                 //sleep( 1 );
249                                 break;
250                 }
251
252                 if( successful ) {                              // once we have a message that was sent, start to increase things
253                         count++;
254                         mtype++;
255                         if( mtype >= max_mt ) {                 // if large number of sends don't require infinite rt entries :)
256                                 mtype = start_mt;
257                         }
258                 }
259
260                 if( rcv_fd >= 0 ) {
261                         while( (nready = epoll_wait( ep_fd, events, 1, 0 )) > 0 ) {                     // if something ready to receive (non-blocking check)
262                                 if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
263                                         errno = 0;
264                                         rbuf = rmr_rcv_msg( mrc, rbuf );
265                                         if( rbuf ) {
266                                                 rts_ok += vet_received( me, rbuf->payload );
267                                                 rcvd_count++;
268                                         }
269                                 }
270                         }
271                 } else {                                // nano, we will only pick up one at a time.
272                         if(     (rbuf = rmr_rcv_msg( mrc, rbuf ) ) != NULL ) {
273                                 if( rbuf->state == RMR_OK ) {
274                                         rts_ok += vet_received( me, rbuf->payload );
275                                         rcvd_count++;
276                                 }
277                         }
278                 }
279
280                 if( time( NULL ) > timeout ) {                                          // should only happen if we never connect or nmsg > what we can send in 20sec
281                         fprintf( stderr, "sender timeout\n" );
282                         break;
283                 }
284
285                 if( delay > 0 ) {
286                         usleep( delay );
287                 }
288         }
289
290         fprintf( stderr, "<SNDR> draining begins\n" );
291         timeout = time( NULL ) + 10;                            // allow 10 seconds for the pipe to drain from the receiver
292         while( time( NULL ) < timeout ) {
293                 if( rcv_fd >= 0 ) {
294                         while( (nready = epoll_wait( ep_fd, events, 1, 100 )) > 0 ) {                   // if something ready to receive (non-blocking check)
295                                 if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
296                                         errno = 0;
297                                         rbuf = rmr_rcv_msg( mrc, rbuf );
298                                         if( rbuf ) {
299                                                 rcvd_count++;
300                                                 rts_ok += vet_received( me, rbuf->payload );
301                                                 timeout = time( NULL ) + 10;                                                    // break 10s after last received message
302                                         }
303                                 }
304                         }
305                 } else {                                // nano, we will only pick up one at a time.
306                         if(     (rbuf = rmr_torcv_msg( mrc, rbuf, 100 ) ) != NULL ) {
307                                 if( rbuf->state == RMR_OK ) {
308                                         rcvd_count++;
309                                         rts_ok += vet_received( me, rbuf->payload );
310                                 }
311                         }
312                 }
313         }
314         fprintf( stderr, "<SNDR> draining finishes\n" );
315
316         if( rcvd_count != rts_ok || count != nmsgs ) {
317                 pass = 0;
318         }
319
320         fprintf( stderr, "<SNDR> [%s] sent=%d  rcvd=%d  rts-ok=%d failures=%d retries=%d\n",
321                 pass ? "PASS" : "FAIL",  count, rcvd_count, rts_ok, fail_count, rt_count );
322         rmr_close( mrc );
323
324         return !( count == nmsgs );
325 }
326