06ceb82e5dad2e5535efe948651f5c658b3348f9
[ric-app/mc.git] / src / sidecars / listener / mcl.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 --------------------------------------------------------------------------------
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10            http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 --------------------------------------------------------------------------------
18 */
19
20 /*
21         Mnemonic:       mcl.c.
22         Abstract:       The mc listener library content. All external functions
23                                 should start with mcl_ and all stderr messages should have
24                                 (mcl) as the first token following the severity indicator.
25                                 
26         Date:           22 August 2019
27         Author:         E. Scott Daniels
28 */
29
30 #include <unistd.h>
31 #include <errno.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <time.h>
35 #include <string.h>
36 #include <fcntl.h>
37 #include <signal.h>
38 #include <sys/stat.h>
39
40 #include <rmr/rmr.h>
41 #include <rmr/rmr_symtab.h>
42
43 #include "mcl.h"
44
45 #define READER 0
46 #define WRITER 1
47
48 #define TRUE    1
49 #define FALSE   0
50
51
52 /*
53         Information about one file descriptor. This is pointed to by the hash
54         such that the message type can be used as a key to look up the fifo's
55         file descriptor.
56 */
57 typedef struct {
58         int     fd;                                     // open fdes
59         int key;                                // symtab key
60         long long wcount;               // number of writes
61         long long drops;                // number dropped
62
63         long long wcount_rp;    // number of writes during last reporting period
64         long long drops_rp;             // number dropped during last reporting period
65 } fifo_t;
66
67 /*
68         Our conext.  Pointers to the read and write hash tables (both keyed on the message
69         type), the message router (RMR) context, and other goodies.
70 */
71 typedef struct {
72         void*   mrc;                            // the message router's context
73         void*   wr_hash;                        // symtable to look up pipe info based on mt for writing
74         void*   rd_hash;                        // we support reading from pipes, but need a different FD for that
75         char*   fifo_dir;                       // directory where we open fifos
76         
77 } mcl_ctx_t;
78
79 // -------- private -------------------------------------------------------
80
81
82 /*
83         Builds an extended header in the buffer provided, or allocates a new buffer if
84         dest is nil. The header is of the form:
85                 <delim><len><timestamp>
86
87         Timestamp is a single unsigned long long in ASCII; ms since epoch.
88         If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
89         the timestamp generated will be 1570113591103.
90 */
91 static char* build_hdr( int len, char* dest, int dest_len ) {
92         struct timespec ts;         // time just before call executed
93
94         if( dest == NULL ) {
95                 dest_len = 48;
96                 dest = (char *) malloc( sizeof( char ) * dest_len );
97         } else {
98                 if( dest_len < 28 ) {           // shouldn't happen, but take no chances
99                         memset( dest, 0, dest_len );
100                         return NULL;
101                 }
102         }
103
104         memset( dest, 0, dest_len );
105
106         clock_gettime( CLOCK_REALTIME, &ts );
107         sprintf( dest, "%s%07d", MCL_DELIM, len );
108         sprintf( dest+12, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
109
110         return dest;
111 }
112
113 /*      
114         Build a file name and open. The io_direction is either READER or
115         WRITER.  For a writer we must 'trick' the system into allowing us
116         to open a pipe for writing in non-blocking mode so that we can 
117         report on drops (messages we couldn't write because there was no
118         reader).  The trick is to open a reader on the pipe so that when
119         we open the writer there's a reader and the open won't fail. As
120         soon as we have the writer open, we can close the junk reader.
121
122         If the desired fifo does not exist, it is created.
123 */
124 static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
125         char    wbuf[1024];
126         int             fd;                             // real file des
127         int             jfd = -1;                       // junk file des
128         int             state;
129
130         if( ctx == NULL || mtype < 0 ) {
131                 return -1;
132         }
133
134         snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
135         
136         state = mkfifo( wbuf, 0660 );           // make the fifo; this will fail if it exists and that's ok
137         if( state != 0 && errno != EEXIST ) {
138                 fprintf( stderr, "[ERR] (mcl) unable to create fifo: %s: %s\n", wbuf, strerror( errno ) );
139                 return -1;
140         }
141
142         if( io_dir == READER ) {
143                 fd = open( wbuf, O_RDONLY  );                   // just open the reader
144                 if( fd < 0 ) {
145                         fprintf( stderr, "[ERR] (mcl) fifo open failed (ro): %s: %s\n", wbuf, strerror( errno ) );
146                 }
147         } else {
148                 jfd = open( wbuf, O_RDWR  | O_NONBLOCK );                       // must have a reader before we can open a non-blocking writer
149                 if( jfd < 0 ) {
150                         fprintf( stderr, "[ERR] (mcl) fifo open failed (rw): %s: %s\n", wbuf, strerror( errno ) );
151                 }
152         
153                 fd = open( wbuf, O_WRONLY  | O_NONBLOCK );                      // this will be our actual writer, in non-blocking mode
154                 if( fd < 0 ) {
155                         fprintf( stderr, "[ERR] (mcl) fifo open failed (wo): %s: %s\n", wbuf, strerror( errno ) );
156                 }
157
158                 close( jfd );                   // should be safe to close this
159         }
160
161
162         return fd;
163 }
164
165 /*
166         Given a message type, return the file des of the fifo that
167         the payload should be written to.        Returns the file des, or -1
168         on error. When sussing out a read file descriptor this will
169         block until there is a fifo for the message type which is
170         open for reading.
171
172         If fref is not nil, then a pointer to the fifo info block is returned
173         allowing for direct update of counts after the write.
174 */
175 static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
176         fifo_t* fifo;
177         void*   hash;
178
179         if( io_dir == READER ) {                // with an integer key, we nned two hash tables
180                 hash = ctx->rd_hash;
181         } else {
182                 hash = ctx->wr_hash;
183         }
184
185         if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
186                 fifo = (fifo_t *) malloc( sizeof( *fifo ) );
187                 if( fifo == NULL ) {
188                         return -1;
189                 }
190
191                 memset( fifo, 0, sizeof( *fifo ) );
192                 fifo->key = mtype;
193                 fifo->fd = open_fifo( ctx, mtype, io_dir );
194                 rmr_sym_map( hash, mtype, fifo );
195         }
196
197         if( fref != NULL ) {
198                 *fref = fifo;
199         }
200         return fifo->fd;
201 }
202
203 /*
204         Make marking counts easier in code
205 */
206 static inline void chalk_error( fifo_t* fifo ) {
207         if( fifo != NULL ) {
208                 fifo->drops++;
209                 fifo->drops_rp++;
210         }
211 }
212
213 static inline void chalk_ok( fifo_t* fifo ) {
214         if( fifo != NULL ) {
215                 fifo->wcount++;
216                 fifo->wcount_rp++;
217         }
218 }
219
220 /*
221         Callback function driven to traverse the symtab and generate the 
222         counts for each fifo.
223 */
224 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
225         fifo_t* fifo;
226         int             report_period = 60;
227
228         if( data ) {
229                 report_period = *((int *) data);
230         }
231
232         if( (fifo = (fifo_t *) thing) != NULL ) {
233                 fprintf( stdout, "[STAT] (mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld\n", 
234                         fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
235
236                 fifo->wcount_rp = 0;            // reset the report counts
237                 fifo->drops_rp = 0;
238         }
239 }
240
241 // ---------- public ------------------------------------------------------
242 /*
243         Sets a signal handler for sigpipe so we don't crash if a reader closes the
244         last reading fd on a pipe. We could do this automatically, but if the user
245         programme needs to trap sigpipe too, this gives them the option not to have
246         us interfere.
247 */
248 extern int mcl_set_sigh( ) {
249         signal( SIGPIPE, SIG_IGN );
250 }
251
252 /*
253         "Opens" the interface to RMR such that messages sent to the application will
254         be available via the rmr receive funcitons. This is NOT automatically called
255         by the mk_context function as some applications will be using the mc library
256         for non-RMR, fifo, chores.
257 */
258 extern int mcl_start_listening( void* vctx,  char* port, int wait4ready ) {
259         mcl_ctx_t*      ctx;
260         int             announce = 0;
261
262         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
263                 return 0;
264         }
265
266         ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE );     // start your engines!
267         if( ctx->mrc == NULL ) {
268                 fprintf( stderr, "[CRIT]  unable to initialise RMr\n" );
269                 return 0;
270         }
271
272         while( wait4ready && ! rmr_ready( ctx->mrc ) ) {                                // only senders need to wait
273                 if( announce <= 0 ) {
274                         fprintf( stderr, "[INFO] waiting for RMR to show ready\n" );
275                         announce = 10;
276                 } else {
277                         announce--;
278                 }
279
280                 sleep( 1 );
281         }
282
283         return 1;
284 }
285
286 /*
287         Blocks until a message arives with a good return code or we timeout. Returns the 
288         rmr message buffer. Timeout value epxected in seconds.
289 */
290 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
291         mcl_ctx_t*      ctx;
292
293         if( (ctx = (mcl_ctx_t *) vctx) == NULL ) {
294                 return NULL;
295         }
296
297         if( ctx->mrc == NULL ) {
298                 fprintf( stderr, "bad context\n" );
299                 exit( 1 );
300         }
301
302         do {
303                 msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 );                           // wait for next
304         } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) );
305
306         return msg;
307 }
308
309 /*
310         Create the context.
311 */
312 extern  void* mcl_mk_context( char* dir ) {
313         mcl_ctx_t*      ctx;
314
315         if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
316                 memset( ctx, 0, sizeof( *ctx ) );
317                 ctx->fifo_dir = strdup( dir );
318                 ctx->wr_hash = rmr_sym_alloc( 1001 );
319                 ctx->rd_hash = rmr_sym_alloc( 1001 );
320
321                 if( ctx->wr_hash == NULL  || ctx->rd_hash == NULL ) {
322                         fprintf( stderr, "[ERR] (mcl) unable to allocate hash table for fifo keys\n" );
323                         free( ctx );
324                         return NULL;
325                 }
326         }
327
328         return (void *) ctx;
329 }
330
331 /*
332         Read the header. Best case we read the expected number of bytes, get all
333         of them and find that they start with the delemiter.  Worst case
334         We have to wait for all of the header, or need to synch at the next
335         delimeter. We assume best case most likely and handle it as such.
336 */
337 static void read_header( int fd, char* buf ) {
338         int len;
339         int need = MCL_EXHDR_SIZE;              // total needed
340         int dneed;                                              // delimieter needed
341         int     rlen;
342         char*   rp;                             // read position in buf
343
344         len = read( fd, buf, need );
345         if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {       // best case, most likely
346                 return;
347         }
348         
349         while( TRUE ) {
350                 if( len < strlen( MCL_DELIM ) ) {               // must get at least enough bytes to check delim
351                         rp = buf + len;
352                         dneed = strlen( MCL_DELIM ) - len;
353         
354                         while( dneed > 0 ) {
355                                 len = read( fd, rp, dneed );
356                                 dneed -= len;
357                                 rp += len;
358                         }
359                 }
360
361                 if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {      // have a good delimiter, just need to wait for bytes
362                         need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
363                         rp = buf + (MCL_EXHDR_SIZE - need);
364         
365                         while( need > 0 ) {
366                                 len = read( fd, rp, need );
367                                 need -= len;
368                                 rp += len;
369                         }
370
371                         return;
372                 }
373
374                 while( buf[0] != MCL_DELIM[0] ) {       // wait for a recognised start byte to be read (may cause an additional message drop
375                         len = read( fd, buf, 1 );               // because we ignore start byte that might be in the buffer)
376                 }
377
378                 need = MCL_EXHDR_SIZE - len;
379         }
380 }
381
382
383 /*
384         Read one record from the fifo that the message type maps to.
385         Writes at max ublen bytes into the ubuf.
386
387         If long_hdrs is true (!0), then we expect that the stream in the fifo
388         has extended headers (<delim><len><time>), and will write the timestamp
389         from the header into the buffer pointed to by timestamp. The buffer is
390         assumed to be at least MCL_TSTAMP_SIZE bytes in length.
391
392         Further, when extended headers are being used, this function will
393         automatically resynchronise if it detects an issue.
394
395         The function could look for the delimiter and automatically detect whether
396         or not extended headers are in use, but if the stream is out of synch on the
397         first read, this cannot be done, so the funciton requires that the caller
398         know that the FIFO contains extended headers.  
399 */
400 static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
401         int fd;
402         int len;
403         int     msg_len;
404         int     got = 0;                                                // number of bytes we actually got
405         int need;
406         char wbuf[4096];
407         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
408         fifo_t* fref = NULL;                            // the fifo struct we found
409
410         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
411                 errno = EINVAL;
412                 return 0;
413         }
414
415         if( (fd = suss_fifo( ctx, mtype, READER, NULL ))  >= 0 )  {
416                 if( long_hdrs ) {
417                         read_header( fd, wbuf );
418                         msg_len = need = atoi( wbuf + MCL_LEN_OFF );                            // read the length
419                         if( timestamp ) {
420                                 strcpy( timestamp, wbuf + MCL_TSTAMP_OFF+1 );
421                         }
422                 } else {
423                         if( timestamp != NULL ) {                                               // won't be there, but ensure it's not garbage
424                                 *timestamp = 0;
425                         }
426
427                         len = read( fd, wbuf, MCL_LEN_SIZE );                   // we assume we will get all 8 as there isn't a way to sync the old stream
428                         msg_len = need = atoi( wbuf );
429                 }
430
431                 
432                 if( need > ublen ) {
433                         need = ublen;                                           // cannot give them more than they can take
434                 }
435                 while( need > 0 ) {
436                         len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );          
437                         memcpy( ubuf+got, wbuf, len );
438                         got += len;
439                         need -= len;
440                 }
441
442                 if( msg_len > got ) {                                   // we must ditch rest of this message
443                         need = msg_len = got;
444                         while( need > 0 ) {
445                                 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );          
446                                 need -= len;
447                         }
448                 }
449
450                 return got;
451         }
452
453         errno = EBADFD;
454         return 0;
455 }
456
457 /*
458         Read one record from the fifo that the message type maps to.
459         Writes at max ublen bytes into the ubuf. If extended headers are in use
460         this function will ignore the timestamp.
461
462         If long_hdrs is true (!0), then we expect that the stream in the fifo
463         has extended headers (<delim><len><time>).
464 */
465 extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs ) {
466         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, NULL );
467 }
468
469 /*
470         Read a single message from the FIFO returning it in the caller's buffer. If extended
471         headers are being used, and the caller supplied a timestamp buffer, the timestamp
472         which was in the header will be returned in that buffer.  The return value is the number
473         of bytes in the buffer; 0 indicates an error and errno should be set.   
474 */
475 extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
476         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
477 }
478
479
480 /*
481         Will read messages and fan them out based on the message type. This should not
482         return and if it does the caller should assume an error. 
483
484         The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
485         string , followed by that number of 'raw' bytes. The raw bytes are the payload 
486         exactly as received.
487
488         The report parameter is the frequency, in seconds, for writing a short
489         status report to stdout. If 0 then it's off.
490
491         If long_hdr is true, then we geneate an extended header with a delimiter and
492         timestamp.
493 */
494 extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
495         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
496         fifo_t*         fifo;                                   // fifo to chalk counts on
497         rmr_mbuf_t*     mbuf = NULL;                    // received message buffer; recycled on each call
498         char            wbuf[128];                              // buffer to build len string in
499         int                     state;
500         int                     fd;                                             // file des to write to
501         long long       total = 0;                              // total messages received and written
502         long long       total_drops = 0;                // total messages received and written
503         long            count = 0;                              // messages received and written during last reporting period
504         long            errors = 0;                             // unsuccessful payload writes
505         long            drops;                                  // number of drops
506         time_t          next_report = 0;                // we'll report every 2 seconds if report is true
507         time_t          now;
508         int                     hwlen;                                  // write len for header
509
510         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
511                 fprintf( stderr, "[ERR] (mcl) invalid context given to fanout\n" );
512                 errno = EINVAL;
513                 return;
514         }
515
516         if( report < 0 ) {
517                 report = 0;
518         }
519
520         while( 1 ) {
521                 mbuf = mcl_get_msg( ctx, mbuf, report );                        // wait up to report sec for msg (0 == block until message)
522
523                 if( mbuf != NULL && mbuf->state == RMR_OK && mbuf->len > 0  ) {
524                         fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );              // map the message type to an open fd
525                         if( fd >= 0 ) {
526                                 if( long_hdr ) {
527                                         build_hdr( mbuf->len, wbuf, sizeof( wbuf ) );
528                                         hwlen = MCL_EXHDR_SIZE;
529                                 } else {
530                                         snprintf( wbuf, sizeof( wbuf ), "%07d", mbuf->len );                    // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
531                                         hwlen = MCL_LEN_SIZE;
532                                 }
533
534                                 if( (state = write( fd, wbuf, hwlen )) != hwlen ) {             // write exactly MCL_LEN_SIZE bytes from the buffer
535                                         drops++;
536                                         total_drops++;
537                                         chalk_error( fifo );
538                                 } else {
539                                         if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) {                       // followed by the payload      
540                                                 errors++;
541                                                 chalk_error( fifo );
542                                         } else {
543                                                 chalk_ok( fifo );
544                                                 count++;
545                                                 total++;
546                                         }
547                                 }
548                         }
549                 }
550
551                 if( report ) {
552                         if( (now = time( NULL ) ) > next_report ) {
553                         rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report );        // run endpoints in the active table
554                                 fflush( stdout );
555
556                                 fprintf( stdout, "[STAT] (mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors\n", 
557                                         total, total_drops, report, count, drops, errors );
558                                 next_report = now + report;
559                                 count = 0;
560                                 drops = 0;
561
562                                 fflush( stdout );
563                         }
564                 }
565         }
566 }
567
568