X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fnng%2Fsrc%2Frmr_nng.c;h=d9c4c061c3759cea494b3d3260bcb2df642461c1;hb=7d1b7b0de82414ad25ebd323b2913de72ab448c1;hp=e77c9f3b464906fa0c26034d66b19b68438d2700;hpb=68c1ab2191d9959fde0bd275a560f7c9cf6df485;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/nng/src/rmr_nng.c b/src/rmr/nng/src/rmr_nng.c index e77c9f3..d9c4c06 100644 --- a/src/rmr/nng/src/rmr_nng.c +++ b/src/rmr/nng/src/rmr_nng.c @@ -1,4 +1,4 @@ -// : vi ts=4 sw=4 noet : +// vim: ts=4 sw=4 noet : /* ================================================================================== Copyright (c) 2019 Nokia @@ -50,6 +50,8 @@ #include #include #include +#include +#include #include #include @@ -70,6 +72,8 @@ #include "tools_static.c" #include "sr_nng_static.c" // send/receive static functions #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!) +#include "mt_call_static.c" +#include "mt_call_nng_static.c" //------------------------------------------------------------------------------ @@ -181,103 +185,20 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) { } /* - send message with maximum timeout. - Accept a message and send it to an endpoint based on message type. - If NNG reports that the send attempt timed out, or should be retried, - RMr will retry for approximately max_to microseconds; rounded to the next - higher value of 10. - - Allocates a new message buffer for the next send. If a message type has - more than one group of endpoints defined, then the message will be sent - in round robin fashion to one endpoint in each group. - - An endpoint will be looked up in the route table using the message type and - the subscription id. If the subscription id is "UNSET_SUBID", then only the - message type is used. If the initial lookup, with a subid, fails, then a - second lookup using just the mtype is tried. - - CAUTION: this is a non-blocking send. If the message cannot be sent, then - it will return with an error and errno set to eagain. If the send is - a limited fanout, then the returned status is the status of the last - send attempt. - + This is a wrapper to the real timeout send. We must wrap it now to ensure that + the call flag and call-id are reset */ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { - nng_socket nn_sock; // endpoint socket for send - uta_ctx_t* ctx; - int group; // selected group to get socket for - int send_again; // true if the message must be sent again - rmr_mbuf_t* clone_m; // cloned message for an nth send - int sock_ok; // got a valid socket from round robin select - uint64_t key; // mtype or sub-id/mtype sym table key - int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails + char* d1; // point at the call-id in the header - if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast - errno = EINVAL; // if msg is null, this is their clue - if( msg != NULL ) { - msg->state = RMR_ERR_BADARG; - errno = EINVAL; // must ensure it's not eagain - } - return msg; - } + if( msg != NULL ) { + ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off - errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't - if( msg->header == NULL ) { - fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" ); - msg->state = RMR_ERR_NOHDR; - errno = EBADMSG; // must ensure it's not eagain - return msg; - } - - if( max_to < 0 ) { - max_to = ctx->send_retries; // convert to retries - } + d1 = DATA1_ADDR( msg->header ); + d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end + } - send_again = 1; // force loop entry - group = 0; // always start with group 0 - - key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry - if( msg->sub_id != UNSET_SUBID ) { - altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry - } - while( send_again ) { - sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups - if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n", - msg->mtype, send_again, group, msg->len, sock_ok, altk_ok ); - - if( ! sock_ok ) { - if( altk_ok ) { // we can try with the alternate (no sub-id) key - altk_ok = 0; - key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again - send_again = 1; // ensure we don't exit the while - continue; - } - - msg->state = RMR_ERR_NOENDPT; - errno = ENXIO; // must ensure it's not eagain - return msg; // caller can resend (maybe) or free - } - - group++; - - if( send_again ) { - clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available - if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len ); - msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer - msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success - /* - if( msg ) { - // error do we need to count successes/errors, how to report some success, esp if last fails? - } - */ - - msg = clone_m; // clone will be the next to send - } else { - msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was - } - } - - return msg; // last message caries the status of last/only send attempt + return mtosend_msg( vctx, msg, max_to ); } /* @@ -286,7 +207,16 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { See rmr_stimeout() for info on setting the default timeout. */ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { - return rmr_mtosend_msg( vctx, msg, -1 ); // retries < uses default from ctx + char* d1; // point at the call-id in the header + + if( msg != NULL ) { + ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off + + d1 = DATA1_ADDR( msg->header ); + d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end + } + + return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0 uses default from ctx } /* @@ -314,17 +244,19 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { The caller must check for this and handle. */ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { - nng_socket nn_sock; // endpoint socket for send + nng_socket nn_sock; // endpoint socket for send uta_ctx_t* ctx; - int state; - uta_mhdr_t* hdr; - char* hold_src; // we need the original source if send fails - int sock_ok; // true if we found a valid endpoint socket + int state; + char* hold_src; // we need the original source if send fails + char* hold_ip; // also must hold original ip + int sock_ok = 0; // true if we found a valid endpoint socket + endpoint_t* ep; // end point to track counts if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue if( msg != NULL ) { msg->state = RMR_ERR_BADARG; + msg->tp_state = errno; } return msg; } @@ -333,29 +265,65 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { if( msg->header == NULL ) { fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" ); msg->state = RMR_ERR_NOHDR; + msg->tp_state = errno; return msg; } - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // socket of specific endpoint + ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off + + sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // src is always used first for rts if( ! sock_ok ) { - msg->state = RMR_ERR_NOENDPT; - return msg; // preallocated msg can be reused since not given back to nn + if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known + sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep ); + } + if( ! sock_ok ) { + msg->state = RMR_ERR_NOENDPT; + return msg; // preallocated msg can be reused since not given back to nn + } } - msg->state = RMR_OK; // ensure it is clear before send - hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to - strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours + msg->state = RMR_OK; // ensure it is clear before send + hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to + hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip + strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours msg = send_msg( ctx, msg, nn_sock, -1 ); if( msg ) { - strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID ); // always return original source so rts can be called again - msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source + if( ep != NULL ) { + switch( msg->state ) { + case RMR_OK: + ep->scounts[EPSC_GOOD]++; + break; + + case RMR_ERR_RETRY: + ep->scounts[EPSC_TRANS]++; + break; + + default: + ep->scounts[EPSC_FAIL]++; + break; + } + } + strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again + strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again + msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source } free( hold_src ); + free( hold_ip ); return msg; } /* + If multi-threading call is turned on, this invokes that mechanism with the special call + id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original + behavour (described below) is carried out. This is safe to use when mt is enabled, but + the user app is invoking rmr_call() from only one thread, and the caller doesn't need + a flexible timeout. + + On timeout this function will return a nil pointer. If the original message could not + be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status. + + Original behavour: Call sends the message based on message routing using the message type, and waits for a response message to arrive with the same transaction id that was in the outgoing message. If, while wiating for the expected response, messages are received which do not have the @@ -373,8 +341,6 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { EAGAIN -- the underlying message system wsa interrupted or the device was busy; user should call this function with the message again. - - QUESTION: should user specify the number of messages to allow to queue? */ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) { uta_ctx_t* ctx; @@ -387,6 +353,10 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) { return msg; } + if( ctx->flags & CFL_MTC_ENABLED ) { // if multi threaded call is on, use that + return rmr_mt_call( vctx, msg, 1, 1000 ); // use the reserved call-id of 1 and wait up to 1 sec + } + memcpy( expected_id, msg->xaction, RMR_MAX_XID ); expected_id[RMR_MAX_XID] = 0; // ensure it's a string if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id ); @@ -398,6 +368,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) { if( msg->state != RMR_ERR_RETRY ) { msg->state = RMR_ERR_CALLFAILED; // errno not available to all wrappers; don't stomp if marked retry } + msg->tp_state = errno; return msg; } @@ -418,14 +389,19 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) { rmr_mbuf_t* qm; // message that was queued on the ring if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + errno = EINVAL; if( old_msg != NULL ) { old_msg->state = RMR_ERR_BADARG; + old_msg->tp_state = errno; } - errno = EINVAL; return old_msg; } errno = 0; + if( ctx->flags & CFL_MTC_ENABLED ) { // must pop from ring with a semaphore dec first + return rmr_mt_rcv( ctx, old_msg, -1 ); + } + qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued if( qm != NULL ) { if( old_msg ) { @@ -451,13 +427,18 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { rmr_mbuf_t* msg; if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + errno = EINVAL; if( old_msg != NULL ) { old_msg->state = RMR_ERR_BADARG; + old_msg->tp_state = errno; } - errno = EINVAL; return old_msg; } + if( ctx->flags & CFL_MTC_ENABLED ) { // must pop from ring with a semaphore dec first + return rmr_mt_rcv( ctx, old_msg, ms_to ); + } + qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued if( qm != NULL ) { if( old_msg ) { @@ -473,7 +454,12 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) { fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno ); free( eps ); - return NULL; + ctx->eps = NULL; + if( old_msg != NULL ) { + old_msg->state = RMR_ERR_INITFAILED; + old_msg->tp_state = errno; + } + return old_msg; } eps->nng_fd = rmr_get_rcvfd( ctx ); @@ -483,7 +469,12 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) { fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) ); free( eps ); - return NULL; + ctx->eps = NULL; + if( old_msg != NULL ) { + old_msg->state = RMR_ERR_INITFAILED; + old_msg->tp_state = errno; + } + return old_msg; } ctx->eps = eps; @@ -502,6 +493,7 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to ); // block until something or timedout if( nready <= 0 ) { // we only wait on ours, so we assume ready means it's ours msg->state = RMR_ERR_TIMEOUT; + msg->tp_state = errno; } else { return rcv_msg( ctx, msg ); // receive it and return it } @@ -525,10 +517,11 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int exp_len = 0; // length of expected ID if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + errno = EINVAL; if( msg != NULL ) { msg->state = RMR_ERR_BADARG; + msg->tp_state = errno; } - errno = EINVAL; return msg; } @@ -569,17 +562,15 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, return NULL; } -// CAUTION: these are not supported as they must be set differently (between create and open) in NNG. -// until those details are worked out, these generate a warning. /* - Set send timeout. The value time is assumed to be microseconds. The timeout is the - rough maximum amount of time that RMr will block on a send attempt when the underlying + Set send timeout. The value time is assumed to be milliseconds. The timeout is the + _rough_ maximum amount of time that RMr will block on a send attempt when the underlying mechnism indicates eagain or etimeedout. All other error conditions are reported without this delay. Setting a timeout of 0 causes no retries to be attempted in - RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning, - but without issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us) - after every 10K send attempts until the time value is reached. Retries are abandoned - if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT. + RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning, + but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us) + after every 1K send attempts until the "time" value is reached. Retries are abandoned + if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT. The default, if this function is not used, is 1; meaning that RMr will retry, but will not enter a sleep. In all cases the caller should check the status in the message returned @@ -604,6 +595,8 @@ extern int rmr_set_stimeout( void* vctx, int time ) { /* Set receive timeout -- not supported in nng implementation + + CAUTION: this is not supported as they must be set differently (between create and open) in NNG. */ extern int rmr_set_rtimeout( void* vctx, int time ) { fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" ); @@ -628,11 +621,12 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { char* proto_port; char wbuf[1024]; // work buffer char* tok; // pointer at token in a buffer + char* tok2; int state; if( ! announced ) { - fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n", - RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); + fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", + RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; } @@ -650,7 +644,14 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { memset( ctx, 0, sizeof( uta_ctx_t ) ); ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning - ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response + ctx->d1_len = 4; // data1 space in header -- 4 bytes for now + + if( flags & RMRFL_MTCALL ) { // mt call support is on, need bigger ring + ctx->mring = uta_mk_ring( 2048 ); // message ring filled by rcv thread + init_mtcall( ctx ); // set up call chutes + } else { + ctx->mring = uta_mk_ring( 128 ); // ring filled only on blocking call + } ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh if( max_msg_size > 0 ) { @@ -677,21 +678,58 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { port = proto_port; // assume something like "1234" was passed } - if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) { - fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); - return NULL; - } - if( (tok = strchr( wbuf, '.' )) != NULL ) { - *tok = 0; // we don't keep domain portion + if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system + tok = strdup( tok ); // something we can destroy + if( *tok == '[' ) { // we allow an ipv6 address here + tok2 = strchr( tok, ']' ) + 1; // we will chop the port (...]:port) if given + } else { + tok2 = strchr( tok, ':' ); // find :port if there so we can chop + } + if( tok2 && *tok2 ) { // if it's not the end of string marker + *tok2 = 0; // make it so + } + + snprintf( wbuf, RMR_MAX_SRC, "%s", tok ); + free( tok ); + } else { + if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) { + fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); + return NULL; + } + if( (tok = strchr( wbuf, '.' )) != NULL ) { + *tok = 0; // we don't keep domain portion + } } - ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID ); - if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) { // our registered name is host:port - fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port ); + + ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC ); + if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port + fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port ); return NULL; } - ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons + if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) { + if( atoi( tok ) > 0 ) { + flags |= RMRFL_NAME_ONLY; // don't allow IP addreess to go out in messages + } + } + ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons + if( flags & RMRFL_NAME_ONLY ) { + ctx->my_ip = strdup( ctx->my_name ); // user application or env var has specified that IP address is NOT sent out, use name + } else { + ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src + if( ctx->my_ip == NULL ) { + fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" ); + strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer + } + } + if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip ); + + if( (tok = getenv( ENV_WARNINGS )) != NULL ) { + if( *tok == '1' ) { + ctx->flags |= CTXFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance) + } + } if( (interface = getenv( ENV_BIND_IF )) == NULL ) { @@ -701,7 +739,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { // rather than using this generic listen() call. snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port ); if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) { - fprintf( stderr, "[CRIT] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) ); + fprintf( stderr, "[CRI] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) ); nng_close( ctx->nn_sock ); free_ctx( ctx ); return NULL; @@ -709,8 +747,16 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need an rtc if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rt collector thread - fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) ); + fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) ); + } + } + + if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) { // mt call support is on, must start the listener thread if not running + ctx->flags |= CFL_MTC_ENABLED; + if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // kick the receiver + fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) ); } + } free( proto_port ); @@ -818,4 +864,225 @@ extern void rmr_close( void* vctx ) { } +// ----- multi-threaded call/receive support ------------------------------------------------- + +/* + Blocks on the receive ring chute semaphore and then reads from the ring + when it is tickled. If max_wait is -1 then the function blocks until + a message is ready on the ring. Else max_wait is assumed to be the number + of millaseconds to wait before returning a timeout message. +*/ +extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { + uta_ctx_t* ctx; + uta_mhdr_t* hdr; // header in the transport buffer + chute_t* chute; + struct timespec ts; // time info if we have a timeout + long new_ms; // adjusted mu-sec + long seconds = 0; // max wait seconds + long nano_sec; // max wait xlated to nano seconds + int state; + rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + if( mbuf ) { + mbuf->state = RMR_ERR_BADARG; + mbuf->tp_state = errno; + } + return mbuf; + } + + if( ! (ctx->flags & CFL_MTC_ENABLED) ) { + errno = EINVAL; + if( mbuf != NULL ) { + mbuf->state = RMR_ERR_NOTSUPP; + mbuf->tp_state = errno; + } + return mbuf; + } + + ombuf = mbuf; + if( ombuf ) { + ombuf->state = RMR_ERR_TIMEOUT; // preset if for failure + ombuf->len = 0; + } + + chute = &ctx->chutes[0]; // chute 0 used only for its semaphore + + if( max_wait >= 0 ) { + clock_gettime( CLOCK_REALTIME, &ts ); + + if( max_wait > 999 ) { + seconds = max_wait / 1000; + max_wait -= seconds * 1000; + ts.tv_sec += seconds; + } + if( max_wait > 0 ) { + nano_sec = max_wait * 1000000; + ts.tv_nsec += nano_sec; + if( ts.tv_nsec > 999999999 ) { + ts.tv_nsec -= 999999999; + ts.tv_sec++; + } + } + + seconds = 1; // use as flag later to invoked timed wait + } + + errno = EINTR; + state = -1; + while( state < 0 && errno == EINTR ) { + if( seconds ) { + state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout + } else { + state = sem_wait( &chute->barrier ); + } + } + + if( state < 0 ) { + mbuf = ombuf; // return caller's buffer if they passed one in + } else { + errno = 0; // interrupted call state could be left; clear + if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" ); + if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued + mbuf->state = RMR_OK; + + if( ombuf ) { + rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring + } + } else { + errno = ETIMEDOUT; + mbuf = ombuf; // no buffer, return user's if there + } + } + + if( mbuf ) { + mbuf->tp_state = errno; + } + return mbuf; +} + +/* + Accept a message buffer and caller ID, send the message and then wait + for the receiver to tickle the semaphore letting us know that a message + has been received. The call_id is a value between 2 and 255, inclusive; if + it's not in this range an error will be returned. Max wait is the amount + of time in millaseconds that the call should block for. If 0 is given + then no timeout is set. + + If the mt_call feature has not been initialised, then the attempt to use this + funciton will fail with RMR_ERR_NOTSUPP + + If no matching message is received before the max_wait period expires, a + nil pointer is returned, and errno is set to ETIMEOUT. If any other error + occurs after the message has been sent, then a nil pointer is returned + with errno set to some other value. +*/ +extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) { + rmr_mbuf_t* ombuf; // original mbuf passed in + uta_ctx_t* ctx; + uta_mhdr_t* hdr; // header in the transport buffer + chute_t* chute; + unsigned char* d1; // d1 data in header + struct timespec ts; // time info if we have a timeout + long new_ms; // adjusted mu-sec + long seconds = 0; // max wait seconds + long nano_sec; // max wait xlated to nano seconds + int state; + + errno = EINVAL; + if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) { + if( mbuf ) { + mbuf->tp_state = errno; + mbuf->state = RMR_ERR_BADARG; + } + return mbuf; + } + + if( ! (ctx->flags & CFL_MTC_ENABLED) ) { + mbuf->state = RMR_ERR_NOTSUPP; + mbuf->tp_state = errno; + return mbuf; + } + + if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them + mbuf->state = RMR_ERR_BADARG; + mbuf->tp_state = errno; + return mbuf; + } + + ombuf = mbuf; // save to return timeout status with + + chute = &ctx->chutes[call_id]; + if( chute->mbuf != NULL ) { // probably a delayed message that wasn't dropped + rmr_free_msg( chute->mbuf ); + chute->mbuf = NULL; + } + + hdr = (uta_mhdr_t *) mbuf->header; + hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call + memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for + d1 = DATA1_ADDR( hdr ); + d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response + mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend + + if( max_wait >= 0 ) { + clock_gettime( CLOCK_REALTIME, &ts ); + + if( max_wait > 999 ) { + seconds = max_wait / 1000; + max_wait -= seconds * 1000; + ts.tv_sec += seconds; + } + if( max_wait > 0 ) { + nano_sec = max_wait * 1000000; + ts.tv_nsec += nano_sec; + if( ts.tv_nsec > 999999999 ) { + ts.tv_nsec -= 999999999; + ts.tv_sec++; + } + } + + seconds = 1; // use as flag later to invoked timed wait + } + + mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success! + if( mbuf ) { + if( mbuf->state != RMR_OK ) { + mbuf->tp_state = errno; + return mbuf; // timeout or unable to connect or no endpoint are most likely issues + } + } + + state = 0; + errno = 0; + while( chute->mbuf == NULL && ! errno ) { + if( seconds ) { + state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout + } else { + state = sem_wait( &chute->barrier ); + } + + if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit + errno = 0; + } + + if( chute->mbuf != NULL ) { // offload receiver thread and check xaction buffer here + if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) { + rmr_free_msg( chute->mbuf ); + chute->mbuf = NULL; + errno = 0; + } + } + } + + if( state < 0 ) { + return NULL; // leave errno as set by sem wait call + } + + mbuf = chute->mbuf; + mbuf->state = RMR_OK; + chute->mbuf = NULL; + return mbuf; +}