feat(routing): Support session based routing
[ric-plt/lib/rmr.git] / test / app_test / sender.c
1 // :vim ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sender.c
23         Abstract:       This is a simple sender which will send a series of messages.
24                                 It is expected that the first attempt(s) will fail if the receiver
25                                 is not up and this does not start decrementing the number to
26                                 send until it has a good send.
27
28                                 The process will check the receive queue and list received messages
29                                 but pass/fail is not dependent on what comes back.
30
31                                 If the receiver(s) do not become connectable in 20 sec this process
32                                 will give up and fail.
33
34
35                                 Message types will vary between 0 and 9, so the route table must
36                                 be set up to support those message types. Further, for message types
37                                 0, 1 and 2, the subscription ID will be set to type x 10, so the route
38                                 table must be set to include the sub-id for those types in order for
39                                 the messages to reach their destination.
40
41                                 Message format is:
42                                         ck1 ck2|<msg-txt><nil>
43
44                                 Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
45                                 Ck2 is the simple check sum of the trace data which is a nil terminated
46                                 series of bytes.
47
48                                 Parms:  argv[1] == number of msgs to send (10)
49                                                 argv[2] == delay                (mu-seconds, 1000000 default)
50                                                 argv[3] == max msg type (not inclusive; default 10)
51                                                 argv[4] == listen port
52
53                                 Sender will send for at most 20 seconds, so if nmsgs and delay extend
54                                 beyond that period the total number of messages sent will be less
55                                 than n.
56
57         Date:           18 April 2019
58         Author:         E. Scott Daniels
59 */
60
61 #include <unistd.h>
62 #include <errno.h>
63 #include <string.h>
64 #include <stdio.h>
65 #include <stdlib.h>
66 #include <sys/epoll.h>
67 #include <time.h>
68
69 #include <rmr/rmr.h>
70
71 static int sum( char* str ) {
72         int sum = 0;
73         int     i = 0;
74
75         while( *str ) {
76                 sum += *(str++) + i++;
77         }
78
79         return sum % 255;
80 }
81
82 int main( int argc, char** argv ) {
83         void* mrc;                                                      // msg router context
84         struct  epoll_event events[1];                  // list of events to give to epoll
85         struct  epoll_event epe;                                // event definition for event to listen to
86         int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
87         int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
88         int             nready;                                                 // number of events ready for receive
89         rmr_mbuf_t*             sbuf;                                   // send buffer
90         rmr_mbuf_t*             rbuf;                                   // received buffer
91         int             count = 0;
92         int             rt_count = 0;                                   // number of messages requiring a spin retry
93         int             rcvd_count = 0;
94         int             fail_count = 0;                                 // # of failure sends after first successful send
95         char*   listen_port = "43086";
96         int             mtype = 0;
97         int             stats_freq = 100;
98         int             successful = 0;                                 // set to true after we have a successful send
99         char    wbuf[1024];
100         char    trace[1024];
101         long    timeout = 0;
102         int             delay = 100000;                                 // usec between send attempts
103         int             nmsgs = 10;                                             // number of messages to send
104         int             max_mt = 10;                                    // reset point for message type
105
106         if( argc > 1 ) {
107                 nmsgs = atoi( argv[1] );
108         }
109         if( argc > 2 ) {
110                 delay = atoi( argv[2] );
111         }
112         if( argc > 3 ) {
113                 max_mt = atoi( argv[3] );
114         }
115         if( argc > 4 ) {
116                 listen_port = argv[4];
117         }
118
119         fprintf( stderr, "<SNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
120
121         if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) {
122                 fprintf( stderr, "<SNDR> unable to initialise RMr\n" );
123                 exit( 1 );
124         }
125
126         if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {                    // epoll only available from NNG -- skip receive later if not NNG
127                 if( rcv_fd < 0 ) {
128                         fprintf( stderr, "<SNDR> unable to set up polling fd\n" );
129                         exit( 1 );
130                 }
131                 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
132                         fprintf( stderr, "<SNDR> [FAIL] unable to create epoll fd: %d\n", errno );
133                         exit( 1 );
134                 }
135                 epe.events = EPOLLIN;
136                 epe.data.fd = rcv_fd;
137
138                 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
139                         fprintf( stderr, "<SNDR> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
140                         exit( 1 );
141                 }
142         } else {
143                 rmr_set_rtimeout( mrc, 0 );                     // for nano we must set the receive timeout to 0; non-blocking receive
144         }
145
146         sbuf = rmr_alloc_msg( mrc, 512 );       // alloc first send buffer; subsequent buffers allcoated on send
147         //sbuf = rmr_tralloc_msg( mrc, 512, 11, "xxxxxxxxxx" ); // alloc first send buffer; subsequent buffers allcoated on send
148         rbuf = NULL;                                            // don't need to alloc receive buffer
149
150         timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
151         while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
152                 fprintf( stderr, "<SNDR> waiting for rmr to show ready\n" );
153                 sleep( 1 );
154
155                 if( time( NULL ) > timeout ) {
156                         fprintf( stderr, "<SNDR> giving up\n" );
157                         exit( 1 );
158                 }
159         }
160         fprintf( stderr, "<SNDR> rmr is ready; starting to send\n" );
161
162         timeout = time( NULL ) + 20;
163
164         while( count < nmsgs ) {                                                                // we send n messages after the first message is successful
165                 snprintf( trace, 100, "%lld", (long long) time( NULL ) );
166                 rmr_set_trace( sbuf, trace, strlen( trace ) + 1 );
167                 snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer!", count, trace, rand() );
168                 snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
169
170                 sbuf->mtype = mtype;                                                    // fill in the message bits
171                 if( mtype < 3 ) {
172                         sbuf->sub_id = mtype * 10;
173                 } else {
174                         sbuf->sub_id = -1;
175                 }
176
177                 sbuf->len =  strlen( sbuf->payload ) + 1;               // our receiver likely wants a nice acsii-z string
178                 sbuf->state = 0;
179                 sbuf = rmr_send_msg( mrc, sbuf );                               // send it (send returns an empty payload on success, or the original payload on fail/retry)
180
181                 switch( sbuf->state ) {
182                         case RMR_ERR_RETRY:
183                                 rt_count++;
184                                 while( sbuf->state == RMR_ERR_RETRY ) {                 // soft failure (device busy?) retry
185                                         sbuf = rmr_send_msg( mrc, sbuf );                       // retry send until it's good (simple test; real programmes should do better)
186                                 }
187                                 if( sbuf->state == RMR_OK ) {
188                                         successful = 1;                                                         // indicates only that we sent one successful message, not the current state
189                                 } else {
190                                         if( successful ) {
191                                                 fail_count++;                                                   // count failures after first successful message
192                                         }
193                                 }
194                                 break;
195
196                         case RMR_OK:
197                                 successful = 1;
198                                 break;
199
200                         default:
201                                 if( successful ) {
202                                         fail_count++;                                                   // count failures after first successful message
203                                 }
204                                 // some error (not connected likely), don't count this
205                                 //sleep( 1 );
206                                 break;
207                 }
208
209                 if( successful ) {                              // once we have a message that was sent, start to increase things
210                         count++;
211                         mtype++;
212                         if( mtype >= max_mt ) {                 // if large number of sends don't require infinite rt entries :)
213                                 mtype = 0;
214                         }
215                 }
216
217                 if( rcv_fd >= 0 ) {
218                         while( (nready = epoll_wait( ep_fd, events, 1, 0 )) > 0 ) {                     // if something ready to receive (non-blocking check)
219                                 if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
220                                         errno = 0;
221                                         rbuf = rmr_rcv_msg( mrc, rbuf );
222                                         if( rbuf ) {
223                                                 rcvd_count++;
224                                         }
225                                 }
226                         }
227                 } else {                                // nano, we will only pick up one at a time.
228                         if(     (rbuf = rmr_rcv_msg( mrc, rbuf ) ) != NULL ) {
229                                 if( rbuf->state == RMR_OK ) {
230                                         rcvd_count++;
231                                 }
232                         }
233                 }
234
235                 if( time( NULL ) > timeout ) {                                          // should only happen if we never connect or nmsg > what we can send in 20sec
236                         fprintf( stderr, "sender timeout\n" );
237                         break;
238                 }
239
240                 if( delay > 0 ) {
241                         usleep( delay );
242                 }
243         }
244
245
246         timeout = time( NULL ) + 2;                             // allow 2 seconds for the pipe to drain from the receiver
247         while( time( NULL ) < timeout );
248                 if( rcv_fd >= 0 ) {
249                         while( (nready = epoll_wait( ep_fd, events, 1, 100 )) > 0 ) {                   // if something ready to receive (non-blocking check)
250                                 if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
251                                         errno = 0;
252                                         rbuf = rmr_rcv_msg( mrc, rbuf );
253                                         if( rbuf ) {
254                                                 rcvd_count++;
255                                                 timeout = time( NULL ) + 2;
256                                         }
257                                 }
258                         }
259                 } else {                                // nano, we will only pick up one at a time.
260                         if(     (rbuf = rmr_torcv_msg( mrc, rbuf, 100 ) ) != NULL ) {
261                                 if( rbuf->state == RMR_OK ) {
262                                         rcvd_count++;
263                                 }
264                         }
265                 }
266
267         fprintf( stderr, "<SNDR> [%s] sent=%d  rcvd-acks=%d  failures=%d retries=%d\n", count == nmsgs ? "PASS" : "FAIL",  count, rcvd_count, fail_count, rt_count );
268         rmr_close( mrc );
269
270         return !( count == nmsgs );
271 }
272