1 // :vim ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is a simple sender which will send a series of messages using
24 rmr_call(). N threads are started each sending the desired number
25 of messages and expecting an 'ack' for each. Each ack is examined
26 to verify that the thread id placed into the message matches (meaning
27 that the ack was delivered by RMr to the correct thread's chute).
29 In addition, the main thread listens for messages in order to verify
30 that a main or receiving thread can receive messages concurrently
31 while call acks are pending and being processed.
34 ck1 ck2|<msg-txt> @ tid<nil>
36 Ck1 is the simple check sum of the msg-text (NOT including <nil>)
37 Ck2 is the simple check sum of the trace data which is a nil terminated
39 tid is the thread id assigned by the main thread.
41 Parms: argv[1] == number of msgs to send (10)
42 argv[2] == delay (microseconds, 100000 default)
43 argv[3] == number of threads (3)
44 argv[4] == listen port
46 Sender will send for at most 20 seconds, so if nmsgs and delay extend
47 beyond that period the total number of messages sent will be less
51 Author: E. Scott Daniels
59 #include <sys/epoll.h>
66 #define TRACE_SIZE 40 // bytes in header to provide for trace junk
67 #define WBUF_SIZE 2048 // bytes allocated for the work buffer in which mk_calls() formats the message text
// Per-thread control block. main() fills in id, delay and n2send for each thread and
// hands the block's address to pthread_create() as the argument for mk_calls().
72 typedef struct tdata {
73 int id; // the id we'll pass to RMr mt-call function NOT the thread id
74 int n2send; // number of messages to send
75 int delay; // delay between messages in microseconds (handed directly to usleep())
76 void* mrc; // RMr context
82 // --------------------------------------------------------------------------------
// Simple checksum helper: the accumulation step adds the value of the current character
// plus a running counter to the total. mk_calls() uses the result for the two checksum
// fields placed at the front of each payload.
85 static int sum( char* str ) {
90 sum += *(str++) + i++; // add current char and the counter, then advance both
99 Executed as a thread, this puppy will generate calls to ensure that we get the
100 response back to the right thread, that we can handle threads, etc.
// Thread entry point: sends control->n2send messages with rmr_mt_call() and checks that
// the id found after the '@' in each ack matches this thread's call id.
//
// data: pointer to this thread's tdata_t control block (cast below); a nil pointer is
//       reported on stderr.
//
// On completion control->state is set to -state and a one line [PASS]/[FAIL] summary of
// the counters is written to stderr.
102 static void* mk_calls( void* data ) {
104 rmr_mbuf_t* sbuf; // send buffer
106 int rt_count = 0; // number of messages requiring a spin retry
107 int ok_msg = 0; // received messages that were sent by us
108 int bad_msg = 0; // received messages that were sent by a different thread
110 int fail_count = 0; // # of failure sends after first successful send
111 int successful = 0; // set to true after we have a successful send
113 char xbuf[1024]; // build transaction string here
119 wbuf = (char *) malloc( sizeof( char ) * WBUF_SIZE );
121 if( (control = (tdata_t *) data) == NULL ) {
122 fprintf( stderr, "thread data was nil; bailing out\n" );
124 //fprintf( stderr, "<THRD> thread started with parallel call id=%d sending=%d delay=%d\n", control->id, control->n2send, control->delay );
126 sbuf = rmr_alloc_msg( control->mrc, 512 ); // alloc first send buffer; subsequent buffers allocated on send
128 memset( trace, 0, sizeof( trace ) );
129 while( count < control->n2send ) { // we send n messages after the first message is successful
130 snprintf( trace, 100, "%lld", (long long) time( NULL ) ); // trace data is the current time as a string
131 rmr_set_trace( sbuf, trace, TRACE_SIZE ); // fully populate so we don't cause a buffer realloc
133 snprintf( wbuf, WBUF_SIZE, "count=%d tr=%s %d stand up and cheer! @ %d", count, trace, rand(), control->id );
134 snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf ); // payload: ck1 ck2|<msg-txt>
135 snprintf( xbuf, 200, "%31d", xaction_id ); // transaction id right justified in a 31 character field
136 rmr_bytes2xact( sbuf, xbuf, 32 );
138 sbuf->mtype = 5; // mtype is always 5 as the test receiver acks just mtype 5 messages
139 sbuf->len = strlen( sbuf->payload ) + 1; // our receiver likely wants a nice ascii-z string
141 sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 ); // send it (send returns an empty payload on success, or the original payload on fail/retry)
143 if( sbuf && sbuf->state == RMR_ERR_RETRY ) { // number of times we had to spin to send
146 while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) { // send blocked; keep trying
147 sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 5000 ); // call and wait up to 5s for a response
151 switch( sbuf->state ) {
152 case RMR_OK: // we should have a buffer back from the sender here
154 if( (tok = strchr( sbuf->payload, '@' )) != NULL ) { // the id follows the '@' in the ack text
155 if( atoi( tok+1 ) == control->id ) { // ack carries our id, so it reached the right thread
156 //fprintf( stderr, "<THRD> tid=%-2d ok ack\n", control->id );
160 //fprintf( stderr, "<THRD> tid=%-2d bad ack %s\n", control->id, sbuf->payload );
163 //fprintf( stderr, "<THRD> tid=%-2d call returned valid msg: %s\n", control->id, sbuf->payload );
164 // future -- verify that we see our ID at the end of the message
// NOTE(review): "ernro" in the message below looks like a typo for "errno"; left as is because it is runtime output
169 fprintf( stderr, "<CALLR> unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
170 sbuf = rmr_alloc_msg( control->mrc, 512 ); // allocate a sendable buffer
172 fail_count++; // count failures after first successful message
174 // some error (not connected likely), don't count this
180 //fprintf( stderr, "<THRD> tid=%-2d call finished, no sbuf\n", control->id );
181 sbuf = rmr_alloc_msg( control->mrc, 512 ); // loop expects a sbuf
186 if( control->delay > 0 ) {
187 usleep( control->delay ); // delay is in microseconds
192 if( ok_msg < (control->n2send-1) || bad_msg > 0 ) { // allow one drop to pass
195 if( count < control->n2send ) {
199 control->state = -state; // signal inactive to main thread; -1 == pass, 0 == fail
200 fprintf( stderr, "<THRD> [%s] tid=%-2d sent=%d ok-acks=%d bad-acks=%d drops=%d failures=%d retries=%d\n",
201 state ? "PASS" : "FAIL", control->id, count, ok_msg, bad_msg, drops, fail_count, rt_count );
// Test driver. Reads the optional command line values (message count, delay, thread
// count, listen port), initialises RMr with multi-threaded call support, registers the
// RMr receive fd with epoll, waits up to 20 seconds for RMr to report ready, and then
// starts one mk_calls() thread per control block. While threads remain, the main thread
// receives with rmr_torcv_msg() and checks each thread's state value; a 20 second
// timeout counts any unfinished threads as failures. A final [PASS]/[FAIL] line is
// written to stderr.
207 int main( int argc, char** argv ) {
208 void* mrc; // msg router context
209 rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow
210 struct epoll_event events[1]; // list of events to give to epoll
211 struct epoll_event epe; // event definition for event to listen to
212 int ep_fd = -1; // epoll's file des (given to epoll_wait)
213 int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on
214 int nready; // number of events ready for receive
215 char* listen_port = "43086";
217 int delay = 100000; // usec between send attempts
218 int nmsgs = 10; // number of messages to send
220 tdata_t* cvs; // vector of control blocks
222 pthread_t* pt_info; // thread stuff
224 int pings = 0; // number of messages received on normal channel
// command line values replace the defaults above
227 nmsgs = atoi( argv[1] );
230 delay = atoi( argv[2] );
233 nthreads = atoi( argv[3] );
236 listen_port = argv[4];
239 fprintf( stderr, "<CALL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
241 if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) { // initialise with multi-threaded call enabled
242 fprintf( stderr, "<CALL> unable to initialise RMr\n" );
246 rmr_init_trace( mrc, TRACE_SIZE );
248 if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG
250 fprintf( stderr, "<CALL> unable to set up polling fd\n" );
253 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
254 fprintf( stderr, "<CALL> [FAIL] unable to create epoll fd: %d\n", errno );
257 epe.events = EPOLLIN;
258 epe.data.fd = rcv_fd;
260 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
261 fprintf( stderr, "<CALL> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
265 rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive
// one control block and one pthread_t per thread
269 cvs = malloc( sizeof( tdata_t ) * nthreads );
270 pt_info = malloc( sizeof( pthread_t ) * nthreads );
272 fprintf( stderr, "<CALL> unable to allocate control vector\n" );
277 timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much)
278 while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one
279 fprintf( stderr, "<CALL> waiting for rmr to show ready\n" );
282 if( time( NULL ) > timeout ) {
283 fprintf( stderr, "<CALL> giving up\n" );
287 fprintf( stderr, "<CALL> rmr is ready; starting threads\n" );
289 for( i = 0; i < nthreads; i++ ) {
291 cvs[i].id = i + 2; // we pass this as the call-id to rmr, so must be >1
292 cvs[i].delay = delay;
293 cvs[i].n2send = nmsgs;
296 pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] ); // kick a thread
299 timeout = time( NULL ) + 20; // threads get at most 20s to finish
301 while( nthreads > 0 ) {
302 if( cvs[i].state < 1 ) { // states 0 or below indicate done. 0 == failure, -n == success
304 if( cvs[i].state == 0 ) {
310 rbuf = rmr_torcv_msg( mrc, rbuf, 1000 ); // receive on the normal flow while call acks are pending
311 if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) {
313 rmr_free_msg( rbuf );
317 if( time( NULL ) > timeout ) {
318 failures += nthreads; // every thread still running at timeout counts as a failure
319 fprintf( stderr, "<CALL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
// NOTE(review): "reeived" in the message below looks like a typo for "received"; left as is because it is runtime output
324 fprintf( stderr, "<CALL> [%s] failing threads=%d pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL", failures, pings );