X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fnng%2Fsrc%2Frmr_nng.c;h=bc3fe107cba3a476cbc35e70a04ea10ffe6e2f3b;hb=dfe7b622b128e7bfb4a5e1f7e0afdb84e6001d14;hp=1b6e1f0458891fa8204f0addbbc78eae7d8a5ea7;hpb=2596b2305f214efbd7aba832fa009217ab854770;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/nng/src/rmr_nng.c b/src/rmr/nng/src/rmr_nng.c index 1b6e1f0..bc3fe10 100644 --- a/src/rmr/nng/src/rmr_nng.c +++ b/src/rmr/nng/src/rmr_nng.c @@ -248,6 +248,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { uta_ctx_t* ctx; int state; char* hold_src; // we need the original source if send fails + char* hold_ip; // also must hold original ip int sock_ok = 0; // true if we found a valid endpoint socket if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast @@ -268,11 +269,12 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { } ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off - if( HDR_VERSION( msg->header ) > 2 ) { // new version uses sender's ip address for rts - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock ); // default to IP based rts - } + + sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // src is always used first for rts if( ! sock_ok ) { - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // IP not in rt, try name + if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known + sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock ); + } if( ! sock_ok ) { msg->state = RMR_ERR_NOENDPT; return msg; // preallocated msg can be reused since not given back to nn @@ -281,14 +283,17 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { msg->state = RMR_OK; // ensure it is clear before send hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to + hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours msg = send_msg( ctx, msg, nn_sock, -1 ); if( msg ) { strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again + strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source } free( hold_src ); + free( hold_ip ); return msg; } @@ -590,6 +595,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { char* proto_port; char wbuf[1024]; // work buffer char* tok; // pointer at token in a buffer + char* tok2; int state; if( ! announced ) { @@ -646,13 +652,29 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { port = proto_port; // assume something like "1234" was passed } - if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) { - fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); - return NULL; - } - if( (tok = strchr( wbuf, '.' )) != NULL ) { - *tok = 0; // we don't keep domain portion + if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system + tok = strdup( tok ); // something we can destroy + if( *tok == '[' ) { // we allow an ipv6 address here + tok2 = strchr( tok, ']' ) + 1; // we will chop the port (...]:port) if given + } else { + tok2 = strchr( tok, ':' ); // find :port if there so we can chop + } + if( tok2 && *tok2 ) { // if it's not the end of string marker + *tok2 = 0; // make it so + } + + snprintf( wbuf, RMR_MAX_SRC, "%s", tok ); + free( tok ); + } else { + if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) { + fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); + return NULL; + } + if( (tok = strchr( wbuf, '.' )) != NULL ) { + *tok = 0; // we don't keep domain portion + } } + ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC ); if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port ); @@ -860,7 +882,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { } chute = &ctx->chutes[0]; // chute 0 used only for its semaphore - + if( max_wait > 0 ) { clock_gettime( CLOCK_REALTIME, &ts ); @@ -881,34 +903,30 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { seconds = 1; // use as flag later to invoked timed wait } - errno = 0; - state = 0; - while( chute->mbuf == NULL && ! errno ) { + errno = EINTR; + state = -1; + while( state < 0 && errno == EINTR ) { if( seconds ) { state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout } else { state = sem_wait( &chute->barrier ); } - - if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit - errno = 0; - } } if( state < 0 ) { mbuf = ombuf; // return caller's buffer if they passed one in } else { + errno = 0; // interrupted call state could be left; clear if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" ); if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued - if( mbuf ) { - mbuf->state = RMR_OK; - - if( ombuf ) { - rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring - } - } else { - mbuf = ombuf; // no buffer, return user's if there + mbuf->state = RMR_OK; + + if( ombuf ) { + rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring } + } else { + errno = ETIMEDOUT; + mbuf = ombuf; // no buffer, return user's if there } }