1 // :vim ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: mt_listener.c
23 Abstract: This simple application runs multiple "listener" threads. Each thread
24 receives from a single RMR context to validate the ability spin
25 several listening threads in an application.
28 ck1 ck2|<msg-txt> @ tid<nil>
30 Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
31 Ck2 is the simple check sum of the trace data which is a nil terminated
33 tid is the thread id assigned by the main thread.
35 Parms: argv[1] == number of msgs to send (10)
36 argv[2] == delay (mu-seconds, 1000000 default)
37 argv[3] == number of threads (3)
38 argv[4] == listen port
40 Sender will send for at most 20 seconds, so if nmsgs and delay extend
41 beyond that period the total number of messages sent will be less
45 Author: E. Scott Daniels
53 #include <sys/epoll.h>
59 #include "time_tools.c" // our time based test tools
61 #define TRACE_SIZE 40 // bytes in header to provide for trace junk
62 #define WBUF_SIZE 2048
67 typedef struct tdata {
68 int id; // the id we'll pass to RMr mt-call function NOT the thread id
69 int n2get; // number of messages to expect
70 int delay; // max delay waiting for n2get messages
71 void* mrc; // RMr context
77 // --------------------------------------------------------------------------------
80 static int sum( char* str ) {
85 sum += *(str++) + i++;
92 Split the message at the first sep and return a pointer to the first
95 static char* split( char* str, char sep ) {
98 s = strchr( str, sep );
104 //fprintf( stderr, "<RCVR> no pipe in message: (%s)\n", str );
109 Executed as a thread, this puppy will listen for messages and report
112 static void* mk_calls( void* data ) {
114 rmr_mbuf_t* msg = NULL; // message
115 int* count_bins = NULL;
120 char* msg_data; // bits after checksum info in payload
121 long good = 0; // counters
124 long count = 0; // total msgs received
125 struct timespec start_ts;
126 struct timespec end_ts;
127 int elap; // elapsed time to receive messages
130 count_bins = (int *) malloc( sizeof( int ) * 11 );
132 wbuf = (char *) malloc( sizeof( char ) * WBUF_SIZE );
134 if( (control = (tdata_t *) data) == NULL ) {
135 fprintf( stderr, "thread data was nil; bailing out\n" );
137 fprintf( stderr, "<THRD> id=%d thread receiver started expecting=%d messages timeout=%d seconds\n",
138 control->id, control->n2get, control->delay );
140 timeout = time( NULL ) + control->delay; // max time to wait for a good message
141 while( count < control->n2get ) { // wait for n messages -- no timeout
142 msg = rmr_torcv_msg( control->mrc, msg, 1000 ); // pop after ~1 second
145 //fprintf( stderr, "<THRD> id=%d got type=%d state=%s msg=(%s)\n", control->id, msg->mtype, msg->state == RMR_OK ? "OK" : "timeout", msg->payload );
146 if( msg->state == RMR_OK ) {
147 if( good == 0 ) { // mark time of first good message
148 set_time( &start_ts );
150 set_time( &end_ts ); // mark the time of last good message
152 if( (msg_data = split( msg->payload, '|' )) != NULL ) {
153 if( sum( msg_data ) == atoi( (char *) msg->payload ) ) {
156 fprintf( stderr, "<RCVR> chk sum bad: computed=%d expected;%d (%s)\n", sum( msg_data ),
157 atoi( msg->payload ), msg_data );
161 if( (msg_data = split( msg->payload, ' ' )) != NULL ) { // data will point to the chksum for the trace data
162 state = rmr_get_trace( msg, wbuf, 1024 ); // should only copy upto the trace size; we'll check that
163 if( state > 128 || state < 0 ) {
164 fprintf( stderr, "trace data size listed unexpectedly long: %d\n", state );
166 if( state && sum( wbuf ) != atoi( msg_data ) ) {
167 fprintf( stderr, "<RCVR> trace chk sum bad: computed=%d expected;%d len=%d (%s)\n", sum( wbuf ),
168 atoi( msg_data ), state, wbuf );
174 good++; // nothing to check, assume good
178 if( msg->mtype >= 0 && msg->mtype <= 10 ) {
179 count_bins[msg->mtype]++;
183 fprintf( stderr, "<THRD> id=%d timeout with nil msg\n", control->id );
186 if( time( NULL ) > timeout ) {
187 fprintf( stderr, "<THRD> id=%d timeout before receiving %d messages\n", control->id, control->n2get );
191 elap = elapsed( &start_ts, &end_ts, ELAP_MS );
193 fprintf( stderr, "<THRD> id=%d received %ld messages in %d ms rate = %ld msg/sec\n", control->id, count, elap, (count/elap)*1000 );
195 fprintf( stderr, "<THRD> id=%d runtime too short to compute received rate\n", control->id );
198 snprintf( wbuf, WBUF_SIZE, "<THRD> id=%d histogram: ", control->id ); // build histogram so we can write with one fprintf call
199 for( i = 0; i < 11; i++ ) {
200 snprintf( buf2, sizeof( buf2 ), "%5d ", count_bins[i] );
201 strcat( wbuf, buf2 );
203 fprintf( stderr, "%s\n", wbuf );
205 fprintf( stderr, "<THRD> id=%d %ld messages %ld good %ld bad\n", control->id, count, good, bad );
207 control->state = bad > 0 ? -1 : 0; // set to indicate done and <0 to indicate some failure
208 control->state += count < control->n2get ? -2 : 0;
212 int main( int argc, char** argv ) {
213 void* mrc; // msg router context
214 rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow
215 struct epoll_event events[1]; // list of events to give to epoll
216 struct epoll_event epe; // event definition for event to listen to
217 int ep_fd = -1; // epoll's file des (given to epoll_wait)
218 char* listen_port = "43086";
219 long timeout = 0; // time the main thread will pop if listeners have not returned
220 int delay = 30; // max time to wait for n messages
221 int nmsgs = 10; // number of messages to expect
222 int nthreads = 3; // default number of listener threads
223 tdata_t* cvs; // vector of control blocks
225 pthread_t* pt_info; // thread stuff
229 nmsgs = atoi( argv[1] );
232 delay = atoi( argv[2] );
235 nthreads = atoi( argv[3] );
238 listen_port = argv[4];
241 fprintf( stderr, "<MTL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
243 if( (mrc = rmr_init( listen_port, 1400, 0 )) == NULL ) {
244 fprintf( stderr, "<MTL> unable to initialise RMr\n" );
248 rmr_init_trace( mrc, TRACE_SIZE );
250 cvs = malloc( sizeof( tdata_t ) * nthreads );
251 pt_info = malloc( sizeof( pthread_t ) * nthreads );
253 fprintf( stderr, "<MTL> unable to allocate control vector\n" );
257 timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much)
258 while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one
259 fprintf( stderr, "<MTL> waiting for rmr to show ready\n" );
262 if( time( NULL ) > timeout ) {
263 fprintf( stderr, "<MTL> giving up\n" );
267 fprintf( stderr, "<MTL> rmr is ready; starting threads\n" );
269 for( i = 0; i < nthreads; i++ ) {
271 cvs[i].id = i + 2; // we pass this as the call-id to rmr, so must be >1
272 cvs[i].delay = delay;
273 cvs[i].n2get = nmsgs;
276 fprintf( stderr, "kicking %d i=%d\n", i+2, i );
277 pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] ); // kick a thread
280 timeout = time( NULL ) + 300; // wait up to 5 minutes
282 while( nthreads > 0 ) {
283 if( cvs[i].state < 1 ) { // states 0 or below indicate done. 0 == good; <0 is failure
285 if( cvs[i].state < 0 ) {
291 if( time( NULL ) > timeout ) {
292 failures += nthreads;
293 fprintf( stderr, "<MTL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
300 fprintf( stderr, "<MTL> [%s] failing threads=%d\n", failures == 0 ? "PASS" : "FAIL", failures );