X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fcommon%2Fsrc%2Frtc_static.c;h=2deef8ee02cb1584fb8668fd9ab8befcf02a67f4;hb=5ec64c5253b3b7611ec69cc1487989fae45eca26;hp=45a277a95ef3a631e913dfbc1cb421660615203d;hpb=4919b11a06c3c96652121922c010c31f27471756;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/common/src/rtc_static.c b/src/rmr/common/src/rtc_static.c index 45a277a..2deef8e 100644 --- a/src/rmr/common/src/rtc_static.c +++ b/src/rmr/common/src/rtc_static.c @@ -1,8 +1,8 @@ // : vi ts=4 sw=4 noet : /* ================================================================================== - Copyright (c) 2019 Nokia - Copyright (c) 2018-2019 AT&T Intellectual Property. + Copyright (c) 2019-2020 Nokia + Copyright (c) 2018-2020 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -84,6 +84,9 @@ static void* rtc_file( void* vctx ) { read_static_rt( ctx, vlevel ); // seed the route table if one provided + if( ctx->shutdown != 0 ) { // allow for graceful termination and unit testing + return NULL; + } sleep( 60 ); } } @@ -102,28 +105,103 @@ static int refresh_vlevel( int vfd ) { return vlevel; } +/* + Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated, + records in each message; it is required that the last record in the message be complete (we do not + reconstruct records split over multiple messages). For each record, we call the record parser + to parse and add the information to the table being built. + + This function was broken from the main rtc() function in order to be able to unit test it. Without + this as a standalone funciton, it was impossible to simulate a message arriving on the RTC's private + context. + + To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other + words, this is not thread safe but it shouldn't need to be. +*/ +static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel, int* flags ) { + static unsigned char* pbuf = NULL; + static int pbuf_size = 0; + + unsigned char* payload; + unsigned char* curr; + unsigned char* nextr; + int mlen; + + payload = msg->payload; + mlen = msg->len; // usable bytes in the payload + + if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen ); + switch( msg->mtype ) { + case RMRRM_TABLE_DATA: + if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) { + *flags |= RTCFL_HAVE_UPDATE; + rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" ); + } + + if( pbuf_size <= mlen ) { + if( pbuf ) { + free( pbuf ); + } + if( mlen < 512 ) { + pbuf_size = 1024; + } else { + pbuf_size = mlen * 2; + } + pbuf = (char *) malloc( sizeof( char ) * pbuf_size ); + } + memcpy( pbuf, payload, mlen ); + pbuf[mlen] = 0; // don't depend on sender making this a legit string + if( vlevel > 1 ) { + rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf ); + } + + curr = pbuf; + while( curr ) { // loop over each record in the buffer + nextr = strchr( (char *) curr, '\n' ); // allow multiple newline records, find end of current and mark + + if( nextr ) { + *(nextr++) = 0; + } + + if( vlevel > 1 ) { + rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr ); + } + parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table; ack using rts to msg + + curr = nextr; + } + + msg->len = 0; // force back into the listen loop + break; + + default: + rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len ); + break; + } +} + /* Route Table Collector A side thread which either attempts to connect and request a table from the Route Manager, or opens a port and listens for Route Manager to push table updates. - It may do other things along the way (latency measurements, alarms, + It may do other things along the way (latency measurements, alarms, respond to RMR pings, etc.). The behaviour with respect to listening for Route Manager updates vs the initiation of the connection and sending a request depends on the value of the ENV_RTG_ADDR (RMR_RTG_SVC) environment variable. If host:port, or IP:port, is given, then we assume that we make the connection - and send a request for the table (request mode). If the variable is just - a port, then we assume Route Manager will connect and push updates (original + and send a request for the table (request mode). If the variable is just + a port, then we assume Route Manager will connect and push updates (original method). If the variable is not defined, the default behaviour, in order to be - backwards compatable, depends on the presence of the ENV_CTL_PORT + backwards compatable, depends on the presence of the ENV_CTL_PORT (RMR_CTL_PORT) variable (new with the support for requesting a table). - + ENV_CTL_PORT ENV_RTG_ADDR Behaviour unset unset Open default CTL port (DEF_CTL_PORT) and wait for Rt Mgr to push tables @@ -134,15 +212,15 @@ static int refresh_vlevel( int vfd ) { used is the value set by ENV_CTL_PORT. unset set As described above. The default control - port (DEF_CTL_PORT) is used. + port (DEF_CTL_PORT) is used. - When we are running in request mode, then we will send the RMR message - RMRRM_REFRESH to this address (wormhole) as a request for the route manager + When we are running in request mode, then we will send the RMR message + RMRRM_REFRESH to this address (wormhole) as a request for the route manager to send a new table. We will attempt to connect and send requests until we have a table. Calls to rmr_ready() will report FALSE until a table is loaded _unless_ a seed table was given. - Route table information is expected to arrive on RMR messages with type + Route table information is expected to arrive on RMR messages with type RMRRM_TABLE_DATA. There is NOT a specific message type for each possible table record, so the payload is as it appears in the seed file or as delivered in old versions. It may take several RMRRM_TABLE_DATA messages @@ -163,22 +241,16 @@ static void* rtc( void* vctx ) { uta_ctx_t* ctx; // context user has -- where we pin the route table uta_ctx_t* pvt_cx; // private context for session with rtg rmr_mbuf_t* msg = NULL; // message from rtg - char* payload; // payload in the message - size_t mlen; char* my_port; // the port number that we will listen on (4561 has been the default for this) char* rtg_addr; // host:port address of route table generator (route manager) char* daddr; // duplicated rtg address string to parse/trash size_t buf_size; // nng needs var pointer not just size? - char* nextr; // pointer at next record in the message - char* curr; // current record - int i; + int i; long blabber = 0; // time of last blabber so we don't flood if rtg goes bad int cstate = -1; // connection state to rtg int state; // processing state of some nng function char* tokens[128]; char wbuf[128]; - char* pbuf = NULL; - int pbuf_size = 0; // number allocated in pbuf int ntoks; int vfd = -1; // verbose file des if we have one int vlevel = 0; // how chatty we should be 0== no nattering allowed @@ -189,6 +261,8 @@ static void* rtc( void* vctx ) { int count_delay = 30; // number of seconds between writing count info; initially every 30s int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes int flags = 0; + int rt_req_freq = DEF_RTREQ_FREQ; // request frequency (sec) when wanting a new table + int nxt_rt_req = 0; // time of next request if( (ctx = (uta_ctx_t *) vctx) == NULL ) { @@ -196,19 +270,33 @@ static void* rtc( void* vctx ) { return NULL; } + if( (ctx->ephash = rmr_sym_alloc( RT_SIZE )) == NULL ) { // master hash table for endpoints (each rt will reference this) + rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: unable to allocate an endpoint hash table\n" ); + return NULL; + } + if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) { vfd = open( eptr, O_RDONLY ); vlevel = refresh_vlevel( vfd ); } - ctx->flags |= CFL_NO_RTACK; // don't ack when reading from a file + if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) { + rt_req_freq = atoi( eptr ); + if( rt_req_freq < 1 || rt_req_freq > 300 ) { + rt_req_freq = DEF_RTREQ_FREQ; + rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default", DEF_RTREQ_FREQ ); + } + } + rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq ); + + ctx->flags |= CFL_NO_RTACK; // don't ack when reading from a file read_static_rt( ctx, vlevel ); // seed the route table if one provided ctx->flags &= ~CFL_NO_RTACK; my_port = getenv( ENV_CTL_PORT ); // default port to listen on (likely 4561) if( my_port == NULL || ! *my_port ) { // if undefined, then go with default - my_port = DEF_CTL_PORT; + my_port = DEF_CTL_PORT; daddr = DEF_CTL_PORT; // backwards compat; if ctl port not hard defined, default is to listen } else { daddr = DEF_RTG_WK_ADDR; // if ctl port is defined, then default changes to connecting to well known RM addr @@ -235,7 +323,7 @@ static void* rtc( void* vctx ) { flags |= RTCFL_HAVE_UPDATE; // and signal not to try to request an update my_port = tokens[1]; } else { - // rtg_addr points at rt mgr address and my port set from env or default stands as is + // rtg_addr points at rt mgr address and my port set from env or default stands as is } break; } @@ -259,11 +347,12 @@ static void* rtc( void* vctx ) { blabber = 0; while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side - if( (flags & RTCFL_HAVE_UPDATE) == 0 ) { // no route table updated from rt mgr; request one + if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) { // no route table updated from rt mgr; request one if( ctx->rtg_whid < 0 ) { ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr ); } send_update_req( pvt_cx, ctx ); + nxt_rt_req = time( NULL ) + rt_req_freq; } msg = rmr_torcv_msg( pvt_cx, msg, 1000 ); @@ -278,63 +367,16 @@ static void* rtc( void* vctx ) { rt_epcounts( ctx->rtable, ctx->my_name ); } } + + if( ctx->shutdown != 0 ) { + break; // mostly for unit test, but allows a forced stop + } } vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message if( msg != NULL && msg->len > 0 ) { - payload = msg->payload; - mlen = msg->len; // usable bytes in the payload - if( vlevel > 1 ) { - rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d bytes (%s)\n", msg->mtype, (int) mlen, msg->payload ); - } else { - if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen ); - } - - switch( msg->mtype ) { - case RMRRM_TABLE_DATA: - if( (flags & RTCFL_HAVE_UPDATE) == 0 ) { - flags |= RTCFL_HAVE_UPDATE; - rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" ); - } - - if( pbuf_size <= mlen ) { - if( pbuf ) { - free( pbuf ); - } - if( mlen < 512 ) { - pbuf_size = 512; - } else { - pbuf_size = mlen * 2; - } - pbuf = (char *) malloc( sizeof( char ) * pbuf_size ); - } - memcpy( pbuf, payload, mlen ); - pbuf[mlen] = 0; // don't depend on sender making this a legit string - - curr = pbuf; - while( curr ) { // loop over each record in the buffer - nextr = strchr( curr, '\n' ); // allow multiple newline records, find end of current and mark - - if( nextr ) { - *(nextr++) = 0; - } - - if( vlevel > 1 ) { - rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr ); - } - parse_rt_rec( ctx, pvt_cx, curr, vlevel ); // parse record and add to in progress table - - curr = nextr; - } - - msg->len = 0; // force back into the listen loop - break; - - default: - rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len ); - break; - } + rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags ); } if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this @@ -347,7 +389,7 @@ static void* rtc( void* vctx ) { } #ifndef SI95_BUILD -// this is nng specific inas much as we allow raw (non-RMR) messages +// this is nng specific inas much as we allow raw (non-RMR) messages /* NOTE: This is the original rtc code when we supported "raw" nano/nng messages @@ -405,7 +447,7 @@ static void* raw_rtc( void* vctx ) { size_t buf_size; // nng needs var pointer not just size? char* nextr; // pointer at next record in the message char* curr; // current record - int i; + int i; long blabber = 0; // time of last blabber so we don't flood if rtg goes bad int cstate = -1; // connection state to rtg int state; // processing state of some nng function @@ -434,7 +476,7 @@ static void* raw_rtc( void* vctx ) { if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) { vfd = open( eptr, O_RDONLY ); vlevel = refresh_vlevel( vfd ); - } + } read_static_rt( ctx, vlevel ); // seed the route table if one provided @@ -475,7 +517,7 @@ static void* raw_rtc( void* vctx ) { sleep( count_delay ); rt_epcounts( ctx->rtable, ctx->my_name ); } - + free( fport ); // parinoid free and return return NULL; } @@ -570,9 +612,9 @@ static void* raw_rtc( void* vctx ) { rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr ); } if( raw_interface ) { - parse_rt_rec( ctx, NULL, curr, vlevel ); // nil pvt to parser as we can't ack messages + parse_rt_rec( ctx, NULL, curr, vlevel, NULL ); // nil pvt to parser as we can't ack messages } else { - parse_rt_rec( ctx, pvt_cx, curr, vlevel ); // parse record and add to in progress table + parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg ); // parse record and add to in progress table } curr = nextr;