60346ba749a79e091c4803480832672a0be843d4
[ric-app/mc.git] / sidecars / listener / src / mcl.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 --------------------------------------------------------------------------------
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10            http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 --------------------------------------------------------------------------------
18 */
19
20 /*
21         Mnemonic:       mcl.c.
22         Abstract:       The mc listener library content. All external functions
23                                 should start with mcl_ and all stderr messages should have
24                                 (mcl) as the first token following the severity indicator.
25
26         Date:           22 August 2019
27         Author:         E. Scott Daniels
28 */
29
30 #include <unistd.h>
31 #include <errno.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <time.h>
35 #include <string.h>
36 #include <fcntl.h>
37 #include <signal.h>
38 #include <sys/stat.h>
39 #include <sys/types.h>
40
41
42 #include <rmr/rmr.h>
43 #include <rmr/rmr_symtab.h>
44 #include <rmr/RIC_message_types.h>
45
46 #include "mcl.h"
47
48 #ifndef FOREVER
49 #define FOREVER 1
50 #endif
51
52 #define READER 0
53 #define WRITER 1
54
55 #define TRUE    1
56 #define FALSE   0
57
58 /*
59         Information about one file descriptor. This is pointed to by the hash
60         such that the message type can be used as a key to look up the fifo's
61         file descriptor.
62 */
63 typedef struct {
64         int     fd;                                     // open fdes
65         int key;                                // symtab key
66         long long wcount;               // number of writes
67         long long drops;                // number dropped
68
69         long long wcount_rp;    // number of writes during last reporting period
70         long long drops_rp;             // number dropped during last reporting period
71 } fifo_t;
72
73 /*
74         Our conext.  Pointers to the read and write hash tables (both keyed on the message
75         type), the message router (RMR) context, and other goodies.
76 */
77 typedef struct {
78         void*   mrc;                            // the message router's context
79         void*   wr_hash;                        // symtable to look up pipe info based on mt for writing
80         void*   rd_hash;                        // we support reading from pipes, but need a different FD for that
81         char*   fifo_dir;                       // directory where we open fifos
82
83 } mcl_ctx_t;
84
85 // -------- private -------------------------------------------------------
86
87
88 /*
89         Set up for raw data capture. We look for directory overriedes from
90         environment variables, and then invoke the rdc_init() to actually
91         set things up.
92 */
93 static void* setup_rdc() {
94         void*   ctx;
95         int             value;                                                  // value computed for something
96         char*   ep;                                                             // pointer to environment var
97         char*   sdir = "/tmp/rdc/stage";                // default directory names
98         char*   fdir = "/tmp/rdc/final";
99         char*   suffix = ".rdc";
100         char*   done = NULL;
101
102         if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL && atoi( ep ) == 0 ) {                                    // exists and is 0
103                 logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting (MCL_RDCENABLE=0)" );
104                 return NULL;
105         }
106
107         if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
108                 sdir = ep;
109         } else {
110                 mkdir( "/tmp/rdc", 0755 );                      // we ignore failures here as it could likely exist
111                 mkdir( sdir, 0755 );
112         }
113
114         if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
115                 fdir = ep;
116         } else {
117                 mkdir( "/tmp/rdc", 0755 );                      // we ignore failures again -- very likely it's there
118                 mkdir( fdir, 0755 );
119         }
120
121         if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
122                 suffix = ep;
123         }
124
125         if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
126                 done = ep;
127         }
128
129         ctx = rdc_init( sdir, fdir, suffix, done );
130         if( ctx == NULL ) {
131                 logit( LOG_ERR, "rdc_init did not generate a context" );
132         } else {
133                 logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
134                 logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
135         }
136
137         if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
138                 value = atoi( ep );
139                 logit( LOG_INFO, "setting frequency: %d", value );
140                 rdc_set_freq( ctx, value );
141         }
142         return ctx;
143 }
144
145 /*
146         Builds an extended header in the buffer provided, or allocates a new buffer if
147         dest is nil. The header is of the form:
148                 <delim><len><timestamp>
149
150         Field lengths (bytes) are:
151                 delim           4    
152                 len                     8       (7 digits + 0)
153                 timestamp       16  (15 digits + 0)
154
155
156         Timestamp is a single unsigned long long in ASCII; ms since epoch.
157         If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
158         the timestamp generated will be 1570113591103.
159
160         The lenght and timestamp fields in the header are zero terminated so
161         they can be parsed as a string (atoi etc).
162 */
163 static char* build_hdr( int len, char* dest, int dest_len ) {
164         struct timespec ts;         // time just before call executed
165
166         if( dest == NULL ) {
167                 dest_len = MCL_EXHDR_SIZE + 2;                  // more than enough room
168                 dest = (char *) malloc( sizeof( char ) * dest_len );
169         } else {
170                 if( dest_len < MCL_EXHDR_SIZE ) {               // shouldn't happen, but take no chances
171                         memset( dest, 0, dest_len );
172                         return NULL;
173                 }
174         }
175
176         memset( dest, 0, dest_len );
177
178         clock_gettime( CLOCK_REALTIME, &ts );
179         snprintf( dest, dest_len, "%s%07d", MCL_DELIM, len );
180         snprintf( dest+12, dest_len-13, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
181
182         return dest;
183 }
184
185 /*
186         Build a file name and open. The io_direction is either READER or
187         WRITER.  For a writer we must 'trick' the system into allowing us
188         to open a pipe for writing in non-blocking mode so that we can
189         report on drops (messages we couldn't write because there was no
190         reader).  The trick is to open a reader on the pipe so that when
191         we open the writer there's a reader and the open won't fail. As
192         soon as we have the writer open, we can close the junk reader.
193
194         If the desired fifo does not exist, it is created.
195 */
196 static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
197         char    wbuf[1024];
198         int             fd;                                     // real file des
199         int             jfd = -1;                       // junk file des
200         int             state;
201
202         if( ctx == NULL || mtype < 0 ) {
203                 return -1;
204         }
205
206         snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
207
208         state = mkfifo( wbuf, 0660 );           // make the fifo; this will fail if it exists and that's ok
209         if( state != 0 && errno != EEXIST ) {
210                 logit(  LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
211                 return -1;
212         }
213
214         if( io_dir == READER ) {
215                 fd = open( wbuf, O_RDONLY  );                   // just open the reader
216                 if( fd < 0 ) {
217                         logit(  LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
218                 }
219         } else {
220                 jfd = open( wbuf, O_RDWR  | O_NONBLOCK );                       // must have a reader before we can open a non-blocking writer
221                 if( jfd < 0 ) {
222                         logit(  LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
223                         return -1;
224                 }
225
226                 fd = open( wbuf, O_WRONLY  | O_NONBLOCK );                      // this will be our actual writer, in non-blocking mode
227                 if( fd < 0 ) {
228                         logit(  LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
229                 }
230
231                 close( jfd );                   // should be safe to close this
232         }
233
234
235         return fd;
236 }
237
238 /*
239         Given a message type, return the file des of the fifo that
240         the payload should be written to.        Returns the file des, or -1
241         on error. When sussing out a read file descriptor this will
242         block until there is a fifo for the message type which is
243         open for reading.
244
245         If fref is not nil, then a pointer to the fifo info block is returned
246         allowing for direct update of counts after the write.
247 */
248 static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
249         fifo_t* fifo = NULL;
250         void*   hash;
251
252         if( ctx == NULL ) {
253                 if( fref != NULL ) {
254                         *fref = NULL;
255                 }
256                 return -1;
257         }
258
259         if( io_dir == READER ) {                // with an integer key, we need two hash tables
260                 hash = ctx->rd_hash;
261         } else {
262                 hash = ctx->wr_hash;
263         }
264
265         if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
266                 fifo = (fifo_t *) malloc( sizeof( *fifo ) );
267                 if( fifo != NULL ) {
268                         memset( fifo, 0, sizeof( *fifo ) );
269                         fifo->key = mtype;
270                         fifo->fd = open_fifo( ctx, mtype, io_dir );
271                         if( fifo->fd >= 0 ) {                                   // save only on good open
272                                 rmr_sym_map( hash, mtype, fifo );
273                         } else {
274                                 free( fifo );
275                                 fifo = NULL;
276                         }
277                 }
278         } else {
279                 if( fifo->fd < 0 ) {                            // it existed, but was closed; reopen
280                         fifo->fd = open_fifo( ctx, mtype, io_dir );
281                 }
282         }
283
284         if( fref != NULL ) {
285                 *fref = fifo;
286         }
287
288         return fifo == NULL ? -1 : fifo->fd;
289 }
290
291 /*
292         Should we need to close a FIFO we do so and leave the block in the hash
293         with a bad FD so that we'll attempt to reopen on next use.
294 */
295 static void close_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
296         fifo_t* fifo;
297         void*   hash;
298
299         if( ctx == NULL ) {
300                 return;
301         }
302
303         if( io_dir == READER ) {                // with an integer key, we need two hash tables
304                 hash = ctx->rd_hash;
305         } else {
306                 hash = ctx->wr_hash;
307         }
308
309         if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) != NULL ) {
310                 if( fifo->fd >= 0 ) {
311                         close( fifo->fd );
312                         fifo->fd = -1;
313                 }
314         }
315 }
316
317 /*
318         Make marking counts easier in code
319 */
320 static inline void chalk_error( fifo_t* fifo ) {
321         if( fifo != NULL ) {
322                 fifo->drops++;
323                 fifo->drops_rp++;
324         }
325 }
326
327 static inline void chalk_ok( fifo_t* fifo ) {
328         if( fifo != NULL ) {
329                 fifo->wcount++;
330                 fifo->wcount_rp++;
331         }
332 }
333
334 /*
335         Callback function driven to traverse the symtab and generate the
336         counts for each fifo.  Sonar will complain about unused parameters which
337         are normal for callbacks. Further, sonar will grumble about st, and entry
338         not being const; we can't unless RMR proto for the callback changes.
339 */
340 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
341         fifo_t* fifo;
342         int             report_period = 60;
343
344         if( data ) {
345                 report_period = *((int *) data);
346         }
347
348         if( (fifo = (fifo_t *) thing) != NULL ) {
349                 logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
350                         fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
351
352                 fifo->wcount_rp = 0;            // reset the report counts
353                 fifo->drops_rp = 0;
354                 return;                                         // return here to avoid sonar required hack below
355         }
356
357         /*
358                 Sonar doesn't grok the fact that for callback functions some parms are naturally
359                 ignored. So, to eliminate the 5 code smells because we only care about thing, we
360                 have this hack....
361         */
362         if( st == NULL && entry == NULL && name == NULL && data == NULL ) {
363                 fprintf( stderr, "mdcl: all parms to callback stats were nil\n" );
364         }
365 }
366
367 /*
368         Writes the indicated bytes (n2write) from buf onto the fd. Returns only after
369         full buffer is written, or there is a hard error (not eagain or eintr).
370         Returns the number written; if less than n2write the caller may assume
371         that there was a hard error and errno should reflect.
372 */
373 static inline int write_all( int fd, char const* buf, int n2write ) {
374         ssize_t remain = 0;                     // number of bytes remaining to write
375         ssize_t wrote = 0;                      // number of bytes written thus far
376         ssize_t state = 0;
377
378         if( fd < 0 ) {
379                 errno = EBADFD;
380                 return 0;
381         }
382
383         errno = 0;
384         remain = n2write;
385         do {
386                 if( (state = write( fd, buf + wrote, remain )) > 0 ) {
387                         wrote += state;
388                         remain = n2write - wrote;
389                 }
390         } while( remain > 0 && (errno == EINTR || errno == EAGAIN) ) ;
391
392         return wrote;
393 }
394
395 /*
396         Similar to write_all, this will write all bytes in the buffer, but
397         will return failure if the first write attempt fails with 0 written
398         (assuming that the pipe has no reader). We use this when writing the
399         header bytes; we want to drop the message if we can't even write one
400         byte, but if we write one, we must loop until all are written.
401
402         Returns the number written. If that value is less than n2write, then
403         the caller may assume a hard error occurred and errno should reflect.
404         If 0 is returned it can be assumed that the FIFO would block/has no
405         reader.
406 */
407 static inline int write_all_nb( int fd, char const* buf, int n2write ) {
408         ssize_t remain = 0;                     // number of bytes remaining to write
409         ssize_t wrote = 0;                      // number of bytes written
410
411         if( fd < 0 ) {
412                 errno = EBADFD;
413                 return 0;
414         }
415
416         errno = 0;
417         remain = n2write;
418         wrote = write( fd, buf, remain );
419         if( wrote < 0 ) {                                                               // report error with exception for broken pipe
420                 return errno == EPIPE ? 0 : -1;                         // broken pipe we assume no reader and return 0 since nothing written
421         }
422
423         if( wrote < n2write  &&  wrote > 0 ) {                  // if we wrote anything, we must tough it out and write all if it was short
424                 wrote +=  write_all( fd, buf + wrote, n2write - wrote );
425         }
426
427         return wrote;
428 }
429
430 // ---------- public ------------------------------------------------------
431 /*
432         Sets a signal handler for sigpipe so we don't crash if a reader closes the
433         last reading fd on a pipe. We could do this automatically, but if the user
434         programme needs to trap sigpipe too, this gives them the option not to have
435         us interfere.
436 */
437 extern int mcl_set_sigh( ) {
438         signal( SIGPIPE, SIG_IGN );
439 }
440
441 /*
442         "Opens" the interface to RMR such that messages sent to the application will
443         be available via the rmr receive funcitons. This is NOT automatically called
444         by the mk_context function as some applications will be using the mc library
445         for non-RMR, fifo, chores.
446 */
447 extern int mcl_start_listening( void* vctx,  char* port, int wait4ready ) {
448         mcl_ctx_t*      ctx;
449         int             announce = 0;
450
451         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
452                 return 0;
453         }
454
455         ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE );     // start your engines!
456         if( ctx->mrc == NULL ) {
457                 logit(  LOG_CRIT, "start listening: unable to initialise RMr" );
458                 return 0;
459         }
460
461         while( wait4ready && ! rmr_ready( ctx->mrc ) ) {                                // only senders need to wait
462                 if( announce <= 0 ) {
463                         logit(  LOG_INFO, "waiting for RMR to show ready" );
464                         announce = 10;
465                 } else {
466                         announce--;
467                 }
468
469                 sleep( 1 );
470         }
471
472         return 1;
473 }
474
475 /*
476         Blocks until a message arives with a good return code or we timeout. Returns the
477         rmr message buffer. Timeout value epxected in seconds.
478 */
479 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
480         mcl_ctx_t*      ctx;
481
482         if( (ctx = (mcl_ctx_t *) vctx) == NULL ) {
483                 return NULL;
484         }
485
486         if( ctx->mrc == NULL ) {
487                 logit(  LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
488                 exit( 1 );
489         }
490
491         do {
492                 msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 );                           // wait for next
493         } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) );
494
495         return msg;
496 }
497
498 /*
499         Create the context.
500 */
501 extern  void* mcl_mk_context( const char* dir ) {
502         mcl_ctx_t*      ctx;
503
504         if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
505                 memset( ctx, 0, sizeof( *ctx ) );
506                 ctx->fifo_dir = strdup( dir );
507                 ctx->wr_hash = rmr_sym_alloc( 1001 );
508                 ctx->rd_hash = rmr_sym_alloc( 1001 );
509
510                 if( ctx->wr_hash == NULL  || ctx->rd_hash == NULL ) {
511                         logit(  LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
512                         free( ctx );
513                         return NULL;
514                 }
515         }
516
517         return (void *) ctx;
518 }
519
520 /*
521         Read the header. Best case we read the expected number of bytes, get all
522         of them and find that they start with the delemiter.  Worst case
523         We have to wait for all of the header, or need to synch at the next
524         delimeter. We assume best case most likely and handle it as such.
525 */
526 static void read_header( int fd, char* buf ) {
527         size_t len;
528         size_t need = MCL_EXHDR_SIZE;           // total needed
529         size_t dneed;                                           // delimieter needed
530         char*   rp;                                                     // read position in buf
531
532         len = read( fd, buf, need );
533         if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {       // best case, most likely
534                 return;
535         }
536
537         while( TRUE ) {
538                 if( len < strlen( MCL_DELIM ) ) {               // must get at least enough bytes to check delim
539                         rp = buf + len;
540                         dneed = strlen( MCL_DELIM ) - len;
541
542                         while( dneed > 0 ) {
543                                 len = read( fd, rp, dneed );
544                                 dneed -= len;
545                                 rp += len;
546                         }
547                 }
548
549                 if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {      // have a good delimiter, just need to wait for bytes
550                         need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
551                         rp = buf + (MCL_EXHDR_SIZE - need);
552
553                         while( need > 0 ) {
554                                 len = read( fd, rp, need );
555                                 need -= len;
556                                 rp += len;
557                         }
558
559                         return;
560                 }
561
562                 while( buf[0] != MCL_DELIM[0] ) {       // wait for a recognised start byte to be read (may cause an additional message drop
563                         len = read( fd, buf, 1 );               // because we ignore start byte that might be in the buffer)
564                 }
565         }
566 }
567
568
569 /*
570         Read one record from the fifo that the message type maps to.
571         Writes at max ublen bytes into the ubuf.
572
573         If long_hdrs is true (!0), then we expect that the stream in the fifo
574         has extended headers (<delim><len><time>), and will write the timestamp
575         from the header into the buffer pointed to by timestamp. The buffer is
576         assumed to be at least MCL_TSTAMP_SIZE bytes in length.
577
578         Further, when extended headers are being used, this function will
579         automatically resynchronise if it detects an issue.
580
581         The function could look for the delimiter and automatically detect whether
582         or not extended headers are in use, but if the stream is out of synch on the
583         first read, this cannot be done, so the funciton requires that the caller
584         know that the FIFO contains extended headers.
585 */
586 static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
587         int fd;
588         int len;
589         int     msg_len;
590         int     got = 0;                                                // number of bytes we actually got
591         int need;
592         char wbuf[4096];
593         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
594
595         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
596                 errno = EINVAL;
597                 return 0;
598         }
599
600         if( (fd = suss_fifo( ctx, mtype, READER, NULL ))  >= 0 )  {
601                 if( long_hdrs ) {
602                         read_header( fd, wbuf );
603                         msg_len = need = atoi( wbuf + MCL_LEN_OFF );                            // read the length
604                         if( timestamp ) {
605                                 strncpy( timestamp, wbuf + MCL_TSTAMP_OFF+1, MCL_TSTAMP_SIZE );
606                         }
607                 } else {
608                         if( timestamp != NULL ) {                                               // won't be there, but ensure it's not garbage
609                                 *timestamp = 0;
610                         }
611
612                         read( fd, wbuf, MCL_LEN_SIZE );                                 // we assume we will get all 8 bytes as there isn't a way to sync the old stream
613                         msg_len = need = atoi( wbuf );
614                 }
615
616
617                 if( need > ublen ) {
618                         need = ublen;                                           // cannot give them more than they can take
619                 }
620                 while( need > 0 ) {
621                         len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
622                         memcpy( ubuf+got, wbuf, len );
623                         got += len;
624                         need -= len;
625                 }
626
627                 if( msg_len > got ) {                                   // we must ditch rest of this message
628                         need = msg_len - got;
629                         while( need > 0 ) {
630                                 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
631                                 need -= len;
632                         }
633                 }
634
635                 return got;
636         }
637
638         errno = EBADFD;
639         return 0;
640 }
641
642 /*
643         Read one record from the fifo that the message type maps to.
644         Writes at max ublen bytes into the ubuf. If extended headers are in use
645         this function will ignore the timestamp.
646
647         If long_hdrs is true (!0), then we expect that the stream in the fifo
648         has extended headers (<delim><len><time>).
649 */
650 extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs ) {
651         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, NULL );
652 }
653
654 /*
655         Read a single message from the FIFO returning it in the caller's buffer. If extended
656         headers are being used, and the caller supplied a timestamp buffer, the timestamp
657         which was in the header will be returned in that buffer.  The return value is the number
658         of bytes in the buffer; 0 indicates an error and errno should be set.
659 */
660 extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
661         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
662 }
663
664
665 /*
666         Will read messages and fan them out based on the message type. This should not
667         return and if it does the caller should assume an error.
668
669         The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
670         string , followed by that number of 'raw' bytes. The raw bytes are the payload
671         exactly as received.
672
673         The report parameter is the frequency, in seconds, for writing a short
674         status report to stdout. If 0 then it's off.
675
676         If long_hdr is true, then we geneate an extended header with a delimiter and
677         timestamp.
678
679         The one message which is NOT pushed into a FIFO is the RIC_HEALTH_CHECK_REQ
680         message.  When the health check message is received it is responded to
681         with the current state of processing (ok or err).
682 */
683 extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
684         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
685         fifo_t*         fifo;                                   // fifo to chalk counts on
686         rmr_mbuf_t*     mbuf = NULL;                    // received message buffer; recycled on each call
687         char            header[128];                    // header we'll pop in front of the payload
688         int                     fd;                                             // file des to write to
689         long long       total = 0;                              // total messages received and written
690         long long       total_drops = 0;                // total messages received and written
691         long            count = 0;                              // messages received and written during last reporting period
692         long            errors = 0;                             // unsuccessful payload writes
693         long            drops = 0;                              // number of drops
694         time_t          next_report = 0;                // we'll report every 2 seconds if report is true
695         time_t          now;
696         size_t          hwlen;                                  // write len for header
697         size_t          wrote;                                  // number of bytes actually written
698         void*           rdc_ctx = NULL;                 // raw data capture context
699         void*           rdc_buf = NULL;                 // capture buffer
700
701         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
702                 logit(  LOG_ERR, "(mcl) invalid context given to fanout" );
703                 errno = EINVAL;
704                 return;
705         }
706
707         if( report < 0 ) {
708                 report = 0;
709         }
710
711         rdc_ctx = setup_rdc( );                         // pull rdc directories from enviornment and initialise
712
713         do {
714                 mbuf = mcl_get_msg( ctx, mbuf, report );                        // wait up to report sec for msg (0 == block until message)
715
716                 if( mbuf != NULL && mbuf->state == RMR_OK ) {
717                         if( mbuf->mtype == RIC_HEALTH_CHECK_REQ ) {
718                                 mbuf->mtype = RIC_HEALTH_CHECK_RESP;            // if we're here we are running and all is ok
719                                 mbuf->sub_id = -1;
720                                 mbuf = rmr_realloc_payload( mbuf, 128, FALSE, FALSE );  // ensure payload is large enough
721                                 if( mbuf->payload != NULL ) {
722                                         strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) );
723                                         rmr_rts_msg( ctx->mrc, mbuf );
724                                 }
725                                 continue;
726                         }
727
728                         if( mbuf->len > 0  ) {
729                                 if( long_hdr ) {
730                                         build_hdr( mbuf->len, header, sizeof( header ) );
731                                         hwlen = MCL_EXHDR_SIZE;
732                                 } else {
733                                         snprintf( header, sizeof( header ), "%07d", mbuf->len );                        // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
734                                         hwlen = MCL_LEN_SIZE;
735                                 }
736
737                                 fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );                                      // map the message type to an open fd
738                                 if( fd >= 0 ) {
739                                         if( (wrote = write_all_nb( fd, header, hwlen )) == 0 ) {                // write header; 0 indicates no reader, drop silently
740                                                 drops++;
741                                                 total_drops++;
742                                                 chalk_error( fifo );
743                                         } else {
744                                                 if( wrote != hwlen ) {
745                                                         logit( LOG_ERR, "(mcl): error writing header to fifo; mt=%d wrote=%d tried=%d: %s", mbuf->mtype, wrote, hwlen, strerror( errno ) );
746                                                         errors++;
747                                                         chalk_error( fifo );
748                                                         close_fifo( ctx, mbuf->mtype, WRITER );
749                                                 } else {
750                                                         if( write_all( fd, mbuf->payload, mbuf->len ) != mbuf->len ) {          // we wrote a header, so we must write all; no drop at this point
751                                                                 logit( LOG_ERR, "(mcl): error writing payload to fifo; mt=%d: %s\n", mbuf->mtype, strerror( errno ) );
752                                                                 close_fifo( ctx, mbuf->mtype, WRITER );
753                                                         } else {
754                                                                 chalk_ok( fifo );
755                                                                 count++;
756                                                                 total++;
757                                                         }
758                                                 }
759                                         }
760                                 }
761
762                                 if( rdc_ctx != NULL ) {                                         // always put the message to the rdc files if collecting; eve if pipe write failed
763                                         rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf );                  // set up for write
764                                         rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len );                                // write the raw data
765                                 }
766                         }
767                 }
768
769                 if( report ) {
770                         if( (now = time( NULL ) ) > next_report ) {
771                                 rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report );        // run endpoints in the active table
772                                 fflush( stdout );
773
774                                 logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
775                                         total, total_drops, report, count, drops, errors );
776                                 next_report = now + report;
777                                 count = 0;
778                                 drops = 0;
779                                 errors = 0;
780
781                                 fflush( stdout );
782                         }
783                 }
784
785                 if( ! FOREVER ) {                       // allow escape during unit tests; compiled out othewise, but sonar won't see that
786                         free( rdc_buf );
787                         break;                                  // sonar grumbles if we put FOREVER into the while; maddening
788                 }
789         } while( 1 );
790 }
791
792
793 /*
794         Given a buffer and length, along with the message type, look up the fifo and write
795         the buffer. Returns 0 on error; 1 on success.
796 */
797 extern int mcl_fifo_one( void* vctx, const char* payload, int plen, int mtype ) {
798         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
799         fifo_t*         fifo;                                   // fifo to chalk counts on
800         size_t          state = -1;
801         int                     fd;                                             // file des to write to
802
803         if( plen <= 0  || payload == NULL ) {
804                 return 1;
805         }
806
807         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
808                 logit( LOG_ERR, "(mcl) invalid context given to fifo_one\n" );
809                 return 0;
810         }
811
812         fd = suss_fifo( ctx, mtype, WRITER, &fifo );            // map the message type to an open fd
813         if( fd >= 0 ) {
814                 state = write( fd, payload, plen );
815         }
816
817         return state == (size_t) plen;
818 }