Add ability for RMR to request route table 34/2534/2 3.2.0
authorE. Scott Daniels <daniels@research.att.com>
Fri, 14 Feb 2020 14:23:59 +0000 (09:23 -0500)
committerE. Scott Daniels <daniels@research.att.com>
Tue, 18 Feb 2020 21:05:40 +0000 (16:05 -0500)
The route table collector has been extended to allow RMR to send
a request to have the route manager deliver a new table. This
change also provides for an ack/nack to be sent back to route
manager to indicate the state of a route table update.

There is also a small wormhole bug fix:
Wormhole send was not clearing the call ID and some message
were being treated as call response on the other side.

Issue-ID: RIC-92
Issue-ID: RIC-91

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I8ffa7919b42412e4604130885b968f32b5448294

17 files changed:
CHANGES
CMakeLists.txt
doc/src/man/env_var_list.im
doc/src/man/rmr_rcv_msg.3.xfm
src/rmr/common/include/RIC_message_types.h
src/rmr/common/include/rmr_agnostic.h
src/rmr/common/src/rt_generic_static.c
src/rmr/common/src/rtc_static.c
src/rmr/common/src/wormholes.c
src/rmr/nng/include/rmr_nng_private.h
src/rmr/nng/src/rmr_nng.c
src/rmr/si/include/rmr_si_private.h
src/rmr/si/src/mt_call_si_static.c
src/rmr/si/src/rmr_si.c
src/rmr/si/src/rtable_si_static.c
src/rmr/si/src/rtc_si_static.c [deleted file]
src/rmr/si/src/sr_si_static.c

diff --git a/CHANGES b/CHANGES
index e41168b..eea67a4 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,10 @@
 API and build change  and fix summaries. Doc correctsions
 and/or changes are not mentioned here; see the commit messages.
 
+2020 February 18; version 3.2.0
+       Added support for new Route Manager and it's ability to accept
+       a request for table update.
+
 2020 February 14; version 3.1.3
        Fix bug in SIsend which was causing a core dump in some cases
        where the application attempted to send on a connection that
index 7e1efeb..6bf455b 100644 (file)
@@ -37,8 +37,8 @@ project( rmr LANGUAGES C )
 cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "3" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
-set( minor_version "1" )
-set( patch_level "3" )
+set( minor_version "2" )
+set( patch_level "0" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
index a9d4297..e1ed9a9 100644 (file)
@@ -1,4 +1,4 @@
-.** vim: ts=4 noet sw=4:
+).** vim: ts=4 noet sw=42
 .if false
 ==================================================================================
        Copyright (c) 2019 Nokia
@@ -23,7 +23,7 @@
        Abstract:       This is a list of environment variables which are recognised
                                by RMR. This is an embed file as it is referenced by both 
                                manual pages and the 'read the docs' source (allowing a single
-                               point of maintenence).
+                               point of maintenance).
 
        Date:           6 November 2019 (broken from the main manual page)
        Author:         E. Scott Daniels
 
 
 &beg_dlist(.75i : ^&bold_font )
