RIC:1060: Change in PTL
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
index 9018fb1..66724e6 100644 (file)
@@ -1,8 +1,8 @@
 // vim: ts=4 sw=4 noet :
 /*
 ==================================================================================
-       Copyright (c) 2019-2020 Nokia
-       Copyright (c) 2018-2020 AT&T Intellectual Property.
+       Copyright (c) 2019-2021 Nokia
+       Copyright (c) 2018-2021 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
 #include "si95/socket_if.h"
 #include "si95/siproto.h"
 
+
 #define SI95_BUILD     1                       // we drop some common functions for si
 
 #include "rmr.h"                               // things the users see
 #include "rmr_agnostic.h"              // agnostic things (must be included before private)
 #include "rmr_si_private.h"            // things that we need too
+
 #include "rmr_symtab.h"
 #include "rmr_logging.h"
 
 #include "ring_static.c"                       // message ring support
 #include "rt_generic_static.c"         // route table things not transport specific
 #include "rtable_si_static.c"          // route table things -- transport specific
+#include "alarm.c"
 #include "rtc_static.c"                                // route table collector (thread code)
 #include "tools_static.c"
 #include "sr_si_static.c"                      // send/receive static functions
 #include "wormholes.c"                         // wormhole api externals and related static functions (must be LAST!)
 #include "mt_call_static.c"
 #include "mt_call_si_static.c"
+#include "rmr_debug_si.c"           // debuging functions
 
 
 //------------------------------------------------------------------------------
 
+/*
+       If we have an EP, up the counters based on state.
+       This isn't needed, but it makes driving the code under unit test easier so we
+       induldge in the bean counter's desire for coverage numbers.
+*/
+static inline void incr_ep_counts( int state, endpoint_t* ep ) {
+       if( ep != NULL ) {
+               switch( state ) {
+                       case RMR_OK:
+                               ep->scounts[EPSC_GOOD]++;
+                               break;
+
+                       case RMR_ERR_RETRY:
+                               ep->scounts[EPSC_TRANS]++;
+                               break;
+
+                       default:
+                               ep->scounts[EPSC_FAIL]++;
+                               break;
+               }
+       }
+}
 
 /*
        Clean up a context.
@@ -338,29 +364,14 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                }
        }
 
-
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
        hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
        zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );    // must overlay the source to be ours
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
-               if( ep != NULL ) {
-                       switch( msg->state ) {
-                               case RMR_OK:
-                                       ep->scounts[EPSC_GOOD]++;
-                                       break;
-
-                               case RMR_ERR_RETRY:
-                                       ep->scounts[EPSC_TRANS]++;
-                                       break;
-
-                               default:
-                                       // FIX ME uta_fd_failed( nn_sock );                     // we don't have an ep so this requires a look up/search to mark it failed
-                                       ep->scounts[EPSC_FAIL]++;
-                                       break;
-                       }
-               }
+               incr_ep_counts(  msg->state, ep );                              // update counts
+
                zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );        // always replace original source & ip so rts can be called again
                zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
                msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
@@ -567,6 +578,35 @@ extern int rmr_set_rtimeout( void* vctx, int time ) {
        return 0;
 }
 
+/*
+       Common cleanup on initialisation error. These are hard to force, and this helps to ensure
+       all code is tested by providing a callable rather than a block of "goto" code.
+
+       There is a return value so that where we need this we get dinked only for one
+       uncovered line rather than two:
+               init_err(...);
+               return NULL;
+
+       That's a hack, and is yet another example of the testing tail wagging the dog.
+*/
+static inline void* init_err( char* msg, void* ctx, void* port, int errval ) {
+       if( errval != 0 ) {                     // if not letting it be what a sysllib set it to...
+               errno = errval;
+       }
+
+       if( port ) {                            // free things if allocated
+               free( port );
+       }
+       if( ctx ) {
+               free_ctx( ctx );
+       }
+
+       if( msg ) {                                                                     // crit message if supplied
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: %s: %s", msg, strerror( errno ) );
+       }
+
+       return NULL;
+}
 
 /*
        This is the actual init workhorse. The user visible function meerly ensures that the
@@ -583,6 +623,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) {
        static  int announced = 0;
        uta_ctx_t*      ctx = NULL;
        char    bind_info[256];                         // bind info
+       char*   my_ip = NULL;
        char*   proto = "tcp";                          // pointer into the proto/port string user supplied
        char*   port;                                           // pointer into the proto_port buffer at the port value
        char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
@@ -599,7 +640,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) {
 
        if( ! announced ) {
                rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version
-               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x id=a (%s %s.%s.%s built: %s)\n",
                        uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
 
@@ -616,17 +657,16 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) {
        }
 
        if ( proto_port == NULL ){
-               errno = ENOMEM;
-               return NULL;
+               return init_err( "unable to alloc proto port string", NULL, NULL, ENOMEM );
        }
 
        if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
-               errno = ENOMEM;
-               goto err;
+               return init_err( "unable to allocate context", ctx, proto_port, ENOMEM );
        }
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
+       ctx->snarf_rt_fd = -1;
        ctx->nrivers = MAX_RIVERS;                                              // the array allows for fast index mapping for fd values < max
        ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
        ctx->river_hash = rmr_sym_alloc( 129 );                         // connections with fd values > FD_MAX have to e hashed
@@ -661,9 +701,9 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) {
 
        ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
        if( ctx->si_ctx == NULL ) {
-               rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
-               goto err;
+               return init_err( "unable to initialise SI95 interface\n", ctx, proto_port, 0 );
        }
+       SIset_tflags(ctx->si_ctx,SI_TF_QUICK);
 
        if( (port = strchr( proto_port, ':' )) != NULL ) {
                if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
@@ -696,8 +736,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) {
                free( tok );
        } else {
                if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
-                       rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
-                       goto err;
+                       return init_err( "cannot determine localhost name\n", ctx, proto_port, 0 );
                }
                if( (tok = strchr( wbuf, '.' )) != NULL ) {
                        *tok = 0;                                                                       // we don't keep domain portion
@@ -706,9 +745,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) {
 
        ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
        if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
-               rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
-               errno = EINVAL;
-               goto err;
+               return init_err( "hostname + port is too long", ctx, proto_port, EINVAL );
        }
 
        if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
@@ -717,33 +754,50 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) {
                }
        }
 
-       ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
+       ctx->ip_list = mk_ip_list( port );                      // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
+       my_ip = get_default_ip( ctx->ip_list );         // and (guess) at what should be the default to put into messages as src
+       if( my_ip == NULL ) {
+               rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
+               my_ip = strdup( ctx->my_name );                 // if we cannot suss it out, use the name rather than a nil pointer
+       }
+
        if( flags & RMRFL_NAME_ONLY ) {
                ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
        } else {
-               ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
-               if( ctx->my_ip == NULL ) {
-                       rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
-                       ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
-               }
+               ctx->my_ip = strdup( my_ip );
        }
-       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "default ip address: %s\n", ctx->my_ip );
 
        if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
                if( *tok == '1' ) {
-                       ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
+                       ctx->flags |= CFL_WARN;                                 // turn on some warnings (not all, just ones that shouldn't impact performance)
                }
        }
 
+       if( (interface = getenv( ENV_BIND_IF )) == NULL ) {             // if specific interface not defined, listen on all (IPv4, IPv6, or interface name)
+               /*
+                       compares the first ip sussed out by mk_ip_list (returned by get_default_ip)
+                       NOTE: this might be not work very predictable in dual-stack where an interface can have IPv4 and IPv6 addresses assigned,
+                       meaning that it can select either IPv4 or IPv6 on applications restarts (depends on the order of IP addresses assigned on the interface)
+               */
+               if( my_ip[0] == '[' ) {                 // IPv6
+                       interface = "[::]";
+               } else {                                                // IPv4
+                       interface = "0.0.0.0";
+                       if( flags & RMRFL_NAME_ONLY ) { // if hostname is given instead of IP in RMR source address
+                               rmr_vlog( RMR_VL_WARN, "rmr_init: hostname:ip is provided as source information for rts() calls, falling back to any IPv4\n" );
+                       }
+               }
+       }
 
