3873c7799f2da4cd5d69dce3c264c2c4a8a64e39
[ric-plt/lib/rmr.git] / test / app_test / ex_rts_receiver.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       ex_rts_receiver.c
23         Abstract:       This receiver is a bit more specitalised with respect to 
24                                 expclicitly testing the abilty to expand a message payload
25                                 length when it is necessary for an appliction to send a response
26                                 to a message originator where the rts payload is larger than the
27                                 received message.
28
29                                 This specific test is accomplished by responding to all messages
30                                 with a response which is 1024 bytes in length. This means that
31                                 any message received which is smaller than 1K bytes will have to
32                                 be expanded.  Further, all 1024 bytes will be generated into the
33                                 response, with a checksum, and the expectation is that the message
34                                 originator (assumed to be the v_sender) will verify that the 
35                                 message size is as expected, and that the checksum matches.
36
37                                 This test is concerned only with functionality, and not latency
38                                 or speed, and as such there as not been any attempt to make the 
39                                 building of responses etc. efficent.
40                                 
41                                 Compile time options:
42                                                 DEBUG: write extra output about bad messages etc.
43                                                 MTC:    enable the multi-thraded call support in RMR
44
45         Date:           28 October 2019
46         Author:         E. Scott Daniels
47 */
48
49 #include <unistd.h>
50 #include <errno.h>
51 #include <stdio.h>
52 #include <stdlib.h>
53 #include <time.h>
54 #include <string.h>
55
56 #include <rmr/rmr.h>
57
58 #define HDR_SIZE 64                                                     // the size of the header we place into the message
59 #define MSG_SIZE 1024                                           // the size of the message that will be sent (hdr+payload)
60 #define DATA_SIZE (MSG_SIZE-HDR_SIZE)           // the actual 'data' length returned in the ack msg
61
62
63 #ifndef DEBUG
64 #define DEBUG 0
65 #endif
66
67 #include "test_support.c"                                       // checksum, header gen, etc.
68
69 int main( int argc, char** argv ) {
70         void* mrc;                                              // msg router context
71         rmr_mbuf_t* msg = NULL;                         // message received
72         int     i;
73         int             j;
74         int             state;
75         int             errors = 0;
76         char*   listen_port = "4560";
77         long    count = 0;                                      // total received
78         long    good = 0;                                       // good palyload buffers
79         long    bad = 0;                                        // payload buffers which were not correct
80         long    bad_sid = 0;                            // bad subscription ids
81         long    resized = 0;                            // number of messages we had to resize before replying
82         long    timeout = 0;
83         long    rpt_timeout = 0;                        // next stats message
84         char*   data;
85         int             nmsgs = 10;                                     // number of messages to stop after (argv[1] overrides)
86         int             rt_count = 0;                           // retry count
87         long    ack_count = 0;                          // number of acks sent
88         int             count_bins[11];                         // histogram bins based on msg type (0-10)
89         char    wbuf[1024];                                     // we'll pull trace data into here, and use as general working buffer
90         char    sbuf[128];                                      // short buffer
91         char    ack_header[64];                         // we'll put checksum and maybe other stuff in the header of the response for validation
92         char    ack_data[DATA_SIZE];            // data randomly generated for each response
93         int             need;                                           // amount of something that we need
94         int             sv;                                                     // checksum valu
95
96         data = getenv( "RMR_RTG_SVC" );
97         if( data == NULL ) {
98                 setenv( "RMR_RTG_SVC", "19289", 1 );            // set one that won't collide with the sender if on same host
99         }
100
101         if( argc > 1 ) {
102                 listen_port = argv[1];
103         }
104
105         memset( count_bins, 0, sizeof( count_bins ) );
106
107         fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
108
109 #ifdef MTC
110         fprintf( stderr, "<RCVR> starting in multi-threaded mode\n" );
111         mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
112 #else
113         fprintf( stderr, "<RCVR> starting in direct receive mode\n" );
114         mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );   // start your engines!
115 #endif
116         if( mrc == NULL ) {
117                 fprintf( stderr, "<RCVR> ABORT:  unable to initialise RMr\n" );
118                 exit( 1 );
119         }
120
121         timeout = time( NULL ) + 20;
122         while( ! rmr_ready( mrc ) ) {                                                           // wait for RMr to load a route table
123                 fprintf( stderr, "<RCVR> waiting for RMr to show ready\n" );
124                 sleep( 1 );
125
126                 if( time( NULL ) > timeout ) {
127                         fprintf( stderr, "<RCVR> giving up\n" );
128                         exit( 1 );
129                 }
130         }
131         fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
132
133         timeout = time( NULL ) + 20;
134         while( 1 ) {
135                 msg = rmr_torcv_msg( mrc, msg, 1000 );                          // break about every 1s so that if sender never starts we eventually escape
136
137                 if( msg ) {
138                         if( msg->state == RMR_OK ) {
139                                 if( validate_msg( msg->payload, msg->len ) ) {          // defrock the header, then verify lengths and chksum
140                                         good++;
141                                 } else {
142                                         bad++;
143                                 }
144                                 count++;                                                                                // total messages received for stats output
145
146                                 if( msg->mtype >= 0 && msg->mtype <= 10 ) {
147                                         count_bins[msg->mtype]++;
148                                 }
149
150                                 need = generate_payload( ack_header, ack_data, 0, 0 );          // create an ack w/ random payload in payload, and set data in header
151                                 if( rmr_payload_size( msg ) < need ) {                                  // received message too small
152                                         resized++;
153                                         msg = rmr_realloc_payload( msg, need, 0, 0 );           // reallocate the message with a payload big enough
154                                         if( msg == NULL ) {
155                                                 fprintf( stderr, "[ERR] realloc returned a nil pointer\n" );
156                                                 continue;
157                                         }
158                                 }
159
160                                 fill_payload( msg, ack_header, 0, ack_data, 0 );                // push headers (with default lengths) into message
161                                 msg->mtype = 99;
162                                 msg->sub_id = -1;
163                                 msg->len = need;
164
165                                 msg = rmr_rts_msg( mrc, msg );                                                  // return our ack message
166                                 rt_count = 1000;
167                                 while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {           // to work right in nano we need this :(
168                                         if( ack_count < 1 ) {                                                                   // 1st ack, so we need to connect, and we'll wait for that
169                                                 sleep( 1 );
170                                         }
171                                         if( rt_count > 5 ) {                                                                    // but only pause for max 5sec not 1000s!
172                                                 rt_count = 5;
173                                         }
174                                         rt_count--;
175                                         msg = rmr_rts_msg( mrc, msg );                                                  // we don't try to resend if this returns retry
176                                 }
177                                 if( msg && msg->state == RMR_OK ) {                                                     // if it eventually worked
178                                         ack_count++;
179                                 }
180
181                                 timeout = time( NULL ) +20;
182                         }
183                 }
184
185                 if( time( NULL ) > timeout ) {
186                         fprintf( stderr, "<RCVR> stopping, no recent messages received\n" );
187                         break;
188                 } else {
189                         if( time( NULL ) > rpt_timeout ) {
190                                 fprintf( stderr, "<RCVR> %ld msgs=%ld good=%ld  acked=%ld bad=%ld resized=%ld\n", (long) time( NULL ), count, good, ack_count, bad, resized );
191
192                                 rpt_timeout = time( NULL ) + 5;
193                         }
194                 }
195         }
196
197         wbuf[0] = 0;
198         for( i = 0; i < 11; i++ ) {
199                 snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
200                 strcat( wbuf, sbuf );
201         }
202
203         fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
204         fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  resized=%ld bad-sub_id=%ld\n", 
205                 !!(errors + bad) ? "FAIL" : "PASS", count, good, ack_count, bad, resized,  bad_sid );
206
207         sleep( 2 );                                                                     // let any outbound acks flow before closing
208
209         rmr_close( mrc );
210         return !!(errors + bad);                        // bad rc if any are !0
211 }
212