Added new message type constants
[ric-plt/lib/rmr.git] / test / app_test / sender.c
1 // :vim ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sender.c
23         Abstract:       This is a simple sender which will send a series of messages.
24                                 It is expected that the first attempt(s) will fail if the receiver
25                                 is not up and this does not start decrementing the number to
26                                 send until it has a good send.
27
28                                 The process will check the receive queue and list received messages
29                                 but pass/fail is not dependent on what comes back.
30
31                                 If the receiver(s) do not become connectable in 20 sec this process
32                                 will give up and fail.
33
34
35                                 Message types will vary between 0 and 9, so the route table must
36                                 be set up to support those message types. Further, for message types
37                                 0, 1 and 2, the subscription ID will be set to type x 10, so the route
38                                 table must be set to include the sub-id for those types in order for
39                                 the messages to reach their destination.
40
41                                 Message format is:
42                                         ck1 ck2|<msg-txt><nil>
43
44                                 Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
45                                 Ck2 is the simple check sum of the trace data which is a nil terminated
46                                 series of bytes.
47
48                                 Parms:  argv[1] == number of msgs to send (10)
49                                                 argv[2] == delay                (mu-seconds, 1000000 default)
50                                                 argv[3] == max msg type (not inclusive; default 10)
51                                                 argv[4] == listen port
52
53                                 Sender will send for at most 20 seconds, so if nmsgs and delay extend
54                                 beyond that period the total number of messages sent will be less
55                                 than n.
56
57         Date:           18 April 2019
58         Author:         E. Scott Daniels
59 */
60
61 #include <unistd.h>
62 #include <errno.h>
63 #include <string.h>
64 #include <stdio.h>
65 #include <stdlib.h>
66 #include <sys/epoll.h>
67 #include <time.h>
68
69 #include <rmr/rmr.h>
70
71 static int sum( char* str ) {
72         int sum = 0;
73         int     i = 0;
74
75         while( *str ) {
76                 sum += *(str++) + i++;
77         }
78
79         return sum % 255;
80 }
81
82 /*
83         See if my id string is in the buffer immediately after the first >.
84         Return 1 if so, 0 if not.
85 */
86 static int vet_received( char* me, char* buf ) {
87         char*   ch;
88
89         if( (ch = strchr( buf, '>' )) == NULL ) {
90                 return 0;
91         }
92
93         return strcmp( me, ch+1 ) == 0;
94 }
95
96 int main( int argc, char** argv ) {
97         void* mrc;                                                      // msg router context
98         struct  epoll_event events[1];                  // list of events to give to epoll
99         struct  epoll_event epe;                                // event definition for event to listen to
100         int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
101         int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
102         int             nready;                                                 // number of events ready for receive
103         rmr_mbuf_t*             sbuf;                                   // send buffer
104         rmr_mbuf_t*             rbuf;                                   // received buffer
105         char*   ch;
106         int             count = 0;
107         int             rt_count = 0;                                   // number of messages requiring a spin retry
108         int             rcvd_count = 0;
109         int             rts_ok = 0;                                             // number received with our tag
110         int             fail_count = 0;                                 // # of failure sends after first successful send
111         char*   listen_port = "43086";
112         int             mtype = 0;
113         int             stats_freq = 100;
114         int             successful = 0;                                 // set to true after we have a successful send
115         char    wbuf[1024];
116         char    me[128];                                                // who I am to vet rts was actually from me
117         char    trace[1024];
118         long    timeout = 0;
119         int             delay = 100000;                                 // usec between send attempts
120         int             nmsgs = 10;                                             // number of messages to send
121         int             max_mt = 10;                                    // reset point for message type
122         int             start_mt = 0;
123         int             pass = 1;
124
125         if( argc > 1 ) {
126                 nmsgs = atoi( argv[1] );
127         }
128         if( argc > 2 ) {
129                 delay = atoi( argv[2] );
130         }
131         if( argc > 3 ) {
132                 if( (ch = strchr( argv[3], ':' )) != NULL ) {
133                         max_mt = atoi( ch+1 );
134                         start_mt = atoi( argv[3] );
135                 } else {
136                         max_mt = atoi( argv[3] );
137                 }
138         }
139         if( argc > 4 ) {
140                 listen_port = argv[4];
141         }
142
143         mtype = start_mt;
144
145         fprintf( stderr, "<SNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
146
147         if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) {
148                 fprintf( stderr, "<SNDR> unable to initialise RMr\n" );
149                 exit( 1 );
150         }
151
152         if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {                    // epoll only available from NNG -- skip receive later if not NNG
153                 if( rcv_fd < 0 ) {
154                         fprintf( stderr, "<SNDR> unable to set up polling fd\n" );
155                         exit( 1 );
156                 }
157                 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
158                         fprintf( stderr, "<SNDR> [FAIL] unable to create epoll fd: %d\n", errno );
159                         exit( 1 );
160                 }
161                 epe.events = EPOLLIN;
162                 epe.data.fd = rcv_fd;
163
164                 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
165                         fprintf( stderr, "<SNDR> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
166                         exit( 1 );
167                 }
168         } else {
169                 rmr_set_rtimeout( mrc, 0 );                     // for nano we must set the receive timeout to 0; non-blocking receive
170         }
171
172         sbuf = rmr_alloc_msg( mrc, 1024 );      // alloc first send buffer; subsequent buffers allcoated on send
173         //sbuf = rmr_tralloc_msg( mrc, 1024, 11, "xxxxxxxxxx" );        // alloc first send buffer; subsequent buffers allcoated on send
174         rbuf = NULL;                                            // don't need to alloc receive buffer
175
176         timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
177         while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
178                 fprintf( stderr, "<SNDR> waiting for rmr to show ready\n" );
179                 sleep( 1 );
180
181                 if( time( NULL ) > timeout ) {
182                         fprintf( stderr, "<SNDR> giving up\n" );
183                         exit( 1 );
184                 }
185         }
186         fprintf( stderr, "<SNDR> rmr is ready; starting to send\n" );
187
188         timeout = time( NULL ) + 20;
189
190         gethostname( wbuf, sizeof( wbuf ) );
191         snprintf( me, sizeof( me ), "%s-%d", wbuf, getpid( ) );
192
193         while( count < nmsgs ) {                                                                // we send n messages after the first message is successful
194                 snprintf( trace, 100, "%lld", (long long) time( NULL ) );
195                 rmr_set_trace( sbuf, trace, strlen( trace ) + 1 );
196                 snprintf( wbuf, 512, "count=%d tr=%s %d stand up and cheer!>%s", count, trace, rand(), me );
197                 snprintf( sbuf->payload, 1024, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
198
199                 sbuf->mtype = mtype;                                                    // fill in the message bits
200                 if( mtype < 3 ) {
201                         sbuf->sub_id = mtype * 10;
202                 } else {
203                         sbuf->sub_id = -1;
204                 }
205
206                 sbuf->len =  strlen( sbuf->payload ) + 1;               // our receiver likely wants a nice acsii-z string
207                 sbuf->state = 0;
208                 sbuf = rmr_send_msg( mrc, sbuf );                               // send it (send returns an empty payload on success, or the original payload on fail/retry)
209
210                 switch( sbuf->state ) {
211                         case RMR_ERR_RETRY:
212                                 rt_count++;
213                                 while( time( NULL ) < timeout && sbuf->state == RMR_ERR_RETRY ) {                       // soft failure (device busy?) retry
214                                         sbuf = rmr_send_msg( mrc, sbuf );                       // retry send until it's good (simple test; real programmes should do better)
215                                 }
216                                 if( sbuf->state == RMR_OK ) {
217                                         successful = 1;                                                         // indicates only that we sent one successful message, not the current state
218                                 } else {
219                                         if( successful ) {
220                                                 fail_count++;                                                   // count failures after first successful message
221                                         }
222                                 }
223                                 break;
224
225                         case RMR_OK:
226                                 successful = 1;
227                                 break;
228
229                         default:
230                                 if( successful ) {
231                                         fail_count++;                                                   // count failures after first successful message
232                                 }
233                                 // some error (not connected likely), don't count this
234                                 //sleep( 1 );
235                                 break;
236                 }
237
238                 if( successful ) {                              // once we have a message that was sent, start to increase things
239                         count++;
240                         mtype++;
241                         if( mtype >= max_mt ) {                 // if large number of sends don't require infinite rt entries :)
242                                 mtype = start_mt;
243                         }
244                 }
245
246                 if( rcv_fd >= 0 ) {
247                         while( (nready = epoll_wait( ep_fd, events, 1, 0 )) > 0 ) {                     // if something ready to receive (non-blocking check)
248                                 if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
249                                         errno = 0;
250                                         rbuf = rmr_rcv_msg( mrc, rbuf );
251                                         if( rbuf ) {
252                                                 rts_ok += vet_received( me, rbuf->payload );
253                                                 rcvd_count++;
254                                         }
255                                 }
256                         }
257                 } else {                                // nano, we will only pick up one at a time.
258                         if(     (rbuf = rmr_rcv_msg( mrc, rbuf ) ) != NULL ) {
259                                 if( rbuf->state == RMR_OK ) {
260                                         rts_ok += vet_received( me, rbuf->payload );
261                                         rcvd_count++;
262                                 }
263                         }
264                 }
265
266                 if( time( NULL ) > timeout ) {                                          // should only happen if we never connect or nmsg > what we can send in 20sec
267                         fprintf( stderr, "sender timeout\n" );
268                         break;
269                 }
270
271                 if( delay > 0 ) {
272                         usleep( delay );
273                 }
274         }
275
276         timeout = time( NULL ) + 2;                             // allow 2 seconds for the pipe to drain from the receiver
277         while( time( NULL ) < timeout );
278                 if( rcv_fd >= 0 ) {
279                         while( (nready = epoll_wait( ep_fd, events, 1, 100 )) > 0 ) {                   // if something ready to receive (non-blocking check)
280                                 if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
281                                         errno = 0;
282                                         rbuf = rmr_rcv_msg( mrc, rbuf );
283                                         if( rbuf ) {
284                                                 rcvd_count++;
285                                                 rts_ok += vet_received( me, rbuf->payload );
286                                                 timeout = time( NULL ) + 2;
287                                         }
288                                 }
289                         }
290                 } else {                                // nano, we will only pick up one at a time.
291                         if(     (rbuf = rmr_torcv_msg( mrc, rbuf, 100 ) ) != NULL ) {
292                                 if( rbuf->state == RMR_OK ) {
293                                         rcvd_count++;
294                                         rts_ok += vet_received( me, rbuf->payload );
295                                 }
296                         }
297                 }
298
299         if( rcvd_count != rts_ok || count != nmsgs ) {
300                 pass = 0;
301         }
302
303         fprintf( stderr, "<SNDR> [%s] sent=%d  rcvd=%d  rts-ok=%d failures=%d retries=%d\n", 
304                 pass ? "PASS" : "FAIL",  count, rcvd_count, rts_ok, fail_count, rt_count );
305         rmr_close( mrc );
306
307         return !( count == nmsgs );
308 }
309