1 // vim: ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This version of the sender will perform verification on response
24 messages received back from the receiver.
26 It is expected that the response messages are created with the
27 functions in the test_support module so that they can easily be
30 This sender is designed only to test the functionality of message
31 routing, specifically the ability for a receiver to reallocate a
32 message to handle a larger payload without losing the ability to
33 use rmr_rts_msg(); no attempt has been made to be efficent, and
34 this sender should not be used for performance tests.
37 Author: E. Scott Daniels
45 #include <sys/epoll.h>
51 #define HDR_SIZE 64 // the size of the header we place into the message
52 #define MSG_SIZE 256 // toal message size sent via RMR (hdr+data)
53 #define DATA_SIZE (MSG_SIZE-HDR_SIZE) // the actual 'data' length returned in the ack msg
59 #include "test_support.c"
61 int main( int argc, char** argv ) {
62 void* mrc; // msg router context
63 struct epoll_event events[1]; // list of events to give to epoll
64 struct epoll_event epe; // event definition for event to listen to
65 int ep_fd = -1; // epoll's file des (given to epoll_wait)
66 int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on
67 int nready; // number of events ready for receive
68 rmr_mbuf_t* sbuf; // send buffer
69 rmr_mbuf_t* rbuf; // received buffer
72 int short_count = 0; // number of acks that didn't seem to have a bigger payload
73 int rt_count = 0; // number of messages requiring a spin retry
75 int rts_ok = 0; // number received with our tag
76 int fail_count = 0; // # of failure sends after first successful send
77 char* listen_port = "43086";
80 int successful = 0; // set to true after we have a successful send
82 char me[256]; // who I am to vet rts was actually from me
85 long rep_timeout = 0; // report/stats timeout
86 int delay = 100000; // usec between send attempts
87 int nmsgs = 10; // number of messages to send
88 int max_mt = 10; // reset point for message type
94 nmsgs = atoi( argv[1] );
97 delay = atoi( argv[2] );
100 if( (ch = strchr( argv[3], ':' )) != NULL ) {
101 max_mt = atoi( ch+1 );
102 start_mt = atoi( argv[3] );
104 max_mt = atoi( argv[3] );
108 listen_port = argv[4];
113 fprintf( stderr, "<VSNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
115 if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) {
116 fprintf( stderr, "<VSNDR> unable to initialise RMr\n" );
120 if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG
122 fprintf( stderr, "<VSNDR> unable to set up polling fd\n" );
125 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
126 fprintf( stderr, "<VSNDR> [FAIL] unable to create epoll fd: %d\n", errno );
129 epe.events = EPOLLIN;
130 epe.data.fd = rcv_fd;
132 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
133 fprintf( stderr, "<VSNDR> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
137 fprintf( stderr, "<VSNDR> abort: epoll not supported, can't listen for messages\n" );
140 sbuf = rmr_alloc_msg( mrc, MSG_SIZE ); // alloc first send buffer; subsequent buffers allcoated on send
141 rbuf = NULL; // don't need to alloc receive buffer
143 timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much)
144 while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one
145 fprintf( stderr, "<VSNDR> waiting for rmr to show ready\n" );
148 if( time( NULL ) > timeout ) {
149 fprintf( stderr, "<VSNDR> giving up\n" );
153 fprintf( stderr, "<VSNDR> rmr is ready; starting to send\n" );
155 gethostname( wbuf, sizeof( wbuf ) );
156 snprintf( me, sizeof( me ), "%s-%d", wbuf, getpid( ) );
158 while( count < nmsgs ) { // we send n messages after the first message is successful
159 snprintf( wbuf, DATA_SIZE, "Don't shoot the messaging library if you don't like what was in the payload (%d)", rand() ); // add random to change chksum
160 need = generate_header( sbuf->payload, wbuf, 0, 0 ); // generate the header directly into the message payload
161 if( need > MSG_SIZE ) { // we have a static size for sending; abort if it would be busted
162 fprintf( stderr, "[CRIT] abort: need for send payload size is > than allocated: %d\n", need );
166 memcpy( sbuf->payload + HDR_SIZE, wbuf, DATA_SIZE ); // copy in our data (probably not the full amount of bytes
167 sbuf->mtype = mtype; // fill in the message metadata
169 sbuf->sub_id = mtype * 10;
176 sbuf = rmr_send_msg( mrc, sbuf ); // send it (send returns an empty payload on success, or the original payload on fail/retry)
178 switch( sbuf->state ) {
181 while( time( NULL ) < timeout && sbuf->state == RMR_ERR_RETRY ) { // soft failure (device busy?) retry
182 sbuf = rmr_send_msg( mrc, sbuf ); // retry send until it's good (simple test; real programmes should do better)
184 if( sbuf->state == RMR_OK ) {
185 if( successful == 0 ) {
186 fail_count = 0; // reset on first good message out
188 successful = 1; // indicates only that we sent one successful message, not the current state
190 fail_count++; // count failures after first successful message
191 if( ! successful && fail_count > 10 ) {
192 fprintf( stderr, "[FAIL] too many send failures\n" );
204 fail_count++; // count failures after first successful message
207 if( fail_count > 10 ) {
208 fprintf( stderr, "<VSEND> giving up\n" );
212 // some error (not connected likely), don't count this
217 if( successful ) { // once we have a message that was sent, start to increase things
220 if( mtype >= max_mt ) { // if large number of sends don't require infinite rt entries :)
226 while( (nready = epoll_wait( ep_fd, events, 1, 0 )) > 0 ) { // if something ready to receive (non-blocking check)
227 if( events[0].data.fd == rcv_fd ) { // we only are waiting on 1 thing, so [0] is ok
229 rbuf = rmr_rcv_msg( mrc, rbuf );
230 if( rbuf && rbuf->state == RMR_OK ) {
231 if( rmr_payload_size( rbuf ) >= HDR_SIZE+DATA_SIZE ) { // vet message
232 rts_ok += validate_msg( rbuf->payload, rbuf->len );
234 fprintf( stderr, "<VSNDR> received short response: >%d expected, got %d\n", HDR_SIZE+DATA_SIZE, rmr_payload_size( rbuf ) );
244 if( time( NULL ) > rep_timeout ) {
245 fprintf( stderr, "<VSNDR> sent=%d rcvd=%d ok_acks=%d short_acks=%d send_fails=%d retries=%d\n", count, rcvd_count, rts_ok, short_count, fail_count, rt_count );
247 rep_timeout = time( NULL ) + 5;
255 timeout = time( NULL ) + 20; // allow 20 seconds for the pipe to drain from the receiver
256 while( time( NULL ) < timeout ) {
258 while( (nready = epoll_wait( ep_fd, events, 1, 100 )) > 0 ) {
259 if( events[0].data.fd == rcv_fd ) { // we only are waiting on 1 thing, so [0] is ok
261 rbuf = rmr_rcv_msg( mrc, rbuf );
262 if( rbuf && rbuf->state == RMR_OK ) {
264 if( rmr_payload_size( rbuf ) >= HDR_SIZE+DATA_SIZE ) { // vet message
265 rts_ok += validate_msg( rbuf->payload, rbuf->len );
268 timeout = time( NULL ) + 10;
275 if( (rcvd_count != rts_ok) || (count != nmsgs) ) { // we might not receive all back if receiver didn't retry, so that is NOT a failure here
276 fprintf( stderr, "<VSNDR> recvd=%d rts_ok=%d short=%d count=%d nmsg=%d\n", rcvd_count, rts_ok, short_count, count, nmsgs );
280 fprintf( stderr, "<VSNDR> [%s] sent=%d rcvd=%d rts-ok=%d failures=%d retries=%d\n", pass ? "PASS" : "FAIL", count, rcvd_count, rts_ok, fail_count, rt_count );