1 // vim: ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is a very simple receiver that listens for messages and
24 returns each to the sender after adding a timestamp to the
25 payload. The payload is expected to be lc_msg_t (see lcaller.c)
26 and this will update the 'turn' timestamp on receipt.
28 Define these environment variables to have some control:
29 RMR_SEED_RT -- path to the static routing table
30 RMR_RTG_SVC -- port to listen for RTG connections
33 Author: E. Scott Daniels
46 The message type placed into the payload.
// Latency-measurement record carried in the message payload. main() casts
// the received payload to this type and fills in turn_ts on receipt.
// NOTE(review): the layout presumably must match the sender's definition
// (the file abstract points at lcaller.c) -- confirm both stay in sync.
48 typedef struct lc_msg {
49 struct timespec out_ts; // time just before call executed
50 struct timespec turn_ts; // time at the receiver, on receipt
51 struct timespec in_ts; // time received back by the caller
52 int out_retries; // number of retries required to send
53 int turn_retries; // retries on the return leg -- presumably set by the receiver; TODO confirm (not written in the visible code)
56 // ----------------------------------------------------------------------------------
// Folds the bytes of str into an int: each step adds the current character
// plus a running position counter (i) to the accumulator.
// The declarations, loop header and return statement are not visible here.
// NOTE(review): str is plain char, whose signedness is implementation
// defined, so bytes >= 0x80 may contribute negative values on some targets.
// NOTE(review): the accumulator is a local named 'sum', shadowing the
// function's own name inside the body.
58 static int sum( char* str ) {
63 sum += *(str++) + i++;
70 Split the message at the first sep and return a pointer to the first
// Searches str for the first occurrence of sep using strchr().
// The handling of the strchr() result and the return statements are not
// visible here; presumably the diagnostic below is the not-found path --
// TODO confirm.
// NOTE(review): no NULL check on str is visible before the strchr() call.
73 static char* split( char* str, char sep ) {
76 s = strchr( str, sep );
// The message text says "pipe" although the separator is whatever the
// caller passed in sep.
82 fprintf( stderr, "<RCVR> no pipe in message: (%s)\n", str );
// Receiver entry point. Initialises RMR on the listen port, waits for it to
// report ready, then receives messages, stamps the lc_msg_t turn_ts field in
// each payload and returns the message to its sender with rmr_rts_msg().
//
// argv[1] replaces the default message count (nmsgs).
// argv[2] replaces the default listen port.
// NOTE(review): the argc guards around the argv reads are not visible here
// -- confirm they exist.
86 int main( int argc, char** argv ) {
87 void* mrc; // msg router context
88 lc_msg_t* lmc; // latency message type from caller
89 rmr_mbuf_t* msg = NULL; // message received
92 char* listen_port = "4560";
93 long count = 0; // total received
96 int nmsgs = 10; // number of messages to stop after (argv[1] overrides)
97 int rt_count = 0; // retry count
// RMR_RTG_SVC is read from the environment and then set to 19289 with
// overwrite enabled. Presumably the setenv only runs when the variable was
// unset; the test on 'data' is not visible here -- TODO confirm.
101 data = getenv( "RMR_RTG_SVC" );
103 setenv( "RMR_RTG_SVC", "19289", 1 ); // set one that won't collide with the sender if on same host
// NOTE(review): atoi() gives no error indication; non-numeric input yields 0.
107 nmsgs = atoi( argv[1] );
110 listen_port = argv[2];
114 fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
116 mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start your engines!
117 //mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, 0 ); // start your engines!
// Reported when initialisation fails; the test on mrc and the exit path are
// not visible here.
119 fprintf( stderr, "<RCVR> ABORT: unable to initialise RMr\n" );
// Readiness deadline: 20 seconds from now.
123 timeout = time( NULL ) + 20;
124 while( ! rmr_ready( mrc ) ) { // wait for RMr to load a route table
125 fprintf( stderr, "<RCVR> waiting for RMr to show ready\n" );
// Once the deadline has passed, report it; the statement that leaves the
// program after "giving up" is not visible here.
128 if( time( NULL ) > timeout ) {
129 fprintf( stderr, "<RCVR> giving up\n" );
133 fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
135 timeout = time( NULL ) + 2; // once we start, we assume if we go 2s w/o a message that we're done
// NOTE(review): the count-bounded loop header below is commented out and the
// live loop condition is not visible here, so confirm whether nmsgs still
// limits the run as the "max of %d messages" banner above announces.
136 //while( count < nmsgs ) {
139 msg = rmr_torcv_msg( mrc, msg, 1000 ); // pop every second or so to timeout if needed
// NOTE(review): msg is dereferenced here; confirm a NULL check precedes this
// line (none is visible).
142 if( msg->state == RMR_OK ) {
// The payload is reinterpreted as lc_msg_t.
// NOTE(review): no check that the payload is at least sizeof(lc_msg_t) is
// visible -- confirm.
144 lmc = (lc_msg_t *) msg->payload;
145 clock_gettime( CLOCK_REALTIME, &lmc->turn_ts ); // mark time that we received it.
// Return the stamped message to its sender.
148 msg = rmr_rts_msg( mrc, msg );
// Retry while the send state is RMR_ERR_RETRY and the retry budget lasts.
// rt_count is declared as 0, so it must be reset to a positive budget before
// this loop for any retry to happen; that assignment and the decrement are
// not visible here.
150 while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :(
152 if( count < 1 ) { // 1st msg, so we need to connect, and we'll wait for that
// NOTE(review): the trailing comment on the next line appears stale. This
// call seems to sit inside the retry loop above, whose condition re-tests
// RMR_ERR_RETRY.
156 msg = rmr_rts_msg( mrc, msg ); // we don't try to resend if this returns retry
// Idle-timeout test; where 'now' is sampled, where 'timeout' is refreshed
// and what happens on expiry are not visible here.
162 if( now > timeout ) {
// Final summary: process id and total messages counted.
171 fprintf( stderr, "<RCVR> %ld is finished got %ld messages\n", (long) getpid(), count );