89a0afc6beac75bc5ceea80616f2985a722e2edd
[ric-plt/lib/rmr.git] / test / app_test / sender.c
1 // :vim ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4     Copyright (c) 2019 Nokia
5     Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sender.c
23         Abstract:       This is a simple sender which will send a series of messages. 
24                                 It is expected that the first attempt(s) will fail if the receiver
25                                 is not up and this does not start decrementing the number to
26                                 send until it has a good send.  
27
28                                 The process will check the receive queue and list received messages
29                                 but pass/fail is not dependent on what comes back.
30
31                                 If the receiver(s) do not become connectable in 20 sec this process
32                                 will give up and fail. 
33
34
35                                 Message types will vary between 1 and 10, so the route table must
36                                 be set up to support those message types.  
37
38                                 Message format is:
39                                         ck1 ck2|<msg-txt><nil>
40
41                                 Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
42                                 Ck2 is the simple check sum of the trace data which is a nil terminated
43                                 series of bytes. 
44
45                                 Parms:  argv[1] == nmsgs; argv[2] == delay; argv[3] == listen port
46
47                                 Sender will send for at most 20 seconds, so if nmsgs and delay extend
48                                 beyond that period the total number of messages sent will be less
49                                 than n.
50
51         Date:           18 April 2019
52         Author:         E. Scott Daniels
53 */
54
55 #include <unistd.h>
56 #include <errno.h>
57 #include <string.h>
58 #include <stdio.h>
59 #include <stdlib.h>
60 #include <sys/epoll.h>
61 #include <time.h>
62
63 #include <rmr/rmr.h>
64
65 static int sum( char* str ) {
66         int sum = 0;
67         int     i = 0;
68
69         while( *str ) {
70                 sum += *(str++) + i++;
71         }
72
73         return sum % 255;
74 }
75
76 int main( int argc, char** argv ) {
77     void* mrc;                                                  // msg router context
78     struct epoll_event events[1];                       // list of events to give to epoll
79     struct epoll_event epe;                 // event definition for event to listen to
80     int     ep_fd = -1;                                         // epoll's file des (given to epoll_wait)
81     int rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
82         int nready;                                                             // number of events ready for receive
83         rmr_mbuf_t*             sbuf;                                   // send buffer
84         rmr_mbuf_t*             rbuf;                                   // received buffer
85         int     count = 0;
86         int     rt_count = 0;                                           // number of messages requiring a spin retry
87         int     rcvd_count = 0;
88         char*   listen_port = "43086";
89         int             mtype = 0;
90         int             stats_freq = 100;
91         int             successful = 0;                                 // set to true after we have a successful send
92         char    wbuf[1024];
93         char    trace[1024];
94         long    timeout = 0;
95         int             delay = 100000;                                 // usec between send attempts
96         int             nmsgs = 10;                                             // number of messages to send
97
98         if( argc > 1 ) {
99                 nmsgs = atoi( argv[1] );
100         }
101         if( argc > 2 ) {
102                 delay = atoi( argv[2] );
103         }
104         if( argc > 3 ) {
105                 listen_port = argv[3];
106         }
107
108         fprintf( stderr, "<SNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
109
110     if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) {
111                 fprintf( stderr, "<SNDR> unable to initialise RMr\n" );
112                 exit( 1 );
113         }
114
115     if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {                        // epoll only available from NNG -- skip receive later if not NNG
116                 if( rcv_fd < 0 ) {
117                         fprintf( stderr, "<SNDR> unable to set up polling fd\n" );
118                         exit( 1 );
119                 }
120                 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
121                         fprintf( stderr, "<SNDR> [FAIL] unable to create epoll fd: %d\n", errno );
122                         exit( 1 );
123                 }
124         epe.events = EPOLLIN;
125         epe.data.fd = rcv_fd;
126
127         if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
128                         fprintf( stderr, "<SNDR> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
129                         exit( 1 );
130                 }
131         } else {
132                 rmr_set_rtimeout( mrc, 0 );                     // for nano we must set the receive timeout to 0; non-blocking receive
133         }
134
135         sbuf = rmr_alloc_msg( mrc, 512 );       // alloc first send buffer; subsequent buffers allcoated on send
136         //sbuf = rmr_tralloc_msg( mrc, 512, 11, "xxxxxxxxxx" ); // alloc first send buffer; subsequent buffers allcoated on send
137         rbuf = NULL;                                            // don't need to alloc receive buffer
138
139         timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
140         while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
141                 fprintf( stderr, "<SNDR> waiting for rmr to show ready\n" );
142                 sleep( 1 );
143
144                 if( time( NULL ) > timeout ) {
145                         fprintf( stderr, "<SNDR> giving up\n" );
146                         exit( 1 );
147                 }
148         }
149         fprintf( stderr, "<SNDR> rmr is ready; starting to send\n" );
150         
151         timeout = time( NULL ) + 20;
152
153     while( count < nmsgs ) {                                                            // we send 10 messages after the first message is successful
154                 snprintf( trace, 100, "%lld", (long long) time( NULL ) );
155                 rmr_set_trace( sbuf, trace, strlen( trace ) + 1 );
156                 snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer!", count, trace, rand() );
157                 snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
158
159                 sbuf->mtype = mtype;                                                    // fill in the message bits
160                 sbuf->len =  strlen( sbuf->payload ) + 1;               // our receiver likely wants a nice acsii-z string
161                 sbuf->state = 0;
162                 sbuf = rmr_send_msg( mrc, sbuf );                               // send it (send returns an empty payload on success, or the original payload on fail/retry)
163
164                 switch( sbuf->state ) {
165                         case RMR_ERR_RETRY:
166                                 rt_count++;
167                                 while( sbuf->state == RMR_ERR_RETRY ) {                 // soft failure (device busy?) retry
168                                         sbuf = rmr_send_msg( mrc, sbuf );                       // retry send until it's good (simple test; real programmes should do better)
169                                 }
170                                 successful = 1;
171                                 break;
172
173                         case RMR_OK:
174                                 successful = 1;
175                                 break;
176
177                         default:
178                                 // some error (not connected likely), don't count this
179                                 break;
180                 }
181
182                 if( successful ) {                              // once we have a message that was sent, start to increase things
183                         count++;
184                         mtype++;
185                         if( mtype > 10 ) {                      // if large number of sends don't require infinite rt entries :)
186                                 mtype = 1;
187                         }
188                 }
189
190                 if( rcv_fd >= 0 ) {
191                         while( (nready = epoll_wait( ep_fd, events, 1, 0 )) > 0 ) {                     // if something ready to receive (non-blocking check)
192                                 if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
193                                         errno = 0;
194                                         rbuf = rmr_rcv_msg( mrc, rbuf );
195                                         if( rbuf ) {
196                                                 rcvd_count++;
197                                         }
198                                 }
199                         }
200                 } else {                                // nano, we will only pick up one at a time.
201                         if(     (rbuf = rmr_rcv_msg( mrc, rbuf ) ) != NULL ) {
202                                 if( rbuf->state == RMR_OK ) {
203                                         rcvd_count++;
204                                 }
205                         }
206                 }
207
208                 if( time( NULL ) > timeout ) {                                          // should only happen if we never connect or nmsg > what we can send in 20sec
209                         fprintf( stderr, "sender timeout\n" );
210                         break;
211                 }
212
213                 if( delay > 0 ) {
214                         usleep( delay );
215                 }
216     }
217
218         
219         timeout = time( NULL ) + 2;                             // allow 2 seconds for the pipe to drain from the receiver
220         while( time( NULL ) < timeout );
221                 if( rcv_fd >= 0 ) {
222                         while( (nready = epoll_wait( ep_fd, events, 1, 100 )) > 0 ) {                   // if something ready to receive (non-blocking check)
223                                 if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
224                                         errno = 0;
225                                         rbuf = rmr_rcv_msg( mrc, rbuf );
226                                         if( rbuf ) {
227                                                 rcvd_count++;
228                                                 timeout = time( NULL ) + 2;
229                                         }
230                                 }
231                         }
232                 } else {                                // nano, we will only pick up one at a time.
233                         if(     (rbuf = rmr_torcv_msg( mrc, rbuf, 100 ) ) != NULL ) {
234                                 if( rbuf->state == RMR_OK ) {
235                                         rcvd_count++;
236                                 }
237                         }
238                 }
239
240         fprintf( stderr, "<SNDR> [%s] sent %d messages   received %d acks retries=%d\n", count == nmsgs ? "PASS" : "FAIL",  count, rcvd_count, rt_count );
241         rmr_close( mrc );
242
243         return !( count == nmsgs );
244 }
245