Merge "enhance(test): Add route generation simulator"
authorMatti Hiltunen <hiltunen@att.com>
Mon, 17 Jun 2019 17:01:00 +0000 (17:01 +0000)
committerGerrit Code Review <gerrit@o-ran-sc.org>
Mon, 17 Jun 2019 17:01:00 +0000 (17:01 +0000)
test/rtg_sim/BUILD [new file with mode: 0644]
test/rtg_sim/CMakeLists.txt [new file with mode: 0644]
test/rtg_sim/README [new file with mode: 0644]
test/rtg_sim/req_resp.c [new file with mode: 0644]
test/rtg_sim/req_resp.h [new file with mode: 0644]
test/rtg_sim/rtm_sim.c [new file with mode: 0644]

diff --git a/test/rtg_sim/BUILD b/test/rtg_sim/BUILD
new file mode 100644 (file)
index 0000000..0f2feee
--- /dev/null
@@ -0,0 +1,38 @@
+#
+#==================================================================================
+#       Copyright (c) 2019 Nokia
+#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
+
+To build the rtm_sim application, the expected CMake "process" is followed
+and described below:
+
+       mkdir .build
+       cd .build
+       cmake ..
+       make package
+
+
+This will generate at least a .deb, and if the underlying system can support
+it an RPM package as well.
+
+
+Once installed, the rtm_sim application can be started from the command line:
+       rtm_sim config-file-name
+
+The config file defines the table(s) which are to be constructed and distributed
+to applications.
+
diff --git a/test/rtg_sim/CMakeLists.txt b/test/rtg_sim/CMakeLists.txt
new file mode 100644 (file)
index 0000000..6753dd3
--- /dev/null
@@ -0,0 +1,77 @@
+
+#
+#==================================================================================
+#      Copyright (c) 2019 Nokia
+#      Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
+
+# simulate the route table manager
+project( rtm_sim LANGUAGES C )
+cmake_minimum_required( VERSION 3.5 )
+
+set( major_version "1" )
+set( minor_version "0" )
+set( patch_level "1" )
+
+set( CMAKE_POSITION_INDEPENDENT_CODE ON )
+set( CMAKE_CXX_FLAGS "-g -Wall -I /usr/local/include" )
+
+find_library( nanomsg libnanomsg.a )
+
+find_program( rpm NAMES rpmbuild )                                     # rpm package gen requires this to be installed
+
+if( "${rpm}" MATCHES "rpm-NOTFOUND" )                          # cannot build rpm
+       set( pkg_list "DEB" )
+       message( "+++ `make package` will generate only deb package; cannot find support to generate rpm packages" )
+else()
+       set( pkg_list "DEB;RPM" )
+       message( "+++ `make package` will generate both deb and rpm packages" )
+endif()
+
+add_executable( rtm_sim rtm_sim.c )
+target_link_libraries( rtm_sim nanomsg )
+
+install( 
+       TARGETS rtm_sim
+       DESTINATION /usr/local/bin
+)
+
+IF( EXISTS "${CMAKE_ROOT}/Modules/CPack.cmake" )
+       include( InstallRequiredSystemLibraries )
+
+       set( CPACK_set_DESTDIR "on" )
+       set( CPACK_PACKAGING_INSTALL_PREFIX "${install_root}" )
+       set( CPACK_GENERATOR ${pkg_list} )
+
+       set( CPACK_PACKAGE_DESCRIPTION "Simplistic route table manager simulator." )
+       set( CPACK_PACKAGE_DESCRIPTION_SUMMARY "RT manager simulation" )
+       set( CPACK_PACKAGE_VENDOR "None" )
+       set( CPACK_PACKAGE_CONTACT "None" )
+       set( CPACK_PACKAGE_VERSION_MAJOR "${major_version}" )
+       set( CPACK_PACKAGE_VERSION_MINOR "${minor_version}" )
+       set( CPACK_PACKAGE_VERSION_PATCH "${patch_level}" )
+       set( CPACK_PACKAGE_FILE_NAME "rtm_sim-${major_version}.${minor_version}.${patch_level}-${CMAKE_SYSTEM_PROCESSOR}" )
+
+       # we build and ship the libraries, so there is NO dependency
+       set( CPACK_DEBIAN_PACKAGE_DEPENDS "libnanomsg0 (>=0.4)" )
+
+       set( CPACK_DEBIAN_PACKAGE_PRIORITY "optional" )
+       set( CPACK_DEBIAN_PACKAGE_SECTION "test" )
+       set( CPACK_DEBIAN_ARCHITECTURE ${CMAKE_SYSTEM_PROCESSOR} )
+       set( CPACK_RPM_ARCHITECTURE ${CMAKE_SYSTEM_PROCESSOR} )
+
+       INCLUDE( CPack )
+ENDIF()
diff --git a/test/rtg_sim/README b/test/rtg_sim/README
new file mode 100644 (file)
index 0000000..2ab2b7c
--- /dev/null
@@ -0,0 +1,36 @@
+#
+#==================================================================================
+#       Copyright (c) 2019 Nokia
+#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
+
+The rtm_sim is an extremely simple application which reads a configuration
+file, builds one or more route tables, and sends the table(s) to the
+indicated applications.  The applications are assumed to be RMr based
+applications which are listening either on a well known port (4561) for
+table updates, or on a port which is indicated in the config file.
+
+The sim is _not_ intended to be a complete replacement for any route
+table mangager, but simply allows the ability to:
+
+       1) ensure that the RMr based application can receive real-time
+          table updates (thus is configured correctly with the needed
+          port properly exposed via the container environment if
+          applicable).
+
+       2) allow the generation of one or more route tables from a central
+          point during testing without having to run a more complicated
+          route manager for basic testing.
diff --git a/test/rtg_sim/req_resp.c b/test/rtg_sim/req_resp.c
new file mode 100644 (file)
index 0000000..9d58472
--- /dev/null
@@ -0,0 +1,352 @@
+// :vi ts=4 sw=4 noet:
+/*
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       req_rep.c
+       Abstract:       A "library" module which allows a programme to easily be a requestor
+                               or replier.  Some functions are compatable with publishing (mbuf
+                               allocation and management).  Underlying we use the NN_PAIR and NOT
+                               the req/rep model as that model is an inflexible, lock step, exchange
+                               which does not lend well for a request that results in more than one
+                               response messages, or no response.
+
+                               The user must be aware that once a session is established on the
+                               host:port listener, another session will not be accepted until the
+                               first is terminated; nano makes no provision for multiple concurrent
+                               sesssions with either the PAIR or REQ/RESP models.
+
+                               We also support starting the publisher socket as the buffer and
+                               send functions can be used for the publisher too.
+
+                               CAUTION:  this is based on nanomsg, not NNG. The underlying protocols
+                                       are compatable, and because NNG has an emulation mode it is possible
+                                       to link successsfully with the nng library, BUT that will not
+                                       work here.   Link only with nanomsg.
+
+       Date:           18 January 2018
+       Author:         E. Scott Daniels
+
+*/
+
+#include <ctype.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include <errno.h>
+#include <stdint.h>
+
+#include <nanomsg/nn.h>
+#include <nanomsg/pair.h>
+#include <nanomsg/pipeline.h>
+#include <nanomsg/pubsub.h>
+
+#include "req_resp.h"
+
+#define NULL_SOCKET 0          // fluff that is treated like a nil pointer check by coverage checker
+
+
+/*
+       Connect to the host as a requestor. returns context if
+       successful.
+*/
+extern void* rr_connect( char* host, char* port ) {
+       rr_ctx_t* ctx = NULL;
+       char    wbuf[1024];
+       int             state;
+
+       if( host == NULL || port == NULL ) {
+               errno = EINVAL;
+               return NULL;
+       }
+
+       ctx = (rr_ctx_t *) malloc( sizeof *ctx );
+       if( ctx == NULL ) {
+               errno = ENOMEM;
+               return NULL;
+       }
+
+       //ctx->nn_sock = nn_socket( AF_SP, NN_PAIR );
+       ctx->nn_sock = nn_socket( AF_SP, NN_PUSH );
+       if( ctx->nn_sock < NULL_SOCKET ) {
+               free( ctx );
+               return NULL;
+       }
+       snprintf( wbuf, sizeof( wbuf ), "tcp://%s:%s", host, port );
+       state = nn_connect( ctx->nn_sock, wbuf );
+       if( state < 0 ) {
+               fprintf( stderr, "rr_conn: connect failed: %s: %d %s\n", wbuf, errno, strerror( errno ) );
+               nn_close( ctx->nn_sock );
+               free( ctx );
+               return NULL;
+       }
+
+       //fprintf( stderr, "rr_conn: connect successful: %s\n", wbuf );
+       return (void *) ctx;
+}
+
+
+/*
+       Set up as a listener on any interface with the given port.
+*/
+extern void* rr_start_listening( char* port ) {
+       rr_ctx_t* ctx;
+       char    wbuf[1024];
+       int             state;
+
+       if( port == NULL ) {
+               errno = EINVAL;
+               return NULL;
+       }
+
+       ctx = (rr_ctx_t *) malloc( sizeof *ctx );
+       if( ctx == NULL ) {
+               errno = EINVAL;
+               return NULL;
+       }
+
+       //ctx->nn_sock = nn_socket( AF_SP, NN_PAIR );
+       ctx->nn_sock = nn_socket( AF_SP, NN_PULL );
+       if( ctx->nn_sock < NULL_SOCKET ) {
+               free( ctx );
+               return NULL;
+       }
+
+       snprintf( wbuf, sizeof( wbuf ), "tcp://0.0.0.0:%s", port );
+       state = nn_bind( ctx->nn_sock, wbuf );
+       if( state < 0 ) {
+               nn_close( ctx->nn_sock );
+               free( ctx );
+               return NULL;
+       }
+
+       return (void *) ctx;
+}
+
+/*
+       Configure and bind the publisher. Port is a string as it's probably read from
+       the command line, so no need to atoi() it for us.  We can use the rr_* functions
+       for message buffers and sending, so we reuse their context rather than define our
+       own.
+
+*/
+extern void*  open_publisher( char*  port ) {
+       rr_ctx_t*       pctx;
+       char            conn_info[1024];
+
+       if( (pctx = (rr_ctx_t *) malloc( sizeof( *pctx )) ) == NULL ) {
+               return NULL;
+       }
+
+    pctx->nn_sock = nn_socket( AF_SP, NN_PUB );                // publishing socket
+    if( pctx->nn_sock < 0 ) {
+        fprintf( stderr, "[CRI] unable to open publish socket: %s\n", strerror( errno ) );
+               free( pctx );
+        return NULL;
+    }
+
+       snprintf( conn_info, sizeof( conn_info ), "tcp://0.0.0.0:%s", port );                   // listen on any interface
+    if( nn_bind( pctx->nn_sock, conn_info ) < 0) {                                                                     // bind and automatically accept client sessions
+        fprintf (stderr, "[CRI] unable to bind publising port: %s: %s\n", port, strerror( errno ) );
+        nn_close ( pctx->nn_sock );
+               free( pctx );
+        return NULL;
+    }
+
+       return (void *) pctx;
+}
+
+extern rr_mbuf_t* rr_new_buffer( rr_mbuf_t* mb, int len ) {
+
+       if( ! mb ) {
+               mb = (rr_mbuf_t *) malloc( sizeof( *mb ) );
+               mb->size = len;
+               mb->payload = NULL;
+       } else {
+               if( mb->size < len ) {                                  // if requested len is larger than current payload
+                       nn_freemsg( mb->payload );
+                       mb->payload = NULL;
+               } else {
+                       len = mb->size;
+               }
+       }
+       mb->used = 0;
+
+       if( len > 0 && !mb->payload ) {                                                         // allow a payloadless buffer to be allocated
+               mb->payload = nn_allocmsg( len, 0 );
+       }
+
+       return mb;
+}
+
+/*
+       Closes the currently open session.
+*/
+extern void rr_close( void* vctx ) {
+       rr_ctx_t* ctx;
+
+       if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
+               return;
+       }
+
+       if( ctx->nn_sock < NULL_SOCKET ) {
+               return;
+       }
+
+       nn_close( ctx->nn_sock );
+       ctx->nn_sock = -1;
+}
+
+extern void rr_free( void* vctx ) {
+       rr_ctx_t* ctx;
+
+       if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
+               return;
+       }
+
+       rr_close( ctx );
+       nn_term();
+       free( ctx );
+}
+
+extern void rr_free_mbuf( rr_mbuf_t* mbuf ) {
+       if( mbuf->payload ) {
+               nn_freemsg( mbuf->payload );
+               mbuf->payload = NULL;
+               mbuf->used = -2;                                // just in case they held a pointer and try to use it
+       }
+
+       free( mbuf );
+}
+
+extern rr_mbuf_t*  rr_receive( void* vctx, rr_mbuf_t* mbuf, int len ) {
+       rr_ctx_t* ctx;
+
+       if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               return NULL;
+       }
+       if( ctx->nn_sock < 0 ) {
+               errno = ESTALE;                                 // stale/bad socket fd
+               return NULL;
+       }
+
+       mbuf = rr_new_buffer( mbuf, len );
+       if( mbuf == NULL ) {
+               return NULL;
+       }
+
+       *mbuf->payload = 0;
+       if( (mbuf->used = nn_recv( ctx->nn_sock, mbuf->payload, mbuf->size, 0 )) > 0 ) {
+               errno = 0;                                              // nano doesn't seem to clear errno here
+       }
+       return mbuf;
+}
+
+extern rr_mbuf_t* rr_send( void* vctx, rr_mbuf_t* mbuf, int alloc_buf ) {
+       rr_ctx_t* ctx;
+       int len;
+       int state;
+
+       if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               return NULL;
+       }
+
+       if( ctx->nn_sock < 0 ) {
+               errno = ESTALE;                                 // stale/bad socket fd
+               return NULL;
+       }
+
+       if( ! mbuf ) {
+               errno = ENOBUFS;                        // not quite right, but close enough
+               return NULL;
+       }
+
+       if( ! mbuf->payload ) {                 // no payload????
+               errno = EFAULT;                         // nil is a bad address after all :)
+               return mbuf;
+       }
+
+       errno = 0;
+       //fprintf( stderr, "rrsend is sending %d bytes....\n", mbuf->used );
+       if( (state = nn_send( ctx->nn_sock, &mbuf->payload, NN_MSG, 0 )) > 0 ) {
+               //fprintf( stderr, "send ok to %d:  %d %s\n", ctx->nn_sock, state, strerror( errno ) );
+               mbuf->used = 0;
+               if( alloc_buf ) {
+                       mbuf->payload = nn_allocmsg( mbuf->size, 0 );                                   // allocate the next send buffer
+               } else {
+                       mbuf->payload = NULL;
+                       mbuf->used = -1;
+               }
+
+               errno = 0;
+       } else {
+               fprintf( stderr, "send failed %d %s\n", state, strerror( errno ) );
+       }
+
+       return mbuf;
+}
+
+/*
+       Set the receive timeout to time. If time >100 we assume the time is milliseconds,
+       else we assume seconds. Setting -1 is always block.
+       Returns the nn value (0 on success <0 on error).
+*/
+extern int rr_rcv_to( void* vctx, int time ) {
+       rr_ctx_t* ctx;
+
+       if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               return -1;
+       }
+
+       if( time > 0 ) {
+               if( time < 100 ) {
+                       time = time * 1000;                     // assume seconds, nn wants ms
+               }
+       }
+
+       return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
+}
+
+/*
+       Set the send timeout to time. If time >100 we assume the time is milliseconds,
+       else we assume seconds. Setting -1 is always block.
+       Returns the nn value (0 on success <0 on error).
+*/
+extern int rr_send_to( void* vctx, int time ) {
+       rr_ctx_t* ctx;
+
+       if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               return -1;
+       }
+
+       if( time > 0 ) {
+               if( time < 100 ) {
+                       time = time * 1000;                     // assume seconds, nn wants ms
+               }
+       }
+
+       return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) );
+}
+
diff --git a/test/rtg_sim/req_resp.h b/test/rtg_sim/req_resp.h
new file mode 100644 (file)
index 0000000..0473c8b
--- /dev/null
@@ -0,0 +1,60 @@
+// :vi ts=4 sw=4 noet:
+/*
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+#ifndef _req_resp_h
+#define _req_resp_h
+
+/*
+       A message buffer that can be used for either req/resp context setup
+       or for publishing in a pub/sub setup.
+*/
+typedef struct rr_mbuf {
+       int     used;                   // bytes actually used
+       int size;                       // allocated size of payload
+       char*   payload;
+} rr_mbuf_t;
+
+
+/*
+       A 'context' to interface with nano; so far very simple, but could
+       expand, so we use the struct.
+*/
+typedef struct rr_ctx {
+       int     nn_sock;
+} rr_ctx_t;
+
+
+// ---- prototypes for the rr library ----------------------
+// vctx is the pointer returned by the connect or start listening functions
+// and is passed to nearly all other functions.
+
+extern void* rr_connect( char* host, char* port );
+extern void* rr_start_listening( char* port );
+extern rr_mbuf_t* rr_new_buffer(  rr_mbuf_t* mb, int len );
+extern void rr_close( void* vctx );
+extern void rr_free( void* vctx );
+extern void rr_free_mbuf( rr_mbuf_t* mbuf );
+extern void*  open_publisher( char*  port );
+extern int rr_rcv_to( void* vctx, int time );
+extern int rr_send_to( void* vctx, int time );
+extern rr_mbuf_t*  rr_receive( void* vctx, rr_mbuf_t* mbuf, int len );
+extern rr_mbuf_t* rr_send( void* vctx, rr_mbuf_t* mbuf, int alloc_buf );
+
+#endif
diff --git a/test/rtg_sim/rtm_sim.c b/test/rtg_sim/rtm_sim.c
new file mode 100644 (file)
index 0000000..c52bb2d
--- /dev/null
@@ -0,0 +1,803 @@
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       rtm_sim.c
+       Abstract:       This is a simple route manager simulation which provides the ability
+                               to push a route table into one or more xAPPs. Designed just to
+                               drive the internal RMr route table collector outside of the static
+                               file allowing for testing of port definition in some containerised
+                               environments.
+
+                               This application does not persist; it generates a set of tables based
+                               on the config file, connects to all applications listed, distributes
+                               the table, and exits.  If periodic delivery of one or more different
+                               configurations needs to be executed, use a shell script to wrap this
+                               application in a loop.
+
+       Date:           14 June 2019
+       Author:         E. Scott Daniels
+*/
+
+/*
+config file format:
+       #       comment and blank lines allowed
+       #       trailing comments allowed
+
+       # port is used for any app listed in send2 which does not have a trailing :port
+       # it may be supplied as a different value before each table, and if not
+       # redefined applies to all subsequent tables.
+       #
+
+       # A table consists of a send2 list (app[:port]) which are the applications that will
+       # receive the table. Each table may contain one or more entries.  Entries define
+       # the message type and subscription ID, along with one or more round robin groups.
+       # A rrgroup is one or more app:port "endpoints" which RMr will use when sending
+       # messages of the indicated type/subid.  Port on a rrgroup is rquired and is the
+       # port that the application uses for app to app communications.
+       #
+
+       port:   xapp-rtg-listen-port    # 4561 default
+       table:
+               send2: app1:port app2:port ... appn:port
+               entry:
+                       mtype:  n
+                       subid:  n
+                       rrgroup:        app:port... app:port
+
+               entry:
+                       mtype: n
+                       subid: n
+                       rrgroup: app:port ... app:port
+*/
+
+
+#include <ctype.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include  <time.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#include "req_resp.c"          // simple nano interface for request/response connections
+
+
+#define CONNECTED      1               // we've established a shunt connection to the app
+
+#define TRUE           1
+#define FALSE          0
+
+#define ALLOC_NEW      1               // rrsend should allocate a new buffer
+
+#define MAX_TABLES     16              // total tables we support
+#define MAX_SEND2      64              // max number of apps that a table can be sent to
+#define MAX_APPS       1024    // max total apps defined (tables * send2)
+#define MAX_GROUPS     64              // max num of round robin groups per entry
+#define MAX_RRG_SIZE 64                // max number of apps in a group
+#define MAX_ENTRIES    256             // max entries in a table
+#define MAX_TOKENS     512             // max tokens we'll break a buffer into
+
+
+// ---------------------------------------------------------------------------------------
+
+/*
+       Things we need to track for an application.
+*/
+typedef struct app {
+       void*   shunt;                  // the rr context to shunt directly to an app
+       int             state;                  // connected or not
+       char*   name;                   // IP address or DNS name and port for connecting
+       char*   port;                   // rr wants two strings as it builds it's own NN string
+} app_t;
+
+
+
+// ----- table stuff (very staticly sized, but this isn't for prod) ----------------------
+/*
+       A round robin group in a table entry
+*/
+typedef struct rrgroup {
+       int napps;                                              // number of apps
+       char*   apps[MAX_RRG_SIZE];
+} rrgroup_t;
+
+/*
+       A single table entry.
+*/
+typedef struct entry {
+       int             mtype;                                  // entry message type
+       int             subid;                                  // entry sub id
+       int             ngroups;
+       rrgroup_t       groups[MAX_GROUPS];     // the entry's groups
+} entry_t;
+
+/*
+       Defines a table which we will distribute.
+*/
+typedef struct table {
+       int             napps;                                          // number of apps this table is sent to
+       int             first_app;                                      // first app in minfo that we send to
+       int             nentries;                                       // number of entries
+       entry_t entries[MAX_ENTRIES];
+} table_t;
+
+/*
+       Master set of contextual information.
+*/
+typedef struct master {
+       int             napps;                          // number in use (next insert point)
+       int             ntables;
+       int             port;                           // the port applications open by default for our connections
+       app_t   apps[MAX_APPS];
+       table_t tables[MAX_TABLES];
+} master_t;
+
+/*
+       Record buffer; file in memory which can be iterated over a record at a time.
+*/
+typedef struct rbuffer {
+       char*   buffer;                         // stuff read from file
+       char*   rec;                            // next record
+       int             at_end;                         // true if end was reached
+} rbuffer_t;
+
+/*
+       Set of tokens.
+*/
+typedef struct tokens {
+       char*   buffer;                                 // buffer that tokens points into
+       int             ntoks;                                  // number of tokens in tokens
+       char*   tokens[MAX_TOKENS];             // pointers into buffer at the start of each token 0..ntokens-1
+} tokens_t;
+
+
+// ----- token utilities ------------------------------------------------------------
+
+/*
+       Frees a token manager.
+*/
+static void free_tokens( tokens_t* t ) {
+       if( t == NULL ) {
+               return;
+       }
+
+       free( t );
+}
+
+/*
+       Simple tokeniser.  If sep is whitespace, then leading whitespace (all, not just
+       sep) is ignored; if sep is not whitespace, then leadign whitespace is included
+       in the first token.  If sep is whitespace, consecutive instances of whitespace
+       are treated as a single seperator:
+               if sep given as space, then
+               "bug boo" and "bug     boo"  both generate two tokens: (bug) (boo)
+
+               if sep given as pipe (|), then
+               "bug||boo"   generates three tokens:  (bug), (), (boo)
+
+       Each token is a zero terminated string.
+
+*/
+static tokens_t* tokenise( char* buf, char sep ) {
+       tokens_t*       t;
+       int                     i;
+       char            end_sep;                // if quoted endsep will be the quote mark
+
+       if( !buf || !(*buf) ) {
+               return NULL;
+       }
+
+       t = (tokens_t *) malloc( sizeof( *t ) );
+       memset( t, 0, sizeof( *t ) );
+
+       t->buffer = strdup( buf );
+       buf = t->buffer;                                        // convenience
+
+       if( isspace( sep ) ) {                                                                          // if sep is in whitespace class
+               while( buf != NULL && *buf && isspace( *buf ) ) {               // pass over any leading whitespace
+                       buf++;
+               }
+
+               for( i = 0; i < strlen( buf ); i++ ) {
+                       if( buf[i] == '\t' ) {
+                               buf[i] = ' ';
+                       }
+               }
+       }
+
+       while( buf != NULL && t->ntoks < MAX_TOKENS && *buf  ) {
+               if( *buf == '"' ) {
+                       end_sep = '"';
+                       buf++;
+
+               } else {
+                       end_sep = sep;
+               }
+
+               t->tokens[t->ntoks++] = buf;                                            // capture token start
+
+               if( (buf = strchr( buf, end_sep )) != NULL ) {                  // find token end
+                       *(buf++) = 0;
+
+                       if( end_sep != sep ) {
+                               buf++;
+                       }
+
+                       if( isspace( sep ) ) {                          // treat consec seperators as one if sep is whitespace
+                               while( *buf == sep ) {
+                                       buf++;
+                               }
+                       }
+               }
+       }
+
+       return t;
+}
+
+// ----- file/record management utilities ------------------------------------------------------------
+
+/*
+       Read an entire file into a single buffer.
+*/
+static char* f2b( char* fname ) {
+       struct stat     stats;
+       off_t           fsize = 8192;   // size of the file
+       off_t           nread;                  // number of bytes read
+       int                     fd;
+       char*           buf;                    // input buffer
+
+       if( (fd = open( fname, O_RDONLY )) >= 0 ) {
+               if( fstat( fd, &stats ) >= 0 ) {
+                       if( stats.st_size <= 0 ) {                                      // empty file
+                               close( fd );
+                               fd = -1;
+                       } else {
+                               fsize = stats.st_size;                                          // stat ok, save the file size
+                       }
+               } else {
+                       fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
+               }
+       }
+
+       if( fd < 0 ) {                                                                                  // didn't open or empty
+               if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
+                       return NULL;
+               }
+
+               *buf = 0;
+               return buf;
+       }
+
+       if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
+               close( fd );
+               errno = ENOMEM;
+               return NULL;
+       }
+
+       nread = read( fd, buf, fsize );
+       if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
+               free( buf );
+               errno = EFBIG;                                                                                  // likely too much to handle
+               close( fd );
+               return NULL;
+       }
+
+       buf[nread] = 0;
+
+       close( fd );
+       return buf;
+}
+
+/*
+       Read a file into a buffer, and set a record buffer to manage it.
+*/
+static rbuffer_t* f2r( char* fname ) {
+       char*           raw;                    // raw buffer
+       rbuffer_t*      r;
+
+       if( (raw = f2b( fname )) == NULL ) {
+               return NULL;
+       }
+
+       r = (rbuffer_t *) malloc( sizeof( *r ) );
+       memset( r, 0, sizeof( *r ) );
+       r->buffer = raw;
+       r->rec = raw;                                                   // point at first (only) record
+}
+
+/*
+       Return a pointer to the next record in the buffer, or nil if at
+       end of buffer.
+*/
+static char* next_rec( rbuffer_t* r ) {
+       char*   rec;
+
+       if( !r || r->at_end ) {
+               return NULL;
+       }
+
+       rec = r->rec;
+       r->rec = strchr( r->rec, '\n' );
+       if( r->rec ) {
+               *r->rec = 0;
+               r->rec++;
+               if( *r->rec == 0 ) {
+                       r->at_end = TRUE;
+               }
+       } else {
+               r->at_end = TRUE;                               // mark for next call
+       }
+
+       return rec;
+}
+
+static void free_rbuf( rbuffer_t* r ) {
+       if( r != NULL ) {
+               if( r->buffer != NULL ) {
+                       free( r->buffer );
+               }
+
+               free( r );
+       }
+}
+
+
+// ----- table management ------------------------------------------------------------
+
+/*
+       Run the app list and attempt to open a shunt to any unconnected application.
+       Returns the number of applications that could not be connected to.
+*/
+static int connect2all( master_t* mi ) {
+       int     errors = 0;
+       int i;
+
+
+       if( mi == NULL ) {
+               return 1;
+       }
+
+       for( i = 0; i < mi->napps; i++ ) {
+               if( mi->apps[i].state != CONNECTED ) {
+                       fprintf( stderr, "[INF] opening shunt to: %s:%s\n", mi->apps[i].name, mi->apps[i].port );
+                       mi->apps[i].shunt = rr_connect( mi->apps[i].name, mi->apps[i].port );
+                       if( mi->apps[i].shunt == NULL) {
+                               errors++;
+                       } else {
+                               fprintf( stderr, "[INFO] shunt created to: %s:%s\n", mi->apps[i].name, mi->apps[i].port );
+                               mi->apps[i].state = CONNECTED;
+                       }
+               }
+       }
+
+       return errors;
+}
+
+/*
+       Add an application to the current table.
+*/
+static void add_app( master_t* mi, char* app_name ) {
+       char    wbuf[256];
+       char*   app_port;
+       char*   ch;
+
+       if( mi == NULL || app_name == NULL ) {
+               return;
+       }
+
+       if( mi->napps > MAX_APPS ) {
+               fprintf( stderr, "[WARN] too many applications, ignoring: %s\n", app_name );
+       }
+
+       if( (ch = strchr( app_name, ':' )) == NULL ) {          // assume we are using the default rm listen port
+               snprintf( wbuf, sizeof( wbuf ), "%d", mi->port );
+               app_port = wbuf;
+       } else {
+               *(ch++) = 0;                                                                    // name port given, split and point at port
+               app_port = ch;
+       }
+
+       mi->apps[mi->napps].name = strdup( app_name );
+       mi->apps[mi->napps].port = strdup( app_port );
+       mi->apps[mi->napps].state = !CONNECTED;
+       mi->napps++;
+}
+
+
+/*
+       Initialise things; returns a master info context.
+*/
+static master_t* init( char* cfname ) {
+       master_t*       mi;
+       char            wbuf[128];
+       rbuffer_t*      rb;                                                     // record manager for reading config file
+       char*           rec;
+       tokens_t*       tokens;
+       int                     i;
+       int                     errors;
+       char*           tok;
+       table_t*        table = NULL;
+       rrgroup_t*      rrg = NULL;
+       entry_t*        entry = NULL;
+       int                     rec_num = 0;
+
+       mi = (master_t *) malloc( sizeof( *mi ) );
+       if( mi == NULL ) {
+               return NULL;
+       }
+       memset( mi, 0, sizeof( *mi ) );
+       mi->port = 4561;
+
+       rb = f2r( cfname );                                             // get a record buffer to parse the config file
+       if( rb == NULL ) {
+               fprintf( stderr, "[FAIL] unable to open config file: %s: %s\n", cfname, strerror( errno ) );
+               free( mi );
+               return NULL;
+       }
+
+       while( (rec = next_rec( rb )) != NULL ) {
+               if( *rec ) {
+                       rec_num++;
+                       tokens = tokenise( rec, ' ' );
+
+                       fprintf( stderr, "parsing %d: %s\n", rec_num, rec );
+
+                       for( i = 0; i < tokens->ntoks && *tokens->tokens[i] != '#'; i++ );                      // simple comment strip
+                       tokens->ntoks = i;
+
+                       if( tokens->ntoks > 0 ) {
+                               tok = tokens->tokens[0];
+                               switch( *tok ) {                                        // faster jump table based on 1st ch; strcmp only if needed later
+                                       case '#':
+                                               break;
+
+                                       case 'e':
+                                               if( table != NULL  && table->nentries < MAX_ENTRIES ) {
+                                                       entry = &table->entries[table->nentries++];
+                                                       entry->subid = -1;                      // no subscription id if user omits
+                                               } else {
+                                                       fprintf( stderr, "[ERR] @%d no table started, or table full\n", rec_num );
+                                               }
+                                               break;
+
+                                       case 'm':
+                                               if( entry != NULL ) {
+                                                       entry->mtype = atoi( tokens->tokens[1] );
+                                               } else {
+                                                       fprintf( stderr, "[ERR] @%d no entry started\n", rec_num );
+                                               }
+                                               break;
+
+                                       case 'p':
+                                               if( tokens->ntoks > 1 ) {
+                                                       mi->port = atoi( tokens->tokens[1] );
+                                                       if( mi->port < 1000 ) {
+                                                               fprintf( stderr, "[WRN] @%d assigned default xAPP port smells fishy: %s\n", rec_num, tokens->tokens[1] );
+                                                       }
+                                               }
+                                               break;
+
+                                       case 'r':                                                       // round robin group
+                                               if( entry != NULL && entry->ngroups < MAX_GROUPS ) {
+                                                       if( tokens->ntoks < MAX_RRG_SIZE ) {
+                                                               rrg = &entry->groups[entry->ngroups++];
+
+                                                               for( i = 1; i < tokens->ntoks; i++ ) {
+                                                                       rrg->apps[rrg->napps++] = strdup( tokens->tokens[i] );
+                                                               }
+                                                       } else {
+                                                               fprintf( stderr, "[ERR] @%d round robin group too big.\n", rec_num );
+                                                       }
+                                               } else {
+                                                       fprintf( stderr, "[ERR] @%d no previous entry, or entry is full\n", rec_num );
+                                               }
+                                               break;
+
+                                       case 's':
+                                               if( *(tok+1) == 'e' ) {                 // send2
+                                                       if( table != NULL && tokens->ntoks < MAX_SEND2  ) {
+                                                               table->first_app = mi->napps;
+
+                                                               for( i = 1; i < tokens->ntoks; i++ ) {
+                                                                       add_app( mi, tokens->tokens[i] );
+                                                               }
+
+                                                               table->napps = tokens->ntoks - 1;
+                                                       }
+                                               } else {                                                // subid
+                                                       if( entry != NULL ) {
+                                                               entry->subid = atoi( tokens->tokens[1] );
+                                                       } else {
+                                                               fprintf( stderr, "[ERR] @%d no entry started\n", rec_num );
+                                                       }
+                                               }
+                                               break;
+
+                                       case 't':
+                                               entry = NULL;
+                                               if( mi->ntables < MAX_TABLES ) {
+                                                       table = &mi->tables[mi->ntables++];
+                                               } else {
+                                                       fprintf( stderr, "[ERR] @%d too many tables defined\n", rec_num );
+                                                       table = NULL;
+                                               }
+                                               break;
+
+                                       default:
+                                               fprintf( stderr, "record from config was ignored: %s\n", rec );
+                                               break;
+                               }
+                       }
+               }
+       }
+
+       free_rbuf( rb );
+       return mi;
+}
+
+
+/*
+       Build a buffer with the entry n from table t. Both entry and table
+       numbers are 0 based.
+       Caller must free returned buffer.
+*/
+static char* mk_entry( master_t* mi, int table, int entry ) {
+       char            wbuf[4096];
+       char            sbuf[256];
+       table_t*        tab;
+       entry_t*        ent;
+       int                     i;
+       int                     j;
+       int                     len;
+       int                     alen;
+
+       if( !mi || mi->ntables <= table ) {
+               return NULL;
+       }
+
+       tab = &mi->tables[table];
+       if( tab->nentries <= entry ) {
+               return NULL;
+       }
+
+       ent = &tab->entries[entry];
+
+       snprintf( wbuf, sizeof( wbuf ), "mse | %d | %d | ", ent->mtype, ent->subid  );                  // we only generate mse records
+
+       len = strlen( wbuf );
+       for( i = 0; i < ent->ngroups; i++ ) {
+               if( i ) {
+                       strcat( wbuf, "; " );
+               }
+
+               for( j = 0; j < ent->groups[i].napps; j++ ) {
+                       alen = strlen( ent->groups[i].apps[j] ) + 3;                            // not percise, but close enough for testing
+                       if( alen + len > sizeof( wbuf ) ) {
+                               fprintf( stderr, "[ERR] entry %d for table %d is too large to format\n", entry, table );
+                               return NULL;
+                       }
+
+                       if( j ) {
+                               strcat( wbuf, "," );
+                       }
+
+                       strcat( wbuf, ent->groups[i].apps[j] );
+               }
+       }
+
+       strcat( wbuf, "\n" );
+       return strdup( wbuf );
+}
+
+/*
+       Sends a buffer to all apps in the range.
+*/
+void send2range( master_t* mi, char* buf, int first, int n2send ) {
+       int                     a;                                              // application offset in master array
+       int                     last;                                   // stopping point (index)
+       rr_mbuf_t*      mbuf = NULL;
+
+       if( !mi  ) {                                                                                    // safe to dance
+               return;
+       }
+
+       a = first;
+       last = first + n2send;
+
+       mbuf = rr_new_buffer( mbuf, strlen( buf ) + 5 );                // ensure buffer is large enough
+       while(  a < last ) {
+               fprintf( stderr, "%s ", mi->apps[a].name );
+
+               memcpy( mbuf->payload, buf, strlen( buf ) );
+               mbuf->used = strlen( buf );
+               mbuf = rr_send( mi->apps[a].shunt, mbuf, ALLOC_NEW );
+
+               a++;
+
+               fprintf( stderr, "\n" );
+       }
+
+       rr_free_mbuf( mbuf );
+}
+
+/*
+       Formats the entries for the table and sends to all applications that the
+       table should be sent to per the send2 directive in the config.
+*/
+static void send_table( master_t* mi, int table ) {
+       table_t*        tab;
+       char*           ent;                            // entry string to send
+       int                     e = 0;                          // entry number
+       int                     a;
+       int                     last;
+       rr_mbuf_t*      mbuf = NULL;
+
+       if( !mi || table > mi->ntables ) {
+               return;
+       }
+
+       tab = &mi->tables[table];
+       fprintf( stderr, "[INF] send table start message: " );
+       send2range( mi, "newrt | start\n", tab->first_app, tab->napps );                // send table start req to all
+
+       while( (ent = mk_entry( mi, table, e++ )) != NULL ) {                                   // build each entry once, send to all
+               fprintf( stderr, "[INF] sending table %d entry %d: ", table, e );
+               send2range( mi, ent, tab->first_app, tab->napps );
+       }
+
+       fprintf( stderr, "[INF] send table end message: " );
+       send2range( mi, "newrt | end\n", tab->first_app, tab->napps );          // send table end notice to all
+}
+
+/*
+       Run the list of tables and send them all out.
+*/
+static void send_all_tables( master_t* mi ) {
+       int i;
+
+       if( ! mi ) {
+               return;
+       }
+
+       for( i = 0; i < mi->ntables; i++ ) {
+               send_table( mi, i );
+       }
+}
+
+
+// ----------------- testing ----------------------------------------------------------------------
+
+/*
+       Dump table entries in the form we'll send to apps to stderr.
+*/
+static void print_tables( master_t* mi ) {
+       char*   ent;
+       int             t;
+       int             e;
+
+       if( ! mi ) {
+               return;
+       }
+
+       for( t = 0; t < mi->ntables; t++ ) {
+               fprintf( stderr, "=== table %d ===\n", t );
+               e = 0;
+               while( (ent = mk_entry( mi, t, e++ )) != NULL ) {
+                       fprintf( stderr, "%s", ent );
+                       free( ent );
+               }
+       }
+}
+
+static void print_tokens( tokens_t* tokens ) {
+       int i;
+
+       fprintf( stderr, "there are %d tokens\n", tokens->ntoks );
+       for( i = 0; i < tokens->ntoks; i++ ) {
+               fprintf( stderr, "[%02d] (%s)\n", i, tokens->tokens[i] );
+       }
+}
+
+static void self_test( char* fname ) {
+       char*   s0 = "            Now is   the time for     all    to stand up and cheer!";
+       char*   s1 = "            Now is   \"the time for     all\"    to stand up and cheer!";
+       char*   s2 = " field1 | field2||field4|field5";
+       tokens_t*       tokens;
+       rbuffer_t       *rb;
+       int i;
+
+       tokens = tokenise( s0, ' ' );
+       if( tokens->ntoks != 11 ) {
+               fprintf( stderr, "didn't parse into 11 tokens (got %d): %s\n", tokens->ntoks, s1 );
+       }
+       print_tokens( tokens );
+
+       tokens = tokenise( s1, ' ' );
+       if( tokens->ntoks != 8 ) {
+               fprintf( stderr, "didn't parse into 11 tokens (got %d): %s\n", tokens->ntoks, s1 );
+       }
+       print_tokens( tokens );
+
+       tokens = tokenise( s2, '|' );
+       if( tokens->ntoks != 5 ) {
+               fprintf( stderr, "didn't parse into 5 tokens (got %d): %s\n", tokens->ntoks, s2 );
+       }
+       print_tokens( tokens );
+
+       free_tokens( tokens );
+
+       if( fname == NULL ) {
+               return;
+       }
+
+       rb = f2r( fname );
+       if( rb == NULL ) {
+               fprintf( stderr, "[FAIL] couldn't read file into rbuffer: %s\n", strerror( errno ) );
+       } else {
+               while( (s2 = next_rec( rb )) != NULL ) {
+                       fprintf( stderr, "record: (%s)\n", s2 );
+               }
+
+               free_rbuf( rb );
+       }
+
+       return;
+}
+
+
+// ----------------------------------------------------------------------------------------------
+
+int main( int argc, char** argv ) {
+       void*   mi;
+       int             not_ready;
+
+       if( argc > 1 ) {
+               if( strcmp( argv[1], "selftest" ) == 0 ) {
+                       self_test( argc > 1 ? argv[2] : NULL );
+                       exit( 0 );
+               }
+
+               mi = init( argv[1] );                                                                   // parse the config and generate table structs
+               if( ! mi ) {
+                       fprintf( stderr, "[CRI] initialisation failed\n" );
+                       exit( 1 );
+               }
+
+               print_tables( mi );
+
+               while( (not_ready = connect2all( mi ) ) > 0 ) {
+                       fprintf( stderr, "[INF] still waiting to connect to %d applications\n", not_ready );
+                       sleep( 2 );
+               }
+
+               fprintf( stderr, "[INF] connected to all applications, sending tables\n" );
+
+               send_all_tables( mi );
+       } else {
+               fprintf( stderr, "[INFO] usage: %s file-name\n", argv[0] );
+       }
+}
+