X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fnanomsg%2Fsrc%2Frmr.c;h=256ea3bdd62c326661a6d170fb9da7b571e6e9ce;hb=5200efe1e6dd13b1e1241ce623c4978151be34e8;hp=93e4f7a7f5b35a1ea354dfec2b65a7676689bad3;hpb=412d53dfa2f9b5b56a448797d0dfec3b0f11f666;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/nanomsg/src/rmr.c b/src/rmr/nanomsg/src/rmr.c index 93e4f7a..256ea3b 100644 --- a/src/rmr/nanomsg/src/rmr.c +++ b/src/rmr/nanomsg/src/rmr.c @@ -340,11 +340,12 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { The caller must check for this and handle. */ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { - int nn_sock; // endpoint socket for send + int nn_sock = -1; // endpoint socket for send uta_ctx_t* ctx; int state; uta_mhdr_t* hdr; char* hold_src; // we need the original source if send fails + char* hold_ip; if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -361,21 +362,31 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { return msg; } - nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src ); // socket of specific endpoint - if( nn_sock < 0 ) { - msg->state = RMR_ERR_NOENDPT; - return msg; // preallocated msg can be reused since not given back to nn + if( HDR_VERSION( msg->header ) > 2 ) { // new version uses sender's ip address for rts + nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip ); // socket of specific endpoint + } + if( nn_sock < 0 ) { + nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src ); // socket of specific endpoint + if( nn_sock < 0 ) { + msg->state = RMR_ERR_NOENDPT; + return msg; // preallocated msg can be reused since not given back to nn + } } hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to - strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours + hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); + strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours + strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC ); + msg = send_msg( ctx, msg, nn_sock ); if( msg ) { - strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID ); // always return original source so rts can be called again + strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again + strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source } free( hold_src ); + free( hold_ip ); return msg; } @@ -587,7 +598,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { if( max_msg_size <= ctx->max_plen ) { // user defined len can be smaller ctx->max_plen = max_msg_size; } else { - fprintf( stderr, "[WARN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen ); + fprintf( stderr, "[WRN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen ); } } @@ -597,7 +608,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->nn_sock = nn_socket( AF_SP, NN_PULL ); // our 'listen' socket should allow multiple senders to connect if( ctx->nn_sock < 0 ) { - fprintf( stderr, "[CRIT] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno ); + fprintf( stderr, "[CRI] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno ); free_ctx( ctx ); return NULL; } @@ -614,15 +625,15 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { } if( (gethostname( wbuf, sizeof( wbuf ) )) < 0 ) { - fprintf( stderr, "[CRIT] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); + fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); return NULL; } if( (tok = strchr( wbuf, '.' )) != NULL ) { *tok = 0; // we don't keep domain portion } - ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID ); - if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) { // our registered name is host:port - fprintf( stderr, "[CRIT] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port ); + ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC ); + if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port + fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port ); return NULL; } @@ -631,15 +642,35 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { } snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port ); if( nn_bind( ctx->nn_sock, bind_info ) < 0) { // bind and automatically accept client sessions - fprintf( stderr, "[CRIT] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) ); + fprintf( stderr, "[CRI] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) ); nn_close( ctx->nn_sock ); free_ctx( ctx ); return NULL; } + if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) { + if( atoi( tok ) > 0 ) { + flags |= RMRFL_NAME_ONLY; // don't allow IP addreess to go out in messages + } + } + + if( flags & RMRFL_NAME_ONLY ) { + ctx->my_ip = strdup( ctx->my_name ); // user application or env var has specified that IP address is NOT sent out, use name + if( DEBUG ) fprintf( stderr, "[DBUG] name only mode is set; not sending IP address as source\n" ); + } else { + ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons + ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src + if( ctx->my_ip == NULL ) { + strcpy( ctx->my_ip, ctx->my_name ); // revert to name if we cant suss out ip address + fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name as source\n" ); + } else { + if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip ); + } + } + if( ! (flags & FL_NOTHREAD) ) { // skip if internal context that does not need rout table thread if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rt collector thread - fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) ); + fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) ); } }