feat(routing): Support session based routing
[ric-plt/lib/rmr.git] / test / app_test / receiver.c
1 // :vim ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_rcvr.c
23         Abstract:       This is a very simple receiver that does nothing but listen
24                                 for messages and write stats every so often to the tty.
25
26                                 The receiver expects messages which have some trace information
27                                 and a message format of:
28                                         ck1 ck2|<msg text><nil>
29
30                                 ck1 is a simple checksum of the message text (NOT including the
31                                 nil at the end of the string.
32
33                                 ck2 is a simple checksum of the trace data which for the purposes
34                                 of testing is assumed to have a terminating nil to keep this simple.
35
36                                 Good messages are messages where both computed checksums match
37                                 the ck1 and ck2 values.
38
39                                 The receiver will send an 'ack' message back to the sender for
40                                 all type 5 messages received.
41
42                                 The sender and receiver can be run on the same host/container
43                                 or on different hosts. The route table is the key to setting
44                                 things up properly.  See the sender code for rt information.
45
46                                 Define these environment variables to have some control:
47                                         RMR_SEED_RT -- path to the static routing table
48                                         RMR_RTG_SVC -- port to listen for RTG connections
49
50         Date:           18 April 2019
51         Author:         E. Scott Daniels
52 */
53
54 #include <unistd.h>
55 #include <errno.h>
56 #include <stdio.h>
57 #include <stdlib.h>
58 #include <time.h>
59 #include <string.h>
60
61 #include <rmr/rmr.h>
62
63 static int sum( char* str ) {
64         int sum = 0;
65         int     i = 0;
66
67         while( *str ) {
68                 sum += *(str++) + i++;
69         }
70
71         return sum % 255;
72 }
73
74 /*
75         Split the message at the first sep and return a pointer to the first
76         character after.
77 */
78 static char* split( char* str, char sep ) {
79         char*   s;
80
81         s = strchr( str, sep );
82
83         if( s ) {
84                 return s+1;
85         }
86
87         fprintf( stderr, "<RCVR> no pipe in message: (%s)\n", str );
88         return NULL;
89 }
90
91 int main( int argc, char** argv ) {
92         void* mrc;                                              // msg router context
93         rmr_mbuf_t* msg = NULL;                         // message received
94         int i;
95         int             state;
96         int             errors = 0;
97         char*   listen_port = "4560";
98         long count = 0;                                         // total received
99         long good = 0;                                          // good palyload buffers
100         long bad = 0;                                           // payload buffers which were not correct
101         long bad_tr = 0;                                        // trace buffers that were not correct
102         long bad_sid = 0;                                       // bad subscription ids
103         long timeout = 0;
104         char*   data;
105         int             nmsgs = 10;                                     // number of messages to stop after (argv[1] overrides)
106         int             rt_count = 0;                           // retry count
107         long ack_count = 0;                                     // number of acks sent
108         int             count_bins[11];                         // histogram bins based on msg type (0-10)
109         char    wbuf[1024];                                     // we'll pull trace data into here, and use as general working buffer
110         char    sbuf[128];                                      // short buffer
111
112         data = getenv( "RMR_RTG_SVC" );
113         if( data == NULL ) {
114                 setenv( "RMR_RTG_SVC", "19289", 1 );            // set one that won't collide with the sender if on same host
115         }
116
117         if( argc > 1 ) {
118                 nmsgs = atoi( argv[1] );
119         }
120         if( argc > 2 ) {
121                 listen_port = argv[2];
122         }
123
124         memset( count_bins, 0, sizeof( count_bins ) );
125
126         fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
127
128         mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );   // start your engines!
129         if( mrc == NULL ) {
130                 fprintf( stderr, "<RCVR> ABORT:  unable to initialise RMr\n" );
131                 exit( 1 );
132         }
133
134         timeout = time( NULL ) + 20;
135         while( ! rmr_ready( mrc ) ) {                                                           // wait for RMr to load a route table
136                 fprintf( stderr, "<RCVR> waiting for RMr to show ready\n" );
137                 sleep( 1 );
138
139                 if( time( NULL ) > timeout ) {
140                         fprintf( stderr, "<RCVR> giving up\n" );
141                         exit( 1 );
142                 }
143         }
144         fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
145
146         timeout = time( NULL ) + 20;
147         while( count < nmsgs ) {
148                 msg = rmr_torcv_msg( mrc, msg, 1000 );                          // wait for about 1s so that if sender never starts we eventually escape
149
150                 if( msg ) {
151                         if( msg->state == RMR_OK ) {
152                                 if( (data = split( msg->payload, '|'  )) != NULL ) {
153                                         if( sum( data ) == atoi( (char *) msg->payload ) ) {
154                                                 good++;
155                                         } else {
156                                                 fprintf( stderr, "<RCVR> chk sum bad: computed=%d expected;%d (%s)\n", sum( data ), atoi( msg->payload ), data );
157                                                 bad++;
158                                         }
159                                 }
160
161                                 if( (data = split( msg->payload, ' ' )) != NULL ) {                     // data will point to the chksum for the trace data
162                                         state = rmr_get_trace( msg, wbuf, 1024 );                               // should only copy upto the trace size; we'll check that
163                                         if( state > 128 || state < 1 ) {
164                                                 fprintf( stderr, "trace data size listed unexpectedly long: %d\n", state );
165                                         }
166                                         if( sum( wbuf ) != atoi( data ) ) {
167                                                 fprintf( stderr, "<RCVR> trace chk sum bad: computed=%d expected;%d len=%d (%s)\n", sum( wbuf ), atoi( data ), state, wbuf );
168                                                 bad_tr++;
169                                         }
170                                 }
171                                 count++;                                                                        // messages received for stats output
172
173                                 if( msg->mtype < 3 ) {                                                  // count number of properly set subscription id
174                                         if( msg->sub_id != msg->mtype * 10 ) {
175                                                 bad_sid++;
176                                         }
177                                 }
178
179                                 if( msg->mtype >= 0 && msg->mtype <= 10 ) {
180                                         count_bins[msg->mtype]++;
181                                 }
182
183                                 if( msg->mtype == 5 ) {                                                                                 // send an ack; sender will count but not process, so data in message is moot
184                                         msg = rmr_rts_msg( mrc, msg );
185                                         rt_count = 1000;
186                                         while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {           // to work right in nano we need this :(
187                                                 if( ack_count < 1 ) {                                                                   // 1st ack, so we need to connect, and we'll wait for that
188                                                         sleep( 1 );
189                                                 }
190                                                 rt_count--;
191                                                 msg = rmr_rts_msg( mrc, msg );                                                  // we don't try to resend if this returns retry
192                                         }
193                                         if( msg && msg->state == RMR_OK ) {                                                     // if it eventually worked
194                                                 ack_count++;
195                                         }
196                                 }
197                         }
198                 }
199
200                 if( time( NULL ) > timeout ) {
201                         fprintf( stderr, "receiver timed out\n" );
202                         errors++;
203                         break;
204                 }
205         }
206
207         wbuf[0] = 0;
208         for( i = 0; i < 11; i++ ) {
209                 snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
210                 strcat( wbuf, sbuf );
211         }
212
213         fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
214         fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  bad-trace=%ld bad-sub_id=%ld\n", 
215                 !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr, bad_sid );
216
217         sleep( 2 );                                                                     // let any outbound acks flow before closing
218
219         rmr_close( mrc );
220         return !!(errors + bad + bad_tr);                       // bad rc if any are !0
221 }
222