Add the wormhole call function 17/2717/2 3.5.0
authorE. Scott Daniels <daniels@research.att.com>
Mon, 9 Mar 2020 17:57:39 +0000 (13:57 -0400)
committerE. Scott Daniels <daniels@research.att.com>
Mon, 9 Mar 2020 18:14:03 +0000 (14:14 -0400)
This change adds the rmr_wh_call() function to provide a
wormhole send which blocks until a response is received.

Issue-ID: RICAPP-80

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: Ic97001db8a9fb177c70045fc3d0326805b9fb17b

14 files changed:
CHANGES
CMakeLists.txt
doc/CMakeLists.txt
doc/src/man/rmr_wh_call.3.xfm [new file with mode: 0644]
docs/rel-notes.rst
docs/user-guide.rst
src/rmr/common/include/rmr.h
src/rmr/common/include/rmr_agnostic.h
src/rmr/common/src/rt_generic_static.c
src/rmr/common/src/rtc_static.c
src/rmr/common/src/wormholes.c
src/rmr/nng/src/rmr_nng.c
src/rmr/si/src/rmr_si.c
src/rmr/si/src/sr_si_static.c

diff --git a/CHANGES b/CHANGES
index aa261b0..0b2898c 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,9 @@
 API and build change  and fix summaries. Doc correctsions
 and/or changes are not mentioned here; see the commit messages.
 
+2020 March 9; version 3.5.0
+       Added new wormhole send function: rmr_wh_call().
+
 2020 March 6; version 3.4.0
        Add new wormhole state function: rmr_wh_state().
 
index 97e5c8b..12958e8 100644 (file)
@@ -39,7 +39,7 @@ project( rmr LANGUAGES C )
 cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "3" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
-set( minor_version "4" )
+set( minor_version "5" )
 set( patch_level "0" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
index 5c0a5d1..02c8413 100644 (file)
@@ -89,6 +89,7 @@ if( BUILD_DOC )
                rmr_set_stimeout.3
                rmr_get_xact.3
                rmr_wh_state.3
+               rmr_wh_call.3
        )
 
        # initialise lists of files we generated
