Add full unit tests for listener
[ric-app/mc.git] / sidecars / listener / mcl.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 --------------------------------------------------------------------------------
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10            http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 --------------------------------------------------------------------------------
18 */
19
20 /*
21         Mnemonic:       mcl.c.
22         Abstract:       The mc listener library content. All external functions
23                                 should start with mcl_ and all stderr messages should have
24                                 (mcl) as the first token following the severity indicator.
25
26         Date:           22 August 2019
27         Author:         E. Scott Daniels
28 */
29
30 #include <unistd.h>
31 #include <errno.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <time.h>
35 #include <string.h>
36 #include <fcntl.h>
37 #include <signal.h>
38 #include <sys/stat.h>
39 #include <sys/types.h>
40
41
42 #include <rmr/rmr.h>
43 #include <rmr/rmr_symtab.h>
44
45 #include "mcl.h"
46
47 #ifndef FOREVER
48 #define FOREVER 1
49 #endif
50
51 #define READER 0
52 #define WRITER 1
53
54 #define TRUE    1
55 #define FALSE   0
56
57 /*
58         Information about one file descriptor. This is pointed to by the hash
59         such that the message type can be used as a key to look up the fifo's
60         file descriptor.
61 */
62 typedef struct {
63         int     fd;                                     // open fdes
64         int key;                                // symtab key
65         long long wcount;               // number of writes
66         long long drops;                // number dropped
67
68         long long wcount_rp;    // number of writes during last reporting period
69         long long drops_rp;             // number dropped during last reporting period
70 } fifo_t;
71
72 /*
73         Our conext.  Pointers to the read and write hash tables (both keyed on the message
74         type), the message router (RMR) context, and other goodies.
75 */
76 typedef struct {
77         void*   mrc;                            // the message router's context
78         void*   wr_hash;                        // symtable to look up pipe info based on mt for writing
79         void*   rd_hash;                        // we support reading from pipes, but need a different FD for that
80         char*   fifo_dir;                       // directory where we open fifos
81
82 } mcl_ctx_t;
83
84 // -------- private -------------------------------------------------------
85
86
87 /*
88         Set up for raw data capture. We look for directory overriedes from
89         environment variables, and then invoke the rdc_init() to actually
90         set things up.
91 */
92 static void* setup_rdc() {
93         void*   ctx;
94         int             value;                                                  // value computed for something
95         char*   ep;                                                             // pointer to environment var
96         char*   sdir = "/tmp/rdc/stage";                // default directory names
97         char*   fdir = "/tmp/rdc/final";
98         char*   suffix = ".rdc";
99         char*   done = NULL;
100
101         if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) {
102                 if( ep != NULL  && atoi( ep ) == 0 ) {
103                         logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep );
104                         return NULL;
105                 }
106         }
107
108         if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
109                 sdir = ep;
110         } else {
111                 mkdir( "/tmp/rdc", 0755 );                      // we ignore failures here as it could likely exist
112                 mkdir( sdir, 0755 );
113         }
114
115         if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
116                 fdir = ep;
117         } else {
118                 mkdir( "/tmp/rdc", 0755 );                      // we ignore failures again -- very likely it's there
119                 mkdir( fdir, 0755 );
120         }
121
122         if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
123                 suffix = ep;
124         }
125
126         if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
127                 done = ep;
128         }
129
130         ctx = rdc_init( sdir, fdir, suffix, done );
131         if( ctx == NULL ) {
132                 logit( LOG_ERR, "rdc_init did not generate a context" );
133         } else {
134                 logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
135                 logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
136         }
137
138         if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
139                 value = atoi( ep );
140                 logit( LOG_INFO, "setting frequency: %d", value );
141                 rdc_set_freq( ctx, value );     
142         }
143         return ctx;
144 }
145
146 /*
147         Builds an extended header in the buffer provided, or allocates a new buffer if
148         dest is nil. The header is of the form:
149                 <delim><len><timestamp>
150
151         Timestamp is a single unsigned long long in ASCII; ms since epoch.
152         If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
153         the timestamp generated will be 1570113591103.
154 */
155 static char* build_hdr( int len, char* dest, int dest_len ) {
156         struct timespec ts;         // time just before call executed
157
158         if( dest == NULL ) {
159                 dest_len = 48;
160                 dest = (char *) malloc( sizeof( char ) * dest_len );
161         } else {
162                 if( dest_len < 28 ) {           // shouldn't happen, but take no chances
163                         memset( dest, 0, dest_len );
164                         return NULL;
165                 }
166         }
167
168         memset( dest, 0, dest_len );
169
170         clock_gettime( CLOCK_REALTIME, &ts );
171         sprintf( dest, "%s%07d", MCL_DELIM, len );
172         sprintf( dest+12, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
173
174         return dest;
175 }
176
177 /*
178         Build a file name and open. The io_direction is either READER or
179         WRITER.  For a writer we must 'trick' the system into allowing us
180         to open a pipe for writing in non-blocking mode so that we can
181         report on drops (messages we couldn't write because there was no
182         reader).  The trick is to open a reader on the pipe so that when
183         we open the writer there's a reader and the open won't fail. As
184         soon as we have the writer open, we can close the junk reader.
185
186         If the desired fifo does not exist, it is created.
187 */
188 static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
189         char    wbuf[1024];
190         int             fd;                                     // real file des
191         int             jfd = -1;                       // junk file des
192         int             state;
193
194         if( ctx == NULL || mtype < 0 ) {
195                 return -1;
196         }
197
198         snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
199
200         state = mkfifo( wbuf, 0660 );           // make the fifo; this will fail if it exists and that's ok
201         if( state != 0 && errno != EEXIST ) {
202                 logit(  LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
203                 return -1;
204         }
205
206         if( io_dir == READER ) {
207                 fd = open( wbuf, O_RDONLY  );                   // just open the reader
208                 if( fd < 0 ) {
209                         logit(  LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
210                 }
211         } else {
212                 jfd = open( wbuf, O_RDWR  | O_NONBLOCK );                       // must have a reader before we can open a non-blocking writer
213                 if( jfd < 0 ) {
214                         logit(  LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
215                         return -1;
216                 }
217
218                 fd = open( wbuf, O_WRONLY  | O_NONBLOCK );                      // this will be our actual writer, in non-blocking mode
219                 if( fd < 0 ) {
220                         logit(  LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
221                 }
222
223                 close( jfd );                   // should be safe to close this
224         }
225
226
227         return fd;
228 }
229
230 /*
231         Given a message type, return the file des of the fifo that
232         the payload should be written to.        Returns the file des, or -1
233         on error. When sussing out a read file descriptor this will
234         block until there is a fifo for the message type which is
235         open for reading.
236
237         If fref is not nil, then a pointer to the fifo info block is returned
238         allowing for direct update of counts after the write.
239 */
240 static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
241         fifo_t* fifo = NULL;
242         void*   hash;
243
244         if( ctx == NULL ) {
245                 if( fref != NULL ) {
246                         *fref = NULL;
247                 }
248                 return -1;
249         }
250
251         if( io_dir == READER ) {                // with an integer key, we need two hash tables
252                 hash = ctx->rd_hash;
253         } else {
254                 hash = ctx->wr_hash;
255         }
256
257         if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
258                 fifo = (fifo_t *) malloc( sizeof( *fifo ) );
259                 if( fifo != NULL ) {
260                         memset( fifo, 0, sizeof( *fifo ) );
261                         fifo->key = mtype;
262                         fifo->fd = open_fifo( ctx, mtype, io_dir );
263                         if( fifo->fd >= 0 ) {                                   // save only on good open
264                                 rmr_sym_map( hash, mtype, fifo );
265                         } else {
266                                 free( fifo );
267                                 fifo = NULL;
268                         }
269                 }
270         }
271
272         if( fref != NULL ) {
273                 *fref = fifo;
274         }
275
276         return fifo == NULL ? -1 : fifo->fd;
277 }
278
279 /*
280         Make marking counts easier in code
281 */
282 static inline void chalk_error( fifo_t* fifo ) {
283         if( fifo != NULL ) {
284                 fifo->drops++;
285                 fifo->drops_rp++;
286         }
287 }
288
289 static inline void chalk_ok( fifo_t* fifo ) {
290         if( fifo != NULL ) {
291                 fifo->wcount++;
292                 fifo->wcount_rp++;
293         }
294 }
295
296 /*
297         Callback function driven to traverse the symtab and generate the
298         counts for each fifo.
299 */
300 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
301         fifo_t* fifo;
302         int             report_period = 60;
303
304         if( data ) {
305                 report_period = *((int *) data);
306         }
307
308         if( (fifo = (fifo_t *) thing) != NULL ) {
309                 logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
310                         fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
311
312                 fifo->wcount_rp = 0;            // reset the report counts
313                 fifo->drops_rp = 0;
314         }
315 }
316
317 // ---------- public ------------------------------------------------------
318 /*
319         Sets a signal handler for sigpipe so we don't crash if a reader closes the
320         last reading fd on a pipe. We could do this automatically, but if the user
321         programme needs to trap sigpipe too, this gives them the option not to have
322         us interfere.
323 */
324 extern int mcl_set_sigh( ) {
325         signal( SIGPIPE, SIG_IGN );
326 }
327
328 /*
329         "Opens" the interface to RMR such that messages sent to the application will
330         be available via the rmr receive funcitons. This is NOT automatically called
331         by the mk_context function as some applications will be using the mc library
332         for non-RMR, fifo, chores.
333 */
334 extern int mcl_start_listening( void* vctx,  char* port, int wait4ready ) {
335         mcl_ctx_t*      ctx;
336         int             announce = 0;
337
338         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
339                 return 0;
340         }
341
342         ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE );     // start your engines!
343         if( ctx->mrc == NULL ) {
344                 logit(  LOG_CRIT, "start listening: unable to initialise RMr" );
345                 return 0;
346         }
347
348         while( wait4ready && ! rmr_ready( ctx->mrc ) ) {                                // only senders need to wait
349                 if( announce <= 0 ) {
350                         logit(  LOG_INFO, "waiting for RMR to show ready" );
351                         announce = 10;
352                 } else {
353                         announce--;
354                 }
355
356                 sleep( 1 );
357         }
358
359         return 1;
360 }
361
362 /*
363         Blocks until a message arives with a good return code or we timeout. Returns the
364         rmr message buffer. Timeout value epxected in seconds.
365 */
366 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
367         mcl_ctx_t*      ctx;
368
369         if( (ctx = (mcl_ctx_t *) vctx) == NULL ) {
370                 return NULL;
371         }
372
373         if( ctx->mrc == NULL ) {
374                 logit(  LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
375                 exit( 1 );
376         }
377
378         do {
379                 msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 );                           // wait for next
380         } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) );
381
382         return msg;
383 }
384
385 /*
386         Create the context.
387 */
388 extern  void* mcl_mk_context( char* dir ) {
389         mcl_ctx_t*      ctx;
390
391         if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
392                 memset( ctx, 0, sizeof( *ctx ) );
393                 ctx->fifo_dir = strdup( dir );
394                 ctx->wr_hash = rmr_sym_alloc( 1001 );
395                 ctx->rd_hash = rmr_sym_alloc( 1001 );
396
397                 if( ctx->wr_hash == NULL  || ctx->rd_hash == NULL ) {
398                         logit(  LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
399                         free( ctx );
400                         return NULL;
401                 }
402         }
403
404         return (void *) ctx;
405 }
406
407 /*
408         Read the header. Best case we read the expected number of bytes, get all
409         of them and find that they start with the delemiter.  Worst case
410         We have to wait for all of the header, or need to synch at the next
411         delimeter. We assume best case most likely and handle it as such.
412 */
413 static void read_header( int fd, char* buf ) {
414         int len;
415         int need = MCL_EXHDR_SIZE;              // total needed
416         int dneed;                                              // delimieter needed
417         int     rlen;
418         char*   rp;                             // read position in buf
419
420         len = read( fd, buf, need );
421         if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {       // best case, most likely
422                 return;
423         }
424
425         while( TRUE ) {
426                 if( len < strlen( MCL_DELIM ) ) {               // must get at least enough bytes to check delim
427                         rp = buf + len;
428                         dneed = strlen( MCL_DELIM ) - len;
429
430                         while( dneed > 0 ) {
431                                 len = read( fd, rp, dneed );
432                                 dneed -= len;
433                                 rp += len;
434                         }
435                 }
436
437                 if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {      // have a good delimiter, just need to wait for bytes
438                         need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
439                         rp = buf + (MCL_EXHDR_SIZE - need);
440
441                         while( need > 0 ) {
442                                 len = read( fd, rp, need );
443                                 need -= len;
444                                 rp += len;
445                         }
446
447                         return;
448                 }
449
450                 while( buf[0] != MCL_DELIM[0] ) {       // wait for a recognised start byte to be read (may cause an additional message drop
451                         len = read( fd, buf, 1 );               // because we ignore start byte that might be in the buffer)
452                 }
453
454                 need = MCL_EXHDR_SIZE - len;
455         }
456 }
457
458
459 /*
460         Read one record from the fifo that the message type maps to.
461         Writes at max ublen bytes into the ubuf.
462
463         If long_hdrs is true (!0), then we expect that the stream in the fifo
464         has extended headers (<delim><len><time>), and will write the timestamp
465         from the header into the buffer pointed to by timestamp. The buffer is
466         assumed to be at least MCL_TSTAMP_SIZE bytes in length.
467
468         Further, when extended headers are being used, this function will
469         automatically resynchronise if it detects an issue.
470
471         The function could look for the delimiter and automatically detect whether
472         or not extended headers are in use, but if the stream is out of synch on the
473         first read, this cannot be done, so the funciton requires that the caller
474         know that the FIFO contains extended headers.
475 */
476 static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
477         int fd;
478         int len;
479         int     msg_len;
480         int     got = 0;                                                // number of bytes we actually got
481         int need;
482         char wbuf[4096];
483         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
484         fifo_t* fref = NULL;                            // the fifo struct we found
485
486         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
487                 errno = EINVAL;
488                 return 0;
489         }
490
491         if( (fd = suss_fifo( ctx, mtype, READER, NULL ))  >= 0 )  {
492                 if( long_hdrs ) {
493                         read_header( fd, wbuf );
494                         msg_len = need = atoi( wbuf + MCL_LEN_OFF );                            // read the length
495                         if( timestamp ) {
496                                 strcpy( timestamp, wbuf + MCL_TSTAMP_OFF+1 );
497                         }
498                 } else {
499                         if( timestamp != NULL ) {                                               // won't be there, but ensure it's not garbage
500                                 *timestamp = 0;
501                         }
502
503                         len = read( fd, wbuf, MCL_LEN_SIZE );                   // we assume we will get all 8 as there isn't a way to sync the old stream
504                         msg_len = need = atoi( wbuf );
505                 }
506
507
508                 if( need > ublen ) {
509                         need = ublen;                                           // cannot give them more than they can take
510                 }
511                 while( need > 0 ) {
512                         len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
513                         memcpy( ubuf+got, wbuf, len );
514                         got += len;
515                         need -= len;
516                 }
517
518                 if( msg_len > got ) {                                   // we must ditch rest of this message
519                         need = msg_len = got;
520                         while( need > 0 ) {
521                                 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
522                                 need -= len;
523                         }
524                 }
525
526                 return got;
527         }
528
529         errno = EBADFD;
530         return 0;
531 }
532
533 /*
534         Read one record from the fifo that the message type maps to.
535         Writes at max ublen bytes into the ubuf. If extended headers are in use
536         this function will ignore the timestamp.
537
538         If long_hdrs is true (!0), then we expect that the stream in the fifo
539         has extended headers (<delim><len><time>).
540 */
541 extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs ) {
542         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, NULL );
543 }
544
545 /*
546         Read a single message from the FIFO returning it in the caller's buffer. If extended
547         headers are being used, and the caller supplied a timestamp buffer, the timestamp
548         which was in the header will be returned in that buffer.  The return value is the number
549         of bytes in the buffer; 0 indicates an error and errno should be set.
550 */
551 extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
552         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
553 }
554
555
556 /*
557         Will read messages and fan them out based on the message type. This should not
558         return and if it does the caller should assume an error.
559
560         The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
561         string , followed by that number of 'raw' bytes. The raw bytes are the payload
562         exactly as received.
563
564         The report parameter is the frequency, in seconds, for writing a short
565         status report to stdout. If 0 then it's off.
566
567         If long_hdr is true, then we geneate an extended header with a delimiter and
568         timestamp.
569 */
570 extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
571         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
572         fifo_t*         fifo;                                   // fifo to chalk counts on
573         rmr_mbuf_t*     mbuf = NULL;                    // received message buffer; recycled on each call
574         char            header[128];                    // header we'll pop in front of the payload
575         int                     state;
576         int                     fd;                                             // file des to write to
577         long long       total = 0;                              // total messages received and written
578         long long       total_drops = 0;                // total messages received and written
579         long            count = 0;                              // messages received and written during last reporting period
580         long            errors = 0;                             // unsuccessful payload writes
581         long            drops;                                  // number of drops
582         time_t          next_report = 0;                // we'll report every 2 seconds if report is true
583         time_t          now;
584         int                     hwlen;                                  // write len for header
585         void*           rdc_ctx = NULL;                 // raw data capture context
586         void*           rdc_buf = NULL;                 // capture buffer
587
588         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
589                 logit(  LOG_ERR, "(mcl) invalid context given to fanout" );
590                 errno = EINVAL;
591                 return;
592         }
593
594         if( report < 0 ) {
595                 report = 0;
596         }
597
598         rdc_ctx = setup_rdc( );                         // pull rdc directories from enviornment and initialise
599
600         do {
601                 mbuf = mcl_get_msg( ctx, mbuf, report );                        // wait up to report sec for msg (0 == block until message)
602
603                 if( mbuf != NULL && mbuf->state == RMR_OK && mbuf->len > 0  ) {
604                         fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );              // map the message type to an open fd
605                         if( fd >= 0 ) {
606                                 if( long_hdr ) {
607                                         build_hdr( mbuf->len, header, sizeof( header ) );
608                                         hwlen = MCL_EXHDR_SIZE;
609                                 } else {
610                                         snprintf( header, sizeof( header ), "%07d", mbuf->len );                        // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
611                                         hwlen = MCL_LEN_SIZE;
612                                 }
613
614                                 if( (state = write( fd, header, hwlen )) != hwlen ) {           // write exactly MCL_LEN_SIZE bytes from the buffer
615                                         drops++;
616                                         total_drops++;
617                                         chalk_error( fifo );
618                                 } else {
619                                         if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) {                       // followed by the payload
620                                                 errors++;
621                                                 chalk_error( fifo );
622                                         } else {
623                                                 chalk_ok( fifo );
624                                                 count++;
625                                                 total++;
626                                         }
627                                 }
628                         }
629
630                         if( rdc_ctx != NULL ) {
631                                 rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf );                  // set up for write
632                                 rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len );                                                // write the raw data
633                         }
634                 }
635
636                 if( report ) {
637                         if( (now = time( NULL ) ) > next_report ) {
638                         rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report );        // run endpoints in the active table
639                                 fflush( stdout );
640
641                                 logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
642                                         total, total_drops, report, count, drops, errors );
643                                 next_report = now + report;
644                                 count = 0;
645                                 drops = 0;
646
647                                 fflush( stdout );
648                         }
649                 }
650         } while( FOREVER );                             // forever allows for escape during unit testing
651 }
652
653
654 /*
655         Given a buffer and length, along with the message type, look up the fifo and write
656         the buffer. Returns 0 on error; 1 on success.
657 */
658 extern int mcl_fifo_one( void* vctx, char* payload, int plen, int mtype ) {
659         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
660         fifo_t*         fifo;                                   // fifo to chalk counts on
661         int                     state = -1;
662         int                     fd;                                             // file des to write to
663
664         if( plen <= 0 ) {
665                 return 1;
666         }
667
668         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
669                 logit( LOG_ERR, "(mcl) invalid context given to fifo_one\n" );
670                 return 0;
671         }
672
673         fd = suss_fifo( ctx, mtype, WRITER, &fifo );            // map the message type to an open fd
674         if( fd >= 0 ) {
675                 state = write( fd, payload, plen );
676         } 
677
678         return state == plen;
679 }