Correct bug in payload reallocation function
[ric-plt/lib/rmr.git] / test / app_test / receiver.c
1 // :vim ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_rcvr.c
23         Abstract:       This is a very simple receiver that does nothing but listen
24                                 for messages and write stats every so often to the tty.
25
26                                 The receiver expects messages which have some trace information
27                                 and a message format of:
28                                         ck1 ck2|<msg text><nil>
29
30                                 ck1 is a simple checksum of the message text (NOT including the
31                                 nil at the end of the string.
32
33                                 ck2 is a simple checksum of the trace data which for the purposes
34                                 of testing is assumed to have a terminating nil to keep this simple.
35
36                                 Good messages are messages where both computed checksums match
37                                 the ck1 and ck2 values.
38
39                                 The receiver will send an 'ack' message back to the sender for
40                                 all type 5 messages received.
41
42                                 The sender and receiver can be run on the same host/container
43                                 or on different hosts. The route table is the key to setting
44                                 things up properly.  See the sender code for rt information.
45
46                                 Define these environment variables to have some control:
47                                         RMR_SEED_RT -- path to the static routing table
48                                         RMR_RTG_SVC -- port to listen for RTG connections
49
50                                 Compile time options
51                                 if -DMTC is defined on the compile command, then RMr is initialised
52                                 with the multi-threaded receive thread rather than using the same
53                                 process receive function. All other functions in the receiver are
54                                 the same.
55
56         Date:           18 April 2019
57         Author:         E. Scott Daniels
58 */
59
60 #include <unistd.h>
61 #include <errno.h>
62 #include <stdio.h>
63 #include <stdlib.h>
64 #include <time.h>
65 #include <string.h>
66
67 #include <rmr/rmr.h>
68
69 static int sum( char* str ) {
70         int sum = 0;
71         int     i = 0;
72
73         while( *str ) {
74                 sum += *(str++) + i++;
75         }
76
77         return sum % 255;
78 }
79
80 /*
81         Split the message at the first sep and return a pointer to the first
82         character after.
83 */
84 static char* split( char* str, char sep ) {
85         char*   s;
86
87         s = strchr( str, sep );
88
89         if( s ) {
90                 return s+1;
91         }
92
93         fprintf( stderr, "<RCVR> no pipe in message: (%s)\n", str );
94         return NULL;
95 }
96
97 int main( int argc, char** argv ) {
98         void* mrc;                                              // msg router context
99         rmr_mbuf_t* msg = NULL;                         // message received
100         int i;
101         int             state;
102         int             errors = 0;
103         char*   listen_port = "4560";
104         long count = 0;                                         // total received
105         long good = 0;                                          // good palyload buffers
106         long bad = 0;                                           // payload buffers which were not correct
107         long bad_tr = 0;                                        // trace buffers that were not correct
108         long bad_sid = 0;                                       // bad subscription ids
109         long timeout = 0;
110         char*   data;
111         int             nmsgs = 10;                                     // number of messages to stop after (argv[1] overrides)
112         int             rt_count = 0;                           // retry count
113         long ack_count = 0;                                     // number of acks sent
114         int             count_bins[11];                         // histogram bins based on msg type (0-10)
115         char    wbuf[1024];                                     // we'll pull trace data into here, and use as general working buffer
116         char    sbuf[128];                                      // short buffer
117
118         data = getenv( "RMR_RTG_SVC" );
119         if( data == NULL ) {
120                 setenv( "RMR_RTG_SVC", "19289", 1 );            // set one that won't collide with the sender if on same host
121         }
122
123         if( argc > 1 ) {
124                 nmsgs = atoi( argv[1] );
125         }
126         if( argc > 2 ) {
127                 listen_port = argv[2];
128         }
129
130         memset( count_bins, 0, sizeof( count_bins ) );
131
132         fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
133
134 #ifdef MTC
135         fprintf( stderr, "<RCVR> starting in multi-threaded mode\n" );
136         mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
137 #else
138         fprintf( stderr, "<RCVR> starting in direct receive mode\n" );
139         mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );   // start your engines!
140 #endif
141         if( mrc == NULL ) {
142                 fprintf( stderr, "<RCVR> ABORT:  unable to initialise RMr\n" );
143                 exit( 1 );
144         }
145
146         timeout = time( NULL ) + 20;
147         while( ! rmr_ready( mrc ) ) {                                                           // wait for RMr to load a route table
148                 fprintf( stderr, "<RCVR> waiting for RMr to show ready\n" );
149                 sleep( 1 );
150
151                 if( time( NULL ) > timeout ) {
152                         fprintf( stderr, "<RCVR> giving up\n" );
153                         exit( 1 );
154                 }
155         }
156         fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
157
158         timeout = time( NULL ) + 20;
159         while( count < nmsgs ) {
160                 msg = rmr_torcv_msg( mrc, msg, 1000 );                          // wait for about 1s so that if sender never starts we eventually escape
161
162                 if( msg ) {
163                         if( msg->state == RMR_OK ) {
164                                 if( (data = split( msg->payload, '|'  )) != NULL ) {
165                                         if( sum( data ) == atoi( (char *) msg->payload ) ) {
166                                                 good++;
167                                         } else {
168                                                 fprintf( stderr, "<RCVR> chk sum bad: computed=%d expected;%d (%s)\n", sum( data ), atoi( msg->payload ), data );
169                                                 bad++;
170                                         }
171                                 }
172
173                                 if( (data = split( msg->payload, ' ' )) != NULL ) {                     // data will point to the chksum for the trace data
174                                         state = rmr_get_trace( msg, wbuf, 1024 );                               // should only copy upto the trace size; we'll check that
175                                         if( state > 128 || state < 1 ) {
176                                                 fprintf( stderr, "trace data size listed unexpectedly long: %d\n", state );
177                                         }
178                                         if( sum( wbuf ) != atoi( data ) ) {
179                                                 fprintf( stderr, "<RCVR> trace chk sum bad: computed=%d expected;%d len=%d (%s)\n", sum( wbuf ), atoi( data ), state, wbuf );
180                                                 bad_tr++;
181                                         }
182                                 }
183                                 count++;                                                                        // messages received for stats output
184
185                                 if( msg->mtype < 3 ) {                                                  // count number of properly set subscription id
186                                         if( msg->sub_id != msg->mtype * 10 ) {
187                                                 bad_sid++;
188                                         }
189                                 }
190
191                                 if( msg->mtype >= 0 && msg->mtype <= 10 ) {
192                                         count_bins[msg->mtype]++;
193                                 }
194
195                                 if( msg->mtype == 5 ) {                                                                                 // send an ack; sender will count but not process, so data in message is moot
196                                         msg = rmr_rts_msg( mrc, msg );
197                                         rt_count = 1000;
198                                         while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {           // to work right in nano we need this :(
199                                                 if( ack_count < 1 ) {                                                                   // 1st ack, so we need to connect, and we'll wait for that
200                                                         sleep( 1 );
201                                                         if( rt_count > 5 ) {
202                                                                 rt_count = 5;                                                                   // but only for 5sec; not 1000sec!
203                                                         }
204                                                 }
205                                                 rt_count--;
206                                                 msg = rmr_rts_msg( mrc, msg );                                                  // we don't try to resend if this returns retry
207                                         }
208                                         if( msg && msg->state == RMR_OK ) {                                                     // if it eventually worked
209                                                 ack_count++;
210                                         }
211                                 }
212
213                                 timeout = time( NULL ) + 10;                                    // extend timeout to 10s past last received message
214                         }
215                 }
216
217                 if( time( NULL ) > timeout ) {
218                         fprintf( stderr, "receiver timed out\n" );
219                         errors++;
220                         break;
221                 }
222         }
223
224         wbuf[0] = 0;
225         for( i = 0; i < 11; i++ ) {
226                 snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
227                 strcat( wbuf, sbuf );
228         }
229
230         fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
231         fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  bad-trace=%ld bad-sub_id=%ld\n", 
232                 !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr, bad_sid );
233
234         sleep( 2 );                                                                     // let any outbound acks flow before closing
235
236         rmr_close( mrc );
237         return !!(errors + bad + bad_tr);                       // bad rc if any are !0
238 }
239