diff --git a/doc/src/man/rmr_wh_call.3.xfm b/doc/src/man/rmr_wh_call.3.xfm
new file mode 100644 (file)
index 0000000..7e1b935
--- /dev/null
@@ -0,0 +1,166 @@
+.if false
+==================================================================================
+       Copyright (c) 2020 Nokia 
+       Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+.fi
+.if false
+       Mnemonic        rmr_wh_call_3.xfm
+       Abstract        The manual page for the rmr_wh_call function.
+       Author          E. Scott Daniels
+       Date            28 January 2019
+.fi
+
+.gv e LIB lib
+.im &{lib}/man/setup.im 
+
+&line_len(6i)
+
+&h1(RMR Library Functions)
+&h2(NAME)
+       rmr_wh_call
+
+&h2(SYNOPSIS )
+&indent
+&ex_start
+#include <rmr/rmr.h>
+
+rmr_mbuf_t* rmr_wh_call( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg, int call_id, int max_wait )
+
+&ex_end
+&uindent
+
+&h2(DESCRIPTION)
+The &cw(rmr_wh_call) function accepts a message buffer (msg) from the user application 
+and attempts to send it using the wormhole ID provided (whid).
+If the send is successful, the call will block until either a response message is
+received, or the &cw(max_wait) number of milliseconds has passed.
+In order for the response to be recognised as a response, the remote process &bold(must)
+use &cw(rmr_rts_msg()) to send their response.
+
+&space
+Like &ital(rmr_wh_send_msg,) this function attempts to send the message directly
+to a process at the other end of a wormhole which was created with &ital(rmr_wh-open().)
+When sending message via wormholes, the normal RMr routing based on message type is
+ignored, and the caller may leave the message type unspecified in the message buffer
+(unless it is needed by the receiving process).
+
+The &cw(call_id) parameter is a number in the range of 2 through 255 and is used to 
+identify the calling thread in order to properly match a response message when it
+arrives.
+Providing this value, and ensuring the proper uniqueness,  is the responsibility of the 
+user application and as such the ability to use the &cw(rmr_wh_call()) function from 
+potentially non-threaded concurrent applications (such as Go's goroutines) is possible.
+
+.** pull in common retry text
+.im &{lib}/man/retry.im 
+
+&h2(RETURN VALUE)
+On success, new message buffer, with the payload containing the response from the remote 
+endpoint is returned.
+The state in this buffer will reflect the overall send operation state and should be
+&cw(RMR_OK.)
+
+&space
+If a message is returned with a state which is anything other than &cw(RMR_OK,) the indication
+is that the send was not successful.
+The user application must check the state and determine the course of action.
+If the return value is NULL, no message, the indication is that there was no response
+received within the timeout (max_wait) period of time.
+
+&h2(ERRORS)
+The following values may be passed back in the &ital(state) field of the returned message
+buffer. 
+
+&space
+&beg_dlist(.75i : ^&bold_font )
+&di(RMR_ERR_WHID) The wormhole ID passed in was not associated with an open wormhole, or was out of range for a valid ID.
+&di(RMR_ERR_NOWHOPEN) No wormholes exist, further attempt to validate the ID are skipped.
+&di(RMR_ERR_BADARG) The message buffer pointer did not refer to a valid message.
+&di(RMR_ERR_NOHDR)  The header in the message buffer was not valid or corrupted.
+&end_dlist
+
+&h2(EXAMPLE)
+The following is a simple example of how the a wormhole is created (rmr_wh_open) and then
+how &cw(rmr_wh_send_msg) function is used to send messages.
+Some error checking is omitted for clarity.
+
+&space
+&ex_start
+
+#include <rmr/rmr.h>   // system headers omitted for clarity
+
+int main() {
+   rmr_whid_t whid = -1;   // wormhole id for sending
+   void* mrc;      //msg router context
+        int i;
+   rmr_mbuf_t*  sbuf;      // send buffer
+   int     count = 0;
+
+   mrc = rmr_init( "43086", RMR_MAX_RCV_BYTES, RMRFL_NONE );
+   if( mrc == NULL ) {
+      fprintf( stderr, "[FAIL] unable to initialise RMr environment\n" );
+      exit( 1 );
+   }
+
+   while( ! rmr_ready( mrc ) ) {               // wait for routing table info
+      sleep( 1 );
+   }
+
+   sbuf = rmr_alloc_msg( mrc, 2048 );
+
+   while( 1 ) {
+     if( whid < 0 ) {
+       whid = rmr_wh_open( mrc, "localhost:6123" );  // open fails if endpoint refuses conn
+          if( RMR_WH_CONNECTED( wh ) ) { 
+           snprintf( sbuf->payload, 1024, "periodic update from sender: %d", count++ );
+           sbuf->len =  strlen( sbuf->payload );
+           sbuf = rmr_wh_call( mrc, whid, sbuf, 1000 );                // expect a response in 1s or less
+           if( sbuf != NULL && sbuf->state = RMR_OK ) {
+             sprintf( stderr, "response: %s\n", sbuf->payload );       // assume they sent a string
+           } else {
+             sprintf( stderr, "response not received, or send error\n" );
+           }
+        }
+      }
+
+      sleep( 5 );
+   }
+}
+&ex_end
+
+
+&h2(SEE ALSO )
+.ju off
+rmr_alloc_msg(3),
+rmr_call(3),
+rmr_free_msg(3),
+rmr_init(3),
+rmr_payload_size(3),
+rmr_rcv_msg(3),
+rmr_rcv_specific(3),
+rmr_rts_msg(3),
+rmr_ready(3),
+rmr_fib(3),
+rmr_has_str(3),
+rmr_tokenise(3),
+rmr_mk_ring(3),
+rmr_ring_free(3),
+rmr_set_stimeout(3),
+rmr_wh_open(3),
+rmr_wh_close(3),
+rmr_wh_state(3)
+.ju on
+
index 133abf1..a1bf816 100644 (file)
@@ -15,6 +15,12 @@ file at the repo root; please refer to that file for a
 completely up to date listing of API changes. 
  
  
+2020 March 9; version 3.5.0 
+-------------------------------------------------------------------------------------------- 
+Added new wormhole send function: rmr_wh_call(). 
 2020 March 6; version 3.4.0 
 -------------------------------------------------------------------------------------------- 
  
index a59239d..7f45ec8 100644 (file)
@@ -3703,6 +3703,205 @@ rmr_ready(3), rmr_fib(3), rmr_has_str(3), rmr_tokenise(3),
 rmr_mk_ring(3), rmr_ring_free(3), rmr_set_trace(3) 
  
  
+NAME 
+-------------------------------------------------------------------------------------------- 
+rmr_wh_call 
+SYNOPSIS 
+-------------------------------------------------------------------------------------------- 
+:: 
+  
+ #include <rmr/rmr.h>
+ rmr_mbuf_t* rmr_wh_call( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg, int call_id, int max_wait )
+DESCRIPTION 
+-------------------------------------------------------------------------------------------- 
+The rmr_wh_call function accepts a message buffer (msg) from 
+the user application and attempts to send it using the 
+wormhole ID provided (whid). If the send is successful, the 
+call will block until either a response message is received, 
+or the max_wait number of milliseconds has passed. In order 
+for the response to be recognised as a response, the remote 
+process **must** use rmr_rts_msg() to send their response. 
+Like *rmr_wh_send_msg,* this function attempts to send the 
+message directly to a process at the other end of a wormhole 
+which was created with *rmr_wh-open().* When sending message 
+via wormholes, the normal RMr routing based on message type 
+is ignored, and the caller may leave the message type 
+unspecified in the message buffer (unless it is needed by the 
+receiving process). The call_id parameter is a number in the 
+range of 2 through 255 and is used to identify the calling 
+thread in order to properly match a response message when it 
+arrives. Providing this value, and ensuring the proper 
+uniqueness, is the responsibility of the user application and 
+as such the ability to use the rmr_wh_call() function from 
+potentially non-threaded concurrent applications (such as 
+Go's goroutines) is possible. 
+Retries 
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
+The send operations in RMr will retry *soft* send failures 
+until one of three conditions occurs: 
+1. 
+   
+  The message is sent without error 
+   
+2. 
+   
+  The underlying transport reports a * hard * failure 
+   
+3. 
+   
+  The maximum number of retry loops has been attempted 
+A retry loop consists of approximately 1000 send attemps ** 
+without** any intervening calls to * sleep() * or * usleep(). 
+* The number of retry loops defaults to 1, thus a maximum of 
+1000 send attempts is performed before returning to the user 
+application. This value can be set at any point after RMr 
+initialisation using the * rmr_set_stimeout() * function 
+allowing the user application to completely disable retires 
+(set to 0), or to increase the number of retry loops. 
+Transport Level Blocking 
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
+The underlying transport mechanism used to send messages is 
+configured in *non-blocking* mode. This means that if a 
+message cannot be sent immediately the transport mechanism 
+will **not** pause with the assumption that the inability to 
+send will clear quickly (within a few milliseconds). This 
+means that when the retry loop is completely disabled (set to 
+0), that the failure to accept a message for sending by the 
+underlying mechanisms (software or hardware) will be reported 
+immediately to the user application. 
+It should be noted that depending on the underlying transport 
+mechanism being used, it is extremly possible that during 
+normal operations that retry conditions are very likely to 
+happen. These are completely out of RMr's control, and there 
+is nothing that RMr can do to avoid or midigate these other 
+than by allowing RMr to retry the send operation, and even 
+then it is possible (e.g. during connection reattempts), that 
+a single retry loop is not enough to guarentee a successful 
+send. 
+RETURN VALUE 
+-------------------------------------------------------------------------------------------- 
+On success, new message buffer, with the payload containing 
+the response from the remote endpoint is returned. The state 
+in this buffer will reflect the overall send operation state 
+and should be RMR_OK. 
+If a message is returned with a state which is anything other 
+than RMR_OK, the indication is that the send was not 
+successful. The user application must check the state and 
+determine the course of action. If the return value is NULL, 
+no message, the indication is that there was no response 
+received within the timeout (max_wait) period of time. 
+ERRORS 
+-------------------------------------------------------------------------------------------- 
+The following values may be passed back in the *state* field 
+of the returned message buffer. 
+RMR_ERR_WHID 
+   
+  The wormhole ID passed in was not associated with an open 
+  wormhole, or was out of range for a valid ID. 
+RMR_ERR_NOWHOPEN 
+   
+  No wormholes exist, further attempt to validate the ID are 
+  skipped. 
+RMR_ERR_BADARG 
+   
+  The message buffer pointer did not refer to a valid 
+  message. 
+RMR_ERR_NOHDR 
+   
+  The header in the message buffer was not valid or 
+  corrupted. 
+EXAMPLE 
+-------------------------------------------------------------------------------------------- 
+The following is a simple example of how the a wormhole is 
+created (rmr_wh_open) and then how rmr_wh_send_msg function 
+is used to send messages. Some error checking is omitted for 
+clarity. 
+:: 
+  
+ #include <rmr/rmr.h>    .// system headers omitted for clarity
+ int main() {
+    rmr_whid_t whid = -1;   // wormhole id for sending
+    void* mrc;      //msg router context
+         int i;
+    rmr_mbuf_t*  sbuf;      // send buffer
+    int     count = 0;
+    mrc = rmr_init( "43086", RMR_MAX_RCV_BYTES, RMRFL_NONE );
+    if( mrc == NULL ) {
+       fprintf( stderr, "[FAIL] unable to initialise RMr environment\\n" );
+       exit( 1 );
+    }
+    while( ! rmr_ready( mrc ) ) {    e    i// wait for routing table info
+       sleep( 1 );
+    }
+    sbuf = rmr_alloc_msg( mrc, 2048 );
+    while( 1 ) {
+      if( whid < 0 ) {
+        whid = rmr_wh_open( mrc, "localhost:6123" );  // open fails if endpoint refuses conn
+        w   if( RMR_WH_CONNECTED( wh ) ) { 
+            snprintf( sbuf->payload, 1024, "periodic update from sender: %d", count++ );
+            sbuf->len =  strlen( sbuf->payload );
+            sbuf = rmr_wh_call( mrc, whid, sbuf, 1000 );    f    s// expect a response in 1s or less
+            if( sbuf != NULL && sbuf->state = RMR_OK ) {
+              sprintf( stderr, "response: %s\\n", sbuf->payload );    x// assume they sent a string
+            } else {
+              sprintf( stderr, "response not received, or send error\\n" );
+            }
+         }
+       }
+       sleep( 5 );
+    }
+ }
+SEE ALSO 
+-------------------------------------------------------------------------------------------- 
+rmr_alloc_msg(3), rmr_call(3), rmr_free_msg(3), rmr_init(3), 
+rmr_payload_size(3), rmr_rcv_msg(3), rmr_rcv_specific(3), 
+rmr_rts_msg(3), rmr_ready(3), rmr_fib(3), rmr_has_str(3), 
+rmr_tokenise(3), rmr_mk_ring(3), rmr_ring_free(3), 
+rmr_set_stimeout(3), rmr_wh_open(3), rmr_wh_close(3), 
+rmr_wh_state(3) 
 NAME 
 -------------------------------------------------------------------------------------------- 
  
index cc2c0c8..4932641 100644 (file)
@@ -135,6 +135,7 @@ extern void rmr_set_low_latency( void* vctx );
 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to );
 extern rmr_mbuf_t*  rmr_tralloc_msg( void* context, int msize, int trsize, unsigned const char* data );
 extern rmr_whid_t rmr_wh_open( void* vctx, char const* target );
+extern rmr_mbuf_t* rmr_wh_call( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg, int call_id, int max_wait );
 extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg );
 extern int rmr_wh_state( void* vctx, rmr_whid_t whid );
 extern void rmr_wh_close( void* vctx, int whid );
