Add SI95 transport support 57/2257/3 3.0.0
authorE. Scott Daniels <daniels@research.att.com>
Wed, 13 Nov 2019 14:40:22 +0000 (09:40 -0500)
committerE. Scott Daniels <daniels@research.att.com>
Fri, 17 Jan 2020 16:14:49 +0000 (11:14 -0500)
The SI95 transport library provides an alternate transport
layer to NNG. The API presented by RMR on top of SI95
is exactly the same; only a link with rmr_si is needed to
make use of the alternate transport mechanism.

SI95 is NOT compatable with NNG and thus all RMR based
applications which must communicate must use the same
underlying transport library.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I0a971fa0f5de2dbb2fb7a95d5428e709b630f730

51 files changed:
CHANGES
CMakeLists.txt
examples/Makefile
examples/receiver.c
src/rmr/common/include/rmr.h
src/rmr/common/include/rmr_agnostic.h
src/rmr/common/src/ring_static.c
src/rmr/common/src/rt_generic_static.c
src/rmr/common/src/wormholes.c
src/rmr/nng/include/rmr_nng_private.h
src/rmr/nng/src/rmr_nng.c
src/rmr/nng/src/rtable_nng_static.c
src/rmr/si/CMakeLists.txt [new file with mode: 0644]
src/rmr/si/include/rmr_si_private.h [new file with mode: 0644]
src/rmr/si/src/mt_call_si_static.c [new file with mode: 0644]
src/rmr/si/src/rmr_si.c [new file with mode: 0644]
src/rmr/si/src/rtable_si_static.c [new file with mode: 0644]
src/rmr/si/src/rtc_si_static.c [new file with mode: 0644]
src/rmr/si/src/si95/README [new file with mode: 0644]
src/rmr/si/src/si95/siaddress.c [new file with mode: 0644]
src/rmr/si/src/si95/sialloc.c [new file with mode: 0644]
src/rmr/si/src/si95/sibldpoll.c [new file with mode: 0644]
src/rmr/si/src/si95/sicbreg.c [new file with mode: 0644]
src/rmr/si/src/si95/sicbstat.c [new file with mode: 0644]
src/rmr/si/src/si95/siclose.c [new file with mode: 0644]
src/rmr/si/src/si95/siconnect.c [new file with mode: 0644]
src/rmr/si/src/si95/siconst.h [new file with mode: 0644]
src/rmr/si/src/si95/siestablish.c [new file with mode: 0644]
src/rmr/si/src/si95/sigetadd.c [new file with mode: 0644]
src/rmr/si/src/si95/sigetname.c [new file with mode: 0644]
src/rmr/si/src/si95/siinit.c [new file with mode: 0644]
src/rmr/si/src/si95/silisten.c [new file with mode: 0644]
src/rmr/si/src/si95/sinew.c [new file with mode: 0644]
src/rmr/si/src/si95/sinewses.c [new file with mode: 0644]
src/rmr/si/src/si95/sipoll.c [new file with mode: 0644]
src/rmr/si/src/si95/siproto.h [new file with mode: 0644]
src/rmr/si/src/si95/sircv.c [new file with mode: 0644]
src/rmr/si/src/si95/sisend.c [new file with mode: 0644]
src/rmr/si/src/si95/sisendt.c [new file with mode: 0644]
src/rmr/si/src/si95/sisetup.h [new file with mode: 0644]
src/rmr/si/src/si95/sishutdown.c [new file with mode: 0644]
src/rmr/si/src/si95/sistruct.h [new file with mode: 0644]
src/rmr/si/src/si95/siterm.c [new file with mode: 0644]
src/rmr/si/src/si95/sitransport.h [new file with mode: 0644]
src/rmr/si/src/si95/sitrash.c [new file with mode: 0644]
src/rmr/si/src/si95/siwait.c [new file with mode: 0644]
src/rmr/si/src/si95/socket_if.h [new file with mode: 0644]
src/rmr/si/src/sr_si_static.c [new file with mode: 0644]
test/app_test/Makefile
test/app_test/caller.c
test/app_test/sender.c

diff --git a/CHANGES b/CHANGES
index 11264c5..1a46b2a 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,11 @@
 API and build change  and fix summaries. Doc correctsions
 and/or changes are not mentioned here; see the commit messages.
 
+2020 January 16; version 3.0.0
+       Introduce support for SI95 transport library to replace NNG.
+       (RMR library versions will use leading odd numbers to avoid tag collisions
+        with the wrapper tags which will use even numbers.)
+
 2019 December 9; version 1.13.1
        Correct documentation and missing rel-notes update for RTD.
 
index a755c1b..2cff539 100644 (file)
@@ -19,6 +19,7 @@
 
 # This CMake definition supports several -D command line options:
 #
+#      -DDEBUG=n                       Enable debugging level n
 #      -DDEV_PKG=1                     Development package configuration
 #      -DBUILD_DOC=1           Man pages generated
 #      -DIGNORE_LIBDIR=1       Installation of rmr libries is into /usr/local/lib and ignores
 #      -DPACK_EXTERNALS=1      Include external libraries used to build in the run-time package
 #                                              (This makes some stand-alone unit testing of bindings possible, it
 #                                              is not meant to be used for production package generation.)
+#      -DGPROF=1                       Enable profiling compile time flags
 #      -DSKIP_EXTERNALS=1      Do not use NNG submodule when building; uee installed packages
 #      -DMAN_PREFIX=<path>     Supply a path where man pages are installed (default: /usr/share/man)
 
 project( rmr LANGUAGES C )
 cmake_minimum_required( VERSION 3.5 )
 
-set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
-set( minor_version "13" )
-set( patch_level "1" )
+set( major_version "3" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
+set( minor_version "0" )
+set( patch_level "0" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
@@ -100,10 +102,11 @@ if( DEBUG )                                       # if set, we'll set debugging on in the compile
        message( "+++ debugging is being set to ${DEBUG}" )
 else()
        set( debugging 0 )
-        message( "+++ debugging is set to off" )
+       message( "+++ debugging is set to off" )
 endif()
 unset( DEBUG CACHE )                                   # we don't want this to persist
 
+
 # define constants used in the version string, debugging, etc.
 add_definitions(
        -DGIT_ID=${git_id}
@@ -225,11 +228,21 @@ include_directories( "${CMAKE_CURRENT_BINARY_DIR}/include" )
 # Compiler flags
 #
 set( CMAKE_POSITION_INDEPENDENT_CODE ON )
-set( CMAKE_CXX_FLAGS "-g -Wall " )
+#set( CMAKE_C_FLAGS "-g -Wall " )
+#set( CMAKE_C_FLAGS "-g " )
+if( GPROF )                                    # if set, we'll set profiling flag on compiles
+       message( "+++ profiling is on" )
+       set( CMAKE_C_FLAGS "-pg " )
+else()
+       message( "+++ profiling is off" )
+       set( CMAKE_C_FLAGS "-g " )
+endif()
+unset( GPROF CACHE )                                   # we don't want this to persist
 
 # Include modules
 add_subdirectory( src/rmr/common )
 add_subdirectory( src/rmr/nng )
+add_subdirectory( src/rmr/si )
 add_subdirectory( doc )                                # this will auto skip if {X}fm is not available
 
 
@@ -254,6 +267,23 @@ if( DEV_PKG )
                VERSION ${major_version}.${minor_version}.${patch_level} )
 endif()
 
+add_library( rmr_si_shared SHARED "$<TARGET_OBJECTS:rmr_si_objects>;$<TARGET_OBJECTS:common_objects>" )
+set_target_properties( rmr_si_shared
+       PROPERTIES
+       OUTPUT_NAME "rmr_si"
+       SOVERSION ${major_version}
+       VERSION ${major_version}.${minor_version}.${patch_level} )
+
+# we only build/export the static archive (.a) if generating a dev package
+if( DEV_PKG )
+       add_library( rmr_si_static STATIC "$<TARGET_OBJECTS:rmr_si_objects>;$<TARGET_OBJECTS:common_objects>" )
+       set_target_properties( rmr_si_static
+               PROPERTIES
+               OUTPUT_NAME "rmr_si"
+               SOVERSION ${major_version}
+               VERSION ${major_version}.${minor_version}.${patch_level} )
+endif()
+
 # if externals need to be built, then we must force them to be built first by depending on them
 if( need_ext )
        if( DEV_PKG )
@@ -274,9 +304,9 @@ endif()
 # the library (.so) files and nothing more.
 #
 if( DEV_PKG )
-       set( target_list "rmr_nng_static" )
+       set( target_list "rmr_nng_static;rmr_si_static" )
 else()
-       set( target_list "rmr_nng_shared" )
+       set( target_list "rmr_nng_shared;rmr_si_shared" )
 endif()
 
 install( TARGETS ${target_list} EXPORT LibraryConfig
index 2572282..4935ee7 100644 (file)
 #
 
 # The demo programmes assume that RMr (along with nng) is installed
-# It may be necessary to set LD_LIBRARY_PATH=/usr/local/lib
+# It may be necessary to set LD_LIBRARY_PATH=/usr/local/lib. The SI95
+# library is a part of RMR, so there is no third library needed for
+# linking the SI95 based examples.
 
 
 .PHONY: all
-all: sender receiver
+all: sender receiver sender_si receiver_si
 
+# ---- binaries on top of NNG -------------------------
 receiver: receiver.c
        gcc $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
 
@@ -35,3 +38,21 @@ health_check: health_check.c
 msg_echo: msg_echo.c
        gcc $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
 
+
+
+# ----- binaries linked on top of SI95 -----------------
+# These all are based on the same source, but just need 
+# different link options.
+#
+
+receiver_si: receiver.c
+       gcc $< -g -o $@ -lrmr_si -lpthread -lm
+
+sender_si: sender.c
+       gcc $< -g -o $@ -lrmr_si -lpthread -lm
+
+health_check_: health_check.c
+       gcc $< -g -o $@ -lrmr_si -lpthread -lm
+
+msg_echo_si: msg_echo.c
+       gcc $< -g -o $@ -lrmr_si -lpthread -lm
index 6d31bb2..215fd54 100644 (file)
@@ -54,7 +54,7 @@ int main( int argc, char** argv ) {
        rmr_mbuf_t* msg = NULL;                         // message received
        int stat_freq = 10;                             // write stats after reciving this many messages
        int i;
-       char*   listen_port;
+       char*   listen_port = "4560";           // default to what has become the standard RMR port
        long long count = 0;
        long long bad = 0;
        long long empty = 0;
index a83ed90..e3d08d9 100644 (file)
@@ -102,6 +102,9 @@ typedef struct {
        unsigned char* id;                      // if we need an ID in the message separate from the xaction id
        int             flags;                          // various MFL_ (private) flags as needed
        int             alloc_len;                      // the length of the allocated space (hdr+payload)
+
+       void*   ring;                           // ring this buffer should be queued back to
+       int             rts_fd;                         // SI fd for return to sender
 } rmr_mbuf_t;
 
 
@@ -112,6 +115,7 @@ typedef int rmr_whid_t;                     // wormhole identifier returned by rmr_wh_open(), pass
 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size );
 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg );
 extern void rmr_close( void* vctx );
+extern void rmr_set_fack( void* vctx );
 extern void* rmr_init( char* proto_port, int max_msg_size, int flags );
 extern int rmr_init_trace( void* vctx, int size );
 extern int rmr_payload_size( rmr_mbuf_t* msg );
@@ -124,6 +128,7 @@ extern int rmr_ready( void* vctx );
 extern int rmr_set_rtimeout( void* vctx, int time );
 extern int rmr_set_stimeout( void* vctx, int time );
 extern int rmr_get_rcvfd( void* vctx );                                                                // only supported with nng
+extern void rmr_set_low_latency( void* vctx );
 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to );
 extern rmr_mbuf_t*  rmr_tralloc_msg( void* context, int msize, int trsize, unsigned const char* data );
 extern rmr_whid_t rmr_wh_open( void* vctx, char const* target );
index df4a316..86f4660 100644 (file)
@@ -243,6 +243,7 @@ typedef struct ring {
        uint16_t tail;                          // index of the tail (extract point)
        uint16_t nelements;                     // number of elements in the ring
        void**  data;                           // the ring data (pointers to blobs of stuff)
+       int             pfd;                            // event fd for the ring for epoll
 } ring_t;
 
 
index dafc946..93dfdb7 100644 (file)
 #include <errno.h>
 #include <string.h>
 #include <stdint.h>
+#include <sys/eventfd.h>
 
 #define RING_FAST 1                    // when set we skip nil pointer checks on the ring pointer
 
+/*
+       This returns the ring's pollable file descriptor. If one does not exist, then
+       it is created.
+*/
+static int uta_ring_getpfd( void* vr ) {
+       ring_t*         r;
+
+       if( !RING_FAST ) {                                                              // compiler should drop the conditional when always false
+               if( (r = (ring_t*) vr) == NULL ) {
+                       return 0;
+               }
+       } else {
+               r = (ring_t*) vr;
+       }
+
+       if( r->pfd < 0 ) {
+               r->pfd = eventfd( 0, EFD_SEMAPHORE | EFD_NONBLOCK );
+       }
+
+       return r->pfd;
+}
+
 /*
        Make a new ring.
 */
@@ -63,6 +86,7 @@ static void* uta_mk_ring( int size ) {
        }
 
        memset( r->data, 0, sizeof( void** ) * r->nelements );
+       r->pfd = eventfd( 0, EFD_SEMAPHORE | EFD_NONBLOCK );            // in semaphore mode counter is maintained with each insert/extract
        return (void *) r;
 }
 
