// :vim ts=4 sw=4 noet:
/*
==================================================================================
- Copyright (c) 2019 Nokia
- Copyright (c) 2018-2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
ck1 ck2|<msg text><nil>
ck1 is a simple checksum of the message text (NOT including the
- nil at the end of the string.
+ nil at the end of the string.
ck2 is a simple checksum of the trace data which for the purposes
of testing is assumed to have a terminating nil to keep this simple.
Good messages are messages where both computed checksums match
- the ck1 and ck2 values.
+ the ck1 and ck2 values.
The receiver will send an 'ack' message back to the sender for
all type 5 messages received.
The sender and receiver can be run on the same host/container
- or on different hosts. The route table is the key to setting
+ or on different hosts. The route table is the key to setting
things up properly. See the sender code for rt information.
Define these environment variables to have some control:
RMR_SEED_RT -- path to the static routing table
RMR_RTG_SVC -- port to listen for RTG connections
+ Compile time options
+ if -DMTC is defined on the compile command, then RMr is initialised
+ with the multi-threaded receive thread rather than using the same
+ process receive function. All other functions in the receiver are
+ the same.
+
Date: 18 April 2019
Author: E. Scott Daniels
*/
/*
Split the message at the first sep and return a pointer to the first
- character after.
+ character after.
*/
static char* split( char* str, char sep ) {
char* s;
}
int main( int argc, char** argv ) {
- void* mrc; // msg router context
- rmr_mbuf_t* msg = NULL; // message received
+ void* mrc; // msg router context
+ rmr_mbuf_t* msg = NULL; // message received
int i;
int state;
int errors = 0;
long good = 0; // good palyload buffers
long bad = 0; // payload buffers which were not correct
long bad_tr = 0; // trace buffers that were not correct
+ long bad_sid = 0; // bad subscription ids
long timeout = 0;
char* data;
- char wbuf[1024]; // we'll pull trace data into here
int nmsgs = 10; // number of messages to stop after (argv[1] overrides)
int rt_count = 0; // retry count
long ack_count = 0; // number of acks sent
+ int count_bins[11]; // histogram bins based on msg type (0-10)
+ char wbuf[1024]; // we'll pull trace data into here, and use as general working buffer
+ char sbuf[128]; // short buffer
data = getenv( "RMR_RTG_SVC" );
if( data == NULL ) {
if( argc > 2 ) {
listen_port = argv[2];
}
-
+
+ memset( count_bins, 0, sizeof( count_bins ) );
+
fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
- mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
+#ifdef MTC
+ mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
+
+#else
+ mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
+#endif
if( mrc == NULL ) {
fprintf( stderr, "<RCVR> ABORT: unable to initialise RMr\n" );
exit( 1 );
fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
timeout = time( NULL ) + 20;
- while( count < nmsgs ) {
+ while( count < nmsgs ) {
msg = rmr_torcv_msg( mrc, msg, 1000 ); // wait for about 1s so that if sender never starts we eventually escape
-
+
if( msg ) {
if( msg->state == RMR_OK ) {
if( (data = split( msg->payload, '|' )) != NULL ) {
}
count++; // messages received for stats output
- if( msg->mtype == 5 ) { // send an ack; sender will count but not process, so data in message is moot
- msg = rmr_rts_msg( mrc, msg ); // we don't try to resend if this returns retry
+ if( msg->mtype < 3 ) { // count number of properly set subscription id
+ if( msg->sub_id != msg->mtype * 10 ) {
+ bad_sid++;
+ }
+ }
+
+ if( msg->mtype >= 0 && msg->mtype <= 10 ) {
+ count_bins[msg->mtype]++;
+ }
+
+ if( msg->mtype == 5 ) { // send an ack; sender will count but not process, so data in message is moot
+ msg = rmr_rts_msg( mrc, msg );
rt_count = 1000;
while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :(
- if( ack_count < 1 ) { // need to connect, so hard wait
+ if( ack_count < 1 ) { // 1st ack, so we need to connect, and we'll wait for that
sleep( 1 );
}
rt_count--;
errors++;
break;
}
- }
+ }
+
+ wbuf[0] = 0;
+ for( i = 0; i < 11; i++ ) {
+ snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
+ strcat( wbuf, sbuf );
+ }
+
+ fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
+ fprintf( stderr, "<RCVR> [%s] %ld messages; good=%ld acked=%ld bad=%ld bad-trace=%ld bad-sub_id=%ld\n",
+ !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr, bad_sid );
- fprintf( stderr, "<RCVR> [%s] %ld messages; good=%ld acked=%ld bad=%ld bad-trace=%ld\n", !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr );
sleep( 2 ); // let any outbound acks flow before closing
rmr_close( mrc );