1 // :vim ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is a simple sender which will send a series of messages.
24 It is expected that the first attempt(s) will fail if the receiver
25 is not up and this does not start decrementing the number to
26 send until it has a good send.
28 The process will check the receive queue and list received messages
29 but pass/fail is not dependent on what comes back.
31 If the receiver(s) do not become connectable in 20 sec this process
32 will give up and fail.
35 Message types will vary between 0 and 9, so the route table must
36 be set up to support those message types. Further, for message types
37 0, 1 and 2, the subscription ID will be set to type x 10, so the route
38 table must be set to include the sub-id for those types in order for
39 the messages to reach their destination.
42 ck1 ck2|<msg-txt><nil>
44 Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
45 Ck2 is the simple check sum of the trace data which is a nil terminated
48 Parms: argv[1] == number of msgs to send (10)
49 argv[2] == delay (mu-seconds, 1000000 default)
50 argv[3] == max msg type (not inclusive; default 10)
51 argv[4] == listen port
53 Sender will send for at most 20 seconds, so if nmsgs and delay extend
54 beyond that period the total number of messages sent will be less
58 Author: E. Scott Daniels
66 #include <sys/epoll.h>
71 #define WBUF_SIZE 1024
72 #define TRACE_SIZE 1024
74 static int sum( char* str ) {
79 sum += *(str++) + i++;
86 See if my id string is in the buffer immediately after the first >.
87 Return 1 if so, 0 if not.
89 static int vet_received( char* me, char* buf ) {
92 if( (ch = strchr( buf, '>' )) == NULL ) {
96 return strcmp( me, ch+1 ) == 0;
99 int main( int argc, char** argv ) {
100 void* mrc; // msg router context
101 struct epoll_event events[1]; // list of events to give to epoll
102 struct epoll_event epe; // event definition for event to listen to
103 int ep_fd = -1; // epoll's file des (given to epoll_wait)
104 int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on
105 int nready; // number of events ready for receive
106 rmr_mbuf_t* sbuf; // send buffer
107 rmr_mbuf_t* rbuf; // received buffer
110 int rt_count = 0; // number of messages requiring a spin retry
112 int rts_ok = 0; // number received with our tag
113 int fail_count = 0; // # of failure sends after first successful send
114 char* listen_port = "43086";
116 int stats_freq = 100;
117 int successful = 0; // set to true after we have a successful send
118 char* wbuf = NULL; // working buffer
119 char me[128]; // who I am to vet rts was actually from me
120 char* trace = NULL; // area to build trace data in
122 int delay = 100000; // usec between send attempts
123 int nmsgs = 10; // number of messages to send
124 int max_mt = 10; // reset point for message type
128 wbuf = (char *) malloc( sizeof( char ) * WBUF_SIZE );
129 trace = (char *) malloc( sizeof( char ) * TRACE_SIZE );
132 nmsgs = atoi( argv[1] );
135 delay = atoi( argv[2] );
138 if( (ch = strchr( argv[3], ':' )) != NULL ) {
139 max_mt = atoi( ch+1 );
140 start_mt = atoi( argv[3] );
142 max_mt = atoi( argv[3] );
146 listen_port = argv[4];
151 fprintf( stderr, "<SNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
153 if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) {
154 fprintf( stderr, "<SNDR> unable to initialise RMr\n" );
158 if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG
160 fprintf( stderr, "<SNDR> unable to set up polling fd\n" );
163 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
164 fprintf( stderr, "<SNDR> [FAIL] unable to create epoll fd: %d\n", errno );
167 epe.events = EPOLLIN;
168 epe.data.fd = rcv_fd;
170 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
171 fprintf( stderr, "<SNDR> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
175 rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive
178 sbuf = rmr_alloc_msg( mrc, 1024 ); // alloc first send buffer; subsequent buffers allcoated on send
179 //sbuf = rmr_tralloc_msg( mrc, 1024, 11, "xxxxxxxxxx" ); // alloc first send buffer; subsequent buffers allcoated on send
180 rbuf = NULL; // don't need to alloc receive buffer
182 timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much)
183 while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one
184 fprintf( stderr, "<SNDR> waiting for rmr to show ready\n" );
187 if( time( NULL ) > timeout ) {
188 fprintf( stderr, "<SNDR> giving up\n" );
192 fprintf( stderr, "<SNDR> rmr is ready; starting to send\n" );
194 timeout = time( NULL ) + 20;
196 gethostname( wbuf, WBUF_SIZE );
197 snprintf( me, sizeof( me ), "%s-%d", wbuf, getpid( ) );
199 while( count < nmsgs ) { // we send n messages after the first message is successful
200 snprintf( trace, 100, "%lld", (long long) time( NULL ) );
201 rmr_set_trace( sbuf, trace, strlen( trace ) + 1 );
202 snprintf( wbuf, 512, "count=%d tr=%s %d stand up and cheer!>%s", count, trace, rand(), me );
203 snprintf( sbuf->payload, 1024, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
205 sbuf->mtype = mtype; // fill in the message bits
207 sbuf->sub_id = mtype * 10;
212 sbuf->len = strlen( sbuf->payload ) + 1; // our receiver likely wants a nice acsii-z string
214 sbuf = rmr_send_msg( mrc, sbuf ); // send it (send returns an empty payload on success, or the original payload on fail/retry)
216 switch( sbuf->state ) {
219 while( time( NULL ) < timeout && sbuf->state == RMR_ERR_RETRY ) { // soft failure (device busy?) retry
220 sbuf = rmr_send_msg( mrc, sbuf ); // retry send until it's good (simple test; real programmes should do better)
222 if( sbuf->state == RMR_OK ) {
223 if( successful == 0 ) {
224 fail_count = 0; // count only after first message goes through
226 successful = 1; // indicates only that we sent one successful message, not the current state
228 fail_count++; // count failures after first successful message
229 if( !successful && fail_count > 30 ) {
230 fprintf( stderr, "[FAIL] too many send errors for this test\n" );
242 fail_count++; // count failures after first successful message
244 // some error (not connected likely), don't count this
249 if( successful ) { // once we have a message that was sent, start to increase things
252 if( mtype >= max_mt ) { // if large number of sends don't require infinite rt entries :)
258 while( (nready = epoll_wait( ep_fd, events, 1, 0 )) > 0 ) { // if something ready to receive (non-blocking check)
259 if( events[0].data.fd == rcv_fd ) { // we only are waiting on 1 thing, so [0] is ok
261 rbuf = rmr_rcv_msg( mrc, rbuf );
263 rts_ok += vet_received( me, rbuf->payload );
268 } else { // nano, we will only pick up one at a time.
269 if( (rbuf = rmr_rcv_msg( mrc, rbuf ) ) != NULL ) {
270 if( rbuf->state == RMR_OK ) {
271 rts_ok += vet_received( me, rbuf->payload );
277 if( time( NULL ) > timeout ) { // should only happen if we never connect or nmsg > what we can send in 20sec
278 fprintf( stderr, "sender timeout\n" );
287 fprintf( stderr, "<SNDR> draining begins\n" );
288 timeout = time( NULL ) + 20; // allow 20 seconds for the pipe to drain from the receiver
289 while( time( NULL ) < timeout ) {
291 while( (nready = epoll_wait( ep_fd, events, 1, 100 )) > 0 ) { // if something ready to receive (non-blocking check)
292 if( events[0].data.fd == rcv_fd ) { // we only are waiting on 1 thing, so [0] is ok
294 rbuf = rmr_rcv_msg( mrc, rbuf );
297 rts_ok += vet_received( me, rbuf->payload );
298 timeout = time( NULL ) + 10; // break 10s after last received message
302 } else { // nano, we will only pick up one at a time.
303 if( (rbuf = rmr_torcv_msg( mrc, rbuf, 100 ) ) != NULL ) {
304 if( rbuf->state == RMR_OK ) {
306 rts_ok += vet_received( me, rbuf->payload );
311 fprintf( stderr, "<SNDR> draining finishes\n" );
313 if( rcvd_count != rts_ok || count != nmsgs ) {
317 fprintf( stderr, "<SNDR> [%s] sent=%d rcvd=%d rts-ok=%d failures=%d retries=%d\n",
318 pass ? "PASS" : "FAIL", count, rcvd_count, rts_ok, fail_count, rt_count );
321 return !( count == nmsgs );