RIC:1060: Change in PTL
[ric-plt/lib/rmr.git] / test / app_test / caller.c
1 // :vim ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       caller.c
23         Abstract:       This is a simple sender which will send a series of messages using
24                                 rmr_call().  N threads are started each sending the desired number
25                                 of messages and expecting an 'ack' for each. Each ack is examined
26                                 to verify that the thread id placed into the message matches (meaning
27                                 that the ack was delivered by RMr to the correct thread's chute.
28
29                                 In addition, the main thread listens for messages in order to verify
30                                 that a main or receiving thread can receive messages concurrently
31                                 while call acks are pending and being processed.
32
33                                 Message format is:
34                                         ck1 ck2|<msg-txt> @ tid<nil>
35
36                                 Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
37                                 Ck2 is the simple check sum of the trace data which is a nil terminated
38                                 series of bytes.
39                                 tid is the thread id assigned by the main thread.
40
41                                 Parms:  argv[1] == number of msgs to send (10)
42                                                 argv[2] == delay                (mu-seconds, 1000000 default)
43                                                 argv[3] == number of threads (3)
44                                                 argv[4] == listen port
45
46                                 Sender will send for at most 20 seconds, so if nmsgs and delay extend
47                                 beyond that period the total number of messages sent will be less
48                                 than n.
49
50         Date:           18 April 2019
51         Author:         E. Scott Daniels
52 */
53
54 #include <unistd.h>
55 #include <errno.h>
56 #include <string.h>
57 #include <stdio.h>
58 #include <stdlib.h>
59 #include <sys/epoll.h>
60 #include <time.h>
61 #include <pthread.h>
62
63
64 #include <rmr/rmr.h>
65
66 #define TRACE_SIZE 40           // bytes in header to provide for trace junk
67 #define WBUF_SIZE 2048
68
69 /*
70         Thread data
71 */
72 typedef struct tdata {
73         int     id;                                     // the id we'll pass to RMr mt-call function NOT the thread id
74         int n2send;                             // number of messages to send
75         int delay;                              // ms delay between messages
76         void* mrc;                              // RMr context
77         int     state;
78 } tdata_t;
79
80
81
82 // --------------------------------------------------------------------------------
83
84
85 static int sum( char* str ) {
86         int sum = 0;
87         int     i = 0;
88
89         while( *str ) {
90                 sum += *(str++) + i++;
91         }
92
93         return sum % 255;
94 }
95
96
97
98 /*
99         Executed as a thread, this puppy will generate calls to ensure that we get the
100         response back to the right thread, that we can handle threads, etc.
101 */
102 static void* mk_calls( void* data ) {
103         tdata_t*        control;
104         rmr_mbuf_t*             sbuf;                                   // send buffer
105         int             count = 0;
106         int             rt_count = 0;                                   // number of messages requiring a spin retry
107         int             ok_msg = 0;                                             // received messages that were sent by us
108         int             bad_msg = 0;                                    // received messages that were sent by a different thread
109         int             drops = 0;
110         int             fail_count = 0;                                 // # of failure sends after first successful send
111         int             successful = 0;                                 // set to true after we have a successful send
112         char*   wbuf = NULL;
113         char    xbuf[1024];                                             // build transaction string here
114         char    trace[1024];
115         int             xaction_id = 1;
116         char*   tok;
117         int             state = 0;
118
119         wbuf = (char *) malloc( sizeof( char ) * WBUF_SIZE );
120
121         if( (control  = (tdata_t *) data) == NULL ) {
122                 fprintf( stderr, "thread data was nil; bailing out\n" );
123         }
124         //fprintf( stderr, "<THRD> thread started with parallel call id=%d sending=%d delay=%d\n", control->id, control->n2send, control->delay );
125
126         sbuf = rmr_alloc_msg( control->mrc, 512 );      // alloc first send buffer; subsequent buffers allcoated on send
127
128         memset( trace, 0, sizeof( trace ) );
129         while( count < control->n2send ) {                                                              // we send n messages after the first message is successful
130                 snprintf( trace, 100, "%lld", (long long) time( NULL ) );
131                 rmr_set_trace( sbuf, trace, TRACE_SIZE );                                       // fully populate so we dont cause a buffer realloc
132
133                 snprintf( wbuf, WBUF_SIZE, "count=%d tr=%s %d stand up and cheer! @ %d", count, trace, rand(), control->id );
134                 snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
135                 snprintf( xbuf, 200, "%31d", xaction_id );
136                 rmr_bytes2xact( sbuf, xbuf, 32 );
137
138                 sbuf->mtype = 5;                                                                // mtype is always 5 as the test receiver acks just mtype 5 messages
139                 sbuf->len =  strlen( sbuf->payload ) + 1;               // our receiver likely wants a nice acsii-z string
140                 sbuf->state = 0;
141                 sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 );    // send it (send returns an empty payload on success, or the original payload on fail/retry)
142
143                 if( sbuf && sbuf->state == RMR_ERR_RETRY ) {                                    // number of times we had to spin to send
144                         rt_count++;
145                 }
146                 while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) {                         // send blocked; keep trying
147                         sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 5000 );    // call and wait up to 5s for a response
148                 }
149
150                 if( sbuf != NULL ) {
151                         switch( sbuf->state ) {
152                                 case RMR_OK:                                                    // we should have a buffer back from the sender here
153                                         successful = 1;
154                                         if( (tok = strchr( sbuf->payload, '@' )) != NULL ) {
155                                                 if( atoi( tok+1 ) == control->id ) {
156                                                         //fprintf( stderr, "<THRD> tid=%-2d ok  ack\n", control->id );
157                                                         ok_msg++;
158                                                 } else {
159                                                         bad_msg++;
160                                                         //fprintf( stderr, "<THRD> tid=%-2d bad ack %s\n", control->id, sbuf->payload );
161                                                 }
162                                         }
163                                         //fprintf( stderr, "<THRD> tid=%-2d call returned valid msg: %s\n", control->id, sbuf->payload );
164                                         // future -- verify that we see our ID at the end of the message
165                                         count++;
166                                         break;
167
168                                 default:
169                                         fprintf( stderr, "<CALLR> unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
170                                         sbuf = rmr_alloc_msg( control->mrc, 512 );                      // allocate a sendable buffer
171                                         if( successful ) {
172                                                 fail_count++;                                                   // count failures after first successful message
173                                         } else {
174                                                 // some error (not connected likely), don't count this
175                                                 sleep( 1 );
176                                         }
177                                         break;
178                         }
179                 } else {
180                         //fprintf( stderr, "<THRD> tid=%-2d call finished, no sbuf\n", control->id );
181                         sbuf = rmr_alloc_msg( control->mrc, 512 );                              // loop expects an subf
182                         drops++;
183                         count++;
184                 }
185
186                 if( control->delay > 0 ) {
187                         usleep( control->delay );
188                 }
189         }
190
191         state = 1;
192         if( ok_msg < (control->n2send-1) || bad_msg > 0 ) {             // allow one drop to pass
193                 state = 0;
194         }
195         if( count < control->n2send ) {
196                 state = 0;
197         }
198
199         control->state = -state;                                // signal inactive to main thread; -1 == pass, 0 == fail
200         fprintf( stderr, "<THRD> [%s]  tid=%-2d sent=%d  ok-acks=%d bad-acks=%d  drops=%d failures=%d retries=%d\n",
201                 state ? "PASS" : "FAIL",  control->id, count, ok_msg, bad_msg, drops, fail_count, rt_count );
202
203
204         return NULL;
205 }
206
207 int main( int argc, char** argv ) {
208         void* mrc;                                                      // msg router context
209         rmr_mbuf_t*     rbuf = NULL;                            // received on 'normal' flow
210         struct  epoll_event events[1];                  // list of events to give to epoll
211         struct  epoll_event epe;                                // event definition for event to listen to
212         int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
213         int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
214         int             nready;                                                 // number of events ready for receive
215         char*   listen_port = "43086";
216         long    timeout = 0;
217         int             delay = 100000;                                 // usec between send attempts
218         int             nmsgs = 10;                                             // number of messages to send
219         int             nthreads = 3;
220         tdata_t*        cvs;                                            // vector of control blocks
221         int                     i;
222         pthread_t*      pt_info;                                        // thread stuff
223         int     failures = 0;
224         int             pings = 0;                                              // number of messages received on normal channel
225
226         if( argc > 1 ) {
227                 nmsgs = atoi( argv[1] );
228         }
229         if( argc > 2 ) {
230                 delay = atoi( argv[2] );
231         }
232         if( argc > 3 ) {
233                 nthreads = atoi( argv[3] );
234         }
235         if( argc > 4 ) {
236                 listen_port = argv[4];
237         }
238
239         fprintf( stderr, "<CALL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
240
241         if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) {             // initialise with multi-threaded call enabled
242                 fprintf( stderr, "<CALL> unable to initialise RMr\n" );
243                 exit( 1 );
244         }
245
246         rmr_init_trace( mrc, TRACE_SIZE );
247
248         if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {                                                    // epoll only available from NNG -- skip receive later if not NNG
249                 if( rcv_fd < 0 ) {
250                         fprintf( stderr, "<CALL> unable to set up polling fd\n" );
251                         exit( 1 );
252                 }
253                 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
254                         fprintf( stderr, "<CALL> [FAIL] unable to create epoll fd: %d\n", errno );
255                         exit( 1 );
256                 }
257                 epe.events = EPOLLIN;
258                 epe.data.fd = rcv_fd;
259
260                 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
261                         fprintf( stderr, "<CALL> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
262                         exit( 1 );
263                 }
264         } else {
265                 rmr_set_rtimeout( mrc, 0 );                     // for nano we must set the receive timeout to 0; non-blocking receive
266         }
267
268
269         cvs = malloc( sizeof( tdata_t ) * nthreads );
270         pt_info = malloc( sizeof( pthread_t ) * nthreads );
271         if( cvs == NULL ) {
272                 fprintf( stderr, "<CALL> unable to allocate control vector\n" );
273                 exit( 1 );
274         }
275
276
277         timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
278         while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
279                 fprintf( stderr, "<CALL> waiting for rmr to show ready\n" );
280                 sleep( 1 );
281
282                 if( time( NULL ) > timeout ) {
283                         fprintf( stderr, "<CALL> giving up\n" );
284                         exit( 1 );
285                 }
286         }
287         fprintf( stderr, "<CALL> rmr is ready; starting threads\n" );
288
289         for( i = 0; i < nthreads; i++ ) {
290                 cvs[i].mrc = mrc;
291                 cvs[i].id = i + 2;                              // we pass this as the call-id to rmr, so must be >1
292                 cvs[i].delay = delay;
293                 cvs[i].n2send = nmsgs;
294                 cvs[i].state = 1;
295
296                 pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] );         // kick a thread
297         }
298
299         timeout = time( NULL ) + 20;
300         i = 0;
301         while( nthreads > 0 ) {
302                 if( cvs[i].state < 1 ) {                        // states 0 or below indicate done. 0 == failure, -n == success
303                         nthreads--;
304                         if( cvs[i].state == 0 ) {
305                                 failures++;
306                         }
307                         i++;
308                 } else {
309                 //      sleep( 1 );
310                         rbuf = rmr_torcv_msg( mrc, rbuf, 1000 );
311                         if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) {
312                                 pings++;
313                                 rmr_free_msg( rbuf );
314                                 rbuf = NULL;
315                         }
316                 }
317                 if( time( NULL ) > timeout ) {
318                         failures += nthreads;
319                         fprintf( stderr, "<CALL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
320                         break;
321                 }
322         }
323
324         fprintf( stderr, "<CALL> [%s] failing threads=%d  pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL",  failures, pings );
325         rmr_close( mrc );
326
327         return failures > 0;
328 }
329