-&ditem(RMR_ASYNC_CONN) Allows the asynch connection mode to be turned off (by setting the
+&ditem(RMR_ASYNC_CONN) Allows the async connection mode to be turned off (by setting the
        value to 0. When set to 1, or missing from the environment, RMR will invoke the
-       connection interface in the transport mechanism using the non-blocking (asynch)
+       connection interface in the transport mechanism using the non-blocking (async)
        mode.  This will likely result in many "soft failures" (retry) until the connection
        is established, but allows the application to continue unimpeeded should the
        connection be slow to set up.
        &half_space
 
-&ditem(RMR_BIND_IF) This provides the interface that RMr will bind listen ports to allowing
+&ditem(RMR_BIND_IF) This provides the interface that RMR will bind listen ports to allowing
        for a single interface to be used rather than listening across all interfaces.
-       This should be the IP address assigned to the interface that RMr should listen
-       on, and if not defined RMr will listen on all interfaces.
+       This should be the IP address assigned to the interface that RMR should listen
+       on, and if not defined RMR will listen on all interfaces.
        &half_space
 
-&ditem(RMR_RTG_SVC) RMr opens a TCP listen socket using the port defined by this
-       environment variable and expects that the route table generator process
-       will connect to this port.
-       If not supplied the port 4561 is used.
+&ditem(RMR_CTL_PORT) 
+       This variable defines the port that RMR should open for communications
+       with Route Manager, and other RMR control applications. 
+       If not defined, the port 4561 is assumed.
+
+       &space
+       Previously, the &cw(RMR_RTG_SVC) (route table generator service port)
+       was used to define this port.
+       However, a future version of Route Manager will require RMR to connect
+       and request tables, thus that variable is now used to supply the Route
+       Manager well known address and port.
+
+       &space
+       To maintain backwards compatablibility with the older Route Manager versions,
+       the presence of this variable in the environment will shift RMR's behaviour
+       with respect to the default value used when &cw(RMR_RTG_SVC) is &bold(not) defined.
+
+       &space
+       When &cw(RMR_CTL_PORT) is &bold(defined^:)
+       RMR assumes that Route Manager requires RMR to connect and request table
+       updates is made, and the default well known address for Route manager
+       is used (routemgr:4561).
+
+       &space
+       When &cw(RMR_CTL_PORT) is &bold(undefined^:)
+       RMR assumes that Route Manager will connect and push table updates, thus the
+       default listen port (4561) is used.
+
+       &space
+       To avoid any possible misinterpretation and/or incorrect assumptions on the part
+       of RMR, it is recommended that both the  &cw(RMR_CTL_PORT) and &cw(RMR_RTG_SVC)
+       be defined.
+       In the case where both variables are defined, RMR will behave exactly as is
+       communicated with the variable's values.
+       &half_space
+
+&ditem(RMR_RTG_SVC) 
+       The value of this variable depends on the Route Manager in use.
+       &space
+       When the Route Manager is expecting to connect to an xAPP and push
+       route tables, this variable must indicate the &cw(port) which RMR should
+       use to listen for these connections. 
+
+       &space
+       When the Route Manager is expecting RMR to connect and request a
+       table update during initialisation, the variable should be the 
+       &cw(host:port) of the Route Manager process. 
+       
+       &space
+       The &cw(RMR_CTL_PORT) variable (added with the support of sending table update
+       requests to Route manager), controls the behaviour if this variable is not set.
+       See the description of that variable for details.
        &half_space
 
 &ditem(RMR_HR_LOG) 
                &half_space
                &ditem(3)       Warnings and all messages written with a lower value.
                &half_space
-               &ditem(4)       Informationional and all messages written with a lower value.
+               &ditem(4)       Informational and all messages written with a lower value.
                &half_space
                &ditem(5)       Debugging mode -- all messages written, however this requires RMR to have been compiled with debugging support enabled.
        &end_dlist
        &half_space
 
-&ditem(RMR_RTG_ISRAW) Is set to 1 if the route table generator is sending "plain" messages
-       (not using RMr to send messages, 0 if the rtg is using RMr to send. The default
-       is 1 as we don't expect the rtg to use RMr.
+&ditem(RMR_RTG_ISRAW) 
+       &bold(Deprecated.)
+       Should be set to 1 if the route table generator is sending "plain" messages
+       (not using RMR to send messages, 0 if the rtg is using RMR to send. The default
+       is 1 as we don't expect the rtg to use RMR.
+       &half_space
+       This variable is only recognised when using the NNG transport library as 
+       it is not possible to support NNG "raw" communications with other transport
+       libraries.  It is also necessary to match the value of this variable
+       with the capabilities of the Route Manager; at some point in the future
+       RMR will assume that all Route Manager messages will arrive via an RMR
+       connection and will ignore this variable.
 
 &ditem(RMR_SEED_RT) This is used to supply a static route table which can be used for
        debugging, testing, or if no route table generator process is being used to
        supply the route table.
-       If not defined, no static table is used and RMr will not report &ital(ready)
+       If not defined, no static table is used and RMR will not report &ital(ready)
        until a table is received.
        The static route table may contain both the route table (between newrt start
        and end records), and the MEID map (between meid_map start and end records)
        the rmr_rts_msg() function to return a response to the sender. If not supplied
        RMR will use the hostname which in some container environments might not be
        routable.
+       &half_space
+       The value of this variable is also used for Route Manager messages which are
+       sent via an RMR connection.
 
 &ditem(RMR_VCTL_FILE) This supplies the name of a verbosity control file. The core
        RMR functions do not produce messages unless there is a critical failure. However,
index b67b4bb..c53dcf5 100644 (file)
@@ -61,7 +61,32 @@ the message information (state, length, payload), or a NULL pointer in the case
 of an extreme error.
 
 &h2(ERRORS)
-The &ital(state) field in the message buffer will indicate either &cw(RMR_OK) or
+The &ital(state) field in the message buffer will indicate either &cw(RMR_OK) 
+when the message receive process was successful and the message can be used
+by the caller.
+Depending on the underlying transport mechanism, one of the following RMR 
+error stats may be returned:
+&half_space
+
+&beg_dlist(.75i : ^&bold_font )
+&di(RMR_ERR_EMPTY) The message received had no payload, or was completely empty.
+
+&half_space
+&di(RMR_ERR_TIMEOUT) For some transport mechanisms, or if reading the receive
+queue from multiple threads, it is possible for one thread to find no data
+waiting when it queries the queue.  When this state is reported, the 
+message buffer does not contain message data and the user application should
+reinvoke the receive function.
+&end_dlist
+
+&space
+
+
+When an RMR error state is reported, the underlying &cw(errno) value might
+provide more information. The following is a list of possible values that
+might accompany the states listed above:
+
+&half_space
 &cw(RMR_ERR_EMPTY) if an empty message was received. 
 If a nil pointer is returned, or any other state value was set in the message
 buffer,  &cw(errno) will be set to one of the following:
index aeb360e..8e0c67c 100644 (file)
 
 
 
-/* Header  file defining  message types
-   for various RMR messages
-
-
-    ------------------
-    WORK IN PROGRESS
-    ------------------
-
+/* 
+       Header  file defining  message types for various RMR messages
 */
 
 #define RIC_UNDEFINED                          -1
 
-// ---- RESERVED -------------------------------------------
-// all message types 0 - 99 are reserved for RMR
-// ---------------------------------------------------------
+/* 
+---------------------------------------------------------
+       RMR Reserved types
+               All message types 0 - 99 are reserved for RMM.
+---------------------------------------------------------
+*/
+               
+#define RMRRM_TABLE_DATA                       20              // table data from route manger
+#define        RMRRM_REQ_TABLE                         21              // request for table update to route mangager
+#define RMRRM_TABLE_STATE                      22              // state of table to route mgr
+
 
 // --- please keep additions in numerical order ------
 
index cb8114f..6b1537b 100644 (file)
@@ -56,7 +56,8 @@ typedef struct uta_ctx  uta_ctx_t;
 
                                                                                        // environment variable names we'll suss out
 #define ENV_BIND_IF            "RMR_BIND_IF"           // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
-#define ENV_RTG_PORT   "RMR_RTG_SVC"           // the port we'll listen on for rtg connections
+#define ENV_RTG_PORT   "RMR_RTG_SVC"           // the port we'll listen on for rtg connections (deprecated; see RTG_SVC and CTL_PORT)
+#define ENV_RTG_ADDR   "RMR_RTG_SVC"           // the address we will connect to for route manager updates
 #define ENV_SEED_RT            "RMR_SEED_RT"           // where we expect to find the name of the seed route table
 #define ENV_SEED_MEMAP "RMR_SEED_MEMAP"        // where we expect to find the name of the seed route table
 #define ENV_RTG_RAW            "RMR_RTG_ISRAW"         // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
@@ -66,6 +67,7 @@ typedef struct uta_ctx  uta_ctx_t;
 #define ENV_SRC_ID             "RMR_SRC_ID"            // forces this string (adding :port, max 63 ch) into the source field; host name used if not set
 #define ENV_LOG_HR             "RMR_HR_LOG"            // set to 0 to turn off human readable logging and write using some formatting
 #define ENV_LOG_VLEVEL "RMR_LOG_VLEVEL"        // set the verbosity level (0 == 0ff; 1 == crit .... 5 == debug )
+#define ENV_CTL_PORT   "RMR_CTL_PORT"          // route collector will listen here for control messages (4561 default)
 
 #define NO_FLAGS       0                               // no flags to pass to a function
 
@@ -75,6 +77,7 @@ typedef struct uta_ctx  uta_ctx_t;
 //#define IFL_....
 
 #define CFL_MTC_ENABLED        0x01            // multi-threaded call is enabled
+#define CFL_NO_RTACK   0x02            // no route table ack needed when end received
 
                                                                        // context flags
 #define CTXFL_WARN             0x01            // ok to warn on stderr for some things that shouldn't happen
@@ -90,8 +93,10 @@ typedef struct uta_ctx  uta_ctx_t;
 #define MAX_CALL_ID            255                     // largest call ID that is supported
 
 //#define DEF_RTG_MSGID        ""                              // default to pick up all messages from rtg
-#define DEF_RTG_PORT   "tcp:4561"              // default port that we accept rtg connections on
+#define DEF_CTL_PORT   "4561"                  // default control port that rtc listens on
+#define DEF_RTG_PORT   "tcp:4561"              // default port that we accept rtg connections on (deprecated)
 #define DEF_COMM_PORT  "tcp:4560"              // default port we use for normal communications
+#define DEF_RTG_WK_ADDR        "routemgr:4561" // well known address for the route manager
 #define DEF_TR_LEN             (-1)                    // use default trace data len from context
 
 #define UNSET_SUBID            (-1)                    // initial value on msg allocation indicating not set
@@ -305,9 +310,13 @@ static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
-static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
+static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel );
 static rmr_mbuf_t* realloc_msg( rmr_mbuf_t* msg, int size );
 static void* rtc( void* vctx );
 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );
 
+// --------- route manager communications -----------------
+static void send_rt_ack( uta_ctx_t* ctx, int state, char* reason );
+static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx );
+
 #endif
index 245c9b5..f098f53 100644 (file)
@@ -46,6 +46,8 @@
 #include <unistd.h>
 #include <netdb.h>
 
+#include <RIC_message_types.h>         // needed for route manager messages
+
 
 /*
        Passed to a symtab foreach callback to construct a list of pointers from
@@ -192,6 +194,85 @@ static void  rt_epcounts( route_table_t* rt, char* id ) {
        rmr_sym_foreach_class( rt->hash, 1, ep_counts, id );            // run endpoints in the active table
 }
 
+// ------------ route manager communication -------------------------------------------------
+/*
+       Send a request for a table update to the route manager. Updates come in
+       async, so send and go.
+
+       pctx is the private context for the thread; ctx is the application context
+       that we need to be able to send the application ID in case rt mgr needs to
+       use it to idenfity us.
+
+       Returns 0 if we were not able to send a request.
+*/
+static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
+       rmr_mbuf_t*     smsg;
+       int     state = 0;
+
+       if( ctx->rtg_whid < 0 ) {
+               return state;
+       }
+
+       smsg = rmr_alloc_msg( pctx, 1024 );
+       if( smsg != NULL ) {
+               smsg->mtype = RMRRM_REQ_TABLE;
+               smsg->sub_id = 0;
+               snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) );
+               rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
+               smsg->len = strlen( smsg->payload ) + 1;
+       
+               smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
+               if( (state = smsg->state) != RMR_OK ) {
+                       rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
+                       rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
+                       ctx->rtg_whid = -1;
+               }
+
+               rmr_free_msg( smsg );
+       }
+
+       return state;
+}
+
+/*
+       Send an ack to the route table manager for a table ID that we are
+       processing.      State is 1 for OK, and 0 for failed. Reason might 
+       be populated if we know why there was a failure.
+
+       Context should be the PRIVATE context that we use for messages
+       to route manger and NOT the user's context.
+*/
+static void send_rt_ack( uta_ctx_t* ctx, int state, char* reason ) {
+       rmr_mbuf_t*     smsg;
+       
+       if( ctx == NULL || ctx->rtg_whid < 0 ) {
+               return;
+       }
+
+       if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
+               return;
+       }
+
+       smsg = rmr_alloc_msg( ctx, 1024 );
+       if( smsg != NULL ) {
+               smsg->mtype = RMRRM_TABLE_STATE;
+               smsg->sub_id = 0;
+               snprintf( smsg->payload, 1024, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR", 
+                       ctx->table_id == NULL ? "<id-missing>" : ctx->table_id, reason == NULL ? "" : reason );
+
+               smsg->len = strlen( smsg->payload ) + 1;
+       
+               rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, smsg->state, ctx->rtg_whid );
+               smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
+               if( (state = smsg->state) != RMR_OK ) {
+                       rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
+                       rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
+                       ctx->rtg_whid = -1;
+               }
+
+               rmr_free_msg( smsg );
+       }
+}
 
 // ------------------------------------------------------------------------------------------------
 /*
@@ -573,20 +654,26 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel )
 
 
        For a RT update, we expect:
-               newrt|{start|end [hold]}
+               newrt | start | <table-id>
+               newrt | end | <count>
                rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
                mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
                mse| <mtype>[,sender] | <sub-id> | %meid
 
 
        For a meid map update we expect:
-               meid_map | start
+               meid_map | start | <table-id>
                meid_map | end | <count> | <md5-hash>
                mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
                mme_del | <meid0> <meid1>...<meidn>
 
+
+       The pctx is our private context that must be used to send acks/status
+       messages back to the route manager.  The regular ctx is the ctx that
+       the user has been given and thus that's where we have to hang the route
+       table we're working with.
 */
