X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=sidecars%2Flistener%2Fmcl.c;fp=sidecars%2Flistener%2Fmcl.c;h=78908d9ca22f0311f1d9cf0c36ecaf59fe236aca;hb=fe2bd3618e4748333078f91c26a8a3f5dcf184c4;hp=0000000000000000000000000000000000000000;hpb=53a0c6d44d66556623c6aee61eb7b6de9c4fd41b;p=ric-app%2Fmc.git diff --git a/sidecars/listener/mcl.c b/sidecars/listener/mcl.c new file mode 100644 index 0000000..78908d9 --- /dev/null +++ b/sidecars/listener/mcl.c @@ -0,0 +1,637 @@ +// vim: ts=4 sw=4 noet: +/* +-------------------------------------------------------------------------------- + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +-------------------------------------------------------------------------------- +*/ + +/* + Mnemonic: mcl.c. + Abstract: The mc listener library content. All external functions + should start with mcl_ and all stderr messages should have + (mcl) as the first token following the severity indicator. + + Date: 22 August 2019 + Author: E. Scott Daniels +*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#include +#include + +#include "mcl.h" + +#define READER 0 +#define WRITER 1 + +#define TRUE 1 +#define FALSE 0 + +/* + Information about one file descriptor. This is pointed to by the hash + such that the message type can be used as a key to look up the fifo's + file descriptor. +*/ +typedef struct { + int fd; // open fdes + int key; // symtab key + long long wcount; // number of writes + long long drops; // number dropped + + long long wcount_rp; // number of writes during last reporting period + long long drops_rp; // number dropped during last reporting period +} fifo_t; + +/* + Our conext. Pointers to the read and write hash tables (both keyed on the message + type), the message router (RMR) context, and other goodies. +*/ +typedef struct { + void* mrc; // the message router's context + void* wr_hash; // symtable to look up pipe info based on mt for writing + void* rd_hash; // we support reading from pipes, but need a different FD for that + char* fifo_dir; // directory where we open fifos + +} mcl_ctx_t; + +// -------- private ------------------------------------------------------- + + +/* + Set up for raw data capture. We look for directory overriedes from + environment variables, and then invoke the rdc_init() to actually + set things upd. +*/ +static void* setup_rdc() { + void* ctx; + int value; // value computed for something + char* ep; // pointer to environment var + char* sdir = "/tmp/rdc/stage"; // default directory names + char* fdir = "/tmp/rdc/final"; + char* suffix = ".rdc"; + char* done = NULL; + + if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) { + if( ep != NULL && atoi( ep ) == 0 ) { + logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep ); + return NULL; + } + } + + if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) { + sdir = ep; + } else { + mkdir( "/tmp/rdc", 0755 ); // we ignore failures here as it could likely exist + mkdir( sdir, 0755 ); + } + + if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) { + fdir = ep; + } else { + mkdir( "/tmp/rdc", 0755 ); // we ignore failures again -- very likely it's there + mkdir( fdir, 0755 ); + } + + if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) { + suffix = ep; + } + + if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) { + done = ep; + } + + ctx = rdc_init( sdir, fdir, suffix, done ); + if( ctx == NULL ) { + logit( LOG_ERR, "rdc_init did not generate a context" ); + } else { + logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir ); + logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir ); + } + + if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) { + value = atoi( ep ); + logit( LOG_INFO, "setting frequency: %d", value ); + rdc_set_freq( ctx, value ); + } + return ctx; +} + +/* + Builds an extended header in the buffer provided, or allocates a new buffer if + dest is nil. The header is of the form: + + + Timestamp is a single unsigned long long in ASCII; ms since epoch. + If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103 + the timestamp generated will be 1570113591103. +*/ +static char* build_hdr( int len, char* dest, int dest_len ) { + struct timespec ts; // time just before call executed + + if( dest == NULL ) { + dest_len = 48; + dest = (char *) malloc( sizeof( char ) * dest_len ); + } else { + if( dest_len < 28 ) { // shouldn't happen, but take no chances + memset( dest, 0, dest_len ); + return NULL; + } + } + + memset( dest, 0, dest_len ); + + clock_gettime( CLOCK_REALTIME, &ts ); + sprintf( dest, "%s%07d", MCL_DELIM, len ); + sprintf( dest+12, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 ); + + return dest; +} + +/* + Build a file name and open. The io_direction is either READER or + WRITER. For a writer we must 'trick' the system into allowing us + to open a pipe for writing in non-blocking mode so that we can + report on drops (messages we couldn't write because there was no + reader). The trick is to open a reader on the pipe so that when + we open the writer there's a reader and the open won't fail. As + soon as we have the writer open, we can close the junk reader. + + If the desired fifo does not exist, it is created. +*/ +static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) { + char wbuf[1024]; + int fd; // real file des + int jfd = -1; // junk file des + int state; + + if( ctx == NULL || mtype < 0 ) { + return -1; + } + + snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype ); + + state = mkfifo( wbuf, 0660 ); // make the fifo; this will fail if it exists and that's ok + if( state != 0 && errno != EEXIST ) { + logit( LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) ); + return -1; + } + + if( io_dir == READER ) { + fd = open( wbuf, O_RDONLY ); // just open the reader + if( fd < 0 ) { + logit( LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) ); + } + } else { + jfd = open( wbuf, O_RDWR | O_NONBLOCK ); // must have a reader before we can open a non-blocking writer + if( jfd < 0 ) { + logit( LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) ); + } + + fd = open( wbuf, O_WRONLY | O_NONBLOCK ); // this will be our actual writer, in non-blocking mode + if( fd < 0 ) { + logit( LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) ); + } + + close( jfd ); // should be safe to close this + } + + + return fd; +} + +/* + Given a message type, return the file des of the fifo that + the payload should be written to. Returns the file des, or -1 + on error. When sussing out a read file descriptor this will + block until there is a fifo for the message type which is + open for reading. + + If fref is not nil, then a pointer to the fifo info block is returned + allowing for direct update of counts after the write. +*/ +static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) { + fifo_t* fifo; + void* hash; + + if( io_dir == READER ) { // with an integer key, we nned two hash tables + hash = ctx->rd_hash; + } else { + hash = ctx->wr_hash; + } + + if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) { + fifo = (fifo_t *) malloc( sizeof( *fifo ) ); + if( fifo == NULL ) { + return -1; + } + + memset( fifo, 0, sizeof( *fifo ) ); + fifo->key = mtype; + fifo->fd = open_fifo( ctx, mtype, io_dir ); + rmr_sym_map( hash, mtype, fifo ); + } + + if( fref != NULL ) { + *fref = fifo; + } + return fifo->fd; +} + +/* + Make marking counts easier in code +*/ +static inline void chalk_error( fifo_t* fifo ) { + if( fifo != NULL ) { + fifo->drops++; + fifo->drops_rp++; + } +} + +static inline void chalk_ok( fifo_t* fifo ) { + if( fifo != NULL ) { + fifo->wcount++; + fifo->wcount_rp++; + } +} + +/* + Callback function driven to traverse the symtab and generate the + counts for each fifo. +*/ +static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) { + fifo_t* fifo; + int report_period = 60; + + if( data ) { + report_period = *((int *) data); + } + + if( (fifo = (fifo_t *) thing) != NULL ) { + logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld", + fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp ); + + fifo->wcount_rp = 0; // reset the report counts + fifo->drops_rp = 0; + } +} + +// ---------- public ------------------------------------------------------ +/* + Sets a signal handler for sigpipe so we don't crash if a reader closes the + last reading fd on a pipe. We could do this automatically, but if the user + programme needs to trap sigpipe too, this gives them the option not to have + us interfere. +*/ +extern int mcl_set_sigh( ) { + signal( SIGPIPE, SIG_IGN ); +} + +/* + "Opens" the interface to RMR such that messages sent to the application will + be available via the rmr receive funcitons. This is NOT automatically called + by the mk_context function as some applications will be using the mc library + for non-RMR, fifo, chores. +*/ +extern int mcl_start_listening( void* vctx, char* port, int wait4ready ) { + mcl_ctx_t* ctx; + int announce = 0; + + if( (ctx = (mcl_ctx_t*) vctx) == NULL ) { + return 0; + } + + ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines! + if( ctx->mrc == NULL ) { + logit( LOG_CRIT, "start listening: unable to initialise RMr" ); + return 0; + } + + while( wait4ready && ! rmr_ready( ctx->mrc ) ) { // only senders need to wait + if( announce <= 0 ) { + logit( LOG_INFO, "waiting for RMR to show ready" ); + announce = 10; + } else { + announce--; + } + + sleep( 1 ); + } + + return 1; +} + +/* + Blocks until a message arives with a good return code or we timeout. Returns the + rmr message buffer. Timeout value epxected in seconds. +*/ +extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) { + mcl_ctx_t* ctx; + + if( (ctx = (mcl_ctx_t *) vctx) == NULL ) { + return NULL; + } + + if( ctx->mrc == NULL ) { + logit( LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" ); + exit( 1 ); + } + + do { + msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 ); // wait for next + } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) ); + + return msg; +} + +/* + Create the context. +*/ +extern void* mcl_mk_context( char* dir ) { + mcl_ctx_t* ctx; + + if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) { + memset( ctx, 0, sizeof( *ctx ) ); + ctx->fifo_dir = strdup( dir ); + ctx->wr_hash = rmr_sym_alloc( 1001 ); + ctx->rd_hash = rmr_sym_alloc( 1001 ); + + if( ctx->wr_hash == NULL || ctx->rd_hash == NULL ) { + logit( LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" ); + free( ctx ); + return NULL; + } + } + + return (void *) ctx; +} + +/* + Read the header. Best case we read the expected number of bytes, get all + of them and find that they start with the delemiter. Worst case + We have to wait for all of the header, or need to synch at the next + delimeter. We assume best case most likely and handle it as such. +*/ +static void read_header( int fd, char* buf ) { + int len; + int need = MCL_EXHDR_SIZE; // total needed + int dneed; // delimieter needed + int rlen; + char* rp; // read position in buf + + len = read( fd, buf, need ); + if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // best case, most likely + return; + } + + while( TRUE ) { + if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim + rp = buf + len; + dneed = strlen( MCL_DELIM ) - len; + + while( dneed > 0 ) { + len = read( fd, rp, dneed ); + dneed -= len; + rp += len; + } + } + + if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // have a good delimiter, just need to wait for bytes + need = MCL_EXHDR_SIZE - strlen( MCL_DELIM ); + rp = buf + (MCL_EXHDR_SIZE - need); + + while( need > 0 ) { + len = read( fd, rp, need ); + need -= len; + rp += len; + } + + return; + } + + while( buf[0] != MCL_DELIM[0] ) { // wait for a recognised start byte to be read (may cause an additional message drop + len = read( fd, buf, 1 ); // because we ignore start byte that might be in the buffer) + } + + need = MCL_EXHDR_SIZE - len; + } +} + + +/* + Read one record from the fifo that the message type maps to. + Writes at max ublen bytes into the ubuf. + + If long_hdrs is true (!0), then we expect that the stream in the fifo + has extended headers (