-       if( (interface = getenv( ENV_BIND_IF )) == NULL ) {             // if specific interface not defined, listen on all
-               interface = "0.0.0.0";
+       if( my_ip != NULL ) {
+               free( my_ip );
        }
 
        snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );
        if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
                rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
-               goto err;
+               return init_err( NULL, ctx, proto_port, 0 );
        }
 
                                                                                                // finish all flag setting before threads to keep helgrind quiet
@@ -758,9 +812,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) {
 
        ctx->ephash = rmr_sym_alloc( 129 );                                     // host:port to ep symtab exists outside of any route table
        if( ctx->ephash == NULL ) {
-               rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to allocate ep hash\n" );
-               errno = ENOMEM;
-               goto err;
+               return init_err( "unable to allocate ep hash\n", ctx, proto_port, ENOMEM );
        }
 
        ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 );     // create an empty route table so that wormhole/rts calls can be used
@@ -788,11 +840,6 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) {
 
        free( proto_port );
        return (void *) ctx;
-
-err:
-       free( proto_port );
-       free_ctx( ctx );
-       return NULL;
 }
 
 /*
@@ -882,6 +929,10 @@ extern void rmr_close( void* vctx ) {
                return;
        }
 
+       if( ctx->seed_rt_fname != NULL ) {
+               free( ctx->seed_rt_fname );
+       }
+
        ctx->shutdown = 1;
 
        SItp_stats( ctx->si_ctx );                      // dump some interesting stats