1 // :vim ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is a simple sender which will send a series of messages using
24 rmr_call(). N threads are started each sending the desired number
25 of messages and expecting an 'ack' for each. Each ack is examined
26 to verify that the thread id placed into the message matches (meaning
27 that the ack was delivered by RMr to the correct thread's chute.
29 In addition, the main thread listens for messages in order to verify
30 that a main or receiving thread can receive messages concurrently
31 while call acks are pending and being processed.
34 ck1 ck2|<msg-txt> @ tid<nil>
36 Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
37 Ck2 is the simple check sum of the trace data which is a nil terminated
39 tid is the thread id assigned by the main thread.
41 Parms: argv[1] == number of msgs to send (10)
42 argv[2] == delay (mu-seconds, 1000000 default)
43 argv[3] == number of threads (3)
44 argv[4] == listen port
46 Sender will send for at most 20 seconds, so if nmsgs and delay extend
47 beyond that period the total number of messages sent will be less
51 Author: E. Scott Daniels
59 #include <sys/epoll.h>
66 #define TRACE_SIZE 40 // bytes in header to provide for trace junk
71 typedef struct tdata {
72 int id; // the id we'll pass to RMr mt-call function NOT the thread id
73 int n2send; // number of messages to send
74 int delay; // ms delay between messages
75 void* mrc; // RMr context
81 // --------------------------------------------------------------------------------
84 static int sum( char* str ) {
89 sum += *(str++) + i++;
98 Executed as a thread, this puppy will generate calls to ensure that we get the
99 response back to the right thread, that we can handle threads, etc.
101 static void* mk_calls( void* data ) {
103 rmr_mbuf_t* sbuf; // send buffer
105 int rt_count = 0; // number of messages requiring a spin retry
106 int ok_msg = 0; // received messages that were sent by us
107 int bad_msg = 0; // received messages that were sent by a different thread
109 int fail_count = 0; // # of failure sends after first successful send
110 int successful = 0; // set to true after we have a successful send
112 char xbuf[1024]; // build transaction string here
118 if( (control = (tdata_t *) data) == NULL ) {
119 fprintf( stderr, "thread data was nil; bailing out\n" );
121 //fprintf( stderr, "<THRD> thread started with parallel call id=%d sending=%d delay=%d\n", control->id, control->n2send, control->delay );
123 sbuf = rmr_alloc_msg( control->mrc, 512 ); // alloc first send buffer; subsequent buffers allcoated on send
125 memset( trace, 0, sizeof( trace ) );
126 while( count < control->n2send ) { // we send n messages after the first message is successful
127 snprintf( trace, 100, "%lld", (long long) time( NULL ) );
128 rmr_set_trace( sbuf, trace, TRACE_SIZE ); // fully populate so we dont cause a buffer realloc
130 snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer! @ %d", count, trace, rand(), control->id );
131 snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
132 snprintf( xbuf, 200, "%31d", xaction_id );
133 rmr_bytes2xact( sbuf, xbuf, 32 );
135 sbuf->mtype = 5; // mtype is always 5 as the test receiver acks just mtype 5 messages
136 sbuf->len = strlen( sbuf->payload ) + 1; // our receiver likely wants a nice acsii-z string
138 sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 ); // send it (send returns an empty payload on success, or the original payload on fail/retry)
140 if( sbuf && sbuf->state == RMR_ERR_RETRY ) { // number of times we had to spin to send
143 while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) { // send blocked; keep trying
144 sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 ); // call and wait up to 100ms for a response
148 switch( sbuf->state ) {
149 case RMR_OK: // we should have a buffer back from the sender here
151 if( (tok = strchr( sbuf->payload, '@' )) != NULL ) {
152 if( atoi( tok+1 ) == control->id ) {
153 //fprintf( stderr, "<THRD> tid=%-2d ok ack\n", control->id );
157 //fprintf( stderr, "<THRD> tid=%-2d bad ack %s\n", control->id, sbuf->payload );
160 //fprintf( stderr, "<THRD> tid=%-2d call returned valid msg: %s\n", control->id, sbuf->payload );
161 // future -- verify that we see our ID at the end of the message
166 fprintf( stderr, "unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
167 sbuf = rmr_alloc_msg( control->mrc, 512 ); // allocate a sendable buffer
169 fail_count++; // count failures after first successful message
171 // some error (not connected likely), don't count this
177 //fprintf( stderr, "<THRD> tid=%-2d call finished, no sbuf\n", control->id );
178 sbuf = rmr_alloc_msg( control->mrc, 512 ); // loop expects an subf
183 if( control->delay > 0 ) {
184 usleep( control->delay );
189 if( ok_msg < (control->n2send-1) || bad_msg > 0 ) { // allow one drop to pass
192 if( count < control->n2send ) {
196 control->state = -state; // signal inactive to main thread; -1 == pass, 0 == fail
197 fprintf( stderr, "<THRD> [%s] tid=%-2d sent=%d ok-acks=%d bad-acks=%d drops=%d failures=%d retries=%d\n",
198 state ? "PASS" : "FAIL", control->id, count, ok_msg, bad_msg, drops, fail_count, rt_count );
204 int main( int argc, char** argv ) {
205 void* mrc; // msg router context
206 rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow
207 struct epoll_event events[1]; // list of events to give to epoll
208 struct epoll_event epe; // event definition for event to listen to
209 int ep_fd = -1; // epoll's file des (given to epoll_wait)
210 int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on
211 int nready; // number of events ready for receive
212 char* listen_port = "43086";
214 int delay = 100000; // usec between send attempts
215 int nmsgs = 10; // number of messages to send
217 tdata_t* cvs; // vector of control blocks
219 pthread_t* pt_info; // thread stuff
221 int pings = 0; // number of messages received on normal channel
224 nmsgs = atoi( argv[1] );
227 delay = atoi( argv[2] );
230 nthreads = atoi( argv[3] );
233 listen_port = argv[4];
236 fprintf( stderr, "<CALL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
238 if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) { // initialise with multi-threaded call enabled
239 fprintf( stderr, "<CALL> unable to initialise RMr\n" );
243 rmr_init_trace( mrc, TRACE_SIZE );
245 if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG
247 fprintf( stderr, "<CALL> unable to set up polling fd\n" );
250 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
251 fprintf( stderr, "<CALL> [FAIL] unable to create epoll fd: %d\n", errno );
254 epe.events = EPOLLIN;
255 epe.data.fd = rcv_fd;
257 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
258 fprintf( stderr, "<CALL> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
262 rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive
266 cvs = malloc( sizeof( tdata_t ) * nthreads );
267 pt_info = malloc( sizeof( pthread_t ) * nthreads );
269 fprintf( stderr, "<CALL> unable to allocate control vector\n" );
274 timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much)
275 while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one
276 fprintf( stderr, "<CALL> waiting for rmr to show ready\n" );
279 if( time( NULL ) > timeout ) {
280 fprintf( stderr, "<CALL> giving up\n" );
284 fprintf( stderr, "<CALL> rmr is ready; starting threads\n" );
286 for( i = 0; i < nthreads; i++ ) {
288 cvs[i].id = i + 2; // we pass this as the call-id to rmr, so must be >1
289 cvs[i].delay = delay;
290 cvs[i].n2send = nmsgs;
293 pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] ); // kick a thread
296 timeout = time( NULL ) + 20;
298 while( nthreads > 0 ) {
299 if( cvs[i].state < 1 ) { // states 0 or below indicate done. 0 == failure, -n == success
301 if( cvs[i].state == 0 ) {
307 rbuf = rmr_torcv_msg( mrc, rbuf, 1000 );
308 if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) {
310 rmr_free_msg( rbuf );
314 if( time( NULL ) > timeout ) {
315 failures += nthreads;
316 fprintf( stderr, "<CALL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
321 fprintf( stderr, "<CALL> [%s] failing threads=%d pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL", failures, pings );