+/*
+ Blocks on the receive ring chute semaphore and then reads from the ring
+ when it is tickled. If max_wait is -1 then the function blocks until
+ a message is ready on the ring. Else max_wait is assumed to be the number
+ of millaseconds to wait before returning a timeout message.
+*/
+extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
+ uta_ctx_t* ctx;
+ uta_mhdr_t* hdr; // header in the transport buffer
+ chute_t* chute;
+ struct timespec ts; // time info if we have a timeout
+ long new_ms; // adjusted mu-sec
+ long seconds = 0; // max wait seconds
+ long nano_sec; // max wait xlated to nano seconds
+ int state;
+ rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ errno = EINVAL;
+ if( mbuf ) {
+ mbuf->state = RMR_ERR_BADARG;
+ mbuf->tp_state = errno;
+ }
+ return mbuf;
+ }
+
+ if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
+ errno = EINVAL;
+ if( mbuf != NULL ) {
+ mbuf->state = RMR_ERR_NOTSUPP;
+ mbuf->tp_state = errno;
+ }
+ return mbuf;
+ }
+
+ ombuf = mbuf;
+ if( ombuf ) {
+ ombuf->state = RMR_ERR_TIMEOUT; // preset if for failure
+ ombuf->len = 0;
+ }
+
+ chute = &ctx->chutes[0]; // chute 0 used only for its semaphore
+
+ if( max_wait > 0 ) {
+ clock_gettime( CLOCK_REALTIME, &ts );
+
+ if( max_wait > 999 ) {
+ seconds = (max_wait - 999)/1000;
+ max_wait -= seconds * 1000;
+ ts.tv_sec += seconds;
+ }
+ if( max_wait > 0 ) {
+ nano_sec = max_wait * 1000000;
+ ts.tv_nsec += nano_sec;
+ if( ts.tv_nsec > 999999999 ) {
+ ts.tv_nsec -= 999999999;
+ ts.tv_sec++;
+ }
+ }
+
+ seconds = 1; // use as flag later to invoked timed wait
+ }
+
+ errno = 0;
+ while( chute->mbuf == NULL && ! errno ) {
+ if( seconds ) {
+ state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
+ } else {
+ state = sem_wait( &chute->barrier );
+ }
+
+ if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
+ errno = 0;
+ }
+ }
+
+ if( state < 0 ) {
+ mbuf = ombuf; // return caller's buffer if they passed one in
+ } else {
+ if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
+ if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
+ if( mbuf ) {
+ mbuf->state = RMR_OK;
+
+ if( ombuf ) {
+ rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
+ }
+ } else {
+ mbuf = ombuf; // no buffer, return user's if there
+ }
+ }
+ }
+
+ if( mbuf ) {
+ mbuf->tp_state = errno;
+ }
+ return mbuf;
+}
+
+/*
+ Accept a message buffer and caller ID, send the message and then wait
+ for the receiver to tickle the semaphore letting us know that a message
+ has been received. The call_id is a value between 2 and 255, inclusive; if
+ it's not in this range an error will be returned. Max wait is the amount
+ of time in millaseconds that the call should block for. If 0 is given
+ then no timeout is set.
+
+ If the mt_call feature has not been initialised, then the attempt to use this
+ funciton will fail with RMR_ERR_NOTSUPP
+
+ If no matching message is received before the max_wait period expires, a
+ nil pointer is returned, and errno is set to ETIMEOUT. If any other error
+ occurs after the message has been sent, then a nil pointer is returned
+ with errno set to some other value.
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+ rmr_mbuf_t* ombuf; // original mbuf passed in
+ uta_ctx_t* ctx;
+ uta_mhdr_t* hdr; // header in the transport buffer
+ chute_t* chute;
+ unsigned char* d1; // d1 data in header
+ struct timespec ts; // time info if we have a timeout
+ long new_ms; // adjusted mu-sec
+ long seconds = 0; // max wait seconds
+ long nano_sec; // max wait xlated to nano seconds
+ int state;
+
+ errno = EINVAL;
+ if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
+ if( mbuf ) {
+ mbuf->tp_state = errno;
+ mbuf->state = RMR_ERR_BADARG;
+ }
+ return mbuf;
+ }
+
+ if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
+ mbuf->state = RMR_ERR_NOTSUPP;
+ mbuf->tp_state = errno;
+ return mbuf;
+ }
+
+ if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
+ mbuf->state = RMR_ERR_BADARG;
+ mbuf->tp_state = errno;
+ return mbuf;
+ }
+
+ ombuf = mbuf; // save to return timeout status with
+
+ chute = &ctx->chutes[call_id];
+ if( chute->mbuf != NULL ) { // probably a delayed message that wasn't dropped
+ rmr_free_msg( chute->mbuf );
+ chute->mbuf = NULL;
+ }
+
+ hdr = (uta_mhdr_t *) mbuf->header;
+ hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call
+ memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for
+ d1 = DATA1_ADDR( hdr );
+ d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response
+ mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend
+
+ if( max_wait > 0 ) {
+ clock_gettime( CLOCK_REALTIME, &ts );
+
+ if( max_wait > 999 ) {
+ seconds = (max_wait - 999)/1000;
+ max_wait -= seconds * 1000;
+ ts.tv_sec += seconds;
+ }
+ if( max_wait > 0 ) {
+ nano_sec = max_wait * 1000000;
+ ts.tv_nsec += nano_sec;
+ if( ts.tv_nsec > 999999999 ) {
+ ts.tv_nsec -= 999999999;
+ ts.tv_sec++;
+ }
+ }
+
+ seconds = 1; // use as flag later to invoked timed wait
+ }
+
+ mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
+ if( mbuf ) {
+ if( mbuf->state != RMR_OK ) {
+ mbuf->tp_state = errno;
+ return mbuf; // timeout or unable to connect or no endpoint are most likely issues
+ }
+ }
+
+ errno = 0;
+ while( chute->mbuf == NULL && ! errno ) {
+ if( seconds ) {
+ state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
+ } else {
+ state = sem_wait( &chute->barrier );
+ }
+
+ if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
+ errno = 0;
+ }
+
+ if( chute->mbuf != NULL ) { // offload receiver thread and check xaction buffer here
+ if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
+ rmr_free_msg( chute->mbuf );
+ chute->mbuf = NULL;
+ errno = 0;
+ }
+ }
+ }
+
+ if( state < 0 ) {
+ return NULL; // leave errno as set by sem wait call
+ }
+
+ mbuf = chute->mbuf;
+ mbuf->state = RMR_OK;
+ chute->mbuf = NULL;
+
+ return mbuf;
+}