/* * * Copyright 2019 AT&T Intellectual Property * Copyright 2019 Nokia * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ // :vim ts=4 sw=4 noet: /* Mnemonic: rmr_rcvr2.c Abstract: Very simple test listener built on RMr libraries. It does nothing but return the message it recevied back to the sender. Define these environment variables to have some control: RMR_SEED_RT -- path to the static routing table RMR_RTG_SVC -- host:port of the route table generator One command line parm is accepted: stats frequency. This is a number, n, which causes stats to be generated after every n messages. If set to 0 each message is written when received and no stats (msg rate) is generated. Date: 11 February 2018 Author: E. Scott Daniels Mods: 18 Mar 2019 -- simplified for demo base. */ #include #include #include #include #include #include typedef struct { int32_t mtype; // message type ("long" network integer) int32_t plen; // payload length unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch unsigned char sid[RMR_MAX_SID]; // sender ID for return to sender needs unsigned char src[RMR_MAX_SRC]; // name of the sender (source) struct timespec ts; // timestamp ??? } mhdr_t; int main( int argc, char** argv ) { void* mrc; // msg router context rmr_mbuf_t* msg = NULL; // message received int i; char* listen_port; char* tok; int must_ack = 1; // flag -- if set we rts all messages mhdr_t* hdr; int last_seq = 0; // sequence number from last message int this_seq; // sequence number on this message int count = 0; // count of msg since last status long long tcount = 0; // total count of messages time_t ts; time_t lts; int stat_freq = 20000; // write stats after reciving this many messages int first_seq = -1; // first sequence number we got to report total received int max_rt = 1000; // max times we'll retry an ack if( (tok = getenv( "RMR_RCV_ACK" )) != NULL ) { must_ack = atoi( tok ); } if( (listen_port = getenv( "PENDULUM_XAPP_RMR_RCV_PORT" )) == NULL ) { listen_port = "4560"; } if( argc > 1 ) { stat_freq = atoi( argv[1] ); } fprintf( stderr, " stats will be reported every %d messages\n", stat_freq ); mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines! if( mrc == NULL ) { fprintf( stderr, " ABORT: unable to initialise RMr\n" ); exit( 1 ); } while( ! rmr_ready( mrc ) ) { fprintf( stderr, " waiting for RMr to show ready\n" ); sleep( 1 ); } fprintf( stderr, " RMr now shows ready\n" ); lts = time( NULL ); fprintf( stderr, " listening on %s acking %s\n", listen_port, must_ack != 0 ? "on" : "off" ); //rmr_set_stimeout( mrc, 50 ); while( 1 ) { sleep (2 ); msg = rmr_rcv_msg( mrc, msg ); // block until one arrives if( msg == NULL ) { continue; // shouldn't happen, but don't crash if we get nothing } if( msg->mtype < 0 || msg->state != RMR_OK ) { fprintf( stderr, "[WRN] bad msg: state=%d errno=%d\n", msg->state, errno ); continue; // just loop to receive another } if( stat_freq == 0 ) { // mechanism to dump all received messages for quick testing fprintf( stdout, " msg received: type = %d len = %d (%s)\n", msg->mtype, msg->len, msg->payload ); // assume a nil term string in payload } count++; // messages received for stats output tcount++; //if( stat_freq >= 1000 ) { if(1) { //if( (count % stat_freq) == 0 ) { if(1) { ts = time( NULL ); if( ts - lts ) { fprintf( stderr, " %7lld received %5lld msg/s over the last %3lld seconds mrt=%d, with content=%s\n", (long long) last_seq - first_seq, (long long) (count / (ts-lts)), (long long) ts-lts, max_rt,msg->payload ); lts = ts; count = 0; } } } if( must_ack ) { // send back a response //fprintf( stdout, " msg: type = %d len = %d; acking\n", msg->mtype, msg->len ); //msg->len = snprintf( msg->payload, 1024, "bar %lld", tcount ); // ack with bar and counter msg->len = snprintf( msg->payload, 1024, "Reply hello back to Arduino!\n"); // msg->len = snprintf( msg->payload, 1024, "OK\n"); //msg->mtype = 999; //only to be used if rts is not possible //msg = rmr_send_msg (mrc, msg); //only to be used if rts is not possible msg = rmr_rts_msg( mrc, msg ); // this is a retur to sender; preferred //if( (msg = rmr_send_msg( mrc, msg )) != NULL ) { // this is a routed send; not preferred, but possible if( (msg = rmr_rts_msg( mrc, msg )) != NULL ) { //----- checking too many times here has been problematic and causes what appears to be race condidtions in NNG threads; for now max_rt should be small max_rt = 2; while( max_rt > 0 && msg->state != RMR_OK && errno == EAGAIN ) { // NNG likes to refuse sends, just keep trying on eagain max_rt--; rmr_rts_msg( mrc, msg ); //rmr_send_msg (mrc, msg); } } } } }