f8700b5a6430c84843addd7832e952fb29cc478f
[ric-plt/lib/rmr.git] / test / app_test / v_sender.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sender.c
23         Abstract:       This version of the sender will perform verification on response
24                                 messages received back from the receiver.
25
26                                 It is expected that the response messages are created with the 
27                                 functions in the test_support module so that they can easily be
28                                 vetted here.
29
30                                 This sender is designed only to test the functionality of message
31                                 routing, specifically the ability for a receiver to reallocate a
32                                 message to handle a larger payload without losing the ability to
33                                 use rmr_rts_msg(); no attempt has been made to be efficent, and
34                                 this sender should not be used for performance tests.
35
36         Date:           28 October 2019
37         Author:         E. Scott Daniels
38 */
39
40 #include <unistd.h>
41 #include <errno.h>
42 #include <string.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <sys/epoll.h>
46 #include <time.h>
47
48 #include <rmr/rmr.h>
49
50
51 #define HDR_SIZE 64                                                     // the size of the header we place into the message
52 #define MSG_SIZE 256                                            // toal message size sent via RMR (hdr+data)
53 #define DATA_SIZE (MSG_SIZE-HDR_SIZE)           // the actual 'data' length returned in the ack msg
54
55 #ifndef DEBUG
56 #define DEBUG 0
57 #endif
58
59 #include "test_support.c"
60
61 int main( int argc, char** argv ) {
62         void* mrc;                                                      // msg router context
63         struct  epoll_event events[1];                  // list of events to give to epoll
64         struct  epoll_event epe;                                // event definition for event to listen to
65         int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
66         int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
67         int             nready;                                                 // number of events ready for receive
68         rmr_mbuf_t*             sbuf;                                   // send buffer
69         rmr_mbuf_t*             rbuf;                                   // received buffer
70         char*   ch;
71         int             count = 0;
72         int             short_count = 0;                                // number of acks that didn't seem to have a bigger payload
73         int             rt_count = 0;                                   // number of messages requiring a spin retry
74         int             rcvd_count = 0;
75         int             rts_ok = 0;                                             // number received with our tag
76         int             fail_count = 0;                                 // # of failure sends after first successful send
77         char*   listen_port = "43086";
78         int             mtype = 0;
79         int             stats_freq = 100;
80         int             successful = 0;                                 // set to true after we have a successful send
81         char    wbuf[DATA_SIZE];
82         char    me[128];                                                // who I am to vet rts was actually from me
83         char    trace[1024];
84         long    timeout = 0;
85         long    rep_timeout = 0;                                // report/stats timeout
86         int             delay = 100000;                                 // usec between send attempts
87         int             nmsgs = 10;                                             // number of messages to send
88         int             max_mt = 10;                                    // reset point for message type
89         int             start_mt = 0;
90         int             pass = 1;
91         int             need;
92
93         if( argc > 1 ) {
94                 nmsgs = atoi( argv[1] );
95         }
96         if( argc > 2 ) {
97                 delay = atoi( argv[2] );
98         }
99         if( argc > 3 ) {
100                 if( (ch = strchr( argv[3], ':' )) != NULL ) {
101                         max_mt = atoi( ch+1 );
102                         start_mt = atoi( argv[3] );
103                 } else {
104                         max_mt = atoi( argv[3] );
105                 }
106         }
107         if( argc > 4 ) {
108                 listen_port = argv[4];
109         }
110
111         mtype = start_mt;
112
113         fprintf( stderr, "<VSNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
114
115         if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) {
116                 fprintf( stderr, "<VSNDR> unable to initialise RMr\n" );
117                 exit( 1 );
118         }
119
120         if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {                    // epoll only available from NNG -- skip receive later if not NNG
121                 if( rcv_fd < 0 ) {
122                         fprintf( stderr, "<VSNDR> unable to set up polling fd\n" );
123                         exit( 1 );
124                 }
125                 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
126                         fprintf( stderr, "<VSNDR> [FAIL] unable to create epoll fd: %d\n", errno );
127                         exit( 1 );
128                 }
129                 epe.events = EPOLLIN;
130                 epe.data.fd = rcv_fd;
131
132                 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
133                         fprintf( stderr, "<VSNDR> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
134                         exit( 1 );
135                 }
136         } else {
137                 fprintf( stderr, "<VSNDR> abort: epoll not supported, can't listen for messages\n" );   
138         }
139
140         sbuf = rmr_alloc_msg( mrc, MSG_SIZE );                                          // alloc first send buffer; subsequent buffers allcoated on send
141         rbuf = NULL;                                                                                            // don't need to alloc receive buffer
142
143         timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
144         while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
145                 fprintf( stderr, "<VSNDR> waiting for rmr to show ready\n" );
146                 sleep( 1 );
147
148                 if( time( NULL ) > timeout ) {
149                         fprintf( stderr, "<VSNDR> giving up\n" );
150                         exit( 1 );
151                 }
152         }
153         fprintf( stderr, "<VSNDR> rmr is ready; starting to send\n" );
154
155         gethostname( wbuf, sizeof( wbuf ) );
156         snprintf( me, sizeof( me ), "%s-%d", wbuf, getpid( ) );
157
158         while( count < nmsgs ) {                                                                                // we send n messages after the first message is successful
159                 snprintf( wbuf, DATA_SIZE, "Don't shoot the messaging library if you don't like what was in the payload (%d)", rand() );        // add random to change chksum
160                 need = generate_header( sbuf->payload, wbuf, 0, 0 );            // generate the header directly into the message payload
161                 if( need > MSG_SIZE )                   {                                                       // we have a static size for sending; abort if it would be busted
162                         fprintf( stderr, "[CRIT] abort: need for send payload size is > than allocated: %d\n", need );
163                         exit( 1 );
164                 }
165
166                 memcpy( sbuf->payload + HDR_SIZE, wbuf, DATA_SIZE );            // copy in our data (probably not the full amount of bytes
167                 sbuf->mtype = mtype;                                                                            // fill in the message metadata
168                 if( mtype < 3 ) {
169                         sbuf->sub_id = mtype * 10;
170                 } else {
171                         sbuf->sub_id = -1;
172                 }
173
174                 sbuf->len =  need;
175                 sbuf->state = 0;
176                 sbuf = rmr_send_msg( mrc, sbuf );                               // send it (send returns an empty payload on success, or the original payload on fail/retry)
177
178                 switch( sbuf->state ) {
179                         case RMR_ERR_RETRY:
180                                 rt_count++;
181                                 while( time( NULL ) < timeout && sbuf->state == RMR_ERR_RETRY ) {                       // soft failure (device busy?) retry
182                                         sbuf = rmr_send_msg( mrc, sbuf );                       // retry send until it's good (simple test; real programmes should do better)
183                                 }
184                                 if( sbuf->state == RMR_OK ) {
185                                         if( successful == 0 ) {
186                                                 fail_count = 0;                                                 // reset on first good message out
187                                         }
188                                         successful = 1;                                                         // indicates only that we sent one successful message, not the current state
189                                 } else {
190                                         fail_count++;                                                           // count failures after first successful message
191                                         if( ! successful && fail_count > 10 ) {
192                                                 fprintf( stderr, "[FAIL] too many send failures\n" );
193                                                 exit( 1 );
194                                         }
195                                 }
196                                 break;
197
198                         case RMR_OK:
199                                 successful = 1;
200                                 break;
201
202                         default:
203                                 if( successful ) {
204                                         fail_count++;                                                   // count failures after first successful message
205                                 }
206                                 // some error (not connected likely), don't count this
207                                 //sleep( 1 );
208                                 break;
209                 }
210
211                 if( successful ) {                              // once we have a message that was sent, start to increase things
212                         count++;
213                         mtype++;
214                         if( mtype >= max_mt ) {                 // if large number of sends don't require infinite rt entries :)
215                                 mtype = start_mt;
216                         }
217                 }
218
219                 if( rcv_fd >= 0 ) {
220                         while( (nready = epoll_wait( ep_fd, events, 1, 0 )) > 0 ) {                     // if something ready to receive (non-blocking check)
221                                 if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
222                                         errno = 0;
223                                         rbuf = rmr_rcv_msg( mrc, rbuf );
224                                         if( rbuf && rbuf->state == RMR_OK ) {
225                                                 if( rmr_payload_size( rbuf ) > HDR_SIZE+DATA_SIZE ) {           // verify that response has a larger payload than we should have sent
226                                                         rts_ok += validate_msg( rbuf->payload, rbuf->len );
227                                                 } else { 
228                                                         short_count++;
229                                                 }
230                                                 rcvd_count++;
231                                         }
232                                 }
233                         }
234                 }
235
236                 if( time( NULL ) > rep_timeout ) {
237                         fprintf( stderr, "<VSNDR> sent=%d  rcvd=%d  ok_acks=%d short_acks=%d send_fails=%d retries=%d\n", count, rcvd_count, rts_ok, short_count, fail_count, rt_count );
238
239                         rep_timeout = time( NULL ) + 5;
240                 }
241
242                 if( delay > 0 ) {
243                         usleep( delay );
244                 }
245         }
246
247         timeout = time( NULL ) + 20;                            // allow 20 seconds for the pipe to drain from the receiver
248         while( time( NULL ) < timeout ) {
249                 if( rcv_fd >= 0 ) {
250                         while( (nready = epoll_wait( ep_fd, events, 1, 100 )) > 0 ) {
251                                 if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
252                                         errno = 0;
253                                         rbuf = rmr_rcv_msg( mrc, rbuf );
254                                         if( rbuf && rbuf->state == RMR_OK ) {
255                                                 rcvd_count++;
256                                                 if( rmr_payload_size( rbuf ) > HDR_SIZE+DATA_SIZE ) {           // verify that response has a larger payload than we should have sent
257                                                         rts_ok += validate_msg( rbuf->payload, rbuf->len );
258                                                 }
259
260                                                 timeout = time( NULL ) + 10;
261                                         }
262                                 }
263                         }
264                 }
265         }
266
267         if( rcvd_count != rts_ok || count != nmsgs ) {                  // we might not receive all back if receiver didn't retry, so that is NOT a failure here
268                 fprintf( stderr, "<VSNDR> rcvd=%d rts_ok=%d count=%d nmsg=%d\n", rcvd_count, rts_ok, count, nmsgs );
269                 pass = 0;
270         }
271
272         fprintf( stderr, "<VSNDR> [%s] sent=%d  rcvd=%d  rts-ok=%d failures=%d retries=%d\n", pass ? "PASS" : "FAIL",  count, rcvd_count, rts_ok, fail_count, rt_count );
273         rmr_close( mrc );
274
275         return !pass;
276 }
277