1 // vim: ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: ex_rts_receiver.c
23 Abstract: This receiver is a bit more specitalised with respect to
24 expclicitly testing the abilty to expand a message payload
25 length when it is necessary for an appliction to send a response
26 to a message originator where the rts payload is larger than the
29 This specific test is accomplished by responding to all messages
30 with a response which is 1024 bytes in length. This means that
31 any message received which is smaller than 1K bytes will have to
32 be expanded. Further, all 1024 bytes will be generated into the
33 response, with a checksum, and the expectation is that the message
34 originator (assumed to be the v_sender) will verify that the
35 message size is as expected, and that the checksum matches.
37 This test is concerned only with functionality, and not latency
38 or speed, and as such there as not been any attempt to make the
39 building of responses etc. efficent.
42 DEBUG: write extra output about bad messages etc.
43 MTC: enable the multi-thraded call support in RMR
46 Author: E. Scott Daniels
57 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone );
60 #define HDR_SIZE 64 // the size of the header we place into the message
61 #define MSG_SIZE 1024 // the size of the message that will be sent (hdr+payload)
62 #define DATA_SIZE (MSG_SIZE-HDR_SIZE) // the actual 'data' length returned in the ack msg
69 #include "test_support.c" // checksum, header gen, etc.
71 int main( int argc, char** argv ) {
72 void* mrc; // msg router context
73 rmr_mbuf_t* msg = NULL; // message received
74 rmr_mbuf_t* omsg = NULL; // original message if cloning
79 char* listen_port = "4560";
80 long count = 0; // total received
81 long good = 0; // good palyload buffers
82 long bad = 0; // payload buffers which were not correct
83 long bad_sid = 0; // bad subscription ids
84 long resized = 0; // number of messages we had to resize before replying
86 long rpt_timeout = 0; // next stats message
88 int nmsgs = 10; // number of messages to stop after (argv[1] overrides)
89 int rt_count = 0; // retry count
90 long ack_count = 0; // number of acks sent
91 int count_bins[11]; // histogram bins based on msg type (0-10)
92 char wbuf[1024]; // we'll pull trace data into here, and use as general working buffer
93 char sbuf[128]; // short buffer
94 char ack_header[64]; // we'll put checksum and maybe other stuff in the header of the response for validation
95 char ack_data[DATA_SIZE]; // data randomly generated for each response
96 int need; // amount of something that we need
97 int sv; // checksum valu
100 char osrc[128]; // src strings to test when cloning
104 int dmb_size = 2048; // default message buffer size
106 data = getenv( "RMR_RTG_SVC" );
108 setenv( "RMR_RTG_SVC", "19289", 1 ); // set one that won't collide with the sender if on same host
112 for( i=1; i < argc && argv[i] != NULL && *(argv[i]) == '-'; i++ ) {
114 switch( *(data+1) ) {
115 case 'c': // copy clone values as xy value: 00 01 10 11
117 j = atoi( argv[i+1] );
127 listen_port = argv[i+1];
138 memset( count_bins, 0, sizeof( count_bins ) );
140 fprintf( stderr, "<EXRCVR> listening on port %s for a max of 20 seconds\n", listen_port );
141 fprintf( stderr, "<EXRCVR> copy=%d clone=%d\n", copy, clone );
144 dmb_size = 128; // not cloning, force small buffer to test larger buffer receives
147 fprintf( stderr, "<EXRCVR> default receive message buffer size: %d\n", dmb_size );
149 fprintf( stderr, "<EXRCVR> starting in multi-threaded mode\n" );
150 mrc = rmr_init( listen_port, dmb_size, RMRFL_MTCALL ); // start RMr in mt-receive mode
152 fprintf( stderr, "<EXRCVR> starting in direct receive mode\n" );
153 mrc = rmr_init( listen_port, dmb_size, RMRFL_NONE ); // start your engines!
156 fprintf( stderr, "<EXRCVR> ABORT: unable to initialise RMr\n" );
160 timeout = time( NULL ) + 20;
161 while( ! rmr_ready( mrc ) ) { // wait for RMr to load a route table
162 fprintf( stderr, "<EXRCVR> waiting for RMr to show ready\n" );
165 if( time( NULL ) > timeout ) {
166 fprintf( stderr, "<EXRCVR> giving up\n" );
170 fprintf( stderr, "<EXRCVR> rmr now shows ready, listening begins\n" );
172 timeout = time( NULL ) + 20;
174 msg = rmr_torcv_msg( mrc, msg, 1000 ); // break about every 1s so that if sender never starts we eventually escape
177 if( msg->state == RMR_OK ) {
178 if( validate_msg( msg->payload, msg->len ) ) { // defrock the header, then verify lengths and chksum
183 count++; // total messages received for stats output
185 if( msg->mtype >= 0 && msg->mtype <= 10 ) {
186 count_bins[msg->mtype]++;
190 need = msg->len; // clone what was sent, not entire payload
192 need = generate_payload( ack_header, ack_data, 0, 0 ); // create an ack w/ random payload in payload, and set data in header
195 if( clone || rmr_payload_size( msg ) < need ) { // received message too small or we want to clone the original for test
199 rmr_get_src( omsg, osrc );
200 msg = rmr_realloc_payload( msg, need, copy, clone ); // reallocate the message with a payload big enough or clone if set
201 rmr_get_src( msg, nsrc );
203 if( msg->len != old_len ) {
204 fprintf( stderr, "[ERR] after realloc len=%d didn't match old len=%d\n", msg->len, old_len );
206 if( strcmp( osrc, nsrc ) != 0 ) {
207 fprintf( stderr, "[ERR] realloc source strings don't match (%s) new=(%s)\n", osrc, nsrc );
210 fprintf( stderr, "[OK] realloc source strings match (%s) new=(%s)\n", osrc, nsrc );
215 if( memcmp( omsg->payload, msg->payload, rmr_payload_size( omsg ) ) != 0 ) {
216 fprintf( stderr, "[ERR] realloc payload contents don't match %d bytes\n", rmr_payload_size( omsg ) );
217 //spew( omsg->payload, rmr_payload_size( omsg ) );
218 //fprintf( stderr, "\n---\n\n" );
219 //spew( msg->payload, rmr_payload_size( omsg ) );
225 rmr_free_msg( omsg ); // we're done with the old msg, and must free b/c we cloned it
228 fprintf( stderr, "[ERR] realloc returned a nil pointer\n" );
234 fill_payload( msg, ack_header, 0, ack_data, 0 ); // push headers (with default lengths) into message
239 msg = rmr_rts_msg( mrc, msg ); // return our ack message
241 while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :(
242 if( ack_count < 1 ) { // 1st ack, so we need to connect, and we'll wait for that
245 if( rt_count > 5 ) { // but only pause for max 5sec not 1000s!
249 msg = rmr_rts_msg( mrc, msg ); // we don't try to resend if this returns retry
251 if( msg && msg->state == RMR_OK ) { // if it eventually worked
255 timeout = time( NULL ) +20;
259 if( time( NULL ) > timeout ) {
260 fprintf( stderr, "<EXRCVR> stopping, no recent messages received\n" );
263 if( time( NULL ) > rpt_timeout ) {
264 fprintf( stderr, "<EXRCVR> %ld msgs=%ld good=%ld acked=%ld bad=%ld resized=%ld\n", (long) time( NULL ), count, good, ack_count, bad, resized );
266 rpt_timeout = time( NULL ) + 5;
272 for( i = 0; i < 11; i++ ) {
273 snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
274 strcat( wbuf, sbuf );
277 fprintf( stderr, "<EXRCVR> mtype histogram: %s\n", wbuf );
278 fprintf( stderr, "<EXRCVR> [%s] %ld messages; good=%ld acked=%ld bad=%ld resized=%ld bad-sub_id=%ld\n",
279 !!(errors + bad) ? "FAIL" : "PASS", count, good, ack_count, bad, resized, bad_sid );
281 sleep( 2 ); // let any outbound acks flow before closing
284 return !!(errors + bad); // bad rc if any are !0