-static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
+static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel ) {
        int i;
        int ntoks;                                                      // number of tokens found in something
        int ngtoks;
@@ -594,6 +681,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
        rtable_ent_t*   rte;                            // route table entry added
        char*   tokens[128];
        char*   tok;                                            // pointer into a token or string
+       char    wbuf[1024];
 
        if( ! buf ) {
                return;
@@ -630,6 +718,18 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                        case 'n':                                                                                               // newrt|{start|end}
                                tokens[1] = clip( tokens[1] );
                                if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
+                                       if( ntoks >2 ) {
+                                               if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
+                                                       rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
+                                                               ctx->new_rtable->updates, tokens[2] );
+                                                       snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
+                                                       send_rt_ack( pctx, RMR_OK, wbuf );
+                                                       uta_rt_drop( ctx->new_rtable );
+                                                       ctx->new_rtable = NULL;
+                                                       break;
+                                               }
+                                       }
+
                                        if( ctx->new_rtable ) {
                                                uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
                                                ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
@@ -643,18 +743,32 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                                        rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
                                                        rt_stats( ctx->rtable );
                                                }
+
+                                               send_rt_ack( pctx, RMR_OK, NULL );
                                        } else {
                                                if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
                                                ctx->new_rtable = NULL;
                                        }
-                               } else {                                                                                        // start a new table.
-                                       if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
+                               } else {                                                                                                                        // start a new table.
+                                       if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
+                                               send_rt_ack( pctx, !RMR_OK, "table not complete" );                     // nack the one that was pending as end never made it
+
                                                if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
                                                uta_rt_drop( ctx->new_rtable );
                                        }
 
