Incrementing mc-listener container tag
[ric-app/mc.git] / sidecars / listener / mcl.c
index 262486b..00aee96 100644 (file)
 
 #include <rmr/rmr.h>
 #include <rmr/rmr_symtab.h>
+#include <rmr/RIC_message_types.h>
 
 #include "mcl.h"
 
+#ifndef FOREVER
+#define FOREVER 1
+#endif
+
 #define READER 0
 #define WRITER 1
 
@@ -83,7 +88,7 @@ typedef struct {
 /*
        Set up for raw data capture. We look for directory overriedes from
        environment variables, and then invoke the rdc_init() to actually
-       set things upd.
+       set things up.
 */
 static void* setup_rdc() {
        void*   ctx;
@@ -144,18 +149,27 @@ static void* setup_rdc() {
        dest is nil. The header is of the form:
                <delim><len><timestamp>
 
+       Field lengths (bytes) are:
+               delim           4      
+               len                     8       (7 digits + 0)
+               timestamp       16  (15 digits + 0)
+
+       
        Timestamp is a single unsigned long long in ASCII; ms since epoch.
        If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
-       the timestamp generated will be 1570113591103.
+       the timestamp generated will be 1570113591103. 
+
+       The lenght and timestamp fields in the header are zero terminated so 
+       they can be parsed as a string (atoi etc).
 */
 static char* build_hdr( int len, char* dest, int dest_len ) {
        struct timespec ts;         // time just before call executed
 
        if( dest == NULL ) {
-               dest_len = 48;
+               dest_len = MCL_EXHDR_SIZE + 2;                  // more than enough room
                dest = (char *) malloc( sizeof( char ) * dest_len );
        } else {
-               if( dest_len < 28 ) {           // shouldn't happen, but take no chances
+               if( dest_len < MCL_EXHDR_SIZE ) {               // shouldn't happen, but take no chances
                        memset( dest, 0, dest_len );
                        return NULL;
                }
@@ -164,8 +178,8 @@ static char* build_hdr( int len, char* dest, int dest_len ) {
        memset( dest, 0, dest_len );
 
        clock_gettime( CLOCK_REALTIME, &ts );
-       sprintf( dest, "%s%07d", MCL_DELIM, len );
-       sprintf( dest+12, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
+       snprintf( dest, dest_len, "%s%07d", MCL_DELIM, len );
+       snprintf( dest+12, dest_len-13, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
 
        return dest;
 }
@@ -183,7 +197,7 @@ static char* build_hdr( int len, char* dest, int dest_len ) {
 */
 static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
        char    wbuf[1024];
-       int             fd;                             // real file des
+       int             fd;                                     // real file des
        int             jfd = -1;                       // junk file des
        int             state;
 
@@ -234,10 +248,17 @@ static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
        allowing for direct update of counts after the write.
 */
 static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
-       fifo_t* fifo;
+       fifo_t* fifo = NULL;
        void*   hash;
 
-       if( io_dir == READER ) {                // with an integer key, we nned two hash tables
+       if( ctx == NULL ) {
+               if( fref != NULL ) {
+                       *fref = NULL;
+               }
+               return -1;
+       }
+
+       if( io_dir == READER ) {                // with an integer key, we need two hash tables
                hash = ctx->rd_hash;
        } else {
                hash = ctx->wr_hash;
@@ -245,20 +266,24 @@ static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
 
        if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
                fifo = (fifo_t *) malloc( sizeof( *fifo ) );
-               if( fifo == NULL ) {
-                       return -1;
+               if( fifo != NULL ) {
+                       memset( fifo, 0, sizeof( *fifo ) );
+                       fifo->key = mtype;
+                       fifo->fd = open_fifo( ctx, mtype, io_dir );
+                       if( fifo->fd >= 0 ) {                                   // save only on good open
+                               rmr_sym_map( hash, mtype, fifo );
+                       } else {
+                               free( fifo );
+                               fifo = NULL;
+                       }
                }
-
-               memset( fifo, 0, sizeof( *fifo ) );
-               fifo->key = mtype;
-               fifo->fd = open_fifo( ctx, mtype, io_dir );
-               rmr_sym_map( hash, mtype, fifo );
        }
 
        if( fref != NULL ) {
                *fref = fifo;
        }
-       return fifo->fd;
+
+       return fifo == NULL ? -1 : fifo->fd;
 }
 
 /*
@@ -478,7 +503,7 @@ static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hd
                        read_header( fd, wbuf );
                        msg_len = need = atoi( wbuf + MCL_LEN_OFF );                            // read the length
                        if( timestamp ) {
-                               strcpy( timestamp, wbuf + MCL_TSTAMP_OFF+1 );
+                               strncpy( timestamp, wbuf + MCL_TSTAMP_OFF+1, MCL_TSTAMP_SIZE );
                        }
                } else {
                        if( timestamp != NULL ) {                                               // won't be there, but ensure it's not garbage
@@ -551,8 +576,12 @@ extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int l
 
        If long_hdr is true, then we geneate an extended header with a delimiter and
        timestamp.
+
+       The one message which is NOT pushed into a FIFO is the RIC_HEALTH_CHECK_REQ
+       message.  When the health check message is received it is responded to 
+       with the current state of processing (ok or err).
 */
-extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
+extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
        mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
        fifo_t*         fifo;                                   // fifo to chalk counts on
        rmr_mbuf_t*     mbuf = NULL;                    // received message buffer; recycled on each call
@@ -582,39 +611,50 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
 
        rdc_ctx = setup_rdc( );                         // pull rdc directories from enviornment and initialise
 
-       while( 1 ) {
+       do {
                mbuf = mcl_get_msg( ctx, mbuf, report );                        // wait up to report sec for msg (0 == block until message)
 
-               if( mbuf != NULL && mbuf->state == RMR_OK && mbuf->len > 0  ) {
-                       fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );              // map the message type to an open fd
-                       if( fd >= 0 ) {
-                               if( long_hdr ) {
-                                       build_hdr( mbuf->len, header, sizeof( header ) );
-                                       hwlen = MCL_EXHDR_SIZE;
-                               } else {
-                                       snprintf( header, sizeof( header ), "%07d", mbuf->len );                        // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
-                                       hwlen = MCL_LEN_SIZE;
-                               }
+               if( mbuf != NULL && mbuf->state == RMR_OK ) {
+                       if( mbuf->mtype == RIC_HEALTH_CHECK_REQ ) {
+                               mbuf->mtype = RIC_HEALTH_CHECK_RESP;            // if we're here we are running and all is ok
+                               mbuf->sub_id = -1;
+                               mbuf = rmr_realloc_payload( mbuf, 128, FALSE, FALSE );  // ensure payload is large enough
+                               strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) );
+                               rmr_rts_msg( ctx->mrc, mbuf );
+                               continue;
+                       }
+                       
+                       if( mbuf->len > 0  ) {
+                               fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );              // map the message type to an open fd
+                               if( fd >= 0 ) {
+                                       if( long_hdr ) {
+                                               build_hdr( mbuf->len, header, sizeof( header ) );
+                                               hwlen = MCL_EXHDR_SIZE;
+                                       } else {
+                                               snprintf( header, sizeof( header ), "%07d", mbuf->len );                        // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
+                                               hwlen = MCL_LEN_SIZE;
+                                       }
 
-                               if( (state = write( fd, header, hwlen )) != hwlen ) {           // write exactly MCL_LEN_SIZE bytes from the buffer
-                                       drops++;
-                                       total_drops++;
-                                       chalk_error( fifo );
-                               } else {
-                                       if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) {                       // followed by the payload
-                                               errors++;
+                                       if( (state = write( fd, header, hwlen )) != hwlen ) {           // write exactly MCL_LEN_SIZE bytes from the buffer
+                                               drops++;
+                                               total_drops++;
                                                chalk_error( fifo );
                                        } else {
-                                               chalk_ok( fifo );
-                                               count++;
-                                               total++;
+                                               if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) {                       // followed by the payload
+                                                       errors++;
+                                                       chalk_error( fifo );
+                                               } else {
+                                                       chalk_ok( fifo );
+                                                       count++;
+                                                       total++;
+                                               }
                                        }
                                }
-                       }
 
-                       if( rdc_ctx != NULL ) {
-                               rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf );                  // set up for write
-                               rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len );                                                // write the raw data
+                               if( rdc_ctx != NULL ) {
+                                       rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf );                  // set up for write
+                                       rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len );                                                // write the raw data
+                               }
                        }
                }
 
@@ -632,7 +672,7 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
                                fflush( stdout );
                        }
                }
-       }
+       } while( FOREVER );                             // forever allows for escape during unit testing
 }