--- /dev/null
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+ Copyright (c) 2019 Nokia
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+/*
+ Mnemonic: caller.c
+ Abstract: This is a simple sender which will send a series of messages using
+ rmr_call(). N threads are started each sending the desired number
+ of messages and expecting an 'ack' for each. Each ack is examined
+ to verify that the thread id placed into the message matches (meaning
+ that the ack was delivered by RMr to the correct thread's chute.
+
+ In addition, the main thread listens for messages in order to verify
+ that a main or receiving thread can receive messages concurrently
+ while call acks are pending and being processed.
+
+ Message format is:
+ ck1 ck2|<msg-txt> @ tid<nil>
+
+ Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
+ Ck2 is the simple check sum of the trace data which is a nil terminated
+ series of bytes.
+ tid is the thread id assigned by the main thread.
+
+ Parms: argv[1] == number of msgs to send (10)
+ argv[2] == delay (mu-seconds, 1000000 default)
+ argv[3] == number of threads (3)
+ argv[4] == listen port
+
+ Sender will send for at most 20 seconds, so if nmsgs and delay extend
+ beyond that period the total number of messages sent will be less
+ than n.
+
+ Date: 18 April 2019
+ Author: E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include <time.h>
+#include <pthread.h>
+
+
+#include <rmr/rmr.h>
+
+#define TRACE_SIZE 40 // bytes in header to provide for trace junk
+
+/*
+ Thread data
+*/
+typedef struct tdata {
+ int id; // the id we'll pass to RMr mt-call function NOT the thread id
+ int n2send; // number of messages to send
+ int delay; // ms delay between messages
+ void* mrc; // RMr context
+ int state;
+} tdata_t;
+
+
+
+// --------------------------------------------------------------------------------
+
+
+static int sum( char* str ) {
+ int sum = 0;
+ int i = 0;
+
+ while( *str ) {
+ sum += *(str++) + i++;
+ }
+
+ return sum % 255;
+}
+
+
+
+/*
+ Executed as a thread, this puppy will generate calls to ensure that we get the
+ response back to the right thread, that we can handle threads, etc.
+*/
+static void* mk_calls( void* data ) {
+ tdata_t* control;
+ rmr_mbuf_t* sbuf; // send buffer
+ int count = 0;
+ int rt_count = 0; // number of messages requiring a spin retry
+ int ok_msg = 0; // received messages that were sent by us
+ int bad_msg = 0; // received messages that were sent by a different thread
+ int drops = 0;
+ int fail_count = 0; // # of failure sends after first successful send
+ int successful = 0; // set to true after we have a successful send
+ char wbuf[1024];
+ char xbuf[1024]; // build transaction string here
+ char trace[1024];
+ int xaction_id = 1;
+ char* tok;
+ int state = 0;
+
+ if( (control = (tdata_t *) data) == NULL ) {
+ fprintf( stderr, "thread data was nil; bailing out\n" );
+ }
+ //fprintf( stderr, "<THRD> thread started with parallel call id=%d sending=%d delay=%d\n", control->id, control->n2send, control->delay );
+
+ sbuf = rmr_alloc_msg( control->mrc, 512 ); // alloc first send buffer; subsequent buffers allcoated on send
+
+ memset( trace, 0, sizeof( trace ) );
+ while( count < control->n2send ) { // we send n messages after the first message is successful
+ snprintf( trace, 100, "%lld", (long long) time( NULL ) );
+ rmr_set_trace( sbuf, trace, TRACE_SIZE ); // fully populate so we dont cause a buffer realloc
+
+ snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer! @ %d", count, trace, rand(), control->id );
+ snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
+ snprintf( xbuf, 200, "%31d", xaction_id );
+ rmr_bytes2xact( sbuf, xbuf, 32 );
+
+ sbuf->mtype = 5; // mtype is always 5 as the test receiver acks just mtype 5 messages
+ sbuf->len = strlen( sbuf->payload ) + 1; // our receiver likely wants a nice acsii-z string
+ sbuf->state = 0;
+ sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 ); // send it (send returns an empty payload on success, or the original payload on fail/retry)
+
+ if( sbuf && sbuf->state == RMR_ERR_RETRY ) { // number of times we had to spin to send
+ rt_count++;
+ }
+ while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) { // send blocked; keep trying
+ sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 ); // call and wait up to 100ms for a response
+ }
+
+ if( sbuf != NULL ) {
+ switch( sbuf->state ) {
+ case RMR_OK: // we should have a buffer back from the sender here
+ successful = 1;
+ if( (tok = strchr( sbuf->payload, '@' )) != NULL ) {
+ if( atoi( tok+1 ) == control->id ) {
+ //fprintf( stderr, "<THRD> tid=%-2d ok ack\n", control->id );
+ ok_msg++;
+ } else {
+ bad_msg++;
+ //fprintf( stderr, "<THRD> tid=%-2d bad ack %s\n", control->id, sbuf->payload );
+ }
+ }
+ //fprintf( stderr, "<THRD> tid=%-2d call returned valid msg: %s\n", control->id, sbuf->payload );
+ // future -- verify that we see our ID at the end of the message
+ count++;
+ break;
+
+ default:
+ fprintf( stderr, "unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
+ sbuf = rmr_alloc_msg( control->mrc, 512 ); // allocate a sendable buffer
+ if( successful ) {
+ fail_count++; // count failures after first successful message
+ } else {
+ // some error (not connected likely), don't count this
+ sleep( 1 );
+ }
+ break;
+ }
+ } else {
+ //fprintf( stderr, "<THRD> tid=%-2d call finished, no sbuf\n", control->id );
+ sbuf = rmr_alloc_msg( control->mrc, 512 ); // loop expects an subf
+ drops++;
+ count++;
+ }
+
+ if( control->delay > 0 ) {
+ usleep( control->delay );
+ }
+ }
+
+ state = 1;
+ if( ok_msg < (control->n2send-1) || bad_msg > 0 ) { // allow one drop to pass
+ state = 0;
+ }
+ if( count < control->n2send ) {
+ state = 0;
+ }
+
+ control->state = -state; // signal inactive to main thread; -1 == pass, 0 == fail
+ fprintf( stderr, "<THRD> [%s] tid=%-2d sent=%d ok-acks=%d bad-acks=%d drops=%d failures=%d retries=%d\n",
+ state ? "PASS" : "FAIL", control->id, count, ok_msg, bad_msg, drops, fail_count, rt_count );
+
+
+ return NULL;
+}
+
+int main( int argc, char** argv ) {
+ void* mrc; // msg router context
+ rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow
+ struct epoll_event events[1]; // list of events to give to epoll
+ struct epoll_event epe; // event definition for event to listen to
+ int ep_fd = -1; // epoll's file des (given to epoll_wait)
+ int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on
+ int nready; // number of events ready for receive
+ char* listen_port = "43086";
+ long timeout = 0;
+ int delay = 100000; // usec between send attempts
+ int nmsgs = 10; // number of messages to send
+ int nthreads = 3;
+ tdata_t* cvs; // vector of control blocks
+ int i;
+ pthread_t* pt_info; // thread stuff
+ int failures = 0;
+ int pings = 0; // number of messages received on normal channel
+
+ if( argc > 1 ) {
+ nmsgs = atoi( argv[1] );
+ }
+ if( argc > 2 ) {
+ delay = atoi( argv[2] );
+ }
+ if( argc > 3 ) {
+ nthreads = atoi( argv[3] );
+ }
+ if( argc > 4 ) {
+ listen_port = argv[4];
+ }
+
+ fprintf( stderr, "<CALL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
+
+ if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) { // initialise with multi-threaded call enabled
+ fprintf( stderr, "<CALL> unable to initialise RMr\n" );
+ exit( 1 );
+ }
+
+ rmr_init_trace( mrc, TRACE_SIZE );
+
+ if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG
+ if( rcv_fd < 0 ) {
+ fprintf( stderr, "<CALL> unable to set up polling fd\n" );
+ exit( 1 );
+ }
+ if( (ep_fd = epoll_create1( 0 )) < 0 ) {
+ fprintf( stderr, "<CALL> [FAIL] unable to create epoll fd: %d\n", errno );
+ exit( 1 );
+ }
+ epe.events = EPOLLIN;
+ epe.data.fd = rcv_fd;
+
+ if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
+ fprintf( stderr, "<CALL> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
+ exit( 1 );
+ }
+ } else {
+ rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive
+ }
+
+
+ cvs = malloc( sizeof( tdata_t ) * nthreads );
+ pt_info = malloc( sizeof( pthread_t ) * nthreads );
+ if( cvs == NULL ) {
+ fprintf( stderr, "<CALL> unable to allocate control vector\n" );
+ exit( 1 );
+ }
+
+
+ timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much)
+ while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one
+ fprintf( stderr, "<CALL> waiting for rmr to show ready\n" );
+ sleep( 1 );
+
+ if( time( NULL ) > timeout ) {
+ fprintf( stderr, "<CALL> giving up\n" );
+ exit( 1 );
+ }
+ }
+ fprintf( stderr, "<CALL> rmr is ready; starting threads\n" );
+
+ for( i = 0; i < nthreads; i++ ) {
+ cvs[i].mrc = mrc;
+ cvs[i].id = i + 2; // we pass this as the call-id to rmr, so must be >1
+ cvs[i].delay = delay;
+ cvs[i].n2send = nmsgs;
+ cvs[i].state = 1;
+
+ pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] ); // kick a thread
+ }
+
+ timeout = time( NULL ) + 20;
+ i = 0;
+ while( nthreads > 0 ) {
+ if( cvs[i].state < 1 ) { // states 0 or below indicate done. 0 == failure, -n == success
+ nthreads--;
+ if( cvs[i].state == 0 ) {
+ failures++;
+ }
+ i++;
+ } else {
+ // sleep( 1 );
+ rbuf = rmr_torcv_msg( mrc, rbuf, 1000 );
+ if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) {
+ pings++;
+ rmr_free_msg( rbuf );
+ rbuf = NULL;
+ }
+ }
+ if( time( NULL ) > timeout ) {
+ failures += nthreads;
+ fprintf( stderr, "<CALL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
+ break;
+ }
+ }
+
+ fprintf( stderr, "<CALL> [%s] failing threads=%d pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL", failures, pings );
+ rmr_close( mrc );
+
+ return failures > 0;
+}
+