enhance(API): Add source IP support to msg header
[ric-plt/lib/rmr.git] / src / rmr / nanomsg / src / rmr.c
index 93e4f7a..128039d 100644 (file)
@@ -340,11 +340,12 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
                The caller must check for this and handle.
 */
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
-       int nn_sock;                            // endpoint socket for send
+       int nn_sock = -1;                       // endpoint socket for send
        uta_ctx_t*      ctx;
        int state;
        uta_mhdr_t*     hdr;
        char*   hold_src;                       // we need the original source if send fails
+       char*   hold_ip;
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -361,21 +362,31 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                return msg;
        }
 
-       nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src );                  // socket of specific endpoint
-       if( nn_sock < 0 ) {
-               msg->state = RMR_ERR_NOENDPT;
-               return msg;                                                     // preallocated msg can be reused since not given back to nn
+       if( HDR_VERSION( msg->header ) > 2 ) {                                                  // new version uses sender's ip address for rts
+               nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip );                // socket of specific endpoint
+       }
+       if(  nn_sock < 0 ) {
+               nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src );                  // socket of specific endpoint
+               if(  nn_sock < 0 ) {
+                       msg->state = RMR_ERR_NOENDPT;
+                       return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
+               }
        }
 
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                                 // the dest where we're returning the message to
-       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                // must overlay the source to be ours
+       hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );
+       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                // must overlay the source to be ours
+       strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
+
        msg = send_msg( ctx, msg, nn_sock );
        if( msg ) {
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );            // always return original source so rts can be called again
+               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );            // always return original source so rts can be called again
+               strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
                msg->flags |= MFL_ADDSRC;                                                                                                       // if msg given to send() it must add source
        }
 
        free( hold_src );
+       free( hold_ip );
        return msg;
 }
 
@@ -620,9 +631,9 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) {
        if( (tok = strchr( wbuf, '.' )) != NULL ) {
                *tok = 0;                                                                       // we don't keep domain portion
        }
-       ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
-       if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) {                       // our registered name is host:port
-               fprintf( stderr, "[CRIT] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
+       ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
+       if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
+               fprintf( stderr, "[CRIT] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
                return NULL;
        }
 
@@ -637,6 +648,26 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) {
                return NULL;
        }
 
+       if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
+               if( atoi( tok ) > 0 ) {
+                       flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
+               }
+       }
+
+       if( flags & RMRFL_NAME_ONLY ) {
+               ctx->my_ip = strdup( ctx->my_name );                            // user application or env var has specified that IP address is NOT sent out, use name
+               if( DEBUG ) fprintf( stderr, "[DBUG] name only mode is set; not sending IP address as source\n" );
+       } else {
+               ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
+               ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
+               if( ctx->my_ip == NULL ) {
+                       strcpy( ctx->my_ip, ctx->my_name );                     // revert to name if we cant suss out ip address
+                       fprintf( stderr, "[WARN] rmr_init: default ip address could not be sussed out, using name as source\n" );
+               } else {
+                       if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
+               }
+       }
+
        if( ! (flags & FL_NOTHREAD) ) {                 // skip if internal context that does not need rout table thread
                if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {                // kick the rt collector thread
                        fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );