X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Frmr_si.c;h=80d7408c05d0d259055de5f8512358cf825a8f3f;hb=f4a2227da63b7dd64310d24b285cffb44d159746;hp=d1b9f26ca660bf7be312d96c1e3c7807197c2212;hpb=e3a021c1e7bc4bdb877cd12601a55c7e3f74437e;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c index d1b9f26..80d7408 100644 --- a/src/rmr/si/src/rmr_si.c +++ b/src/rmr/si/src/rmr_si.c @@ -82,8 +82,32 @@ Clean up a context. */ static void free_ctx( uta_ctx_t* ctx ) { - if( ctx && ctx->rtg_addr ) { - free( ctx->rtg_addr ); + if( ctx ) { + if( ctx->rtg_addr ){ + free( ctx->rtg_addr ); + } + uta_ring_free( ctx->mring ); + uta_ring_free( ctx->zcb_mring ); + if( ctx->chutes ){ + free( ctx->chutes ); + } + if( ctx->fd2ep ){ + rmr_sym_free( ctx->fd2ep ); + } + if( ctx->my_name ){ + free( ctx->my_name ); + } + if( ctx->my_ip ){ + free( ctx->my_ip ); + } + if( ctx->rtable ){ + rmr_sym_free( ctx->rtable->hash ); + free( ctx->rtable ); + } + if ( ctx->ephash ){ + free( ctx->ephash ); + } + free( ctx ); } } @@ -302,7 +326,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { msg->state = RMR_OK; // ensure it is clear before send hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip - strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours + zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours msg = send_msg( ctx, msg, nn_sock, -1 ); if( msg ) { if( ep != NULL ) { @@ -321,8 +345,8 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { break; } } - strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again - strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again + zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always replace original source & ip so rts can be called again + zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source } @@ -419,6 +443,10 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { } /* + DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will + too. This function likely will not behave as expected in SI, and we are pretty sure it + isn't being used as there was an abort triggering reference to rmr_rcv() until now. + This blocks until the message with the 'expect' ID is received. Messages which are received before the expected message are queued onto the message ring. The function will return a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the @@ -455,22 +483,25 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect ); while( queued < allow2queue ) { - msg = rcv_msg( ctx, msg ); // hard wait for next - if( msg->state == RMR_OK ) { - if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it - if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued ); - return msg; - } - - if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full - if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" ); - errno = ENOBUFS; - return NULL; + msg = rmr_rcv_msg( ctx, msg ); // hard wait for next + if( msg != NULL ) { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n", queued, allow2queue, msg->state ); + if( msg->state == RMR_OK ) { + if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued ); + return msg; + } + + if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" ); + errno = ENOBUFS; + return NULL; + } + + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype ); + queued++; + msg = NULL; } - - if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype ); - queued++; - msg = NULL; } } @@ -537,7 +568,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { uta_ctx_t* ctx = NULL; char bind_info[256]; // bind info char* proto = "tcp"; // pointer into the proto/port string user supplied - char* port; + char* port; // pointer into the proto_port buffer at the port value char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined) char* proto_port; char wbuf[1024]; // work buffer @@ -549,14 +580,16 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { int old_vlevel; old_vlevel = rmr_vlog_init(); // initialise and get the current level - rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc if( ! announced ) { + rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; + + rmr_set_vlevel( old_vlevel ); // return logging to the desired state + uta_dump_env(); // spit out environment settings meaningful to us if in info mode } - rmr_set_vlevel( old_vlevel ); // return logging to the desired state errno = 0; if( uproto_port == NULL ) { @@ -565,10 +598,15 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { proto_port = strdup( uproto_port ); // so we can modify it } - if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) { + if ( proto_port == NULL ){ errno = ENOMEM; return NULL; } + + if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) { + errno = ENOMEM; + goto err; + } memset( ctx, 0, sizeof( uta_ctx_t ) ); if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" ); @@ -590,6 +628,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock + uta_ring_config( ctx->zcb_mring, RING_FRLOCK ); // concurrent message allocatieon calls from userland require read lock, but can be fast } else { rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" ); } @@ -602,14 +641,10 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { ctx->max_plen = def_msg_size; } - // we're using a listener to get rtg updates, so we do NOT need this. - //uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors - ctx->si_ctx = SIinitialise( SI_OPT_FG ); // FIX ME: si needs to streamline and drop fork/bg stuff if( ctx->si_ctx == NULL ) { rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" ); - free_ctx( ctx ); - return NULL; + goto err; } if( (port = strchr( proto_port, ':' )) != NULL ) { @@ -643,7 +678,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { } else { if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) { rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); - return NULL; + goto err; } if( (tok = strchr( wbuf, '.' )) != NULL ) { *tok = 0; // we don't keep domain portion @@ -653,7 +688,8 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC ); if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port ); - return NULL; + errno = EINVAL; + goto err; } if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) { @@ -688,16 +724,32 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) { rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) ); - free_ctx( ctx ); - return NULL; + goto err; } // finish all flag setting before threads to keep helgrind quiet ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way - if( flags & RMRFL_NOTHREAD ) { // thread set to off; no route table collector started (could be called by the rtc thread itself) - ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used + + // ---------------- setup for route table collector before invoking ---------------------------------- + ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) ); // single mutex required to gate access to moving rtables + if( ctx->rtgate != NULL ) { + pthread_mutex_init( ctx->rtgate, NULL ); + } + + ctx->ephash = rmr_sym_alloc( 129 ); // host:port to ep symtab exists outside of any route table + if( ctx->ephash == NULL ) { + rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to allocate ep hash\n" ); + errno = ENOMEM; + goto err; + } + + ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 ); // create an empty route table so that wormhole/rts calls can be used + if( flags & RMRFL_NOTHREAD ) { // no thread prevents the collector start for very special cases + ctx->rtable_ready = 1; // route based sends will always fail, but rmr is ready for the non thread case } else { + ctx->rtable_ready = 0; // no sends until a real route table is loaded in the rtc thread + if( static_rtc ) { rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port ); if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader @@ -717,6 +769,11 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { free( proto_port ); return (void *) ctx; + +err: + free( proto_port ); + free_ctx( ctx ); + return NULL; } /* @@ -770,11 +827,7 @@ extern int rmr_ready( void* vctx ) { return FALSE; } - if( ctx->rtable != NULL ) { - return TRUE; - } - - return FALSE; + return ctx->rtable_ready; } /* @@ -791,13 +844,6 @@ extern int rmr_get_rcvfd( void* vctx ) { return -1; } -/* - if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) { - rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) ); - return -1; - } -*/ - return uta_ring_getpfd( ctx->mring ); } @@ -860,6 +906,8 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { if( max_wait == 0 ) { // one shot poll; handle wihtout sem check as that is SLOW! if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued + clock_gettime( CLOCK_REALTIME, &ts ); // pass current time as expriry time + sem_timedwait( &chute->barrier, &ts ); // must pop the count (ring is locking so if we got a message we can pop) if( ombuf ) { rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now } @@ -1079,8 +1127,10 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m // must vet call_id here, all others vetted by workhorse mt_call() function if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them - mbuf->state = RMR_ERR_BADARG; - mbuf->tp_state = EINVAL; + if( mbuf != NULL ) { + mbuf->state = RMR_ERR_BADARG; + mbuf->tp_state = EINVAL; + } return mbuf; }