X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fnng%2Fsrc%2Frmr_nng.c;h=d9c4c061c3759cea494b3d3260bcb2df642461c1;hb=7d1b7b0de82414ad25ebd323b2913de72ab448c1;hp=cf5e4295308af24a48164c3bf533f874681473a0;hpb=750cbee6b224532fc10971c04df556ce4bceb832;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/nng/src/rmr_nng.c b/src/rmr/nng/src/rmr_nng.c index cf5e429..d9c4c06 100644 --- a/src/rmr/nng/src/rmr_nng.c +++ b/src/rmr/nng/src/rmr_nng.c @@ -1,4 +1,4 @@ -// : vi ts=4 sw=4 noet : +// vim: ts=4 sw=4 noet : /* ================================================================================== Copyright (c) 2019 Nokia @@ -248,12 +248,15 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { uta_ctx_t* ctx; int state; char* hold_src; // we need the original source if send fails + char* hold_ip; // also must hold original ip int sock_ok = 0; // true if we found a valid endpoint socket + endpoint_t* ep; // end point to track counts if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue if( msg != NULL ) { msg->state = RMR_ERR_BADARG; + msg->tp_state = errno; } return msg; } @@ -262,15 +265,17 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { if( msg->header == NULL ) { fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" ); msg->state = RMR_ERR_NOHDR; + msg->tp_state = errno; return msg; } ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off - if( HDR_VERSION( msg->header ) > 2 ) { // new version uses sender's ip address for rts - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock ); // default to IP based rts - } + + sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // src is always used first for rts if( ! sock_ok ) { - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // IP not in rt, try name + if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known + sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep ); + } if( ! sock_ok ) { msg->state = RMR_ERR_NOENDPT; return msg; // preallocated msg can be reused since not given back to nn @@ -279,14 +284,32 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { msg->state = RMR_OK; // ensure it is clear before send hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to + hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours msg = send_msg( ctx, msg, nn_sock, -1 ); if( msg ) { + if( ep != NULL ) { + switch( msg->state ) { + case RMR_OK: + ep->scounts[EPSC_GOOD]++; + break; + + case RMR_ERR_RETRY: + ep->scounts[EPSC_TRANS]++; + break; + + default: + ep->scounts[EPSC_FAIL]++; + break; + } + } strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again + strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source } free( hold_src ); + free( hold_ip ); return msg; } @@ -345,6 +368,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) { if( msg->state != RMR_ERR_RETRY ) { msg->state = RMR_ERR_CALLFAILED; // errno not available to all wrappers; don't stomp if marked retry } + msg->tp_state = errno; return msg; } @@ -365,10 +389,11 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) { rmr_mbuf_t* qm; // message that was queued on the ring if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + errno = EINVAL; if( old_msg != NULL ) { old_msg->state = RMR_ERR_BADARG; + old_msg->tp_state = errno; } - errno = EINVAL; return old_msg; } errno = 0; @@ -402,10 +427,11 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { rmr_mbuf_t* msg; if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + errno = EINVAL; if( old_msg != NULL ) { old_msg->state = RMR_ERR_BADARG; + old_msg->tp_state = errno; } - errno = EINVAL; return old_msg; } @@ -428,7 +454,12 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) { fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno ); free( eps ); - return NULL; + ctx->eps = NULL; + if( old_msg != NULL ) { + old_msg->state = RMR_ERR_INITFAILED; + old_msg->tp_state = errno; + } + return old_msg; } eps->nng_fd = rmr_get_rcvfd( ctx ); @@ -438,7 +469,12 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) { fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) ); free( eps ); - return NULL; + ctx->eps = NULL; + if( old_msg != NULL ) { + old_msg->state = RMR_ERR_INITFAILED; + old_msg->tp_state = errno; + } + return old_msg; } ctx->eps = eps; @@ -457,6 +493,7 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to ); // block until something or timedout if( nready <= 0 ) { // we only wait on ours, so we assume ready means it's ours msg->state = RMR_ERR_TIMEOUT; + msg->tp_state = errno; } else { return rcv_msg( ctx, msg ); // receive it and return it } @@ -480,10 +517,11 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int exp_len = 0; // length of expected ID if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + errno = EINVAL; if( msg != NULL ) { msg->state = RMR_ERR_BADARG; + msg->tp_state = errno; } - errno = EINVAL; return msg; } @@ -524,17 +562,15 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, return NULL; } -// CAUTION: these are not supported as they must be set differently (between create and open) in NNG. -// until those details are worked out, these generate a warning. /* - Set send timeout. The value time is assumed to be microseconds. The timeout is the - rough maximum amount of time that RMr will block on a send attempt when the underlying + Set send timeout. The value time is assumed to be milliseconds. The timeout is the + _rough_ maximum amount of time that RMr will block on a send attempt when the underlying mechnism indicates eagain or etimeedout. All other error conditions are reported without this delay. Setting a timeout of 0 causes no retries to be attempted in - RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning, - but without issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us) - after every 10K send attempts until the time value is reached. Retries are abandoned - if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT. + RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning, + but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us) + after every 1K send attempts until the "time" value is reached. Retries are abandoned + if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT. The default, if this function is not used, is 1; meaning that RMr will retry, but will not enter a sleep. In all cases the caller should check the status in the message returned @@ -559,6 +595,8 @@ extern int rmr_set_stimeout( void* vctx, int time ) { /* Set receive timeout -- not supported in nng implementation + + CAUTION: this is not supported as they must be set differently (between create and open) in NNG. */ extern int rmr_set_rtimeout( void* vctx, int time ) { fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" ); @@ -583,11 +621,12 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { char* proto_port; char wbuf[1024]; // work buffer char* tok; // pointer at token in a buffer + char* tok2; int state; if( ! announced ) { - fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n", - RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); + fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", + RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; } @@ -639,13 +678,29 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { port = proto_port; // assume something like "1234" was passed } - if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) { - fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); - return NULL; - } - if( (tok = strchr( wbuf, '.' )) != NULL ) { - *tok = 0; // we don't keep domain portion + if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system + tok = strdup( tok ); // something we can destroy + if( *tok == '[' ) { // we allow an ipv6 address here + tok2 = strchr( tok, ']' ) + 1; // we will chop the port (...]:port) if given + } else { + tok2 = strchr( tok, ':' ); // find :port if there so we can chop + } + if( tok2 && *tok2 ) { // if it's not the end of string marker + *tok2 = 0; // make it so + } + + snprintf( wbuf, RMR_MAX_SRC, "%s", tok ); + free( tok ); + } else { + if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) { + fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); + return NULL; + } + if( (tok = strchr( wbuf, '.' )) != NULL ) { + *tok = 0; // we don't keep domain portion + } } + ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC ); if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port ); @@ -670,6 +725,11 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { } if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip ); + if( (tok = getenv( ENV_WARNINGS )) != NULL ) { + if( *tok == '1' ) { + ctx->flags |= CTXFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance) + } + } if( (interface = getenv( ENV_BIND_IF )) == NULL ) { @@ -827,6 +887,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { errno = EINVAL; if( mbuf ) { mbuf->state = RMR_ERR_BADARG; + mbuf->tp_state = errno; } return mbuf; } @@ -835,6 +896,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { errno = EINVAL; if( mbuf != NULL ) { mbuf->state = RMR_ERR_NOTSUPP; + mbuf->tp_state = errno; } return mbuf; } @@ -846,12 +908,12 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { } chute = &ctx->chutes[0]; // chute 0 used only for its semaphore - - if( max_wait > 0 ) { + + if( max_wait >= 0 ) { clock_gettime( CLOCK_REALTIME, &ts ); if( max_wait > 999 ) { - seconds = (max_wait - 999)/1000; + seconds = max_wait / 1000; max_wait -= seconds * 1000; ts.tv_sec += seconds; } @@ -867,36 +929,36 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { seconds = 1; // use as flag later to invoked timed wait } - errno = 0; - while( chute->mbuf == NULL && ! errno ) { + errno = EINTR; + state = -1; + while( state < 0 && errno == EINTR ) { if( seconds ) { state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout } else { state = sem_wait( &chute->barrier ); } - - if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit - errno = 0; - } } if( state < 0 ) { mbuf = ombuf; // return caller's buffer if they passed one in } else { + errno = 0; // interrupted call state could be left; clear if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" ); if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued - if( mbuf ) { - mbuf->state = RMR_OK; - - if( ombuf ) { - rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring - } - } else { - mbuf = ombuf; // no buffer, return user's if there + mbuf->state = RMR_OK; + + if( ombuf ) { + rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring } + } else { + errno = ETIMEDOUT; + mbuf = ombuf; // no buffer, return user's if there } } + if( mbuf ) { + mbuf->tp_state = errno; + } return mbuf; } @@ -928,9 +990,10 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m long nano_sec; // max wait xlated to nano seconds int state; + errno = EINVAL; if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) { - errno = EINVAL; if( mbuf ) { + mbuf->tp_state = errno; mbuf->state = RMR_ERR_BADARG; } return mbuf; @@ -938,11 +1001,13 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m if( ! (ctx->flags & CFL_MTC_ENABLED) ) { mbuf->state = RMR_ERR_NOTSUPP; + mbuf->tp_state = errno; return mbuf; } if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them mbuf->state = RMR_ERR_BADARG; + mbuf->tp_state = errno; return mbuf; } @@ -961,11 +1026,11 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend - if( max_wait > 0 ) { + if( max_wait >= 0 ) { clock_gettime( CLOCK_REALTIME, &ts ); if( max_wait > 999 ) { - seconds = (max_wait - 999)/1000; + seconds = max_wait / 1000; max_wait -= seconds * 1000; ts.tv_sec += seconds; } @@ -984,10 +1049,12 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success! if( mbuf ) { if( mbuf->state != RMR_OK ) { + mbuf->tp_state = errno; return mbuf; // timeout or unable to connect or no endpoint are most likely issues } } + state = 0; errno = 0; while( chute->mbuf == NULL && ! errno ) { if( seconds ) {