1 // vim: ts=4 sw=4 noet:
3 --------------------------------------------------------------------------------
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 --------------------------------------------------------------------------------
22 Abstract: The mc listener library content. All external functions
23 should start with mcl_ and all stderr messages should have
24 (mcl) as the first token following the severity indicator.
27 Author: E. Scott Daniels
39 #include <sys/types.h>
43 #include <rmr/rmr_symtab.h>
44 #include <rmr/RIC_message_types.h>
59 Information about one file descriptor. This is pointed to by the hash
60 such that the message type can be used as a key to look up the fifo's
65 int key; // symtab key
66 long long wcount; // number of writes
67 long long drops; // number dropped
69 long long wcount_rp; // number of writes during last reporting period
70 long long drops_rp; // number dropped during last reporting period
74 Our context. Pointers to the read and write hash tables (both keyed on the message
75 type), the message router (RMR) context, and other goodies.
78 void* mrc; // the message router's context
79 void* wr_hash; // symtable to look up pipe info based on mt for writing
80 void* rd_hash; // we support reading from pipes, but need a different FD for that
81 char* fifo_dir; // directory where we open fifos
85 // -------- private -------------------------------------------------------
89 Set up for raw data capture. We look for directory overrides from
90 environment variables, and then invoke the rdc_init() to actually
93 static void* setup_rdc() {
95 int value; // value computed for something
96 char* ep; // pointer to environment var
97 char* sdir = "/tmp/rdc/stage"; // default directory names
98 char* fdir = "/tmp/rdc/final";
99 char* suffix = ".rdc";
102 if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL && atoi( ep ) == 0 ) { // exists and is 0
103 logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting (MCL_RDCENABLE=0)" );
107 if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
110 mkdir( "/tmp/rdc", 0755 ); // we ignore failures here as it could likely exist
114 if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
117 mkdir( "/tmp/rdc", 0755 ); // we ignore failures again -- very likely it's there
121 if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
125 if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
129 ctx = rdc_init( sdir, fdir, suffix, done );
131 logit( LOG_ERR, "rdc_init did not generate a context" );
133 logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
134 logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
137 if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
139 logit( LOG_INFO, "setting frequency: %d", value );
140 rdc_set_freq( ctx, value );
146 Builds an extended header in the buffer provided, or allocates a new buffer if
147 dest is nil. The header is of the form:
148 <delim><len><timestamp>
150 Field lengths (bytes) are:
153 timestamp 16 (15 digits + 0)
156 Timestamp is a single unsigned long long in ASCII; ms since epoch.
157 If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
158 the timestamp generated will be 1570113591103.
160 The length and timestamp fields in the header are zero terminated so
161 they can be parsed as a string (atoi etc).
163 static char* build_hdr( int len, char* dest, int dest_len ) {
164 struct timespec ts; // time just before call executed
167 dest_len = MCL_EXHDR_SIZE + 2; // more than enough room
168 dest = (char *) malloc( sizeof( char ) * dest_len );
170 if( dest_len < MCL_EXHDR_SIZE ) { // shouldn't happen, but take no chances
171 memset( dest, 0, dest_len );
176 memset( dest, 0, dest_len );
178 clock_gettime( CLOCK_REALTIME, &ts );
179 snprintf( dest, dest_len, "%s%07d", MCL_DELIM, len );
180 snprintf( dest+12, dest_len-13, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
186 Build a file name and open. The io_direction is either READER or
187 WRITER. For a writer we must 'trick' the system into allowing us
188 to open a pipe for writing in non-blocking mode so that we can
189 report on drops (messages we couldn't write because there was no
190 reader). The trick is to open a reader on the pipe so that when
191 we open the writer there's a reader and the open won't fail. As
192 soon as we have the writer open, we can close the junk reader.
194 If the desired fifo does not exist, it is created.
196 static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
198 int fd; // real file des
199 int jfd = -1; // junk file des
202 if( ctx == NULL || mtype < 0 ) {
206 snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
208 state = mkfifo( wbuf, 0660 ); // make the fifo; this will fail if it exists and that's ok
209 if( state != 0 && errno != EEXIST ) {
210 logit( LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
214 if( io_dir == READER ) {
215 fd = open( wbuf, O_RDONLY ); // just open the reader
217 logit( LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
220 jfd = open( wbuf, O_RDWR | O_NONBLOCK ); // must have a reader before we can open a non-blocking writer
222 logit( LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
226 fd = open( wbuf, O_WRONLY | O_NONBLOCK ); // this will be our actual writer, in non-blocking mode
228 logit( LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
231 close( jfd ); // should be safe to close this
239 Given a message type, return the file des of the fifo that
240 the payload should be written to. Returns the file des, or -1
241 on error. When sussing out a read file descriptor this will
242 block until there is a fifo for the message type which is
245 If fref is not nil, then a pointer to the fifo info block is returned
246 allowing for direct update of counts after the write.
248 static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
259 if( io_dir == READER ) { // with an integer key, we need two hash tables
265 if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
266 fifo = (fifo_t *) malloc( sizeof( *fifo ) );
268 memset( fifo, 0, sizeof( *fifo ) );
270 fifo->fd = open_fifo( ctx, mtype, io_dir );
271 if( fifo->fd >= 0 ) { // save only on good open
272 rmr_sym_map( hash, mtype, fifo );
279 if( fifo->fd < 0 ) { // it existed, but was closed; reopen
280 fifo->fd = open_fifo( ctx, mtype, io_dir );
288 return fifo == NULL ? -1 : fifo->fd;
292 Should we need to close a FIFO we do so and leave the block in the hash
293 with a bad FD so that we'll attempt to reopen on next use.
295 static void close_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
303 if( io_dir == READER ) { // with an integer key, we need two hash tables
309 if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) != NULL ) {
310 if( fifo->fd >= 0 ) {
318 Make marking counts easier in code
320 static inline void chalk_error( fifo_t* fifo ) {
327 static inline void chalk_ok( fifo_t* fifo ) {
335 Callback function driven to traverse the symtab and generate the
336 counts for each fifo. Sonar will complain about unused parameters which
337 are normal for callbacks. Further, sonar will grumble about st, and entry
338 not being const; we can't unless RMR proto for the callback changes.
340 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
342 int report_period = 60;
345 report_period = *((int *) data);
348 if( (fifo = (fifo_t *) thing) != NULL ) {
349 logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
350 fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
352 fifo->wcount_rp = 0; // reset the report counts
354 return; // return here to avoid sonar required hack below
358 Sonar doesn't grok the fact that for callback functions some parms are naturally
359 ignored. So, to eliminate the 5 code smells because we only care about thing, we
362 if( st == NULL && entry == NULL && name == NULL && data == NULL ) {
363 fprintf( stderr, "mdcl: all parms to callback stats were nil\n" );
368 Writes the indicated bytes (n2write) from buf onto the fd. Returns only after
369 full buffer is written, or there is a hard error (not eagain or eintr).
370 Returns the number written; if less than n2write the caller may assume
371 that there was a hard error and errno should reflect.
373 static inline int write_all( int fd, char const* buf, int n2write ) {
374 ssize_t remain = 0; // number of bytes remaining to write
375 ssize_t wrote = 0; // number of bytes written thus far
386 if( (state = write( fd, buf + wrote, remain )) > 0 ) {
388 remain = n2write - wrote;
390 } while( remain > 0 && (errno == EINTR || errno == EAGAIN) ) ;
396 Similar to write_all, this will write all bytes in the buffer, but
397 will return failure if the first write attempt fails with 0 written
398 (assuming that the pipe has no reader). We use this when writing the
399 header bytes; we want to drop the message if we can't even write one
400 byte, but if we write one, we must loop until all are written.
402 Returns the number written. If that value is less than n2write, then
403 the caller may assume a hard error occurred and errno should reflect.
404 If 0 is returned it can be assumed that the FIFO would block/has no
407 static inline int write_all_nb( int fd, char const* buf, int n2write ) {
408 ssize_t remain = 0; // number of bytes remaining to write
409 ssize_t wrote = 0; // number of bytes written
418 wrote = write( fd, buf, remain );
419 if( wrote < 0 ) { // report error with exception for broken pipe
420 return errno == EPIPE ? 0 : -1; // broken pipe we assume no reader and return 0 since nothing written
423 if( wrote < n2write && wrote > 0 ) { // if we wrote anything, we must tough it out and write all if it was short
424 wrote += write_all( fd, buf + wrote, n2write - wrote );
430 // ---------- public ------------------------------------------------------
432 Sets a signal handler for sigpipe so we don't crash if a reader closes the
433 last reading fd on a pipe. We could do this automatically, but if the user
434 programme needs to trap sigpipe too, this gives them the option not to have
437 extern int mcl_set_sigh( ) {
438 signal( SIGPIPE, SIG_IGN );
442 "Opens" the interface to RMR such that messages sent to the application will
443 be available via the rmr receive functions. This is NOT automatically called
444 by the mk_context function as some applications will be using the mc library
445 for non-RMR, fifo, chores.
447 extern int mcl_start_listening( void* vctx, char* port, int wait4ready ) {
451 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
455 ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
456 if( ctx->mrc == NULL ) {
457 logit( LOG_CRIT, "start listening: unable to initialise RMr" );
461 while( wait4ready && ! rmr_ready( ctx->mrc ) ) { // only senders need to wait
462 if( announce <= 0 ) {
463 logit( LOG_INFO, "waiting for RMR to show ready" );
476 Blocks until a message arrives with a good return code or we time out. Returns the
477 rmr message buffer. Timeout value expected in seconds.
479 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
482 if( (ctx = (mcl_ctx_t *) vctx) == NULL ) {
486 if( ctx->mrc == NULL ) {
487 logit( LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
492 msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 ); // wait for next
493 } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) );
501 extern void* mcl_mk_context( const char* dir ) {
504 if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
505 memset( ctx, 0, sizeof( *ctx ) );
506 ctx->fifo_dir = strdup( dir );
507 ctx->wr_hash = rmr_sym_alloc( 1001 );
508 ctx->rd_hash = rmr_sym_alloc( 1001 );
510 if( ctx->wr_hash == NULL || ctx->rd_hash == NULL ) {
511 logit( LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
521 Read the header. Best case we read the expected number of bytes, get all
522 of them and find that they start with the delimiter. Worst case
523 we have to wait for all of the header, or need to synch at the next
524 delimiter. We assume best case most likely and handle it as such.
526 static void read_header( int fd, char* buf ) {
528 size_t need = MCL_EXHDR_SIZE; // total needed
529 size_t dneed; // bytes of the delimiter still needed
530 char* rp; // read position in buf
532 len = read( fd, buf, need );
533 if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // best case, most likely
538 if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim
540 dneed = strlen( MCL_DELIM ) - len;
543 len = read( fd, rp, dneed );
549 if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // have a good delimiter, just need to wait for bytes
550 need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
551 rp = buf + (MCL_EXHDR_SIZE - need);
554 len = read( fd, rp, need );
562 while( buf[0] != MCL_DELIM[0] ) { // wait for a recognised start byte to be read (may cause an additional message drop
563 len = read( fd, buf, 1 ); // because we ignore start byte that might be in the buffer)
570 Read one record from the fifo that the message type maps to.
571 Writes at max ublen bytes into the ubuf.
573 If long_hdrs is true (!0), then we expect that the stream in the fifo
574 has extended headers (<delim><len><time>), and will write the timestamp
575 from the header into the buffer pointed to by timestamp. The buffer is
576 assumed to be at least MCL_TSTAMP_SIZE bytes in length.
578 Further, when extended headers are being used, this function will
579 automatically resynchronise if it detects an issue.
581 The function could look for the delimiter and automatically detect whether
582 or not extended headers are in use, but if the stream is out of synch on the
583 first read, this cannot be done, so the function requires that the caller
584 know that the FIFO contains extended headers.
586 static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
590 int got = 0; // number of bytes we actually got
593 mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
595 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
600 if( (fd = suss_fifo( ctx, mtype, READER, NULL )) >= 0 ) {
602 read_header( fd, wbuf );
603 msg_len = need = atoi( wbuf + MCL_LEN_OFF ); // read the length
605 strncpy( timestamp, wbuf + MCL_TSTAMP_OFF+1, MCL_TSTAMP_SIZE );
608 if( timestamp != NULL ) { // won't be there, but ensure it's not garbage
612 read( fd, wbuf, MCL_LEN_SIZE ); // we assume we will get all 8 bytes as there isn't a way to sync the old stream
613 msg_len = need = atoi( wbuf );
618 need = ublen; // cannot give them more than they can take
621 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
622 memcpy( ubuf+got, wbuf, len );
627 if( msg_len > got ) { // we must ditch rest of this message
628 need = msg_len - got;
630 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
643 Read one record from the fifo that the message type maps to.
644 Writes at max ublen bytes into the ubuf. If extended headers are in use
645 this function will ignore the timestamp.
647 If long_hdrs is true (!0), then we expect that the stream in the fifo
648 has extended headers (<delim><len><time>).
650 extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs ) {
651 return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, NULL );
655 Read a single message from the FIFO returning it in the caller's buffer. If extended
656 headers are being used, and the caller supplied a timestamp buffer, the timestamp
657 which was in the header will be returned in that buffer. The return value is the number
658 of bytes in the buffer; 0 indicates an error and errno should be set.
660 extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
661 return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
666 Will read messages and fan them out based on the message type. This should not
667 return and if it does the caller should assume an error.
669 The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
670 string, followed by that number of 'raw' bytes. The raw bytes are the payload
673 The report parameter is the frequency, in seconds, for writing a short
674 status report to stdout. If 0 then it's off.
676 If long_hdr is true, then we generate an extended header with a delimiter and
679 The one message which is NOT pushed into a FIFO is the RIC_HEALTH_CHECK_REQ
680 message. When the health check message is received it is responded to
681 with the current state of processing (ok or err).
683 extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
684 mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
685 fifo_t* fifo; // fifo to chalk counts on
686 rmr_mbuf_t* mbuf = NULL; // received message buffer; recycled on each call
687 char header[128]; // header we'll pop in front of the payload
688 int fd; // file des to write to
689 long long total = 0; // total messages received and written
690 long long total_drops = 0; // total number of drops
691 long count = 0; // messages received and written during last reporting period
692 long errors = 0; // unsuccessful payload writes
693 long drops = 0; // number of drops during last reporting period
694 time_t next_report = 0; // time of the next status report; pushed out by report seconds after each one
696 size_t hwlen; // write len for header
697 size_t wrote; // number of bytes actually written
698 void* rdc_ctx = NULL; // raw data capture context
699 void* rdc_buf = NULL; // capture buffer
701 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
702 logit( LOG_ERR, "(mcl) invalid context given to fanout" );
711 rdc_ctx = setup_rdc( ); // pull rdc directories from environment and initialise
714 mbuf = mcl_get_msg( ctx, mbuf, report ); // wait up to report sec for msg (0 == block until message)
716 if( mbuf != NULL && mbuf->state == RMR_OK ) {
717 if( mbuf->mtype == RIC_HEALTH_CHECK_REQ ) {
718 mbuf->mtype = RIC_HEALTH_CHECK_RESP; // if we're here we are running and all is ok
720 mbuf = rmr_realloc_payload( mbuf, 128, FALSE, FALSE ); // ensure payload is large enough
721 if( mbuf->payload != NULL ) {
722 strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) );
723 rmr_rts_msg( ctx->mrc, mbuf );
728 if( mbuf->len > 0 ) {
730 build_hdr( mbuf->len, header, sizeof( header ) );
731 hwlen = MCL_EXHDR_SIZE;
733 snprintf( header, sizeof( header ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
734 hwlen = MCL_LEN_SIZE;
737 fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd
739 if( (wrote = write_all_nb( fd, header, hwlen )) == 0 ) { // write header; 0 indicates no reader, drop silently
744 if( wrote != hwlen ) {
745 logit( LOG_ERR, "(mcl): error writing header to fifo; mt=%d wrote=%d tried=%d: %s", mbuf->mtype, wrote, hwlen, strerror( errno ) );
748 close_fifo( ctx, mbuf->mtype, WRITER );
750 if( write_all( fd, mbuf->payload, mbuf->len ) != mbuf->len ) { // we wrote a header, so we must write all; no drop at this point
751 logit( LOG_ERR, "(mcl): error writing payload to fifo; mt=%d: %s\n", mbuf->mtype, strerror( errno ) );
752 close_fifo( ctx, mbuf->mtype, WRITER );
762 if( rdc_ctx != NULL ) { // always put the message to the rdc files if collecting; even if pipe write failed
763 rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf ); // set up for write
764 rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data
770 if( (now = time( NULL ) ) > next_report ) {
771 rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report ); // run endpoints in the active table
774 logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
775 total, total_drops, report, count, drops, errors );
776 next_report = now + report;
785 if( ! FOREVER ) { // allow escape during unit tests; compiled out otherwise, but sonar won't see that
787 break; // sonar grumbles if we put FOREVER into the while; maddening
794 Given a buffer and length, along with the message type, look up the fifo and write
795 the buffer. Returns 0 on error; 1 on success.
797 extern int mcl_fifo_one( void* vctx, const char* payload, int plen, int mtype ) {
798 mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
799 fifo_t* fifo; // fifo to chalk counts on
801 int fd; // file des to write to
803 if( plen <= 0 || payload == NULL ) {
807 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
808 logit( LOG_ERR, "(mcl) invalid context given to fifo_one\n" );
812 fd = suss_fifo( ctx, mtype, WRITER, &fifo ); // map the message type to an open fd
814 state = write( fd, payload, plen );
817 return state == (size_t) plen;