+                                       if( ctx->table_id != NULL ) {
+                                               free( ctx->table_id );
+                                       }
+                                       if( ntoks >2 ) {
+                                               ctx->table_id = strdup( tokens[2] );
+                                       } else {
+                                               ctx->table_id = NULL;
+                                       }
+
                                        ctx->new_rtable = NULL;
                                        ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint and meidtentries from active table
+                                       ctx->new_rtable->updates = 0;                                           // init count of entries received
                                        if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
                                }
                                break;
@@ -730,6 +844,13 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                                uta_rt_drop( ctx->new_rtable );
                                        }
 
+                                       if( ntoks >2 ) {
+                                               if( ctx->table_id != NULL ) {
+                                                       free( ctx->table_id );
+                                               }
+                                               ctx->table_id = strdup( tokens[2] );
+                                       }
+
                                        ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
                                        ctx->new_rtable->updates = 0;                                           // init count of updates received
                                        if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
@@ -787,7 +908,7 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
                        return;
                }
 
-               parse_rt_rec( ctx, rec, vlevel );
+               parse_rt_rec( ctx, NULL, rec, vlevel );                 // no pvt context as we can't ack
        }
 
        if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
index b411dbe..45a277a 100644 (file)
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include <RIC_message_types.h>         // needed for RMR/Rt Mgr msg types
+
+// ---- local constants ------------------
+                                                                               // flags
+#define RTCFL_HAVE_UPDATE      0x01            // an update from RM was received
+
+#define MAX_RTC_BUF    5 * 1024                        // max buffer size we'll expect is 4k, add some fudge room
+
+// ------------------------------------------------------------------------------------------------
+
+/*
+       Loop forever (assuming we're running in a pthread reading the static table
+       every minute or so.
+*/
+static void* rtc_file( void* vctx ) {
+       uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
+       char*   eptr;
+       int             vfd = -1;                                       // verbose file des if we have one
+       int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
+       char    wbuf[256];
+
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
+               return NULL;
+       }
+
+       if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
+               vfd = open( eptr, O_RDONLY );
+       }
+
+       ctx->flags |= CFL_NO_RTACK;                             // no attempt to ack when reading from a file
+       while( 1 ) {
+               if( vfd >= 0 ) {
+                       wbuf[0] = 0;
+                       lseek( vfd, 0, 0 );
+                       read( vfd, wbuf, 10 );
+                       vlevel = atoi( wbuf );
+               }
+
+               read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
+
+               sleep( 60 );
+       }
+}
+
 static int refresh_vlevel( int vfd ) {
        int vlevel = 0;
        char    rbuf[128];
@@ -57,6 +103,261 @@ static int refresh_vlevel( int vfd ) {
 }
 
 /*
+       Route Table Collector
+       A side thread which either attempts to connect and request a table
+       from the Route Manager, or opens a port and listens for Route Manager
+       to push table updates.
+
+       It may do other things along the way (latency measurements, alarms, 
+       respond to RMR pings, etc.).
+
+       The behaviour with respect to listening for Route Manager updates vs
+       the initiation of the connection and sending a request depends on the
+       value of the ENV_RTG_ADDR (RMR_RTG_SVC)  environment variable. If
+       host:port, or IP:port, is given, then we assume that we make the connection
+       and send a request for the table (request mode).  If the variable is just 
+       a port, then we assume Route Manager will connect and push updates (original 
+       method).
+
+       If the variable is not defined, the default behaviour, in order to be
+       backwards compatable, depends on the presence of the ENV_CTL_PORT 
+       (RMR_CTL_PORT) variable (new with the support for requesting a table).
+
+       
+       ENV_CTL_PORT    ENV_RTG_ADDR    Behaviour
+       unset                   unset                   Open default CTL port (DEF_CTL_PORT) and
+                                                                       wait for Rt Mgr to push tables
+
+       set                             unset                   Use the default Rt Mgr wellknown addr
+                                                                       and port (DEF_RTG_WK_ADDR) to connect
+                                                                       and request a table. The control port
+                                                                       used is the value set by ENV_CTL_PORT.
+
+       unset                   set                             As described above. The default control
+                                                                       port (DEF_CTL_PORT) is used. 
+
+       When we are running in request mode, then we will send the RMR message 
+       RMRRM_REFRESH to this address (wormhole) as a request for the route manager 
+       to send a new table. We will attempt to connect and send requests until
+       we have a table. Calls to rmr_ready() will report FALSE until a table is
+       loaded _unless_ a seed table was given.
+
+       Route table information is expected to arrive on RMR messages with type 
+       RMRRM_TABLE_DATA.  There is NOT a specific message type for each possible
+       table record, so the payload is as it appears in the seed file or as
+       delivered in old versions.  It may take several RMRRM_TABLE_DATA messages
+       to completely supply a new table or table update. See the header for parse_rt_rec
+       in common for a description of possible message contents.
+
+       Buffers received from the route table generator can contain multiple newline terminated
+       records, but each buffer must be less than 4K in length, and the last record in a
+       buffer may NOT be split across buffers.
+
+       Other chores:
+       In addition to the primary task of getting, vetting, and installing a new route table, or
+       updates to the existing table, this thread will periodically cause the send counts for each
+       endpoint known to be written to standard error. The frequency is once every 180 seconds, and
+       more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
+*/
+static void* rtc( void* vctx ) {
+       uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
+       uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
+       rmr_mbuf_t*     msg = NULL;                             // message from rtg
+       char*   payload;                                        // payload in the message
+       size_t  mlen;
+       char*   my_port;                                        // the port number that we will listen on (4561 has been the default for this)
+       char*   rtg_addr;                                       // host:port address of route table generator (route manager)
+       char*   daddr;                                          // duplicated rtg address string to parse/trash
+       size_t  buf_size;                                       // nng needs var pointer not just size?
+       char*   nextr;                                          // pointer at next record in the message
+       char*   curr;                                           // current record
+       int     i;
+       long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
+       int             cstate = -1;                            // connection state to rtg
+       int             state;                                          // processing state of some nng function
+       char*   tokens[128];
+       char    wbuf[128];
+       char*   pbuf = NULL;
+       int             pbuf_size = 0;                          // number allocated in pbuf
+       int             ntoks;
+       int             vfd = -1;                                       // verbose file des if we have one
+       int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
+       char*   eptr;
+       int             epfd = -1;                                      // fd for epoll so we can multi-task
+       struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
+       struct  epoll_event epe;                        // event definition for event to listen to
+       int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
+       int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
+       int             flags = 0;
+
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
+               return NULL;
+       }
+
+       if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
+               vfd = open( eptr, O_RDONLY );
+               vlevel = refresh_vlevel( vfd );
+       }
+
+       ctx->flags |= CFL_NO_RTACK;                     // don't ack when reading from a file
+       read_static_rt( ctx, vlevel );                  // seed the route table if one provided
+       ctx->flags &= ~CFL_NO_RTACK;
+
+
+       my_port = getenv( ENV_CTL_PORT );                               // default port to listen on (likely 4561)
+       if( my_port == NULL || ! *my_port ) {                   // if undefined, then go with default
+               my_port = DEF_CTL_PORT; 
+               daddr = DEF_CTL_PORT;                                           // backwards compat; if ctl port not hard defined, default is to listen
+       } else {
+               daddr = DEF_RTG_WK_ADDR;                                        // if ctl port is defined, then default changes to connecting to well known RM addr
+       }
+
+       if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) {              // undefined, use default set above
+               rtg_addr = daddr;
+       }
+
+       daddr = strdup( rtg_addr );                                                                     // dup to destroy during parse
+
+       ntoks = uta_tokenise( daddr, tokens, 120, ':' );                        // should be host:ip of rt mgr (could be port only which we assume is old listen port)
+       switch( ntoks ) {
+               case 0:                                                                 // should not happen, but prevent accidents and allow default to ignore additional tokens
+                       break;
+
+               case 1:
+                       my_port = tokens[0];                    // just port -- assume backlevel environment where we just listen
+                       flags |= RTCFL_HAVE_UPDATE;             // prevent sending update reqests
+                       break;
+
+               default:
+                       if( strcmp( tokens[0], "tcp" ) == 0 ) {                 // old school nng tcp:xxxx so we listen on xxx
+                               flags |= RTCFL_HAVE_UPDATE;                                     // and signal not to try to request an update
+                               my_port = tokens[1];
+                       } else {
+                               // rtg_addr points at rt mgr address and my port set from env or default stands as is   
+                       }
+                       break;
+       }
+
+       if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) {                            // open a private context (no RT listener!)
+               rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
+
+               while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
+                       sleep( count_delay );
+                       rt_epcounts( ctx->rtable, ctx->my_name );
+               }
+
+               return NULL;
+       }
+
+       ctx->rtg_whid = -1;
+
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
+
+       bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
+       blabber = 0;
+       while( 1 ) {                                                                                                            // until the cows return, pigs fly, or somesuch event likely not to happen
+               while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
+                       if( (flags & RTCFL_HAVE_UPDATE) == 0 ) {                                                // no route table updated from rt mgr; request one
+                               if( ctx->rtg_whid < 0 ) {
+                                       ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
+                               }
+                               send_update_req( pvt_cx, ctx );
+                       }
+
+                       msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
+
+                       if( time( NULL ) > blabber  ) {
+                               vlevel = refresh_vlevel( vfd );
+                               if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
+                                       blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
+                                       if( blabber > bump_freq ) {
+                                               count_delay = 300;
+                                       }
+                                       rt_epcounts( ctx->rtable, ctx->my_name );
+                               }
+                       }
+               }
+
+               vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
+
+               if( msg != NULL && msg->len > 0 ) {
+                       payload = msg->payload;
+                       mlen = msg->len;                                        // usable bytes in the payload
+                       if( vlevel > 1 ) {
+                               rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d bytes (%s)\n", msg->mtype, (int) mlen, msg->payload );
+                       } else {
+                               if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
+                       }
+
+                       switch( msg->mtype ) {
+                               case RMRRM_TABLE_DATA:
+                                       if( (flags & RTCFL_HAVE_UPDATE) == 0 ) {
+                                               flags |= RTCFL_HAVE_UPDATE;
+                                               rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
+                                       }
+
+                                       if( pbuf_size <= mlen ) {
+                                               if( pbuf ) {
+                                                       free( pbuf );
+                                               }
+                                               if( mlen < 512 ) {
+                                                       pbuf_size = 512;
+                                               } else {
+                                                       pbuf_size = mlen * 2;
+                                               }
+                                               pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
+                                       }
+                                       memcpy( pbuf, payload, mlen );
+                                       pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
+
+                                       curr = pbuf;
+                                       while( curr ) {                                                         // loop over each record in the buffer
+                                               nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
+
+                                               if( nextr ) {
+                                                       *(nextr++) = 0;
+                                               }
+
+                                               if( vlevel > 1 ) {
+                                                       rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
+                                               }
+                                               parse_rt_rec( ctx, pvt_cx, curr, vlevel );              // parse record and add to in progress table
+
+                                               curr = nextr;
+                                       }
+
+                                       msg->len = 0;                           // force back into the listen loop
+                                       break;
+
+                               default:
+                                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
+                                       break;
+                       }
+               }
+
+               if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
+                       return NULL;
+               }
+
+       }
+
+       return NULL;    // unreachable, but some compilers don't see that and complain.
+}
+
+#ifndef SI95_BUILD
+// this is nng specific inas much as we allow raw (non-RMR) messages 
+
+/*
+       NOTE:   This is the original rtc code when we supported "raw" nano/nng messages
+                       from the route manger.  It is deprecated in favour of managing all RM-RMR
+                       communications via an RMR session.
+
+                       The rtc() function above is the new and preferred function regardless
+                       of transport.
+
+       -----------------------------------------------------------------------------------
        Route Table Collector
        A side thread which opens a socket and subscribes to a routing table generator.
        It may do other things along the way (latency measurements?).
@@ -93,13 +394,12 @@ static int refresh_vlevel( int vfd ) {
        endpoint known to be written to standard error. The frequency is once every 180 seconds, and
        more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
 */