@@ -88,6 +112,7 @@ static void uta_ring_free( void* vr ) {
 static inline void* uta_ring_extract( void* vr ) {
        ring_t*         r;
        uint16_t        ti;             // real index in data
+       int64_t ctr;            // pfd counter
 
        if( !RING_FAST ) {                                                              // compiler should drop the conditional when always false
                if( (r = (ring_t*) vr) == NULL ) {
@@ -107,6 +132,12 @@ static inline void* uta_ring_extract( void* vr ) {
                r->tail = 0;
        }
 
+       read( r->pfd, &ctr, sizeof( ctr )  );                           // when not in semaphore, this zeros the counter and value is meaningless
+/*
+future -- investigate if it's possible only to set/clear when empty or going to empty
+       if( r->tail == r->head ) {                                                              // if this emptied the ring, turn off ready
+       }
+*/
        return r->data[ti];
 }
 
@@ -116,6 +147,7 @@ static inline void* uta_ring_extract( void* vr ) {
 */
 static inline int uta_ring_insert( void* vr, void* new_data ) {
        ring_t*         r;
+       int64_t inc = 1;                                // used to set the counter in the pfd
 
        if( !RING_FAST ) {                                                              // compiler should drop the conditional when always false
                if( (r = (ring_t*) vr) == NULL ) {
@@ -129,6 +161,13 @@ static inline int uta_ring_insert( void* vr, void* new_data ) {
                return 0;
        }
 
+       write( r->pfd, &inc, sizeof( inc ) );
+/*
+future -- investigate if it's possible only to set/clear when empty or going to empty
+       if( r->tail == r->head ) {                                                              // turn on ready if ring was empty
+       }
+*/
+
        r->data[r->head] = new_data;
        r->head++;
        if( r->head >= r->nelements ) {
index 5a2b1fa..3c73847 100644 (file)
@@ -344,6 +344,7 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r
                                ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
                        }
                        rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
+                       rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
 
                        for( grp = 0; grp < ngtoks; grp++ ) {
                                if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any referneces to our ip addrs
@@ -1085,6 +1086,7 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
                        return NULL;
                }
 
+               ep->notify = 1;                                                         // show notification on first connection failure
                ep->open = 0;                                                           // not connected
                ep->addr = uta_h2ip( ep_name );
                ep->name = strdup( ep_name );
index 2518b7a..2bc5c9f 100644 (file)
@@ -230,7 +230,7 @@ extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ) {
                }
        }
 
-       if( !rt_link2_ep( ep ) ) {                      // start a connection if open already
+       if( !rt_link2_ep( ctx, ep ) ) {                 // start a connection if not already open
                errno = ECONNREFUSED;
                return -1;
        }
index bf59574..878742c 100644 (file)
@@ -44,6 +44,7 @@ struct endpoint {
        nng_dialer      dialer;         // the connection specific information (retry timout etc)
        int             open;                   // set to true if we've connected as socket cannot be checked directly)
        pthread_mutex_t gate;   // we must serialise when we open/link to the endpoint
+       int             notify;                 // when set we can write connect failure msgs to stderr
        long long scounts[EPSC_SIZE];           // send counts (indexed by EPSCOUNT_* constants
 };
 
@@ -107,7 +108,7 @@ static void free_ctx( uta_ctx_t* ctx );
 
 // --- rt table things ---------------------------
 static int uta_link2( endpoint_t* ep );
-static int rt_link2_ep( endpoint_t* ep );
+static int rt_link2_ep( void* vctx,  endpoint_t* ep );
 static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock, endpoint_t** uepp );
 static int uta_epsock_rr( rtable_ent_t* rte, int group, int* more, nng_socket* nn_sock, endpoint_t** uepp );
 static rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt );
index 7eafc81..720f6ad 100644 (file)
@@ -1114,3 +1114,12 @@ extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int co
 
        return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
 }
+
+/*
+       The following functions are "dummies" as NNG has no concept of supporting
+       them, but are needed to resolve calls at link time.
+*/
+
+extern void rmr_set_fack( void* p ) {
+       return;
+}
index 81e79ca..f5c9b7a 100644 (file)
@@ -138,8 +138,10 @@ static int uta_link2( endpoint_t* ep ) {
 /*
        This provides a protocol independent mechanism for establishing the connection to an endpoint.
        Return is true (1) if the link was opened; false on error.
+
+       For some flavours, the context is needed by this function, but not for nng.
 */
-static int rt_link2_ep( endpoint_t* ep ) {
+static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
        if( ep == NULL ) {
                return FALSE;
        }
diff --git a/src/rmr/si/CMakeLists.txt b/src/rmr/si/CMakeLists.txt
new file mode 100644 (file)
index 0000000..8afe7c2
--- /dev/null
@@ -0,0 +1,62 @@
+
+#
+#==================================================================================
+#      Copyright (c) 2019 Nokia 
+#      Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
+
+
+# for clarity: this generates object, not a lib as the CM command implies. We also tag
+# them as rmr_si objects inasmuch as we might maintain the si code somewhere in this
+# repo as well.
+#
+add_library( rmr_si_objects OBJECT 
+       src/rmr_si.c 
+
+       src/si95/siaddress.c
+       src/si95/sibldpoll.c
+       src/si95/sicbreg.c
+       src/si95/sicbstat.c
+       src/si95/siclose.c
+       src/si95/siconnect.c
+       src/si95/siestablish.c
+       src/si95/sigetadd.c
+       src/si95/sigetname.c
+       src/si95/siinit.c
+       src/si95/silisten.c
+       src/si95/sinew.c
+       src/si95/sinewses.c
+       src/si95/sipoll.c
+       src/si95/sircv.c
+       src/si95/sisend.c
+       src/si95/sisendt.c
+       src/si95/sishutdown.c
+       src/si95/siterm.c
+       src/si95/sitrash.c
+       src/si95/siwait.c
+)
+
+#if( need_ext )
+#      add_dependencies( nano_objects si )             # force external things to build first
+#endif()
+
+target_include_directories (rmr_si_objects PUBLIC 
+       $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
+       $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../common/include>
+       $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../common/src>
+       $<INSTALL_INTERFACE:include>
+       PRIVATE src)
+
diff --git a/src/rmr/si/include/rmr_si_private.h b/src/rmr/si/include/rmr_si_private.h
new file mode 100644 (file)
index 0000000..132873c
--- /dev/null
@@ -0,0 +1,186 @@
+//  vim: ts=4 sw=4 noet :
+/*
+==================================================================================
+       Copyright (c) 2019-2020 Nokia
+       Copyright (c) 2018-2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       uta_si_private.h
+       Abstract:       Private header information for the uta nng library functions.
+                               These are structs which have specific NNG types; anything that
+                               does not can be included in the common/rmr_agnostic.h header.
+
+       Author:         E. Scott Daniels
+       Date:           31 November 2018
+
+       Mods:           28 Feb 2019 -- moved the majority to the agnostic header.
+*/
+
+#ifndef _uta_private_h
+#define _uta_private_h
+
+// if pmode is off we don't compile in some checks in hopes of speeding things up
+#ifndef PARINOID_CHECKS
+#      define PARINOID_CHECKS 0
+#endif
+
+
+// ---- si specific things  -------------------------------------------
+
+#define TP_HDR_LEN     50              // bytes added to each message for transport needs
+
+                                                       // river states
+#define RS_NEW         0               // river is unitialised
+#define RS_GOOD                1               // flow is in progress
+#define RS_RESET       2               // flow was interrupted; reset on next receive
+
+#define SI_MAX_ADDR_LEN                512
+
+/*
+       Manages a river of inbound bytes.
+*/
+typedef struct {
+       int             state;          // RS_* constants
+       char*   accum;          // bytes being accumulated
+       int             nbytes;         // allocated size of accumulator
+       int             ipt;            // insertion point in accumulator
+       //int           max;            // size of accum
+       //int           expected;       // expected for a complete message
+       int             msg_size;       // size of the message being accumulated
+} river_t;
+
+
+/*
+       Callback context.
+typedef struct {
+       uta_ctx_t*      ctx;
+       
+} cbctx_t;
+*/
+
+// ---------------------------- mainline rmr things ----------------
+
+
+/*
+       Manages an endpoint. Type def for this is defined in agnostic.
+*/
+struct endpoint {
+       char*   name;                   // end point name (symtab reference)
+       char*   proto;                  // connection proto (should only be TCP, but future might bring others)
+       char*   addr;                   // address used for connection
+       int             nn_sock;                // we'll keep calling it nn_ because it's less changes to the code
+       //nng_dialer    dialer;         // the connection specific information (retry timout etc)
+       int             open;                   // set to true if we've connected as socket cannot be checked directly)
+       pthread_mutex_t gate;   // we must serialise when we open/link to the endpoint
+       long long scounts[EPSC_SIZE];           // send counts (indexed by EPSCOUNT_* constants
+
+                                                       // SI specific things
+       int notify;                             // if we fail, we log once until a connection happens; notify if set
+};
+
+/*
+       Epoll information needed for the rmr_torcv_msg() funciton
+*/
+typedef struct epoll_stuff {
+       struct epoll_event events[1];                           // wait on 1 possible events
+       struct epoll_event epe;                                         // event definition for event to listen to
+       int ep_fd;                                                                      // file des from nng
+       int poll_fd;                                                            // fd from nng
+} epoll_stuff_t;
+
+/*
+       Context describing our world. Should be returned to user programme on
+       call to initialise, and passed as first parm on all calls to other
+       visible functions.
+
+       The typedef is declared in the agnostic header.
+*/
+struct uta_ctx {
+       char*   my_name;                        // dns name of this host to set in sender field of a message
+       char*   my_ip;                          // the ip address we _think_ we are using sent in src_ip of the message for rts
+       int     shutdown;                               // thread notification if we need to tell them to stop
+       int max_mlen;                           // max message length payload+header
+       int     max_plen;                               // max payload length
+       int     flags;                                  // CTXFL_ constants
+       int nrtele;                                     // number of elements in the routing table
+       int send_retries;                       // number of retries send_msg() should attempt if eagain/timeout indicated by nng
+       int     trace_data_len;                 // number of bytes to allocate in header for trace data
+       int d1_len;                                     // extra header data 1 length
+       int d2_len;                                     // extra header data 2 length   (future)
+       int     nn_sock;                                // our general listen socket
+       route_table_t* rtable;          // the active route table
+       route_table_t* old_rtable;      // the previously used rt, sits here to allow for draining
+       route_table_t* new_rtable;      // route table under construction
+       if_addrs_t*     ip_list;                // list manager of the IP addresses that are on our known interfaces
+       void*   mring;                          // ring where msgs are queued while waiting for a call response msg
+       chute_t*        chutes;
+
+       char*   rtg_addr;                       // addr/port of the route table generation publisher
+       int             rtg_port;                       // the port that the rtg listens on
+
+       wh_mgt_t*       wormholes;              // management of user opened wormholes
+       epoll_stuff_t*  eps;            // epoll information needed for the rcv with timeout call
+
+       pthread_t       rtc_th;                 // thread info for the rtc listener
+       pthread_t       mtc_th;                 // thread info for the multi-thread call receive process
+
+                                                               // added for SI95 support
+       si_ctx_t*       si_ctx;                 // the socket context
+       int                     nrivers;                // allocated rivers
+       river_t*        rivers;                 // inbound flows (index is the socket fd)
+       int                     max_ibm;                // max size of an inbound message (river accum alloc size)
+       void*           zcb_mring;              // zero copy buffer mbuf ring
+};
+
+typedef uta_ctx_t uta_ctx;
+
+
+/*
+       Static prototypes for functions located here. All common protos are in the
+       agnostic header file.
+*/
+
+// --- initialisation and housekeeping -------
+static void* init(  char* uproto_port, int max_msg_size, int flags );
+static void free_ctx( uta_ctx_t* ctx );
+
+// --- rt table things ---------------------------
+static void uta_ep_failed( endpoint_t* ep );
+static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep );
+static int rt_link2_ep( void* vctx, endpoint_t* ep );
+static rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt );
+static inline int xlate_si_state( int state, int def_state );
+
+// --- these have changes for si
+static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx );
+static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx );
+
+
+// --- msg ---------------------------------------
+static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo );
+static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state );
+static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) ;
+static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  );
+static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
+static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
+static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  );
+static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
+
+static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries );
+
+
+#endif
diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c
new file mode 100644 (file)
index 0000000..6e2e8aa
--- /dev/null
@@ -0,0 +1,260 @@
+// : vi ts=4 sw=4 noet:
+/*
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       mt_call_si static.c
+       Abstract:       Static funcitons related to the multi-threaded call feature
+                               which are SI specific.
+
+       Author:         E. Scott Daniels
+       Date:           20 May 2019
+*/
+
+#ifndef _mtcall_si_static_c
+#define _mtcall_si_static_c
+#include <semaphore.h>
+
+static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
+       static  int warned = 0;
+       chute_t*        chute;
+
+       if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
+               rmr_free_msg( mbuf );                                                           // drop if ring is full
+               if( !warned ) {
+                       fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
+                       warned++;
+               }
+
+               return;
+       }
+
+       chute = &ctx->chutes[0];
+       sem_post( &chute->barrier );                                                            // tickle the ring monitor
+}
+
+/*
+       Allocate a message buffer, point it at the accumulated (raw) message,
+       call ref to point to all of the various bits and set real len etc,
+       then we queue it.  Raw_msg is expected to include the transport goo
+       placed in front of the RMR header and payload.
+
+       done -- FIX ME?? can we eliminate the buffer copy here?
+*/
+static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
+       rmr_mbuf_t*             mbuf;
+       uta_mhdr_t*             hdr;            // header of the message received
+       unsigned char*  d1;                     // pointer at d1 data ([0] is the call_id)
+       chute_t*                chute;
+       unsigned int    call_id;        // the id assigned to the call generated message
+
+       if( PARINOID_CHECKS ) {                                                                 // PARINOID mode is slower; off by default
+               if( raw_msg == NULL || msg_size <= 0 ) {
+                       return;
+               }
+       }
+
+/*
+       if( (mbuf = (rmr_mbuf_t *) malloc( sizeof( *mbuf ))) != NULL ) {                // alloc mbuf and point at various bits of payload
+               memset( mbuf, 0, sizeof( *mbuf ) );
+               mbuf->tp_buf = raw_msg;
+               mbuf->ring = ctx->zcb_mring;
+*/
+       if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
+               mbuf->tp_buf = raw_msg;
+               mbuf->rts_fd = sender_fd;
+
+               // eliminated :)   memcpy( mbuf->tp_buf, river->accum + offset, river->msg_size );
+
+               ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
+               hdr = mbuf->header;                                                     // convenience
+               if( hdr->flags & HFL_CALL_MSG ) {                       // call generated message; ignore call-id etc and queue
+                       queue_normal( ctx, mbuf );
+               } else {
+                       if( RMR_D1_LEN( hdr ) <= 0 ) {                                                                                  // no call-id data; just queue
+                               queue_normal( ctx, mbuf );
+                       } else {
+                               d1 = DATA1_ADDR( hdr );
+                               if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) {                       // call_id not set, just queue
+                                       queue_normal( ctx, mbuf );
+                               } else {
+                                       chute = &ctx->chutes[call_id];
+                                       chute->mbuf = mbuf;
+                                       sem_post( &chute->barrier );                            // the call function can vet xaction id in their own thread
+                               }
+                       }
+               }
+       }
+}
+
+/*
+       This is the callback invoked when tcp data is received. It adds the data
+       to the buffer for the connection and if a complete message is received
+       then the message is queued onto the receive ring.
+
+       Return value indicates only that we handled the buffer and SI should continue
+       or that SI should terminate, so on error it's NOT wrong to return "ok".
+
+
+       FUTURE: to do this better, SI needs to support a 'ready to read' callback
+       which allows us to to the actual receive directly into our buffer.
+*/
+static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
+       uta_ctx_t*              ctx;
+       river_t*                river;                  // river associated with the fd passed in
+       int                             bidx = 0;               // transport buffer index
+       int                             remain;                 // bytes in transport buf that need to be moved
+       int*                    mlen;                   // pointer to spot in buffer for conversion to int
+       int                             need;                   // bytes needed for something
+       int                             i;
+
+       // for speed these checks should be enabled only in debug mode and assume we always get a good context
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return SI_RET_OK;
+       }
+
+       if( fd >= ctx->nrivers || fd < 0 ) {
+               if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+               return SI_RET_OK;
+       }
+
+       // -------- end debug checks -----------------
+
+       if( buflen <= 0 ) {
+               return SI_RET_OK;
+       }
+
+       river = &ctx->rivers[fd];
+       if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
+               if( river->state == RS_NEW ) {
+                       memset( river, 0, sizeof( *river ) );
+                       //river->nbytes = sizeof( char ) * (8 * 1024);
+                       river->nbytes = sizeof( char ) * ctx->max_ibm;                  // max inbound message size
+                       river->accum = (char *) malloc( river->nbytes );
+                       river->ipt = 0;
+               } else {
+                       // future -- sync to next marker
+                       river->ipt = 0;                                         // insert point
+               }
+       }
+
+       river->state = RS_GOOD;
+
+/*
+fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
+for( i = 0; i < 40; i++ ) {
+fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
+}
+fprintf( stderr, "\n" );
+*/
+
+       remain = buflen;
+       while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
+               if( DEBUG )  fprintf( stderr, "[DBUG] ====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
+
+               // FIX ME: size in the message  needs to be network byte order  
+               if( river->msg_size <= 0 ) {                            // don't have a size yet
+                                                                                                       // FIX ME: we need a frame indicator to ensure alignment
+                       need = sizeof( int ) - river->ipt;                                                      // what we need from transport buffer
+                       if( need > remain ) {                                                                           // the whole size isn't there
+                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
+                               memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
+                               river->ipt += remain;
+                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
+                               return SI_RET_OK;
+                       }
+
+                       if( river->ipt > 0 ) {                                                                          // if we captured the start of size last go round
+                               memcpy( &river->accum[river->ipt], buf + bidx, need );
+                               river->ipt += need;
+                               bidx += need;
+                               remain -= need;
+                               river->msg_size = *((int *) river->accum);                              
+                               if( DEBUG > 1 ) {
+                                       fprintf( stderr, "[DBUG] size from accumulator =%d\n", river->msg_size );
+                                       if( river->msg_size > 500 ) {
+                                               dump_40( river->accum, "msg size way too large accum:"  );
+                                       }
+                               }
+                       } else {
+                               river->msg_size = *((int *) &buf[bidx]);                                        // snarf directly and copy with rest later
+                       }
+                       if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size );
+               }
+
+               if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in buffer
+                       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
+                       memcpy( &river->accum[river->ipt], buf+bidx, remain );          // buffer and go wait for more
+                       river->ipt += remain;
+                       remain = 0;
+               } else {
+                       need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
+                       if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
+                       memcpy( &river->accum[river->ipt], buf+bidx, need );            // grab just what is needed (might be more)
+                       buf2mbuf( ctx, river->accum, river->msg_size, fd );                             // build an RMR mbuf and queue
+
+                       river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
+                       river->msg_size = -1;
+                       river->ipt = 0;
+                       bidx += need;
+                       remain -= need; 
+               }
+       }
+
+       if( DEBUG >2 ) fprintf( stderr, "[DBUG] ##### data callback finished\n" );
+       return SI_RET_OK;
+}
+
+
+/*
+       This is expected to execute in a separate thread. It is responsible for
+       _all_ receives and queues them on the appropriate ring, or chute.
+       It does this by registering the callback function above with the SI world
+       and then caling SIwait() to drive the callback when data has arrived.
+
+
+       The "state" of the message is checked which determines where the message
+       is delivered.
+
+               Flags indicate that the message is a call generated message, then
+               the message is queued on the normal receive ring.
+
+               Chute ID is == 0, then the message is queued on the normal receive ring.
+
+               The transaction ID in the message matches the expected ID in the chute,
+               then the message is given to the chute and the chute's semaphore is tickled.
+
+               If none are true, the message is dropped.
+*/
+static void* mt_receive( void* vctx ) {
+       uta_ctx_t*      ctx;
+
+       if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
+               fprintf( stderr, "[CRI], unable to start mt-receive: ctx was nil\n" );
+               return NULL;
+       }
+
+       if( DEBUG ) fprintf( stderr, "[DBUG] mt_receive: registering SI95 data callback and waiting\n" );
+       SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
+       SIwait( ctx->si_ctx );
+
+       return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return
+}
+
+#endif
diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c
new file mode 100644 (file)
index 0000000..b1bfd1f
--- /dev/null
@@ -0,0 +1,1047 @@
+// vim: ts=4 sw=4 noet :
+/*
+==================================================================================
+       Copyright (c) 2019-2020 Nokia
+       Copyright (c) 2018-2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       rmr_si.c
+       Abstract:       This is the compile point for the si version of the rmr
+                               library (formarly known as uta, so internal function names
+                               are likely still uta_*)
+
+                               With the exception of the symtab portion of the library,
+                               RMr is built with a single compile so as to "hide" the
+                               internal functions as statics.  Because they interdepend
+                               on each other, and CMake has issues with generating two
+                               different wormhole objects from a single source, we just
+                               pull it all together with a centralised comple using
+                               includes.
+
+                               Future:  the API functions at this point can be separated
+                               into a common source module.
+
+       Author:         E. Scott Daniels
+       Date:           1 February 2019
+*/
+
+#include <ctype.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <errno.h>
+#include <string.h>
+#include <errno.h>
+#include <pthread.h>
+#include <unistd.h>
+#include <time.h>
+#include <arpa/inet.h>
+#include <semaphore.h>
+#include <pthread.h>
+
+#include "si95/socket_if.h"
+#include "si95/siproto.h"
+
+
+#include "rmr.h"                               // things the users see
+#include "rmr_agnostic.h"              // agnostic things (must be included before private)
+#include "rmr_si_private.h"    // things that we need too
+#include "rmr_symtab.h"
+
+#include "ring_static.c"                       // message ring support
+#include "rt_generic_static.c"         // route table things not transport specific
+#include "rtable_si_static.c"          // route table things -- transport specific
+#include "rtc_static.c"                                // route table collector
+#include "rtc_si_static.c"                     // our private test function
+#include "tools_static.c"
+#include "sr_si_static.c"                      // send/receive static functions
+#include "wormholes.c"                         // wormhole api externals and related static functions (must be LAST!)
+#include "mt_call_static.c"
+#include "mt_call_si_static.c"
+
+
+//------------------------------------------------------------------------------
+
+
+/*
+       Clean up a context.
+*/
+static void free_ctx( uta_ctx_t* ctx ) {
+       if( ctx ) {
+               if( ctx->rtg_addr ) {
+                       free( ctx->rtg_addr );
+               }
+       }
+}
+
+// --------------- public functions --------------------------------------------------------------------------
+
+/*
+       Returns the size of the payload (bytes) that the msg buffer references.
+       Len in a message is the number of bytes which were received, or should
+       be transmitted, however, it is possible that the mbuf was allocated
+       with a larger payload space than the payload length indicates; this
+       function returns the absolute maximum space that the user has available
+       in the payload. On error (bad msg buffer) -1 is returned and errno should
+       indicate the rason.
+
+       The allocated len stored in the msg is:
+               transport header length +
+               message header + 
+               user requested payload 
+
+       The msg header is a combination of the fixed RMR header and the variable
+       trace data and d2 fields which may vary for each message.
+*/
+extern int rmr_payload_size( rmr_mbuf_t* msg ) {
+       if( msg == NULL || msg->header == NULL ) {
+               errno = EINVAL;
+               return -1;
+       }
+
+       errno = 0;
+       return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN;        // allocated transport size less the header and other data bits
+}
+
+/*
+       Allocates a send message as a zerocopy message allowing the underlying message protocol
+       to send the buffer without copy.
+*/
+extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
+       uta_ctx_t*      ctx;
+       rmr_mbuf_t*     m;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return NULL;
+       }
+
+       m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
+       return  m;
+}
+
+
+/*
+       Allocates a send message as a zerocopy message allowing the underlying message protocol
+       to send the buffer without copy. In addition, a trace data field of tr_size will be
+       added and the supplied data coppied to the buffer before returning the message to
+       the caller.
+*/
+extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
+       uta_ctx_t*      ctx;
+       rmr_mbuf_t*     m;
+       int state;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return NULL;
+       }
+
+       m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
+       if( m != NULL ) {
+               state = rmr_set_trace( m, data, tr_size );                              // roll their data in
+               if( state != tr_size ) {
+                       m->state = RMR_ERR_INITFAILED;
+               }
+       }
+
+       return  m;
+}
+
+/*
+       This provides an external path to the realloc static function as it's called by an
+       outward facing mbuf api function. Used to reallocate a message with a different
+       trace data size.
+*/
+extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
+       return realloc_msg( msg, new_tr_size );
+}
+
+
+/*
+       Return the message to the available pool, or free it outright.
+*/
+extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
+       //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
+       //return;
+
+       if( mbuf == NULL ) {
+               return;
+       }
+
+       if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {                    // just queue, free if ring is full
+               if( mbuf->tp_buf ) {
+                       free( mbuf->tp_buf );
+               }
+               free( mbuf );
+       }
+}
+
+/*
+       This is a wrapper to the real timeout send. We must wrap it now to ensure that
+       the call flag and call-id are reset
+*/
+extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
+       char* d1;                                                                                                                       // point at the call-id in the header
+
+       if( msg != NULL ) {
+               ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
+
+               d1 = DATA1_ADDR( msg->header );
+               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
+       }       
+
+       return mtosend_msg( vctx, msg, max_to );
+}
+
+/*
+       Send with default max timeout as is set in the context.
+       See rmr_mtosend_msg() for more details on the parameters.
+       See rmr_stimeout() for info on setting the default timeout.
+*/
+extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
+       char* d1;                                                                                                               // point at the call-id in the header
+
+       if( msg != NULL ) {
+               ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
+
+               d1 = DATA1_ADDR( msg->header );
+               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
+       }       
+
+       return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
+}
+
+/*
+       Return to sender allows a message to be sent back to the endpoint where it originated.
+
+       In the SI world the file descriptor that was the source of the message is captured in
+       the mbuffer and thus can be used to quickly find the target for an RTS call. 
+
+       The source information in the message is used to select the socket on which to write
+       the message rather than using the message type and round-robin selection. This
+       should return a message buffer with the state of the send operation set. On success
+       (state is RMR_OK, the caller may use the buffer for another receive operation), and on
+       error it can be passed back to this function to retry the send if desired. On error,
+       errno will liklely have the failure reason set by the nng send processing.
+       The following are possible values for the state in the message buffer:
+
+       Message states returned:
+               RMR_ERR_BADARG - argument (context or msg) was nil or invalid
+               RMR_ERR_NOHDR  - message did not have a header
+               RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
+               RMR_ERR_SENDFAILED - send failed; errno has nano error code
+               RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
+
+       A nil message as the return value is rare, and generally indicates some kind of horrible
+       failure. The value of errno might give a clue as to what is wrong.
+
+       CAUTION:
+               Like send_msg(), this is non-blocking and will return the msg if there is an errror.
+               The caller must check for this and handle it properly.
+*/
+extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
+       int                     nn_sock;                        // endpoint socket for send
+       uta_ctx_t*      ctx;
+       int                     state;
+       char*           hold_src;                       // we need the original source if send fails
+       char*           hold_ip;                        // also must hold original ip
+       int                     sock_ok = 0;            // true if we found a valid endpoint socket
+       endpoint_t*     ep = NULL;                      // end point to track counts
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
+               errno = EINVAL;                                                                                         // if msg is null, this is their clue
+               if( msg != NULL ) {
+                       msg->state = RMR_ERR_BADARG;
+                       msg->tp_state = errno;
+               }
+               return msg;
+       }
+
+       errno = 0;                                                                                                              // at this point any bad state is in msg returned
+       if( msg->header == NULL ) {
+               fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
+               msg->state = RMR_ERR_NOHDR;
+               msg->tp_state = errno;
+               return msg;
+       }
+
+       ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
+
+/*
+       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep, ctx->si_ctx );                      // src is always used first for rts
+       if( ! sock_ok ) {
+*/
+       if( (nn_sock = msg->rts_fd) < 0 ) {
+               if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
+                       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
+               }
+               if( ! sock_ok ) {
+                       msg->state = RMR_ERR_NOENDPT;
+                       return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
+               }
+       }
+
+
+       msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
+       hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
+       hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
+       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
+       msg = send_msg( ctx, msg, nn_sock, -1 );
+       if( msg ) {
+               if( ep != NULL ) {
+                       switch( msg->state ) {
+                               case RMR_OK:
+                                       ep->scounts[EPSC_GOOD]++;
+                                       break;
+                       
+                               case RMR_ERR_RETRY:
+                                       ep->scounts[EPSC_TRANS]++;
+                                       break;
+
+                               default:
+                                       // FIX ME uta_fd_failed( nn_sock );                     // we don't have an ep so this requires a look up/search to mark it failed
+                                       ep->scounts[EPSC_FAIL]++;
+                                       break;
+                       }
+               }
+               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
+               strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
+               msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
+       }
+
+       free( hold_src );
+       free( hold_ip );
+       return msg;
+}
+
+/*
+       If multi-threading call is turned on, this invokes that mechanism with the special call
+       id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
+       behavour (described below) is carried out.  This is safe to use when mt is enabled, but
+       the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
+       a flexible timeout.
+
+       On timeout this function will return a nil pointer. If the original message could not
+       be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
+
+       Original behavour:
+       Call sends the message based on message routing using the message type, and waits for a
+       response message to arrive with the same transaction id that was in the outgoing message.
+       If, while wiating for the expected response,  messages are received which do not have the
+       desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
+       order that they were received.
+
+       Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
+       to ensure that no error was encountered. If the state is UTA_BADARG, then the message
+       may be resent (likely the context pointer was nil).  If the message is sent, but no
+       response is received, a nil message is returned with errno set to indicate the likley
+       issue:
+               ETIMEDOUT -- too many messages were queued before reciving the expected response
+               ENOBUFS -- the queued message ring is full, messages were dropped
+               EINVAL  -- A parameter was not valid
+               EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
+                                       user should call this function with the message again.
+
+*/
+extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
+       uta_ctx_t*              ctx;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
+               if( msg != NULL ) {
+                       msg->state = RMR_ERR_BADARG;
+               }
+               return msg;
+       }
+
+       return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
+}
+
+/*
+       The outward facing receive function. When invoked it will pop the oldest message
+       from the receive ring, if any are queued, and return it. If the ring is empty
+       then the receive function is invoked to wait for the next message to arrive (blocking).
+
+       If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
+       nil, a new one will be allocated. However, the caller should NOT expect to get the same
+       struct back (if a queued message is returned the message struct will be different).
+*/
+extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
+       uta_ctx_t*      ctx;
+       rmr_mbuf_t*     qm;                             // message that was queued on the ring
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               if( old_msg != NULL ) {
+                       old_msg->state = RMR_ERR_BADARG;
+                       old_msg->tp_state = errno;
+               }
+               return old_msg;
+       }
+       errno = 0;
+
+       return rmr_mt_rcv( ctx, old_msg, -1 );
+}
+
+/*
+       This allows a timeout based receive for applications unable to implement epoll_wait()
+       (e.g. wrappers).
+*/
+extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
+       uta_ctx_t*      ctx;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               if( old_msg != NULL ) {
+                       old_msg->state = RMR_ERR_BADARG;
+                       old_msg->tp_state = errno;
+               }
+               return old_msg;
+       }
+
+       return rmr_mt_rcv( ctx, old_msg, ms_to );
+}
+
+/*
+       This blocks until the message with the 'expect' ID is received. Messages which are received
+       before the expected message are queued onto the message ring.  The function will return
+       a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
+       expected message is received. If the queued message ring fills a nil pointer is returned
+       and errno is set to ENOBUFS.
+
+       Generally this will be invoked only by the call() function as it waits for a response, but
+       it is exposed to the user application as three is no reason not to.
+*/
+extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
+       uta_ctx_t*      ctx;
+       int     queued = 0;                             // number we pushed into the ring
+       int     exp_len = 0;                    // length of expected ID
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               if( msg != NULL ) {
+                       msg->state = RMR_ERR_BADARG;
+                       msg->tp_state = errno;
+               }
+               return msg;
+       }
+
+       errno = 0;
+
+       if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
+               return rmr_rcv_msg( ctx, msg );
+       }
+
+       exp_len = strlen( expect );
+       if( exp_len > RMR_MAX_XID ) {
+               exp_len = RMR_MAX_XID;
+       }
+       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
+
+       while( queued < allow2queue ) {
+               msg = rcv_msg( ctx, msg );                                      // hard wait for next
+               if( msg->state == RMR_OK ) {
+                       if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
+                               if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+                               return msg;
+                       }
+
+                       if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
+                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
+                               errno = ENOBUFS;
+                               return NULL;
+                       }
+
+                       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
+                       queued++;
+                       msg = NULL;
+               }
+       }
+
+       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
+       errno = ETIMEDOUT;
+       return NULL;
+}
+
+/*
+       Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
+       _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
+       mechnism indicates eagain or etimeedout.  All other error conditions are reported
+       without this delay. Setting a timeout of 0 causes no retries to be attempted in
+       RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
+       but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
+       after every 1K send attempts until the "time" value is reached. Retries are abandoned
+       if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
+
+       The default, if this function is not used, is 1; meaning that RMr will retry, but will
+       not enter a sleep.  In all cases the caller should check the status in the message returned
+       after a send call.
+
+       Returns -1 if the context was invalid; RMR_OK otherwise.
+*/
+extern int rmr_set_stimeout( void* vctx, int time ) {
+       uta_ctx_t*      ctx;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return -1;
+       }
+
+       if( time < 0 ) {
+               time = 0;
+       }
+
+       ctx->send_retries = time;
+       return RMR_OK;
+}
+
+/*
+       Set receive timeout -- not supported in nng implementation
+
+       CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
+*/
+extern int rmr_set_rtimeout( void* vctx, int time ) {
+       fprintf( stderr, "[WRN] Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
+       return 0;
+}
+
+
+/*
+       This is the actual init workhorse. The user visible function meerly ensures that the
+       calling programme does NOT set any internal flags that are supported, and then
+       invokes this.  Internal functions (the route table collector) which need additional
+       open ports without starting additional route table collectors, will invoke this
+       directly with the proper flag.
+*/
+static void* init(  char* uproto_port, int max_msg_size, int flags ) {
+       static  int announced = 0;
+       uta_ctx_t*      ctx = NULL;
+       char    bind_info[NNG_MAXADDRLEN];      // bind info
+       char*   proto = "tcp";                          // pointer into the proto/port string user supplied
+       char*   port;
+       char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
+       char*   proto_port;
+       char    wbuf[1024];                                     // work buffer
+       char*   tok;                                            // pointer at token in a buffer
+       char*   tok2;
+       int             state;
+       int             i;
+
+       if( ! announced ) {
+               fprintf( stderr, "[INFO] ric message routing library on SI95 mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+                       RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+               announced = 1;
+       }
+
+       errno = 0;
+       if( uproto_port == NULL ) {
+               proto_port = strdup( DEF_COMM_PORT );
+       } else {
+               proto_port = strdup( uproto_port );             // so we can modify it
+       }
+
+       if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
+               errno = ENOMEM;
+               return NULL;
+       }
+       memset( ctx, 0, sizeof( uta_ctx_t ) );
+
+       if( DEBUG ) fprintf( stderr, "[DBUG] rmr_init: allocating 266 rivers\n" );
+       ctx->nrivers = 256;                                                             // number of input flows we'll manage
+       ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
+       memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
+       for( i = 0; i < ctx->nrivers; i++ ) {
+               ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
+       }
+
+       ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
+       ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
+       ctx->max_ibm = max_msg_size;                                    // default to user supplied message size
+
+       ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
+       init_mtcall( ctx );                                                             // set up call chutes
+
+       ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring
+
+       ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
+       if( max_msg_size > 0 ) {
+               ctx->max_plen = max_msg_size;
+       }
+
+       // we're using a listener to get rtg updates, so we do NOT need this.
+       //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
+
+       ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
+       if( ctx->si_ctx == NULL ) {
+               fprintf( stderr, "[CRI] unable to initialise SI95 interface\n" );
+               free_ctx( ctx );
+               return NULL;
+       }
+
+       if( (port = strchr( proto_port, ':' )) != NULL ) {
+               if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
+                       port++;
+               } else {
+                       *(port++) = 0;                  // term proto string and point at port string
+                       proto = proto_port;             // user supplied proto so point at it rather than default
+               }
+       } else {
+               port = proto_port;                      // assume something like "1234" was passed
+       }
+
+       if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
+               tok = strdup( tok );                                    // something we can destroy
+               if( *tok == '[' ) {                                             // we allow an ipv6 address here
+                       tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
+               } else {
+                       tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
+               }
+               if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
+                       *tok2 = 0;                                                      // make it so
+               }
+
+               snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
+               free( tok );
+       } else {
+               if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
+                       fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
+                       return NULL;
+               }
+               if( (tok = strchr( wbuf, '.' )) != NULL ) {
+                       *tok = 0;                                                                       // we don't keep domain portion
+               }
+       }
+
+       ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
+       if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
+               fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
+               return NULL;
+       }
+
+       if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
+               if( atoi( tok ) > 0 ) {
+                       flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
+               }
+       }
+
+       ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
+       if( flags & RMRFL_NAME_ONLY ) {
+               ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
+       } else {
+               ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
+               if( ctx->my_ip == NULL ) {
+                       fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
+                       strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
+               }
+       }
+       if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
+
+       if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
+               if( *tok == '1' ) {
+                       ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
+               }
+       }
+
+
+       if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
+               interface = "0.0.0.0";
+       }
+       
+       snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
+       if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
+               fprintf( stderr, "[CRI] rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
+               free_ctx( ctx );
+               return NULL;
+       }
+
+       if( !(flags & FL_NOTHREAD) ) {                                                                                          // skip if internal function that doesnt need an rtc
+               if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread
+                       fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+               }
+       }
+
+       //fprintf( stderr, ">>>>> starting threaded receiver with ctx=%p si_ctx=%p\n", ctx, ctx->si_ctx );
+       ctx->flags |= CFL_MTC_ENABLED;                                                                                          // for SI threaded receiver is the only way
+       if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
+               fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
+       }
+
+       free( proto_port );
+       return (void *) ctx;
+}
+
+/*
+       Initialise the message routing environment. Flags are one of the UTAFL_
+       constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
+       (tcp) to be used, then :port is all that is needed.
+
+       At the moment it seems that TCP really is the only viable protocol, but
+       we'll allow flexibility.
+
+       The return value is a void pointer which must be passed to most uta functions. On
+       error, a nil pointer is returned and errno should be set.
+
+       Flags:
+               No user flags supported (needed) at the moment, but this provides for extension
+               without drastically changing anything. The user should invoke with RMRFL_NONE to
+               avoid any misbehavour as there are internal flags which are suported
+*/
+extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
+       return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
+}
+
+/*
+       This sets the default trace length which will be added to any message buffers
+       allocated.  It can be set at any time, and if rmr_set_trace() is given a
+       trace len that is different than the default allcoated in a message, the message
+       will be resized.
+
+       Returns 0 on failure and 1 on success. If failure, then errno will be set.
+*/
+extern int rmr_init_trace( void* vctx, int tr_len ) {
+       uta_ctx_t* ctx;
+
+       errno = 0;
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               return 0;
+       }
+
+       ctx->trace_data_len = tr_len;
+       return 1;
+}
+
+/*
+       Return true if routing table is initialised etc. and app can send/receive.
+*/
+extern int rmr_ready( void* vctx ) {
+       uta_ctx_t *ctx;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return FALSE;
+       }
+
+       if( ctx->rtable != NULL ) {
+               return TRUE;
+       }
+
+       return FALSE;
+}
+
+/*
+       This returns the message queue ring's filedescriptor which can be used for
+       calls to epoll.  The user shouild NOT read, write, or close the fd.
+
+       Returns the file descriptor or -1 on error.
+*/
+extern int rmr_get_rcvfd( void* vctx ) {
+       uta_ctx_t* ctx;
+       int state;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return -1;
+       }
+
+/*
+       if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
+               fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
+               return -1;
+       }
+*/
+
+       return uta_ring_getpfd( ctx->mring );
+}
+
+
+/*
+       Clean up things.
+
+       There isn't an si_flush() per se, but we can pause, generate
+       a context switch, which should allow the last sent buffer to
+       flow. There isn't exactly an nng_term/close either, so there
+       isn't much we can do.
+*/
+extern void rmr_close( void* vctx ) {
+       uta_ctx_t *ctx;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return;
+       }
+
+       ctx->shutdown = 1;
+
+       SItp_stats( ctx->si_ctx );                      // dump some interesting stats
+
+       // FIX ME -- how to we turn off si; close all sessions etc?
+       //SIclose( ctx->nn_sock );
+
+}
+
+
+// ----- multi-threaded call/receive support -------------------------------------------------
+
+/*
+       Blocks on the receive ring chute semaphore and then reads from the ring
+       when it is tickled.  If max_wait is -1 then the function blocks until
+       a message is ready on the ring. Else max_wait is assumed to be the number
+       of millaseconds to wait before returning a timeout message.
+*/
+extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
+       uta_ctx_t*      ctx;
+       uta_mhdr_t*     hdr;                    // header in the transport buffer
+       chute_t*        chute;
+       struct timespec ts;                     // time info if we have a timeout
+       long    new_ms;                         // adjusted mu-sec
+       long    seconds = 0;            // max wait seconds
+       long    nano_sec;                       // max wait xlated to nano seconds
+       int             state;
+       rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
+       
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               if( mbuf ) {
+                       mbuf->state = RMR_ERR_BADARG;
+                       mbuf->tp_state = errno;
+               }
+               return mbuf;
+       }
+
+       ombuf = mbuf;           // if we timeout we must return original msg with status, so save it
+
+       chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
+
+       if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
+               if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
+                       if( ombuf ) {
+                               rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
+                       }       
+               } else {
+                       mbuf = ombuf;                                           // return original if it was given with timeout status
+                       if( ombuf != NULL ) {
+                               mbuf->state = RMR_ERR_TIMEOUT;                  // preset if for failure
+                               mbuf->len = 0;
+                       }
+               }
+
+               return mbuf;
+       }
+
+       if( ombuf ) {
+               ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
+               ombuf->len = 0;
+       }
+       if( max_wait > 0 ) {
+               clock_gettime( CLOCK_REALTIME, &ts );   // sem timeout based on clock, not a delta
+
+               if( max_wait > 999 ) {
+                       seconds = max_wait / 1000;
+                       max_wait -= seconds * 1000;
+                       ts.tv_sec += seconds;
+               }
+               if( max_wait > 0 ) {
+                       nano_sec = max_wait * 1000000;
+                       ts.tv_nsec += nano_sec;
+                       if( ts.tv_nsec > 999999999 ) {
+                               ts.tv_nsec -= 999999999;
+                               ts.tv_sec++;
+                       }
+               }
+
+               seconds = 1;                                                                                                    // use as flag later to invoked timed wait
+       }
+
+       errno = EINTR;
+       state = -1;
+       while( state < 0 && errno == EINTR ) {
+               if( seconds ) {
+                       state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
+               } else {
+                       state = sem_wait( &chute->barrier );
+               }
+       }
+
+       if( state < 0 ) {
+               mbuf = ombuf;                           // return caller's buffer if they passed one in
+       } else {
+               errno = 0;                                              // interrupted call state could be left; clear
+               if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
+               if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
+                       mbuf->state = RMR_OK;
+
+                       if( ombuf ) {
+                               rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
+                       }
+               } else {
+                       errno = ETIMEDOUT;
+                       mbuf = ombuf;                           // no buffer, return user's if there
+               }
+       }
+
+       if( mbuf ) {
+               mbuf->tp_state = errno;
+       }
+       return mbuf;
+}
+
+/*
+       Accept a message buffer and caller ID, send the message and then wait
+       for the receiver to tickle the semaphore letting us know that a message
+       has been received. The call_id is a value between 2 and 255, inclusive; if
+       it's not in this range an error will be returned. Max wait is the amount
+       of time in millaseconds that the call should block for. If 0 is given
+       then no timeout is set.
+
+       If the mt_call feature has not been initialised, then the attempt to use this
+       funciton will fail with RMR_ERR_NOTSUPP
+
+       If no matching message is received before the max_wait period expires, a
+       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
+       occurs after the message has been sent, then a nil pointer is returned
+       with errno set to some other value.
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+       rmr_mbuf_t* ombuf;                      // original mbuf passed in
+       uta_ctx_t*      ctx;
+       uta_mhdr_t*     hdr;                    // header in the transport buffer
+       chute_t*        chute;
+       unsigned char*  d1;                     // d1 data in header
+       struct timespec ts;                     // time info if we have a timeout
+       long    new_ms;                         // adjusted mu-sec
+       long    seconds = 0;            // max wait seconds
+       long    nano_sec;                       // max wait xlated to nano seconds
+       int             state;
+       
+       errno = EINVAL;
+       if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
+               if( mbuf ) {
+                       mbuf->tp_state = errno;
+                       mbuf->state = RMR_ERR_BADARG;
+               }
+               return mbuf;
+       }
+
+       if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
+               mbuf->state = RMR_ERR_NOTSUPP;
+               mbuf->tp_state = errno;
+               return mbuf;
+       }
+
+       if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
+               mbuf->state = RMR_ERR_BADARG;
+               mbuf->tp_state = errno;
+               return mbuf;
+       }
+
+       ombuf = mbuf;                                                                                                   // save to return timeout status with
+
+       chute = &ctx->chutes[call_id];
+       if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
+               rmr_free_msg( chute->mbuf );
+               chute->mbuf = NULL;
+       }
+       
+       hdr = (uta_mhdr_t *) mbuf->header;
+       hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
+       memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
+       d1 = DATA1_ADDR( hdr );
+       d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
+       mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
+
+       if( max_wait >= 0 ) {
+               clock_gettime( CLOCK_REALTIME, &ts );   
+
+               if( max_wait > 999 ) {
+                       seconds = max_wait / 1000;
+                       max_wait -= seconds * 1000;
+                       ts.tv_sec += seconds;
+               }
+               if( max_wait > 0 ) {
+                       nano_sec = max_wait * 1000000;
+                       ts.tv_nsec += nano_sec;
+                       if( ts.tv_nsec > 999999999 ) {
+                               ts.tv_nsec -= 999999999;
+                               ts.tv_sec++;
+                       }
+               }
+
+               seconds = 1;                                                                            // use as flag later to invoked timed wait
+       }
+
+       mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
+       if( mbuf ) {
+               if( mbuf->state != RMR_OK ) {
+                       mbuf->tp_state = errno;
+                       return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
+               }
+       }
+
+       state = 0;
+       errno = 0;
+       while( chute->mbuf == NULL && ! errno ) {
+               if( seconds ) {
+                       state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
+               } else {
+                       state = sem_wait( &chute->barrier );
+               }
+
+               if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
+                       errno = 0;
+               }
+
+               if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
+                       if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
+                               rmr_free_msg( chute->mbuf );
+                               chute->mbuf = NULL;
+                               errno = 0;
+                       }
+               }
+       }
+
+       if( state < 0 ) {
+               return NULL;                                    // leave errno as set by sem wait call
+       }
+
+       mbuf = chute->mbuf;
+       mbuf->state = RMR_OK;
+       chute->mbuf = NULL;
+
+       return mbuf;
+}
+
+/*
+       Enable low latency things in the transport (when supported).
+*/
+extern void rmr_set_low_latency( void* vctx ) {
+       uta_ctx_t*      ctx;
+
+       if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
+               if( ctx->si_ctx != NULL ) {
+                       SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
+               }
+       }
+}
+
+/*
+       Turn on fast acks.
+*/
+extern void rmr_set_fack( void* vctx ) {
+       uta_ctx_t*      ctx;
+
+       if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
+               if( ctx->si_ctx != NULL ) {
+                       SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );
+               }
+       }
+}
+
diff --git a/src/rmr/si/src/rtable_si_static.c b/src/rmr/si/src/rtable_si_static.c
new file mode 100644 (file)
index 0000000..cf83590
--- /dev/null
@@ -0,0 +1,426 @@
+// vim: ts=4 sw=4 noet :
+/*
+==================================================================================
+       Copyright (c) 2019-2020 Nokia
+       Copyright (c) 2018-2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       rtable_si_static.c
+       Abstract:       Route table management functions which depend on the underlying
+                               transport mechanism and thus cannot be included with the generic
+                               route table functions.
+
+                               This module is designed to be included by any module (main) needing
+                               the static/private stuff.
+
+       Author:         E. Scott Daniels
+       Date:           29 November 2018
+*/
+
+#ifndef rtable_static_c
+#define        rtable_static_c
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <errno.h>
+#include <string.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+
+// -----------------------------------------------------------------------------------------------------
+
+/*
+       Mark an endpoint closed because it's in a failing state.
+*/
+static void uta_ep_failed( endpoint_t* ep ) {
+       if( ep != NULL ) {
+               if( DEBUG ) fprintf( stderr, "[DBUG] connection to %s was closed\n", ep->name );
+               ep->open = 0;
+       }
+}
+
+/*
+       Establish a TCP connection to the indicated target (IP address).
+       Target assumed to be address:port.  The new socket is returned via the
+       user supplied pointer so that a success/fail code is returned directly.
+       Return value is 0 (false) on failure, 1 (true)  on success.
+
+       In order to support multi-threaded user applications we must hold a lock before
+       we attempt to create a dialer and connect. NNG is thread safe, but we can
+       get things into a bad state if we allow a collision here.  The lock grab
+       only happens on the intial session setup.
+*/
+static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+       static int      flags = 0;
+
+       char*           target;
+       char            conn_info[SI_MAX_ADDR_LEN];     // string to give to nano to make the connection
+       char*           addr;
+       int                     state = FALSE;
+       char*           tok;
+
+       if( ep == NULL ) {
+               if( DEBUG ) fprintf( stderr, "[DBUG] link2 ep was nil!\n" );
+               return FALSE;
+       }
+
+       target = ep->name;                              // always give name to transport so changing dest IP does not break reconnect
+       if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
+               if( ep->notify ) {
+                       fprintf( stderr, "[WARN] rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
+                       ep->notify = 0;
+               }
+               return FALSE;
+       }
+
+       pthread_mutex_lock( &ep->gate );                        // grab the lock
+       if( ep->open ) {
+               pthread_mutex_unlock( &ep->gate );
+               return TRUE;
+       }
+
+       snprintf( conn_info, sizeof( conn_info ), "%s", target );
+       errno = 0;
+       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] link2 attempting connection with: %s\n", conn_info );
+       if( (ep->nn_sock = SIconnect( si_ctx, conn_info )) < 0 ) {
+               pthread_mutex_unlock( &ep->gate );
+
+               if( ep->notify ) {                                                      // need to notify if set
+                       fprintf( stderr, "[WRN] rmr: link2: unable to connect  to target: %s: %d %s\n", target, errno, strerror( errno ) );
+                       ep->notify = 0;
+               }
+               //nng_close( *nn_sock );
+               return FALSE;
+       }
+
+       if( DEBUG ) fprintf( stderr, "[INFO] rmr_si_link2: connection was successful to: %s\n", target );
+
+       ep->open = TRUE;                                                // set open/notify before giving up lock
+
+       if( ! ep->notify ) {                                            // if we yammered about a failure, indicate finally good
+               fprintf( stderr, "[INFO] rmr: link2: connection finally establisehd with target: %s\n", target );
+               ep->notify = 1;
+       }
+
+       pthread_mutex_unlock( &ep->gate );
+       return TRUE;
+}
+
+/*
+       This provides a protocol independent mechanism for establishing the connection to an endpoint.
+       Return is true (1) if the link was opened; false on error.
+*/
+static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
+       uta_ctx_t* ctx;
+
+       if( ep == NULL ) {
+               return FALSE;
+       }
+
+       if( ep->open )  {                       // already open, do nothing
+               return TRUE;
+       }
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return FALSE;
+       }
+
+       uta_link2( ctx->si_ctx, ep );
+       return ep->open;
+}
+
+
+/*
+       Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
+       hash we add it and create the endpoint struct.
+
+       The caller must supply the specific route table (we assume it will be pending, but they
+       could live on the edge and update the active one, though that's not at all a good idea).
+*/
+extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  ) {
+       endpoint_t*     ep;
+       rrgroup_t* rrg;                         // pointer at group to update
+
+       if( ! rte || ! rt ) {
+               fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
+               return NULL;
+       }
+
+       if( rte->nrrgroups <= group || group < 0 ) {
+               fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
+               return NULL;
+       }
+
+       //fprintf( stderr, ">>>> add ep grp=%d to rte @ 0x%p  rrg=%p\n", group, rte, rte->rrgroups[group] );
+       if( (rrg = rte->rrgroups[group]) == NULL ) {
+               if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
+                       fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
+                       return NULL;
+               }
+               memset( rrg, 0, sizeof( *rrg ) );
+
+               if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
+                       fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
+                       return NULL;
+               }
+               memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
+
+               rte->rrgroups[group] = rrg;
+               //fprintf( stderr, ">>>> added new rrg grp=%d to rte @ 0x%p  rrg=%p\n", group, rte, rte->rrgroups[group] );
+
+               rrg->ep_idx = 0;                                                // next endpoint to send to
+               rrg->nused = 0;                                                 // number populated
+               rrg->nendpts = MAX_EP_GROUP;                    // number allocated
+
+               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rrg added to rte: mtype=%d group=%d\n", rte->mtype, group );
+       }
+
+       ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
+
+       if( rrg != NULL ) {
+               if( rrg->nused >= rrg->nendpts ) {
+                       // future: reallocate
+                       fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
+                       return NULL;
+               }
+
+               rrg->epts[rrg->nused] = ep;
+               rrg->nused++;
+       }
+
+       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s nused=%d\n", rte->mtype, group, ep_name, rrg->nused );
+       return ep;
+}
+
+
+/*
+       Given a name, find the nano socket needed to send to it. Returns the socket via
+       the user pointer passed in and sets the return value to true (1). If the
+       endpoint cannot be found false (0) is returned.
+*/
+static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) {
+       endpoint_t* ep;
+       int state = FALSE;
+
+       if( rt == NULL ) {
+               return FALSE;
+       }
+
+       ep =  rmr_sym_get( rt->hash, ep_name, 1 );
+       if( uepp != NULL ) {                                                    // caller needs endpoint too, give it back
+               *uepp = ep;
+       }
+       if( ep == NULL ) {
+               if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
+               if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
+                       return FALSE;
+               }
+       }
+
+       if( ! ep->open )  {                                                                             // not open -- connect now
+               if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
+               if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
+                       ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
+               }
+               if( uta_link2( si_ctx, ep ) ) {                                                                                 // find entry in table and create link
+                       state = TRUE;
+                       ep->open = TRUE;
+                       *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
+               }
+               if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
+       } else {
+               *nn_sock = ep->nn_sock;
+               state = TRUE;
+       }
+
+       return state;
+}
+
+/*
+       Make a round robin selection within a round robin group for a route table
+       entry. Returns the nanomsg socket if there is a rte for the message
+       key, and group is defined. Socket is returned via pointer in the parm
+       list (nn_sock).
+
+       The group is the group number to select from.
+
+       The user supplied (via pointer to) integer 'more' will be set if there are
+       additional groups beyond the one selected. This allows the caller to
+       to easily iterate over the group list -- more is set when the group should
+       be incremented and the function invoked again. Groups start at 0.
+
+       The return value is true (>0) if the socket was found and *nn_sock was updated
+       and false (0) if there is no associated socket for the msg type, group combination.
+       We return the index+1 from the round robin table on success so that we can verify
+       during test that different entries are being seleted; we cannot depend on the nng
+       socket being different as we could with nano.
+
+       NOTE:   The round robin selection index increment might collide with other
+               threads if multiple threads are attempting to send to the same round
+               robin group; the consequences are small and avoid locking. The only side
+               effect is either sending two messages in a row to, or skipping, an endpoint.
+               Both of these, in the grand scheme of things, is minor compared to the
+               overhead of grabbing a lock on each call.
+*/
+static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) {
+       endpoint_t*     ep;                             // seected end point
+       int  state = FALSE;                     // processing state
+       int dummy;
+       rrgroup_t* rrg;
+       int     idx;
+
+       //fprintf( stderr, ">>>> epsock_rr selecting: grp=%d mtype=%d ngrps=%d\n", group, rte->mtype, rte->nrrgroups );
+
+       if( ! more ) {                          // eliminate cheks each time we need to use
+               more = &dummy;
+       }
+
+       if( ! nn_sock ) {                       // user didn't supply a pointer
+               if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr invalid nnsock pointer\n" );
+               errno = EINVAL;
+               *more = 0;
+               return FALSE;
+       }
+
+       if( rte == NULL ) {
+               if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr rte was nil; nothing selected\n" );
+               *more = 0;
+               return FALSE;
+       }
+
+       if( group < 0 || group >= rte->nrrgroups ) {
+               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] group out of range: group=%d max=%d\n", group, rte->nrrgroups );
+               *more = 0;
+               return FALSE;
+       }
+
+       if( (rrg = rte->rrgroups[group]) == NULL ) {
+               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rrg not found for group %d (ptr rrgroups[g] == nil)\n", group );
+               *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
+               return FALSE;
+       }
+
+       *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
+
+       switch( rrg->nused ) {
+               case 0:                         // nothing allocated, just punt
+                       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] nothing allocated for the rrg\n" );
+                       return FALSE;
+
+               case 1:                         // exactly one, no rr to deal with
+                       ep = rrg->epts[0];
+                       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] _rr returning socket with one choice in group \n" );
+                       state = TRUE;
+                       break;
+
+               default:                                                                                // need to pick one and adjust rr counts
+                       idx = rrg->ep_idx++ % rrg->nused;                       // see note above
+                       ep = rrg->epts[idx];                                            // select next endpoint
+                       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
+                       state = idx + 1;                                                        // unit test checks to see that we're cycling through, so must not just be TRUE
+                       break;
+       }
+
+       if( uepp != NULL ) {                                                            // caller may need refernce to endpoint too; give it if pointer supplied
+               *uepp = ep;
+       }
+       if( state ) {                                                                           // end point selected, open if not, get socket either way
+               if( ! ep->open ) {                                                              // not connected
+                       if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr selected endpoint not yet open; opening %s\n", ep->name );
+                       if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
+                               ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
+                       }
+
+                       if( uta_link2( si_ctx, ep ) ) {                                                                                 // find entry in table and create link
+                               ep->open = TRUE;
+                               *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
+                       } else {
+                               state = FALSE;
+                       }
+                       if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
+               } else {
+                       *nn_sock = ep->nn_sock;
+               }
+       }
+
+       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] epsock_rr returns state=%d\n", state );
+       return state;
+}
+
+/*
+       Finds the rtable entry which matches the key. Returns a nil pointer if
+       no entry is found. If try_alternate is set, then we will attempt 
+       to find the entry with a key based only on the message type.
+*/
+static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
+       uint64_t key;                   // key is sub id and mtype banged together
+       rtable_ent_t* rte;              // the entry we found
+
+       if( rt == NULL || rt->hash == NULL ) {
+               return NULL;
+       }
+
+       key = build_rt_key( sid, mtype );                                                                                       // first try with a 'full' key
+       if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL)  ||  ! try_alt ) {          // found or not allowed to try the alternate, return what we have
+               return rte;
+       }
+
+       if( sid != UNSET_SUBID ) {                                                              // not found, and allowed to try alternate; and the sub_id was set
+               key = build_rt_key( UNSET_SUBID, mtype );                       // rebuild key
+               rte = rmr_sym_pull( rt->hash, key );                            // see what we get with this
+       }
+
+       return rte;
+}
+
+/*
+       Return a string of count information. E.g.:
+               <ep-name>:<port> <good> <hard-fail> <soft-fail>
+
+       Caller must free the string allocated if a buffer was not provided.
+
+       Pointer returned is to a freshly allocated string, or the user buffer
+       for convenience.
+
+       If the endpoint passed is a nil pointer, then we return a nil -- caller
+       must check!
+*/
+static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
+       char*   rs;                     // result string
+
+       if( ep == NULL ) {
+               return NULL;
+       }
+
+       if( ubuf != NULL ) {
+               rs = ubuf;
+       } else {
+               ubuf_len = 256;
+               rs = malloc( sizeof( char ) * ubuf_len );
+       }
+
+       snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] );
+
+       return rs;
+}
+
+#endif
diff --git a/src/rmr/si/src/rtc_si_static.c b/src/rmr/si/src/rtc_si_static.c
new file mode 100644 (file)
index 0000000..76fea79
--- /dev/null
@@ -0,0 +1,80 @@
+// : vi ts=4 sw=4 noet :
+/*
+==================================================================================
+       Copyright (c) 2019-2020 Nokia
+       Copyright (c) 2018-2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       rtc_si_static.c
+       Abstract:       This is a test module to allow the route table to be read
+                               from a static spot and NOT to attempt to listen for updates
+                               from some outside source.
+
+       Author:         E. Scott Daniels
+       Date:           18 October 2019
+*/
+
+#ifndef _rtc_si_staic_c
+#define _rtc_si_staic_c
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <errno.h>
+#include <string.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+/*
+       Loop forever (assuming we're running in a pthread reading the static table
+       every minute or so.
+*/
+static void* rtc_file( void* vctx ) {
+       uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
+       char*   eptr;
+       int             vfd = -1;                                       // verbose file des if we have one
+       int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
+       char    wbuf[256];
+
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               fprintf( stderr, "[CRI] rmr_rtc: internal mishap: context passed in was nil\n" );
+               return NULL;
+       }
+
+       if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
+               vfd = open( eptr, O_RDONLY );
+       }
+
+       while( 1 ) {
+               if( vfd >= 0 ) {
+                       wbuf[0] = 0;
+                       lseek( vfd, 0, 0 );
+                       read( vfd, wbuf, 10 );
+                       vlevel = atoi( wbuf );
+               }                
+       
+               read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
+
+               sleep( 60 );
+       }
+
+}
+#endif
diff --git a/src/rmr/si/src/si95/README b/src/rmr/si/src/si95/README
new file mode 100644 (file)
index 0000000..282dfee
--- /dev/null
@@ -0,0 +1,19 @@
+
+SI95 is a port of a lightweight TCP|UDP/IP communications library
+originally called Socket Interface and was written in 1995. For
+use by RMR as an integrated transport layer it has been dubbed
+SI95, and unneeded features (UDP, multicast, child process support)
+have been removed.  The orignal code was released into the public
+domain (open source) with a 2-part BSD style license; the orignal 
+authors have allowed the Apache license covering RMR to replace
+the original license text in each file provided that the original
+file headers are left to document the origin of the source. 
+
+The library is constructed as an integrated set of externally visible
+functions within the RMR library (librmr_si), however there is no
+plan to make any SI95 headers directly available in the RMR development
+package; in other words, it will be difficult for a user application
+to make use of SI95 directly.
+
+All external SI95 functions will retain their original SI prefix (e.g.
+SIsendt). 
diff --git a/src/rmr/si/src/si95/siaddress.c b/src/rmr/si/src/si95/siaddress.c
new file mode 100644 (file)
index 0000000..86f54df
--- /dev/null
@@ -0,0 +1,176 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+**************************************************************************
+*
+*  Mnemonic: SIaddress
+*  Abstract: This routine will convert a sockaddr_in structure to a
+*            dotted decimal address, or visa versa.
+*            If type == AC_TOADDR the src string may be: 
+*            xxx.xxx.xxx.xxx.portnumber or host-name.portnumber
+*            xxx.xxx.xxx.xxx.service[.protocol] or hostname;service[;protocol]
+*            if protocol is not supplied then tcp is assumed.
+*            hostname may be something like godzilla.moviemania.com
+*  Parms:    src - Pointer to source buffer
+*            dest- Pointer to dest buffer pointer 
+*            type- Type of conversion AC_TODOT converts sockaddr to human readable. AC_TOADDR 
+*                              converts character buffer to sockaddr.
+*  Returns:  Nothing.
+*  Date:     19 January 1995
+*  Author:   E. Scott Daniels
+*
+*  Modified: 22 Mar 1995 - To add support for ipx addresses.
+*                      18 Oct 2020 - drop old port separator (;)
+*
+*  CAUTION: The netdb.h header file is a bit off when it sets up the 
+*           hostent structure. It claims that h_addr_list is a pointer 
+*           to character pointers, but it is really a pointer to a list 
+*           of pointers to integers!!!
+*       
+***************************************************************************
+*/
+#include "sisetup.h"      //  get necessary defs and other stuff 
+#include <netdb.h>
+#include <stdio.h>
+#include <ctype.h>
+
+/* 
+       target: buffer with address  e.g.  192.168.0.1:4444  :4444 (listen) [::1]4444
+       family: PF_INET[6]  (let it be 0 to select based on addr in buffer 
+       proto: IPPROTO_TCP IPPROTO_UDP
+       type:   SOCK_STREAM SOCK_DGRAM
+
+       returns length of struct pointed to by rap (return addr blockpointer)
+*/
+extern int SIgenaddr( char *target, int proto, int family, int socktype, struct sockaddr **rap ) {
+       struct addrinfo hint;                           //  hints to give getaddrinfo 
+       struct addrinfo *list = NULL;           //  list of what comes back 
+       int     ga_flags = 0;                                   //  flags to pass to getaddrinfo in hints 
+       int     error = 0;
+       int     rlen = 0;                                               //  length of the addr that rap points to on return 
+       char    *pstr;                                          //  port string 
+       char    *dstr;                                          //  a copy of the users target that we can destroy 
+       char*   fptr;                                           // ptr we allocated and need to free (we may adjust dstr)
+
+    fptr = dstr = strdup( (char *) target );   //  copy so we can destroy it with strtok 
+       *rap = NULL;                                            //  ensure null incase something breaks 
+
+       while( isspace( *dstr ) ) {
+               dstr++;
+       }
+
+       if( *dstr == ':' ) {            //  user passed in :port -- so we assume this is for bind 
+               pstr = dstr;
+               *(pstr++) = 0;
+
+               ga_flags = AI_PASSIVE;
+       } else {
+               if( *dstr == '[' ) {                            // strip [ and ] from v6 and point pstring if port there
+                       dstr++;
+                       pstr = strchr( dstr, ']' );
+                       if( *pstr != ']' ) {
+                               return -1;
+                       }
+
+                       *(pstr++) = 0;
+                       if( *pstr == ':' ) {
+                               *(pstr++) = 0;
+                       } else {
+                               pstr = NULL;
+                       }
+               } else {                                                        // assume name or v4; point at port if there
+                       pstr = strchr( dstr, ':' );
+                       if( pstr != NULL ) {
+                               *(pstr++) = 0;
+                       }
+               }
+               ga_flags = AI_ADDRCONFIG;                       // don't return IPVx addresses unless one such address is configured
+       }
+
+       memset( &hint, 0, sizeof( hint  ) );
+       hint.ai_family = family;                        //  AF_INET AF_INET6...  let this be 0 to select best based on addr 
+       hint.ai_socktype = socktype;            //  SOCK_DGRAM SOCK_STREAM 
+       hint.ai_protocol = proto;                       //  IPPORTO_TCP IPPROTO_UDP 
+       hint.ai_flags = ga_flags;
+
+       if( DEBUG ) 
+               fprintf( stderr, "[DBUG] siaddress: calling getaddrinfo flags=%x proto=%d family=%d target=%s host=%s port=%s\n", 
+                               ga_flags, proto, family, target, dstr, pstr );
+
+       if( (error = getaddrinfo( dstr, pstr, &hint, &list )) ) {
+               fprintf( stderr, "error from getaddrinfo: target=%s host=%s port=%s(port): error=(%d) %s\n", target, dstr, pstr, error, gai_strerror( error ) );
+       } else {
+               *rap = (struct sockaddr *) malloc(  list->ai_addrlen );         //  alloc a buffer and give address to caller 
+               memcpy( *rap, list->ai_addr, list->ai_addrlen  );
+
+               rlen = list->ai_addrlen;
+               
+               freeaddrinfo( list );           //  ditch system allocated memory 
+       }
+
+       free( dstr );
+       return rlen;
+}
+
+
+/* 
+       Given a source address convert from one form to another based on type constant.
+       Type const == AC_TODOT   Convert source address structure to human readable string.
+       Type const == AC_TOADDR6 Convert source string (host:port or ipv6 address [n:n...:n]:port) to an address struct
+       Type const == AC_TOADDR  Convert source string (host:port or ipv4 dotted decimal address) to an address struct
+*/
+extern int SIaddress( void *src, void **dest, int type ) {
+       struct sockaddr_in *addr;       //  pointer to the address 
+       unsigned char *num;             //  pointer at the address number 
+       char wbuf[256];                 //  work buffer 
+       int i;         
+       int     rlen = 0;                                       //  return len - len of address struct or string
+
+       switch( type ) {
+               case AC_TODOT:                                  //  convert from a struct to human readable "dotted decimal"
+                       addr = (struct sockaddr_in *) src;
+                       num = (char *) &addr->sin_addr.s_addr;    //  point at the long 
+
+                       if( addr->sin_family == AF_INET6 ) {
+                               sprintf( wbuf, "[%u:%u:%u:%u:%u:%u]:%d", 
+                                               *(num+0), *(num+1), *(num+2), 
+                                               *(num+3), *(num+4), *(num+5) , 
+                                               (int) ntohs( addr->sin_port ) );
+                       } else {
+                               sprintf( wbuf, "%u.%u.%u.%u;%d", *(num+0), *(num+1), *(num+2), *(num+3), (int) ntohs(addr->sin_port) );
+                       }
+
+                       *dest = (void *) strdup( wbuf );
+                       rlen = strlen( *dest );
+                       break;
+
+               case AC_TOADDR6:                        //  from hostname;port string to address for send etc 
+                       return SIgenaddr( src, PF_INET6, IPPROTO_TCP, SOCK_STREAM, (struct sockaddr **) dest );
+                       break; 
+
+               case AC_TOADDR:                         //  from dotted decimal to address struct ip4 
+                       return SIgenaddr( src, PF_INET, IPPROTO_TCP, SOCK_STREAM, (struct sockaddr **) dest );
+                       break;
+       }
+
+       return rlen;
+}
+
diff --git a/src/rmr/si/src/si95/sialloc.c b/src/rmr/si/src/si95/sialloc.c
new file mode 100644 (file)
index 0000000..194cd6d
--- /dev/null
@@ -0,0 +1,68 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2018-2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       SIalloc
+       Abstract:       Alloc and free message buffers.
+
+       Date:           2 January 2020
+       Author:         E. Scott Daniels
+*/
+
+#include "sisetup.h"        //  get the standard include stuff 
+
+/*
+       Alloc a buffer with enough room for size bytes in the payload.
+       Returns a pointer to the payload portion of the buffer or NULL
+       if there is an error.  Errno likely set to something meaningful
+       on error
+*/
+SIalloc_msg( int size ) {
+       int need;
+       tp_hdr_t*       tp_buf;
+
+       if( size <= 0 ) {
+               errno = EINVAL;
+               return NULL;
+       }
+
+       
+       need = size + sizeof( tp_hdr_t );
+       tp_buf = (tp_hdr_t *)   alloc( sizeof( char ) * need );
+       if( tp_buf ) {
+               memcpy( tp_buf->marker, "@!@!", 4 );
+               tp_buf->len = -1;
+       }
+       
+       return tp_buf
+}
+
+/*
+       Free the message. We assume the user programme is calling and passing
+       a pointer to the payload portion which needs to be "backed up" to the
+       real buffer start to free.
+*/
+SIfree_msg( void* vmsg ) {
+       char* msg;
+
+       if( (msg = (char *) vmsg) != NULL {
+               free( msg - sizeof( tp_hdr_t ) );
+       }
+}
diff --git a/src/rmr/si/src/si95/sibldpoll.c b/src/rmr/si/src/si95/sibldpoll.c
new file mode 100644 (file)
index 0000000..fda0921
--- /dev/null
@@ -0,0 +1,91 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+*****************************************************************************
+*  Mnemonic: SIbldpoll
+*  Abstract: This routine will fill in the read and write fdsets in the
+*            general info struct based on the current transport provider
+*            list. Those tb blocks that have something queued to send will
+*            be added to the write fdset. The fdcount variable will be set to
+*            the highest sid + 1 and it can be passed to the select system
+*            call when it is made.
+*  Parms:    gptr  - Pointer to the general info structure
+*  Returns:  Nothing
+*  Date:     26 March 1995
+*  Author:   E. Scott Daniels
+*
+***************************************************************************
+*/
+#include "sisetup.h"      //  get definitions etc 
+#include "sitransport.h"
+
+extern void SIbldpoll( struct ginfo_blk* gptr  ) {
+       struct tp_blk *tpptr;                                   //  pointer into tp list 
+       struct tp_blk *nextb;                                   //  pointer into tp list 
+
+
+// FIX ME?  we don't seem to see this flag set
+       //if( gptr->flags & GIF_SESS_CHANGE ) { // session changed, must rebuild the poll lists
+               gptr->fdcount = -1;                                     //  reset largest sid found 
+
+               FD_ZERO( &gptr->readfds );                      //  reset the read and write sets 
+               FD_ZERO( &gptr->writefds );
+               FD_ZERO( &gptr->execpfds );
+       
+               for( tpptr = gptr->tplist; tpptr != NULL; tpptr = nextb ) {
+                       nextb = tpptr->next;
+                       if( tpptr->flags & TPF_DELETE ) {
+                               SIterm( gptr, tpptr );
+                       } else {
+                               if( tpptr->fd >= 0 ) {                       //  if valid file descriptor 
+                                       if( tpptr->fd >= gptr->fdcount ) {      
+                                               gptr->fdcount = tpptr->fd + 1;     //  save largest fd (+1) for select 
+                                       }
+       
+                                       FD_SET( tpptr->fd, &gptr->execpfds );     //  set all fds for execpts 
+       
+                                       if( !(tpptr->flags & TPF_DRAIN) ) {                  //  if not draining 
+                                               FD_SET( tpptr->fd, &gptr->readfds );       //  set test for data flag 
+                                       }
+       
+                                       if( tpptr->squeue != NULL ) {                  //  stuff pending to send ? 
+                                               FD_SET( tpptr->fd, &gptr->writefds );   //  set flag to see if writable 
+                                       }
+                               }
+                       }
+
+/*
+                       memcpy( &gptr->readfds_qs, &gptr->readfds, sizeof( fd_set ) );          // stash for use until change
+                       memcpy( &gptr->writefds_qs, &gptr->writefds, sizeof( fd_set ) );
+                       memcpy( &gptr->execpfds_qs, &gptr->execpfds, sizeof( fd_set ) );
+*/
+
+                       gptr->flags &= ~GIF_SESS_CHANGE;
+               }
+/*
+       } else {                                        // sessions are the same we can just dup the quick sets we saved
+               memcpy( &gptr->readfds, &gptr->readfds_qs, sizeof( fd_set ) );
+               memcpy( &gptr->writefds, &gptr->writefds_qs, sizeof( fd_set ) );
+               memcpy( &gptr->execpfds, &gptr->execpfds_qs, sizeof( fd_set ) );
+       }
+*/
+
+}                                 //  SIbldpoll 
diff --git a/src/rmr/si/src/si95/sicbreg.c b/src/rmr/si/src/si95/sicbreg.c
new file mode 100644 (file)
index 0000000..9adb0c0
--- /dev/null
@@ -0,0 +1,52 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/* X
+****************************************************************************
+*
+*  Mnemonic: SIcbreg
+*  Abstract: This routine will register a callback in the table. Callbacks
+*            are "unregistered" by passing a null function pointer.
+*  Parms:    gptr - pointer to the general information block (SIHANDLE)
+*            type - Type of callback to register (SI_CB_xxxxx)
+*            fptr - Pointer to the function to register
+*            dptr - Pointer that the user wants the callback function to get
+*  Returns:  Nothing.
+*  Date:     23 January 1995
+*  Author:   E. Scott Daniels
+*
+****************************************************************************
+*/
+#include "sisetup.h"     //  get defs and stuff 
+
+extern void SIcbreg( struct ginfo_blk *gptr, int type, int ((*fptr)()), void * dptr ) {
+
+       if( gptr == NULL ) {
+               fprintf( stderr, "[ERR] SIcbreg -- gptr was nil\n" );
+               exit( 1 );
+       }
+
+        if( gptr->magicnum == MAGICNUM ) {                     //  valid block from user ? 
+               if( type >= 0 && type < MAX_CBS ) {             //  if cb type is in range 
+                       gptr->cbtab[type].cbdata = dptr;                //  put in data 
+                       gptr->cbtab[type].cbrtn = fptr;                 //  save function ptr  
+               }
+       }
+}
diff --git a/src/rmr/si/src/si95/sicbstat.c b/src/rmr/si/src/si95/sicbstat.c
new file mode 100644 (file)
index 0000000..9a3ed7f
--- /dev/null
@@ -0,0 +1,54 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/* X
+*****************************************************************************
+*
+*  Mnemonic: SIcbstat
+*  Abstract: This routine is responsible for the generic handling of
+*            the return status from a call back routine.
+*  Parms:    gptr - pointer to the ginfo block
+*            status - The status that was returned by the call back
+*            type   - Type of callback (incase unregister)
+*  Returns:  Nothing.
+*  Date:     23 January 1995
+*  Author:   E. Scott Daniels
+*
+*****************************************************************************
+*/
+#include "sisetup.h"     //  get necessary defs etc 
+
+extern void SIcbstat( struct ginfo_blk *gptr, int status, int type )
+{
+
+ switch( status )
+  {
+   case SI_RET_UNREG:                   //  unregister the callback 
+    gptr->cbtab[type].cbrtn = NULL;     //  no pointer - cannot call 
+    break;
+
+   case SI_RET_QUIT:                 //  callback wants us to stop 
+    gptr->flags |= GIF_SHUTDOWN;    //  so turn on shutdown flag 
+    break;
+
+   default:                 //  ignore the others 
+    break;
+  }   //  end switch 
+}         //  SIcbstat 
diff --git a/src/rmr/si/src/si95/siclose.c b/src/rmr/si/src/si95/siclose.c
new file mode 100644 (file)
index 0000000..cb5cf36
--- /dev/null
@@ -0,0 +1,96 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/* X
+******************************************************************************
+*
+*  Mnemonic: SIclose
+*  Abstract: This routine allows the user application to close a port
+*            associated with a file descriptor. The port is unbound from
+*            the transport providor even if it is marked as a listen
+*            port. If the fd passed in is less than 0 this routine assumes
+*            that the UDP port opened during init is to be closed (user never
+*            receives a fd on this one).
+*  Parms:    gptr - The pointer to the ginfo block (SIHANDLE to the user)
+*            fd   - FD to close.
+*  Returns:  SI_OK if all goes well, SI_ERROR with SIerrno set if there is
+*            a problem.
+*  Date:     3 February 1995
+*  Author:   E. Scott Daniels
+*
+*  Modified: 19 Feb 1995 - To set TP blk to drain if output pending.
+*            10 May 1995 - To change SOCK_RAW to SOCK_DGRAM
+*              22 Feb 2002 - To accept TCP_LISTEN_PORT or UDP_PORT as fd
+******************************************************************************
+*/
+#include "sisetup.h"
+
+extern int SIclose( struct ginfo_blk *gptr, int fd )
+{
+
+ struct tp_blk *tpptr;      //  pointer into tp list 
+ int status = SI_ERROR;     //  status of processing 
+
+ gptr->sierr = SI_ERR_HANDLE;
+ if( gptr->magicnum == MAGICNUM )   //  good cookie at the gptr address? 
+  {
+   gptr->sierr = SI_ERR_SESSID;
+
+   if( fd >= 0 )     //  if caller knew the fd number 
+    {
+     for( tpptr = gptr->tplist; tpptr != NULL && tpptr->fd != fd;
+          tpptr = tpptr->next );   //  find the tppblock to close 
+    }
+   else  //  user did not know the fd - find first Listener or UDP tp blk 
+   {
+       if( fd == TCP_LISTEN_PORT )                     //  close first tcp listen port; else first udp 
+               for( tpptr = gptr->tplist; tpptr != NULL && !(tpptr->flags&& TPF_LISTENFD); tpptr = tpptr->next );   
+       else
+               for( tpptr = gptr->tplist; tpptr != NULL && tpptr->type != SOCK_DGRAM; tpptr = tpptr->next );
+   }
+
+   if( tpptr != NULL )
+    {
+     gptr->sierr = SI_ERR_TP;
+
+     if( tpptr->squeue == NULL )   //  if nothing is queued to send... 
+      {
+       tpptr->flags |= TPF_UNBIND;   //  ensure port is unbound from tp 
+       tpptr->flags |= TPF_DELETE;
+       {
+               int x = 1;
+
+               setsockopt(tpptr->fd, SOL_SOCKET, SO_LINGER, (char *)&x, sizeof( x ) ) ;
+       }
+       close( tpptr->fd );
+       tpptr->fd = -1;
+       tpptr->type = -1;
+                       //  siterm now called in build poll if tp is marked delete 
+       // SIterm( gptr, gptr, tpptr );*/        /* cleanup and remove from the list 
+      }
+     else                              //  stuff queued to send - mark port to drain 
+      tpptr->flags |= TPF_DRAIN;   //  and we will term the port when q empty 
+
+     status = SI_OK;               //  give caller a good status 
+    }                              //  end if we found a tpptr 
+  }                                //  end if the handle was good 
+
+ return( status );                 //  send the status back to the caller 
+}                                  //  SIclose 
diff --git a/src/rmr/si/src/si95/siconnect.c b/src/rmr/si/src/si95/siconnect.c
new file mode 100644 (file)
index 0000000..e7fd7b3
--- /dev/null
@@ -0,0 +1,96 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+***************************************************************************
+*
+*  Mnemonic:   SIconnect
+*  Abstract:   Start a TCP/IP session with another process.
+*  Parms:    
+*              addr - Pointer to a string containing the process' address
+*                              The address is either ipv4 or ipv6 formmat with the
+*                              port number separated with a semicolon (::1;4444,
+*                              localhost;4444 climber;4444 129.168.0.4;4444).
+*  Returns:    The session number if all goes well, SI_ERROR if not.
+*
+*  Date:               March 1995
+*  Author:             E. Scott Daniels
+*
+*  Mod:                08 Mar 2007 - conversion of sorts to support ipv6
+******************************************************************************
+*/
+#include "sisetup.h"
+#include "sitransport.h"
+
+/*
+       Accept a file descriptor and add it to the map.
+*/
+extern void SImap_fd( struct ginfo_blk *gptr, int fd, struct tp_blk* tpptr ) {
+       if( fd < MAX_FDS ) {
+               gptr->tp_map[fd] = tpptr;
+       } else {
+               fprintf( stderr, "[WRN] fd on connected session is out of range: %d\n", fd );
+       }
+}
+
+extern int SIconnect( struct ginfo_blk *gptr, char *abuf ) {
+       int status;
+       struct tp_blk *tpptr;           //  pointer to new block 
+       struct sockaddr *taddr;         // convenience pointer to addr of target
+       int alen = 0;                                   //  len of address struct 
+       int fd = SI_ERROR;              //  file descriptor to return to caller 
+
+       if( PARINOID_CHECKS ) {
+               if( gptr == NULL ) {
+                       return SI_ERROR;
+               }
+
+               gptr->sierr = SI_ERR_HANDLE;
+               if( gptr->magicnum != MAGICNUM ) {              // no cookie -- no connection
+                       return SI_ERROR;
+               }
+       }
+
+       gptr->sierr = SI_ERR_TPORT;
+       tpptr = SIconn_prep( gptr, TCP_DEVICE, abuf, 0 );                       // create tp struct, and socket. get peer address 0 == any family that suits the addr
+       if( tpptr != NULL ) {
+               taddr = tpptr->paddr;
+               gptr->sierr = SI_ERR_TP;
+               errno = 0;
+               if( connect( tpptr->fd, taddr, tpptr->palen ) != 0 ) {
+                       close( tpptr->fd );                             //  clean up fd and tp_block 
+                       SItrash( TP_BLK, tpptr );               //  free the trasnsport block 
+                       fd = SI_ERROR;                                  //  send bad session id num back 
+               } else  {                                       //  connect ok 
+                       gptr->sierr = 0;
+                       tpptr->flags |= TPF_SESSION;                    //  indicate we have a session here 
+                       tpptr->next = gptr->tplist;                     //  add block to the list 
+                       if( tpptr->next != NULL ) {
+                               tpptr->next->prev = tpptr;              //  if there - point back at new 
+                       }
+
+                       gptr->tplist = tpptr;                           //  point at new head 
+                       fd = tpptr->fd;                                 //  save for return value 
+                       SImap_fd( gptr, fd, tpptr );
+               }
+       }
+
+       return fd;                    
+}                                 
diff --git a/src/rmr/si/src/si95/siconst.h b/src/rmr/si/src/si95/siconst.h
new file mode 100644 (file)
index 0000000..9bd32c8
--- /dev/null
@@ -0,0 +1,73 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+       Copyright (c) 2020 Nokia
+       Copyright (c) 2018-2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+                http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+*****************************************************************************
+*
+*  Mnemonic: siconst.h
+*  Abstract: Private constants for SI functions.
+*
+*  Date:        26 March 1995
+*  Author:   E. Scott Daniels
+*
+*
+*****************************************************************************
+*/
+
+#ifndef _siconst_h
+#define _siconst_h
+
+#ifndef DEBUG
+#define DEBUG 0
+#endif 
+
+
+//#define EOS   '\000'          //  end of string marker 
+
+                                                               //  general info block flags 
+#define GIF_SHUTDOWN   0x01   //  shutdown in progress 
+#define GIF_NODELAY            0x02   //  set no delay flag on t_opens 
+#define GIF_SESS_CHANGE        0x04    // session list has changed; new poll list needed
+
+                                                               //  transmission provider block flags 
+#define TPF_LISTENFD   0x01   //  set on tp blk that is fd for tcp listens 
+#define TPF_SESSION            0x02   //  session established on this fd 
+#define TPF_UNBIND             0x04   //  SIterm should unbind the fd 
+#define TPF_DRAIN              0x08   //  session is being drained 
+#define TPF_DELETE             0x10    //  block is ready for deletion -- when safe 
+
+#define MAX_CBS                        8        //  number of supported callbacks in table 
+#define MAX_RBUF               8192   //  max size of receive buffer 
+#define MAX_FDS                        2048    // max number of file descriptors
+
+#define TP_BLK 0                        //  block types for rsnew 
+#define GI_BLK 1                        //  global information block 
+#define IOQ_BLK        2                        //  input output block 
+
+#define MAGICNUM       219             //  magic number for validation of things 
+
+#define AC_TODOT  0                     //  convert address to dotted decimal string 
+#define AC_TOADDR 1                     //  address conversion from dotted dec 
+#define AC_TOADDR6 2           //  ipv6 address conversion from string to addr struct 
+#define AC_TOADDR6_4BIND 3     //  ipv6 address conversion from string to addr struct suitible for bind 
+
+#define NO_EVENT 0                     //  look returns 0 if no event on a fd 
+
+#endif
diff --git a/src/rmr/si/src/si95/siestablish.c b/src/rmr/si/src/si95/siestablish.c
new file mode 100644 (file)
index 0000000..bdb3f72
--- /dev/null
@@ -0,0 +1,212 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+
+/*
+*-----------------------------------------------------------------------------------
+*
+* Mnemonic:            SIestablish
+* Abstract:i   Prep functions that set up a socket for listening or making a
+*                              connection.
+* Date:        26 March 1995
+* Author:      E. Scott Daniels
+*
+* Modified:    19 Apr 1995 - To keep returned address of the port.
+*                              08 Mar 2007 - conversion for ipv6.
+*                              12 Oct 2020 - split into connect prep and listen prep
+*                                                              functions.
+*-----------------------------------------------------------------------------------
+*/
+
+
+#include "sisetup.h"       //  include the necessary setup stuff 
+#include "sitransport.h"
+#include <errno.h>
+#include <netinet/tcp.h>
+
+#ifndef SO_REUSEPORT
+#define SO_REUSEPORT 0
+#endif
+
+/*
+       Prep a socket for "listening."
+       This routine will open a socket and bind an address to it in
+       preparation for listening for connections or inbound UDP
+       datagrams. A file descriptor for the socket is captured and all
+       related information is placed into a transport provider (tp) block.
+
+       Type is the SI constant UDP_DEVICE or TCP_DEVICE
+       abuf points to the address that is to be bound to the socket.
+       Family is one of the AF_* constants (AF_ANY, AF_INET or AF_INET6)
+
+       The address should be one of these forms:
+                       [::1]:port                      // v6 localhost device (loop back)
+                       localhost:port          // v4 or 6 loopback depending on /etc/hosts
+                       0.0.0.0:port            // any interface
+                       addr:port                       // an address assigned to one of the devices
+
+       Returns a transport struct which is the main context for the listener.
+*/
+extern struct tp_blk *SIlisten_prep( struct ginfo_blk *gptr, int type, char* abuf, int family ) {
+       struct tp_blk *tptr;         //  pointer at new tp block 
+       int status = SI_OK;             //  processing status 
+       struct sockaddr *addr;          //  IP address we are requesting 
+       int protocol;                //  protocol for socket call 
+       char buf[256];               //  buffer to build request address in 
+       int optval = 0;
+       int alen = 0;
+
+       tptr = (struct tp_blk *) SInew( TP_BLK );     //  new transport info block 
+
+       if( tptr != NULL )
+       {
+               addr = NULL;
+
+               switch( type )                  //  things specifc to tcp or udp 
+               {
+                       case UDP_DEVICE:
+                               tptr->type = SOCK_DGRAM;
+                               protocol = IPPROTO_UDP;
+                               break;
+
+                       case TCP_DEVICE:
+                       default:
+                               tptr->type = SOCK_STREAM;
+                               protocol = IPPROTO_TCP;
+               }
+
+               alen = SIgenaddr( abuf, protocol, family, tptr->type, &addr );  //  family == 0 for type that suits the address passed in 
+               if( alen <= 0 ) {
+                       return NULL;
+               }
+
+               tptr->family = addr->sa_family;
+
+               if( (tptr->fd = SOCKET( tptr->family, tptr->type, protocol )) >= SI_OK ) {
+                       optval = 1;
+                       if( SO_REUSEPORT ) {
+                               SETSOCKOPT(tptr->fd, SOL_SOCKET, SO_REUSEPORT, (char *)&optval, sizeof( optval) ) ;
+                       }
+
+                       status = BIND( tptr->fd, (struct sockaddr *) addr, alen );
+                       if( status == SI_OK ) {
+                               tptr->addr = addr;              //  save address 
+                       } else {
+                               //fprintf( stderr, ">>>>> siestablish: bind failed: fam=%d type=%d pro=%d %s\n", tptr->family, tptr->type, protocol, strerror( errno ) );
+                               close( tptr->fd );
+                       }
+               } else {
+                       status = ! SI_OK;                       //  force bad return later 
+                       //fprintf( stderr, ">>>>> siestablish: socket not esablished: fam=%d type=%d pro=%d %s\n", tptr->family, tptr->type, protocol, strerror( errno ) );
+               }
+
+               if( status != SI_OK ) {                         //  socket or bind call failed - clean up stuff 
+                       fprintf( stderr, ">>>>> siestablish: bad state -- returning nil pointer\n" );
+                       free( addr );
+                       SItrash( TP_BLK, tptr );        //  free the trasnsport block 
+                       tptr = NULL;                    //  set to return nothing 
+               }
+       }
+
+       return tptr;
+}
+
+/*
+       Prep a socket to use to connect to a listener.
+       Establish a transport block and target address in prep to connect.
+       Type is the SI constant UDP_DEVICE or TCP_DEVICE. The abuf pointer
+       should point to either a name:port or IP:port string. Family should
+       be 0 to select the family best suited to the address provided, or
+       any (v4 or v6) if the address is a name. If a perticular type is
+       desired family should be either AF_INET or AF_INET6.  Using a
+       family of 0 (AF_ANY) is usually the best choice.
+*/
+extern struct tp_blk *SIconn_prep( struct ginfo_blk *gptr, int type, char *abuf, int family ) {
+       struct tp_blk *tptr;         //  pointer at new tp block 
+       struct sockaddr *addr;          //  IP address we are requesting 
+       int protocol;                //  protocol for socket call 
+       char buf[256];               //  buffer to build request address in 
+       int optval = 0;
+       int alen = 0;
+
+       tptr = (struct tp_blk *) SInew( TP_BLK );     //  new transport info block 
+
+       if( tptr != NULL )
+       {
+               addr = NULL;
+
+               switch( type )                  //  things specifc to tcp or udp 
+               {
+                       case UDP_DEVICE:
+                               tptr->type = SOCK_DGRAM;
+                               protocol = IPPROTO_UDP;
+                               break;
+
+                       case TCP_DEVICE:
+                       default:
+                               tptr->type = SOCK_STREAM;
+                               protocol = IPPROTO_TCP;
+               }
+
+               alen = SIgenaddr( abuf, protocol, family, tptr->type, &addr );  //  family == 0 for type that suits the address passed in 
+               if( alen <= 0 )
+               {
+                       //fprintf( stderr, ">>>>> siconn_prep: error generating an address struct for %s(abuf) %d(proto) %d(type): %s\n",
+                       //      abuf, protocol, tptr->type, strerror( errno ) );
+                       return NULL;
+               }
+
+               tptr->family = addr->sa_family;
+               tptr->palen = alen;
+
+               if( (tptr->fd = SOCKET( tptr->family, tptr->type, protocol )) >= SI_OK ) {
+                       optval = 1;
+
+                       if( SO_REUSEPORT ) {
+                               SETSOCKOPT( tptr->fd, SOL_SOCKET, SO_REUSEPORT, (char *)&optval, sizeof( optval) );
+                       }
+
+                       if( gptr->tcp_flags & SI_TF_NODELAY ) {
+                               optval = 1;
+                       } else {
+                               optval = 0;
+                       }
+                       //fprintf( stderr, ">>>>> conn_prep: setting no delay = %d\n", optval );
+                       SETSOCKOPT( tptr->fd, SOL_TCP, TCP_NODELAY, (void *)&optval, sizeof( optval) ) ;
+
+                       if( gptr->tcp_flags & SI_TF_FASTACK ) {
+                               optval = 1;
+                       } else {
+                               optval = 0;
+                       }
+                       //fprintf( stderr, ">>>>> conn_prep: setting quick ack = %d\n", optval );
+                       SETSOCKOPT( tptr->fd, SOL_TCP, TCP_QUICKACK, (void *)&optval, sizeof( optval) ) ;
+
+                       tptr->paddr = addr;                             // tuck the remote peer address away
+               } else {
+                       //fprintf( stderr, ">>>>> conn_prep: bad socket create: %s\n", strerror( errno ) );
+                       free( addr );
+                       SItrash( TP_BLK, tptr );        //  free the trasnsport block 
+                       tptr = NULL;                                    // we'll return nil
+               }
+       }
+
+       return tptr;
+}
diff --git a/src/rmr/si/src/si95/sigetadd.c b/src/rmr/si/src/si95/sigetadd.c
new file mode 100644 (file)
index 0000000..5cf3795
--- /dev/null
@@ -0,0 +1,55 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+******************************************************************************
+*
+*  Mnemonic: SIgetaddr
+*  Abstract: This routine will get the address of the first listening
+*            block on the tp list and return it in ASCII format to the
+*            caller.
+*  Parms:    gptr - Pointer to the global information block
+*            buf  - Pointer to the buffer to hold the ascii string
+*  Returns:  NI_OK if block found, NI_ERROR if no listen block exists
+*  Date:     18 April 1995
+*  Author:   E. Scott Daniels
+*
+******************************************************************************
+*/
+#include "sisetup.h"        //  get the standard include stuff 
+
+extern int SIgetaddr( struct ginfo_blk *gptr, char *buf ) {
+       struct tp_blk *tpptr;       //  Pointer into tp list 
+       int status = SI_ERROR;       //  return status 
+       char    *ibuf;          //  SIaddr now points us at a string, rather than filling ours 
+
+       for( tpptr = gptr->tplist; tpptr != NULL && !(tpptr->flags & TPF_LISTENFD);
+               tpptr = tpptr->next );
+
+       if( tpptr != NULL )
+       {
+               SIaddress( tpptr->addr, (void *) &ibuf, AC_TODOT );   //  convert to dot fmt 
+               strcpy( buf, ibuf );                            //  copy into caller's buffer 
+               free( ibuf );
+               status = SI_OK;                               //  ok status for return 
+       }
+
+       return status;
+}          
diff --git a/src/rmr/si/src/si95/sigetname.c b/src/rmr/si/src/si95/sigetname.c
new file mode 100644 (file)
index 0000000..04494eb
--- /dev/null
@@ -0,0 +1,41 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+ ------------------------------------------------------------------------
+ Mnemonic:     sigetname
+ Abstract:     returns the name of the socket for a given sid
+ Parms:                sid - the socket id as returned by open/listen/connect
+ Date:         21 July 2003 
+ Author:       E. Scott Daniels
+ ------------------------------------------------------------------------
+*/
+#include "sisetup.h"
+
+extern char *sigetname( int sid ) { 
+       struct sockaddr oaddr;     //  pointer to address in TCP binary format 
+       char    *buf;
+       int     len;
+
+       len = sizeof( oaddr );
+       getsockname( sid, &oaddr, &len );
+       SIaddress(  &oaddr, (void **) &buf, AC_TODOT );
+       return buf;
+}
diff --git a/src/rmr/si/src/si95/siinit.c b/src/rmr/si/src/si95/siinit.c
new file mode 100644 (file)
index 0000000..80bfbdc
--- /dev/null
@@ -0,0 +1,127 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+**************************************************************************
+*  Mnemonic: SIinitialise
+*  Abstract: Initialisation and other context management functions.
+*          
+*  Date:     26 March 1995
+*  Author:   E. Scott Daniels
+*
+*  Mod:                17 FEB 2002 - To convert to a globally managed gpointer 
+*                      09 Mar 2007 - To allow for ipv6 (added SIinitialise() to 
+*                              replace SIinit())
+**************************************************************************
+*/
+#include  "sisetup.h"     //  get the setup stuff 
+
+/*
+       Initialise the SI environment. Specifically:
+               allocate the global info block (context)
+
+       Returns a pointer to the block or nil on failure.
+       On failure errno should indicate the problem.
+*/
+extern struct ginfo_blk* SIinitialise( int opts )
+{
+       struct ginfo_blk *gptr = NULL;  //  pointer at gen info blk 
+       int     status = SI_OK;                 //  status of internal processing 
+       struct  tp_blk *tpptr;          //  pointer at tp stuff 
+       struct  sigaction sact;                 //  signal action block 
+       int     i;                                                      //  loop index 
+       int     signals = SI_DEF_SIGS;          //  signals to be set in SIsetsig 
+
+       errno = ENOMEM;
+
+       if( (gptr = SInew( GI_BLK )) != NULL ) {                //  make our context
+               gptr->rbuf = (char *) malloc( MAX_RBUF );   //  get rcv buffer
+               gptr->rbuflen = MAX_RBUF;
+               gptr->tp_map = (struct tp_blk **) malloc( sizeof( struct tp_blk *) * MAX_FDS );
+               if( gptr->tp_map == NULL ) {
+                       fprintf( stderr, "SIinit: unable to initialise tp_map: no memory\n" );
+                       free( gptr );
+                       return NULL;
+               }
+               memset( gptr->tp_map, 0, sizeof( struct tp_blk *) * MAX_FDS );
+
+               gptr->sierr = SI_ERR_TPORT;
+       
+               gptr->cbtab = (struct callback_blk *) malloc(
+                       (sizeof( struct callback_blk ) * MAX_CBS ) );
+               if( gptr->cbtab != NULL ) {
+                       for( i = 0; i < MAX_CBS; i++ ) {     //  initialize callback table 
+                               gptr->cbtab[i].cbdata = NULL;    //  no data and no functions 
+                               gptr->cbtab[i].cbrtn = NULL;
+                       }
+               } else {                 //  if call back table allocation failed - error off 
+                       SIshutdown( gptr );  //  clean up any open fds 
+                       free( gptr );
+                       gptr = NULL;       //  dont allow them to continue 
+               }
+
+               gptr->sierr = SI_OK;
+       }                     //  end if gen infor block allocated successfully 
+
+       
+       memset( &sact, 0, sizeof( sact ) );
+       sact.sa_handler = SIG_IGN;
+       sigaction( SIGPIPE, &sact, NULL );              // ignore pipe signals as for some bloody reason linux sets this off if write to closed socket
+
+       return gptr;            //  all's well that ends well 
+} 
+
+/*
+       This will set all of the tcp oriented flags in mask (SI_TF_* constants).
+*/
+extern void SIset_tflags( struct ginfo_blk* gp, int mask )  {
+       if( gp != NULL ) {
+               gp->tcp_flags |= mask;
+       }
+}
+
+/*
+       This will clear all tcp oriented flags set in mask.
+*/
+extern void SIclr_tflags( struct ginfo_blk* gp, int mask )  {
+       if( gp != NULL ) {
+               gp->tcp_flags &= ~mask;
+       }
+}
+
+/*
+       Dump stats to stderr.
+
+       NOTE:  the receive stats are the number of times that wait popped for
+               a file descriptor and NOT the actual number of RMR messages which
+               were contained.  Thus it is VERY likely that the receive count
+               reported will not match the number of actual messages sent. These
+               counts should be used only to track activity on a socket.
+*/
+extern void SItp_stats( void *vgp ) {
+       struct ginfo_blk* gp;
+       struct tp_blk* tp;
+
+       if( (gp = (struct ginfo_blk *) vgp) != NULL ) {
+               for( tp = gp->tplist; tp != NULL; tp = tp->next ) {
+                       fprintf( stderr, "[DBUG] si95: tp: fd=%d sent=%lld rcvd=%lld qc=%lld\n", tp->fd, tp->sent, tp->rcvd, tp->qcount );
+               }
+       }
+}
diff --git a/src/rmr/si/src/si95/silisten.c b/src/rmr/si/src/si95/silisten.c
new file mode 100644 (file)
index 0000000..c7151d4
--- /dev/null
@@ -0,0 +1,78 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+******************************************************************************
+*
+*  Mnemonic:   SIlistener
+*  Abstract:   Open a port on which connection requests (TCP) or datagrams (UDP)
+*                              can be received. SIlistener will open the ipv4/6 port based on
+*                              the address buffer passed in. The listener() obsoletes SIopen()
+*                              with regard to opening udp ports.
+*                              Allows the user to open multiple secondary listening ports
+*  Parms:      type - TCP_DEVICE or UDP_DEVICE
+*                              abuf - buffer containing either 0.0.0.0;port or ::1;port
+*
+*  Returns:    The file descriptor of the port, <0 if error
+*  Date:       26 March 1995 -- revised 13 Mar 2007 to support both ipv4 and 6
+*  Author:     E. Scott Daniels
+*
+*  Modified:   10 May 1995 - To change SOCK_RAW to SOCK_DGRAM
+*                              14 Mar 2007 - To enhance for ipv6
+******************************************************************************
+*/
+#include "sisetup.h"
+#include "sitransport.h"
+
+extern int SIlistener( struct ginfo_blk *gptr, int type, char *abuf ) {
+       struct tp_blk *tpptr;                   //  pointer into tp list
+       int status = SI_ERROR;                  //  status of processing
+
+       if( PARINOID_CHECKS ) {
+               if( gptr == NULL ) {
+                       return status;
+               }
+               gptr->sierr = SI_ERR_HANDLE;
+               if( gptr->magicnum != MAGICNUM )                        //  good cookie at the gptr address?
+                       return status;
+       }
+
+       gptr->sierr = SI_ERR_TP;
+       tpptr = SIlisten_prep( gptr, type, abuf, 0 );
+
+       if( tpptr != NULL )                          //  established a fd bound to the port ok
+       {                                               //  enable connection reqs
+               if( type == TCP_DEVICE )
+               {
+                       if( (status = LISTEN( tpptr->fd, 1 )) < SI_OK )
+                               return SI_ERROR;
+
+                       tpptr->flags |= TPF_LISTENFD;          //  flag it so we can search it out if needed
+               }
+
+               tpptr->next = gptr->tplist;             //  add to the list
+               if( tpptr->next != NULL )
+                       tpptr->next->prev = tpptr;
+               gptr->tplist = tpptr;
+               status = tpptr->fd;                     //  return the fd of the listener
+       }
+
+       return status;
+}
diff --git a/src/rmr/si/src/si95/sinew.c b/src/rmr/si/src/si95/sinew.c
new file mode 100644 (file)
index 0000000..652a532
--- /dev/null
@@ -0,0 +1,88 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+*******************************************************************************
+*
+*  Mnemonic: SInew
+*  Abstract: This routine is responsible for alocating a new block based on
+*            the block type and initializing it.
+*  Parms:    type - Block id to create
+*  Returns:  Pointer to the new block or NULL if not successful
+*  Date:     26 March 1995
+*  Author:   E. Scott Daniels
+*  Mod:                22 Feb 2002 - To ensure new field in tp block is initialised
+*
+******************************************************************************
+*/
+#include "sisetup.h"
+
+extern void *SInew( int type ) {
+       void *retptr;                  //  generic pointer for return
+       struct tp_blk *tpptr;          //  pointer at a new tp block
+       struct ginfo_blk *gptr;        //  pointer at gen info blk
+       struct ioq_blk *qptr;          //  pointer to an I/O queue block
+
+ switch( type ) {
+               case IOQ_BLK:              //  make an I/O queue block
+                       if( (qptr = (struct ioq_blk *) malloc( sizeof( struct ioq_blk) )) != NULL ) {
+                               qptr->addr = NULL;
+                               qptr->next = NULL;
+                               qptr->data = NULL;
+                               qptr->dlen = 0;
+                       }
+                       retptr = (void *) qptr;    //  set pointer for return
+                       break;
+
+               case TP_BLK:
+                       if( (tpptr = (struct tp_blk *) malloc( sizeof( struct tp_blk ) )) != NULL ) {
+                               memset( tpptr, 0, sizeof( *tpptr ) );
+                               tpptr->fd = -1;
+                               tpptr->type = -1;
+                               tpptr->flags = TPF_UNBIND;   //  default to unbind on termination
+                       }
+                       retptr = (void *) tpptr;   //  setup for later return
+                       break;
+
+               case GI_BLK:                //  create global info block
+                       if( (gptr = (struct ginfo_blk *) malloc( sizeof( struct ginfo_blk ) )) != NULL ) {
+                               memset( gptr, 0, sizeof( *gptr ) );
+
+                               gptr->magicnum = MAGICNUM;   //  inidicates valid block
+                               gptr->flags = 0;
+                               gptr->tplist = NULL;
+                               FD_ZERO( &gptr->readfds);      //  clear the fdsets
+                               FD_ZERO( &gptr->writefds) ;
+                               FD_ZERO( &gptr->execpfds );
+                               gptr->rbuf = NULL;             //  no read buffer
+                               gptr->cbtab = NULL;
+                               gptr->rbuflen = 0;
+                       }
+
+               retptr = (void *) gptr;    //  set up for return at end
+               break;
+
+               default:
+                       retptr = NULL;           //  bad type - just return null
+               break;
+       }                          //  end switch
+
+       return( retptr );           //  send back the new pointer
+}
diff --git a/src/rmr/si/src/si95/sinewses.c b/src/rmr/si/src/si95/sinewses.c
new file mode 100644 (file)
index 0000000..0b00ba4
--- /dev/null
@@ -0,0 +1,122 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+*****************************************************************************
+*
+*  Mnemonic:   SInewsession
+*  Abstract:   This routine can be called when a request for connection is
+*                              received. It will establish a new fd, create a new
+*                              transport provider block (added to the head of the list).
+*                              The security callback and connection callback
+*                              routines are driven from this routine.
+*  Parms:    gptr - Pointer to the general information block
+*            tpptr- Pointer to the tp block that describes the fd that
+*                   received the connection request (the listen tp block).
+*  Returns:  SI_OK if all went well, SI_ERROR if not.
+*  Date:     26 March 1995
+*  Author:   E. Scott Daniels
+*
+******************************************************************************
+*/
+#include "sisetup.h"          //  get necessary defs etc
+#include "sitransport.h"
+#include <netinet/tcp.h>
+
+extern int SInewsession( struct ginfo_blk *gptr, struct tp_blk *tpptr ) {
+       struct sockaddr *addr;             //  pointer to address of caller
+       struct spxopt_s *sopts;            //  pointer to spx options
+       struct tp_blk *newtp;              //  pointer at new tp block
+       int status = SI_OK;                //  processing status
+       int (*cbptr)();                    //  pointer to callback function
+       unsigned int addrlen;                           //  length of address from accept
+       char *buf = NULL;                                       //  pointer to address
+       int optval;
+
+       addr = (struct sockaddr *) malloc( sizeof( struct sockaddr ) );
+       addrlen = sizeof( struct sockaddr );
+
+       status = accept( tpptr->fd, addr, &addrlen );   //  accept and assign new fd (status)
+       if( status < 0 ) {
+               free( addr );
+               return SI_ERROR;
+       }
+
+       newtp = SInew( TP_BLK );                              //  get a new tp block for the session
+       if( newtp == NULL ) {
+               close( status );                                                // must disconnect the other side
+               free( addr );
+               return SI_ERROR;
+       }
+
+       newtp->next = gptr->tplist;                                     //  add new block to the head of the list
+       if( newtp->next != NULL ) {
+               newtp->next->prev = newtp;                              //  back chain to us
+       }
+       gptr->tplist = newtp;
+       newtp->paddr = (struct sockaddr *) addr;        //  partner address
+       newtp->fd = status;                         //  save the fd from accept
+       //fprintf( stderr, ">>>>> newsession: accepted session on fd %d\n", status );
+
+       if( gptr->tcp_flags & SI_TF_NODELAY ) {
+               optval = 1;
+       } else {
+               optval = 0;
+       }
+       //fprintf( stderr, ">>>>> newsession: setting no delay = %d\n", optval );
+       SETSOCKOPT( tpptr->fd, SOL_TCP, TCP_NODELAY, (void *)&optval, sizeof( optval) ) ;
+
+
+       if( gptr->tcp_flags & SI_TF_FASTACK ) {
+               optval = 1;
+       } else {
+               optval = 0;
+       }
+       //fprintf( stderr, ">>>>> conn_prep: setting quick ack = %d\n", optval );
+       SETSOCKOPT( tpptr->fd, SOL_TCP, TCP_QUICKACK, (void *)&optval, sizeof( optval) ) ;
+
+       SIaddress( addr, (void **) &buf, AC_TODOT );                            // get addr of remote side
+       if( (cbptr = gptr->cbtab[SI_CB_SECURITY].cbrtn) != NULL ) {     //   invoke the security callback function if there
+               status = (*cbptr)( gptr->cbtab[SI_CB_SECURITY].cbdata, buf );
+               if( status == SI_RET_ERROR ) {                                                                          //  session to be rejected
+                       SIterm( gptr, newtp );                                                                                  //  terminate new tp block
+                       SItrash( TP_BLK, newtp );
+                       free( addr );
+                       free( buf );
+                       return SI_ERROR;
+               } else {
+                       SIcbstat( gptr, status, SI_CB_SECURITY );               //  allow for unreg or shutdown signal
+               }
+       }
+
+       newtp->flags |= TPF_SESSION;     //  indicate a session here
+
+       //fprintf( stderr, ">>>> pending connection callback for: %s\n", buf );
+       if( (cbptr = gptr->cbtab[SI_CB_CONN].cbrtn) != NULL ) {         // drive connection callback
+               status=(*cbptr)( gptr->cbtab[SI_CB_CONN].cbdata, newtp->fd, buf );
+               SIcbstat(  gptr, status, SI_CB_CONN );               //  handle status
+       }
+
+       gptr->flags |= GIF_SESS_CHANGE;                 // sessions changed must rebuild the poll list
+       SImap_fd( gptr, newtp->fd, newtp );             // add fd to the map
+
+       return SI_OK;
+}
+
diff --git a/src/rmr/si/src/si95/sipoll.c b/src/rmr/si/src/si95/sipoll.c
new file mode 100644 (file)
index 0000000..df71eae
--- /dev/null
@@ -0,0 +1,201 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+**************************************************************************
+*  Mnemonic: SIpoll
+*  Abstract: This routine will poll the sockets that are open for
+*            an event and return after the delay period has expired, or
+*            an event has been processed.
+*  Parms:    gptr   - Pointer to the global information block
+*            msdelay- 100ths of seconds to delay
+*  Returns:  SI_OK if the caller can continue, SI_ERROR if all sessions have been
+*            stopped, or the interface cannot proceed. When SI_ERROR is
+*            returned the caller should cleanup and exit immediatly (we
+*            have probably received a sigter or sigquit.
+*  Date:     10 April 1995
+*  Author:   E. Scott Daniels
+*
+**************************************************************************
+*/
+#include  "sisetup.h"     //  get the setup stuff
+#include "sitransport.h"
+#include <wait.h>
+
+
+extern int SIpoll( struct ginfo_blk *gptr, int msdelay )
+{
+ //extern int deaths;       //  number of children that died and are zombies
+ //extern int sigflags;     //  flags set by the signal handler routine
+
+ int fd;                       //  file descriptor for use in this routine
+ int ((*cbptr)());             //  pointer to callback routine to call
+ int status = SI_OK;              //  return status
+ int addrlen;                  //  length of address from recvfrom call
+ char *buf;                   //  work buffer pointer
+ char ibuf[1025];
+ int i;                        //  loop index
+ struct tp_blk *tpptr;         //  pointer at tp stuff
+ struct tp_blk *nextone;        //  pointer at next block to process
+ int pstat;                    //  poll status
+ int kstat;                    //  keyboard status
+ struct timeval  delay;        //  delay to use on select call
+ struct sockaddr *uaddr;       //  pointer to udp address
+
+ gptr->sierr = SI_ERR_SHUTD;
+
+ if( gptr->flags & GIF_SHUTDOWN )     //  cannot do if we should shutdown
+  return( SI_ERROR );                    //  so just get out
+
+
+ gptr->sierr = SI_ERR_HANDLE;
+
+ if( gptr->magicnum != MAGICNUM )     //  if not a valid ginfo block
+  return( SI_ERROR );
+
+   delay.tv_sec = msdelay/100;                //  user submits 100ths, cvt to seconds and milliseconds
+   delay.tv_usec = (msdelay%100) * 10;
+
+
+   SIbldpoll( gptr );                 //  build the fdlist for poll
+   pstat = 0;                         //  ensure good code
+
+   if( gptr->fdcount > 0 )
+    pstat = select( gptr->fdcount, &gptr->readfds, &gptr->writefds,
+                               &gptr->execpfds, &delay );
+
+   if( (pstat < 0 && errno != EINTR)  )
+    {                             //  poll fail or termination signal rcvd
+     gptr->fdcount = 0;           //  prevent trying to look at a session
+     gptr->flags |= GIF_SHUTDOWN; //  cause cleanup and exit at end
+     //deaths = 0;                  //  dont need to issue waits on dead child
+     //sigflags = 0;                //  who cares about signals now too
+    }
+
+/*
+   while( deaths > 0 )  //  there have been death(s) - keep the dead
+    {                   //  from being zombies - send them to heaven
+     wait( NULL );                       //  issue wait on child
+     deaths--;
+    }                   //  end while dead children to send to heaven
+*/
+
+/*
+  if( sigflags &&        //  if signal received and processing them
+     (cbptr = gptr->cbtab[SI_CB_SIGNAL].cbrtn) != NULL )
+   {
+    while( sigflags != 0 )
+     {
+      i = sigflags;                  //  hold for call
+      sigflags = 0;                  //  incase we are interrupted while away
+      status = (*cbptr)( gptr->cbtab[SI_CB_SIGNAL].cbdata, i );
+      SIcbstat( gptr, status, SI_CB_SIGNAL );    //  handle status
+     }                                           //  end while
+   }
+*/
+
+   if( pstat > 0  &&  (! (gptr->flags & GIF_SHUTDOWN)) )
+    {
+     if( FD_ISSET( 0, &gptr->readfds ) )       //  check for keybd input
+      {
+       fgets( ibuf, 1024, stdin );   //  get the stuff from keyboard
+       if( (cbptr = gptr->cbtab[SI_CB_KDATA].cbrtn) != NULL )
+        {
+         status = (*cbptr)( gptr->cbtab[SI_CB_KDATA].cbdata, ibuf );
+         SIcbstat(  gptr, status, SI_CB_KDATA );    //  handle status
+        }                                 //  end if call back was defined
+      }
+
+     // for( tpptr = gptr->tplist; tpptr != NULL; tpptr = tpptr->next )
+     for( tpptr = gptr->tplist; tpptr != NULL; tpptr = nextone )
+      {
+       nextone = tpptr->next;                                  //  prevent coredump if we delete the session
+
+       if( tpptr->squeue != NULL && (FD_ISSET( tpptr->fd, &gptr->writefds )) )
+        SIsend( gptr, tpptr );              //  send if clear to send
+
+       if( FD_ISSET( tpptr->fd, &gptr->execpfds ) )
+        {
+         ; //  sunos seems to set except for unknown reasons; ignore
+        }
+       else
+       if( FD_ISSET( tpptr->fd, &gptr->readfds ) )  //  read event pending?
+        {
+         fd = tpptr->fd;                     //  quick ref to the fd
+
+         if( tpptr->flags & TPF_LISTENFD )     //  listen port setup by init?
+          {                                    //  yes-assume new session req
+           status = SInewsession( gptr, tpptr );    //  make new session
+          }
+         else                              //  data received on a regular port
+          if( tpptr->type == SOCK_DGRAM )          //  udp socket?
+           {
+            uaddr = (struct sockaddr *) malloc( sizeof( struct sockaddr ) );
+            status = RECVFROM( fd, gptr->rbuf, MAX_RBUF, 0, uaddr, &addrlen );
+            if( status >= 0 && ! (tpptr->flags & TPF_DRAIN) )
+             {                                                          //  if good status call cb routine
+              if( (cbptr = gptr->cbtab[SI_CB_RDATA].cbrtn) != NULL )
+               {
+                SIaddress( uaddr, (void **) &buf, AC_TODOT );
+                status = (*cbptr)( gptr->cbtab[SI_CB_RDATA].cbdata, gptr->rbuf, status, buf );
+                SIcbstat( gptr, status, SI_CB_RDATA );    //  handle status
+               free( buf );
+               }                              //  end if call back was defined
+             }                                //  end if status was ok
+            free( uaddr );
+           }                                  //  end if udp
+          else
+           {                                //  else receive on tcp session
+            status = RECV( fd, gptr->rbuf, MAX_RBUF, 0 );    //  read data
+
+            if( status > SI_OK  &&  ! (tpptr->flags & TPF_DRAIN) )
+             {
+              if( (cbptr = gptr->cbtab[SI_CB_CDATA].cbrtn) != NULL )
+               {
+                status = (*cbptr)( gptr->cbtab[SI_CB_CDATA].cbdata, fd, gptr->rbuf, status );
+                SIcbstat( gptr, status, SI_CB_CDATA );   //  handle cb status
+               }                            //  end if call back was defined
+             }                                     //  end if status was ok
+            else   //  sunos seems to send 0 bytes as indication of disc
+             {
+              if( (cbptr = gptr->cbtab[SI_CB_DISC].cbrtn) != NULL )
+               {
+                status = (*cbptr)( gptr->cbtab[SI_CB_DISC].cbdata, tpptr->fd );
+                SIcbstat( gptr, status, SI_CB_DISC );    //  handle status
+               }
+              SIterm( gptr, tpptr );
+            }
+           }                                                //  end tcp read
+        }                    //  end if event on this fd
+      }                      //  end for each fd in the list
+    }                        //  end if not in shutdown
+
+
+ if( gptr->flags & GIF_SHUTDOWN )      //  we need to stop for some reason
+  {
+   gptr->sierr = SI_ERR_SHUTD;        //  indicate error exit status
+   status = SI_ERROR;                //  status should indicate to user to die
+   SIshutdown( gptr );            //  clean things up
+  }
+ else
+  status = SI_OK;                    //  user can continue to process
+
+ return( status );                //  send status back to caller
+}                                 //  SIwait
diff --git a/src/rmr/si/src/si95/siproto.h b/src/rmr/si/src/si95/siproto.h
new file mode 100644 (file)
index 0000000..bb94d2a
--- /dev/null
@@ -0,0 +1,60 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+-----------------------------------------------------------------------------------
+       Mnemonic: siproto.h
+       Abstract: Prototypes for SI95
+
+       Date:     24 October 2019
+       Author:   E. Scott Daniels
+
+-----------------------------------------------------------------------------------
+*/
+
+
+extern void *SInew( int type );
+extern char *sigetname( int sid );
+extern int SIaddress( void *src, void **dest, int type );
+extern void SIbldpoll( struct ginfo_blk* gptr  );
+extern struct tp_blk *SIconn_prep( struct ginfo_blk *gptr, int type, char *abuf, int family );
+extern void SIcbreg( struct ginfo_blk *gptr, int type, int ((*fptr)()), void * dptr );
+extern void SIcbstat( struct ginfo_blk *gptr, int status, int type );
+extern int SIclose( struct ginfo_blk *gptr, int fd );
+extern int SIconnect( struct ginfo_blk *gptr, char *abuf );
+extern struct tp_blk *SIestablish( int type, char *abuf, int family );
+extern int SIgenaddr( char *target, int proto, int family, int socktype, struct sockaddr **rap );
+extern int SIgetaddr( struct ginfo_blk *gptr, char *buf );
+extern struct tp_blk *SIlisten_prep( struct ginfo_blk *gptr, int type, char* abuf, int family );
+extern int SIlistener( struct ginfo_blk *gptr, int type, char *abuf );
+extern void SImap_fd( struct ginfo_blk *gptr, int fd, struct tp_blk* tpptr );
+extern int SInewsession( struct ginfo_blk *gptr, struct tp_blk *tpptr );
+extern int SIpoll( struct ginfo_blk *gptr, int msdelay );
+extern int SIrcv( struct ginfo_blk *gptr, int sid, char *buf, int buflen, char *abuf, int delay );
+extern void SIsend( struct ginfo_blk *gptr, struct tp_blk *tpptr );
+extern int SIsendt( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen );
+extern void SIset_tflags( struct ginfo_blk* gp, int flags );
+extern int SIshow_version( );
+extern void SIshutdown( struct ginfo_blk *gptr );
+extern void SItp_stats( void *vgp );
+extern void SIterm( struct ginfo_blk* gptr, struct tp_blk *tpptr );
+extern void SItrash( int type, void *bp );
+extern int SIwait( struct ginfo_blk *gptr );
+extern struct ginfo_blk* SIinitialise( int opts );
diff --git a/src/rmr/si/src/si95/sircv.c b/src/rmr/si/src/si95/sircv.c
new file mode 100644 (file)
index 0000000..2202629
--- /dev/null
@@ -0,0 +1,139 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/* X
+*****************************************************************************
+*
+*  Mnemonic: SIrcv
+*  Abstract: This routine allows the user program to receive data on a
+*            session without using the callback structure of the library.
+*            It is the caller's responsibility to provide a buffer large
+*            enough to handle the received data.
+*  Parms:    gptr - The SIHANDLE that the user received on init call
+*            sid  - The session id that the user wants to check
+*            buf  - Pointer to buffer to receive data in
+*            abuf - Pointer to buffer to return address of UDP sender in (!null)
+*            buflen-Length of the receive buffer
+*            delay- Value to pass to poll (time out) -1 == block until data
+*  Returns:  SI_ERROR - (SIerrno will contain reason) if failure, else the
+*            number of bytes read. If the number read is 0 SIerrno will indicate
+*            why: time out exceeded, signal received.
+*  Date:     26 March 1995
+*  Author:   E. Scott Daniels
+*  Mods:     26 Mar 20001 - Changed to support UDP reads
+*
+******************************************************************************
+*/
+#include "sisetup.h"    //  get start up stuff 
+#include "sitransport.h"
+
+extern int SIrcv( struct ginfo_blk *gptr, int sid, char *buf, int buflen, char *abuf, int delay ) {
+ //extern int sigflags;           //  signal flags 
+ int status = SI_ERROR;         //  assume the worst to return to caller 
+ struct tp_blk *tpptr;          //  pointer to transport provider info 
+ int flags = 0;                 //  receive flags 
+ int remainder;                 //  # of bytes remaining after rcv if more 
+ fd_set readfds;                //  special set of read fds for this call 
+ fd_set execpfds;               //  special set of read fds for this call 
+ struct timeval *tptr = NULL;   //  time info for select call 
+ struct timeval time;
+ struct sockaddr *uaddr;       //  pointer to udp address 
+       char    *acbuf;         //  pointer to converted address 
+ int addrlen;
+
+ gptr->sierr = SI_ERR_HANDLE;              //  set errno before we fail 
+ if( gptr->magicnum != MAGICNUM )     //  if not a valid ginfo block 
+  return SI_ERROR;
+
+ gptr->sierr = SI_ERR_SESSID;             //  set errno before we fail 
+ for( tpptr = gptr->tplist; tpptr != NULL && tpptr->fd != sid;
+      tpptr = tpptr->next );      //  find transport block 
+ if( tpptr == NULL )
+  return SI_ERROR;                      //  signal bad block 
+
+ uaddr = (struct sockaddr *) malloc( sizeof( struct sockaddr ) );
+ addrlen = sizeof( *uaddr );
+
+ gptr->sierr = SI_ERR_SHUTD;               //  set errno before we fail 
+ if( ! (gptr->flags & GIF_SHUTDOWN) )
+  {                        //  if not in shutdown and no signal flags  
+   FD_ZERO( &readfds );               //  clear select info 
+   FD_SET( tpptr->fd, &readfds );     //  set to check read status 
+
+   FD_ZERO( &execpfds );               //  clear select info 
+   FD_SET( tpptr->fd, &execpfds );     //  set to check read status 
+
+   if( delay >= 0 )                //  user asked for a fininte time limit 
+    {
+     tptr = &time;                 //  point at the local struct 
+     tptr->tv_sec = 0;             //  setup time for select call 
+     tptr->tv_usec = delay;
+    }
+
+   gptr->sierr = SI_ERR_TP;
+   if( (select( tpptr->fd + 1, &readfds, NULL, &execpfds, tptr ) < 0 ) )
+    gptr->flags |= GIF_SHUTDOWN;     //  we must shut on error or signal 
+   else
+    {                                //  poll was successful - see if data ? 
+     gptr->sierr = SI_ERR_TIMEOUT;
+     if( FD_ISSET( tpptr->fd, &execpfds ) )   //  session error? 
+      {
+       SIterm( gptr, tpptr );                 //  clean up our end of things 
+       gptr->sierr = SI_ERR_SESSID;               //  set errno before we fail 
+      }
+     else
+      {
+       if( (FD_ISSET( tpptr->fd, &readfds )) )
+        {                                       //  process data if no signal 
+               gptr->sierr = SI_ERR_TP;
+               if( tpptr->type == SOCK_DGRAM )        //  raw data received 
+               {
+                       status = RECVFROM( sid, buf, buflen, 0, uaddr, &addrlen );
+                       if( abuf )
+                       {
+                               SIaddress( uaddr, (void **) &acbuf, AC_TODOT ); //  address returns pointer to buf now rather than filling 
+                               strcpy( abuf, acbuf );                  //  must be back compat with old versions 
+                               free( acbuf );
+                       }
+                       if( status < 0 )                        //  session terminated? 
+                               SIterm( gptr, tpptr );                   //  so close our end 
+               }
+               else                                      //  cooked data received 
+               {
+                       status = RECV( sid, buf, buflen, 0 );   //  read data into user buf 
+                       if( status < 0 )                        //  session terminated? 
+                               SIterm( gptr, tpptr );                 //  so close our end 
+               }
+        }                                         //  end event was received 
+       else                                       //  no event was received  
+        status = 0;                               //  status is just ok 
+      }                       //  end else - not in shutdown mode after poll 
+    }                     //  end else pole was successful 
+  }                                 //  end if not already signal shutdown 
+
+ if( gptr->flags & GIF_SHUTDOWN  &&  gptr->tplist != NULL )
+  {             //  shutdown received but sessions not cleaned up 
+   SIshutdown( gptr );
+   status = SI_ERROR;                //  indicate failure on return 
+  }                                  //  end if shut but not clean 
+
+ free( uaddr );
+ return status;          //  send back the status 
+}                           //  SIrcv 
diff --git a/src/rmr/si/src/si95/sisend.c b/src/rmr/si/src/si95/sisend.c
new file mode 100644 (file)
index 0000000..5a8a94e
--- /dev/null
@@ -0,0 +1,87 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+******************************************************************************
+*
+*  Mnemonic: SIsend
+*  Abstract: This routine is called to send a buffer of data to a partner
+*            if the buffer has been queued waiting for the session to
+*            unblock. The top qio block is removed from the tp block's
+*            queue and is sent. It is assumed that for UDP data the
+*            unit data structure was created and contains the buffer and
+*            address and is pointed to by the qio block. The block and
+*            associated buffers are then freed.
+*  Parms:
+*            tpptr- Pointer to the tp block
+*
+*  Returns:    Nothing.
+*  Date:       27 March 1995
+*  Author:     E. Scott Daniels
+*  Mod:                22 Feb 2002 - To support sendqueue tail 
+*
+******************************************************************************
+*/
+#include "sisetup.h"      //  get include files etc 
+#include "sitransport.h"
+
+extern void SIsend( struct ginfo_blk *gptr, struct tp_blk *tpptr ) {
+       struct t_unitdata *udata;      //  pointer at UDP unit data 
+       struct ioq_blk *qptr;          //  pointer at qio block for free 
+       int status;
+//static int announced = 0;    // TESTING
+
+       if( tpptr->squeue == NULL )    //  who knows why we were called 
+               return;                        //  nothing queued - just leave 
+
+/*
+       if( tpptr->type == SOCK_DGRAM ) {                                //  udp send?  
+               sendto( tpptr->fd, tpptr->squeue->data, tpptr->squeue->dlen, 0, tpptr->squeue->addr, sizeof( struct sockaddr ) );
+               if( tpptr->squeue->addr != NULL )
+                       free( tpptr->squeue->data );
+               tpptr->squeue->addr = NULL;
+       } else {
+*/
+               status= SEND( tpptr->fd, tpptr->squeue->data, tpptr->squeue->dlen, 0 );
+/*
+       }
+*/
+
+
+/*
+//TESTING
+if( !announced && status < tpptr->squeue->dlen ) {
+announced = 1;
+fprintf( stderr, ">>>>>>> !!!!!! SIsend: short send: %d != %d\n", status, tpptr->squeue->dlen );
+}
+*/
+       free( tpptr->squeue->data );           //  trash buffer or the udp block 
+       qptr = tpptr->squeue;                  //  hold pointer for free 
+       tpptr->squeue = tpptr->squeue->next;   //  next in queue becommes head 
+       if( !tpptr->squeue )
+               tpptr->sqtail = NULL;           //  no tail left either 
+
+       free( qptr );
+
+       if( (tpptr->flags & TPF_DRAIN) && tpptr->squeue == NULL )  //  done w/ drain? 
+       {
+               SIterm( gptr, tpptr );     //  trash the tp block 
+       }
+}                      //  SIsend 
diff --git a/src/rmr/si/src/si95/sisendt.c b/src/rmr/si/src/si95/sisendt.c
new file mode 100644 (file)
index 0000000..20cf7b3
--- /dev/null
@@ -0,0 +1,195 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+****************************************************************************
+*
+*  Mnemonic: SIsendt
+*  Abstract: This module contains various send functions:
+*                              SIsendt -- send tcp with queuing if would block
+*                              SIsendt_nq - send tcp without queuing if blocking
+
+*  Date:     27 March 1995
+*  Author:   E. Scott Daniels
+*  Mod:                22 Feb 2002 - To better process queued data 
+*
+*****************************************************************************
+*/
+
+#include "sisetup.h"     //  get setup stuff 
+#include "sitransport.h"
+
+/*
+       Send a message on what is assumed to be a tcp connection. If the session
+       would block, then SI_ERR_BLOCKED is returned. Else, SI_OK or SI_ERROR
+       is returned to indicate state. Errno should be set to reflect error state:
+               EBADFD - error from system; fd was closed
+               EBUSY   - system would block the send call
+               EINVAL  - fd was not valid or did not reference an open session
+*/
+//extern int SIsendt_nq( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) {
+extern int SIsendt( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) {
+       int status = SI_ERROR;      //  assume we fail
+       fd_set writefds;            //  local write fdset to check blockage 
+       fd_set execpfds;            //  exception fdset to check errors 
+       struct tp_blk *tpptr;       //  pointer at the tp_blk for the session 
+       struct ioq_blk *qptr;       //  pointer at i/o queue block 
+       struct timeval time;        //  delay time parameter for select call 
+       int     sidx = 0;                               // send index
+
+       errno = EINVAL;
+       gptr->sierr = SI_ERR_SESSID;
+
+       if( fd < MAX_FDS ) {                                    // straight from map if possible
+               tpptr = gptr->tp_map[fd];
+       } else {
+               for( tpptr = gptr->tplist; tpptr != NULL && tpptr->fd != fd; tpptr = tpptr->next ); //  find the block if out of map's range
+       }
+
+       if( tpptr != NULL ) {
+               tpptr->sent++;                          // investigate: this may over count
+
+               FD_ZERO( &writefds );       //  clear for select call 
+               FD_SET( fd, &writefds );    //  set to see if this one was writable 
+               FD_ZERO( &execpfds );       //  clear and set execptions fdset 
+               FD_SET( fd, &execpfds );
+
+               time.tv_sec = 0;                        //  set both to 0 if we just want a poll, else we block at max this amount
+               time.tv_usec = 1;                       // small pause on check to help drain things
+
+               if( select( fd + 1, NULL, &writefds, &execpfds, &time ) > 0 ) {         //  would block if <= 0
+                       gptr->sierr = SI_ERR_TP;
+                       if( FD_ISSET( fd, &execpfds ) ) {       //  error? 
+                               errno = EBADFD;
+                               SIterm( gptr, tpptr );                          //  clean up our portion of the session 
+                               return SI_ERROR;                                        // and bail from this sinking ship
+                       } else {
+                               errno = 0;
+                               while( ulen > 0 ) {                             // once we start, we must ensure that it all goes out
+                                       status =  SEND( tpptr->fd, ubuf+sidx, (unsigned int) ulen, 0 );
+                                       if( status >= 0 ) {
+                                               sidx += status;
+                                               ulen -= status;
+                                               status = SI_OK;
+                                       } else {
+                                               if( errno != EINTR || errno != EAGAIN ) {
+                                                       status = SI_ERROR;
+                                                       break;
+                                               }
+                                       }
+                               }
+                       }
+               } else {
+                       errno = EBUSY;
+                       status = SI_ERR_BLOCKED;
+               }
+       }
+
+       return status;
+}
+
+/*
+       This routine will send a datagram to the TCP session partner
+       that is connected via the FD number that is passed in.
+       If the send would cause the process to block, the send is
+       queued on the tp_blk for the session and is sent later as
+       a function of the SIwait process.  If the buffer must be
+       queued, a copy of the buffer is created such that the
+       user program may free, or reuse, the buffer upon return.
+
+       Parms:i         gptr - The pointer to the global info structure (context)
+                   fd   - File descriptor (session number)
+                   ubuf - User buffer to send.
+                   ulen - Lenght of the user buffer.
+
+       Returns:  SI_OK if sent, SI_QUEUED if queued for later, SI_ERROR if error.
+*/
+#ifdef KEEP
+extern int new_SIsendt( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) {
+       int status = SI_OK;         //  status of processing 
+       fd_set writefds;            //  local write fdset to check blockage 
+       fd_set execpfds;            //  exception fdset to check errors 
+       struct tp_blk *tpptr;       //  pointer at the tp_blk for the session 
+       struct ioq_blk *qptr;       //  pointer at i/o queue block 
+       struct timeval time;        //  delay time parameter for select call 
+
+       gptr->sierr = SI_ERR_HANDLE;
+
+       //if( gptr->magicnum == MAGICNUM ) {     //  ensure cookie is good  -- we need to be too performant for this
+       //{                                   //  mmmm oatmeal, my favorite 
+               gptr->sierr = SI_ERR_SESSID;
+
+               if( fd < MAX_FDS ) {                                    // straight from map if possible
+                       tpptr = gptr->tp_map[fd];
+               } else {
+                       for( tpptr = gptr->tplist; tpptr != NULL && tpptr->fd != fd; tpptr = tpptr->next ); //  find the block if out of map's range
+               }
+
+               if( tpptr != NULL ) {
+                       tpptr->sent++;
+
+                       FD_ZERO( &writefds );       //  clear for select call 
+                       FD_SET( fd, &writefds );    //  set to see if this one was writable 
+                       FD_ZERO( &execpfds );       //  clear and set execptions fdset 
+                       FD_SET( fd, &execpfds );
+
+                       time.tv_sec = 0;                        //  set both to 0 if we just want a poll, else we block at max this amount
+                       time.tv_usec = 1;                       // small pause on check to help drain things
+
+                       if( select( fd + 1, NULL, &writefds, &execpfds, &time ) > 0 ) {         //  see if it would block
+                               gptr->sierr = SI_ERR_TP;
+                               if( FD_ISSET( fd, &execpfds ) ) {   //  error? 
+                                       SIterm( gptr, tpptr );                          //  clean up our portion of the session 
+                                       return SI_ERROR;                                        // and bail from this sinking ship
+                               } else {
+                                       if( tpptr->squeue ) {
+                                               SIsend( gptr, tpptr );                  //  something queued; send off queue and queue this
+                                       } else {
+                                               return SEND( tpptr->fd, ubuf, (unsigned int) ulen, 0 );   //  done after send 
+                                       }
+                               }
+                       }
+
+                       gptr->sierr = SI_ERR_NOMEM;
+
+                       tpptr->qcount++;
+                       if( (qptr = SInew( IOQ_BLK )) != NULL ) {               //  alloc a queue block 
+                               if( tpptr->sqtail == NULL ) {                           //  if nothing on the queue 
+                                       tpptr->squeue = qptr;         //  simple add to the tp blk q 
+                                       tpptr->sqtail = qptr;
+                               } else  {                                       //  else - add at end of the q 
+                                       tpptr->sqtail->next = qptr;             
+                                       tpptr->sqtail = qptr;   
+                                       qptr->next = NULL;              //  new block is the last one now 
+                               }                                      //  end add block at end of queue 
+
+                               qptr->dlen = ulen;           //  copy info to queue block 
+                               qptr->data = (char *) malloc( ulen );  //  get buffer 
+                               memcpy( qptr->data, (const char*) ubuf, ulen );
+       
+                               gptr->sierr = SI_QUEUED;                //  indicate queued to caller 
+                               status = SI_QUEUED;                                             // for return
+                       }
+               }                                                       //  end if tpptr was not found 
+       //}                                                             //  ginfo pointer was corrupted 
+
+       return status;
+}
+#endif
diff --git a/src/rmr/si/src/si95/sisetup.h b/src/rmr/si/src/si95/sisetup.h
new file mode 100644 (file)
index 0000000..5d72b48
--- /dev/null
@@ -0,0 +1,51 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+*****************************************************************************
+*  Mnemonic: nisetup.h
+*  Abstract: This file contains the necessary include statements for each
+*            individually compiled module.
+*  Date:     17 January 1995
+*  Author:   E. Scott Daniels
+*****************************************************************************
+*/
+#include <stdio.h>              //  standard io 
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>              //  error number constants 
+#include <fcntl.h>              //  file control 
+#include <netinet/in.h>         //  inter networking stuff 
+#include <signal.h>             //  info for signal stuff 
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>          //  various system files - types 
+#include <sys/socket.h>         //  socket defs 
+
+//  pure bsd supports SIGCHLD not SIGCLD as in bastard flavours 
+#ifndef SIGCHLD
+#define SIGCHLD SIGCLD
+#endif
+
+#include "socket_if.h"                 // public definitions; dummy types/structs
+#include "siconst.h"                   //  internal constants  and prototypes 
+#include "sistruct.h"                  //  real structure defs
+#include "siproto.h"
diff --git a/src/rmr/si/src/si95/sishutdown.c b/src/rmr/si/src/si95/sishutdown.c
new file mode 100644 (file)
index 0000000..95b6f25
--- /dev/null
@@ -0,0 +1,49 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+******************************************************************************
+*
+*  Mnemonic: SIshutdown
+*  Abstract: This routine will ensure that all tp blocks have been closed
+*            with the transport provider and removed from the list. The
+*            shutdown flag is set in addition.
+*  Parms:    gptr - pointer to the ginfo structure (SIHANDLE)
+*  Retrns:   Nothing.
+*  Date:     23 March 1995
+*  Author:   E. Scott Daniels
+*
+*****************************************************************************
+*/
+#include "sisetup.h"                   //  get includes and defines 
+
+extern void SIshutdown( struct ginfo_blk *gptr ) {
+       gptr->sierr = SI_ERR_HANDLE;
+       if( gptr != NULL && gptr->magicnum == MAGICNUM )
+       {
+               gptr->flags |=  GIF_SHUTDOWN;    //  signal shutdown 
+               while( gptr->tplist != NULL )
+               {
+                       gptr->tplist->flags |= TPF_UNBIND;    //  force unbind on session 
+                       SIterm( gptr, gptr->tplist );         //  and drop the session 
+               }                                      //  end while 
+               gptr->sierr = 0;
+       }
+}            
diff --git a/src/rmr/si/src/si95/sistruct.h b/src/rmr/si/src/si95/sistruct.h
new file mode 100644 (file)
index 0000000..c394afb
--- /dev/null
@@ -0,0 +1,85 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+***************************************************************************
+*
+* Mnemonic: sistruct.h
+* Abstract: This file contains the structure definitions ncessary to support
+*           the SI (Socket interface) routines.
+* Date:     26 March 1995
+* Author:   E. Scott Daniels
+*
+******************************************************************************
+*/
+
+struct ioq_blk              //  block to queue on session when i/o waiting 
+{
+       struct ioq_blk *next;     //  next block in the queue 
+       char *data;               //  pointer to the data buffer 
+       unsigned int dlen;        //  data length 
+       void *addr;               //  address to send to (udp only) 
+       int alen;               //  size of address struct (udp) 
+ };
+
+struct callback_blk         //  defines a callback routine 
+{
+       void *cbdata;            //  pointer to be passed to the call back routine 
+       int ((*cbrtn)( ));       //  pointer to the callback routine 
+};
+
+struct tp_blk               //  transmission provider info block 
+{
+       struct tp_blk *next;            //  chain pointer 
+       struct tp_blk *prev;            //  back pointer 
+       int fd;                         //  open file descriptor 
+       int flags;                      //  flags about the session / fd 
+       int type;                       //  connection type SOCK_DGRAM/SOCK_STREAM 
+       int family;                                     //  address family  AF_* constants from system headers
+       struct sockaddr *addr;          //  connection local address 
+       struct sockaddr *paddr;         //  session partner's address 
+       int             palen;                          //      length of the struct referenced by paddr (connect needs)
+       struct ioq_blk *squeue;         //  queue to send to partner when it wont block 
+       struct ioq_blk *sqtail;         //  last in queue to eliminate the need to search 
+
+                                                               // a few counters for stats
+       long long qcount;                       // number of messages that waited on the queue
+       long long sent;                         // send/receive counts
+       long long rcvd;
+};
+
+struct ginfo_blk {                             //  general info block  (context)
+       unsigned int magicnum;          //  magic number that ids a valid block 
+       struct tp_blk *tplist;          //  pointer at tp block list 
+       fd_set readfds;                         //  select/poll file des lists
+       fd_set writefds;
+       fd_set execpfds;
+       fd_set readfds_qs;                      //  quick set read/write/except fd sets
+       fd_set writefds_qs;
+       fd_set execpfds_qs;
+       char *rbuf;                                     //  read buffer 
+       struct callback_blk *cbtab; //  pointer at the callback table 
+       int fdcount;                            //  largest fd to select on in siwait 
+       int flags;                                      //  status flags 
+       int     tcp_flags;                              // connection/session flags (e.g. no delay)
+       int rbuflen;                            //  read buffer length 
+       int     sierr;                                  // our internal error number (SI_ERR_* constants)
+       struct tp_blk** tp_map;         // direct fd -> tp block map
+};
diff --git a/src/rmr/si/src/si95/siterm.c b/src/rmr/si/src/si95/siterm.c
new file mode 100644 (file)
index 0000000..0b74ba0
--- /dev/null
@@ -0,0 +1,64 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+**************************************************************************
+*  Mnemonic: SIterm
+*  Abstract: This routine will terminate a session based on the tp_blk
+*            that is passed into the routine. The transport session block
+*            is released and removed from the ginfo list. The session is
+*            terminated by issuing a t_unbind call (if the unbind flag is
+*            on in the tpptr block), and then issuing a t_close.
+*  Parms:    gptr - Pointer to the global information block
+*            tpptr - Pointer to tp block that defines the open fd.
+*  Returns:  Nothing.
+*  Date:     18 January 1995
+*  Author:   E. Scott Daniels
+*
+**************************************************************************
+*/
+#include "sisetup.h"     //  get the setup stuff 
+#include "sitransport.h"
+
+extern void SIterm( struct ginfo_blk* gptr, struct tp_blk *tpptr ) {
+
+       if( tpptr != NULL ) {
+               if( tpptr->fd >= 0 ) {
+                       CLOSE( tpptr->fd );    
+                       if( tpptr->fd < MAX_FDS ) {
+                               gptr->tp_map[tpptr->fd] = NULL;         // drop reference
+                       }
+               }
+
+               if( tpptr->prev != NULL ) {            //  remove from the list 
+                       tpptr->prev->next = tpptr->next;    //  point previous at the next 
+               } else {
+                       gptr->tplist = tpptr->next;        //  this was head, make next new head 
+               }
+
+               if( tpptr->next != NULL ) {
+                       tpptr->next->prev = tpptr->prev;  //  point next one back behind this one 
+               }
+
+               free( tpptr->addr );             //  release the address bufers 
+               free( tpptr->paddr );
+               free( tpptr );                   //  and release the block 
+       }
+}
diff --git a/src/rmr/si/src/si95/sitransport.h b/src/rmr/si/src/si95/sitransport.h
new file mode 100644 (file)
index 0000000..b51b0d6
--- /dev/null
@@ -0,0 +1,88 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+*****************************************************************************
+
+       Mnemonic:       sitransport.h
+       Abstract:       This file contains definitions needed to set up specific macros
+                               that allow for an underlying transport mechanism such as f-stack
+                               to be substituted for the normal system calls.  The underlying
+                               transport must support the same socket, bind, listen, etc., 
+                               calls and call parameters as the system calls.
+
+       Date:           5 November 2019 
+       Author:         E. Scott Daniels
+
+
+****************************************************************************
+*/
+
+#ifndef _sitransport_h
+
+#ifdef F_STACK
+
+#include "ff_config.h"
+#include "ff_api.h"
+
+
+// TCP/UDP stack provied by f-stack
+#define BIND           ff_bind
+#define LISTEN         ff_listen
+#define SOCKET         ff_socket
+#define CONNECT                ff_connect
+#define ACCEPT         ff_accept
+#define CLOSE          ff_close
+#define SHUTDOWN       ff_shutdown
+#define        GETSOCKOPT      ff_getscokopt 
+#define SETSOCKOPT     ff_setsockopt
+#define READ           ff_read
+#define WRITE          ff_write
+#define SEND           ff_send
+#define SENDTO         ff_sendto
+#define RECV           ff_recv
+#define RECVMSG                ff_recvmsg
+#define RECVFROM       ff_recvfrom
+
+#else
+
+// support normal system TCP/UDP stack
+#define BIND           bind
+#define LISTEN         listen
+#define SOCKET         socket
+#define CONNECT                connect
+#define ACCEPT         accept
+#define CLOSE          close
+#define SHUTDOWN       shutdown
+#define        GETSOCKOPT      getscokopt 
+#define SETSOCKOPT     setsockopt
+#define READ           read
+#define WRITE          write
+#define SEND           send
+#define SENDTO         sendto
+#define RECV           recv
+#define RECVFROM       recvfrom
+#define RECVMSG                recvmsg
+
+#endif
+
+
+
+#endif
diff --git a/src/rmr/si/src/si95/sitrash.c b/src/rmr/si/src/si95/sitrash.c
new file mode 100644 (file)
index 0000000..efc2cd3
--- /dev/null
@@ -0,0 +1,72 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+***************************************************************************
+*  
+*  Mnemonic: sitrash
+*  Abstract: Delete all things referenced by a struct and then free the memory.
+*    
+*  Returns:  Nothing
+*  Date:     08 March 2007
+*  Author:   E. Scott Daniels                            
+*
+******************************************************************************   
+*/
+
+#include        "sisetup.h"
+#include               "sitransport.h"
+                 
+extern void SItrash( int type, void *bp )
+{
+        struct tp_blk *tp = NULL;
+        struct ioq_blk *iptr;
+        struct ioq_blk *inext;
+
+        switch( type )
+        {
+                case IOQ_BLK:
+                        iptr = (struct ioq_blk *) bp;
+                        free( iptr->data );
+                        free( iptr->addr );
+                        free( iptr );
+                        break;
+            
+                case TP_BLK:                                            //  we assume its off the list 
+                        tp = (struct tp_blk *) bp;
+                        for( iptr = tp->squeue; iptr; iptr = inext )            //  trash any pending send buffers 
+                        {
+                                inext = iptr->next;
+                                free( iptr->data );          //  we could recurse, but that seems silly 
+                                free( iptr->addr );
+                                free( iptr );
+                        }
+
+                                               if( tp->fd >= 0 ) {
+                                                       CLOSE( tp->fd );
+                                               }
+     
+                        free( tp->addr );             //  release the address bufers 
+                        free( tp->paddr );        
+                        free( tp );                   //  and release the block 
+                        break;
+        }
+}
+
diff --git a/src/rmr/si/src/si95/siwait.c b/src/rmr/si/src/si95/siwait.c
new file mode 100644 (file)
index 0000000..b5df75f
--- /dev/null
@@ -0,0 +1,149 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+**************************************************************************
+*  Mnemonic: SIwait
+*  Abstract: This  routine will wait for an event to occur on the
+*            connections in tplist. When an event is received on a fd
+*            the status of the fd is checked and the event handled, driving
+*            a callback routine if necessary. The system call poll is usd
+*            to wait, and will be interrupted if a signal is caught,
+*            therefore the routine will handle any work that is required
+*            when a signal is received. The routine continues to loop
+*            until the shutdown flag is set, or until there are no open
+*            file descriptors on which to wait.
+*  Parms:    gptr - Pointer to the global information block
+*  Returns:  SI_OK if the caller can continue, SI_ERROR if all sessions have been
+*            stopped, or the interface cannot proceed. When SI_ERROR is
+*            returned the caller should cleanup and exit immediatly (we
+*            have probably received a sigter or sigquit.
+*  Date:     28 March 1995
+*  Author:   E. Scott Daniels
+*
+*  Modified: 11 Apr 1995 - To pass execption to select when no keyboard
+*            19 Apr 1995 - To call do key to do better keyboard editing
+*            18 Aug 1995 - To init kstat to 0 to prevent key hold if
+*                          network data pending prior to entry.
+*                      31 Jul 2016 - Major formatting clean up in the main while loop.
+**************************************************************************
+*/
+#include  "sisetup.h"     //  get the setup stuff 
+#include "sitransport.h"
+#include       <sys/wait.h>
+
+
+extern int SIwait( struct ginfo_blk *gptr ) {
+       int fd;                                                 //  file descriptor for use in this routine 
+       int ((*cbptr)());                               //  pointer to callback routine to call 
+       int status = SI_OK;                             //  return status 
+       int addrlen;                                    //  length of address from recvfrom call 
+       int i;                                                  //  loop index 
+       struct tp_blk *tpptr;                   //  pointer at tp stuff 
+       struct tp_blk *nextone;                 //  point at next block to process in loop 
+       int pstat;                                              //  poll status 
+       struct timeval  timeout;                //  delay to use on select call
+       char *buf;
+       char *ibuf;
+
+       ibuf = (char *) malloc( 2048 );
+
+       gptr->sierr = SI_ERR_SHUTD;
+
+       if( gptr->flags & GIF_SHUTDOWN ) {                              //  cannot do if we should shutdown 
+               fprintf( stderr, ">>> wait: shutdown on entry????\n" );
+               return SI_ERROR;                                                        //  so just get out 
+       }
+
+       gptr->sierr = SI_ERR_HANDLE;
+
+       if( gptr->magicnum != MAGICNUM ) {                              //  if not a valid ginfo block 
+               fprintf( stderr, ">>> wait: bad magic on entry????\n" );
+               return SI_ERROR;
+       }
+
+       timeout.tv_sec = 0;
+       timeout.tv_usec = 500000;                               // pop every 500ms to ensure we pick up new outbound connections in list
+
+       do {                                                                    // main wait/process loop 
+
+               SIbldpoll( gptr );                                      // build the fdlist for poll 
+               pstat = select( gptr->fdcount, &gptr->readfds, &gptr->writefds, &gptr->execpfds, &timeout );
+
+               if( (pstat < 0 && errno != EINTR)  ) {
+                       gptr->fdcount = 0;                              //  prevent trying to look at a session 
+                       gptr->flags |= GIF_SHUTDOWN;    //  cause cleanup and exit at end 
+               }
+
+               if( pstat > 0  &&  (! (gptr->flags & GIF_SHUTDOWN)) ) {
+                       for( tpptr = gptr->tplist; tpptr != NULL; tpptr = nextone ) {
+                               nextone = tpptr->next;                          //  prevent issues if we delete the block during loop 
+
+                               if( tpptr->fd >= 0 ) {
+                                       if( tpptr->squeue != NULL && (FD_ISSET( tpptr->fd, &gptr->writefds )) ) {
+                                               SIsend( gptr, tpptr );                  //  send if clear to send 
+                                       }
+       
+                                       if( FD_ISSET( tpptr->fd, &gptr->execpfds ) ) {
+                                                       ;                               // sunos seems to set the except flag for unknown reasons; ignore it
+                                       } else {
+                                               if( FD_ISSET( tpptr->fd, &gptr->readfds ) ) {                   // ready to read
+                                                       fd = tpptr->fd;
+                                                       tpptr->rcvd++;
+       
+                                                       if( tpptr->flags & TPF_LISTENFD ) {                                     // new session request
+                                                               errno=0;
+                                                               status = SInewsession( gptr, tpptr );                   // accept connection
+                                                       } else  {                                                                                       //  data received on a regular port (we support just tcp now
+                                                               status = RECV( fd, gptr->rbuf, MAX_RBUF, 0 );   //  read data 
+                                                               if( status > 0  &&  ! (tpptr->flags & TPF_DRAIN) ) {
+                                                                       if( (cbptr = gptr->cbtab[SI_CB_CDATA].cbrtn) != NULL ) {
+                                                                               status = (*cbptr)( gptr->cbtab[SI_CB_CDATA].cbdata, fd, gptr->rbuf, status );
+                                                                               SIcbstat( gptr, status, SI_CB_CDATA );  //  handle cb status 
+                                                                       }
+                                                               } else {                                                                                // no bites, but read flagged indicates disconnect
+                                                                       if( (cbptr = gptr->cbtab[SI_CB_DISC].cbrtn) != NULL ) {
+                                                                               status = (*cbptr)( gptr->cbtab[SI_CB_DISC].cbdata, tpptr->fd );
+                                                                               SIcbstat( gptr, status, SI_CB_DISC );   //  handle status 
+                                                                       }
+                                                                       SIterm( gptr, tpptr );
+                                                               }
+                                                       }
+                                               }
+                                       }
+                               }                                                               //  if still good fd 
+                       }
+               }
+       } while( gptr->tplist != NULL && !(gptr->flags & GIF_SHUTDOWN) );
+
+       free( ibuf );
+       if( gptr->tplist == NULL )                                      //  indicate all fds closed 
+               gptr->sierr = SI_ERR_NOFDS;
+
+       if( gptr->flags & GIF_SHUTDOWN ) {                      //  we need to stop for some reason 
+               gptr->sierr = SI_ERR_SHUTD;                             //  indicate error exit status 
+               status = SI_ERROR;                                              //  status should indicate to user to die 
+               SIshutdown( gptr );                                             //  clean things up 
+       } else {
+               status = SI_OK;                                                 //  user can continue to process 
+       }
+
+       return status;
+}
diff --git a/src/rmr/si/src/si95/socket_if.h b/src/rmr/si/src/si95/socket_if.h
new file mode 100644 (file)
index 0000000..5546800
--- /dev/null
@@ -0,0 +1,125 @@
+// vim: noet sw=4 ts=4:
+/*
+==================================================================================
+    Copyright (c) 2020 Nokia
+    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+******************************************************************************
+*
+*  Mnemonic:   socket_if.h
+*  Abstract:   Main set of definitions needed by the SI native functions and
+*                              any user code.
+*  Date:           26 March 1995       (original) 
+*                              3 January 2020  (revised)
+*  Author:             E. Scott Daniels
+*
+*****************************************************************************
+*/
+
+#ifndef _SOCKET_IF_H
+#define _SOCKET_IF_H
+
+#ifndef PARINOID_CHECKS
+#      define PARINOID_CHECKS 1
+#endif
+
+#define TCP_DEVICE     0       //  device type of socket 
+#define UDP_DEVICE     1       
+
+//  these are for SIclose, must be negative so as to be distinguished from real fd values 
+#define TCP_LISTEN_PORT        (-1)    //  close first listen port found 
+#define UDP_PORT       (-2)    //  close first udp port found 
+
+#define SI_BAD_HANDLE  ((void *) 0)
+
+#define SI_OPT_NONE            0               // initialisation options
+#define SI_OPT_FORK    0x01      //  fork new sessions
+#define SI_OPT_FG      0x02      //  keep process in the "foreground" 
+#define SI_OPT_TTY     0x04      //  processes keyboard interrupts if fg 
+#define SI_OPT_ALRM    0x08      //  cause setsig to be called with alarm flg 
+
+                                 //  offsets of callbacks in table 
+                                 //  used to indentify cb in SIcbreg 
+#define SI_CB_SIGNAL   0         //  usr signal/alarm received 
+#define SI_CB_RDATA    1         //  handles data arrival on raw interface 
+#define SI_CB_CDATA    2         //  handles data arrival on cooked interface 
+#define SI_CB_KDATA    3         //  handles data arrival from keyboard 
+#define SI_CB_SECURITY 4         //  authorizes acceptance of a conect req 
+#define SI_CB_CONN     5         //  called when a session is accepted 
+#define SI_CB_DISC     6         //  called when a session is lost 
+#define SI_CB_POLL     7
+
+                                 //  return values callbacks are expected to produce
+#define SI_RET_OK      0         //  processing ok -- continue
+#define SI_RET_ERROR   (-1)      //  processing not ok -- reject/disallow
+#define SI_RET_UNREG   1         //  unregester (never call again) the cb 
+#define SI_RET_QUIT    2         //  set the shutdown flag  and terminate the SI environment
+
+                                 //  values returned to user by SI routines 
+#define SI_ERROR       (-1)      //  unable to process 
+#define SI_OK          0         //  processing completed successfully 
+#define SI_QUEUED      1         //  send messages was queued 
+
+                                  //  flags passed to signal callback 
+#define SI_SF_QUIT     0x01      //  program should terminate 
+#define SI_SF_USR1     0x02      //  user 1 signal received 
+#define SI_SF_USR2     0x04      //  user 2 signal received 
+#define SI_SF_ALRM     0x08      //  alarm clock signal received 
+
+                                 //  signal bitmasks for the setsig routine 
+#define SI_SIG_QUIT    0x01      //  please catch quit signal 
+#define SI_SIG_HUP     0x02      //  catch hangup signal 
+#define SI_SIG_TERM    0x04      //  catch the term signal 
+#define SI_SIG_USR1    0x08      //  catch user signals 
+#define SI_SIG_USR2    0x10
+#define SI_SIG_ALRM    0x20      //  catch alarm signals 
+#define SI_DEF_SIGS    0x1F      //  default signals to catch 
+
+                                 //  SIerrno values set in public rtns 
+#define SI_ERR_NONE     0        //  no error as far as we can tell 
+#define SI_ERR_QUEUED   1      //  must be same as queued 
+#define SI_ERR_TPORT    2        //  could not bind to requested tcp port 
+#define SI_ERR_UPORT    3        //  could not bind to requested udp port 
+#define SI_ERR_FORK     4        //  could not fork to become daemon 
+#define SI_ERR_HANDLE   5        //  global information pointer went off 
+#define SI_ERR_SESSID   6        //  invalid session id 
+#define SI_ERR_TP       7        //  error occured in transport provider 
+#define SI_ERR_SHUTD    8        //  cannot process because in shutdown mode 
+#define SI_ERR_NOFDS    9        //  no file descriptors are open 
+#define SI_ERR_SIGUSR1  10       //  signal received data not read 
+#define SI_ERR_SIGUSR2  11       //  signal received data not read 
+#define SI_ERR_DISC     12       //  session disconnected 
+#define SI_ERR_TIMEOUT  13       //  poll attempt timed out - no data 
+#define SI_ERR_ORDREL   14       //  orderly release received 
+#define SI_ERR_SIGALRM  15       //  alarm signal received 
+#define SI_ERR_NOMEM    16       //  could not allocate needed memory 
+#define SI_ERR_ADDR            17       //  address conversion failed 
+#define SI_ERR_BLOCKED 18              // operation would block
+
+#define SI_TF_NONE             0               // tcp flags in the global info applied to each session
+#define SI_TF_NODELAY  0x01    // set nagle's off for each connection
+#define SI_TF_FASTACK  0x02    // set fast ack on for each connection
+
+#ifndef _SI_ERRNO
+extern int SIerrno;               //  error number set by public routines 
+#define _SI_ERRNO
+#endif
+
+typedef struct ginfo_blk si_ctx_t;             // generic context reference for users
+
+#endif
diff --git a/src/rmr/si/src/sr_si_static.c b/src/rmr/si/src/sr_si_static.c
new file mode 100644 (file)
index 0000000..5c751d7
--- /dev/null
@@ -0,0 +1,740 @@
+// vim: ts=4 sw=4 noet :
+/*
+==================================================================================
+       Copyright (c) 2019-2020 Nokia
+       Copyright (c) 2018-2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       sr_si_static.c
+       Abstract:       These are static send/receive primatives which (sadly)
+                               differ based on the underlying protocol (nng vs nanomsg).
+                               Split from rmr_nng.c  for easier wormhole support.
+
+       Author:         E. Scott Daniels
+       Date:           13 February 2019
+*/
+
+#ifndef _sr_si_static_c
+#define _sr_si_static_c
+
+#include <nng/nng.h>
+#include <nng/protocol/pubsub0/pub.h>
+#include <nng/protocol/pubsub0/sub.h>
+#include <nng/protocol/pipeline0/push.h>
+#include <nng/protocol/pipeline0/pull.h>
+
+
+static void dump_40( char *p, char* label ) {
+       int i;
+
+       if( label )
+               fprintf( stderr, ">>>>> %s p=%p\n", label, p );
+
+       for( i = 0; i < 40; i++ ) {
+               fprintf( stderr, "%02x ", (unsigned char) *(p+i) );
+       }
+       fprintf( stderr, "\n" );
+}
+
+/*
+       Translates the nng state passed in to one of ours that is suitable to put
+       into the message, and sets errno to something that might be useful.
+       If we don't have a specific RMr state, then we return the default (e.g.
+       receive failed).
+
+       The addition of the connection shut error code to the switch requires
+       that the NNG version at commit e618abf8f3db2a94269a (or after) be
+       used for compiling RMR. 
+*/
+static inline int xlate_si_state( int state, int def_state ) {
+
+       switch( state ) {
+               case SI_ERR_NONE:
+                       errno = 0;
+                       state = RMR_OK;
+                       break;
+
+               default:
+                       state = def_state;
+                       errno = EAGAIN;
+                       break;
+
+               case SI_ERR_QUEUED:
+               case SI_ERR_TPORT:
+               case SI_ERR_UPORT:
+               case SI_ERR_FORK:
+               case SI_ERR_HANDLE:
+               case SI_ERR_SESSID:
+               case SI_ERR_TP:
+               case SI_ERR_SHUTD:
+               case SI_ERR_NOFDS:
+               case SI_ERR_SIGUSR1:
+               case SI_ERR_SIGUSR2:
+               case SI_ERR_DISC:
+               case SI_ERR_TIMEOUT:
+               case SI_ERR_ORDREL:
+               case SI_ERR_SIGALRM:
+               case SI_ERR_NOMEM:
+               case SI_ERR_ADDR:
+                       errno  = ENOTSUP;
+                       state = def_state;
+                       break;
+       }
+
+       return state;
+}
+
+/*
+       Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
+       a new message struct as well. Size is the size of the zc buffer to allocate (not
+       including our header). If size is 0, then the buffer allocated is the size previously
+       allocated (if msg is !nil) or the default size given at initialisation).
+
+       The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
+       the context value is used.
+
+       NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
+               are zero copy, however ONLY the message is zero copy. We now allocate and use
+               nng messages.
+*/
+static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
+       size_t          mlen = -1;                      // size of the transport buffer that we'll allocate
+       uta_mhdr_t*     hdr;                    // convenience pointer
+       int                     tr_len;                 // trace data len (default or override)
+       int*            alen;                   // convenience pointer to set allocated len
+       //int                   tpb_len;                // transport buffer total len
+static int logged = 0;
+
+       tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
+
+       mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
+       mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
+       mlen = sizeof( char ) * (mlen + TP_HDR_LEN);                                            // finally add the transport header len
+
+       if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
+               msg = (rmr_mbuf_t *) malloc( sizeof *msg );
+               if( msg == NULL ) {
+                       fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
+                       return NULL;                                                            // we used to exit -- that seems wrong
+               }
+               memset( msg, 0, sizeof( *msg ) );       // tp_buffer will be allocated below
+       } else {                                                                // user message or message from the ring
+               if( mlen > msg->alloc_len ) {           // current allocation is too small
+                       msg->alloc_len = 0;                             // force tp_buffer realloc below
+                       if( msg->tp_buf ) {
+                               free( msg->tp_buf );
+                       }
+               } else {
+                       mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
+               }
+       }
+
+
+       if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
+               fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
+               abort( );                                                                                       // toss out a core file for this
+       }
+
+/*
+       memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only)   valgrind will complain about uninitalised use if we don't set
+       memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );                // NOT for production -- debugging eyecatcher
+*/
+       alen = (int *) msg->tp_buf;
+       *alen = mlen;                                           // FIX ME: need a stuct to go in these first bytes, not just dummy len
+
+       msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
+       memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
+       if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
+               hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
+               hdr->sub_id = htonl( UNSET_SUBID );
+               SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
+               SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
+               SET_HDR_D1_LEN( hdr, ctx->d1_len );
+               //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
+       }
+       msg->len = 0;                                                                                   // length of data in the payload
+       msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
+       msg->sub_id = UNSET_SUBID;
+       msg->mtype = UNSET_MSGTYPE;
+       msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
+       msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
+       msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
+       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
+       msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
+       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
+       strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
+
+       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
+
+       return msg;
+}
+
+/*
+       Allocates only the mbuf and does NOT allocate an underlying transport buffer since
+       transport receive should allocate that on its own.
+*/
+static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
+       size_t  mlen;
+       uta_mhdr_t* hdr;                        // convenience pointer
+       rmr_mbuf_t* msg;
+
+       if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
+               if( msg->tp_buf ) {
+                       free( msg->tp_buf );            // caller doesn't want it -- future put this on an accumulation ring
+               }
+       } else {
+               if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
+                       fprintf( stderr, "[CRI] rmr_alloc_mbuf: cannot get memory for message\n" );
+                       return NULL;                                                    // this used to exit, but that seems wrong
+               }
+       }
+
+       memset( msg, 0, sizeof( *msg ) );
+
+       msg->sub_id = UNSET_SUBID;
+       msg->mtype = UNSET_MSGTYPE;
+       msg->tp_buf = NULL;
+       msg->header = NULL;
+       msg->len = -1;                                                                                  // no payload; invalid len
+       msg->alloc_len = 0;
+       msg->payload = NULL;
+       msg->xaction = NULL;
+       msg->state = state;
+       msg->flags = 0;
+       msg->ring = ctx->zcb_mring;                                                     // original msg_free() api doesn't get context so must dup on eaach :(
+
+       return msg;
+}
+
+/*
+       This accepts a message with the assumption that only the tp_buf pointer is valid. It
+       sets all of the various header/payload/xaction pointers in the mbuf to the proper
+       spot in the transport layer buffer.  The len in the header is assumed to be the
+       allocated len (a receive buffer that nng created);
+
+       The alen parm is the assumed allocated length; assumed because it's a value likely
+       to have come from si receive and the actual alloc len might be larger, but we
+       can only assume this is the total usable space. Because we are managing a transport
+       header in the first n bytes of the real msg, we must adjust this length down by the
+       size of the tp header (for testing 50 bytes, but this should change to a struct if
+       we adopt this interface).
+
+       This function returns the message with an error state set if it detects that the
+       received message might have been truncated.  Check is done here as the calculation
+       is somewhat based on header version.
+*/
+static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
+       uta_mhdr_t* hdr = NULL;                 // current header
+       uta_v1mhdr_t* v1hdr;                    // version 1 header
+       int ver;
+       int     hlen;                                           // header len to use for a truncation check
+
+       msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;                              // FIX ME:  hard 50 needs to be some kind of tp header struct
+
+       // do NOT reduce alen any more.  alen must be TP_HEADER + RMR_HEADER + user space
+       // get payload size will do the right thing and subtract TP_HEADER and RMR_HEADER lengths
+       //alen -= 50;                                           // actual length of "rmr space" 
+
+       v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
+
+       if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
+               ver = 1;
+               v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
+       } else {
+               ver = ntohl( v1hdr->rmr_ver );
+       }
+
+       switch( ver ) {
+               case 1:
+                       msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
+                       msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
+                       msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
+
+                       msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
+                       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
+                       msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
+                       msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
+                       msg->state = RMR_OK;
+                       hlen = sizeof( uta_v1mhdr_t );
+                       break;
+
+               default:                                                                                                        // current version always lands here
+                       hdr = (uta_mhdr_t *) msg->header;
+                       msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
+                       msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
+
+                       msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
+                       msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
+                       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
+                       msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
+                       msg->sub_id = ntohl( hdr->sub_id );
+                       hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
+                       break;
+       }
+
+       if( msg->len > (msg->alloc_len - hlen ) ) {
+               msg->state = RMR_ERR_TRUNC;
+               msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
+       } else {
+               msg->state = RMR_OK;
+       }
+}
+
+/*
+       This will clone a message into a new zero copy buffer and return the cloned message.
+*/
+static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
+       rmr_mbuf_t* nm;                 // new message buffer
+       size_t  mlen;
+       int state;
+       uta_mhdr_t* hdr;
+       uta_v1mhdr_t* v1hdr;
+
+       nm = (rmr_mbuf_t *) malloc( sizeof *nm );
+       if( nm == NULL ) {
+               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
+               exit( 1 );
+       }
+       memset( nm, 0, sizeof( *nm ) );
+
+       mlen = old_msg->alloc_len;                                                                              // length allocated before
+       if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
+               fprintf( stderr, "[CRI] rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
+               abort();
+       }
+
+       nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
+       v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
+       switch( ntohl( v1hdr->rmr_ver ) ) {
+               case 1:
+                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
+                       nm->payload = (void *) v1hdr + sizeof( *v1hdr );
+                       break;
+
+               default:                                                                                        // current message always caught  here
+                       hdr = nm->header;
+                       memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
+                       nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
+                       break;
+       }
+
+       // --- these are all version agnostic -----------------------------------
+       nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
+       nm->len = old_msg->len;                                                                 // length of data in the payload
+       nm->alloc_len = mlen;                                                                   // length of allocated payload
+
+       nm->xaction = hdr->xid;                                                                 // reference xaction
+       nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
+       nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
+       memcpy( nm->payload, old_msg->payload, old_msg->len );
+
+       return nm;
+}
+
+/*
+       This will clone a message with a change to the trace area in the header such that
+       it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
+       The orignal message will be left unchanged, and a pointer to the new message is returned.
+       It is not possible to realloc buffers and change the data sizes.
+*/
+static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
+       rmr_mbuf_t* nm;                 // new message buffer
+       size_t  mlen;
+       int state;
+       uta_mhdr_t* hdr;
+       uta_v1mhdr_t* v1hdr;
+       int     tr_old_len;                     // tr size in new buffer
+       int*    alen;                   // convenience pointer to set toal xmit len FIX ME!
+       int             tpb_len;                // total transmit buffer len (user space, rmr header and tp header)
+
+       nm = (rmr_mbuf_t *) malloc( sizeof *nm );
+       if( nm == NULL ) {
+               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
+               exit( 1 );
+       }
+       memset( nm, 0, sizeof( *nm ) );
+
+       hdr = old_msg->header;
+       tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
+
+       mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
+       if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
+
+       tpb_len = mlen + TP_HDR_LEN;
+       if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
+               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
+               exit( 1 );
+       }
+       memset( nm->tp_buf, 0, tpb_len );
+       memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );         // DEBUGGING
+       alen = (int *) nm->tp_buf;
+       *alen = tpb_len;                                                // FIX ME: need a stuct to go in these first bytes, not just dummy len
+
+       nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
+
+       v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
+       switch( ntohl( v1hdr->rmr_ver ) ) {
+               case 1:
+                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
+                       nm->payload = (void *) v1hdr + sizeof( *v1hdr );
+                       break;
+
+               default:                                                                                        // current message version always caught  here
+                       hdr = nm->header;
+                       memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
+                       SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
+
+                       if( RMR_D1_LEN( hdr )  ) {
+                               memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
+                       }
+                       if( RMR_D2_LEN( hdr )  ) {
+                               memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
+                       }
+
+                       nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
+                       break;
+       }
+
+       // --- these are all version agnostic -----------------------------------
+       nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
+       nm->len = old_msg->len;                                                                 // length of data in the payload
+       nm->alloc_len = mlen;                                                                   // length of allocated payload
+
+       nm->xaction = hdr->xid;                                                                 // reference xaction
+       nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
+       nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
+       memcpy( nm->payload, old_msg->payload, old_msg->len );
+
+       return nm;
+}
+
+/*
+       For SI95 based transport all receives are driven through the threaded
+       ring and thus this function should NOT be called. If it is we will panic
+       and abort straight away.
+*/
+static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
+
+fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
+exit( 1 );
+
+       return NULL;
+}
+
+/*
+       Receives a 'raw' message from a non-RMr sender (no header expected). The returned
+       message buffer cannot be used to send, and the length information may or may
+       not be correct (it is set to the length received which might be more than the
+       bytes actually in the payload).
+
+       Mostly this supports the route table collector, but could be extended with an
+       API external function.
+*/
+static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
+       return NULL;
+/*
+FIXME: not implemented yet
+       int state;
+       rmr_mbuf_t*     msg = NULL;             // msg received
+       size_t  rsize;                          // nng needs to write back the size received... grrr
+
+       if( old_msg ) {
+               msg = old_msg;
+       } else {
+               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
+       }
+
+       //msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                        // blocks hard until received
+       if( (msg->state = xlate_si_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
+               return msg;
+       }
+       rsize = nng_msg_len( msg->tp_buf );
+
+       // do NOT use ref_tpbuf() here! Must fill these in manually.
+       msg->header = nng_msg_body( msg->tp_buf );
+       msg->len = rsize;                                                       // len is the number of bytes received
+       msg->alloc_len = rsize;
+       msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
+       msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
+       msg->state = RMR_OK;
+       msg->flags = MFL_RAW;
+       msg->payload = msg->header;                                     // payload is the whole thing; no header
+       msg->xaction = NULL;
+
+       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
+
+       return msg;
+*/
+}
+
+/*
+       This does the hard work of actually sending the message to the given socket. On success,
+       a new message struct is returned. On error, the original msg is returned with the state
+       set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
+       buffer will not be allocated and returned (mostly for call() interal processing since
+       the return message from call() is a received buffer, not a new one).
+
+       Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
+       validation has been done prior.
+
+       When msg->state is not ok, this function must set tp_state in the message as some API 
+       fucntions return the message directly and do not propigate errno into the message.
+*/
+static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
+       int state;
+       uta_mhdr_t*     hdr;
+       int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
+       int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace sizes
+       int tot_len;                                                    // total send length (hdr + user data + tp header)
+
+       // future: ensure that application did not overrun the XID buffer; last byte must be 0
+
+       hdr = (uta_mhdr_t *) msg->header;
+       hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
+       hdr->sub_id = htonl( msg->sub_id );
+       hdr->plen = htonl( msg->len );
+       tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
+
+       if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
+               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
+               strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
+       }
+
+       if( retries == 0 ) {
+               spin_retries = 100;
+               retries++;
+       }
+
+       errno = 0;
+       msg->state = RMR_OK;
+       do {
+               tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
+               *((int*) msg->tp_buf) = tot_len;
+
+               if( DEBUG > 1 ) fprintf( stderr, "[DEBUG] send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
+               if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
+
+               if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
+                       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] send_msg:  error!! sent state=%d\n", state );
+                       msg->state = state;
+                       if( retries > 0 && state == SI_ERR_BLOCKED ) {
+                               if( --spin_retries <= 0 ) {                             // don't give up the processor if we don't have to
+                                       retries--;
+                                       if( retries > 0 ) {                                     // only if we'll loop through again
+                                               usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
+                                       }
+                                       spin_retries = 1000;
+                               }
+                       } else {
+                               state = 0;                      // don't loop
+                       }
+               } else {
+                       if( DEBUG > 2 ) fprintf( stderr, "[DBUG] sent OK state=%d\n", state );
+                       state = 0;
+                       msg->state = RMR_OK;
+                       hdr = NULL;
+               }
+       } while( state && retries > 0 );
+
+       if( msg->state == RMR_OK ) {                                                                    // successful send
+               if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
+                       return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
+               } else {
+                       rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
+                       return NULL;
+               }
+       } else {                                                                                        // send failed -- return original message
+               if( msg->state == 98 ) {                // FIX ME: this is just broken, but needs SI changes to work correctly for us
+                       errno = EAGAIN;
+                       msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
+               } else {
+                       msg->state = RMR_ERR_SENDFAILED;
+               }
+
+               if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
+       }
+
+       return msg;
+}
+
+/*
+       send message with maximum timeout.
+       Accept a message and send it to an endpoint based on message type.
+       If NNG reports that the send attempt timed out, or should be retried,
+       RMr will retry for approximately max_to microseconds; rounded to the next
+       higher value of 10.
+
+       Allocates a new message buffer for the next send. If a message type has
+       more than one group of endpoints defined, then the message will be sent
+       in round robin fashion to one endpoint in each group.
+
+       An endpoint will be looked up in the route table using the message type and
+       the subscription id. If the subscription id is "UNSET_SUBID", then only the
+       message type is used.  If the initial lookup, with a subid, fails, then a
+       second lookup using just the mtype is tried.
+
+       When msg->state is not OK, this function must set tp_state in the message as 
+       some API fucntions return the message directly and do not propigate errno into 
+       the message.
+
+       CAUTION: this is a non-blocking send.  If the message cannot be sent, then
+               it will return with an error and errno set to eagain. If the send is
+               a limited fanout, then the returned status is the status of the last
+               send attempt.
+
+*/
+static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
+       endpoint_t*     ep;                                     // end point that we're attempting to send to
+       rtable_ent_t*   rte;                    // the route table entry which matches the message key
+       int     nn_sock;                                        // endpoint socket (fd in si case) for send
+       uta_ctx_t*      ctx;
+       int                     group;                          // selected group to get socket for
+       int                     send_again;                     // true if the message must be sent again
+       rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
+       int                     sock_ok;                        // got a valid socket from round robin select
+       char*           d1;
+       int                     ok_sends = 0;           // track number of ok sends
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
+               errno = EINVAL;                                                                                         // if msg is null, this is their clue
+               if( msg != NULL ) {
+                       msg->state = RMR_ERR_BADARG;
+                       errno = EINVAL;                                                                                 // must ensure it's not eagain
+                       msg->tp_state = errno;
+               }
+               return msg;
+       }
+
+       errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
+       if( msg->header == NULL ) {
+               fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
+               msg->state = RMR_ERR_NOHDR;
+               errno = EBADMSG;                                                                                        // must ensure it's not eagain
+               msg->tp_state = errno;
+               return msg;
+       }
+
+       if( max_to < 0 ) {
+               max_to = ctx->send_retries;             // convert to retries
+       }
+
+       if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
+               if( ctx->flags & CTXFL_WARN ) {
+                       fprintf( stderr, "[WARN] no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
+               }
+               msg->state = RMR_ERR_NOENDPT;
+               errno = ENXIO;                                                                          // must ensure it's not eagain
+               msg->tp_state = errno;
+               return msg;                                                                                     // caller can resend (maybe) or free
+       }
+
+       send_again = 1;                                                                                 // force loop entry
+       group = 0;                                                                                              // always start with group 0
+       while( send_again ) {
+               sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx );         // select endpt from rr group and set again if more groups
+
+               if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
+                               msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
+
+               group++;
+
+               if( sock_ok ) {                                                                                                 // with an rte we _should_ always have a socket, but don't bet on it
+                       if( send_again ) {
+                               clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
+                               if( clone_m == NULL ) {
+                                       msg->state = RMR_ERR_SENDFAILED;
+                                       errno = ENOMEM;
+                                       msg->tp_state = errno;
+                                       if( ctx->flags & CTXFL_WARN ) {
+                                               fprintf( stderr, "[WARN] unable to clone message for multiple rr-group send\n" );
+                                       }
+                                       return msg;
+                               }
+
+                               if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
+                               msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
+                               msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
+       
+                               if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
+                                       rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
+                                       msg = clone_m;
+                               } else {
+                                       ok_sends++;
+                                       msg = clone_m;                                                                          // clone will be the next to send
+                               }
+                       } else {
+                               msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
+                               if( DEBUG ) {
+                                       if( msg == NULL ) {
+                                               fprintf( stderr, "[DBUG] mtosend_msg:  send returned nil message!\n" );         
+                                       }
+                               }
+                       }
+
+                       if( ep != NULL && msg != NULL ) {
+                               switch( msg->state ) {
+                                       case RMR_OK:
+                                               ep->scounts[EPSC_GOOD]++;
+                                               break;
+                               
+                                       case RMR_ERR_RETRY:
+                                               ep->scounts[EPSC_TRANS]++;
+                                               break;
+
+                                       default:
+                                               ep->scounts[EPSC_FAIL]++;
+                                               uta_ep_failed( ep );                                                            // sending to ep failed; set up to reconnect
+                                               break;
+                               }
+                       }
+               } else {
+/*
+                       if( ctx->flags & CTXFL_WARN ) {
+                               fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
+                       }
+*/
+                       msg->state = RMR_ERR_NOENDPT;
+                       errno = ENXIO;
+               }
+       }
+
+       if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
+               msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
+               if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
+                       msg->state = RMR_OK;
+               }
+       
+               if( DEBUG ) fprintf( stderr, "[DBUG] final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
+       
+               msg->tp_state = errno;
+       }
+
+       return msg;                                                                     // last message caries the status of last/only send attempt
+}
+
+
+/*
+       A generic wrapper to the real send to keep wormhole stuff agnostic.
+       We assume the wormhole function vetted the buffer so we don't have to.
+*/
+static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
+       return send_msg( ctx, msg, ep->nn_sock, -1 );
+}
+
+#endif
index 019f3e2..670ca1f 100644 (file)
 #==================================================================================
 #
 
