X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=sidecars%2Flistener%2Fmcl.c;fp=sidecars%2Flistener%2Fmcl.c;h=a50924b93f8ad498ae9f993075479aa81aeec653;hb=216e43c47dc133f10f095f78dad1ac8b8c1a55f9;hp=00aee969d907850fc6b24ee190b597f31b626645;hpb=ee54f8a6949329913d30e638593ed8e3cc515575;p=ric-app%2Fmc.git diff --git a/sidecars/listener/mcl.c b/sidecars/listener/mcl.c index 00aee96..a50924b 100644 --- a/sidecars/listener/mcl.c +++ b/sidecars/listener/mcl.c @@ -99,11 +99,9 @@ static void* setup_rdc() { char* suffix = ".rdc"; char* done = NULL; - if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) { - if( ep != NULL && atoi( ep ) == 0 ) { - logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep ); - return NULL; - } + if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL && atoi( ep ) == 0 ) { // exists and is 0 + logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting (MCL_RDCENABLE=0)\n" ); + return NULL; } if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) { @@ -139,7 +137,7 @@ static void* setup_rdc() { if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) { value = atoi( ep ); logit( LOG_INFO, "setting frequency: %d", value ); - rdc_set_freq( ctx, value ); + rdc_set_freq( ctx, value ); } return ctx; } @@ -150,16 +148,16 @@ static void* setup_rdc() { Field lengths (bytes) are: - delim 4 + delim 4 len 8 (7 digits + 0) timestamp 16 (15 digits + 0) - + Timestamp is a single unsigned long long in ASCII; ms since epoch. If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103 - the timestamp generated will be 1570113591103. + the timestamp generated will be 1570113591103. - The lenght and timestamp fields in the header are zero terminated so + The lenght and timestamp fields in the header are zero terminated so they can be parsed as a string (atoi etc). */ static char* build_hdr( int len, char* dest, int dest_len ) { @@ -305,7 +303,9 @@ static inline void chalk_ok( fifo_t* fifo ) { /* Callback function driven to traverse the symtab and generate the - counts for each fifo. + counts for each fifo. Sonar will complain about unused parameters which + are normal for callbacks. Further, sonar will grumble about st, and entry + not being const; we can't unless RMR proto for the callback changes. */ static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) { fifo_t* fifo; @@ -395,7 +395,7 @@ extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) { /* Create the context. */ -extern void* mcl_mk_context( char* dir ) { +extern void* mcl_mk_context( const char* dir ) { mcl_ctx_t* ctx; if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) { @@ -424,7 +424,6 @@ static void read_header( int fd, char* buf ) { int len; int need = MCL_EXHDR_SIZE; // total needed int dneed; // delimieter needed - int rlen; char* rp; // read position in buf len = read( fd, buf, need ); @@ -433,7 +432,7 @@ static void read_header( int fd, char* buf ) { } while( TRUE ) { - if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim + if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim rp = buf + len; dneed = strlen( MCL_DELIM ) - len; @@ -457,11 +456,9 @@ static void read_header( int fd, char* buf ) { return; } - while( buf[0] != MCL_DELIM[0] ) { // wait for a recognised start byte to be read (may cause an additional message drop + while( buf[0] != MCL_DELIM[0] ) { // wait for a recognised start byte to be read (may cause an additional message drop len = read( fd, buf, 1 ); // because we ignore start byte that might be in the buffer) } - - need = MCL_EXHDR_SIZE - len; } } @@ -491,7 +488,6 @@ static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hd int need; char wbuf[4096]; mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable - fifo_t* fref = NULL; // the fifo struct we found if( (ctx = (mcl_ctx_t*) vctx) == NULL ) { errno = EINVAL; @@ -510,7 +506,7 @@ static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hd *timestamp = 0; } - len = read( fd, wbuf, MCL_LEN_SIZE ); // we assume we will get all 8 as there isn't a way to sync the old stream + read( fd, wbuf, MCL_LEN_SIZE ); // we assume we will get all 8 bytes as there isn't a way to sync the old stream msg_len = need = atoi( wbuf ); } @@ -526,7 +522,7 @@ static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hd } if( msg_len > got ) { // we must ditch rest of this message - need = msg_len = got; + need = msg_len - got; while( need > 0 ) { len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need ); need -= len; @@ -578,7 +574,7 @@ extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int l timestamp. The one message which is NOT pushed into a FIFO is the RIC_HEALTH_CHECK_REQ - message. When the health check message is received it is responded to + message. When the health check message is received it is responded to with the current state of processing (ok or err). */ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) { @@ -586,16 +582,15 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) { fifo_t* fifo; // fifo to chalk counts on rmr_mbuf_t* mbuf = NULL; // received message buffer; recycled on each call char header[128]; // header we'll pop in front of the payload - int state; int fd; // file des to write to long long total = 0; // total messages received and written long long total_drops = 0; // total messages received and written long count = 0; // messages received and written during last reporting period long errors = 0; // unsuccessful payload writes - long drops; // number of drops + long drops = 0; // number of drops time_t next_report = 0; // we'll report every 2 seconds if report is true time_t now; - int hwlen; // write len for header + size_t hwlen; // write len for header void* rdc_ctx = NULL; // raw data capture context void* rdc_buf = NULL; // capture buffer @@ -619,23 +614,25 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) { mbuf->mtype = RIC_HEALTH_CHECK_RESP; // if we're here we are running and all is ok mbuf->sub_id = -1; mbuf = rmr_realloc_payload( mbuf, 128, FALSE, FALSE ); // ensure payload is large enough - strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) ); - rmr_rts_msg( ctx->mrc, mbuf ); + if( mbuf->payload != NULL ) { + strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) ); + rmr_rts_msg( ctx->mrc, mbuf ); + } continue; } - + if( mbuf->len > 0 ) { + if( long_hdr ) { + build_hdr( mbuf->len, header, sizeof( header ) ); + hwlen = MCL_EXHDR_SIZE; + } else { + snprintf( header, sizeof( header ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1 + hwlen = MCL_LEN_SIZE; + } + fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd if( fd >= 0 ) { - if( long_hdr ) { - build_hdr( mbuf->len, header, sizeof( header ) ); - hwlen = MCL_EXHDR_SIZE; - } else { - snprintf( header, sizeof( header ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1 - hwlen = MCL_LEN_SIZE; - } - - if( (state = write( fd, header, hwlen )) != hwlen ) { // write exactly MCL_LEN_SIZE bytes from the buffer + if( write( fd, header, hwlen ) != hwlen ) { // write exactly MCL_LEN_SIZE bytes from the buffer drops++; total_drops++; chalk_error( fifo ); @@ -653,14 +650,14 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) { if( rdc_ctx != NULL ) { rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf ); // set up for write - rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data + rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data } } } if( report ) { if( (now = time( NULL ) ) > next_report ) { - rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report ); // run endpoints in the active table + rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report ); // run endpoints in the active table fflush( stdout ); logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors", @@ -668,11 +665,17 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) { next_report = now + report; count = 0; drops = 0; + errors = 0; fflush( stdout ); } } - } while( FOREVER ); // forever allows for escape during unit testing + + if( ! FOREVER ) { // allow escape during unit tests; compiled out othewise, but sonar won't see that + free( rdc_buf ); + break; // sonar grumbles if we put FOREVER into the while; maddening + } + } while( 1 ); } @@ -680,13 +683,13 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) { Given a buffer and length, along with the message type, look up the fifo and write the buffer. Returns 0 on error; 1 on success. */ -extern int mcl_fifo_one( void* vctx, char* payload, int plen, int mtype ) { +extern int mcl_fifo_one( void* vctx, const char* payload, int plen, int mtype ) { mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable fifo_t* fifo; // fifo to chalk counts on - int state = -1; + size_t state = -1; int fd; // file des to write to - if( plen <= 0 ) { + if( plen <= 0 || payload == NULL ) { return 1; } @@ -698,7 +701,7 @@ extern int mcl_fifo_one( void* vctx, char* payload, int plen, int mtype ) { fd = suss_fifo( ctx, mtype, WRITER, &fifo ); // map the message type to an open fd if( fd >= 0 ) { state = write( fd, payload, plen ); - } + } - return state == plen; + return state == (size_t) plen; }