-static void* rtc( void* vctx ) {
+static void* raw_rtc( void* vctx ) {
        uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
        uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
        rmr_mbuf_t*     msg = NULL;                             // message from rtg
        char*   payload;                                        // payload in the message
        size_t  mlen;
-       size_t  clen;                                           // length to copy and mark
        char*   port;                                           // a port number we listen/connect to
        char*   fport;                                          // pointer to the real buffer to free
        size_t  buf_size;                                       // nng needs var pointer not just size?
@@ -144,9 +444,12 @@ static void* rtc( void* vctx ) {
                port = strdup( port );
        }
 
+/*
+       this test is now done in init and this function is started _only_ if the value was 1
        if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
                raw_interface = atoi( curr ) > 0;                               // if > 0 we assume that rtg messages are NOT coming from an RMr based process
        }
+*/
 
        fport = port;           // must hold to free
 
@@ -236,9 +539,9 @@ static void* rtc( void* vctx ) {
                        payload = msg->payload;
                        mlen = msg->len;                                        // usable bytes in the payload
                        if( vlevel > 1 ) {
-                               rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
+                               rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
                        } else {
-                               if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
+                               if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
                        }
 
                        if( pbuf_size <= mlen ) {
@@ -264,9 +567,13 @@ static void* rtc( void* vctx ) {
                                }
 
                                if( vlevel > 1 ) {
-                                       rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
+                                       rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
+                               }
+                               if( raw_interface ) {
+                                       parse_rt_rec( ctx, NULL, curr, vlevel );                // nil pvt to parser as we can't ack messages
+                               } else {
+                                       parse_rt_rec( ctx, pvt_cx, curr, vlevel );              // parse record and add to in progress table
                                }
-                               parse_rt_rec( ctx, curr, vlevel );              // parse record and add to in progress table
 
                                curr = nextr;
                        }
@@ -281,6 +588,7 @@ static void* rtc( void* vctx ) {
 
        return NULL;    // unreachable, but some compilers don't see that and complain.
 }
+#endif
 
 
 #endif
index 1167c25..76567f9 100644 (file)
@@ -209,7 +209,7 @@ extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ) {
 
 
        if( (ep = rt_ensure_ep( ctx->rtable, target )) == NULL ) {              // get pointer to ep if there, create new if not
-               rmr_vlog( RMR_VL_ERR, "wormhole_open: ensure ep returned bad: target=%s\n", target );
+               rmr_vlog( RMR_VL_ERR, "wormhole_open: ensure ep returned bad: target=(%s)\n", target );
                return -1;                      // ensure sets errno
        }
 
@@ -220,7 +220,12 @@ extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ) {
                }
 
                if( whm->eps[i] == ep ) {
-                       return i;                                                       // we're already pointing to it, just send it back again
+                       if(  whm->eps[i]->open ) {                                      // we know about it and it's open
+                               return i;                                                               // just send back the reference
+                       }
+
+                       whid = i;                                                                       // have it, but not open, reopen
+                       break;
                }
        }
 
@@ -249,6 +254,7 @@ extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg
        uta_ctx_t*      ctx;
        endpoint_t*     ep;                             // enpoint that wormhole ID references
        wh_mgt_t *whm;
+       char* d1;                                       // point at the call-id in the header
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -273,7 +279,7 @@ extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg
                return msg;
        }
 
