1 // :vim ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is a simple sender which will send a series of messages.
24 It is expected that the first attempt(s) will fail if the receiver
25 is not up and this does not start decrementing the number to
26 send until it has a good send.
28 The process will check the receive queue and list received messages
29 but pass/fail is not dependent on what comes back.
31 If the receiver(s) do not become connectable in 20 sec this process
32 will give up and fail.
35 Message types will vary between 1 and 10, so the route table must
36 be set up to support those message types.
39 ck1 ck2|<msg-txt><nil>
41 Ck1 is the simple check sum of the msg-text (NOT including <nil>)
42 Ck2 is the simple check sum of the trace data which is a nil terminated
45 Parms: argv[1] == nmsgs; argv[2] == delay; argv[3] == listen port
47 Sender will send for at most 20 seconds, so if nmsgs and delay extend
48 beyond that period the total number of messages sent will be less
52 Author: E. Scott Daniels
60 #include <sys/epoll.h>
/*
	Simple checksum helper; main() uses it to build the two leading checksum
	values in the payload (one over the message text, one over the trace data).
	The visible statement adds each byte of str plus a counter (i) that is
	bumped on every byte, so the result depends on byte order.

	NOTE(review): the initialisation of sum/i, the loop condition and the
	return expression are not visible in this excerpt -- confirm them in the
	full file before relying on the exact value or range returned.
*/
65 static int sum( char* str ) {
70 sum += *(str++) + i++; // add current byte plus the running counter, then advance both
/*
	Test sender entry point. From the visible code: initialises RMR on the
	listen port, sets up epoll on the RMR receive fd when one is available,
	waits (up to 20s) for rmr_ready(), then sends until count reaches nmsgs or
	a 20s timeout trips, checking for received messages along the way. After
	sending, it drains the receive side and writes a PASS/FAIL summary to stderr.

	Parms: argv[1] == nmsgs; argv[2] == delay (usec); argv[3] == listen port.
	Returns 0 when count == nmsgs, 1 otherwise.

	NOTE(review): a number of lines of this function are not visible in this
	excerpt (argc guards, switch cases, failure exits); confirm those in the
	full file before changing behaviour.
*/
76 int main( int argc, char** argv ) {
77 void* mrc; // msg router context
78 struct epoll_event events[1]; // list of events to give to epoll
79 struct epoll_event epe; // event definition for event to listen to
80 int ep_fd = -1; // epoll's file des (given to epoll_wait)
81 int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on
82 int nready; // number of events ready for receive
83 rmr_mbuf_t* sbuf; // send buffer
84 rmr_mbuf_t* rbuf; // received buffer
86 int rt_count = 0; // number of messages requiring a spin retry
88 char* listen_port = "43086";
91 int successful = 0; // set to true after we have a successful send
95 int delay = 100000; // usec between send attempts
96 int nmsgs = 10; // number of messages to send
99 nmsgs = atoi( argv[1] );
102 delay = atoi( argv[2] );
105 listen_port = argv[3];
108 fprintf( stderr, "<SNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
110 if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) {
111 fprintf( stderr, "<SNDR> unable to initialise RMr\n" );
115 if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG
117 fprintf( stderr, "<SNDR> unable to set up polling fd\n" );
120 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
121 fprintf( stderr, "<SNDR> [FAIL] unable to create epoll fd: %d\n", errno );
124 epe.events = EPOLLIN;
125 epe.data.fd = rcv_fd;
127 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
128 fprintf( stderr, "<SNDR> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
132 rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive
135 sbuf = rmr_alloc_msg( mrc, 512 ); // alloc first send buffer; subsequent buffers allocated on send
136 //sbuf = rmr_tralloc_msg( mrc, 512, 11, "xxxxxxxxxx" ); // alloc first send buffer; subsequent buffers allocated on send
137 rbuf = NULL; // don't need to alloc receive buffer
139 timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much)
140 while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one
141 fprintf( stderr, "<SNDR> waiting for rmr to show ready\n" );
144 if( time( NULL ) > timeout ) {
145 fprintf( stderr, "<SNDR> giving up\n" );
149 fprintf( stderr, "<SNDR> rmr is ready; starting to send\n" );
151 timeout = time( NULL ) + 20;
153 while( count < nmsgs ) { // loop until nmsgs sends have been counted (nmsgs defaults to 10, overridden by argv[1])
154 snprintf( trace, 100, "%lld", (long long) time( NULL ) );
155 rmr_set_trace( sbuf, trace, strlen( trace ) + 1 );
156 snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer!", count, trace, rand() );
157 snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf ); // payload layout: ck1 ck2|msg-text
159 sbuf->mtype = mtype; // fill in the message bits
160 sbuf->len = strlen( sbuf->payload ) + 1; // our receiver likely wants a nice ASCII-Z string
162 sbuf = rmr_send_msg( mrc, sbuf ); // send it (send returns an empty payload on success, or the original payload on fail/retry)
164 switch( sbuf->state ) {
167 while( sbuf->state == RMR_ERR_RETRY ) { // soft failure (device busy?) retry
168 sbuf = rmr_send_msg( mrc, sbuf ); // retry send until it's good (simple test; real programmes should do better)
178 // some error (not connected likely), don't count this
182 if( successful ) { // once we have a message that was sent, start to increase things
185 if( mtype > 10 ) { // if large number of sends don't require infinite rt entries :)
191 while( (nready = epoll_wait( ep_fd, events, 1, 0 )) > 0 ) { // if something ready to receive (non-blocking check)
192 if( events[0].data.fd == rcv_fd ) { // we only are waiting on 1 thing, so [0] is ok
194 rbuf = rmr_rcv_msg( mrc, rbuf );
200 } else { // nano, we will only pick up one at a time.
201 if( (rbuf = rmr_rcv_msg( mrc, rbuf ) ) != NULL ) {
202 if( rbuf->state == RMR_OK ) {
208 if( time( NULL ) > timeout ) { // should only happen if we never connect or nmsg > what we can send in 20sec
209 fprintf( stderr, "sender timeout\n" );
219 timeout = time( NULL ) + 2; // allow 2 seconds for the pipe to drain from the receiver
220 while( time( NULL ) < timeout ); // NOTE(review): empty-bodied loop -- this busy-waits (spins) until the 2 seconds have elapsed
222 while( (nready = epoll_wait( ep_fd, events, 1, 100 )) > 0 ) { // if something ready to receive (waits up to 100ms per call, so not a strictly non-blocking check)
223 if( events[0].data.fd == rcv_fd ) { // we only are waiting on 1 thing, so [0] is ok
225 rbuf = rmr_rcv_msg( mrc, rbuf );
228 timeout = time( NULL ) + 2;
232 } else { // nano, we will only pick up one at a time.
233 if( (rbuf = rmr_torcv_msg( mrc, rbuf, 100 ) ) != NULL ) {
234 if( rbuf->state == RMR_OK ) {
240 fprintf( stderr, "<SNDR> [%s] sent %d messages received %d acks retries=%d\n", count == nmsgs ? "PASS" : "FAIL", count, rcvd_count, rt_count );
243 return !( count == nmsgs ); // exit status 0 when count == nmsgs (the PASS case above), 1 otherwise