1 // :vim ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is a simple route manager simulation which provides the ability
24 to push a route table into one or more xAPPs. Designed just to
25 drive the internal RMr route table collector outside of the static
26 file allowing for testing of port definition in some containerised
29 This application does not persist; it generates a set of tables based
30 on the config file, connects to all applications listed, distributes
31 the table, and exits. If periodic delivery of one or more different
32 configurations needs to be executed, use a shell script to wrap this
33 application in a loop.
36 Author: E. Scott Daniels
41 # comment and blank lines allowed
42 # trailing comments allowed
44 # port is used for any app listed in send2 which does not have a trailing :port
45 # it may be supplied as a different value before each table, and if not
46 # redefined applies to all subsequent tables.
49 # A table consists of a send2 list (app[:port]) which are the applications that will
50 # receive the table. Each table may contain one or more entries. Entries define
51 # the message type and subscription ID, along with one or more round robin groups.
52 # A rrgroup is one or more app:port "endpoints" which RMr will use when sending
53 # messages of the indicated type/subid. Port on a rrgroup is rquired and is the
54 # port that the application uses for app to app communications.
57 port: xapp-rtg-listen-port # 4561 default
59 send2: app1:port app2:port ... appn:port
63 rrgroup: app:port... app:port
68 rrgroup: app:port ... app:port
78 #include <sys/epoll.h>
83 #include <sys/types.h>
85 #include "req_resp.c" // simple nano interface for request/response connections
88 #define CONNECTED 1 // we've established a shunt connection to the app
93 #define ALLOC_NEW 1 // rrsend should allocate a new buffer
95 #define MAX_TABLES 16 // total tables we support
96 #define MAX_SEND2 64 // max number of apps that a table can be sent to
97 #define MAX_APPS 1024 // max total apps defined (tables * send2)
98 #define MAX_GROUPS 64 // max num of round robin groups per entry
99 #define MAX_RRG_SIZE 64 // max number of apps in a group
100 #define MAX_ENTRIES 256 // max entries in a table
101 #define MAX_TOKENS 512 // max tokens we'll break a buffer into
104 // ---------------------------------------------------------------------------------------
107 Things we need to track for an application.
110 void* shunt; // the rr context to shunt directly to an app
111 int state; // connected or not
112 char* name; // IP address or DNS name and port for connecting
113 char* port; // rr wants two strings as it builds it's own NN string
118 // ----- table stuff (very staticly sized, but this isn't for prod) ----------------------
120 A round robin group in a table entry
122 typedef struct rrgroup {
123 int napps; // number of apps
124 char* apps[MAX_RRG_SIZE];
128 A single table entry.
130 typedef struct entry {
131 int mtype; // entry message type
132 int subid; // entry sub id
134 rrgroup_t groups[MAX_GROUPS]; // the entry's groups
138 Defines a table which we will distribute.
140 typedef struct table {
141 int napps; // number of apps this table is sent to
142 int first_app; // first app in minfo that we send to
143 int nentries; // number of entries
144 entry_t entries[MAX_ENTRIES];
148 Master set of contextual information.
150 typedef struct master {
151 int napps; // number in use (next insert point)
153 int port; // the port applications open by default for our connections
154 app_t apps[MAX_APPS];
155 table_t tables[MAX_TABLES];
159 Record buffer; file in memory which can be iterated over a record at a time.
161 typedef struct rbuffer {
162 char* buffer; // stuff read from file
163 char* rec; // next record
164 int at_end; // true if end was reached
170 typedef struct tokens {
171 char* buffer; // buffer that tokens points into
172 int ntoks; // number of tokens in tokens
173 char* tokens[MAX_TOKENS]; // pointers into buffer at the start of each token 0..ntokens-1
177 // ----- token utilities ------------------------------------------------------------
180 Frees a token manager.
182 static void free_tokens( tokens_t* t ) {
191 Simple tokeniser. If sep is whitespace, then leading whitespace (all, not just
192 sep) is ignored; if sep is not whitespace, then leadign whitespace is included
193 in the first token. If sep is whitespace, consecutive instances of whitespace
194 are treated as a single seperator:
195 if sep given as space, then
196 "bug boo" and "bug boo" both generate two tokens: (bug) (boo)
198 if sep given as pipe (|), then
199 "bug||boo" generates three tokens: (bug), (), (boo)
201 Each token is a zero terminated string.
204 static tokens_t* tokenise( char* buf, char sep ) {
207 char end_sep; // if quoted endsep will be the quote mark
209 if( !buf || !(*buf) ) {
213 t = (tokens_t *) malloc( sizeof( *t ) );
214 memset( t, 0, sizeof( *t ) );
216 t->buffer = strdup( buf );
217 buf = t->buffer; // convenience
219 if( isspace( sep ) ) { // if sep is in whitespace class
220 while( buf != NULL && *buf && isspace( *buf ) ) { // pass over any leading whitespace
224 for( i = 0; i < strlen( buf ); i++ ) {
225 if( buf[i] == '\t' ) {
231 while( buf != NULL && t->ntoks < MAX_TOKENS && *buf ) {
240 t->tokens[t->ntoks++] = buf; // capture token start
242 if( (buf = strchr( buf, end_sep )) != NULL ) { // find token end
245 if( end_sep != sep ) {
249 if( isspace( sep ) ) { // treat consec seperators as one if sep is whitespace
250 while( *buf == sep ) {
260 // ----- file/record management utilities ------------------------------------------------------------
263 Read an entire file into a single buffer.
265 static char* f2b( char* fname ) {
267 off_t fsize = 8192; // size of the file
268 off_t nread; // number of bytes read
270 char* buf; // input buffer
272 if( (fd = open( fname, O_RDONLY )) >= 0 ) {
273 if( fstat( fd, &stats ) >= 0 ) {
274 if( stats.st_size <= 0 ) { // empty file
278 fsize = stats.st_size; // stat ok, save the file size
281 fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
285 if( fd < 0 ) { // didn't open or empty
286 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
294 if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) { // enough to add nil char to make string
300 nread = read( fd, buf, fsize );
301 if( nread < 0 || nread > fsize ) { // failure of some kind
303 errno = EFBIG; // likely too much to handle
315 Read a file into a buffer, and set a record buffer to manage it.
317 static rbuffer_t* f2r( char* fname ) {
318 char* raw; // raw buffer
321 if( (raw = f2b( fname )) == NULL ) {
325 r = (rbuffer_t *) malloc( sizeof( *r ) );
326 memset( r, 0, sizeof( *r ) );
328 r->rec = raw; // point at first (only) record
332 Return a pointer to the next record in the buffer, or nil if at
335 static char* next_rec( rbuffer_t* r ) {
338 if( !r || r->at_end ) {
343 r->rec = strchr( r->rec, '\n' );
351 r->at_end = TRUE; // mark for next call
357 static void free_rbuf( rbuffer_t* r ) {
359 if( r->buffer != NULL ) {
368 // ----- table management ------------------------------------------------------------
371 Run the app list and attempt to open a shunt to any unconnected application.
372 Returns the number of applications that could not be connected to.
374 static int connect2all( master_t* mi ) {
383 for( i = 0; i < mi->napps; i++ ) {
384 if( mi->apps[i].state != CONNECTED ) {
385 fprintf( stderr, "[INF] opening shunt to: %s:%s\n", mi->apps[i].name, mi->apps[i].port );
386 mi->apps[i].shunt = rr_connect( mi->apps[i].name, mi->apps[i].port );
387 if( mi->apps[i].shunt == NULL) {
390 fprintf( stderr, "[INFO] shunt created to: %s:%s\n", mi->apps[i].name, mi->apps[i].port );
391 mi->apps[i].state = CONNECTED;
400 Add an application to the current table.
402 static void add_app( master_t* mi, char* app_name ) {
407 if( mi == NULL || app_name == NULL ) {
411 if( mi->napps > MAX_APPS ) {
412 fprintf( stderr, "[WARN] too many applications, ignoring: %s\n", app_name );
415 if( (ch = strchr( app_name, ':' )) == NULL ) { // assume we are using the default rm listen port
416 snprintf( wbuf, sizeof( wbuf ), "%d", mi->port );
419 *(ch++) = 0; // name port given, split and point at port
423 mi->apps[mi->napps].name = strdup( app_name );
424 mi->apps[mi->napps].port = strdup( app_port );
425 mi->apps[mi->napps].state = !CONNECTED;
431 Initialise things; returns a master info context.
433 static master_t* init( char* cfname ) {
436 rbuffer_t* rb; // record manager for reading config file
442 table_t* table = NULL;
443 rrgroup_t* rrg = NULL;
444 entry_t* entry = NULL;
447 mi = (master_t *) malloc( sizeof( *mi ) );
451 memset( mi, 0, sizeof( *mi ) );
454 rb = f2r( cfname ); // get a record buffer to parse the config file
456 fprintf( stderr, "[FAIL] unable to open config file: %s: %s\n", cfname, strerror( errno ) );
461 while( (rec = next_rec( rb )) != NULL ) {
464 tokens = tokenise( rec, ' ' );
466 fprintf( stderr, "parsing %d: %s\n", rec_num, rec );
468 for( i = 0; i < tokens->ntoks && *tokens->tokens[i] != '#'; i++ ); // simple comment strip
471 if( tokens->ntoks > 0 ) {
472 tok = tokens->tokens[0];
473 switch( *tok ) { // faster jump table based on 1st ch; strcmp only if needed later
478 if( table != NULL && table->nentries < MAX_ENTRIES ) {
479 entry = &table->entries[table->nentries++];
480 entry->subid = -1; // no subscription id if user omits
482 fprintf( stderr, "[ERR] @%d no table started, or table full\n", rec_num );
487 if( entry != NULL ) {
488 entry->mtype = atoi( tokens->tokens[1] );
490 fprintf( stderr, "[ERR] @%d no entry started\n", rec_num );
495 if( tokens->ntoks > 1 ) {
496 mi->port = atoi( tokens->tokens[1] );
497 if( mi->port < 1000 ) {
498 fprintf( stderr, "[WRN] @%d assigned default xAPP port smells fishy: %s\n", rec_num, tokens->tokens[1] );
503 case 'r': // round robin group
504 if( entry != NULL && entry->ngroups < MAX_GROUPS ) {
505 if( tokens->ntoks < MAX_RRG_SIZE ) {
506 rrg = &entry->groups[entry->ngroups++];
508 for( i = 1; i < tokens->ntoks; i++ ) {
509 rrg->apps[rrg->napps++] = strdup( tokens->tokens[i] );
512 fprintf( stderr, "[ERR] @%d round robin group too big.\n", rec_num );
515 fprintf( stderr, "[ERR] @%d no previous entry, or entry is full\n", rec_num );
520 if( *(tok+1) == 'e' ) { // send2
521 if( table != NULL && tokens->ntoks < MAX_SEND2 ) {
522 table->first_app = mi->napps;
524 for( i = 1; i < tokens->ntoks; i++ ) {
525 add_app( mi, tokens->tokens[i] );
528 table->napps = tokens->ntoks - 1;
531 if( entry != NULL ) {
532 entry->subid = atoi( tokens->tokens[1] );
534 fprintf( stderr, "[ERR] @%d no entry started\n", rec_num );
541 if( mi->ntables < MAX_TABLES ) {
542 table = &mi->tables[mi->ntables++];
544 fprintf( stderr, "[ERR] @%d too many tables defined\n", rec_num );
550 fprintf( stderr, "record from config was ignored: %s\n", rec );
563 Build a buffer with the entry n from table t. Both entry and table
565 Caller must free returned buffer.
567 static char* mk_entry( master_t* mi, int table, int entry ) {
577 if( !mi || mi->ntables <= table ) {
581 tab = &mi->tables[table];
582 if( tab->nentries <= entry ) {
586 ent = &tab->entries[entry];
588 snprintf( wbuf, sizeof( wbuf ), "mse | %d | %d | ", ent->mtype, ent->subid ); // we only generate mse records
590 len = strlen( wbuf );
591 for( i = 0; i < ent->ngroups; i++ ) {
593 strcat( wbuf, "; " );
596 for( j = 0; j < ent->groups[i].napps; j++ ) {
597 alen = strlen( ent->groups[i].apps[j] ) + 3; // not percise, but close enough for testing
598 if( alen + len > sizeof( wbuf ) ) {
599 fprintf( stderr, "[ERR] entry %d for table %d is too large to format\n", entry, table );
607 strcat( wbuf, ent->groups[i].apps[j] );
611 strcat( wbuf, "\n" );
612 return strdup( wbuf );
616 Sends a buffer to all apps in the range.
618 void send2range( master_t* mi, char* buf, int first, int n2send ) {
619 int a; // application offset in master array
620 int last; // stopping point (index)
621 rr_mbuf_t* mbuf = NULL;
623 if( !mi ) { // safe to dance
628 last = first + n2send;
630 mbuf = rr_new_buffer( mbuf, strlen( buf ) + 5 ); // ensure buffer is large enough
632 fprintf( stderr, "%s ", mi->apps[a].name );
634 memcpy( mbuf->payload, buf, strlen( buf ) );
635 mbuf->used = strlen( buf );
636 mbuf = rr_send( mi->apps[a].shunt, mbuf, ALLOC_NEW );
640 fprintf( stderr, "\n" );
643 rr_free_mbuf( mbuf );
647 Formats the entries for the table and sends to all applications that the
648 table should be sent to per the send2 directive in the config.
650 static void send_table( master_t* mi, int table ) {
652 char* ent; // entry string to send
653 int e = 0; // entry number
656 rr_mbuf_t* mbuf = NULL;
658 if( !mi || table > mi->ntables ) {
662 tab = &mi->tables[table];
663 fprintf( stderr, "[INF] send table start message: " );
664 send2range( mi, "newrt | start\n", tab->first_app, tab->napps ); // send table start req to all
666 while( (ent = mk_entry( mi, table, e++ )) != NULL ) { // build each entry once, send to all
667 fprintf( stderr, "[INF] sending table %d entry %d: ", table, e );
668 send2range( mi, ent, tab->first_app, tab->napps );
671 fprintf( stderr, "[INF] send table end message: " );
672 send2range( mi, "newrt | end\n", tab->first_app, tab->napps ); // send table end notice to all
676 Run the list of tables and send them all out.
678 static void send_all_tables( master_t* mi ) {
685 for( i = 0; i < mi->ntables; i++ ) {
691 // ----------------- testing ----------------------------------------------------------------------
694 Dump table entries in the form we'll send to apps to stderr.
696 static void print_tables( master_t* mi ) {
705 for( t = 0; t < mi->ntables; t++ ) {
706 fprintf( stderr, "=== table %d ===\n", t );
708 while( (ent = mk_entry( mi, t, e++ )) != NULL ) {
709 fprintf( stderr, "%s", ent );
715 static void print_tokens( tokens_t* tokens ) {
718 fprintf( stderr, "there are %d tokens\n", tokens->ntoks );
719 for( i = 0; i < tokens->ntoks; i++ ) {
720 fprintf( stderr, "[%02d] (%s)\n", i, tokens->tokens[i] );
724 static void self_test( char* fname ) {
725 char* s0 = " Now is the time for all to stand up and cheer!";
726 char* s1 = " Now is \"the time for all\" to stand up and cheer!";
727 char* s2 = " field1 | field2||field4|field5";
732 tokens = tokenise( s0, ' ' );
733 if( tokens->ntoks != 11 ) {
734 fprintf( stderr, "didn't parse into 11 tokens (got %d): %s\n", tokens->ntoks, s1 );
736 print_tokens( tokens );
738 tokens = tokenise( s1, ' ' );
739 if( tokens->ntoks != 8 ) {
740 fprintf( stderr, "didn't parse into 11 tokens (got %d): %s\n", tokens->ntoks, s1 );
742 print_tokens( tokens );
744 tokens = tokenise( s2, '|' );
745 if( tokens->ntoks != 5 ) {
746 fprintf( stderr, "didn't parse into 5 tokens (got %d): %s\n", tokens->ntoks, s2 );
748 print_tokens( tokens );
750 free_tokens( tokens );
752 if( fname == NULL ) {
758 fprintf( stderr, "[FAIL] couldn't read file into rbuffer: %s\n", strerror( errno ) );
760 while( (s2 = next_rec( rb )) != NULL ) {
761 fprintf( stderr, "record: (%s)\n", s2 );
771 // ----------------------------------------------------------------------------------------------
773 int main( int argc, char** argv ) {
778 if( strcmp( argv[1], "selftest" ) == 0 ) {
779 self_test( argc > 1 ? argv[2] : NULL );
783 mi = init( argv[1] ); // parse the config and generate table structs
785 fprintf( stderr, "[CRI] initialisation failed\n" );
791 while( (not_ready = connect2all( mi ) ) > 0 ) {
792 fprintf( stderr, "[INF] still waiting to connect to %d applications\n", not_ready );
796 fprintf( stderr, "[INF] connected to all applications, sending tables\n" );
798 send_all_tables( mi );
800 fprintf( stderr, "[INFO] usage: %s file-name\n", argv[0] );