-       errno = 0;                                                                                                      // nng seems not to set errno any longer, so ensure it's clear
+       errno = 0;
        if( msg->header == NULL ) {
                rmr_vlog( RMR_VL_ERR, "rmr_wh_send_msg: message had no header\n" );
                msg->state = RMR_ERR_NOHDR;
@@ -281,7 +287,13 @@ extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg
                return msg;
        }
 
+       d1 = DATA1_ADDR( msg->header );
+       d1[D1_CALLID_IDX] = NO_CALL_ID;                                                         // must blot out so it doesn't queue on a chute at the other end
+
        ep = whm->eps[whid];
+       if( ! ep->open ) {
+               rmr_wh_open( ctx, ep->name );
+       }
        return send2ep( ctx, ep, msg );                                                 // send directly to the endpoint
 }
 
index 878742c..919b225 100644 (file)
@@ -93,6 +93,10 @@ struct uta_ctx {
 
        pthread_t       rtc_th;                 // thread info for the rtc listener
        pthread_t       mtc_th;                 // thread info for the multi-thread call receive process
+
+                                                               // added for route manager request/states
+       rmr_whid_t      rtg_whid;               // wormhole id to the route manager for acks/requests
+       char*           table_id;               // table ID of the route table load in progress
 };
 
 
index fc72b36..faae333 100644 (file)
@@ -630,7 +630,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
 
        if( ! announced ) {
-               rmr_vlog( RMR_VL_INFO, "ric message routing library on NNG mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on NNG/d mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
                        RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
@@ -751,9 +751,17 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                return NULL;
        }
 
-       if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
-               if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
-                       rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+       if( flags & FL_NOTHREAD ) {                                                             // if no rtc thread, we still need an empty route table for wormholes
+               ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // so create one
+       } else {
+               if( (tok = getenv( ENV_RTG_RAW )) != NULL  && *tok == '0' ) {                   // use RMR for Rmgr comm only when specifically off
+                       if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rmr based rt collector thread
+                               rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+                       }
+               } else {
+                       if( pthread_create( &ctx->rtc_th,  NULL, raw_rtc, (void *) ctx ) ) {    // kick the raw msg rt collector thread
+                               rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+                       }
                }
        }
 
