a50924b93f8ad498ae9f993075479aa81aeec653
[ric-app/mc.git] / sidecars / listener / mcl.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 --------------------------------------------------------------------------------
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10            http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 --------------------------------------------------------------------------------
18 */
19
20 /*
21         Mnemonic:       mcl.c.
22         Abstract:       The mc listener library content. All external functions
23                                 should start with mcl_ and all stderr messages should have
24                                 (mcl) as the first token following the severity indicator.
25
26         Date:           22 August 2019
27         Author:         E. Scott Daniels
28 */
29
30 #include <unistd.h>
31 #include <errno.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <time.h>
35 #include <string.h>
36 #include <fcntl.h>
37 #include <signal.h>
38 #include <sys/stat.h>
39 #include <sys/types.h>
40
41
42 #include <rmr/rmr.h>
43 #include <rmr/rmr_symtab.h>
44 #include <rmr/RIC_message_types.h>
45
46 #include "mcl.h"
47
48 #ifndef FOREVER
49 #define FOREVER 1
50 #endif
51
52 #define READER 0
53 #define WRITER 1
54
55 #define TRUE    1
56 #define FALSE   0
57
58 /*
59         Information about one file descriptor. This is pointed to by the hash
60         such that the message type can be used as a key to look up the fifo's
61         file descriptor.
62 */
63 typedef struct {
64         int     fd;                                     // open fdes
65         int key;                                // symtab key
66         long long wcount;               // number of writes
67         long long drops;                // number dropped
68
69         long long wcount_rp;    // number of writes during last reporting period
70         long long drops_rp;             // number dropped during last reporting period
71 } fifo_t;
72
73 /*
74         Our conext.  Pointers to the read and write hash tables (both keyed on the message
75         type), the message router (RMR) context, and other goodies.
76 */
77 typedef struct {
78         void*   mrc;                            // the message router's context
79         void*   wr_hash;                        // symtable to look up pipe info based on mt for writing
80         void*   rd_hash;                        // we support reading from pipes, but need a different FD for that
81         char*   fifo_dir;                       // directory where we open fifos
82
83 } mcl_ctx_t;
84
85 // -------- private -------------------------------------------------------
86
87
88 /*
89         Set up for raw data capture. We look for directory overriedes from
90         environment variables, and then invoke the rdc_init() to actually
91         set things up.
92 */
93 static void* setup_rdc() {
94         void*   ctx;
95         int             value;                                                  // value computed for something
96         char*   ep;                                                             // pointer to environment var
97         char*   sdir = "/tmp/rdc/stage";                // default directory names
98         char*   fdir = "/tmp/rdc/final";
99         char*   suffix = ".rdc";
100         char*   done = NULL;
101
102         if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL && atoi( ep ) == 0 ) {                                    // exists and is 0
103                 logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting (MCL_RDCENABLE=0)\n" );
104                 return NULL;
105         }
106
107         if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
108                 sdir = ep;
109         } else {
110                 mkdir( "/tmp/rdc", 0755 );                      // we ignore failures here as it could likely exist
111                 mkdir( sdir, 0755 );
112         }
113
114         if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
115                 fdir = ep;
116         } else {
117                 mkdir( "/tmp/rdc", 0755 );                      // we ignore failures again -- very likely it's there
118                 mkdir( fdir, 0755 );
119         }
120
121         if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
122                 suffix = ep;
123         }
124
125         if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
126                 done = ep;
127         }
128
129         ctx = rdc_init( sdir, fdir, suffix, done );
130         if( ctx == NULL ) {
131                 logit( LOG_ERR, "rdc_init did not generate a context" );
132         } else {
133                 logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
134                 logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
135         }
136
137         if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
138                 value = atoi( ep );
139                 logit( LOG_INFO, "setting frequency: %d", value );
140                 rdc_set_freq( ctx, value );
141         }
142         return ctx;
143 }
144
145 /*
146         Builds an extended header in the buffer provided, or allocates a new buffer if
147         dest is nil. The header is of the form:
148                 <delim><len><timestamp>
149
150         Field lengths (bytes) are:
151                 delim           4     
152                 len                     8       (7 digits + 0)
153                 timestamp       16  (15 digits + 0)
154
155
156         Timestamp is a single unsigned long long in ASCII; ms since epoch.
157         If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
158         the timestamp generated will be 1570113591103.
159
160         The lenght and timestamp fields in the header are zero terminated so
161         they can be parsed as a string (atoi etc).
162 */
163 static char* build_hdr( int len, char* dest, int dest_len ) {
164         struct timespec ts;         // time just before call executed
165
166         if( dest == NULL ) {
167                 dest_len = MCL_EXHDR_SIZE + 2;                  // more than enough room
168                 dest = (char *) malloc( sizeof( char ) * dest_len );
169         } else {
170                 if( dest_len < MCL_EXHDR_SIZE ) {               // shouldn't happen, but take no chances
171                         memset( dest, 0, dest_len );
172                         return NULL;
173                 }
174         }
175
176         memset( dest, 0, dest_len );
177
178         clock_gettime( CLOCK_REALTIME, &ts );
179         snprintf( dest, dest_len, "%s%07d", MCL_DELIM, len );
180         snprintf( dest+12, dest_len-13, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
181
182         return dest;
183 }
184
185 /*
186         Build a file name and open. The io_direction is either READER or
187         WRITER.  For a writer we must 'trick' the system into allowing us
188         to open a pipe for writing in non-blocking mode so that we can
189         report on drops (messages we couldn't write because there was no
190         reader).  The trick is to open a reader on the pipe so that when
191         we open the writer there's a reader and the open won't fail. As
192         soon as we have the writer open, we can close the junk reader.
193
194         If the desired fifo does not exist, it is created.
195 */
196 static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
197         char    wbuf[1024];
198         int             fd;                                     // real file des
199         int             jfd = -1;                       // junk file des
200         int             state;
201
202         if( ctx == NULL || mtype < 0 ) {
203                 return -1;
204         }
205
206         snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
207
208         state = mkfifo( wbuf, 0660 );           // make the fifo; this will fail if it exists and that's ok
209         if( state != 0 && errno != EEXIST ) {
210                 logit(  LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
211                 return -1;
212         }
213
214         if( io_dir == READER ) {
215                 fd = open( wbuf, O_RDONLY  );                   // just open the reader
216                 if( fd < 0 ) {
217                         logit(  LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
218                 }
219         } else {
220                 jfd = open( wbuf, O_RDWR  | O_NONBLOCK );                       // must have a reader before we can open a non-blocking writer
221                 if( jfd < 0 ) {
222                         logit(  LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
223                         return -1;
224                 }
225
226                 fd = open( wbuf, O_WRONLY  | O_NONBLOCK );                      // this will be our actual writer, in non-blocking mode
227                 if( fd < 0 ) {
228                         logit(  LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
229                 }
230
231                 close( jfd );                   // should be safe to close this
232         }
233
234
235         return fd;
236 }
237
238 /*
239         Given a message type, return the file des of the fifo that
240         the payload should be written to.        Returns the file des, or -1
241         on error. When sussing out a read file descriptor this will
242         block until there is a fifo for the message type which is
243         open for reading.
244
245         If fref is not nil, then a pointer to the fifo info block is returned
246         allowing for direct update of counts after the write.
247 */
248 static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
249         fifo_t* fifo = NULL;
250         void*   hash;
251
252         if( ctx == NULL ) {
253                 if( fref != NULL ) {
254                         *fref = NULL;
255                 }
256                 return -1;
257         }
258
259         if( io_dir == READER ) {                // with an integer key, we need two hash tables
260                 hash = ctx->rd_hash;
261         } else {
262                 hash = ctx->wr_hash;
263         }
264
265         if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
266                 fifo = (fifo_t *) malloc( sizeof( *fifo ) );
267                 if( fifo != NULL ) {
268                         memset( fifo, 0, sizeof( *fifo ) );
269                         fifo->key = mtype;
270                         fifo->fd = open_fifo( ctx, mtype, io_dir );
271                         if( fifo->fd >= 0 ) {                                   // save only on good open
272                                 rmr_sym_map( hash, mtype, fifo );
273                         } else {
274                                 free( fifo );
275                                 fifo = NULL;
276                         }
277                 }
278         }
279
280         if( fref != NULL ) {
281                 *fref = fifo;
282         }
283
284         return fifo == NULL ? -1 : fifo->fd;
285 }
286
287 /*
288         Make marking counts easier in code
289 */
290 static inline void chalk_error( fifo_t* fifo ) {
291         if( fifo != NULL ) {
292                 fifo->drops++;
293                 fifo->drops_rp++;
294         }
295 }
296
297 static inline void chalk_ok( fifo_t* fifo ) {
298         if( fifo != NULL ) {
299                 fifo->wcount++;
300                 fifo->wcount_rp++;
301         }
302 }
303
304 /*
305         Callback function driven to traverse the symtab and generate the
306         counts for each fifo.  Sonar will complain about unused parameters which
307         are normal for callbacks. Further, sonar will grumble about st, and entry
308         not being const; we can't unless RMR proto for the callback changes.
309 */
310 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
311         fifo_t* fifo;
312         int             report_period = 60;
313
314         if( data ) {
315                 report_period = *((int *) data);
316         }
317
318         if( (fifo = (fifo_t *) thing) != NULL ) {
319                 logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
320                         fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
321
322                 fifo->wcount_rp = 0;            // reset the report counts
323                 fifo->drops_rp = 0;
324         }
325 }
326
327 // ---------- public ------------------------------------------------------
328 /*
329         Sets a signal handler for sigpipe so we don't crash if a reader closes the
330         last reading fd on a pipe. We could do this automatically, but if the user
331         programme needs to trap sigpipe too, this gives them the option not to have
332         us interfere.
333 */
334 extern int mcl_set_sigh( ) {
335         signal( SIGPIPE, SIG_IGN );
336 }
337
338 /*
339         "Opens" the interface to RMR such that messages sent to the application will
340         be available via the rmr receive funcitons. This is NOT automatically called
341         by the mk_context function as some applications will be using the mc library
342         for non-RMR, fifo, chores.
343 */
344 extern int mcl_start_listening( void* vctx,  char* port, int wait4ready ) {
345         mcl_ctx_t*      ctx;
346         int             announce = 0;
347
348         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
349                 return 0;
350         }
351
352         ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE );     // start your engines!
353         if( ctx->mrc == NULL ) {
354                 logit(  LOG_CRIT, "start listening: unable to initialise RMr" );
355                 return 0;
356         }
357
358         while( wait4ready && ! rmr_ready( ctx->mrc ) ) {                                // only senders need to wait
359                 if( announce <= 0 ) {
360                         logit(  LOG_INFO, "waiting for RMR to show ready" );
361                         announce = 10;
362                 } else {
363                         announce--;
364                 }
365
366                 sleep( 1 );
367         }
368
369         return 1;
370 }
371
372 /*
373         Blocks until a message arives with a good return code or we timeout. Returns the
374         rmr message buffer. Timeout value epxected in seconds.
375 */
376 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
377         mcl_ctx_t*      ctx;
378
379         if( (ctx = (mcl_ctx_t *) vctx) == NULL ) {
380                 return NULL;
381         }
382
383         if( ctx->mrc == NULL ) {
384                 logit(  LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
385                 exit( 1 );
386         }
387
388         do {
389                 msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 );                           // wait for next
390         } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) );
391
392         return msg;
393 }
394
395 /*
396         Create the context.
397 */
398 extern  void* mcl_mk_context( const char* dir ) {
399         mcl_ctx_t*      ctx;
400
401         if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
402                 memset( ctx, 0, sizeof( *ctx ) );
403                 ctx->fifo_dir = strdup( dir );
404                 ctx->wr_hash = rmr_sym_alloc( 1001 );
405                 ctx->rd_hash = rmr_sym_alloc( 1001 );
406
407                 if( ctx->wr_hash == NULL  || ctx->rd_hash == NULL ) {
408                         logit(  LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
409                         free( ctx );
410                         return NULL;
411                 }
412         }
413
414         return (void *) ctx;
415 }
416
417 /*
418         Read the header. Best case we read the expected number of bytes, get all
419         of them and find that they start with the delemiter.  Worst case
420         We have to wait for all of the header, or need to synch at the next
421         delimeter. We assume best case most likely and handle it as such.
422 */
423 static void read_header( int fd, char* buf ) {
424         int len;
425         int need = MCL_EXHDR_SIZE;              // total needed
426         int dneed;                                              // delimieter needed
427         char*   rp;                             // read position in buf
428
429         len = read( fd, buf, need );
430         if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {       // best case, most likely
431                 return;
432         }
433
434         while( TRUE ) {
435                 if( len < strlen( MCL_DELIM ) ) {               // must get at least enough bytes to check delim
436                         rp = buf + len;
437                         dneed = strlen( MCL_DELIM ) - len;
438
439                         while( dneed > 0 ) {
440                                 len = read( fd, rp, dneed );
441                                 dneed -= len;
442                                 rp += len;
443                         }
444                 }
445
446                 if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {      // have a good delimiter, just need to wait for bytes
447                         need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
448                         rp = buf + (MCL_EXHDR_SIZE - need);
449
450                         while( need > 0 ) {
451                                 len = read( fd, rp, need );
452                                 need -= len;
453                                 rp += len;
454                         }
455
456                         return;
457                 }
458
459                 while( buf[0] != MCL_DELIM[0] ) {       // wait for a recognised start byte to be read (may cause an additional message drop
460                         len = read( fd, buf, 1 );               // because we ignore start byte that might be in the buffer)
461                 }
462         }
463 }
464
465
466 /*
467         Read one record from the fifo that the message type maps to.
468         Writes at max ublen bytes into the ubuf.
469
470         If long_hdrs is true (!0), then we expect that the stream in the fifo
471         has extended headers (<delim><len><time>), and will write the timestamp
472         from the header into the buffer pointed to by timestamp. The buffer is
473         assumed to be at least MCL_TSTAMP_SIZE bytes in length.
474
475         Further, when extended headers are being used, this function will
476         automatically resynchronise if it detects an issue.
477
478         The function could look for the delimiter and automatically detect whether
479         or not extended headers are in use, but if the stream is out of synch on the
480         first read, this cannot be done, so the funciton requires that the caller
481         know that the FIFO contains extended headers.
482 */
483 static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
484         int fd;
485         int len;
486         int     msg_len;
487         int     got = 0;                                                // number of bytes we actually got
488         int need;
489         char wbuf[4096];
490         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
491
492         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
493                 errno = EINVAL;
494                 return 0;
495         }
496
497         if( (fd = suss_fifo( ctx, mtype, READER, NULL ))  >= 0 )  {
498                 if( long_hdrs ) {
499                         read_header( fd, wbuf );
500                         msg_len = need = atoi( wbuf + MCL_LEN_OFF );                            // read the length
501                         if( timestamp ) {
502                                 strncpy( timestamp, wbuf + MCL_TSTAMP_OFF+1, MCL_TSTAMP_SIZE );
503                         }
504                 } else {
505                         if( timestamp != NULL ) {                                               // won't be there, but ensure it's not garbage
506                                 *timestamp = 0;
507                         }
508
509                         read( fd, wbuf, MCL_LEN_SIZE );                                 // we assume we will get all 8 bytes as there isn't a way to sync the old stream
510                         msg_len = need = atoi( wbuf );
511                 }
512
513
514                 if( need > ublen ) {
515                         need = ublen;                                           // cannot give them more than they can take
516                 }
517                 while( need > 0 ) {
518                         len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
519                         memcpy( ubuf+got, wbuf, len );
520                         got += len;
521                         need -= len;
522                 }
523
524                 if( msg_len > got ) {                                   // we must ditch rest of this message
525                         need = msg_len - got;
526                         while( need > 0 ) {
527                                 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
528                                 need -= len;
529                         }
530                 }
531
532                 return got;
533         }
534
535         errno = EBADFD;
536         return 0;
537 }
538
539 /*
540         Read one record from the fifo that the message type maps to.
541         Writes at max ublen bytes into the ubuf. If extended headers are in use
542         this function will ignore the timestamp.
543
544         If long_hdrs is true (!0), then we expect that the stream in the fifo
545         has extended headers (<delim><len><time>).
546 */
547 extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs ) {
548         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, NULL );
549 }
550
551 /*
552         Read a single message from the FIFO returning it in the caller's buffer. If extended
553         headers are being used, and the caller supplied a timestamp buffer, the timestamp
554         which was in the header will be returned in that buffer.  The return value is the number
555         of bytes in the buffer; 0 indicates an error and errno should be set.
556 */
557 extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
558         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
559 }
560
561
562 /*
563         Will read messages and fan them out based on the message type. This should not
564         return and if it does the caller should assume an error.
565
566         The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
567         string , followed by that number of 'raw' bytes. The raw bytes are the payload
568         exactly as received.
569
570         The report parameter is the frequency, in seconds, for writing a short
571         status report to stdout. If 0 then it's off.
572
573         If long_hdr is true, then we geneate an extended header with a delimiter and
574         timestamp.
575
576         The one message which is NOT pushed into a FIFO is the RIC_HEALTH_CHECK_REQ
577         message.  When the health check message is received it is responded to
578         with the current state of processing (ok or err).
579 */
580 extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
581         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
582         fifo_t*         fifo;                                   // fifo to chalk counts on
583         rmr_mbuf_t*     mbuf = NULL;                    // received message buffer; recycled on each call
584         char            header[128];                    // header we'll pop in front of the payload
585         int                     fd;                                             // file des to write to
586         long long       total = 0;                              // total messages received and written
587         long long       total_drops = 0;                // total messages received and written
588         long            count = 0;                              // messages received and written during last reporting period
589         long            errors = 0;                             // unsuccessful payload writes
590         long            drops = 0;                              // number of drops
591         time_t          next_report = 0;                // we'll report every 2 seconds if report is true
592         time_t          now;
593         size_t          hwlen;                                  // write len for header
594         void*           rdc_ctx = NULL;                 // raw data capture context
595         void*           rdc_buf = NULL;                 // capture buffer
596
597         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
598                 logit(  LOG_ERR, "(mcl) invalid context given to fanout" );
599                 errno = EINVAL;
600                 return;
601         }
602
603         if( report < 0 ) {
604                 report = 0;
605         }
606
607         rdc_ctx = setup_rdc( );                         // pull rdc directories from enviornment and initialise
608
609         do {
610                 mbuf = mcl_get_msg( ctx, mbuf, report );                        // wait up to report sec for msg (0 == block until message)
611
612                 if( mbuf != NULL && mbuf->state == RMR_OK ) {
613                         if( mbuf->mtype == RIC_HEALTH_CHECK_REQ ) {
614                                 mbuf->mtype = RIC_HEALTH_CHECK_RESP;            // if we're here we are running and all is ok
615                                 mbuf->sub_id = -1;
616                                 mbuf = rmr_realloc_payload( mbuf, 128, FALSE, FALSE );  // ensure payload is large enough
617                                 if( mbuf->payload != NULL ) {
618                                         strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) );
619                                         rmr_rts_msg( ctx->mrc, mbuf );
620                                 }
621                                 continue;
622                         }
623
624                         if( mbuf->len > 0  ) {
625                                 if( long_hdr ) {
626                                         build_hdr( mbuf->len, header, sizeof( header ) );
627                                         hwlen = MCL_EXHDR_SIZE;
628                                 } else {
629                                         snprintf( header, sizeof( header ), "%07d", mbuf->len );                        // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
630                                         hwlen = MCL_LEN_SIZE;
631                                 }
632
633                                 fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );              // map the message type to an open fd
634                                 if( fd >= 0 ) {
635                                         if( write( fd, header, hwlen ) != hwlen ) {                     // write exactly MCL_LEN_SIZE bytes from the buffer
636                                                 drops++;
637                                                 total_drops++;
638                                                 chalk_error( fifo );
639                                         } else {
640                                                 if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) {                       // followed by the payload
641                                                         errors++;
642                                                         chalk_error( fifo );
643                                                 } else {
644                                                         chalk_ok( fifo );
645                                                         count++;
646                                                         total++;
647                                                 }
648                                         }
649                                 }
650
651                                 if( rdc_ctx != NULL ) {
652                                         rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf );                  // set up for write
653                                         rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len );                                // write the raw data
654                                 }
655                         }
656                 }
657
658                 if( report ) {
659                         if( (now = time( NULL ) ) > next_report ) {
660                                 rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report );        // run endpoints in the active table
661                                 fflush( stdout );
662
663                                 logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
664                                         total, total_drops, report, count, drops, errors );
665                                 next_report = now + report;
666                                 count = 0;
667                                 drops = 0;
668                                 errors = 0;
669
670                                 fflush( stdout );
671                         }
672                 }
673
674                 if( ! FOREVER ) {                       // allow escape during unit tests; compiled out othewise, but sonar won't see that
675                         free( rdc_buf );
676                         break;                                  // sonar grumbles if we put FOREVER into the while; maddening
677                 }
678         } while( 1 );
679 }
680
681
682 /*
683         Given a buffer and length, along with the message type, look up the fifo and write
684         the buffer. Returns 0 on error; 1 on success.
685 */
686 extern int mcl_fifo_one( void* vctx, const char* payload, int plen, int mtype ) {
687         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
688         fifo_t*         fifo;                                   // fifo to chalk counts on
689         size_t          state = -1;
690         int                     fd;                                             // file des to write to
691
692         if( plen <= 0  || payload == NULL ) {
693                 return 1;
694         }
695
696         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
697                 logit( LOG_ERR, "(mcl) invalid context given to fifo_one\n" );
698                 return 0;
699         }
700
701         fd = suss_fifo( ctx, mtype, WRITER, &fifo );            // map the message type to an open fd
702         if( fd >= 0 ) {
703                 state = write( fd, payload, plen );
704         }
705
706         return state == (size_t) plen;
707 }