3 * Copyright 2019 AT&T Intellectual Property
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
20 // :vim ts=4 sw=4 noet:
23 Abstract: Very simple test listener built on RMr libraries. It does nothing
24 but return the message it recevied back to the sender.
26 Define these environment variables to have some control:
27 RMR_SEED_RT -- path to the static routing table
28 RMR_RTG_SVC -- host:port of the route table generator
30 One command line parm is accepted: stats frequency. This is a number, n,
31 which causes stats to be generated after every n messages. If set to 0
32 each message is written when received and no stats (msg rate) is generated.
34 Date: 11 February 2018
35 Author: E. Scott Daniels
37 Mods: 18 Mar 2019 -- simplified for demo base.
49 int32_t mtype; // message type ("long" network integer)
50 int32_t plen; // payload length
51 unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch
52 unsigned char sid[RMR_MAX_SID]; // sender ID for return to sender needs
53 unsigned char src[RMR_MAX_SRC]; // name of the sender (source)
54 struct timespec ts; // timestamp ???
58 int main( int argc, char** argv ) {
59 void* mrc; // msg router context
60 rmr_mbuf_t* msg = NULL; // message received
64 int must_ack = 1; // flag -- if set we rts all messages
66 int last_seq = 0; // sequence number from last message
67 int this_seq; // sequence number on this message
68 int count = 0; // count of msg since last status
69 long long tcount = 0; // total count of messages
72 int stat_freq = 20000; // write stats after reciving this many messages
73 int first_seq = -1; // first sequence number we got to report total received
74 int max_rt = 1000; // max times we'll retry an ack
76 if( (tok = getenv( "RMR_RCV_ACK" )) != NULL ) {
77 must_ack = atoi( tok );
80 if( (listen_port = getenv( "PENDULUM_XAPP_RMR_RCV_PORT" )) == NULL ) {
85 stat_freq = atoi( argv[1] );
87 fprintf( stderr, "<TEST> stats will be reported every %d messages\n", stat_freq );
89 mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
91 fprintf( stderr, "<TEST> ABORT: unable to initialise RMr\n" );
95 while( ! rmr_ready( mrc ) ) {
96 fprintf( stderr, "<TEST> waiting for RMr to show ready\n" );
99 fprintf( stderr, "<TEST> RMr now shows ready\n" );
102 fprintf( stderr, "<TEST> listening on %s acking %s\n", listen_port, must_ack != 0 ? "on" : "off" );
104 //rmr_set_stimeout( mrc, 50 );
107 msg = rmr_rcv_msg( mrc, msg ); // block until one arrives
109 continue; // shouldn't happen, but don't crash if we get nothing
111 if( msg->mtype < 0 || msg->state != RMR_OK ) {
112 fprintf( stderr, "[WRN] bad msg: state=%d errno=%d\n", msg->state, errno );
113 continue; // just loop to receive another
116 if( stat_freq == 0 ) { // mechanism to dump all received messages for quick testing
117 fprintf( stdout, "<TEST> msg received: type = %d len = %d (%s)\n", msg->mtype, msg->len, msg->payload ); // assume a nil term string in payload
120 count++; // messages received for stats output
123 //if( stat_freq >= 1000 ) {
125 //if( (count % stat_freq) == 0 ) {
130 fprintf( stderr, "<TEST> %7lld received %5lld msg/s over the last %3lld seconds mrt=%d, with content=%s\n",
131 (long long) last_seq - first_seq, (long long) (count / (ts-lts)), (long long) ts-lts, max_rt,msg->payload );
138 if( must_ack ) { // send back a response
139 //fprintf( stdout, "<TEST> msg: type = %d len = %d; acking\n", msg->mtype, msg->len );
140 //msg->len = snprintf( msg->payload, 1024, "bar %lld", tcount ); // ack with bar and counter
141 msg->len = snprintf( msg->payload, 1024, "Reply hello back to Arduino!\n");
142 // msg->len = snprintf( msg->payload, 1024, "OK\n");
145 //msg->mtype = 999; //only to be used if rts is not possible
147 //msg = rmr_send_msg (mrc, msg); //only to be used if rts is not possible
149 msg = rmr_rts_msg( mrc, msg ); // this is a retur to sender; preferred
150 //if( (msg = rmr_send_msg( mrc, msg )) != NULL ) { // this is a routed send; not preferred, but possible
151 if( (msg = rmr_rts_msg( mrc, msg )) != NULL ) {
152 //----- checking too many times here has been problematic and causes what appears to be race condidtions in NNG threads; for now max_rt should be small
154 while( max_rt > 0 && msg->state != RMR_OK && errno == EAGAIN ) { // NNG likes to refuse sends, just keep trying on eagain
156 rmr_rts_msg( mrc, msg );
157 //rmr_send_msg (mrc, msg);