1 // vim: ts=4 sw=4 noet:
3 --------------------------------------------------------------------------------
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 --------------------------------------------------------------------------------
22 Abstract: The mc listener library content. All external functions
23 should start with mcl_ and all stderr messages should have
24 (mcl) as the first token following the severity indicator.
27 Author: E. Scott Daniels
39 #include <sys/types.h>
43 #include <rmr/rmr_symtab.h>
44 #include <rmr/RIC_message_types.h>
59 Information about one file descriptor. This is pointed to by the hash
60 such that the message type can be used as a key to look up the fifo's
65 int key; // symtab key
66 long long wcount; // number of writes
67 long long drops; // number dropped
69 long long wcount_rp; // number of writes during last reporting period
70 long long drops_rp; // number dropped during last reporting period
74 Our context. Pointers to the read and write hash tables (both keyed on the message
75 type), the message router (RMR) context, and other goodies.
78 void* mrc; // the message router's context
79 void* wr_hash; // symtable to look up pipe info based on mt for writing
80 void* rd_hash; // we support reading from pipes, but need a different FD for that
81 char* fifo_dir; // directory where we open fifos
85 // -------- private -------------------------------------------------------
89 Set up for raw data capture. We look for directory overrides from
90 environment variables, and then invoke rdc_init() to actually
// Configure raw data capture (rdc) from the MCL_RDC_* environment variables
// (ENABLE, STAGE, FINAL, SUFFIX, DONE, FREQ) and pass the resulting settings
// to rdc_init(). Built-in defaults: staging directory /tmp/rdc/stage, final
// directory /tmp/rdc/final, file suffix .rdc.
93 static void* setup_rdc() {
95 int value; // value computed for something
96 char* ep; // pointer to environment var
97 char* sdir = "/tmp/rdc/stage"; // default directory names
98 char* fdir = "/tmp/rdc/final";
99 char* suffix = ".rdc";
102 if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) {
// NOTE(review): the ep != NULL test below repeats the enclosing condition and is redundant.
// atoi() also yields 0 for non-numeric text, so any such value disables capture as well.
103 if( ep != NULL && atoi( ep ) == 0 ) {
104 logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep );
// staging directory override
109 if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
112 mkdir( "/tmp/rdc", 0755 ); // we ignore failures here as it could likely exist
// final directory override
116 if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
119 mkdir( "/tmp/rdc", 0755 ); // we ignore failures again -- very likely it's there
// file suffix override
123 if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
127 if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
// create the capture context from the (possibly overridden) settings
131 ctx = rdc_init( sdir, fdir, suffix, done );
// NOTE(review): the rdc log messages from here down omit the (mcl) token that the
// file abstract asks for on every message -- confirm whether that is intended.
133 logit( LOG_ERR, "rdc_init did not generate a context" );
135 logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
136 logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
// optional capture frequency; the value is logged and then handed to rdc_set_freq()
139 if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
141 logit( LOG_INFO, "setting frequency: %d", value );
142 rdc_set_freq( ctx, value );
148 Builds an extended header in the buffer provided, or allocates a new buffer if
149 dest is nil. The header is of the form:
150 <delim><len><timestamp>
152 Field lengths (bytes) are:
155 timestamp 16 (15 digits + 0)
158 Timestamp is a single unsigned long long in ASCII; ms since epoch.
159 If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
160 the timestamp generated will be 1570113591103.
162 The length and timestamp fields in the header are zero terminated so
163 they can be parsed as a string (atoi etc).
// Build the extended header <delim><len><timestamp> in dest.
// len: payload length recorded in the header.
// dest, dest_len: caller buffer and its size; per the description above a
// buffer of MCL_EXHDR_SIZE + 2 bytes is allocated when dest is nil.
165 static char* build_hdr( int len, char* dest, int dest_len ) {
166 struct timespec ts; // time just before call executed
169 dest_len = MCL_EXHDR_SIZE + 2; // more than enough room
// NOTE(review): confirm the malloc() result is checked before the memset() calls below.
170 dest = (char *) malloc( sizeof( char ) * dest_len );
172 if( dest_len < MCL_EXHDR_SIZE ) { // shouldn't happen, but take no chances
173 memset( dest, 0, dest_len );
// zero the whole buffer so both header fields end up zero terminated
178 memset( dest, 0, dest_len );
180 clock_gettime( CLOCK_REALTIME, &ts );
// delimiter followed by the payload length as 7 zero padded digits
181 snprintf( dest, dest_len, "%s%07d", MCL_DELIM, len );
// seconds immediately followed by 3 digits of milliseconds, i.e. ms since epoch
// NOTE(review): offsets 12 and 13 are hard coded; presumably they mirror MCL_TSTAMP_OFF -- confirm.
// NOTE(review): %ld for ts.tv_sec assumes time_t is long on the target platform.
182 snprintf( dest+12, dest_len-13, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
188 Build a file name and open. The io_direction is either READER or
189 WRITER. For a writer we must 'trick' the system into allowing us
190 to open a pipe for writing in non-blocking mode so that we can
191 report on drops (messages we couldn't write because there was no
192 reader). The trick is to open a reader on the pipe so that when
193 we open the writer there's a reader and the open won't fail. As
194 soon as we have the writer open, we can close the junk reader.
196 If the desired fifo does not exist, it is created.
// Open the fifo for a message type, creating it first when it does not exist.
// ctx: supplies fifo_dir, the directory holding the fifos.
// mtype: message type; used to build the name MT_<mtype as 9 zero padded digits>.
// io_dir: READER selects a plain O_RDONLY open; the writer path opens the fifo
// O_RDWR first so that the non-blocking O_WRONLY open has a reader present.
198 static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
200 int fd; // real file des
201 int jfd = -1; // junk file des
// reject a missing context or a negative message type
204 if( ctx == NULL || mtype < 0 ) {
// NOTE(review): snprintf truncation is not checked on this line; an over-long fifo_dir
// would yield a different path than intended -- confirm wbuf is large enough.
208 snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
210 state = mkfifo( wbuf, 0660 ); // make the fifo; this will fail if it exists and that's ok
// any mkfifo failure other than EEXIST is reported
211 if( state != 0 && errno != EEXIST ) {
212 logit( LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
216 if( io_dir == READER ) {
217 fd = open( wbuf, O_RDONLY ); // just open the reader
219 logit( LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
222 jfd = open( wbuf, O_RDWR | O_NONBLOCK ); // must have a reader before we can open a non-blocking writer
224 logit( LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
228 fd = open( wbuf, O_WRONLY | O_NONBLOCK ); // this will be our actual writer, in non-blocking mode
// NOTE(review): confirm jfd is also closed on the path where this write-only open fails.
230 logit( LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
233 close( jfd ); // should be safe to close this
241 Given a message type, return the file des of the fifo that
242 the payload should be written to. Returns the file des, or -1
243 on error. When sussing out a read file descriptor this will
244 block until there is a fifo for the message type which is
247 If fref is not nil, then a pointer to the fifo info block is returned
248 allowing for direct update of counts after the write.
// Map a message type to an open fifo file descriptor, opening the fifo and
// caching its fifo_t in the matching hash table on first use.
// ctx: mcl context; mtype: message type used as the symtab key;
// io_dir: READER or writer direction, passed on to open_fifo();
// fref: see the description above (receives the fifo info block when not nil).
// Returns -1 when no fifo block exists, otherwise the fd stored in the block
// (which is whatever open_fifo() returned, so it can be negative after a failed open).
250 static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
261 if( io_dir == READER ) { // with an integer key, we need two hash tables
// cache miss: build a zeroed fifo block and try to open the fifo
267 if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
// NOTE(review): confirm the malloc() result is checked before the memset() below.
268 fifo = (fifo_t *) malloc( sizeof( *fifo ) );
270 memset( fifo, 0, sizeof( *fifo ) );
272 fifo->fd = open_fifo( ctx, mtype, io_dir );
273 if( fifo->fd >= 0 ) { // save only on good open
274 rmr_sym_map( hash, mtype, fifo );
// NOTE(review): a block whose open failed is not mapped above; confirm it is freed so that
// repeated failures for the same mtype do not leak one fifo_t per call.
286 return fifo == NULL ? -1 : fifo->fd;
290 Make marking counts easier in code
292 static inline void chalk_error( fifo_t* fifo ) {
299 static inline void chalk_ok( fifo_t* fifo ) {
307 Callback function driven to traverse the symtab and generate the
308 counts for each fifo.
// Symtab traversal callback: logs the lifetime and last-period write/drop
// counts for one fifo and then resets the reporting-period counts.
// thing: the table entry, treated as a fifo_t pointer (nil entries are skipped).
// data: read as a pointer to an int holding the reporting period in seconds;
// the period defaults to 60.
// st, entry and name are not referenced by the statements below.
310 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
312 int report_period = 60;
315 report_period = *((int *) data);
318 if( (fifo = (fifo_t *) thing) != NULL ) {
319 logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
320 fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
322 fifo->wcount_rp = 0; // reset the report counts
327 // ---------- public ------------------------------------------------------
329 Sets a signal handler for sigpipe so we don't crash if a reader closes the
330 last reading fd on a pipe. We could do this automatically, but if the user
331 programme needs to trap sigpipe too, this gives them the option not to have
// Set the SIGPIPE disposition to SIG_IGN so that the signal is ignored.
// NOTE(review): the function is declared to return int; confirm a return statement
// follows the signal() call, otherwise callers that use the result read an undefined value.
334 extern int mcl_set_sigh( ) {
335 signal( SIGPIPE, SIG_IGN );
339 "Opens" the interface to RMR such that messages sent to the application will
340 be available via the rmr receive functions. This is NOT automatically called
341 by the mk_context function as some applications will be using the mc library
342 for non-RMR, fifo, chores.
// Initialise RMR and keep its context in ctx->mrc.
// vctx: the mcl context (cast to mcl_ctx_t*); a nil value is rejected.
// port: passed unchanged to rmr_init().
// wait4ready: when non-zero, loop until rmr_ready() reports true.
// NOTE(review): the two log messages below omit the (mcl) token that the file
// abstract asks for on every message -- confirm whether that is intended.
344 extern int mcl_start_listening( void* vctx, char* port, int wait4ready ) {
348 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
352 ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
353 if( ctx->mrc == NULL ) {
354 logit( LOG_CRIT, "start listening: unable to initialise RMr" );
358 while( wait4ready && ! rmr_ready( ctx->mrc ) ) { // only senders need to wait
// the waiting message is written only when the announce counter has run down
359 if( announce <= 0 ) {
360 logit( LOG_INFO, "waiting for RMR to show ready" );
373 Blocks until a message arrives with a good return code or we time out. Returns the
374 rmr message buffer. Timeout value expected in seconds.
// Receive the next RMR message, retrying until the receive yields a buffer
// whose state is either RMR_OK or RMR_ERR_TIMEOUT.
// vctx: the mcl context (cast to mcl_ctx_t*); msg: buffer handed to
// rmr_torcv_msg() for reuse; timeout: seconds, converted to milliseconds.
376 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
379 if( (ctx = (mcl_ctx_t *) vctx) == NULL ) {
// listening must have been started; without an RMR context nothing can be received
383 if( ctx->mrc == NULL ) {
384 logit( LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
// NOTE(review): timeout * 1000 is int arithmetic and overflows for very large timeouts.
389 msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 ); // wait for next
// any other state (or a nil buffer) causes another receive attempt
390 } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) );
// Allocate and zero an mcl context, record a private copy of the fifo
// directory name, and create the write and read symbol tables (1001 given
// as the size argument to rmr_sym_alloc() for each).
// dir: directory in which fifos are opened.
398 extern void* mcl_mk_context( char* dir ) {
401 if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
402 memset( ctx, 0, sizeof( *ctx ) );
// NOTE(review): strdup() is called without a nil check on dir and its result is not
// tested on this line -- confirm callers never pass nil and that failure is handled.
403 ctx->fifo_dir = strdup( dir );
404 ctx->wr_hash = rmr_sym_alloc( 1001 );
405 ctx->rd_hash = rmr_sym_alloc( 1001 );
// both tables are required; report when either allocation failed
407 if( ctx->wr_hash == NULL || ctx->rd_hash == NULL ) {
408 logit( LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
418 Read the header. Best case we read the expected number of bytes, get all
419 of them and find that they start with the delimiter. Worst case
420 we have to wait for all of the header, or need to synch at the next
421 delimiter. We assume best case most likely and handle it as such.
// Read one extended header (MCL_EXHDR_SIZE bytes starting with MCL_DELIM)
// from fd into buf, waiting for missing bytes or resynchronising on the
// delimiter start byte when the first read does not deliver a clean header.
// fd: open fifo descriptor; buf: destination, at least MCL_EXHDR_SIZE bytes.
423 static void read_header( int fd, char* buf ) {
425 int need = MCL_EXHDR_SIZE; // total needed
426 int dneed; // delimiter bytes still needed
428 char* rp; // read position in buf
430 len = read( fd, buf, need );
431 if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // best case, most likely
// NOTE(review): strlen() is unsigned; if len is a signed int holding -1 from a failed read()
// the comparison below is done unsigned and is false -- confirm read errors are handled.
436 if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim
438 dneed = strlen( MCL_DELIM ) - len;
441 len = read( fd, rp, dneed );
447 if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // have a good delimiter, just need to wait for bytes
// the two lines below place rp just past the delimiter and ask for the rest of the header
// NOTE(review): this does not account for header bytes beyond the delimiter that the first
// read() may already have delivered -- confirm the resume position.
448 need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
449 rp = buf + (MCL_EXHDR_SIZE - need);
452 len = read( fd, rp, need );
// NOTE(review): the loop test below looks only at buf[0]; confirm a read() result of 0 or -1
// cannot leave it spinning.
460 while( buf[0] != MCL_DELIM[0] ) { // wait for a recognised start byte to be read (may cause an additional message drop
461 len = read( fd, buf, 1 ); // because we ignore start byte that might be in the buffer)
464 need = MCL_EXHDR_SIZE - len;
470 Read one record from the fifo that the message type maps to.
471 Writes at max ublen bytes into the ubuf.
473 If long_hdrs is true (!0), then we expect that the stream in the fifo
474 has extended headers (<delim><len><time>), and will write the timestamp
475 from the header into the buffer pointed to by timestamp. The buffer is
476 assumed to be at least MCL_TSTAMP_SIZE bytes in length.
478 Further, when extended headers are being used, this function will
479 automatically resynchronise if it detects an issue.
481 The function could look for the delimiter and automatically detect whether
482 or not extended headers are in use, but if the stream is out of synch on the
483 first read, this cannot be done, so the function requires that the caller
484 know that the FIFO contains extended headers.
// Read one record for mtype from its fifo into ubuf (at most ublen bytes).
// vctx: the mcl context (cast to mcl_ctx_t*); a nil value is rejected.
// long_hdrs: non-zero when the stream carries extended headers; the length is
// then taken from the header at MCL_LEN_OFF, otherwise from a bare
// MCL_LEN_SIZE byte length field.
// timestamp: optional buffer for the header timestamp (see description above).
486 static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
490 int got = 0; // number of bytes we actually got
493 mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
494 fifo_t* fref = NULL; // the fifo struct we found
496 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
// nil is passed for fref, so the local fref is not filled in by this call
501 if( (fd = suss_fifo( ctx, mtype, READER, NULL )) >= 0 ) {
503 read_header( fd, wbuf );
// NOTE(review): atoi() cannot report a malformed or negative length; confirm msg_len is range checked.
504 msg_len = need = atoi( wbuf + MCL_LEN_OFF ); // read the length
// NOTE(review): strncpy() does not terminate the destination when the source fills all
// MCL_TSTAMP_SIZE bytes, and the +1 on the offset should be confirmed against build_hdr().
506 strncpy( timestamp, wbuf + MCL_TSTAMP_OFF+1, MCL_TSTAMP_SIZE );
509 if( timestamp != NULL ) { // won't be there, but ensure it's not garbage
513 len = read( fd, wbuf, MCL_LEN_SIZE ); // we assume we will get all 8 as there isn't a way to sync the old stream
514 msg_len = need = atoi( wbuf );
519 need = ublen; // cannot give them more than they can take
// read in chunks of at most sizeof( wbuf ) and append each chunk to the caller buffer
// NOTE(review): need is compared with an unsigned sizeof value, and len is handed to memcpy()
// with no check visible on these two lines; a read() result of -1 would become a huge
// copy size -- confirm.
522 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
523 memcpy( ubuf+got, wbuf, len );
528 if( msg_len > got ) { // we must ditch rest of this message
// NOTE(review): the chained assignment below stores got in both msg_len and need; to skip the
// unread remainder the intended expression is presumably msg_len - got -- confirm and fix.
529 need = msg_len = got;
531 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
544 Read one record from the fifo that the message type maps to.
545 Writes at max ublen bytes into the ubuf. If extended headers are in use
546 this function will ignore the timestamp.
548 If long_hdrs is true (!0), then we expect that the stream in the fifo
549 has extended headers (<delim><len><time>).
// Public read without timestamp capture: forwards every argument to
// fifo_read1() and passes a nil timestamp buffer. Returns what fifo_read1() returns.
551 extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs ) {
552 return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, NULL );
556 Read a single message from the FIFO returning it in the caller's buffer. If extended
557 headers are being used, and the caller supplied a timestamp buffer, the timestamp
558 which was in the header will be returned in that buffer. The return value is the number
559 of bytes in the buffer; 0 indicates an error and errno should be set.
// Public read with timestamp capture: forwards every argument, including the
// caller timestamp buffer, to fifo_read1(). Returns what fifo_read1() returns.
561 extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
562 return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
567 Will read messages and fan them out based on the message type. This should not
568 return and if it does the caller should assume an error.
570 The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
571 string, followed by that number of 'raw' bytes. The raw bytes are the payload
574 The report parameter is the frequency, in seconds, for writing a short
575 status report to stdout. If 0 then it's off.
577 If long_hdr is true, then we generate an extended header with a delimiter and
580 The one message which is NOT pushed into a FIFO is the RIC_HEALTH_CHECK_REQ
581 message. When the health check message is received it is responded to
582 with the current state of processing (ok or err).
// Main receive loop: takes RMR messages and writes each payload, preceded by
// a header, to the fifo mapped to its message type; health check requests
// are answered directly instead of being written to a fifo.
// vctx: the mcl context (cast to mcl_ctx_t*); a nil value is rejected.
// report: statistics period in seconds; also used as the receive timeout.
// long_hdr: selects the extended header built by build_hdr() rather than the
// bare 7 digit length field.
584 extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
585 mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
586 fifo_t* fifo; // fifo to chalk counts on
587 rmr_mbuf_t* mbuf = NULL; // received message buffer; recycled on each call
588 char header[128]; // header we'll pop in front of the payload
590 int fd; // file des to write to
591 long long total = 0; // total messages received and written
592 long long total_drops = 0; // total messages dropped
593 long count = 0; // messages received and written during last reporting period
594 long errors = 0; // unsuccessful payload writes
595 long drops; // number of drops; NOTE(review): no initialiser here -- confirm it is set before the report below reads it
596 time_t next_report = 0; // time at which the next stats report is due (advanced by report seconds after each report)
598 int hwlen; // write len for header
599 void* rdc_ctx = NULL; // raw data capture context
600 void* rdc_buf = NULL; // capture buffer
602 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
603 logit( LOG_ERR, "(mcl) invalid context given to fanout" );
612 rdc_ctx = setup_rdc( ); // pull rdc directories from environment and initialise
615 mbuf = mcl_get_msg( ctx, mbuf, report ); // wait up to report sec for msg (0 == block until message)
// only buffers received with a good state are processed
617 if( mbuf != NULL && mbuf->state == RMR_OK ) {
618 if( mbuf->mtype == RIC_HEALTH_CHECK_REQ ) {
619 mbuf->mtype = RIC_HEALTH_CHECK_RESP; // if we're here we are running and all is ok
// NOTE(review): confirm mbuf is checked for nil after the realloc before the payload is written.
621 mbuf = rmr_realloc_payload( mbuf, 128, FALSE, FALSE ); // ensure payload is large enough
622 strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) );
// the response goes back to the sender of the request
623 rmr_rts_msg( ctx->mrc, mbuf );
// empty payloads are not written to a fifo
627 if( mbuf->len > 0 ) {
// NOTE(review): confirm fd >= 0 is verified before the write() calls below.
628 fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd
// extended header: delimiter, length and timestamp
631 build_hdr( mbuf->len, header, sizeof( header ) );
632 hwlen = MCL_EXHDR_SIZE;
634 snprintf( header, sizeof( header ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
635 hwlen = MCL_LEN_SIZE;
638 if( (state = write( fd, header, hwlen )) != hwlen ) { // write exactly hwlen header bytes (MCL_EXHDR_SIZE or MCL_LEN_SIZE)
643 if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) { // followed by the payload
// when raw data capture is active the same header and payload are also handed to rdc
654 if( rdc_ctx != NULL ) {
655 rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf ); // set up for write
656 rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data
662 if( (now = time( NULL ) ) > next_report ) {
// the address of report is passed through as the data argument of wr_stats
663 rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report ); // run endpoints in the active table
// NOTE(review): the message text below ends with errs=%ld errors, which reads as a duplicated label.
666 logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
667 total, total_drops, report, count, drops, errors );
668 next_report = now + report;
675 } while( FOREVER ); // forever allows for escape during unit testing
680 Given a buffer and length, along with the message type, look up the fifo and write
681 the buffer. Returns 0 on error; 1 on success.
// Write one caller supplied buffer to the fifo mapped to mtype.
// vctx: the mcl context (cast to mcl_ctx_t*); a nil value is rejected.
// payload, plen: bytes to write and their count; mtype: selects the fifo.
// Returns 1 when write() reported exactly plen bytes, otherwise 0.
683 extern int mcl_fifo_one( void* vctx, char* payload, int plen, int mtype ) {
684 mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
685 fifo_t* fifo; // fifo to chalk counts on
687 int fd; // file des to write to
693 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
// NOTE(review): this message carries a trailing newline, unlike the other logit() calls in this file.
694 logit( LOG_ERR, "(mcl) invalid context given to fifo_one\n" );
698 fd = suss_fifo( ctx, mtype, WRITER, &fifo ); // map the message type to an open fd
// NOTE(review): confirm fd >= 0 is verified before this write.
700 state = write( fd, payload, plen );
// the comparison yields 1 only for a complete write
703 return state == plen;