+++ /dev/null
-/*
- *
- * Copyright 2019 AT&T Intellectual Property
- * Copyright 2019 Nokia
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-// :vim ts=4 sw=4 noet:
-/*
- Mnemonic: rmr_rcvr2.c
- Abstract: Very simple test listener built on RMr libraries. It does nothing
- but return the message it recevied back to the sender.
-
- Define these environment variables to have some control:
- RMR_SEED_RT -- path to the static routing table
- RMR_RTG_SVC -- host:port of the route table generator
-
- One command line parm is accepted: stats frequency. This is a number, n,
- which causes stats to be generated after every n messages. If set to 0
- each message is written when received and no stats (msg rate) is generated.
-
- Date: 11 February 2018
- Author: E. Scott Daniels
-
- Mods: 18 Mar 2019 -- simplified for demo base.
-*/
-
-#include <unistd.h>
-#include <errno.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <time.h>
-
-#include <rmr/rmr.h>
-
-typedef struct {
- int32_t mtype; // message type ("long" network integer)
- int32_t plen; // payload length
- unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch
- unsigned char sid[RMR_MAX_SID]; // sender ID for return to sender needs
- unsigned char src[RMR_MAX_SRC]; // name of the sender (source)
- struct timespec ts; // timestamp ???
-} mhdr_t;
-
-
-int main( int argc, char** argv ) {
- void* mrc; // msg router context
- rmr_mbuf_t* msg = NULL; // message received
- int i;
- char* listen_port;
- char* tok;
- int must_ack = 1; // flag -- if set we rts all messages
- mhdr_t* hdr;
- int last_seq = 0; // sequence number from last message
- int this_seq; // sequence number on this message
- int count = 0; // count of msg since last status
- long long tcount = 0; // total count of messages
- time_t ts;
- time_t lts;
- int stat_freq = 20000; // write stats after reciving this many messages
- int first_seq = -1; // first sequence number we got to report total received
- int max_rt = 1000; // max times we'll retry an ack
-
- if( (tok = getenv( "RMR_RCV_ACK" )) != NULL ) {
- must_ack = atoi( tok );
- }
-
- if( (listen_port = getenv( "PENDULUM_XAPP_RMR_RCV_PORT" )) == NULL ) {
- listen_port = "4560";
- }
-
- if( argc > 1 ) {
- stat_freq = atoi( argv[1] );
- }
- fprintf( stderr, "<TEST> stats will be reported every %d messages\n", stat_freq );
-
- mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
- if( mrc == NULL ) {
- fprintf( stderr, "<TEST> ABORT: unable to initialise RMr\n" );
- exit( 1 );
- }
-
- while( ! rmr_ready( mrc ) ) {
- fprintf( stderr, "<TEST> waiting for RMr to show ready\n" );
- sleep( 1 );
- }
- fprintf( stderr, "<TEST> RMr now shows ready\n" );
-
- lts = time( NULL );
- fprintf( stderr, "<TEST> listening on %s acking %s\n", listen_port, must_ack != 0 ? "on" : "off" );
-
- //rmr_set_stimeout( mrc, 50 );
- while( 1 ) {
- sleep (2 );
- msg = rmr_rcv_msg( mrc, msg ); // block until one arrives
- if( msg == NULL ) {
- continue; // shouldn't happen, but don't crash if we get nothing
- }
- if( msg->mtype < 0 || msg->state != RMR_OK ) {
- fprintf( stderr, "[WRN] bad msg: state=%d errno=%d\n", msg->state, errno );
- continue; // just loop to receive another
- }
-
- if( stat_freq == 0 ) { // mechanism to dump all received messages for quick testing
- fprintf( stdout, "<TEST> msg received: type = %d len = %d (%s)\n", msg->mtype, msg->len, msg->payload ); // assume a nil term string in payload
- }
-
- count++; // messages received for stats output
- tcount++;
-
- //if( stat_freq >= 1000 ) {
- if(1) {
- //if( (count % stat_freq) == 0 ) {
- if(1) {
- ts = time( NULL );
- if( ts - lts ) {
-
- fprintf( stderr, "<TEST> %7lld received %5lld msg/s over the last %3lld seconds mrt=%d, with content=%s\n",
- (long long) last_seq - first_seq, (long long) (count / (ts-lts)), (long long) ts-lts, max_rt,msg->payload );
- lts = ts;
- count = 0;
- }
- }
- }
-
- if( must_ack ) { // send back a response
- //fprintf( stdout, "<TEST> msg: type = %d len = %d; acking\n", msg->mtype, msg->len );
- //msg->len = snprintf( msg->payload, 1024, "bar %lld", tcount ); // ack with bar and counter
- msg->len = snprintf( msg->payload, 1024, "Reply hello back to Arduino!\n");
- // msg->len = snprintf( msg->payload, 1024, "OK\n");
-
-
- //msg->mtype = 999; //only to be used if rts is not possible
-
- //msg = rmr_send_msg (mrc, msg); //only to be used if rts is not possible
-
- msg = rmr_rts_msg( mrc, msg ); // this is a retur to sender; preferred
- //if( (msg = rmr_send_msg( mrc, msg )) != NULL ) { // this is a routed send; not preferred, but possible
- if( (msg = rmr_rts_msg( mrc, msg )) != NULL ) {
- //----- checking too many times here has been problematic and causes what appears to be race condidtions in NNG threads; for now max_rt should be small
- max_rt = 2;
- while( max_rt > 0 && msg->state != RMR_OK && errno == EAGAIN ) { // NNG likes to refuse sends, just keep trying on eagain
- max_rt--;
- rmr_rts_msg( mrc, msg );
- //rmr_send_msg (mrc, msg);
- }
- }
- }
-
- }
-}