index c396097..43faf4a 100644 (file)
@@ -311,13 +311,17 @@ static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
-static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel );
+static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf );
 static rmr_mbuf_t* realloc_msg( rmr_mbuf_t* msg, int size );
 static void* rtc( void* vctx );
 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );
 
 // --------- route manager communications -----------------
-static void send_rt_ack( uta_ctx_t* ctx, char* table_id, int state, char* reason );
+static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* mbuf, char* table_id, int state, char* reason );
 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx );
 
+// -------- internal functions that can be referenced by common functions -------
+static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep );
+
+
 #endif
index 3fc7ded..d934da5 100644 (file)
@@ -241,9 +241,15 @@ static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
 
        Context should be the PRIVATE context that we use for messages
        to route manger and NOT the user's context.
+
+       If a message buffere is passed we use that and use return to sender
+       assuming that this might be a response to a call and that is needed
+       to send back to the proper calling thread. If msg is nil, we allocate
+       and use it.
 */
-static void send_rt_ack( uta_ctx_t* ctx, char* table_id, int state, char* reason ) {
-       rmr_mbuf_t*     smsg;
+static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
+       int             use_rts = 1;
+       int             payload_size = 1024;
        
        if( ctx == NULL || ctx->rtg_whid < 0 ) {
                return;
@@ -253,24 +259,36 @@ static void send_rt_ack( uta_ctx_t* ctx, char* table_id, int state, char* reason
                return;
        }
 
-       smsg = rmr_alloc_msg( ctx, 1024 );
+       if( smsg != NULL ) {
+               smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
+       } else {
+               use_rts = 0;
+               smsg = rmr_alloc_msg( ctx, payload_size );
+       }
+
        if( smsg != NULL ) {
                smsg->mtype = RMRRM_TABLE_STATE;
-               smsg->sub_id = 0;
-               snprintf( smsg->payload, 1024, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR", 
+               smsg->sub_id = -1;
+               snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR", 
                        table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
 
                smsg->len = strlen( smsg->payload ) + 1;
        
                rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, smsg->state, ctx->rtg_whid );
-               smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
+               if( use_rts ) {
+                       smsg = rmr_rts_msg( ctx, smsg );
+               } else {
+                       smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
+               }
                if( (state = smsg->state) != RMR_OK ) {
                        rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
                        rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
                        ctx->rtg_whid = -1;
                }
 
-               rmr_free_msg( smsg );
+               if( ! use_rts ) {
+                       rmr_free_msg( smsg );                   // if not our message we must free the leftovers
+               }
        }
 }
 
@@ -672,8 +690,14 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel )
        messages back to the route manager.  The regular ctx is the ctx that
        the user has been given and thus that's where we have to hang the route
        table we're working with.
