1 // :vim ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is a very simple receiver that does nothing but listen
24 for messages and write stats every so often to the tty.
26 The receiver expects messages which have some trace information
27 and a message format of:
28 ck1 ck2|<msg text><nil>
30 ck1 is a simple checksum of the message text (NOT including the
31 nil at the end of the string).
33 ck2 is a simple checksum of the trace data which for the purposes
34 of testing is assumed to have a terminating nil to keep this simple.
36 Good messages are messages where both computed checksums match
37 the ck1 and ck2 values.
39 The receiver will send an 'ack' message back to the sender for
40 all type 5 messages received.
42 The sender and receiver can be run on the same host/container
43 or on different hosts. The route table is the key to setting
44 things up properly. See the sender code for rt information.
46 Define these environment variables to have some control:
47 RMR_SEED_RT -- path to the static routing table
48 RMR_RTG_SVC -- port to listen for RTG connections
51 If -DMTC is defined on the compile command line, then RMr is initialised
52 with the multi-threaded receive thread rather than using the same
53 process receive function. All other functions in the receiver are
57 Author: E. Scott Daniels
// Compute the simple checksum used for both the message text (ck1) and the
// trace data (ck2). The visible accumulation adds the value of the current
// character plus a counter (i) that is bumped on every addition.
// NOTE(review): the declarations, the loop and the return are not visible in
// this excerpt; presumably the walk stops at the terminating nil -- confirm.
69 static int sum( char* str ) {
// fold in the current character and the counter, then advance both
74 sum += *(str++) + i++;
81 Split the message at the first sep and return a pointer to the first
// Locate the first occurrence of sep in str with strchr(). main() compares the
// result against NULL to decide whether the separator was present.
// NOTE(review): the return statements are not visible in this excerpt, so the
// exact pointer handed back on success should be confirmed against the full file.
84 static char* split( char* str, char sep ) {
87 s = strchr( str, sep );
// presumably reached only when strchr() found nothing -- TODO confirm.
// The text says "pipe" regardless of which sep was passed in.
93 fprintf( stderr, "<RCVR> no pipe in message: (%s)\n", str );
// Receiver entry point. Visible flow:
//   - pick up the message limit (argv[1]) and listen port (argv[2])
//   - initialise RMr and wait up to 20 seconds for it to report ready
//   - receive until nmsgs messages are counted or the receive timeout passes,
//     checking the payload checksum and the trace checksum of each message
//   - tally messages by type and send an ack back for every type 5 message
//   - write the type histogram and a PASS/FAIL summary to stderr
// The final return is non-zero when any of errors, bad or bad_tr is non-zero.
// NOTE(review): several lines (conditionals, closing braces, some declarations)
// are not visible in this excerpt; comments below describe only what is shown.
97 int main( int argc, char** argv ) {
98 void* mrc; // msg router context
99 rmr_mbuf_t* msg = NULL; // message received
103 char* listen_port = "4560"; // default port; replaced by argv[2] below
104 long count = 0; // total received
105 long good = 0; // good payload buffers
106 long bad = 0; // payload buffers which were not correct
107 long bad_tr = 0; // trace buffers that were not correct
108 long bad_sid = 0; // bad subscription ids
111 int nmsgs = 10; // number of messages to stop after (argv[1] overrides)
112 int rt_count = 0; // retry count
113 long ack_count = 0; // number of acks sent
114 int count_bins[11]; // histogram bins based on msg type (0-10)
115 char wbuf[1024]; // we'll pull trace data into here, and use as general working buffer
116 char sbuf[128]; // short buffer
// presumably the default below is applied only when the variable is unset -- the guard is not visible
118 data = getenv( "RMR_RTG_SVC" );
120 setenv( "RMR_RTG_SVC", "19289", 1 ); // set one that won't collide with the sender if on same host
// command line overrides; the argc guards are not visible in this excerpt
124 nmsgs = atoi( argv[1] );
127 listen_port = argv[2];
130 memset( count_bins, 0, sizeof( count_bins ) );
132 fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
// two alternative initialisations; per the file abstract the multi-threaded one is chosen when built with -DMTC
135 mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
138 mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
141 fprintf( stderr, "<RCVR> ABORT: unable to initialise RMr\n" );
// allow 20 seconds for RMr to become ready before giving up
145 timeout = time( NULL ) + 20;
146 while( ! rmr_ready( mrc ) ) { // wait for RMr to load a route table
147 fprintf( stderr, "<RCVR> waiting for RMr to show ready\n" );
150 if( time( NULL ) > timeout ) {
151 fprintf( stderr, "<RCVR> giving up\n" );
155 fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
// fresh 20 second deadline for the receive loop (checked at the bottom of the loop)
157 timeout = time( NULL ) + 20;
158 while( count < nmsgs ) {
159 msg = rmr_torcv_msg( mrc, msg, 1000 ); // wait for about 1s so that if sender never starts we eventually escape
// NOTE(review): no NULL check on msg is visible before this dereference -- confirm one exists in the full file
162 if( msg->state == RMR_OK ) {
// payload is "ck1 ck2|<msg text>"; data is used below as the message text to checksum
163 if( (data = split( msg->payload, '|' )) != NULL ) {
// atoi() stops at the first non-digit, so this reads ck1 from the front of the payload
164 if( sum( data ) == atoi( (char *) msg->payload ) ) {
167 fprintf( stderr, "<RCVR> chk sum bad: computed=%d expected;%d (%s)\n", sum( data ), atoi( msg->payload ), data );
172 if( (data = split( msg->payload, ' ' )) != NULL ) { // data will point to the chksum for the trace data
173 state = rmr_get_trace( msg, wbuf, 1024 ); // should only copy up to the trace size; we'll check that
// NOTE(review): the message below says "unexpectedly long" but this test also catches sizes below 1
174 if( state > 128 || state < 1 ) {
175 fprintf( stderr, "trace data size listed unexpectedly long: %d\n", state );
// the trace data is treated as a nil terminated string (see the file abstract)
177 if( sum( wbuf ) != atoi( data ) ) {
178 fprintf( stderr, "<RCVR> trace chk sum bad: computed=%d expected;%d len=%d (%s)\n", sum( wbuf ), atoi( data ), state, wbuf );
182 count++; // messages received for stats output
// for message types below 3 the subscription id is expected to be ten times the type
184 if( msg->mtype < 3 ) { // count number of properly set subscription id
185 if( msg->sub_id != msg->mtype * 10 ) {
// only types 0 through 10 have a histogram bin; this guard keeps the index in range
190 if( msg->mtype >= 0 && msg->mtype <= 10 ) {
191 count_bins[msg->mtype]++;
194 if( msg->mtype == 5 ) { // send an ack; sender will count but not process, so data in message is moot
195 msg = rmr_rts_msg( mrc, msg );
// retry while rt_count allows and RMr keeps reporting RMR_ERR_RETRY; where rt_count is set and decremented is not visible here
197 while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :(
198 if( ack_count < 1 ) { // 1st ack, so we need to connect, and we'll wait for that
202 msg = rmr_rts_msg( mrc, msg ); // we don't try to resend if this returns retry
204 if( msg && msg->state == RMR_OK ) { // if it eventually worked
// overall deadline check for the receive loop
211 if( time( NULL ) > timeout ) {
212 fprintf( stderr, "receiver timed out\n" );
// build the histogram line in wbuf, one fixed width column per message type
// NOTE(review): strcat() needs wbuf to hold an empty string first; the reset is not visible in this excerpt
219 for( i = 0; i < 11; i++ ) {
220 snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
221 strcat( wbuf, sbuf );
224 fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
// FAIL when any of errors, bad or bad_tr is non-zero; bad_sid is reported but is not part of the verdict
225 fprintf( stderr, "<RCVR> [%s] %ld messages; good=%ld acked=%ld bad=%ld bad-trace=%ld bad-sub_id=%ld\n",
226 !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr, bad_sid );
228 sleep( 2 ); // let any outbound acks flow before closing
231 return !!(errors + bad + bad_tr); // bad rc if any are !0