-# NOTE:  this makefile assumes that RMr has been built using the directory .build
+# NOTE:  this makefile assumes that RMR has been built using the directory .build
 #              at the top most repo directory (e.g. ../../.build). It can be changed 
 #              if you need to by adding "build_path=<path>" to the make command line.
-#              To use this makefile to build on a system where RMr is already installed
+#              To use this makefile to build on a system where RMR is already installed
 #              try:    make build_path=/usr/local/lib
 #
 #              By default we prefer the Korn shell (it's just better). If you really need
@@ -46,9 +46,14 @@ LIBRARY_PATH = $(LD_LIBRARY_PATH)
 # from the perspective of two, or more, communicating processes. 
 
 
-.PHONY: all
+.PHONY: all all_si
 all: sender receiver caller mt_receiver v_sender ex_rts_receiver
 
+all_si: sender_si receiver_si
+
+
+# ------ nng based builds ------------------------------------------------------------
+
 receiver: receiver.c
        gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
 
@@ -74,6 +79,20 @@ lcaller: lcaller.c
        gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@  -lrmr_nng -lnng -lpthread -lm
 
 lsender: lsender.c
+       gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@  -lrmr_nng -lnng -lpthread -lm
+
+
+
+# ----- si test builds -------------------------------------------------------------
+
+sender_si: sender.c
+       gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_si -lpthread -lm
+
+receiver_si: receiver.c
+       gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_si -lpthread -lm
+
+
+# --------- housekeeping -----------------------------------------------------------
 
 # clean removes intermediates; nuke removes everything that can be built
 .PHONY: clean nuke