index 8c62082..99b5b20 100644 (file)
@@ -140,6 +140,10 @@ struct uta_ctx {
        pthread_t       rtc_th;                 // thread info for the rtc listener
        pthread_t       mtc_th;                 // thread info for the multi-thread call receive process
 
+                                                               // added for route manager request/states
+       rmr_whid_t      rtg_whid;               // wormhole id to the route manager for acks/requests
+       char*           table_id;               // table ID of the route table load in progress
+
                                                                // added for SI95 support
        si_ctx_t*       si_ctx;                 // the socket context
        int                     nrivers;                // allocated rivers
@@ -180,7 +184,6 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state );
 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) ;
 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  );
 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
-static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  );
 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
 
index 7a8b666..d6279fd 100644 (file)
@@ -144,15 +144,6 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        }
 
        river->state = RS_GOOD;
-
-/*
-fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
-for( i = 0; i < 40; i++ ) {
-       fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
-}
-fprintf( stderr, "\n" );
-*/
-
        remain = buflen;
        while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
                if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
index 31da2cb..7fa066b 100644 (file)
@@ -56,6 +56,7 @@
 #include "si95/socket_if.h"
 #include "si95/siproto.h"
 
+#define SI95_BUILD     1                       // we drop some common functions for si
 
 #include "rmr.h"                               // things the users see
 #include "rmr_agnostic.h"              // agnostic things (must be included before private)
@@ -66,7 +67,7 @@
 #include "ring_static.c"                       // message ring support
 #include "rt_generic_static.c"         // route table things not transport specific
 #include "rtable_si_static.c"          // route table things -- transport specific
-#include "rtc_si_static.c"                     // specific RMR only route table collector (SI only for now)
+#include "rtc_static.c"                                // route table collector (thread code)
 #include "tools_static.c"
 #include "sr_si_static.c"                      // send/receive static functions
 #include "wormholes.c"                         // wormhole api externals and related static functions (must be LAST!)
@@ -548,7 +549,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
 
        if( ! announced ) {
-               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/c mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/e mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
                        RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
@@ -690,7 +691,9 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                return NULL;
        }
 