+
+       If mbuf is given, and we need to ack, then we ack using the mbuf and a
+       return to sender call (allows route manager to use wh_call() to send
+       an update and rts is required to get that back to the right thread).
+       If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
+       will be used.
 */
-static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel ) {
+static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
        int i;
        int ntoks;                                                      // number of tokens found in something
        int ngtoks;
@@ -723,7 +747,7 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                                        rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
                                                                ctx->new_rtable->updates, tokens[2] );
                                                        snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
-                                                       send_rt_ack( pctx, ctx->table_id, !RMR_OK, wbuf );
+                                                       send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
                                                        uta_rt_drop( ctx->new_rtable );
                                                        ctx->new_rtable = NULL;
                                                        break;
@@ -744,14 +768,14 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                                        rt_stats( ctx->rtable );
                                                }
 
-                                               send_rt_ack( pctx, ctx->table_id, RMR_OK, NULL );
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
                                        } else {
                                                if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
                                                ctx->new_rtable = NULL;
                                        }
                                } else {                                                                                                                        // start a new table.
                                        if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
-                                               send_rt_ack( pctx, ctx->table_id, !RMR_OK, "table not complete" );                      // nack the one that was pending as end never made it
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
 
                                                if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
                                                uta_rt_drop( ctx->new_rtable );
@@ -909,7 +933,7 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
                        return;
                }
 
