Add ability to capture raw messages
[ric-app/mc.git] / src / sidecars / listener / mcl.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 --------------------------------------------------------------------------------
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10            http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 --------------------------------------------------------------------------------
18 */
19
20 /*
21         Mnemonic:       mcl.c.
22         Abstract:       The mc listener library content. All external functions
23                                 should start with mcl_ and all stderr messages should have
24                                 (mcl) as the first token following the severity indicator.
25
26         Date:           22 August 2019
27         Author:         E. Scott Daniels
28 */
29
30 #include <unistd.h>
31 #include <errno.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <time.h>
35 #include <string.h>
36 #include <fcntl.h>
37 #include <signal.h>
38 #include <sys/stat.h>
39 #include <sys/types.h>
40
41
42 #include <rmr/rmr.h>
43 #include <rmr/rmr_symtab.h>
44
45 #include "mcl.h"
46
47 #define READER 0
48 #define WRITER 1
49
50 #define TRUE    1
51 #define FALSE   0
52
53 /*
54         Information about one file descriptor. This is pointed to by the hash
55         such that the message type can be used as a key to look up the fifo's
56         file descriptor.
57 */
58 typedef struct {
59         int     fd;                                     // open fdes
60         int key;                                // symtab key
61         long long wcount;               // number of writes
62         long long drops;                // number dropped
63
64         long long wcount_rp;    // number of writes during last reporting period
65         long long drops_rp;             // number dropped during last reporting period
66 } fifo_t;
67
68 /*
69         Our conext.  Pointers to the read and write hash tables (both keyed on the message
70         type), the message router (RMR) context, and other goodies.
71 */
72 typedef struct {
73         void*   mrc;                            // the message router's context
74         void*   wr_hash;                        // symtable to look up pipe info based on mt for writing
75         void*   rd_hash;                        // we support reading from pipes, but need a different FD for that
76         char*   fifo_dir;                       // directory where we open fifos
77
78 } mcl_ctx_t;
79
80 // -------- private -------------------------------------------------------
81
82
83 /*
84         Set up for raw data capture. We look for directory overriedes from
85         environment variables, and then invoke the rdc_init() to actually
86         set things upd.
87 */
88 static void* setup_rdc() {
89         void*   ctx;
90         int             value;                                                  // value computed for something
91         char*   ep;                                                             // pointer to environment var
92         char*   sdir = "/tmp/rdc/stage";                // default directory names
93         char*   fdir = "/tmp/rdc/final";
94         char*   suffix = ".rdc";
95         char*   done = NULL;
96
97         if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) {
98                 if( ep != NULL  && atoi( ep ) == 0 ) {
99                         logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep );
100                         return NULL;
101                 }
102         }
103
104         if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
105                 sdir = ep;
106         } else {
107                 mkdir( "/tmp/rdc", 0755 );                      // we ignore failures here as it could likely exist
108                 mkdir( sdir, 0755 );
109         }
110
111         if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
112                 fdir = ep;
113         } else {
114                 mkdir( "/tmp/rdc", 0755 );                      // we ignore failures again -- very likely it's there
115                 mkdir( fdir, 0755 );
116         }
117
118         if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
119                 suffix = ep;
120         }
121
122         if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
123                 done = ep;
124         }
125
126         ctx = rdc_init( sdir, fdir, suffix, done );
127         if( ctx == NULL ) {
128                 logit( LOG_ERR, "rdc_init did not generate a context" );
129         } else {
130                 logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
131                 logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
132         }
133
134         if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
135                 value = atoi( ep );
136                 logit( LOG_INFO, "setting frequency: %d", value );
137                 rdc_set_freq( ctx, value );     
138         }
139         return ctx;
140 }
141
142 /*
143         Builds an extended header in the buffer provided, or allocates a new buffer if
144         dest is nil. The header is of the form:
145                 <delim><len><timestamp>
146
147         Timestamp is a single unsigned long long in ASCII; ms since epoch.
148         If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
149         the timestamp generated will be 1570113591103.
150 */
151 static char* build_hdr( int len, char* dest, int dest_len ) {
152         struct timespec ts;         // time just before call executed
153
154         if( dest == NULL ) {
155                 dest_len = 48;
156                 dest = (char *) malloc( sizeof( char ) * dest_len );
157         } else {
158                 if( dest_len < 28 ) {           // shouldn't happen, but take no chances
159                         memset( dest, 0, dest_len );
160                         return NULL;
161                 }
162         }
163
164         memset( dest, 0, dest_len );
165
166         clock_gettime( CLOCK_REALTIME, &ts );
167         sprintf( dest, "%s%07d", MCL_DELIM, len );
168         sprintf( dest+12, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
169
170         return dest;
171 }
172
173 /*
174         Build a file name and open. The io_direction is either READER or
175         WRITER.  For a writer we must 'trick' the system into allowing us
176         to open a pipe for writing in non-blocking mode so that we can
177         report on drops (messages we couldn't write because there was no
178         reader).  The trick is to open a reader on the pipe so that when
179         we open the writer there's a reader and the open won't fail. As
180         soon as we have the writer open, we can close the junk reader.
181
182         If the desired fifo does not exist, it is created.
183 */
184 static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
185         char    wbuf[1024];
186         int             fd;                             // real file des
187         int             jfd = -1;                       // junk file des
188         int             state;
189
190         if( ctx == NULL || mtype < 0 ) {
191                 return -1;
192         }
193
194         snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
195
196         state = mkfifo( wbuf, 0660 );           // make the fifo; this will fail if it exists and that's ok
197         if( state != 0 && errno != EEXIST ) {
198                 logit(  LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
199                 return -1;
200         }
201
202         if( io_dir == READER ) {
203                 fd = open( wbuf, O_RDONLY  );                   // just open the reader
204                 if( fd < 0 ) {
205                         logit(  LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
206                 }
207         } else {
208                 jfd = open( wbuf, O_RDWR  | O_NONBLOCK );                       // must have a reader before we can open a non-blocking writer
209                 if( jfd < 0 ) {
210                         logit(  LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
211                 }
212
213                 fd = open( wbuf, O_WRONLY  | O_NONBLOCK );                      // this will be our actual writer, in non-blocking mode
214                 if( fd < 0 ) {
215                         logit(  LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
216                 }
217
218                 close( jfd );                   // should be safe to close this
219         }
220
221
222         return fd;
223 }
224
225 /*
226         Given a message type, return the file des of the fifo that
227         the payload should be written to.        Returns the file des, or -1
228         on error. When sussing out a read file descriptor this will
229         block until there is a fifo for the message type which is
230         open for reading.
231
232         If fref is not nil, then a pointer to the fifo info block is returned
233         allowing for direct update of counts after the write.
234 */
235 static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
236         fifo_t* fifo;
237         void*   hash;
238
239         if( io_dir == READER ) {                // with an integer key, we nned two hash tables
240                 hash = ctx->rd_hash;
241         } else {
242                 hash = ctx->wr_hash;
243         }
244
245         if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
246                 fifo = (fifo_t *) malloc( sizeof( *fifo ) );
247                 if( fifo == NULL ) {
248                         return -1;
249                 }
250
251                 memset( fifo, 0, sizeof( *fifo ) );
252                 fifo->key = mtype;
253                 fifo->fd = open_fifo( ctx, mtype, io_dir );
254                 rmr_sym_map( hash, mtype, fifo );
255         }
256
257         if( fref != NULL ) {
258                 *fref = fifo;
259         }
260         return fifo->fd;
261 }
262
263 /*
264         Make marking counts easier in code
265 */
266 static inline void chalk_error( fifo_t* fifo ) {
267         if( fifo != NULL ) {
268                 fifo->drops++;
269                 fifo->drops_rp++;
270         }
271 }
272
273 static inline void chalk_ok( fifo_t* fifo ) {
274         if( fifo != NULL ) {
275                 fifo->wcount++;
276                 fifo->wcount_rp++;
277         }
278 }
279
280 /*
281         Callback function driven to traverse the symtab and generate the
282         counts for each fifo.
283 */
284 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
285         fifo_t* fifo;
286         int             report_period = 60;
287
288         if( data ) {
289                 report_period = *((int *) data);
290         }
291
292         if( (fifo = (fifo_t *) thing) != NULL ) {
293                 logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
294                         fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
295
296                 fifo->wcount_rp = 0;            // reset the report counts
297                 fifo->drops_rp = 0;
298         }
299 }
300
301 // ---------- public ------------------------------------------------------
302 /*
303         Sets a signal handler for sigpipe so we don't crash if a reader closes the
304         last reading fd on a pipe. We could do this automatically, but if the user
305         programme needs to trap sigpipe too, this gives them the option not to have
306         us interfere.
307 */
308 extern int mcl_set_sigh( ) {
309         signal( SIGPIPE, SIG_IGN );
310 }
311
312 /*
313         "Opens" the interface to RMR such that messages sent to the application will
314         be available via the rmr receive funcitons. This is NOT automatically called
315         by the mk_context function as some applications will be using the mc library
316         for non-RMR, fifo, chores.
317 */
318 extern int mcl_start_listening( void* vctx,  char* port, int wait4ready ) {
319         mcl_ctx_t*      ctx;
320         int             announce = 0;
321
322         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
323                 return 0;
324         }
325
326         ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE );     // start your engines!
327         if( ctx->mrc == NULL ) {
328                 logit(  LOG_CRIT, "start listening: unable to initialise RMr" );
329                 return 0;
330         }
331
332         while( wait4ready && ! rmr_ready( ctx->mrc ) ) {                                // only senders need to wait
333                 if( announce <= 0 ) {
334                         logit(  LOG_INFO, "waiting for RMR to show ready" );
335                         announce = 10;
336                 } else {
337                         announce--;
338                 }
339
340                 sleep( 1 );
341         }
342
343         return 1;
344 }
345
346 /*
347         Blocks until a message arives with a good return code or we timeout. Returns the
348         rmr message buffer. Timeout value epxected in seconds.
349 */
350 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
351         mcl_ctx_t*      ctx;
352
353         if( (ctx = (mcl_ctx_t *) vctx) == NULL ) {
354                 return NULL;
355         }
356
357         if( ctx->mrc == NULL ) {
358                 logit(  LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
359                 exit( 1 );
360         }
361
362         do {
363                 msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 );                           // wait for next
364         } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) );
365
366         return msg;
367 }
368
369 /*
370         Create the context.
371 */
372 extern  void* mcl_mk_context( char* dir ) {
373         mcl_ctx_t*      ctx;
374
375         if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
376                 memset( ctx, 0, sizeof( *ctx ) );
377                 ctx->fifo_dir = strdup( dir );
378                 ctx->wr_hash = rmr_sym_alloc( 1001 );
379                 ctx->rd_hash = rmr_sym_alloc( 1001 );
380
381                 if( ctx->wr_hash == NULL  || ctx->rd_hash == NULL ) {
382                         logit(  LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
383                         free( ctx );
384                         return NULL;
385                 }
386         }
387
388         return (void *) ctx;
389 }
390
391 /*
392         Read the header. Best case we read the expected number of bytes, get all
393         of them and find that they start with the delemiter.  Worst case
394         We have to wait for all of the header, or need to synch at the next
395         delimeter. We assume best case most likely and handle it as such.
396 */
397 static void read_header( int fd, char* buf ) {
398         int len;
399         int need = MCL_EXHDR_SIZE;              // total needed
400         int dneed;                                              // delimieter needed
401         int     rlen;
402         char*   rp;                             // read position in buf
403
404         len = read( fd, buf, need );
405         if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {       // best case, most likely
406                 return;
407         }
408
409         while( TRUE ) {
410                 if( len < strlen( MCL_DELIM ) ) {               // must get at least enough bytes to check delim
411                         rp = buf + len;
412                         dneed = strlen( MCL_DELIM ) - len;
413
414                         while( dneed > 0 ) {
415                                 len = read( fd, rp, dneed );
416                                 dneed -= len;
417                                 rp += len;
418                         }
419                 }
420
421                 if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {      // have a good delimiter, just need to wait for bytes
422                         need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
423                         rp = buf + (MCL_EXHDR_SIZE - need);
424
425                         while( need > 0 ) {
426                                 len = read( fd, rp, need );
427                                 need -= len;
428                                 rp += len;
429                         }
430
431                         return;
432                 }
433
434                 while( buf[0] != MCL_DELIM[0] ) {       // wait for a recognised start byte to be read (may cause an additional message drop
435                         len = read( fd, buf, 1 );               // because we ignore start byte that might be in the buffer)
436                 }
437
438                 need = MCL_EXHDR_SIZE - len;
439         }
440 }
441
442
443 /*
444         Read one record from the fifo that the message type maps to.
445         Writes at max ublen bytes into the ubuf.
446
447         If long_hdrs is true (!0), then we expect that the stream in the fifo
448         has extended headers (<delim><len><time>), and will write the timestamp
449         from the header into the buffer pointed to by timestamp. The buffer is
450         assumed to be at least MCL_TSTAMP_SIZE bytes in length.
451
452         Further, when extended headers are being used, this function will
453         automatically resynchronise if it detects an issue.
454
455         The function could look for the delimiter and automatically detect whether
456         or not extended headers are in use, but if the stream is out of synch on the
457         first read, this cannot be done, so the funciton requires that the caller
458         know that the FIFO contains extended headers.
459 */
460 static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
461         int fd;
462         int len;
463         int     msg_len;
464         int     got = 0;                                                // number of bytes we actually got
465         int need;
466         char wbuf[4096];
467         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
468         fifo_t* fref = NULL;                            // the fifo struct we found
469
470         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
471                 errno = EINVAL;
472                 return 0;
473         }
474
475         if( (fd = suss_fifo( ctx, mtype, READER, NULL ))  >= 0 )  {
476                 if( long_hdrs ) {
477                         read_header( fd, wbuf );
478                         msg_len = need = atoi( wbuf + MCL_LEN_OFF );                            // read the length
479                         if( timestamp ) {
480                                 strcpy( timestamp, wbuf + MCL_TSTAMP_OFF+1 );
481                         }
482                 } else {
483                         if( timestamp != NULL ) {                                               // won't be there, but ensure it's not garbage
484                                 *timestamp = 0;
485                         }
486
487                         len = read( fd, wbuf, MCL_LEN_SIZE );                   // we assume we will get all 8 as there isn't a way to sync the old stream
488                         msg_len = need = atoi( wbuf );
489                 }
490
491
492                 if( need > ublen ) {
493                         need = ublen;                                           // cannot give them more than they can take
494                 }
495                 while( need > 0 ) {
496                         len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
497                         memcpy( ubuf+got, wbuf, len );
498                         got += len;
499                         need -= len;
500                 }
501
502                 if( msg_len > got ) {                                   // we must ditch rest of this message
503                         need = msg_len = got;
504                         while( need > 0 ) {
505                                 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
506                                 need -= len;
507                         }
508                 }
509
510                 return got;
511         }
512
513         errno = EBADFD;
514         return 0;
515 }
516
517 /*
518         Read one record from the fifo that the message type maps to.
519         Writes at max ublen bytes into the ubuf. If extended headers are in use
520         this function will ignore the timestamp.
521
522         If long_hdrs is true (!0), then we expect that the stream in the fifo
523         has extended headers (<delim><len><time>).
524 */
525 extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs ) {
526         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, NULL );
527 }
528
529 /*
530         Read a single message from the FIFO returning it in the caller's buffer. If extended
531         headers are being used, and the caller supplied a timestamp buffer, the timestamp
532         which was in the header will be returned in that buffer.  The return value is the number
533         of bytes in the buffer; 0 indicates an error and errno should be set.
534 */
535 extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
536         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
537 }
538
539
540 /*
541         Will read messages and fan them out based on the message type. This should not
542         return and if it does the caller should assume an error.
543
544         The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
545         string , followed by that number of 'raw' bytes. The raw bytes are the payload
546         exactly as received.
547
548         The report parameter is the frequency, in seconds, for writing a short
549         status report to stdout. If 0 then it's off.
550
551         If long_hdr is true, then we geneate an extended header with a delimiter and
552         timestamp.
553 */
554 extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
555         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
556         fifo_t*         fifo;                                   // fifo to chalk counts on
557         rmr_mbuf_t*     mbuf = NULL;                    // received message buffer; recycled on each call
558         char            header[128];                    // header we'll pop in front of the payload
559         int                     state;
560         int                     fd;                                             // file des to write to
561         long long       total = 0;                              // total messages received and written
562         long long       total_drops = 0;                // total messages received and written
563         long            count = 0;                              // messages received and written during last reporting period
564         long            errors = 0;                             // unsuccessful payload writes
565         long            drops;                                  // number of drops
566         time_t          next_report = 0;                // we'll report every 2 seconds if report is true
567         time_t          now;
568         int                     hwlen;                                  // write len for header
569         void*           rdc_ctx = NULL;                 // raw data capture context
570         void*           rdc_buf = NULL;                 // capture buffer
571
572         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
573                 logit(  LOG_ERR, "(mcl) invalid context given to fanout" );
574                 errno = EINVAL;
575                 return;
576         }
577
578         if( report < 0 ) {
579                 report = 0;
580         }
581
582         rdc_ctx = setup_rdc( );                         // pull rdc directories from enviornment and initialise
583
584         while( 1 ) {
585                 mbuf = mcl_get_msg( ctx, mbuf, report );                        // wait up to report sec for msg (0 == block until message)
586
587                 if( mbuf != NULL && mbuf->state == RMR_OK && mbuf->len > 0  ) {
588                         fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );              // map the message type to an open fd
589                         if( fd >= 0 ) {
590                                 if( long_hdr ) {
591                                         build_hdr( mbuf->len, header, sizeof( header ) );
592                                         hwlen = MCL_EXHDR_SIZE;
593                                 } else {
594                                         snprintf( header, sizeof( header ), "%07d", mbuf->len );                        // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
595                                         hwlen = MCL_LEN_SIZE;
596                                 }
597
598                                 if( (state = write( fd, header, hwlen )) != hwlen ) {           // write exactly MCL_LEN_SIZE bytes from the buffer
599                                         drops++;
600                                         total_drops++;
601                                         chalk_error( fifo );
602                                 } else {
603                                         if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) {                       // followed by the payload
604                                                 errors++;
605                                                 chalk_error( fifo );
606                                         } else {
607                                                 chalk_ok( fifo );
608                                                 count++;
609                                                 total++;
610                                         }
611                                 }
612                         }
613
614                         if( rdc_ctx != NULL ) {
615                                 rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf );                  // set up for write
616                                 rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len );                                                // write the raw data
617                         }
618                 }
619
620                 if( report ) {
621                         if( (now = time( NULL ) ) > next_report ) {
622                         rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report );        // run endpoints in the active table
623                                 fflush( stdout );
624
625                                 logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
626                                         total, total_drops, report, count, drops, errors );
627                                 next_report = now + report;
628                                 count = 0;
629                                 drops = 0;
630
631                                 fflush( stdout );
632                         }
633                 }
634         }
635 }
636
637