-       if( !(flags & FL_NOTHREAD) ) {                                                                                          // skip if internal function that doesnt need a RTC
+       if( flags & FL_NOTHREAD ) {                                     // thread set to off; no rout table collector started (could be called by the rtc thread itself)
+               ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // creates an empty route table so that wormholes still can be used
+       } else {
                if( static_rtc ) {
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
                                rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
index a79291d..659f857 100644 (file)
@@ -108,7 +108,6 @@ static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
                        rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to connect  to target: %s: %d %s\n", target, errno, strerror( errno ) );
                        ep->notify = 0;
                }
-               //nng_close( *nn_sock );
                return FALSE;
        }
 
diff --git a/src/rmr/si/src/rtc_si_static.c b/src/rmr/si/src/rtc_si_static.c
deleted file mode 100644 (file)
index dbccd58..0000000
+++ /dev/null
@@ -1,296 +0,0 @@
-// : vi ts=4 sw=4 noet :
-/*
-==================================================================================
-       Copyright (c) 2019-2020 Nokia
-       Copyright (c) 2018-2020 AT&T Intellectual Property.
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-          http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-==================================================================================
-*/
-
-/*
-       Mnemonic:       rtc_si_static.c
-       Abstract:       The route table collector is started as a separate pthread and
-                               is responsible for listening for route table updates from a
-                               route manager or route table generator process.
-
-                               This comes from the common src and may be moved back there once
-                               it is not necessary to support raw sessions (all route table
-                               gen messages are received over rmr channel).
-
-       Author:         E. Scott Daniels
-       Date:           29 November 2018 (extracted to common 13 March 2019)
-                               Imported to si base 17 Jan 2020.
-*/
-
-
-#ifndef _rtc_si_staic_c
-#define _rtc_si_staic_c
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <netdb.h>
-#include <errno.h>
-#include <string.h>
-#include <fcntl.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <unistd.h>
-
-/*
-       Loop forever (assuming we're running in a pthread reading the static table
-       every minute or so.
-*/
-static void* rtc_file( void* vctx ) {
-       uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
-       char*   eptr;
-       int             vfd = -1;                                       // verbose file des if we have one
-       int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
-       char    wbuf[256];
-
-
-       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
-               rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
-               return NULL;
-       }
-
-       if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
-               vfd = open( eptr, O_RDONLY );
-       }
-
-       while( 1 ) {
-               if( vfd >= 0 ) {
-                       wbuf[0] = 0;
-                       lseek( vfd, 0, 0 );
-                       read( vfd, wbuf, 10 );
-                       vlevel = atoi( wbuf );
-               }
-
-               read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
-
-               sleep( 60 );
-       }
-
-}
-
-static int refresh_vlevel( int vfd ) {
-       int vlevel = 0;
-       char    rbuf[128];
-
-       if( vfd >= 0 ) {                                        // if file is open, read current value
-               rbuf[0] = 0;
-               lseek( vfd, 0, 0 );
-               read( vfd, rbuf, 10 );
-               vlevel = atoi( rbuf );
-       }
-
-       return vlevel;
-}
-
-/*
-       Route Table Collector
-       A side thread which opens a socket and subscribes to a routing table generator.
-       It may do other things along the way (latency measurements?).
-
-       The pointer is a pointer to the context.
-
-       Listens for records from the route table generation publisher, expecting
-       one of the following, newline terminated, ASCII records:
-               rte|msg-type||]name:port,name:port,...;name:port,...                    // route table entry with one or more groups of endpoints
-               new|start                                                               // start of new table
-               new|end                                                                 // end of new table; complete
-
-               Name must be a host name which can be looked up via gethostbyname() (DNS).
-
-               Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
-                       for each message of the type that is sent.
-
-               Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
-                               group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
-
-               If multiple groups are given, when send() is called for the cooresponding message type,
-               the message will be sent to one endpoint in each group.
-
-               msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
-               that the entry applies only to the instance running with the hostname 'name.'
-
-       Buffers received from the route table generator can contain multiple newline terminated
-       records, but each buffer must be less than 4K in length, and the last record in a
-       buffer may NOT be split across buffers.
-
-       Other chores:
-       In addition to the primary task of getting, vetting, and installing a new route table, or
-       updates to the existing table, this thread will periodically cause the send counts for each
-       endpoint known to be written to standard error. The frequency is once every 180 seconds, and
-       more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
-*/
-static void* rtc( void* vctx ) {
-       uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
-       uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
-       rmr_mbuf_t*     msg = NULL;                             // message from rtg
-       char*   payload;                                        // payload in the message
-       size_t  mlen;
-       size_t  clen;                                           // length to copy and mark
-       char*   port;                                           // a port number we listen/connect to
-       char*   fport;                                          // pointer to the real buffer to free
-       size_t  buf_size;                                       // nng needs var pointer not just size?
-       char*   nextr;                                          // pointer at next record in the message
-       char*   curr;                                           // current record
-       int     i;
-       long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
-       int             cstate = -1;                            // connection state to rtg
-       int             state;                                          // processing state of some nng function
-       char*   tokens[128];
-       char    wbuf[128];
-       char*   pbuf = NULL;
-       int             pbuf_size = 0;                          // number allocated in pbuf
-       int             ntoks;
-       int             raw_interface = 0;                      // rtg is using raw NNG/Nano not RMr to send updates
-       int             vfd = -1;                                       // verbose file des if we have one
-       int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
-       char*   eptr;
-       int             epfd = -1;                                      // fd for epoll so we can multi-task
-       struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
-       struct  epoll_event epe;                        // event definition for event to listen to
-       int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
-       int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
-
-
-       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
-               rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
-               return NULL;
-       }
-
-       if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
-               vfd = open( eptr, O_RDONLY );
-               vlevel = refresh_vlevel( vfd );
-       }
-
-       read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
-
-       if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
-               port = strdup( DEF_RTG_PORT );
-       } else {
-               port = strdup( port );
-       }
-
-       if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
-               raw_interface = atoi( curr ) > 0;                               // if > 0 we assume that rtg messages are NOT coming from an RMr based process
-       }
-
-       fport = port;           // must hold to free
-
-       ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
-       switch( ntoks ) {
-               case 1:
-                               port = tokens[0];                       // just the port
-                               break;
-
-               case 2:
-                               port = tokens[1];                       // tcp:port or :port
-                               break;
-
-               default:
-                               port = DEF_RTG_PORT;            // this shouldn't happen, but parnioia is good
-                               break;
-       }
-
-       if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context (no RT listener!)
-               rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
-
-               while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
-                       sleep( count_delay );
-                       rt_epcounts( ctx->rtable, ctx->my_name );
-               }
-
-               free( fport );                                  // parinoid free and return
-               return NULL;
-       }
-
-       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
-       free( fport );
-
-       // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
-
-       bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
-       blabber = 0;
-       while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
-               while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
-                       msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
-
-                       if( time( NULL ) > blabber  ) {
-                               vlevel = refresh_vlevel( vfd );
-                               if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
-                                       blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
-                                       if( blabber > bump_freq ) {
-                                               count_delay = 300;
-                                       }
-                                       rt_epcounts( ctx->rtable, ctx->my_name );
-                               }
-                       }
-               }
-
-               vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
-
-               if( msg != NULL && msg->len > 0 ) {
-                       payload = msg->payload;
-                       mlen = msg->len;                                        // usable bytes in the payload
-                       if( vlevel > 1 ) {
-                               rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
-                       } else {
-                               if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
-                       }
-
-                       if( pbuf_size <= mlen ) {
-                               if( pbuf ) {
-                                       free( pbuf );
-                               }
-                               if( mlen < 512 ) {
-                                       pbuf_size = 512;
-                               } else {
-                                       pbuf_size = mlen * 2;
-                               }
-                               pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
-                       }
-                       memcpy( pbuf, payload, mlen );
-                       pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
-
-                       curr = pbuf;
-                       while( curr ) {                                                         // loop over each record in the buffer
-                               nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
-
-                               if( nextr ) {
-                                       *(nextr++) = 0;
-                               }
-
-                               if( vlevel > 1 ) {
-                                       rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
-                               }
-                               parse_rt_rec( ctx, curr, vlevel );              // parse record and add to in progress table
-
-                               curr = nextr;
-                       }
-
-                       if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
-                               break;
-                       }
-
-                       msg->len = 0;                           // force back into the listen loop
-               }
-       }
-
-       return NULL;    // unreachable, but some compilers don't see that and complain.
-}
-
-
-#endif
index ba22bc9..4dd32ea 100644 (file)
@@ -546,52 +546,6 @@ exit( 1 );
        return NULL;
 }
 
-/*
-       Receives a 'raw' message from a non-RMr sender (no header expected). The returned
-       message buffer cannot be used to send, and the length information may or may
-       not be correct (it is set to the length received which might be more than the
-       bytes actually in the payload).
-
-       Mostly this supports the route table collector, but could be extended with an
-       API external function.
-*/
-static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
-       return NULL;
-/*
-FIXME: do we need this in the SI world?  The only user was the route table collector
-       int state;
-       rmr_mbuf_t*     msg = NULL;             // msg received
-       size_t  rsize;                          // nng needs to write back the size received... grrr
-
-       if( old_msg ) {
-               msg = old_msg;
-       } else {
-               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
-       }
-
-       //msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                        // blocks hard until received
-       if( (msg->state = xlate_si_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
-               return msg;
-       }
-       rsize = nng_msg_len( msg->tp_buf );
-
-       // do NOT use ref_tpbuf() here! Must fill these in manually.
-       msg->header = nng_msg_body( msg->tp_buf );
-       msg->len = rsize;                                                       // len is the number of bytes received
-       msg->alloc_len = rsize;
-       msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
-       msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
-       msg->state = RMR_OK;
-       msg->flags = MFL_RAW;
-       msg->payload = msg->header;                                     // payload is the whole thing; no header
-       msg->xaction = NULL;
-
-       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
-
-       return msg;
-*/
-}
-
 /*
        This does the hard work of actually sending the message to the given socket. On success,
        a new message struct is returned. On error, the original msg is returned with the state