index 7b7336a..c72e072 100644 (file)
@@ -64,6 +64,7 @@
 #include <rmr/rmr.h>
 
 #define TRACE_SIZE 40          // bytes in header to provide for trace junk
+#define WBUF_SIZE 1024
 
 /*
        Thread data
@@ -108,13 +109,15 @@ static void* mk_calls( void* data ) {
        int             drops = 0;
        int             fail_count = 0;                                 // # of failure sends after first successful send
        int             successful = 0;                                 // set to true after we have a successful send
-       char    wbuf[1024];
+       char*   wbuf = NULL;
        char    xbuf[1024];                                             // build transaction string here
        char    trace[1024];
        int             xaction_id = 1;
        char*   tok;
        int             state = 0;
 
+       wbuf = (char *) malloc( sizeof( char ) * WBUF_SIZE );
+
        if( (control  = (tdata_t *) data) == NULL ) {
                fprintf( stderr, "thread data was nil; bailing out\n" );
        }
index 51429b5..994c591 100644 (file)
@@ -68,6 +68,9 @@
 
 #include <rmr/rmr.h>
 
+#define WBUF_SIZE      1024
+#define TRACE_SIZE     1024
+
 static int sum( char* str ) {
        int sum = 0;
        int     i = 0;
@@ -112,9 +115,9 @@ int main( int argc, char** argv ) {
        int             mtype = 0;
        int             stats_freq = 100;
        int             successful = 0;                                 // set to true after we have a successful send
-       char    wbuf[1024];
+       char*   wbuf = NULL;                                    // working buffer
        char    me[128];                                                // who I am to vet rts was actually from me
-       char    trace[1024];
+       char*   trace = NULL;                                   // area to build trace data in
        long    timeout = 0;
        int             delay = 100000;                                 // usec between send attempts
        int             nmsgs = 10;                                             // number of messages to send
@@ -122,6 +125,9 @@ int main( int argc, char** argv ) {
        int             start_mt = 0;
        int             pass = 1;
 
+       wbuf = (char *) malloc( sizeof( char ) * WBUF_SIZE );
+       trace = (char *) malloc( sizeof( char ) * TRACE_SIZE );
+
        if( argc > 1 ) {
                nmsgs = atoi( argv[1] );
        }
@@ -187,7 +193,7 @@ int main( int argc, char** argv ) {
 
        timeout = time( NULL ) + 20;
 
-       gethostname( wbuf, sizeof( wbuf ) );
+       gethostname( wbuf, WBUF_SIZE );
        snprintf( me, sizeof( me ), "%s-%d", wbuf, getpid( ) );
 
        while( count < nmsgs ) {                                                                // we send n messages after the first message is successful