-               parse_rt_rec( ctx, NULL, rec, vlevel );                 // no pvt context as we can't ack
+               parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
 
                rec = eor+1;
        }
index 45a277a..aa829ef 100644 (file)
@@ -1,8 +1,8 @@
 // : vi ts=4 sw=4 noet :
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia
-       Copyright (c) 2018-2019 AT&T Intellectual Property.
+       Copyright (c) 2019-2020 Nokia
+       Copyright (c) 2018-2020 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
@@ -323,7 +323,7 @@ static void* rtc( void* vctx ) {
                                                if( vlevel > 1 ) {
                                                        rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
                                                }
-                                               parse_rt_rec( ctx, pvt_cx, curr, vlevel );              // parse record and add to in progress table
+                                               parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table; ack using rts to msg
 
                                                curr = nextr;
                                        }
@@ -570,9 +570,9 @@ static void* raw_rtc( void* vctx ) {
                                        rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
                                }
                                if( raw_interface ) {
-                                       parse_rt_rec( ctx, NULL, curr, vlevel );                // nil pvt to parser as we can't ack messages
+                                       parse_rt_rec( ctx, NULL, curr, vlevel, NULL );          // nil pvt to parser as we can't ack messages
                                } else {
-                                       parse_rt_rec( ctx, pvt_cx, curr, vlevel );              // parse record and add to in progress table
+                                       parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table
                                }
 
                                curr = nextr;
index d66ecdd..248748f 100644 (file)
@@ -297,6 +297,60 @@ extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg
        return send2ep( ctx, ep, msg );                                                 // send directly to the endpoint
 }
 
