Add transport provider status to message buffer
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rmr_nng.c
index 77f539e..3529c46 100644 (file)
@@ -248,12 +248,13 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        uta_ctx_t*      ctx;
        int                     state;
        char*           hold_src;                       // we need the original source if send fails
-       int                     sock_ok;                        // true if we found a valid endpoint socket
+       int                     sock_ok = 0;            // true if we found a valid endpoint socket
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
+                       msg->tp_state = errno;
                }
                return msg;
        }
@@ -262,22 +263,28 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        if( msg->header == NULL ) {
                fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
                msg->state = RMR_ERR_NOHDR;
+               msg->tp_state = errno;
                return msg;
        }
 
        ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
-       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                        // socket of specific endpoint
+       if( HDR_VERSION( msg->header ) > 2 ) {                                                  // new version uses sender's ip address for rts
+               sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock );                      // default to IP based rts
+       }
        if( ! sock_ok ) {
-               msg->state = RMR_ERR_NOENDPT;
-               return msg;                                                     // preallocated msg can be reused since not given back to nn
+               sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                // IP  not in rt, try name
+               if( ! sock_ok ) {
+                       msg->state = RMR_ERR_NOENDPT;
+                       return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
+               }
        }
 
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
-       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );        // must overlay the source to be ours
+       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );    // always return original source so rts can be called again
+               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
                msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
        }
 
@@ -340,6 +347,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
                if( msg->state != RMR_ERR_RETRY ) {
                        msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
                }
+               msg->tp_state = errno;
                return msg;
        }
 
@@ -360,10 +368,11 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
        rmr_mbuf_t*     qm;                             // message that was queued on the ring
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
+                       old_msg->tp_state = errno;
                }
-               errno = EINVAL;
                return old_msg;
        }
        errno = 0;
@@ -397,10 +406,11 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
        rmr_mbuf_t* msg;
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
+                       old_msg->tp_state = errno;
                }
-               errno = EINVAL;
                return old_msg;
        }
 
@@ -452,6 +462,7 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
        nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
        if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
                msg->state = RMR_ERR_TIMEOUT;
+               msg->tp_state = errno;
        } else {
                return rcv_msg( ctx, msg );                                                             // receive it and return it
        }
@@ -475,10 +486,11 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
        int     exp_len = 0;                    // length of expected ID
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
+                       msg->tp_state = errno;
                }
-               errno = EINVAL;
                return msg;
        }
 
@@ -519,17 +531,15 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
        return NULL;
 }
 
-//  CAUTION:  these are not supported as they must be set differently (between create and open) in NNG.
-//                             until those details are worked out, these generate a warning.
 /*
-       Set send timeout. The value time is assumed to be microseconds.  The timeout is the
-       rough maximum amount of time that RMr will block on a send attempt when the underlying
+       Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
+       _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
        mechnism indicates eagain or etimeedout.  All other error conditions are reported
        without this delay. Setting a timeout of 0 causes no retries to be attempted in
-       RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
-       but without issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
-       after every 10K send attempts until the time value is reached. Retries are abandoned
-       if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
+       RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
+       but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
+       after every 1K send attempts until the "time" value is reached. Retries are abandoned
+       if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
 
        The default, if this function is not used, is 1; meaning that RMr will retry, but will
        not enter a sleep.  In all cases the caller should check the status in the message returned
@@ -554,6 +564,8 @@ extern int rmr_set_stimeout( void* vctx, int time ) {
 
 /*
        Set receive timeout -- not supported in nng implementation
+
+       CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
 */
 extern int rmr_set_rtimeout( void* vctx, int time ) {
        fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
@@ -641,13 +653,29 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        if( (tok = strchr( wbuf, '.' )) != NULL ) {
                *tok = 0;                                                                       // we don't keep domain portion
        }
-       ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
-       if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) {                       // our registered name is host:port
-               fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
+       ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
+       if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
+               fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
                return NULL;
        }
 
-       ctx->ip_list = mk_ip_list( port );              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
+       if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
+               if( atoi( tok ) > 0 ) {
+                       flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
+               }
+       }
+
+       ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
+       if( flags & RMRFL_NAME_ONLY ) {
+               ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
+       } else {
+               ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
+               if( ctx->my_ip == NULL ) {
+                       fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
+                       strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
+               }
+       }
+       if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
 
 
 
@@ -658,7 +686,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        //       rather than using this generic listen() call.
        snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
        if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
-               fprintf( stderr, "[CRIT] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
+               fprintf( stderr, "[CRI] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
                nng_close( ctx->nn_sock );
                free_ctx( ctx );
                return NULL;
@@ -666,14 +694,14 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
                if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
-                       fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+                       fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
                }
        }
 
        if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) {      // mt call support is on, must start the listener thread if not running
                ctx->flags |= CFL_MTC_ENABLED;
                if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // kick the receiver
-                       fprintf( stderr, "[WARN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
+                       fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
                }
                
        }
@@ -806,6 +834,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                errno = EINVAL;
                if( mbuf ) {
                        mbuf->state = RMR_ERR_BADARG;
+                       mbuf->tp_state = errno;
                }
                return mbuf;
        }
@@ -814,6 +843,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                errno = EINVAL;
                if( mbuf != NULL ) {
                        mbuf->state = RMR_ERR_NOTSUPP;
+                       mbuf->tp_state = errno;
                }
                return mbuf;
        }
@@ -876,6 +906,9 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                }
        }
 
+       if( mbuf ) {
+               mbuf->tp_state = errno;
+       }
        return mbuf;
 }
 
@@ -907,9 +940,10 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        long    nano_sec;                       // max wait xlated to nano seconds
        int             state;
        
+       errno = EINVAL;
        if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
-               errno = EINVAL;
                if( mbuf ) {
+                       mbuf->tp_state = errno;
                        mbuf->state = RMR_ERR_BADARG;
                }
                return mbuf;
@@ -917,11 +951,13 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
 
        if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
                mbuf->state = RMR_ERR_NOTSUPP;
+               mbuf->tp_state = errno;
                return mbuf;
        }
 
        if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
                mbuf->state = RMR_ERR_BADARG;
+               mbuf->tp_state = errno;
                return mbuf;
        }
 
@@ -963,6 +999,7 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
        if( mbuf ) {
                if( mbuf->state != RMR_OK ) {
+                       mbuf->tp_state = errno;
                        return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
                }
        }