1 // vim: ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: ex_rts_receiver.c
23 Abstract: This receiver is a bit more specialised with respect to
24 explicitly testing the ability to expand a message payload
25 length when it is necessary for an application to send a response
26 to a message originator where the rts payload is larger than the
29 This specific test is accomplished by responding to all messages
30 with a response which is 1024 bytes in length. This means that
31 any message received which is smaller than 1K bytes will have to
32 be expanded. Further, all 1024 bytes will be generated into the
33 response, with a checksum, and the expectation is that the message
34 originator (assumed to be the v_sender) will verify that the
35 message size is as expected, and that the checksum matches.
37 This test is concerned only with functionality, and not latency
38 or speed, and as such there has not been any attempt to make the
39 building of responses etc. efficient.
42 DEBUG: write extra output about bad messages etc.
43 MTC: enable the multi-threaded call support in RMR
46 Author: E. Scott Daniels
57 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ); // declared here by hand; main() calls it to get a message whose payload is big enough for the ack, or a clone of the original. NOTE(review): presumably not exposed by the RMR header this file includes -- confirm
60 #define HDR_SIZE 64 // the size of the header we place into the message
61 #define MSG_SIZE 1024 // the size of the message that will be sent (hdr+payload)
62 #define DATA_SIZE (MSG_SIZE-HDR_SIZE) // the actual 'data' length returned in the ack msg; also sizes the ack_data buffer in main()
69 #include "test_support.c" // checksum, header gen, etc.
71 int main( int argc, char** argv ) {
72 void* mrc; // msg router context
73 rmr_mbuf_t* msg = NULL; // message received
74 rmr_mbuf_t* omsg = NULL; // original message if cloning
79 char* listen_port = "4560";
80 long count = 0; // total received
81 long good = 0; // good palyload buffers
82 long bad = 0; // payload buffers which were not correct
83 long bad_sid = 0; // bad subscription ids
84 long resized = 0; // number of messages we had to resize before replying
86 long rpt_timeout = 0; // next stats message
88 int nmsgs = 10; // number of messages to stop after (argv[1] overrides)
89 int rt_count = 0; // retry count
90 long ack_count = 0; // number of acks sent
91 int count_bins[11]; // histogram bins based on msg type (0-10)
92 char wbuf[1024]; // we'll pull trace data into here, and use as general working buffer
93 char sbuf[128]; // short buffer
94 char ack_header[64]; // we'll put checksum and maybe other stuff in the header of the response for validation
95 char ack_data[DATA_SIZE]; // data randomly generated for each response
96 int need; // amount of something that we need
97 int sv; // checksum valu
100 char osrc[128]; // src strings to test when cloning
104 data = getenv( "RMR_RTG_SVC" );
106 setenv( "RMR_RTG_SVC", "19289", 1 ); // set one that won't collide with the sender if on same host
110 for( i=1; i < argc && argv[i] != NULL && *(argv[i]) == '-'; i++ ) {
112 switch( *(data+1) ) {
113 case 'c': // copy clone values as xy value: 00 01 10 11
115 j = atoi( argv[i+1] );
125 listen_port = argv[i+1];
136 memset( count_bins, 0, sizeof( count_bins ) );
138 fprintf( stderr, "<EXRCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
139 fprintf( stderr, "<EXRCVR> copy=%d clone=%d\n", copy, clone );
142 fprintf( stderr, "<EXRCVR> starting in multi-threaded mode\n" );
143 mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
145 fprintf( stderr, "<EXRCVR> starting in direct receive mode\n" );
146 mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
149 fprintf( stderr, "<EXRCVR> ABORT: unable to initialise RMr\n" );
153 timeout = time( NULL ) + 20;
154 while( ! rmr_ready( mrc ) ) { // wait for RMr to load a route table
155 fprintf( stderr, "<EXRCVR> waiting for RMr to show ready\n" );
158 if( time( NULL ) > timeout ) {
159 fprintf( stderr, "<EXRCVR> giving up\n" );
163 fprintf( stderr, "<EXRCVR> rmr now shows ready, listening begins\n" );
165 timeout = time( NULL ) + 20;
167 msg = rmr_torcv_msg( mrc, msg, 1000 ); // break about every 1s so that if sender never starts we eventually escape
170 if( msg->state == RMR_OK ) {
171 if( validate_msg( msg->payload, msg->len ) ) { // defrock the header, then verify lengths and chksum
176 count++; // total messages received for stats output
178 if( msg->mtype >= 0 && msg->mtype <= 10 ) {
179 count_bins[msg->mtype]++;
183 need = rmr_payload_size( msg );
185 need = generate_payload( ack_header, ack_data, 0, 0 ); // create an ack w/ random payload in payload, and set data in header
188 if( clone || rmr_payload_size( msg ) < need ) { // received message too small or we want to clone the original for test
191 rmr_get_src( omsg, osrc );
192 msg = rmr_realloc_payload( msg, need, copy, clone ); // reallocate the message with a payload big enough or clone if set
193 rmr_get_src( msg, nsrc );
195 if( strcmp( osrc, nsrc ) != 0 ) {
196 fprintf( stderr, "[ERR] realloc source strings don't match (%s) new=(%s)\n", osrc, nsrc );
199 fprintf( stderr, "[OK] realloc source strings match (%s) new=(%s)\n", osrc, nsrc );
204 if( memcmp( omsg->payload, msg->payload, rmr_payload_size( omsg ) ) != 0 ) {
205 fprintf( stderr, "[ERR] realloc payload contents don't match %d bytes\n", rmr_payload_size( omsg ) );
206 //spew( omsg->payload, rmr_payload_size( omsg ) );
207 //fprintf( stderr, "\n---\n\n" );
208 //spew( msg->payload, rmr_payload_size( omsg ) );
214 rmr_free_msg( omsg ); // we're done with the old msg, and must free b/c we cloned it
217 fprintf( stderr, "[ERR] realloc returned a nil pointer\n" );
223 fill_payload( msg, ack_header, 0, ack_data, 0 ); // push headers (with default lengths) into message
228 msg = rmr_rts_msg( mrc, msg ); // return our ack message
230 while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :(
231 if( ack_count < 1 ) { // 1st ack, so we need to connect, and we'll wait for that
234 if( rt_count > 5 ) { // but only pause for max 5sec not 1000s!
238 msg = rmr_rts_msg( mrc, msg ); // we don't try to resend if this returns retry
240 if( msg && msg->state == RMR_OK ) { // if it eventually worked
244 timeout = time( NULL ) +20;
248 if( time( NULL ) > timeout ) {
249 fprintf( stderr, "<EXRCVR> stopping, no recent messages received\n" );
252 if( time( NULL ) > rpt_timeout ) {
253 fprintf( stderr, "<EXRCVR> %ld msgs=%ld good=%ld acked=%ld bad=%ld resized=%ld\n", (long) time( NULL ), count, good, ack_count, bad, resized );
255 rpt_timeout = time( NULL ) + 5;
261 for( i = 0; i < 11; i++ ) {
262 snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
263 strcat( wbuf, sbuf );
266 fprintf( stderr, "<EXRCVR> mtype histogram: %s\n", wbuf );
267 fprintf( stderr, "<EXRCVR> [%s] %ld messages; good=%ld acked=%ld bad=%ld resized=%ld bad-sub_id=%ld\n",
268 !!(errors + bad) ? "FAIL" : "PASS", count, good, ack_count, bad, resized, bad_sid );
270 sleep( 2 ); // let any outbound acks flow before closing
273 return !!(errors + bad); // bad rc if any are !0