+/*
+       Send a message directly to an open wormhole and then block until a response has
+       been received.  The return is the same as for rmr_call(); the received buffer
+       or nil if no response was received.
+*/
+extern rmr_mbuf_t* rmr_wh_call( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg, int call_id, int max_wait ) {
+       uta_ctx_t*      ctx;
+       endpoint_t*     ep;                             // enpoint that wormhole ID references
+       wh_mgt_t *whm;
+       char* d1;                                       // point at the call-id in the header
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
+               errno = EINVAL;                                                                                         // if msg is null, this is their clue
+               if( msg != NULL ) {
+                       msg->state = RMR_ERR_BADARG;
+                       errno = EINVAL;                                                                                 // must ensure it's not eagain
+               }
+               return msg;
+       }
+
+       msg->state = RMR_OK;
+
+       if( (whm = ctx->wormholes) == NULL ) {
+               errno = EINVAL;                                                                                         // no wormholes open
+               msg->state = RMR_ERR_NOWHOPEN;
+               return msg;
+       }
+
+       if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) {
+               errno = EINVAL;
+               msg->state = RMR_ERR_WHID;
+               return msg;
+       }
+
+       errno = 0;
+       if( msg->header == NULL ) {
+               rmr_vlog( RMR_VL_ERR, "rmr_wh_call: message had no header\n" );
+               msg->state = RMR_ERR_NOHDR;
+               errno = EBADMSG;                                                                                // must ensure it's not eagain
+               return msg;
+       }
+
+       ep = whm->eps[whid];
+       if( ep != NULL ) {
+               if( ! ep->open ) {
+                       rmr_wh_open( ctx, ep->name );
+               }
+               return mt_call( vctx, msg, call_id, max_wait, ep );                     // use main (internal) call to setup and block
+       }
+
+       msg->state = RMR_ERR_NOENDPT;
+       return msg;
+}
+
 /*
        This will "close" a wormhole.  We don't actually drop the session as that might be needed
        by others, but we do pull the ep reference from the list.
index 3b590fa..8d55b44 100644 (file)
@@ -978,23 +978,15 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        return mbuf;
 }
 
-/*
-       Accept a message buffer and caller ID, send the message and then wait
-       for the receiver to tickle the semaphore letting us know that a message
-       has been received. The call_id is a value between 2 and 255, inclusive; if
-       it's not in this range an error will be returned. Max wait is the amount
-       of time in millaseconds that the call should block for. If 0 is given
-       then no timeout is set.
 
-       If the mt_call feature has not been initialised, then the attempt to use this
-       funciton will fail with RMR_ERR_NOTSUPP
+/*
+       This does the real work behind both of the outward facing call functions. See 
+       the rmr_mt_call() description for details modulo the comments blow.
 
-       If no matching message is received before the max_wait period expires, a
-       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
-       occurs after the message has been sent, then a nil pointer is returned
-       with errno set to some other value.
+       If ep is given, then we skip the normal route table endpoint selection. This is
+       likely a wormhole call.
 */
-extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
        rmr_mbuf_t* ombuf;                      // original mbuf passed in
        uta_ctx_t*      ctx;
        uta_mhdr_t*     hdr;                    // header in the transport buffer
@@ -1062,7 +1054,11 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
                seconds = 1;                                                                            // use as flag later to invoked timed wait
        }
 
