The change to fix a bug2
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rmr_nng.c
index 1b6e1f0..bc3fe10 100644 (file)
@@ -248,6 +248,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        uta_ctx_t*      ctx;
        int                     state;
        char*           hold_src;                       // we need the original source if send fails
+       char*           hold_ip;                        // also must hold original ip
        int                     sock_ok = 0;            // true if we found a valid endpoint socket
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
@@ -268,11 +269,12 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        }
 
        ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
-       if( HDR_VERSION( msg->header ) > 2 ) {                                                  // new version uses sender's ip address for rts
-               sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock );                      // default to IP based rts
-       }
+
+       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                        // src is always used first for rts
        if( ! sock_ok ) {
-               sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                // IP  not in rt, try name
+               if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
+                       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock );
+               }
                if( ! sock_ok ) {
                        msg->state = RMR_ERR_NOENDPT;
                        return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
@@ -281,14 +283,17 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
+       hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
        strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
                strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
+               strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
                msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
        }
 
        free( hold_src );
+       free( hold_ip );
        return msg;
 }
 
@@ -590,6 +595,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        char*   proto_port;
        char    wbuf[1024];                                     // work buffer
        char*   tok;                                            // pointer at token in a buffer
+       char*   tok2;
        int             state;
 
        if( ! announced ) {
@@ -646,13 +652,29 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                port = proto_port;                      // assume something like "1234" was passed
        }
 
-       if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
-               fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
-               return NULL;
-       }
-       if( (tok = strchr( wbuf, '.' )) != NULL ) {
-               *tok = 0;                                                                       // we don't keep domain portion
+       if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
+               tok = strdup( tok );                                    // something we can destroy
+               if( *tok == '[' ) {                                             // we allow an ipv6 address here
+                       tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
+               } else {
+                       tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
+               }
+               if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
+                       *tok2 = 0;                                                      // make it so
+               }
+
+               snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
+               free( tok );
+       } else {
+               if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
+                       fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
+                       return NULL;
+               }
+               if( (tok = strchr( wbuf, '.' )) != NULL ) {
+                       *tok = 0;                                                                       // we don't keep domain portion
+               }
        }
+
        ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
        if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
                fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
@@ -860,7 +882,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        }
 
        chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
-       
+
        if( max_wait > 0 ) {
                clock_gettime( CLOCK_REALTIME, &ts );   
 
@@ -881,34 +903,30 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                seconds = 1;                                                                                                    // use as flag later to invoked timed wait
        }
 
-       errno = 0;
-       state = 0;
-       while( chute->mbuf == NULL && ! errno ) {
+       errno = EINTR;
+       state = -1;
+       while( state < 0 && errno == EINTR ) {
                if( seconds ) {
                        state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
                } else {
                        state = sem_wait( &chute->barrier );
                }
-
-               if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
-                       errno = 0;
-               }
        }
 
        if( state < 0 ) {
                mbuf = ombuf;                           // return caller's buffer if they passed one in
        } else {
+               errno = 0;                                              // interrupted call state could be left; clear
                if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
-                       if( mbuf ) {
-                               mbuf->state = RMR_OK;
-
-                               if( ombuf ) {
-                                       rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
-                               }
-                       } else {
-                               mbuf = ombuf;                           // no buffer, return user's if there
+                       mbuf->state = RMR_OK;
+
+                       if( ombuf ) {
+                               rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
                        }
+               } else {
+                       errno = ETIMEDOUT;
+                       mbuf = ombuf;                           // no buffer, return user's if there
                }
        }