X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fnng%2Fsrc%2Frmr_nng.c;h=45b7fc6edcb02e88956347af5874becde3861aa3;hb=refs%2Ftags%2F4.1.3;hp=bc3fe107cba3a476cbc35e70a04ea10ffe6e2f3b;hpb=dfe7b622b128e7bfb4a5e1f7e0afdb84e6001d14;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/nng/src/rmr_nng.c b/src/rmr/nng/src/rmr_nng.c index bc3fe10..45b7fc6 100644 --- a/src/rmr/nng/src/rmr_nng.c +++ b/src/rmr/nng/src/rmr_nng.c @@ -1,8 +1,8 @@ -// : vi ts=4 sw=4 noet : +// vim: ts=4 sw=4 noet : /* ================================================================================== - Copyright (c) 2019 Nokia - Copyright (c) 2018-2019 AT&T Intellectual Property. + Copyright (c) 2019-2020 Nokia + Copyright (c) 2018-2020 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -64,6 +64,7 @@ #include "rmr_agnostic.h" // agnostic things (must be included before private) #include "rmr_nng_private.h" // things that we need too #include "rmr_symtab.h" +#include "rmr_logging.h" #include "ring_static.c" // message ring support #include "rt_generic_static.c" // route table things not transport specific @@ -250,6 +251,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { char* hold_src; // we need the original source if send fails char* hold_ip; // also must hold original ip int sock_ok = 0; // true if we found a valid endpoint socket + endpoint_t* ep; // end point to track counts if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -262,7 +264,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { errno = 0; // at this point any bad state is in msg returned if( msg->header == NULL ) { - fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" ); + rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" ); msg->state = RMR_ERR_NOHDR; msg->tp_state = errno; return msg; @@ -270,10 +272,10 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // src is always used first for rts + sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // src is always used first for rts if( ! sock_ok ) { if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock ); + sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep ); } if( ! sock_ok ) { msg->state = RMR_ERR_NOENDPT; @@ -287,6 +289,21 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours msg = send_msg( ctx, msg, nn_sock, -1 ); if( msg ) { + if( ep != NULL ) { + switch( msg->state ) { + case RMR_OK: + ep->scounts[EPSC_GOOD]++; + break; + + case RMR_ERR_RETRY: + ep->scounts[EPSC_TRANS]++; + break; + + default: + ep->scounts[EPSC_FAIL]++; + break; + } + } strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source @@ -343,7 +360,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) { memcpy( expected_id, msg->xaction, RMR_MAX_XID ); expected_id[RMR_MAX_XID] = 0; // ensure it's a string - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rmr_call is making call, waiting for (%s)\n", expected_id ); errno = 0; msg->flags |= MFL_NOALLOC; // we don't need a new buffer from send @@ -438,7 +455,12 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) { fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno ); free( eps ); - return NULL; + ctx->eps = NULL; + if( old_msg != NULL ) { + old_msg->state = RMR_ERR_INITFAILED; + old_msg->tp_state = errno; + } + return old_msg; } eps->nng_fd = rmr_get_rcvfd( ctx ); @@ -448,7 +470,12 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) { fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) ); free( eps ); - return NULL; + ctx->eps = NULL; + if( old_msg != NULL ) { + old_msg->state = RMR_ERR_INITFAILED; + old_msg->tp_state = errno; + } + return old_msg; } ctx->eps = eps; @@ -509,29 +536,29 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, if( exp_len > RMR_MAX_XID ) { exp_len = RMR_MAX_XID; } - if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific waiting for id=%s\n", expect ); while( queued < allow2queue ) { msg = rcv_msg( ctx, msg ); // hard wait for next if( msg->state == RMR_OK ) { if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it - if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued ); return msg; } if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific ring is full\n" ); errno = ENOBUFS; return NULL; } - if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific queued message type=%d\n", msg->mtype ); queued++; msg = NULL; } } - if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific timeout waiting for %s\n", expect ); errno = ETIMEDOUT; return NULL; } @@ -573,7 +600,7 @@ extern int rmr_set_stimeout( void* vctx, int time ) { CAUTION: this is not supported as they must be set differently (between create and open) in NNG. */ extern int rmr_set_rtimeout( void* vctx, int time ) { - fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" ); + rmr_vlog( RMR_VL_WARN, "Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" ); return 0; } @@ -597,12 +624,17 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { char* tok; // pointer at token in a buffer char* tok2; int state; + int old_vlevel = 0; + + old_vlevel = rmr_vlog_init(); // initialise and get the current level + rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc if( ! announced ) { - fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n", - RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); + rmr_vlog( RMR_VL_INFO, "ric message routing library on NNG/d mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", + RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; } + rmr_set_vlevel( old_vlevel ); // return logging to the desired state errno = 0; if( uproto_port == NULL ) { @@ -636,7 +668,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { //uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors if( nng_pull0_open( &ctx->nn_sock ) != 0 ) { // and assign the mode - fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno ); + rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno ); free_ctx( ctx ); return NULL; } @@ -667,7 +699,8 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { free( tok ); } else { if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) { - fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); + rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); + free( proto_port ); return NULL; } if( (tok = strchr( wbuf, '.' )) != NULL ) { @@ -677,7 +710,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC ); if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port - fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port ); + rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port ); return NULL; } @@ -693,11 +726,11 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { } else { ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src if( ctx->my_ip == NULL ) { - fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" ); - strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer + rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" ); + ctx->my_ip = strdup( ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer } } - if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "default ip address: %s\n", ctx->my_ip ); if( (tok = getenv( ENV_WARNINGS )) != NULL ) { if( *tok == '1' ) { @@ -713,22 +746,34 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { // rather than using this generic listen() call. snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port ); if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) { - fprintf( stderr, "[CRI] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) ); + rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) ); nng_close( ctx->nn_sock ); + free( proto_port ); free_ctx( ctx ); return NULL; } - if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need an rtc - if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rt collector thread - fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) ); + ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // allows wormhole and rts calls to work before rt is received + if( flags & FL_NOTHREAD ) { // if no rtc thread, we still need an empty route table for wormholes + ctx->rmr_ready = 1; // for a nothread instance, rmr is ready to go here + } else { + ctx->rmr_ready = o; // ensure not ready until static/dynamic table loaded + + if( (tok = getenv( ENV_RTG_RAW )) != NULL && *tok == '0' ) { // use RMR for Rmgr comm only when specifically off + if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rmr based rt collector thread + rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) ); + } + } else { + if( pthread_create( &ctx->rtc_th, NULL, raw_rtc, (void *) ctx ) ) { // kick the raw msg rt collector thread + rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) ); + } } } if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) { // mt call support is on, must start the listener thread if not running ctx->flags |= CFL_MTC_ENABLED; if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // kick the receiver - fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) ); + rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) ); } } @@ -788,11 +833,7 @@ extern int rmr_ready( void* vctx ) { return FALSE; } - if( ctx->rtable != NULL ) { - return TRUE; - } - - return FALSE; + return ctx->rmr_ready; } /* @@ -810,7 +851,7 @@ extern int rmr_get_rcvfd( void* vctx ) { } if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) { - fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) ); + rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) ); return -1; } @@ -883,11 +924,11 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { chute = &ctx->chutes[0]; // chute 0 used only for its semaphore - if( max_wait > 0 ) { + if( max_wait >= 0 ) { clock_gettime( CLOCK_REALTIME, &ts ); if( max_wait > 999 ) { - seconds = (max_wait - 999)/1000; + seconds = max_wait / 1000; max_wait -= seconds * 1000; ts.tv_sec += seconds; } @@ -917,7 +958,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { mbuf = ombuf; // return caller's buffer if they passed one in } else { errno = 0; // interrupted call state could be left; clear - if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_rcv extracting from normal ring\n" ); if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued mbuf->state = RMR_OK; @@ -936,23 +977,15 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { return mbuf; } -/* - Accept a message buffer and caller ID, send the message and then wait - for the receiver to tickle the semaphore letting us know that a message - has been received. The call_id is a value between 2 and 255, inclusive; if - it's not in this range an error will be returned. Max wait is the amount - of time in millaseconds that the call should block for. If 0 is given - then no timeout is set. - If the mt_call feature has not been initialised, then the attempt to use this - funciton will fail with RMR_ERR_NOTSUPP +/* + This does the real work behind both of the outward facing call functions. See + the rmr_mt_call() description for details modulo the comments blow. - If no matching message is received before the max_wait period expires, a - nil pointer is returned, and errno is set to ETIMEOUT. If any other error - occurs after the message has been sent, then a nil pointer is returned - with errno set to some other value. + If ep is given, then we skip the normal route table endpoint selection. This is + likely a wormhole call. */ -extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) { +static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) { rmr_mbuf_t* ombuf; // original mbuf passed in uta_ctx_t* ctx; uta_mhdr_t* hdr; // header in the transport buffer @@ -1000,11 +1033,11 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend - if( max_wait > 0 ) { + if( max_wait >= 0 ) { clock_gettime( CLOCK_REALTIME, &ts ); if( max_wait > 999 ) { - seconds = (max_wait - 999)/1000; + seconds = max_wait / 1000; max_wait -= seconds * 1000; ts.tv_sec += seconds; } @@ -1020,7 +1053,11 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m seconds = 1; // use as flag later to invoked timed wait } - mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success! + if( ep == NULL ) { + mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success! + } else { + mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 ); + } if( mbuf ) { if( mbuf->state != RMR_OK ) { mbuf->tp_state = errno; @@ -1028,7 +1065,7 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m } } - state = 0; + state = -1; errno = 0; while( chute->mbuf == NULL && ! errno ) { if( seconds ) { @@ -1054,9 +1091,71 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m return NULL; // leave errno as set by sem wait call } - mbuf = chute->mbuf; - mbuf->state = RMR_OK; + if( (mbuf = chute->mbuf) != NULL ) { + mbuf->state = RMR_OK; + } chute->mbuf = NULL; return mbuf; } + +/* + Accept a message buffer and caller ID, send the message and then wait + for the receiver to tickle the semaphore letting us know that a message + has been received. The call_id is a value between 2 and 255, inclusive; if + it's not in this range an error will be returned. Max wait is the amount + of time in millaseconds that the call should block for. If 0 is given + then no timeout is set. + + If the mt_call feature has not been initialised, then the attempt to use this + funciton will fail with RMR_ERR_NOTSUPP + + If no matching message is received before the max_wait period expires, a + nil pointer is returned, and errno is set to ETIMEOUT. If any other error + occurs after the message has been sent, then a nil pointer is returned + with errno set to some other value. + + This is now just a wrapper to the real work horse so that we can provide + this and wormhole call functions without duplicating code. + +*/ +extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) { + return mt_call( vctx, mbuf, call_id, max_wait, NULL ); +} + +/* + Given an existing message buffer, reallocate the payload portion to + be at least new_len bytes. The message header will remain such that + the caller may use the rmr_rts_msg() function to return a payload + to the sender. + + The mbuf passed in may or may not be reallocated and the caller must + use the returned pointer and should NOT assume that it can use the + pointer passed in with the exceptions based on the clone flag. + + If the clone flag is set, then a duplicated message, with larger payload + size, is allocated and returned. The old_msg pointer in this situation is + still valid and must be explicitly freed by the application. If the clone + message is not set (0), then any memory management of the old message is + handled by the function. + + If the copy flag is set, the contents of the old message's payload is + copied to the reallocated payload. If the flag is not set, then the + contents of the payload is undetermined. +*/ +extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) { + if( old_msg == NULL ) { + return NULL; + } + + return realloc_payload( old_msg, new_len, copy, clone ); // message allocation is transport specific, so this is a passthrough +} + +/* + The following functions are "dummies" as NNG has no concept of supporting + them, but are needed to resolve calls at link time. +*/ + +extern void rmr_set_fack( void* p ) { + return; +}