1 // vim: ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: ex_rts_receiver.c
23 Abstract: This receiver is a bit more specialised with respect to
24 explicitly testing the ability to expand a message payload
25 length when it is necessary for an application to send a response
26 to a message originator where the rts payload is larger than the
29 This specific test is accomplished by responding to all messages
30 with a response which is 1024 bytes in length. This means that
31 any message received which is smaller than 1K bytes will have to
32 be expanded. Further, all 1024 bytes will be generated into the
33 response, with a checksum, and the expectation is that the message
34 originator (assumed to be the v_sender) will verify that the
35 message size is as expected, and that the checksum matches.
37 This test is concerned only with functionality, and not latency
38 or speed, and as such there has not been any attempt to make the
39 building of responses etc. efficient.
42 DEBUG: write extra output about bad messages etc.
43 MTC: enable the multi-threaded call support in RMR
46 Author: E. Scott Daniels
58 #define HDR_SIZE 64 // size, in bytes, of the header we place at the front of the message
59 #define MSG_SIZE 1024 // total size, in bytes, of the message that will be sent (hdr+payload)
60 #define DATA_SIZE (MSG_SIZE-HDR_SIZE) // the actual 'data' length returned in the ack msg (what remains after the header)
67 #include "test_support.c" // checksum, header gen, etc.
69 int main( int argc, char** argv ) {
70 void* mrc; // msg router context
71 rmr_mbuf_t* msg = NULL; // message received
76 char* listen_port = "4560";
77 long count = 0; // total received
78 long good = 0; // good palyload buffers
79 long bad = 0; // payload buffers which were not correct
80 long bad_sid = 0; // bad subscription ids
81 long resized = 0; // number of messages we had to resize before replying
83 long rpt_timeout = 0; // next stats message
85 int nmsgs = 10; // number of messages to stop after (argv[1] overrides)
86 int rt_count = 0; // retry count
87 long ack_count = 0; // number of acks sent
88 int count_bins[11]; // histogram bins based on msg type (0-10)
89 char wbuf[1024]; // we'll pull trace data into here, and use as general working buffer
90 char sbuf[128]; // short buffer
91 char ack_header[64]; // we'll put checksum and maybe other stuff in the header of the response for validation
92 char ack_data[DATA_SIZE]; // data randomly generated for each response
93 int need; // amount of something that we need
94 int sv; // checksum valu
96 data = getenv( "RMR_RTG_SVC" );
98 setenv( "RMR_RTG_SVC", "19289", 1 ); // set one that won't collide with the sender if on same host
102 listen_port = argv[1];
105 memset( count_bins, 0, sizeof( count_bins ) );
107 fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
110 fprintf( stderr, "<RCVR> starting in multi-threaded mode\n" );
111 mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
113 fprintf( stderr, "<RCVR> starting in direct receive mode\n" );
114 mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
117 fprintf( stderr, "<RCVR> ABORT: unable to initialise RMr\n" );
121 timeout = time( NULL ) + 20;
122 while( ! rmr_ready( mrc ) ) { // wait for RMr to load a route table
123 fprintf( stderr, "<RCVR> waiting for RMr to show ready\n" );
126 if( time( NULL ) > timeout ) {
127 fprintf( stderr, "<RCVR> giving up\n" );
131 fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
133 timeout = time( NULL ) + 20;
135 msg = rmr_torcv_msg( mrc, msg, 1000 ); // break about every 1s so that if sender never starts we eventually escape
138 if( msg->state == RMR_OK ) {
139 if( validate_msg( msg->payload, msg->len ) ) { // defrock the header, then verify lengths and chksum
144 count++; // total messages received for stats output
146 if( msg->mtype >= 0 && msg->mtype <= 10 ) {
147 count_bins[msg->mtype]++;
150 need = generate_payload( ack_header, ack_data, 0, 0 ); // create an ack w/ random payload in payload, and set data in header
151 if( rmr_payload_size( msg ) < need ) { // received message too small
153 msg = rmr_realloc_payload( msg, need, 0, 0 ); // reallocate the message with a payload big enough
155 fprintf( stderr, "[ERR] realloc returned a nil pointer\n" );
160 fill_payload( msg, ack_header, 0, ack_data, 0 ); // push headers (with default lengths) into message
165 msg = rmr_rts_msg( mrc, msg ); // return our ack message
167 while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :(
168 if( ack_count < 1 ) { // 1st ack, so we need to connect, and we'll wait for that
172 msg = rmr_rts_msg( mrc, msg ); // we don't try to resend if this returns retry
174 if( msg && msg->state == RMR_OK ) { // if it eventually worked
178 timeout = time( NULL ) +20;
182 if( time( NULL ) > timeout ) {
183 fprintf( stderr, "<RCVR> stopping, no recent messages received\n" );
186 if( time( NULL ) > rpt_timeout ) {
187 fprintf( stderr, "<RCVR> %ld msgs=%ld good=%ld acked=%ld bad=%ld resized=%ld\n", (long) time( NULL ), count, good, ack_count, bad, resized );
189 rpt_timeout = time( NULL ) + 5;
195 for( i = 0; i < 11; i++ ) {
196 snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
197 strcat( wbuf, sbuf );
200 fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
201 fprintf( stderr, "<RCVR> [%s] %ld messages; good=%ld acked=%ld bad=%ld resized=%ld bad-sub_id=%ld\n",
202 !!(errors + bad) ? "FAIL" : "PASS", count, good, ack_count, bad, resized, bad_sid );
204 sleep( 2 ); // let any outbound acks flow before closing
207 return !!(errors + bad); // bad rc if any are !0