X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=sidecars%2Flistener%2Fmcl.c;fp=sidecars%2Flistener%2Fmcl.c;h=0000000000000000000000000000000000000000;hb=835ea34297bb2d238c1bedaf5430953ef43d58da;hp=a50924b93f8ad498ae9f993075479aa81aeec653;hpb=0380b52b8987cfa776ee840f85cf6f4d8b661711;p=ric-app%2Fmc.git diff --git a/sidecars/listener/mcl.c b/sidecars/listener/mcl.c deleted file mode 100644 index a50924b..0000000 --- a/sidecars/listener/mcl.c +++ /dev/null @@ -1,707 +0,0 @@ -// vim: ts=4 sw=4 noet: -/* --------------------------------------------------------------------------------- - Copyright (c) 2018-2019 AT&T Intellectual Property. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. --------------------------------------------------------------------------------- -*/ - -/* - Mnemonic: mcl.c. - Abstract: The mc listener library content. All external functions - should start with mcl_ and all stderr messages should have - (mcl) as the first token following the severity indicator. - - Date: 22 August 2019 - Author: E. Scott Daniels -*/ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -#include -#include -#include - -#include "mcl.h" - -#ifndef FOREVER -#define FOREVER 1 -#endif - -#define READER 0 -#define WRITER 1 - -#define TRUE 1 -#define FALSE 0 - -/* - Information about one file descriptor. This is pointed to by the hash - such that the message type can be used as a key to look up the fifo's - file descriptor. -*/ -typedef struct { - int fd; // open fdes - int key; // symtab key - long long wcount; // number of writes - long long drops; // number dropped - - long long wcount_rp; // number of writes during last reporting period - long long drops_rp; // number dropped during last reporting period -} fifo_t; - -/* - Our conext. Pointers to the read and write hash tables (both keyed on the message - type), the message router (RMR) context, and other goodies. -*/ -typedef struct { - void* mrc; // the message router's context - void* wr_hash; // symtable to look up pipe info based on mt for writing - void* rd_hash; // we support reading from pipes, but need a different FD for that - char* fifo_dir; // directory where we open fifos - -} mcl_ctx_t; - -// -------- private ------------------------------------------------------- - - -/* - Set up for raw data capture. We look for directory overriedes from - environment variables, and then invoke the rdc_init() to actually - set things up. -*/ -static void* setup_rdc() { - void* ctx; - int value; // value computed for something - char* ep; // pointer to environment var - char* sdir = "/tmp/rdc/stage"; // default directory names - char* fdir = "/tmp/rdc/final"; - char* suffix = ".rdc"; - char* done = NULL; - - if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL && atoi( ep ) == 0 ) { // exists and is 0 - logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting (MCL_RDCENABLE=0)\n" ); - return NULL; - } - - if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) { - sdir = ep; - } else { - mkdir( "/tmp/rdc", 0755 ); // we ignore failures here as it could likely exist - mkdir( sdir, 0755 ); - } - - if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) { - fdir = ep; - } else { - mkdir( "/tmp/rdc", 0755 ); // we ignore failures again -- very likely it's there - mkdir( fdir, 0755 ); - } - - if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) { - suffix = ep; - } - - if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) { - done = ep; - } - - ctx = rdc_init( sdir, fdir, suffix, done ); - if( ctx == NULL ) { - logit( LOG_ERR, "rdc_init did not generate a context" ); - } else { - logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir ); - logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir ); - } - - if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) { - value = atoi( ep ); - logit( LOG_INFO, "setting frequency: %d", value ); - rdc_set_freq( ctx, value ); - } - return ctx; -} - -/* - Builds an extended header in the buffer provided, or allocates a new buffer if - dest is nil. The header is of the form: - - - Field lengths (bytes) are: - delim 4 - len 8 (7 digits + 0) - timestamp 16 (15 digits + 0) - - - Timestamp is a single unsigned long long in ASCII; ms since epoch. - If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103 - the timestamp generated will be 1570113591103. - - The lenght and timestamp fields in the header are zero terminated so - they can be parsed as a string (atoi etc). -*/ -static char* build_hdr( int len, char* dest, int dest_len ) { - struct timespec ts; // time just before call executed - - if( dest == NULL ) { - dest_len = MCL_EXHDR_SIZE + 2; // more than enough room - dest = (char *) malloc( sizeof( char ) * dest_len ); - } else { - if( dest_len < MCL_EXHDR_SIZE ) { // shouldn't happen, but take no chances - memset( dest, 0, dest_len ); - return NULL; - } - } - - memset( dest, 0, dest_len ); - - clock_gettime( CLOCK_REALTIME, &ts ); - snprintf( dest, dest_len, "%s%07d", MCL_DELIM, len ); - snprintf( dest+12, dest_len-13, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 ); - - return dest; -} - -/* - Build a file name and open. The io_direction is either READER or - WRITER. For a writer we must 'trick' the system into allowing us - to open a pipe for writing in non-blocking mode so that we can - report on drops (messages we couldn't write because there was no - reader). The trick is to open a reader on the pipe so that when - we open the writer there's a reader and the open won't fail. As - soon as we have the writer open, we can close the junk reader. - - If the desired fifo does not exist, it is created. -*/ -static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) { - char wbuf[1024]; - int fd; // real file des - int jfd = -1; // junk file des - int state; - - if( ctx == NULL || mtype < 0 ) { - return -1; - } - - snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype ); - - state = mkfifo( wbuf, 0660 ); // make the fifo; this will fail if it exists and that's ok - if( state != 0 && errno != EEXIST ) { - logit( LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) ); - return -1; - } - - if( io_dir == READER ) { - fd = open( wbuf, O_RDONLY ); // just open the reader - if( fd < 0 ) { - logit( LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) ); - } - } else { - jfd = open( wbuf, O_RDWR | O_NONBLOCK ); // must have a reader before we can open a non-blocking writer - if( jfd < 0 ) { - logit( LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) ); - return -1; - } - - fd = open( wbuf, O_WRONLY | O_NONBLOCK ); // this will be our actual writer, in non-blocking mode - if( fd < 0 ) { - logit( LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) ); - } - - close( jfd ); // should be safe to close this - } - - - return fd; -} - -/* - Given a message type, return the file des of the fifo that - the payload should be written to. Returns the file des, or -1 - on error. When sussing out a read file descriptor this will - block until there is a fifo for the message type which is - open for reading. - - If fref is not nil, then a pointer to the fifo info block is returned - allowing for direct update of counts after the write. -*/ -static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) { - fifo_t* fifo = NULL; - void* hash; - - if( ctx == NULL ) { - if( fref != NULL ) { - *fref = NULL; - } - return -1; - } - - if( io_dir == READER ) { // with an integer key, we need two hash tables - hash = ctx->rd_hash; - } else { - hash = ctx->wr_hash; - } - - if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) { - fifo = (fifo_t *) malloc( sizeof( *fifo ) ); - if( fifo != NULL ) { - memset( fifo, 0, sizeof( *fifo ) ); - fifo->key = mtype; - fifo->fd = open_fifo( ctx, mtype, io_dir ); - if( fifo->fd >= 0 ) { // save only on good open - rmr_sym_map( hash, mtype, fifo ); - } else { - free( fifo ); - fifo = NULL; - } - } - } - - if( fref != NULL ) { - *fref = fifo; - } - - return fifo == NULL ? -1 : fifo->fd; -} - -/* - Make marking counts easier in code -*/ -static inline void chalk_error( fifo_t* fifo ) { - if( fifo != NULL ) { - fifo->drops++; - fifo->drops_rp++; - } -} - -static inline void chalk_ok( fifo_t* fifo ) { - if( fifo != NULL ) { - fifo->wcount++; - fifo->wcount_rp++; - } -} - -/* - Callback function driven to traverse the symtab and generate the - counts for each fifo. Sonar will complain about unused parameters which - are normal for callbacks. Further, sonar will grumble about st, and entry - not being const; we can't unless RMR proto for the callback changes. -*/ -static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) { - fifo_t* fifo; - int report_period = 60; - - if( data ) { - report_period = *((int *) data); - } - - if( (fifo = (fifo_t *) thing) != NULL ) { - logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld", - fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp ); - - fifo->wcount_rp = 0; // reset the report counts - fifo->drops_rp = 0; - } -} - -// ---------- public ------------------------------------------------------ -/* - Sets a signal handler for sigpipe so we don't crash if a reader closes the - last reading fd on a pipe. We could do this automatically, but if the user - programme needs to trap sigpipe too, this gives them the option not to have - us interfere. -*/ -extern int mcl_set_sigh( ) { - signal( SIGPIPE, SIG_IGN ); -} - -/* - "Opens" the interface to RMR such that messages sent to the application will - be available via the rmr receive funcitons. This is NOT automatically called - by the mk_context function as some applications will be using the mc library - for non-RMR, fifo, chores. -*/ -extern int mcl_start_listening( void* vctx, char* port, int wait4ready ) { - mcl_ctx_t* ctx; - int announce = 0; - - if( (ctx = (mcl_ctx_t*) vctx) == NULL ) { - return 0; - } - - ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines! - if( ctx->mrc == NULL ) { - logit( LOG_CRIT, "start listening: unable to initialise RMr" ); - return 0; - } - - while( wait4ready && ! rmr_ready( ctx->mrc ) ) { // only senders need to wait - if( announce <= 0 ) { - logit( LOG_INFO, "waiting for RMR to show ready" ); - announce = 10; - } else { - announce--; - } - - sleep( 1 ); - } - - return 1; -} - -/* - Blocks until a message arives with a good return code or we timeout. Returns the - rmr message buffer. Timeout value epxected in seconds. -*/ -extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) { - mcl_ctx_t* ctx; - - if( (ctx = (mcl_ctx_t *) vctx) == NULL ) { - return NULL; - } - - if( ctx->mrc == NULL ) { - logit( LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" ); - exit( 1 ); - } - - do { - msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 ); // wait for next - } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) ); - - return msg; -} - -/* - Create the context. -*/ -extern void* mcl_mk_context( const char* dir ) { - mcl_ctx_t* ctx; - - if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) { - memset( ctx, 0, sizeof( *ctx ) ); - ctx->fifo_dir = strdup( dir ); - ctx->wr_hash = rmr_sym_alloc( 1001 ); - ctx->rd_hash = rmr_sym_alloc( 1001 ); - - if( ctx->wr_hash == NULL || ctx->rd_hash == NULL ) { - logit( LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" ); - free( ctx ); - return NULL; - } - } - - return (void *) ctx; -} - -/* - Read the header. Best case we read the expected number of bytes, get all - of them and find that they start with the delemiter. Worst case - We have to wait for all of the header, or need to synch at the next - delimeter. We assume best case most likely and handle it as such. -*/ -static void read_header( int fd, char* buf ) { - int len; - int need = MCL_EXHDR_SIZE; // total needed - int dneed; // delimieter needed - char* rp; // read position in buf - - len = read( fd, buf, need ); - if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // best case, most likely - return; - } - - while( TRUE ) { - if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim - rp = buf + len; - dneed = strlen( MCL_DELIM ) - len; - - while( dneed > 0 ) { - len = read( fd, rp, dneed ); - dneed -= len; - rp += len; - } - } - - if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // have a good delimiter, just need to wait for bytes - need = MCL_EXHDR_SIZE - strlen( MCL_DELIM ); - rp = buf + (MCL_EXHDR_SIZE - need); - - while( need > 0 ) { - len = read( fd, rp, need ); - need -= len; - rp += len; - } - - return; - } - - while( buf[0] != MCL_DELIM[0] ) { // wait for a recognised start byte to be read (may cause an additional message drop - len = read( fd, buf, 1 ); // because we ignore start byte that might be in the buffer) - } - } -} - - -/* - Read one record from the fifo that the message type maps to. - Writes at max ublen bytes into the ubuf. - - If long_hdrs is true (!0), then we expect that the stream in the fifo - has extended headers (