-       mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
+       if( ep != NULL ) {
+               mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
+       } else {
+               mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
+       }
        if( mbuf ) {
                if( mbuf->state != RMR_OK ) {
                        mbuf->tp_state = errno;
@@ -1104,6 +1100,30 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        return mbuf;
 }
 
+/*
+       Accept a message buffer and caller ID, send the message and then wait
+       for the receiver to tickle the semaphore letting us know that a message
+       has been received. The call_id is a value between 2 and 255, inclusive; if
+       it's not in this range an error will be returned. Max wait is the amount
+       of time in millaseconds that the call should block for. If 0 is given
+       then no timeout is set.
+
+       If the mt_call feature has not been initialised, then the attempt to use this
+       funciton will fail with RMR_ERR_NOTSUPP
+
+       If no matching message is received before the max_wait period expires, a
+       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
+       occurs after the message has been sent, then a nil pointer is returned
+       with errno set to some other value.
+
+       This is now just a wrapper to the real work horse so that we can provide
+       this and wormhole call functions without duplicating code.
+
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+       return mt_call( vctx, mbuf, call_id, max_wait, NULL );
+}
+
 /*
        Given an existing message buffer, reallocate the payload portion to
        be at least new_len bytes.  The message header will remain such that
index 047401d..8606241 100644 (file)
@@ -204,7 +204,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
 
                d1 = DATA1_ADDR( msg->header );
-               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
+               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                 // must blot out so it doesn't queue on a chute at the other end
        }
 
        return mtosend_msg( vctx, msg, max_to );
@@ -939,23 +939,19 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        return mbuf;
 }
 
-/*
-       Accept a message buffer and caller ID, send the message and then wait
-       for the receiver to tickle the semaphore letting us know that a message
-       has been received. The call_id is a value between 2 and 255, inclusive; if
-       it's not in this range an error will be returned. Max wait is the amount
-       of time in millaseconds that the call should block for. If 0 is given
-       then no timeout is set.
 
-       If the mt_call feature has not been initialised, then the attempt to use this
-       funciton will fail with RMR_ERR_NOTSUPP
 
-       If no matching message is received before the max_wait period expires, a
-       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
-       occurs after the message has been sent, then a nil pointer is returned
-       with errno set to some other value.
+
+/*
+       This is the work horse for the multi-threaded call() function. It supports
+       both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
+       for for rmr_mt_call() modulo the caveat below.
+
+       If endpoint is given, then we assume that we're not doing normal route table
+       routing and that we should send directly to that endpoint (probably worm
+       hole).
 */
-extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
        rmr_mbuf_t* ombuf;                      // original mbuf passed in
        uta_ctx_t*      ctx;
        uta_mhdr_t*     hdr;                    // header in the transport buffer
@@ -1023,7 +1019,11 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
                seconds = 1;                                                                            // use as flag later to invoked timed wait
        }
 
-       mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
+       if( ep == NULL ) {                                                                              // normal routing
+               mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
+       } else {
+               mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
+       }
        if( mbuf ) {
                if( mbuf->state != RMR_OK ) {
                        mbuf->tp_state = errno;
@@ -1066,6 +1066,29 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        return mbuf;
 }
 
+/*
+       Accept a message buffer and caller ID, send the message and then wait
+       for the receiver to tickle the semaphore letting us know that a message
+       has been received. The call_id is a value between 2 and 255, inclusive; if
+       it's not in this range an error will be returned. Max wait is the amount
+       of time in millaseconds that the call should block for. If 0 is given
+       then no timeout is set.
+
+       If the mt_call feature has not been initialised, then the attempt to use this
+       funciton will fail with RMR_ERR_NOTSUPP
+
+       If no matching message is received before the max_wait period expires, a
+       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
+       occurs after the message has been sent, then a nil pointer is returned
+       with errno set to some other value.
+
+       This is now just an outward facing wrapper so we can support wormhole calls.
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+       return mt_call( vctx, mbuf, call_id, max_wait, NULL );
+}
+
+
 /*
        Given an existing message buffer, reallocate the payload portion to
        be at least new_len bytes.  The message header will remain such that
index 58e1e17..50aaeb8 100644 (file)
 #ifndef _sr_si_static_c
 #define _sr_si_static_c
 
-static void dump_40( char *p, char* label ) {
+static void dump_n( char *p, char* label, int n ) {
        int i;
+       int j;
+       int t = 0;
+       int     rows;
+
 
-       if( label )
-               fprintf( stderr, ">>>>> %s p=%p\n", label, p );
+       if( label ) {
+               fprintf( stderr, ">>>>> %s p=%p %d bytes\n", label, p, n );
+       }
+       
+       rows = (n/16) + ((n % 16) ? 1 : 0);
+       
+       for( j = 0; j < rows; j++ ) {
+               fprintf( stderr, "%04x: ", j * 16 );
 
-       for( i = 0; i < 40; i++ ) {
-               fprintf( stderr, "%02x ", (unsigned char) *(p+i) );
+               for( i = 0; t < n && i < 16; i++, t++ ) {
+                       fprintf( stderr, "%02x ", (unsigned char) *p );
+                       p++;
+               }
+               fprintf( stderr, "\n" );
        }
-       fprintf( stderr, "\n" );
+}
+
+/*
+       backwards compatability.
+*/
+static void dump_40( char *p, char* label ) {
+       dump_n( p, label, 40 ); 
 }
 
 /*