Correct bug in payload reallocation function
[ric-plt/lib/rmr.git] / test / app_test / sender.c
1 // :vim ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sender.c
23         Abstract:       This is a simple sender which will send a series of messages.
24                                 It is expected that the first attempt(s) will fail if the receiver
25                                 is not up and this does not start decrementing the number to
26                                 send until it has a good send.
27
28                                 The process will check the receive queue and list received messages
29                                 but pass/fail is not dependent on what comes back.
30
31                                 If the receiver(s) do not become connectable in 20 sec this process
32                                 will give up and fail.
33
34
35                                 Message types will vary between 0 and 9, so the route table must
36                                 be set up to support those message types. Further, for message types
37                                 0, 1 and 2, the subscription ID will be set to type x 10, so the route
38                                 table must be set to include the sub-id for those types in order for
39                                 the messages to reach their destination.
40
41                                 Message format is:
42                                         ck1 ck2|<msg-txt><nil>
43
44                                 Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
45                                 Ck2 is the simple check sum of the trace data which is a nil terminated
46                                 series of bytes.
47
48                                 Parms:  argv[1] == number of msgs to send (10)
49                                                 argv[2] == delay                (mu-seconds, 1000000 default)
50                                                 argv[3] == max msg type (not inclusive; default 10)
51                                                 argv[4] == listen port
52
53                                 Sender will send for at most 20 seconds, so if nmsgs and delay extend
54                                 beyond that period the total number of messages sent will be less
55                                 than n.
56
57         Date:           18 April 2019
58         Author:         E. Scott Daniels
59 */
60
61 #include <unistd.h>
62 #include <errno.h>
63 #include <string.h>
64 #include <stdio.h>
65 #include <stdlib.h>
66 #include <sys/epoll.h>
67 #include <time.h>
68
69 #include <rmr/rmr.h>
70
71 static int sum( char* str ) {
72         int sum = 0;
73         int     i = 0;
74
75         while( *str ) {
76                 sum += *(str++) + i++;
77         }
78
79         return sum % 255;
80 }
81
82 /*
83         See if my id string is in the buffer immediately after the first >.
84         Return 1 if so, 0 if not.
85 */
86 static int vet_received( char* me, char* buf ) {
87         char*   ch;
88
89         if( (ch = strchr( buf, '>' )) == NULL ) {
90                 return 0;
91         }
92
93         return strcmp( me, ch+1 ) == 0;
94 }
95
96 int main( int argc, char** argv ) {
97         void* mrc;                                                      // msg router context
98         struct  epoll_event events[1];                  // list of events to give to epoll
99         struct  epoll_event epe;                                // event definition for event to listen to
100         int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
101         int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
102         int             nready;                                                 // number of events ready for receive
103         rmr_mbuf_t*             sbuf;                                   // send buffer
104         rmr_mbuf_t*             rbuf;                                   // received buffer
105         char*   ch;
106         int             count = 0;
107         int             rt_count = 0;                                   // number of messages requiring a spin retry
108         int             rcvd_count = 0;
109         int             rts_ok = 0;                                             // number received with our tag
110         int             fail_count = 0;                                 // # of failure sends after first successful send
111         char*   listen_port = "43086";
112         int             mtype = 0;
113         int             stats_freq = 100;
114         int             successful = 0;                                 // set to true after we have a successful send
115         char    wbuf[1024];
116         char    me[128];                                                // who I am to vet rts was actually from me
117         char    trace[1024];
118         long    timeout = 0;
119         int             delay = 100000;                                 // usec between send attempts
120         int             nmsgs = 10;                                             // number of messages to send
121         int             max_mt = 10;                                    // reset point for message type
122         int             start_mt = 0;
123         int             pass = 1;
124
125         if( argc > 1 ) {
126                 nmsgs = atoi( argv[1] );
127         }
128         if( argc > 2 ) {
129                 delay = atoi( argv[2] );
130         }
131         if( argc > 3 ) {
132                 if( (ch = strchr( argv[3], ':' )) != NULL ) {
133                         max_mt = atoi( ch+1 );
134                         start_mt = atoi( argv[3] );
135                 } else {
136                         max_mt = atoi( argv[3] );
137                 }
138         }
139         if( argc > 4 ) {
140                 listen_port = argv[4];
141         }
142
143         mtype = start_mt;
144
145         fprintf( stderr, "<SNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
146
147         if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) {
148                 fprintf( stderr, "<SNDR> unable to initialise RMr\n" );
149                 exit( 1 );
150         }
151
152         if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {                    // epoll only available from NNG -- skip receive later if not NNG
153                 if( rcv_fd < 0 ) {
154                         fprintf( stderr, "<SNDR> unable to set up polling fd\n" );
155                         exit( 1 );
156                 }
157                 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
158                         fprintf( stderr, "<SNDR> [FAIL] unable to create epoll fd: %d\n", errno );
159                         exit( 1 );
160                 }
161                 epe.events = EPOLLIN;
162                 epe.data.fd = rcv_fd;
163
164                 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
165                         fprintf( stderr, "<SNDR> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
166                         exit( 1 );
167                 }
168         } else {
169                 rmr_set_rtimeout( mrc, 0 );                     // for nano we must set the receive timeout to 0; non-blocking receive
170         }
171
172         sbuf = rmr_alloc_msg( mrc, 1024 );      // alloc first send buffer; subsequent buffers allcoated on send
173         //sbuf = rmr_tralloc_msg( mrc, 1024, 11, "xxxxxxxxxx" );        // alloc first send buffer; subsequent buffers allcoated on send
174         rbuf = NULL;                                            // don't need to alloc receive buffer
175
176         timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
177         while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
178                 fprintf( stderr, "<SNDR> waiting for rmr to show ready\n" );
179                 sleep( 1 );
180
181                 if( time( NULL ) > timeout ) {
182                         fprintf( stderr, "<SNDR> giving up\n" );
183                         exit( 1 );
184                 }
185         }
186         fprintf( stderr, "<SNDR> rmr is ready; starting to send\n" );
187
188         timeout = time( NULL ) + 20;
189
190         gethostname( wbuf, sizeof( wbuf ) );
191         snprintf( me, sizeof( me ), "%s-%d", wbuf, getpid( ) );
192
193         while( count < nmsgs ) {                                                                // we send n messages after the first message is successful
194                 snprintf( trace, 100, "%lld", (long long) time( NULL ) );
195                 rmr_set_trace( sbuf, trace, strlen( trace ) + 1 );
196                 snprintf( wbuf, 512, "count=%d tr=%s %d stand up and cheer!>%s", count, trace, rand(), me );
197                 snprintf( sbuf->payload, 1024, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
198
199                 sbuf->mtype = mtype;                                                    // fill in the message bits
200                 if( mtype < 3 ) {
201                         sbuf->sub_id = mtype * 10;
202                 } else {
203                         sbuf->sub_id = -1;
204                 }
205
206                 sbuf->len =  strlen( sbuf->payload ) + 1;               // our receiver likely wants a nice acsii-z string
207                 sbuf->state = 0;
208                 sbuf = rmr_send_msg( mrc, sbuf );                               // send it (send returns an empty payload on success, or the original payload on fail/retry)
209
210                 switch( sbuf->state ) {
211                         case RMR_ERR_RETRY:
212                                 rt_count++;
213                                 while( time( NULL ) < timeout && sbuf->state == RMR_ERR_RETRY ) {                       // soft failure (device busy?) retry
214                                         sbuf = rmr_send_msg( mrc, sbuf );                       // retry send until it's good (simple test; real programmes should do better)
215                                 }
216                                 if( sbuf->state == RMR_OK ) {
217                                         if( successful == 0 ) {
218                                                 fail_count = 0;                                                 // count only after first message goes through
219                                         }
220                                         successful = 1;                                                         // indicates only that we sent one successful message, not the current state
221                                 } else {
222                                         fail_count++;                                                   // count failures after first successful message
223                                         if( !successful && fail_count > 30 ) {
224                                                 fprintf( stderr, "[FAIL] too many send errors for this test\n" );
225                                                 exit( 1 );
226                                         }
227                                 }
228                                 break;
229
230                         case RMR_OK:
231                                 successful = 1;
232                                 break;
233
234                         default:
235                                 if( successful ) {
236                                         fail_count++;                                                   // count failures after first successful message
237                                 }
238                                 // some error (not connected likely), don't count this
239                                 //sleep( 1 );
240                                 break;
241                 }
242
243                 if( successful ) {                              // once we have a message that was sent, start to increase things
244                         count++;
245                         mtype++;
246                         if( mtype >= max_mt ) {                 // if large number of sends don't require infinite rt entries :)
247                                 mtype = start_mt;
248                         }
249                 }
250
251                 if( rcv_fd >= 0 ) {
252                         while( (nready = epoll_wait( ep_fd, events, 1, 0 )) > 0 ) {                     // if something ready to receive (non-blocking check)
253                                 if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
254                                         errno = 0;
255                                         rbuf = rmr_rcv_msg( mrc, rbuf );
256                                         if( rbuf ) {
257                                                 rts_ok += vet_received( me, rbuf->payload );
258                                                 rcvd_count++;
259                                         }
260                                 }
261                         }
262                 } else {                                // nano, we will only pick up one at a time.
263                         if(     (rbuf = rmr_rcv_msg( mrc, rbuf ) ) != NULL ) {
264                                 if( rbuf->state == RMR_OK ) {
265                                         rts_ok += vet_received( me, rbuf->payload );
266                                         rcvd_count++;
267                                 }
268                         }
269                 }
270
271                 if( time( NULL ) > timeout ) {                                          // should only happen if we never connect or nmsg > what we can send in 20sec
272                         fprintf( stderr, "sender timeout\n" );
273                         break;
274                 }
275
276                 if( delay > 0 ) {
277                         usleep( delay );
278                 }
279         }
280
281         fprintf( stderr, "<SNDR> draining begins\n" );
282         timeout = time( NULL ) + 20;                            // allow 20 seconds for the pipe to drain from the receiver
283         while( time( NULL ) < timeout ) {
284                 if( rcv_fd >= 0 ) {
285                         while( (nready = epoll_wait( ep_fd, events, 1, 100 )) > 0 ) {                   // if something ready to receive (non-blocking check)
286                                 if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
287                                         errno = 0;
288                                         rbuf = rmr_rcv_msg( mrc, rbuf );
289                                         if( rbuf ) {
290                                                 rcvd_count++;
291                                                 rts_ok += vet_received( me, rbuf->payload );
292                                                 timeout = time( NULL ) + 10;                                                    // break 10s after last received message
293                                         }
294                                 }
295                         }
296                 } else {                                // nano, we will only pick up one at a time.
297                         if(     (rbuf = rmr_torcv_msg( mrc, rbuf, 100 ) ) != NULL ) {
298                                 if( rbuf->state == RMR_OK ) {
299                                         rcvd_count++;
300                                         rts_ok += vet_received( me, rbuf->payload );
301                                 }
302                         }
303                 }
304         }
305         fprintf( stderr, "<SNDR> draining finishes\n" );
306
307         if( rcvd_count != rts_ok || count != nmsgs ) {
308                 pass = 0;
309         }
310
311         fprintf( stderr, "<SNDR> [%s] sent=%d  rcvd=%d  rts-ok=%d failures=%d retries=%d\n", 
312                 pass ? "PASS" : "FAIL",  count, rcvd_count, rts_ok, fail_count, rt_count );
313         rmr_close( mrc );
314
315         return !( count == nmsgs );
316 }
317