enhance(API): Add multi-threaded call 97/197/3
authorE. Scott Daniels <daniels@research.att.com>
Mon, 20 May 2019 20:00:52 +0000 (20:00 +0000)
committerE. Scott Daniels <daniels@research.att.com>
Tue, 28 May 2019 13:06:25 +0000 (13:06 +0000)
Change-Id: I2d7c9abd2aabe4c2f05ba0935acaeb4f3fd8bb94
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Tweaks to call based on testing

Change-Id: I2cda8652ba045bf411bda77e64d2a92de8d2c0f2
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Add nng call static module

Change-Id: I5e964078ae346b25cc283ea32239271ea69fa55e
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Add locking round connect

Change-Id: Icf7a9c691385f199107b746c34c32f8f83b1ef04
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Tweaks based on testing; move mtrcv to static module

Change-Id: I75c38a9eeb34991da92fdb6f655b11d7f600d8a8
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Unit test changes

Change-Id: I7c1d3dcbe8802ce459a63b762c3ad3b8abeb7a61
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Beef up unit tests to passing after discount, all >70% before discount

Change-Id: I0e34052c142cfea77053512aac68008c3af49694
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Update application oriented tests to include mt-call

Change-Id: I0939abf96008ed7fde9640a070d09e683c0d5dea
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Fix possible nil pointer problem

Change-Id: I55e911761d54b9fc7500c121de018b485913763e
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Tweaks resulting from testing

Change-Id: Iaa6fb4d2719a39dbe209e17cc793c2341a477043
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Add info message to make it obvious when mt-receive is enabled

Change-Id: I6dd3cd5ad01d5cf2a09dda87ce6fdd0bc5a4670c
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Add man pages for new mt functions

Change-Id: Ia52e5d71502bcebe6af65024602dc32ca8877b52
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Add man pages to CMake

Change-Id: I60ff3a753d9249c8797d9141dd3ce2b5b55752b8
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Update CM version

Change-Id: I9e0f031f4e8a0ccba8ae7c788a6d9bc6dcb0f15b
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
42 files changed:
CMakeLists.txt
doc/CMakeLists.txt
doc/src/man/rmr_init.3.xfm
doc/src/man/rmr_mt_call.3.xfm [new file with mode: 0644]
doc/src/man/rmr_mt_rcv.3.xfm [new file with mode: 0644]
doc/src/man/rmr_rcv_msg.3.xfm
src/rmr/common/include/RIC_message_types.h
src/rmr/common/include/rmr.h
src/rmr/common/include/rmr_agnostic.h
src/rmr/common/src/mbuf_api.c
src/rmr/common/src/mt_call_static.c [new file with mode: 0644]
src/rmr/common/src/rt_generic_static.c
src/rmr/nanomsg/include/rmr_private.h
src/rmr/nanomsg/src/rmr.c
src/rmr/nng/include/rmr_nng_private.h
src/rmr/nng/src/mt_call_nng_static.c [new file with mode: 0644]
src/rmr/nng/src/rmr_nng.c
src/rmr/nng/src/rtable_nng_static.c
src/rmr/nng/src/sr_nng_static.c
test/Makefile
test/app_test/Makefile
test/app_test/caller.c [new file with mode: 0644]
test/app_test/receiver.c
test/app_test/run_call_test.ksh [new file with mode: 0644]
test/hdr_static_test.c
test/mbuf_api_static_test.c
test/mbuf_api_test.c
test/ring_static_test.c
test/ring_test.c
test/rmr_nano_test.c
test/rmr_nng_api_static_test.c
test/rmr_nng_test.c
test/rt_nano_static_test.c
test/rt_static_test.c
test/sr_nano_static_test.c
test/sr_nng_static_test.c
test/symtab_static_test.c
test/symtab_test.c
test/test_nng_em.c
test/tools_test.c
test/unit_test.ksh
test/wormhole_static_test.c

index cf887cf..4199147 100644 (file)
@@ -23,7 +23,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
-set( patch_level "25" )
+set( patch_level "26" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_lib "lib" )
index b4f4d8a..b092fdc 100644 (file)
@@ -80,6 +80,8 @@ if( BUILD_DOC )
                rmr_tralloc_msg.3
                rmr_get_trlen.3
                rmr_get_src.3
+               rmr_mt_call.3
+               rmr_mt_rcv.3
        )
 
        # empty list of roff/troff input files we generated
index 408c954..83a0f18 100644 (file)
@@ -66,6 +66,46 @@ The value of &ital(max_msg_size) will be used when allocating zero copy send buf
 which must be allocated, possibly, prior to the application knowing the actual size of
 a specific message. 
 
+&space
+&ital(Flags) allows for selection of some RMr options at the time of initialisation. 
+These are set by ORing &cw(RMRFL_) constants from the RMr header file. Currently the 
+following flags are supported:
+
+&half_space
+&beg_dlist(1i : &bold_font )
+&ditem(RMRFL_NONE) 
+       No flags are set.
+
+&half_space
+&ditem(RMRFL_NOTHREAD)
+       The route table collector thread is not to be started. This should only be used
+       by the route table generator application if it is based on RMr.
+
+&half_space
+&ditem(RMRFL_MTCALL)
+       Enable multi-threaded call support. 
+&end_dlist
+
+&h3(Multi-threaded Calling)
+The support for an application to issue a &ital(blocking call) by the &cw(rmr_call()) function
+was limited such that only user applications which were operating in a single thread
+could safely use the function.
+Further, timeouts were message count based and not time unit based. 
+Multi-threaded call support adds the ability for a user application with multiple threads
+to invoke a blocking call function with the guarentee that the correct response message 
+is delivered to the thread.  
+The additional support is implemented with the &ital( rmr_mt_call() ) and &ital( rmr_mt_rcv() )
+function calls. 
+&space
+
+Multi-threaded call support requires the user application to specifically enable it
+when RMr is initialised. 
+This is necessary because a second, dedicated, receiver thread  must be started, and 
+requires all messages to be examined and queued by this thread.
+The additional overhead is minimal, queuing information is all in the RMr message
+header, but as an additional process is necessary the user application must "opt in" 
+to this approach.
+
 &space
 &h2(ENVIRONMENT)
 As a part of the initialisation process &cw(rmr_init) will look into the available 
@@ -117,6 +157,8 @@ rmr_alloc_msg(3),
 rmr_call(3),
 rmr_free_msg(3),
 rmr_get_rcvfd(3),
+rmr_mt_call(3),
+rmr_mt_rcv(3),
 rmr_payload_size(3),
 rmr_send_msg(3),
 rmr_rcv_msg(3),
diff --git a/doc/src/man/rmr_mt_call.3.xfm b/doc/src/man/rmr_mt_call.3.xfm
new file mode 100644 (file)
index 0000000..6cbfc24
--- /dev/null
@@ -0,0 +1,221 @@
+.if false
+==================================================================================
+       Copyright (c) 2019 Nokia 
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+.fi
+
+
+.if false
+       Mnemonic        rmr_mt_call_man.xfm
+       Abstract        The manual page for the rmr multi-threaded call function.
+       Author          E. Scott Daniels
+       Date            24 May 2019
+.fi
+
+.** if formatting with tfm, the roff.im will cause roff output to be generated
+.** if formatting with pfm, then pretty postscript will be generated
+.gv e LIB lib
+.if pfm
+       .im &{lib}/generic_ps.im
+.ei
+       .gv e OUTPUT_RST use_rst
+       .if .ev &use_rst 1 = 
+               .im &{lib}/rst.im
+       .ei
+               .im &{lib}/roff.im
+       .fi
+.fi
+
+&line_len(6i)
+
+&h1(RMR Library Functions)
+&h2(NAME)
+       rmr_mt_call
+
+&h2(SYNOPSIS )
+&indent
+&ex_start
+#include <rmr/rmr.h>
+
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* msg, int id, int timeout );
+&ex_end
+&uindent
+
+&h2(DESCRIPTION)
+The &cw(rmr_mt_call) function sends the user application message to a remote
+endpoint, and waits for a corresponding response message before returning
+control to the user application.
+The user application supplies a completed message buffer, as it would for
+a &cw(rmr_send_msg) call, but unlike with a send, the buffer returned will have
+the response from the application that received the message.
+The thread invoking the &ital( rmr_mt_call())  will block until a message arrives
+or until &ital(timeout) milliseconds has passed; which ever comes first.  
+Using a timeout value of zero (0) will cause the thread to block without a timeout.
+
+&space
+The &ital( id ) supplied as the third parameter is an integer in the range of 2 through
+255 inclusive. 
+This is a caller defined "thread number" and is used to match the response message
+with the correct user application thread.
+If the ID value is not in the proper range, the attempt to make the call will fail.
+
+&space
+Messages which are received while waiting for the response are queued on a &ital(normal)
+receive queue and will be delivered to the user application with the next invocation 
+of &itaL( rmr_mt_rcv() ) or &ital( rmr_rvv_msg().) 
+by RMR, and are returned to the user application when &cw(rmr_rcv_msg) is 
+invoked.
+These messages are returned in th order received, one per call to &cw(rmr_rcv_msg.)
+
+&space
+NOTE: Currently the multi-threaded functions are supported only when the NNG 
+transport mechanism is being used. It will not be possible to link a programme
+using the nanomsg version of the library when references to this function are
+present.
+
+&h3(The Transaction ID)
+The user application is responsible for setting the value of the transaction ID field
+before invoking &ital(rmr_mt_call.)  
+The transaction ID is a &cw(RMR_MAX_XID) byte field that is used to match the 
+response message when it arrives. 
+RMr will compare &bold(all) of the bytes in the field, so the caller must ensure
+that they are set correctly to avoid missing the response message. 
+(The application which returns the response message is also expected to ensure that
+the return buffer has the matching transaction ID. This can be done transparently if
+the application uses the &ital( rmr_rts_msg() ) function and does not adjust the 
+transaction ID.
+
+
+&h2(RETURN VALUE)
+The &cw(rmr_mt_call) function returns a pointer to a message buffer with the state set to reflect
+the overall state of call processing.
+If the state is &cw(RMR_OK) then the buffer contains the response message; otherwise
+the state indicates the error encountered while attempting to send the message.
+
+&space
+If no response message is received when the timeout period has expired, a nil pointer
+will be returned (NULL).  
+
+&h2(ERRORS)
+These values are reflected in the state field of the returned message. 
+
+&half_space
+&beg_dlist(.75i : ^&bold_font )
+&di(RMR_OK) The call was successful and the message buffer references the response message.
+
+&half_space
+&di(RMR_ERR_BADARG) An argument passed to the function was invalid.
+
+&half_space
+&di(RMR_ERR_CALLFAILED) The call failed and the value of &ital(errno,) as described below, 
+       should be checked for the specific reason.
+
+&half_space
+&di(RMR_ERR_NOENDPT) An endpoint associated with the message type could not be found in the
+       route table.
+
+&half_space
+&di(RMR_ERR_RETRY) The underlying transport mechanism was unable to accept the message
+       for sending. The user application can retry the call operation if appropriate to
+       do so.
+
+&end_dlist
+
+&space
+The global "variable" &ital(errno) will be set to one of the following values if the 
+overall call processing was not successful. 
+&half_space
+
+&beg_dlist(.75i : ^&bold_font )
+&di(ETIMEDOUT) Too many messages were queued before receiving the expected response
+
+&half_space
+&di(ENOBUFS)   The queued message ring is full, messages were dropped
+
+&half_space
+&di(EINVAL)     A parameter was not valid
+
+&half_space
+&di(EAGAIN)    The underlying message system wsa interrupted or the device was busy;
+       the message was &bold(not) sent, and user application should call 
+       this function with the message again.
+&end_dlist
+
+&h2(EXAMPLE)
+The following code bit shows one way of using the &cw(rmr_mt_call) function, and illustrates
+how the transaction ID must be set.
+
+&space
+&ex_start
+    int retries_left = 5;               // max retries on dev not available
+    static rmr_mbuf_t*  mbuf = NULL;    // response msg
+    msg_t*  pm;                         // private message (payload)
+
+       // get a send buffer and reference the payload 
+    mbuf = rmr_alloc_msg( mr, RMR_MAX_RCV_BYTES );
+    pm = (msg_t*) mbuf->payload;
+
+       // generate an xaction ID and fill in payload with data and msg type
+    rmr_bytes2xact( mbuf, xid, RMR_MAX_XID );
+    snprintf( pm->req, sizeof( pm->req ), "{ \"req\": \"num users\"}" );
+    mbuf->mtype = MT_USR_RESP;
+    
+    msg = rmr_mt_call( mr, msg, my_id, 100 );          // wait up to 100ms
+    if( ! msg ) {               // probably a timeout and no msg received
+        return NULL;            // let errno trickle up
+    } 
+
+    if( mbuf->state != RMR_OK ) {
+        while( retries_left-- > 0 &&             // loop as long as eagain
+               mbuf->state == RMR_ERR_RETRY && 
+               (msg = rmr_mt_call( mr, msg )) != NULL && 
+               mbuf->state != RMR_OK ) {
+
+            usleep( retry_delay );
+        }
+    
+        if( mbuf == NULL || mbuf->state != RMR_OK ) {
+            rmr_free_msg( mbuf );        // safe if nil
+            return NULL;
+        }
+    }
+
+    // do something with mbuf
+&ex_end
+
+
+&h2(SEE ALSO )
+.ju off
+rmr_alloc_msg(3),
+rmr_free_msg(3),
+rmr_init(3),
+rmr_mt_rcv(3),
+rmr_payload_size(3),
+rmr_send_msg(3),
+rmr_rcv_msg(3),
+rmr_rcv_specific(3),
+rmr_rts_msg(3),
+rmr_ready(3),
+rmr_fib(3),
+rmr_has_str(3),
+rmr_tokenise(3),
+rmr_mk_ring(3),
+rmr_ring_free(3)
+.ju on
+
+
+.qu
+
diff --git a/doc/src/man/rmr_mt_rcv.3.xfm b/doc/src/man/rmr_mt_rcv.3.xfm
new file mode 100644 (file)
index 0000000..7c83f18
--- /dev/null
@@ -0,0 +1,210 @@
+.if false
+==================================================================================
+       Copyright (c) 2019 Nokia 
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+.fi
+.if false
+       Mnemonic        rmr_mt_rcv_man.xfm
+       Abstract        The manual page for the rmr_mt_rcv function.
+       Author          E. Scott Daniels
+       Date            24 May 2019
+.fi
+
+.** if formatting with tfm, the roff.im will cause roff output to be generated
+.** if formatting with pfm, then pretty postscript will be generated
+.gv e LIB lib
+.if pfm
+       .im &{lib}/generic_ps.im
+.ei
+       .gv e OUTPUT_RST use_rst
+       .if .ev &use_rst 1 = 
+               .im &{lib}/rst.im
+       .ei
+               .im &{lib}/roff.im
+       .fi
+.fi
+
+&line_len(6i)
+
+&h1(RMR Library Functions)
+&h2(NAME)
+       rmr_mt_rcv
+
+&h2(SYNOPSIS )
+&indent
+&ex_start
+#include <rmr/rmr.h>
+
+rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* old_msg, int timeout );
+&ex_end
+&uindent
+
+&h2(DESCRIPTION)
+The &cw(rmr_mt_rcv) function blocks until a message is received, or the timeout
+period (milliseconds) has passed. 
+The result is an RMr message buffer which references a received message. 
+In the case of a timeout the state will be reflected in an "empty buffer" (if old_msg
+was not nil, or simply with the return of a nil pointer.
+If a timeout value of zero (0) is given, then the function will block until
+the next message received.
+
+&space
+The &ital(vctx) pointer is the pointer returned by the &cw(rmr_init) function.
+&ital(Old_msg) is a pointer to a previously used message buffer or NULL. 
+The ability to reuse message buffers helps to avoid alloc/free cycles in the
+user application. 
+When no buffer is available to supply, the receive function will allocate one.
+
+&space
+The &ital(old_msg) parameter allows the user to pass a previously generated RMr 
+message back to RMr for reuse. 
+Optionally, the user application may pass a nil pointer if no reusable message
+is available. 
+When a timeout occurs, and old_msg was not nil, the state will be returned 
+by returning a pointer to the old message with the state set. 
+
+&space
+It is possible to use the &ital(rmr_rcv_msg()) function instead of this function. 
+Doing so might be advantagous if the user programme does not always start the
+multi-threaded mode and the use of &ital(rmr_rcv_msg()) would make the flow of
+the code more simple. 
+The advantags of using this function are the ability to set a timeout without
+using epoll, and a small performance gain (if multi-threaded mode is enabled, and the
+&ital(rmr_rcv_msg()) function is used, it simply invokes this function without
+a timeout value, thus there is the small cost of a second call that results).
+Similarly, the &ital(rmr_torcv_msg()) call can be used when in multi-threaded
+mode with the same "pass through" overhead to using this function directly.
+
+&space
+NOTE: Currently the multi-threaded functions are supported only when the NNG 
+transport mechanism is being used. It will not be possible to link a programme
+using the nanomsg version of the library when references to this function are
+present.
+
+&h2(RETURN VALUE)
+When a message is received before the timeout period expires, a pointer to the 
+RMr message buffer which describes the message is returned.  
+This will, with a high probability, be a different message buffer than &ital(old_msg;)
+the user application should not continue to use &ital(old_msg) after it is passed
+to this function. 
+
+&space
+In the event of a timeout the return value will be the old msg with the state set, 
+or a nil pointer if no old message was provided.
+
+&h2(ERRORS)
+The &ital(state) field in the message buffer will be set to one of the following
+values: 
+&space
+
+&beg_dlist(.75i : ^&bold_font )
+&di(RMR_OK) The message was received without error.
+
+&half_space
+&di(RMR_ERR_BADARG) A parameter passed to the function was not valid (e.g. a nil pointer).
+
+indicate either &cw(RMR_OK) or
+&cw(RMR_ERR_EMPTY) if an empty message was received. 
+
+&half_space
+&di(RMR_ERR_EMPTY) The message received had no associated data. The length of the 
+       message will be 0.
+
+&half_space
+&di(RMR_ERR_NOTSUPP) The multi-threaded option was not enabled when RMr was 
+initialised.  See the man page for &ital(rmr_init() ) for details.
+
+&half_space
+&di(RMR_ERR_RCVFAILED) A hard error occurred preventing the receive from completing.
+&end_dlist
+
+
+When a nil pointer is returned, or any other state value was set in the message
+buffer,  &cw(errno) will be set to one of the following:
+&space
+
+&beg_dlist(.75i : ^&bold_font )
+&di(INVAL) Parameter(s) passed to the function were not valid.
+
+&half_space
+&di(EBADF) The underlying message transport is unable to process the request.
+
+&half_space
+&di(ENOTSUP) The underlying message transport is unable to process the request.
+
+&half_space
+&di(EFSM) The underlying message transport is unable to process the request.
+
+&half_space
+&di(EAGAIN) The underlying message transport is unable to process the request.
+
+&half_space
+&di(EINTR) The underlying message transport is unable to process the request.
+
+&half_space
+&di(ETIMEDOUT) The underlying message transport is unable to process the request.
+
+&half_space
+&di(ETERM) The underlying message transport is unable to process the request.
+&end_dlist
+
+&h2(EXAMPLE)
+&space
+&ex_start
+    rmr_mbuf_t*  mbuf = NULL;   // received msg
+
+    msg = rmr_mt_recv( mr, mbuf, 100 );     // wait up to 100ms
+    if( msg != NULL ) {
+        switch( msg->state ) {
+            case RMR_OK:
+                printf( "got a good message\n" );
+                break;
+
+            case RMR_ERR_EMPTY:
+                printf( "received timed out\n" );
+                break;
+
+            default:
+                printf( "receive error: %d\n", mbuf->state );
+                break;
+        }
+    } else {
+        printf( "receive timeout (nil)\n" );
+    }
+&ex_end
+
+&h2(SEE ALSO )
+.ju off
+rmr_alloc_msg(3),
+rmr_call(3),
+rmr_free_msg(3),
+rmr_get_rcvfd(3),
+rmr_init(3),
+rmr_mk_ring(3),
+rmr_mt_call(3),
+rmr_payload_size(3),
+rmr_send_msg(3),
+rmr_torcv_msg(3),
+rmr_rcv_specific(3),
+rmr_rts_msg(3),
+rmr_ready(3),
+rmr_ring_free(3),
+rmr_torcv_msg(3)
+.ju on
+
+
+.qu
+
index 58809fd..b432f9d 100644 (file)
@@ -119,7 +119,8 @@ rmr_torcv_msg(3),
 rmr_rcv_specific(3),
 rmr_rts_msg(3),
 rmr_ready(3),
-rmr_ring_free(3)
+rmr_ring_free(3),
+rmr_torcv_msg(3)
 .ju on
 
 
index 92e2602..403a770 100644 (file)
@@ -33,6 +33,8 @@
 #define RIC_UNDEFINED                          -1
 
 // ---------------------------------------------------------
+#define RIC_SCTP_CONNECTION_FAILURE 1080
+
 #define RIC_SUB_REQ                                    12010   
 #define RIC_SUB_RESP                           12011   
 #define RIC_SUB_FAILURE                                12012   
@@ -57,6 +59,7 @@
 #define RIC_X2_RESET                           10070   
 #define        RIC_X2_RESET_RESP                       10071
 
+
 #define RIC_ENDC_X2_SETUP_REQ          10360   
 #define RIC_ENDC_X2_SETUP_RESP         10361   
 #define RIC_ENDC_X2_SETUP_FAILURE      10362   
index e6c1660..bed5f2a 100644 (file)
@@ -43,7 +43,9 @@ extern "C" {
 
                                                                        // various flags for function calls
 #define RMRFL_NONE                     0x00    // no flags
-#define RMRFL_AUTO_ALLOC       0x01    // send auto allocates a zerocopy buffer
+#define RMRFL_NOTHREAD         0x01    // do not start an additional route collector thread
+#define RMRFL_MTCALL           0x02    // set up multi-threaded call support (rmr_init)
+#define RMRFL_AUTO_ALLOC       0x03    // send auto allocates a zerocopy buffer
 
 #define RMR_DEF_SIZE           0               // pass as size to have msg allocation use the default msg size
 
@@ -66,6 +68,7 @@ extern "C" {
 #define RMR_ERR_UNSET          13              // the message hasn't been populated with a transport buffer
 #define        RMR_ERR_TRUNC           14              // received message likely truncated
 #define RMR_ERR_INITFAILED     15              // initialisation of something (probably message) failed
+#define RMR_ERR_NOTSUPP                16              // the request is not supported, or RMr was not initialised for the request
 
 #define RMR_WH_CONNECTED(a) (a>=0)     // for now whid is integer; it could be pointer at some future date
 
@@ -117,6 +120,9 @@ extern rmr_whid_t rmr_wh_open( void* vctx, char const* target );
 extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg );
 extern void rmr_wh_close( void* vctx, int whid );
 
+// ----- mt call support --------------------------------------------------------------------------------
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait );
+extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait );
 
 // ----- msg buffer operations (no context needed) ------------------------------------------------------
 extern int rmr_bytes2meid( rmr_mbuf_t* mbuf, unsigned char const* src, int len );
index 418ebbc..0d4458a 100644 (file)
@@ -60,6 +60,8 @@ typedef struct uta_ctx  uta_ctx_t;
                                                                        // internal flags, must be > than UFLAG_MASK
 //#define IFL_....
 
+#define CFL_MTC_ENABLED        0x01            // multi-threaded call is enabled
+
                                                                        // msg buffer flags
 #define MFL_ZEROCOPY   0x01            // the message is an allocated zero copy message and can be sent.
 #define MFL_NOALLOC            0x02            // send should NOT allocate a new buffer before returning
@@ -68,6 +70,7 @@ typedef struct uta_ctx  uta_ctx_t;
 
 #define MAX_EP_GROUP   32                      // max number of endpoints in a group
 #define MAX_RTG_MSG_SZ 2048            // max expected message size from route generator
+#define MAX_CALL_ID            255                     // largest call ID that is supported
 
 //#define DEF_RTG_MSGID        ""                              // default to pick up all messages from rtg
 #define DEF_RTG_PORT   "tcp:4561"              // default port that we accept rtg connections on
@@ -83,6 +86,7 @@ typedef struct uta_ctx  uta_ctx_t;
 #define RMR_D1_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len2))
 #define RMR_D2_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len3))
 
+// CAUTION:  if using an offset with a header pointer, the pointer MUST be cast to void* before adding the offset!
 #define TRACE_OFFSET(h)                ((ntohl(((uta_mhdr_t *)h)->len0)))
 #define DATA1_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
 #define DATA2_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
@@ -98,12 +102,17 @@ typedef struct uta_ctx  uta_ctx_t;
 #define SET_HDR_D1_LEN(h,l)    (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
 #define SET_HDR_D2_LEN(h,l)    (((uta_mhdr_t *)h)->len3=htonl((int32_t)l))
 
+                                                       // index of things in the d1 data space
+#define D1_CALLID_IDX  0       // the call-id to match on return
+
+#define        NO_CALL_ID              0       // no call id associated with the message (normal queue)
 
 #define V1_PAYLOAD_OFFSET(h)   (sizeof(uta_v1mhdr_t))
 
                                                                                // v2 header flags
 #define HFL_HAS_TRACE  0x01                    // Trace data is populated
 #define HFL_SUBID              0x02                    // subscription ID is populated
+#define HFL_CALL_MSG   0x04                    // msg sent via blocking call
 
 /*
        Message header; interpreted by the other side, but never seen by
@@ -212,6 +221,18 @@ typedef struct ring {
 } ring_t;
 
 
+// --------- multi-threaded call things -----------------------------------------
+/*
+       A chute provides a return path for a received message that a thread has blocked
+       on.  The receive thread will set the mbuf pointer and tickler the barrier to
+       signal to the call thread that data is ready.
+*/
+typedef struct chute {
+       rmr_mbuf_t*     mbuf;                                           // pointer to message buffer received
+       sem_t   barrier;                                                // semaphore that the thread is waiting on
+       unsigned char   expect[RMR_MAX_XID];    // the expected transaction ID
+} chute_t;
+
 
 // -------------- common static prototypes --------------------------------------
 
index e801ac9..65979de 100644 (file)
@@ -34,6 +34,7 @@
 #include <string.h>
 #include <unistd.h>
 #include <stdio.h>
+#include <semaphore.h>
 
 #include "rmr.h"                               // things the users see
 #include "rmr_agnostic.h"              // agnostic things (must be included before private)
@@ -246,6 +247,11 @@ extern int rmr_set_trace( rmr_mbuf_t* msg, unsigned const char* data, int size )
        }
 
        hdr = (uta_mhdr_t *) msg->header;
+       if( !hdr ) {
+               errno = EINVAL;
+               return 0;
+       }
+
        len = RMR_TR_LEN( hdr );
 
        if( len != size ) {                                                     // different sized trace data, must realloc the buffer
diff --git a/src/rmr/common/src/mt_call_static.c b/src/rmr/common/src/mt_call_static.c
new file mode 100644 (file)
index 0000000..ee3cda2
--- /dev/null
@@ -0,0 +1,80 @@
+// :vi sw=4 ts=4 noet:
+/*
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       mt_call_static.c
+       Abstract:       Static multi-threaded call support.
+
+       Author:         E. Scott Daniels
+       Date:           17  May 2019
+*/
+
+#ifndef mtcall_static_c
+#define mtcall_static_c
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <errno.h>
+#include <string.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <time.h>
+
+#include <semaphore.h>
+
+
+/*
+       Initialises the chutes that are then hung off of the context.
+
+       Returns > 0 on success; 0 on failure with errno set.
+*/
+static int init_mtcall( uta_ctx_t*     ctx ) {
+       int                     rc = 1;                 // return code 1== good.
+       int                     i;
+       chute_t*        chutes;
+
+       if( ctx == NULL ) {
+               errno = EINVAL;
+               return 0;
+       }
+
+       chutes = ctx->chutes = (chute_t *) malloc( sizeof( chute_t ) * (MAX_CALL_ID+1) );
+       if( chutes == NULL ) {
+               return 0;
+       }
+
+       for( i = 0; i < MAX_CALL_ID; i++ ) {                            // initialise all of the semaphores
+               chutes[i].mbuf = NULL;
+               if( sem_init( &chutes[i].barrier, 0, 0 ) < 0 ) {
+                       fprintf( stderr, "[ERR] rmr: unable to initialise mt call chute [%d]: %s\n", i, strerror( errno ) );
+                       rc = -1;
+               }
+       }       
+
+
+       return rc;
+}
+
+#endif
index 64cf47f..7add33c 100644 (file)
@@ -413,6 +413,10 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                        case 'u':                                                                                               // update current table, not a total replacement
                                tokens[1] = clip( tokens[1] );
                                if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
+                                       if( ctx->new_rtable == NULL ) {                                 // update table not in progress
+                                               break;
+                                       }
+
                                        if( ntoks >2 ) {
                                                if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
                                                        fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
@@ -809,9 +813,10 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
                        return NULL;
                }
 
-               ep->open = 0;                                   // not connected
+               ep->open = 0;                                                           // not connected
                ep->addr = uta_h2ip( ep_name );
                ep->name = strdup( ep_name );
+               pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
 
                rmr_sym_put( rt->hash, ep_name, 1, ep );
        }
index 7d78382..8e60d63 100644 (file)
@@ -42,6 +42,7 @@ struct endpoint {
        char*   addr;                   // address used for connection
        int             nn_sock;                // the nano-msg socket to write to for this entry
        int             open;                   // true if we've established the connection
+       pthread_mutex_t gate;           // must be able to serialise some transport level functions on the ep
 };
 
 /*
index b57df3a..93e4f7a 100644 (file)
@@ -44,6 +44,7 @@
 #include <stdint.h>
 #include <time.h>
 #include <arpa/inet.h>
+#include <semaphore.h>
 
 #include <nanomsg/nn.h>
 #include <nanomsg/tcp.h>
index 6bca91f..2349ce6 100644 (file)
@@ -43,6 +43,7 @@ struct endpoint {
        nng_socket      nn_sock;        // the nano-msg socket to write to for this entry
        nng_dialer      dialer;         // the connection specific information (retry timout etc)
        int             open;                   // set to true if we've connected as socket cannot be checked directly)
+       pthread_mutex_t gate;   // we must serialise when we open/link to the endpoint
 };
 
 /*
@@ -71,7 +72,7 @@ struct uta_ctx {
        int nrtele;                                     // number of elements in the routing table
        int send_retries;                       // number of retries send_msg() should attempt if eagain/timeout indicated by nng
        int     trace_data_len;                 // number of bytes to allocate in header for trace data
-       int d1_len;                                     // extra header data 1 length   (future)
+       int d1_len;                                     // extra header data 1 length
        int d2_len;                                     // extra header data 2 length   (future)
        nng_socket      nn_sock;                // our general listen socket
        route_table_t* rtable;          // the active route table
@@ -79,6 +80,7 @@ struct uta_ctx {
        route_table_t* new_rtable;      // route table under construction
        if_addrs_t*     ip_list;                // list manager of the IP addresses that are on our known interfaces
        void*   mring;                          // ring where msgs are queued while waiting for a call response msg
+       chute_t*        chutes;
 
        char*   rtg_addr;                       // addr/port of the route table generation publisher
        int             rtg_port;                       // the port that the rtg listens on
@@ -87,6 +89,7 @@ struct uta_ctx {
        epoll_stuff_t*  eps;            // epoll information needed for the rcv with timeout call
 
        pthread_t       rtc_th;                 // thread info for the rtc listener
+       pthread_t       mtc_th;                 // thread info for the multi-thread call receive process
 };
 
 
@@ -101,7 +104,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags );
 static void free_ctx( uta_ctx_t* ctx );
 
 // --- rt table things ---------------------------
-static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer );
+static int uta_link2( endpoint_t* ep );
 static int rt_link2_ep( endpoint_t* ep );
 static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock );
 static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock );
diff --git a/src/rmr/nng/src/mt_call_nng_static.c b/src/rmr/nng/src/mt_call_nng_static.c
new file mode 100644 (file)
index 0000000..2ef21f6
--- /dev/null
@@ -0,0 +1,114 @@
+// : vi ts=4 sw=4 noet :
+/*
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       mt_call_nng_static.c
+       Abstract:       Static funcitons related to the multi-threaded call feature
+                               which are NNG specific.
+
+       Author:         E. Scott Daniels
+       Date:           20 May 2019
+*/
+
+#ifndef _mtcall_nng_static_c
+#define _mtcall_nng_static_c
+#include <semaphore.h>
+
+static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
+       chute_t*        chute;
+       int                     state;
+
+       if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
+               rmr_free_msg( mbuf );                                                           // drop if ring is full
+       }
+
+       chute = &ctx->chutes[0];
+       chute->mbuf = mbuf;
+       state = sem_post( &chute->barrier );                                                            // tickle the ring monitor
+}
+
+/*
+       This is expected to execute in a separate thread. It is responsible for
+       _all_ receives and queues them on the appropriate ring, or chute.
+
+       The "state" of the message is checked which determines where the message
+       is delivered.
+
+               Flags indicate that the message is a call generated message, then
+               the message is queued on the normal receive ring.
+
+               Chute ID is == 0, then the message is queued on the normal receive ring.
+
+               The transaction ID in the message matches the expected ID in the chute,
+               then the message is given to the chute and the chute's semaphore is tickled.
+
+               If none are true, the message is dropped.
+*/
+static void* mt_receive( void* vctx ) {
+       uta_ctx_t*              ctx;
+       uta_mhdr_t*             hdr;            // header of the message received
+       rmr_mbuf_t*             mbuf;           // msg received
+       unsigned char*  d1;                     // pointer at d1 data ([0] is the call_id)
+       chute_t*                chute;
+       unsigned int    call_id;        // the id assigned to the call generated message
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               if( DEBUG ) fprintf( stderr, "rmr mt_receive: bad parms, thread not started\n" );
+               return NULL;
+       }
+
+       fprintf( stderr, "[INFO] rmr mt_receiver is spinning\n" );
+
+       while( ! ctx->shutdown ) {
+               mbuf = rcv_msg( ctx, NULL );
+
+               if( mbuf != NULL && (hdr = (uta_mhdr_t *) mbuf->header) != NULL ) {
+                       if( hdr->flags & HFL_CALL_MSG ) {                                       // call generated message; ignore call-id etc and queue
+                               queue_normal( ctx, mbuf );
+                       } else {
+                               if( RMR_D1_LEN( hdr ) <= 0 ) {                                  // no call-id data; just queue
+                                       queue_normal( ctx, mbuf );
+                               } else {
+                                       d1 = DATA1_ADDR( hdr );
+                                       if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) {                                       // call_id not set, just queue
+                                               queue_normal( ctx, mbuf );
+                                       } else {
+                                               chute = &ctx->chutes[call_id];
+                                               if( memcmp( mbuf->xaction, chute->expect, RMR_MAX_XID ) == 0 ) {                // match
+                                                       chute->mbuf = mbuf;
+                                                       sem_post( &chute->barrier );
+                                               } else {
+                                                       rmr_free_msg( mbuf );
+                                                       mbuf = NULL;
+                                               }
+                                       }
+                               }
+                       }
+               } else {
+                       if( ! mbuf ) {                          // very very unlikely, but prevent leaks
+                               rmr_free_msg( mbuf );
+                       }
+               }
+       }
+
+       return NULL;
+}
+
+#endif
index e77c9f3..582cbb0 100644 (file)
@@ -50,6 +50,8 @@
 #include <unistd.h>
 #include <time.h>
 #include <arpa/inet.h>
+#include <semaphore.h>
+#include <pthread.h>
 
 #include <nng/nng.h>
 #include <nng/protocol/pubsub0/pub.h>
@@ -70,6 +72,8 @@
 #include "tools_static.c"
 #include "sr_nng_static.c"                     // send/receive static functions
 #include "wormholes.c"                         // wormhole api externals and related static functions (must be LAST!)
+#include "mt_call_static.c"
+#include "mt_call_nng_static.c"
 
 
 //------------------------------------------------------------------------------
@@ -181,103 +185,20 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
 }
 
 /*
-       send message with maximum timeout.
-       Accept a message and send it to an endpoint based on message type.
-       If NNG reports that the send attempt timed out, or should be retried,
-       RMr will retry for approximately max_to microseconds; rounded to the next
-       higher value of 10.
-
-       Allocates a new message buffer for the next send. If a message type has
-       more than one group of endpoints defined, then the message will be sent
-       in round robin fashion to one endpoint in each group.
-
-       An endpoint will be looked up in the route table using the message type and
-       the subscription id. If the subscription id is "UNSET_SUBID", then only the
-       message type is used.  If the initial lookup, with a subid, fails, then a
-       second lookup using just the mtype is tried.
-
-       CAUTION: this is a non-blocking send.  If the message cannot be sent, then
-               it will return with an error and errno set to eagain. If the send is
-               a limited fanout, then the returned status is the status of the last
-               send attempt.
-
+       This is a wrapper to the real timeout send. We must wrap it now to ensure that
+       the call flag and call-id are reset
 */
 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
-       nng_socket nn_sock;                     // endpoint socket for send
-       uta_ctx_t*      ctx;
-       int     group;                                  // selected group to get socket for
-       int send_again;                         // true if the message must be sent again
-       rmr_mbuf_t*     clone_m;                // cloned message for an nth send
-       int sock_ok;                            // got a valid socket from round robin select
-       uint64_t key;                           // mtype or sub-id/mtype sym table key
-       int     altk_ok = 0;                    // set true if we can lookup on alternate key if mt/sid lookup fails
-
-       if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
-               errno = EINVAL;                                                                                         // if msg is null, this is their clue
-               if( msg != NULL ) {
-                       msg->state = RMR_ERR_BADARG;
-                       errno = EINVAL;                                                                                 // must ensure it's not eagain
-               }
-               return msg;
-       }
+       char* d1;                                                                                                                       // point at the call-id in the header
 
-       errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
-       if( msg->header == NULL ) {
-               fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
-               msg->state = RMR_ERR_NOHDR;
-               errno = EBADMSG;                                                                                        // must ensure it's not eagain
-               return msg;
-       }
+       if( msg != NULL ) {
+               ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
 
-       if( max_to < 0 ) {
-               max_to = ctx->send_retries;             // convert to retries
-       }
+               d1 = DATA1_ADDR( msg->header );
+               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
+       }       
 
-       send_again = 1;                                                                                 // force loop entry
-       group = 0;                                                                                              // always start with group 0
-
-       key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
-       if( msg->sub_id != UNSET_SUBID ) {
-               altk_ok = 1;                                                                            // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
-       }
-       while( send_again ) {
-               sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
-               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
-                               msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
-
-               if( ! sock_ok ) {
-                       if( altk_ok ) {                                                                                 // we can try with the alternate (no sub-id) key
-                               altk_ok = 0;
-                               key = build_rt_key( UNSET_SUBID, msg->mtype );          // build with just the mtype and try again
-                               send_again = 1;                                                                         // ensure we don't exit the while
-                               continue;
-                       }
-
-                       msg->state = RMR_ERR_NOENDPT;
-                       errno = ENXIO;                                                                                  // must ensure it's not eagain
-                       return msg;                                                                                             // caller can resend (maybe) or free
-               }
-
-               group++;
-
-               if( send_again ) {
-                       clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
-                       if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
-                       msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
-                       msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
-                       /*
-                       if( msg ) {
-                               // error do we need to count successes/errors, how to report some success, esp if last fails?
-                       }
-                       */
-
-                       msg = clone_m;                                                                                  // clone will be the next to send
-               } else {
-                       msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
-               }
-       }
-
-       return msg;                                                                     // last message caries the status of last/only send attempt
+       return mtosend_msg( vctx, msg, max_to );
 }
 
 /*
@@ -286,7 +207,16 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        See rmr_stimeout() for info on setting the default timeout.
 */
 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
-       return rmr_mtosend_msg( vctx, msg,  -1 );                       // retries <  uses default from ctx
+       char* d1;                                                                                                               // point at the call-id in the header
+
+       if( msg != NULL ) {
+               ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
+
+               d1 = DATA1_ADDR( msg->header );
+               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
+       }       
+
+       return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
 }
 
 /*
@@ -314,12 +244,11 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
                The caller must check for this and handle.
 */
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
-       nng_socket nn_sock;                     // endpoint socket for send
+       nng_socket      nn_sock;                        // endpoint socket for send
        uta_ctx_t*      ctx;
-       int state;
-       uta_mhdr_t*     hdr;
-       char*   hold_src;                       // we need the original source if send fails
-       int             sock_ok;                        // true if we found a valid endpoint socket
+       int                     state;
+       char*           hold_src;                       // we need the original source if send fails
+       int                     sock_ok;                        // true if we found a valid endpoint socket
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -336,19 +265,20 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                return msg;
        }
 
+       ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
        sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                        // socket of specific endpoint
        if( ! sock_ok ) {
                msg->state = RMR_ERR_NOENDPT;
                return msg;                                                     // preallocated msg can be reused since not given back to nn
        }
 
-       msg->state = RMR_OK;                                    // ensure it is clear before send
-       hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                                 // the dest where we're returning the message to
-       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                        // must overlay the source to be ours
+       msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
+       hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
+       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );        // must overlay the source to be ours
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );                    // always return original source so rts can be called again
-               msg->flags |= MFL_ADDSRC;                                                                                                       // if msg given to send() it must add source
+               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );    // always return original source so rts can be called again
+               msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
        }
 
        free( hold_src );
@@ -356,6 +286,16 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 }
 
 /*
+       If multi-threading call is turned on, this invokes that mechanism with the special call
+       id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
+       behavour (described below) is carried out.  This is safe to use when mt is enabled, but
+       the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
+       a flexible timeout.
+
+       On timeout this function will return a nil pointer. If the original message could not
+       be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
+
+       Original behavour:
        Call sends the message based on message routing using the message type, and waits for a
        response message to arrive with the same transaction id that was in the outgoing message.
        If, while wiating for the expected response,  messages are received which do not have the
@@ -373,8 +313,6 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
                                        user should call this function with the message again.
 
-
-       QUESTION:  should user specify the number of messages to allow to queue?
 */
 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
        uta_ctx_t*              ctx;
@@ -387,6 +325,10 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
                return msg;
        }
 
+       if( ctx->flags & CFL_MTC_ENABLED ) {                            // if multi threaded call is on, use that
+               return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
+       }
+
        memcpy( expected_id, msg->xaction, RMR_MAX_XID );
        expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
        if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
@@ -426,6 +368,10 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
        }
        errno = 0;
 
+       if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
+               return rmr_mt_rcv( ctx, old_msg, -1 );
+       }
+
        qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
        if( qm != NULL ) {
                if( old_msg ) {
@@ -458,6 +404,10 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
                return old_msg;
        }
 
+       if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
+               return rmr_mt_rcv( ctx, old_msg, ms_to );
+       }
+
        qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
        if( qm != NULL ) {
                if( old_msg ) {
@@ -650,7 +600,14 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
-       ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
+       ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
+
+       if( flags & RMRFL_MTCALL ) {                                    // mt call support is on, need bigger ring
+               ctx->mring = uta_mk_ring( 2048 );                       // message ring filled by rcv thread
+               init_mtcall( ctx );                                                     // set up call chutes
+       } else {
+               ctx->mring = uta_mk_ring( 128 );                        // ring filled only on blocking call
+       }
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
        if( max_msg_size > 0 ) {
@@ -713,6 +670,14 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                }
        }
 
+       if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) {      // mt call support is on, must start the listener thread if not running
+               ctx->flags |= CFL_MTC_ENABLED;
+               if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // kick the receiver
+                       fprintf( stderr, "[WARN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
+               }
+               
+       }
+
        free( proto_port );
        return (void *) ctx;
 }
@@ -818,4 +783,210 @@ extern void rmr_close( void* vctx ) {
 }
 
 
+// ----- multi-threaded call/receive support -------------------------------------------------
 
+/*
+       Blocks on the receive ring chute semaphore and then reads from the ring
+       when it is tickled.  If max_wait is -1 then the function blocks until
+       a message is ready on the ring. Else max_wait is assumed to be the number
+       of millaseconds to wait before returning a timeout message.
+*/
+extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
+       uta_ctx_t*      ctx;
+       uta_mhdr_t*     hdr;                    // header in the transport buffer
+       chute_t*        chute;
+       struct timespec ts;                     // time info if we have a timeout
+       long    new_ms;                         // adjusted mu-sec
+       long    seconds = 0;            // max wait seconds
+       long    nano_sec;                       // max wait xlated to nano seconds
+       int             state;
+       rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
+       
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               if( mbuf ) {
+                       mbuf->state = RMR_ERR_BADARG;
+               }
+               return mbuf;
+       }
+
+       if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
+               errno = EINVAL;
+               if( mbuf != NULL ) {
+                       mbuf->state = RMR_ERR_NOTSUPP;
+               }
+               return mbuf;
+       }
+
+       ombuf = mbuf;
+       if( ombuf ) {
+               ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
+               ombuf->len = 0;
+       }
+
+       chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
+       
+       if( max_wait > 0 ) {
+               clock_gettime( CLOCK_REALTIME, &ts );   
+
+               if( max_wait > 999 ) {
+                       seconds = (max_wait - 999)/1000;
+                       max_wait -= seconds * 1000;
+                       ts.tv_sec += seconds;
+               }
+               if( max_wait > 0 ) {
+                       nano_sec = max_wait * 1000000;
+                       ts.tv_nsec += nano_sec;
+                       if( ts.tv_nsec > 999999999 ) {
+                               ts.tv_nsec -= 999999999;
+                               ts.tv_sec++;
+                       }
+               }
+
+               seconds = 1;                                                                                                    // use as flag later to invoked timed wait
+       }
+
+       errno = 0;
+       while( chute->mbuf == NULL && ! errno ) {
+               if( seconds ) {
+                       state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
+               } else {
+                       state = sem_wait( &chute->barrier );
+               }
+
+               if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
+                       errno = 0;
+               }
+       }
+
+       if( state < 0 ) {
+               mbuf = ombuf;                           // return caller's buffer if they passed one in
+       } else {
+               if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
+               if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
+                       if( mbuf ) {
+                               mbuf->state = RMR_OK;
+
+                               if( ombuf ) {
+                                       rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
+                               }
+                       } else {
+                               mbuf = ombuf;                           // no buffer, return user's if there
+                       }
+               }
+       }
+
+       return mbuf;
+}
+
+/*
+       Accept a message buffer and caller ID, send the message and then wait
+       for the receiver to tickle the semaphore letting us know that a message
+       has been received. The call_id is a value between 2 and 255, inclusive; if
+       it's not in this range an error will be returned. Max wait is the amount
+       of time in millaseconds that the call should block for. If 0 is given
+       then no timeout is set.
+
+       If the mt_call feature has not been initialised, then the attempt to use this
+       funciton will fail with RMR_ERR_NOTSUPP
+
+       If no matching message is received before the max_wait period expires, a
+       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
+       occurs after the message has been sent, then a nil pointer is returned
+       with errno set to some other value.
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+       rmr_mbuf_t* ombuf;                      // original mbuf passed in
+       uta_ctx_t*      ctx;
+       uta_mhdr_t*     hdr;                    // header in the transport buffer
+       chute_t*        chute;
+       unsigned char*  d1;                     // d1 data in header
+       struct timespec ts;                     // time info if we have a timeout
+       long    new_ms;                         // adjusted mu-sec
+       long    seconds = 0;            // max wait seconds
+       long    nano_sec;                       // max wait xlated to nano seconds
+       int             state;
+       
+       if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
+               errno = EINVAL;
+               if( mbuf ) {
+                       mbuf->state = RMR_ERR_BADARG;
+               }
+               return mbuf;
+       }
+
+       if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
+               mbuf->state = RMR_ERR_NOTSUPP;
+               return mbuf;
+       }
+
+       if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
+               mbuf->state = RMR_ERR_BADARG;
+               return mbuf;
+       }
+
+       ombuf = mbuf;                                                                                                   // save to return timeout status with
+
+       chute = &ctx->chutes[call_id];
+       if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
+               rmr_free_msg( chute->mbuf );
+               chute->mbuf = NULL;
+       }
+       
+       hdr = (uta_mhdr_t *) mbuf->header;
+       hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
+       memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
+       d1 = DATA1_ADDR( hdr );
+       d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
+       mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
+
+       if( max_wait > 0 ) {
+               clock_gettime( CLOCK_REALTIME, &ts );   
+
+               if( max_wait > 999 ) {
+                       seconds = (max_wait - 999)/1000;
+                       max_wait -= seconds * 1000;
+                       ts.tv_sec += seconds;
+               }
+               if( max_wait > 0 ) {
+                       nano_sec = max_wait * 1000000;
+                       ts.tv_nsec += nano_sec;
+                       if( ts.tv_nsec > 999999999 ) {
+                               ts.tv_nsec -= 999999999;
+                               ts.tv_sec++;
+                       }
+               }
+
+               seconds = 1;                                                                            // use as flag later to invoked timed wait
+       }
+
+       mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
+       if( mbuf ) {
+               if( mbuf->state != RMR_OK ) {
+                       return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
+               }
+       }
+
+       errno = 0;
+       while( chute->mbuf == NULL && ! errno ) {
+               if( seconds ) {
+                       state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
+               } else {
+                       state = sem_wait( &chute->barrier );
+               }
+
+               if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
+                       errno = 0;
+               }
+       }
+
+       if( state < 0 ) {
+               return NULL;                                    // leave errno as set by sem wait call
+       }
+
+       mbuf = chute->mbuf;
+       mbuf->state = RMR_OK;
+       chute->mbuf = NULL;
+
+       return mbuf;
+}
index 6122f69..d903de3 100644 (file)
        Target assumed to be address:port.  The new socket is returned via the
        user supplied pointer so that a success/fail code is returned directly.
        Return value is 0 (false) on failure, 1 (true)  on success.
+
+       In order to support multi-threaded user applications we must hold a lock before 
+       we attempt to create a dialer and connect. NNG is thread safe, but we can 
+       get things into a bad state if we allow a collision here.  The lock grab
+       only happens on the intial session setup.
 */
-static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
+//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
+static int uta_link2( endpoint_t* ep ) {
+       char*           target; 
+       nng_socket*     nn_sock; 
+       nng_dialer*     dialer;
        char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
        char*           addr;
        int                     state = FALSE;
 
+       if( ep == NULL ) {
+               return FALSE;
+       }
+
+       target = ep->addr;
+       nn_sock = &ep->nn_sock;
+       dialer = &ep->dialer;
+
        if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
                fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
                return FALSE;
@@ -69,13 +86,22 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
                return FALSE;
        }
 
+       pthread_mutex_lock( &ep->gate );                        // grab the lock
+       if( ep->open ) {
+               pthread_mutex_unlock( &ep->gate );
+               return TRUE;
+       }
+       
+
        if( nng_push0_open( nn_sock ) != 0 ) {                  // and assign the mode
+               pthread_mutex_unlock( &ep->gate );
                fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
                return FALSE;
        }
 
        snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
        if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
+               pthread_mutex_unlock( &ep->gate );
                fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
                nng_close( *nn_sock );
                return FALSE;
@@ -85,6 +111,7 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
        nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMINT, 100 );              // start retry 100m after last failure with 2s cap
 
        if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) {                                            // can fail immediatly (unlike nanomsg)
+               pthread_mutex_unlock( &ep->gate );
                fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
                nng_close( *nn_sock );
                return FALSE;
@@ -92,6 +119,8 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
 
        if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
 
+       ep->open = TRUE;                                                // must set before release
+       pthread_mutex_unlock( &ep->gate );
        return TRUE;
 }
 
@@ -108,7 +137,7 @@ static int rt_link2_ep( endpoint_t* ep ) {
                return TRUE;
        }
 
-       ep->open =  uta_link2( ep->addr, &ep->nn_sock, &ep->dialer );
+       uta_link2( ep );
        return ep->open;
 }
 
@@ -198,7 +227,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
                if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
                        ep->addr = uta_h2ip( ep->name );
                }
-               if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) {                // find entry in table and create link
+               if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
                        state = TRUE;
                        ep->open = TRUE;
                        *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
@@ -302,7 +331,7 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more,
                                ep->addr = uta_h2ip( ep->name );
                        }
 
-                       if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) {                // find entry in table and create link
+                       if( uta_link2( ep ) ) {                                                                                 // find entry in table and create link
                                ep->open = TRUE;
                                *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
                        } else {
index 7c17589..da4dfbd 100644 (file)
@@ -142,8 +142,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                hdr->sub_id = htonl( UNSET_SUBID );
                SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
                SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
-               //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // no need until we start using them
-               //SET_HDR_D2_LEN( hdr, ctx->d2_len );
+               SET_HDR_D1_LEN( hdr, ctx->d1_len );
+               //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
        }
        msg->len = 0;                                                                                   // length of data in the payload
        msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
@@ -322,7 +322,6 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
        uta_mhdr_t* hdr;
        uta_v1mhdr_t* v1hdr;
        int     tr_old_len;                     // tr size in new buffer
-       int     coffset;                        // an offset to something in the header for copy
 
        nm = (rmr_mbuf_t *) malloc( sizeof *nm );
        if( nm == NULL ) {
@@ -349,22 +348,19 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
                        nm->payload = (void *) v1hdr + sizeof( *v1hdr );
                        break;
 
-               default:                                                                                        // current message always caught  here
+               default:                                                                                        // current message version always caught  here
                        hdr = nm->header;
-                       memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data might have changed
-                       if( RMR_D1_LEN( hdr )  ) {
-                               coffset = DATA1_OFFSET( hdr );                                                                                          // offset to d1
-                               memcpy( hdr + coffset, old_msg->header + coffset, RMR_D1_LEN( hdr ) );          // copy data1 and data2 if necessary
+                       memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
+                       SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
 
+                       if( RMR_D1_LEN( hdr )  ) {
+                               memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
                        }
                        if( RMR_D2_LEN( hdr )  ) {
-                               coffset = DATA2_OFFSET( hdr );                                                                                          // offset to d2
-                               memcpy( hdr + coffset, old_msg->header + coffset, RMR_D2_LEN( hdr ) );          // copy data2 and data2 if necessary
+                               memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
                        }
 
-                       SET_HDR_TR_LEN( hdr, tr_len );                                                                          // MUST set before pointing payload
                        nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
-                       SET_HDR_TR_LEN( hdr, tr_len );                                                                          // do NOT copy old trace data, just set the new header
                        break;
        }
 
@@ -427,6 +423,7 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        msg->len = 0;
        msg->payload = NULL;
        msg->xaction = NULL;
+       msg->tp_buf = NULL;
 
        msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
        if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
@@ -590,6 +587,108 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
        return msg;
 }
 
+/*
+       send message with maximum timeout.
+       Accept a message and send it to an endpoint based on message type.
+       If NNG reports that the send attempt timed out, or should be retried,
+       RMr will retry for approximately max_to microseconds; rounded to the next
+       higher value of 10.
+
+       Allocates a new message buffer for the next send. If a message type has
+       more than one group of endpoints defined, then the message will be sent
+       in round robin fashion to one endpoint in each group.
+
+       An endpoint will be looked up in the route table using the message type and
+       the subscription id. If the subscription id is "UNSET_SUBID", then only the
+       message type is used.  If the initial lookup, with a subid, fails, then a
+       second lookup using just the mtype is tried.
+
+       CAUTION: this is a non-blocking send.  If the message cannot be sent, then
+               it will return with an error and errno set to eagain. If the send is
+               a limited fanout, then the returned status is the status of the last
+               send attempt.
+
+*/
+static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
+       nng_socket      nn_sock;                        // endpoint socket for send
+       uta_ctx_t*      ctx;
+       int                     group;                          // selected group to get socket for
+       int                     send_again;                     // true if the message must be sent again
+       rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
+       int                     sock_ok;                        // got a valid socket from round robin select
+       uint64_t         key;                           // mtype or sub-id/mtype sym table key
+       int                     altk_ok = 0;            // set true if we can lookup on alternate key if mt/sid lookup fails
+       char*           d1;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
+               errno = EINVAL;                                                                                         // if msg is null, this is their clue
+               if( msg != NULL ) {
+                       msg->state = RMR_ERR_BADARG;
+                       errno = EINVAL;                                                                                 // must ensure it's not eagain
+               }
+               return msg;
+       }
+
+       errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
+       if( msg->header == NULL ) {
+               fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
+               msg->state = RMR_ERR_NOHDR;
+               errno = EBADMSG;                                                                                        // must ensure it's not eagain
+               return msg;
+       }
+
+       if( max_to < 0 ) {
+               max_to = ctx->send_retries;             // convert to retries
+       }
+
+       send_again = 1;                                                                                 // force loop entry
+       group = 0;                                                                                              // always start with group 0
+
+       key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
+       if( msg->sub_id != UNSET_SUBID ) {
+               altk_ok = 1;                                                                            // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
+       }
+       while( send_again ) {
+               sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
+               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
+                               msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
+
+               if( ! sock_ok ) {
+                       if( altk_ok ) {                                                                                 // we can try with the alternate (no sub-id) key
+                               altk_ok = 0;
+                               key = build_rt_key( UNSET_SUBID, msg->mtype );          // build with just the mtype and try again
+                               send_again = 1;                                                                         // ensure we don't exit the while
+                               continue;
+                       }
+
+                       msg->state = RMR_ERR_NOENDPT;
+                       errno = ENXIO;                                                                                  // must ensure it's not eagain
+                       return msg;                                                                                             // caller can resend (maybe) or free
+               }
+
+               group++;
+
+               if( send_again ) {
+                       clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
+                       if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
+                       msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
+                       msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
+                       /*
+                       if( msg ) {
+                               // error do we need to count successes/errors, how to report some success, esp if last fails?
+                       }
+                       */
+
+                       msg = clone_m;                                                                                  // clone will be the next to send
+               } else {
+                       msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
+               }
+       }
+
+       return msg;                                                                     // last message caries the status of last/only send attempt
+}
+
+
 /*
        A generic wrapper to the real send to keep wormhole stuff agnostic.
        We assume the wormhole function vetted the buffer so we don't have to.
index d6c2c64..db4655a 100644 (file)
@@ -24,6 +24,7 @@ coverage_opts = -ftest-coverage -fprofile-arcs
 
 #libs = ../build/librmr_nng.a -L ../build/lib -lnng -lpthread -lm
 libs =  -L ../build/lib -lnng -lpthread -lm
+ipaths = -I ../src/rmr/common/src/ -I ../src/rmr/common/include -I ../src/rmr/nng/include/ -I ../src/rmr/nng/src/ -I ../src/rmr/nanomsg/include/ -I ../src/rmr/nanomsg/src/ 
 
 #sa_tests = sa_tools_test.o
 
@@ -31,7 +32,7 @@ libs =  -L ../build/lib -lnng -lpthread -lm
        $(CC) -g $< -c
 
 %:: %.c
-       $(CC) -I ../src/common/src/ -I ../src/common/include -I ../src/nng/include -I ../src/nanomsg/include  $(coverage_opts) -fPIC -g $< -o $@  $(libs)
+       $(CC)  $(ipaths) $(coverage_opts) -fPIC -g $< -o $@  $(libs)
 
 # catch all
 all:
index f05f49e..07748ee 100644 (file)
 # NOTE:  this makefile assumes that RMr has been built using the directory .build
 #              at the top most repo directory (e.g. ../../.build). It can be changed 
 #              if you need to by adding "build_path=<path>" to the make command line.
+#              To use this makefile to build on a system where RMr is already installed
+#              try:    make build_path=/usr/local/lib
+#
+#              By default we prefer the Korn shell (it's just better). If you really need
+#              to build with a dfferent shell add "SHELL=path" to the command line:
+#                              make SHELL=/bin/bash
+#
 
 .EXPORT_ALL_VARIABLES:
 .ONESHELL:
 #.SHELLFLAGS = -e      # hosed on some flavours so keep it off
-SHELL = /bin/ksh
+SHELL ?= /bin/ksh
 
 build_path ?= ../../.build
-header_path := $(shell find ../../.build -name 'rmr.h' |head -1 | sed 's!/rmr/.*!!' )
+header_path := $(shell find $(build_path) -name 'rmr.h' |head -1 | sed 's!/rmr/.*!!' )
 
 C_INCLUDE_PATH := $(header_path)
 LD_LIBRARY_PATH=$(build_path):$(build_path)/lib
 LIBRARY_PATH = $(LD_LIBRARY_PATH)
 
 # These programmes are designed to test some basic application level functions
-# from the perspective of two communicating processes. 
+# from the perspective of two, or more, communicating processes. 
 
 
 .PHONY: all
-all: sender receiver sender_nano receiver_nano
+all: sender receiver sender_nano receiver_nano caller mt_receiver
 
 receiver_nano: receiver.c
        gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr -lnanomsg -lpthread -lm
@@ -45,13 +52,23 @@ receiver_nano: receiver.c
 receiver: receiver.c
        gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
 
+mt_receiver: receiver.c
+       gcc -I $${C_INCLUDE_PATH:-.} -DMTC $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+
 sender_nano: sender.c
        gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr -lnanomsg -lpthread -lm
 
 sender: sender.c
        gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
 
+caller: caller.c
+       gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+
 
-.PHONY: clean
+# clean removes intermediates; nuke removes everything that can be built
+.PHONY: clean nuke
 clean:
-       rm -f sender sender_nano receiver receiver_nano *.o
+       rm -f *.o
+
+nuke: clean
+       rm -f sender sender_nano receiver receiver_nano caller mt_receiver
diff --git a/test/app_test/caller.c b/test/app_test/caller.c
new file mode 100644 (file)
index 0000000..7b7336a
--- /dev/null
@@ -0,0 +1,326 @@
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       caller.c
+       Abstract:       This is a simple sender which will send a series of messages using
+                               rmr_call().  N threads are started each sending the desired number
+                               of messages and expecting an 'ack' for each. Each ack is examined
+                               to verify that the thread id placed into the message matches (meaning
+                               that the ack was delivered by RMr to the correct thread's chute.
+
+                               In addition, the main thread listens for messages in order to verify
+                               that a main or receiving thread can receive messages concurrently
+                               while call acks are pending and being processed.
+
+                               Message format is:
+                                       ck1 ck2|<msg-txt> @ tid<nil>
+
+                               Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
+                               Ck2 is the simple check sum of the trace data which is a nil terminated
+                               series of bytes.
+                               tid is the thread id assigned by the main thread.
+
+                               Parms:  argv[1] == number of msgs to send (10)
+                                               argv[2] == delay                (mu-seconds, 1000000 default)
+                                               argv[3] == number of threads (3)
+                                               argv[4] == listen port
+
+                               Sender will send for at most 20 seconds, so if nmsgs and delay extend
+                               beyond that period the total number of messages sent will be less
+                               than n.
+
+       Date:           18 April 2019
+       Author:         E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include <time.h>
+#include <pthread.h>
+
+
+#include <rmr/rmr.h>
+
+#define TRACE_SIZE 40          // bytes in header to provide for trace junk
+
+/*
+       Thread data
+*/
+typedef struct tdata {
+       int     id;                                     // the id we'll pass to RMr mt-call function NOT the thread id
+       int n2send;                             // number of messages to send
+       int delay;                              // ms delay between messages
+       void* mrc;                              // RMr context
+       int     state;
+} tdata_t;
+
+
+
+// --------------------------------------------------------------------------------
+
+
+static int sum( char* str ) {
+       int sum = 0;
+       int     i = 0;
+
+       while( *str ) {
+               sum += *(str++) + i++;
+       }
+
+       return sum % 255;
+}
+
+
+
+/*
+       Executed as a thread, this puppy will generate calls to ensure that we get the
+       response back to the right thread, that we can handle threads, etc.
+*/
+static void* mk_calls( void* data ) {
+       tdata_t*        control;
+       rmr_mbuf_t*             sbuf;                                   // send buffer
+       int             count = 0;
+       int             rt_count = 0;                                   // number of messages requiring a spin retry
+       int             ok_msg = 0;                                             // received messages that were sent by us
+       int             bad_msg = 0;                                    // received messages that were sent by a different thread
+       int             drops = 0;
+       int             fail_count = 0;                                 // # of failure sends after first successful send
+       int             successful = 0;                                 // set to true after we have a successful send
+       char    wbuf[1024];
+       char    xbuf[1024];                                             // build transaction string here
+       char    trace[1024];
+       int             xaction_id = 1;
+       char*   tok;
+       int             state = 0;
+
+       if( (control  = (tdata_t *) data) == NULL ) {
+               fprintf( stderr, "thread data was nil; bailing out\n" );
+       }
+       //fprintf( stderr, "<THRD> thread started with parallel call id=%d sending=%d delay=%d\n", control->id, control->n2send, control->delay );
+
+       sbuf = rmr_alloc_msg( control->mrc, 512 );      // alloc first send buffer; subsequent buffers allcoated on send
+
+       memset( trace, 0, sizeof( trace ) );    
+       while( count < control->n2send ) {                                                              // we send n messages after the first message is successful
+               snprintf( trace, 100, "%lld", (long long) time( NULL ) );
+               rmr_set_trace( sbuf, trace, TRACE_SIZE );                                       // fully populate so we dont cause a buffer realloc
+
+               snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer! @ %d", count, trace, rand(), control->id );
+               snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
+               snprintf( xbuf, 200, "%31d", xaction_id );
+               rmr_bytes2xact( sbuf, xbuf, 32 );
+
+               sbuf->mtype = 5;                                                                // mtype is always 5 as the test receiver acks just mtype 5 messages
+               sbuf->len =  strlen( sbuf->payload ) + 1;               // our receiver likely wants a nice acsii-z string
+               sbuf->state = 0;
+               sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 );    // send it (send returns an empty payload on success, or the original payload on fail/retry)
+
+               if( sbuf && sbuf->state == RMR_ERR_RETRY ) {                                    // number of times we had to spin to send
+                       rt_count++;
+               }
+               while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) {                         // send blocked; keep trying
+                       sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 );             // call and wait up to 100ms for a response
+               }
+
+               if( sbuf != NULL ) {
+                       switch( sbuf->state ) {
+                               case RMR_OK:                                                    // we should have a buffer back from the sender here
+                                       successful = 1;
+                                       if( (tok = strchr( sbuf->payload, '@' )) != NULL ) {
+                                               if( atoi( tok+1 ) == control->id ) {
+                                                       //fprintf( stderr, "<THRD> tid=%-2d ok  ack\n", control->id );
+                                                       ok_msg++;
+                                               } else {
+                                                       bad_msg++;
+                                                       //fprintf( stderr, "<THRD> tid=%-2d bad ack %s\n", control->id, sbuf->payload );
+                                               }
+                                       }
+                                       //fprintf( stderr, "<THRD> tid=%-2d call returned valid msg: %s\n", control->id, sbuf->payload );
+                                       // future -- verify that we see our ID at the end of the message
+                                       count++;
+                                       break;
+
+                               default:
+                                       fprintf( stderr, "unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
+                                       sbuf = rmr_alloc_msg( control->mrc, 512 );                      // allocate a sendable buffer
+                                       if( successful ) {
+                                               fail_count++;                                                   // count failures after first successful message
+                                       } else {
+                                               // some error (not connected likely), don't count this
+                                               sleep( 1 );
+                                       }
+                                       break;
+                       }
+               } else {
+                       //fprintf( stderr, "<THRD> tid=%-2d call finished, no sbuf\n", control->id );
+                       sbuf = rmr_alloc_msg( control->mrc, 512 );                              // loop expects an subf
+                       drops++;
+                       count++;
+               }
+
+               if( control->delay > 0 ) {
+                       usleep( control->delay );
+               }
+       }
+
+       state = 1;
+       if( ok_msg < (control->n2send-1) || bad_msg > 0 ) {             // allow one drop to pass
+               state = 0;
+       }
+       if( count < control->n2send ) {
+               state = 0;
+       }
+
+       control->state = -state;                                // signal inactive to main thread; -1 == pass, 0 == fail
+       fprintf( stderr, "<THRD> [%s]  tid=%-2d sent=%d  ok-acks=%d bad-acks=%d  drops=%d failures=%d retries=%d\n", 
+               state ? "PASS" : "FAIL",  control->id, count, ok_msg, bad_msg, drops, fail_count, rt_count );
+
+
+       return NULL;
+}
+
+int main( int argc, char** argv ) {
+       void* mrc;                                                      // msg router context
+       rmr_mbuf_t*     rbuf = NULL;                            // received on 'normal' flow
+       struct  epoll_event events[1];                  // list of events to give to epoll
+       struct  epoll_event epe;                                // event definition for event to listen to
+       int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
+       int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
+       int             nready;                                                 // number of events ready for receive
+       char*   listen_port = "43086";
+       long    timeout = 0;
+       int             delay = 100000;                                 // usec between send attempts
+       int             nmsgs = 10;                                             // number of messages to send
+       int             nthreads = 3;
+       tdata_t*        cvs;                                            // vector of control blocks
+       int                     i;
+       pthread_t*      pt_info;                                        // thread stuff
+       int     failures = 0;
+       int             pings = 0;                                              // number of messages received on normal channel
+
+       if( argc > 1 ) {
+               nmsgs = atoi( argv[1] );
+       }
+       if( argc > 2 ) {
+               delay = atoi( argv[2] );
+       }
+       if( argc > 3 ) {
+               nthreads = atoi( argv[3] );
+       }
+       if( argc > 4 ) {
+               listen_port = argv[4];
+       }
+
+       fprintf( stderr, "<CALL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
+
+       if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) {             // initialise with multi-threaded call enabled
+               fprintf( stderr, "<CALL> unable to initialise RMr\n" );
+               exit( 1 );
+       }
+
+       rmr_init_trace( mrc, TRACE_SIZE );
+
+       if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {                                                    // epoll only available from NNG -- skip receive later if not NNG
+               if( rcv_fd < 0 ) {
+                       fprintf( stderr, "<CALL> unable to set up polling fd\n" );
+                       exit( 1 );
+               }
+               if( (ep_fd = epoll_create1( 0 )) < 0 ) {
+                       fprintf( stderr, "<CALL> [FAIL] unable to create epoll fd: %d\n", errno );
+                       exit( 1 );
+               }
+               epe.events = EPOLLIN;
+               epe.data.fd = rcv_fd;
+
+               if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
+                       fprintf( stderr, "<CALL> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
+                       exit( 1 );
+               }
+       } else {
+               rmr_set_rtimeout( mrc, 0 );                     // for nano we must set the receive timeout to 0; non-blocking receive
+       }
+
+
+       cvs = malloc( sizeof( tdata_t ) * nthreads );
+       pt_info = malloc( sizeof( pthread_t ) * nthreads );
+       if( cvs == NULL ) {
+               fprintf( stderr, "<CALL> unable to allocate control vector\n" );
+               exit( 1 );      
+       }
+
+
+       timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
+       while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
+               fprintf( stderr, "<CALL> waiting for rmr to show ready\n" );
+               sleep( 1 );
+
+               if( time( NULL ) > timeout ) {
+                       fprintf( stderr, "<CALL> giving up\n" );
+                       exit( 1 );
+               }
+       }
+       fprintf( stderr, "<CALL> rmr is ready; starting threads\n" );
+
+       for( i = 0; i < nthreads; i++ ) {
+               cvs[i].mrc = mrc;
+               cvs[i].id = i + 2;                              // we pass this as the call-id to rmr, so must be >1
+               cvs[i].delay = delay;
+               cvs[i].n2send = nmsgs;
+               cvs[i].state = 1;
+
+               pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] );         // kick a thread
+       }
+
+       timeout = time( NULL ) + 20;
+       i = 0;
+       while( nthreads > 0 ) {
+               if( cvs[i].state < 1 ) {                        // states 0 or below indicate done. 0 == failure, -n == success
+                       nthreads--;
+                       if( cvs[i].state == 0 ) {
+                               failures++;
+                       }
+                       i++;
+               } else {
+               //      sleep( 1 );
+                       rbuf = rmr_torcv_msg( mrc, rbuf, 1000 );
+                       if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) {
+                               pings++;
+                               rmr_free_msg( rbuf );
+                               rbuf = NULL;
+                       }
+               }
+               if( time( NULL ) > timeout ) {
+                       failures += nthreads;
+                       fprintf( stderr, "<CALL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
+                       break;
+               }
+       }
+
+       fprintf( stderr, "<CALL> [%s] failing threads=%d  pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL",  failures, pings );
+       rmr_close( mrc );
+
+       return failures > 0;
+}
+
index ed2450b..e016f9e 100644 (file)
                                        RMR_SEED_RT -- path to the static routing table
                                        RMR_RTG_SVC -- port to listen for RTG connections
 
+                               Compile time options
+                               if -DMTC is defined on the compile command, then RMr is initialised
+                               with the multi-threaded receive thread rather than using the same
+                               process receive function. All other functions in the receiver are
+                               the same.
+
        Date:           18 April 2019
        Author:         E. Scott Daniels
 */
@@ -125,7 +131,12 @@ int main( int argc, char** argv ) {
 
        fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
 
+#ifdef MTC
+       mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
+
+#else
        mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );   // start your engines!
+#endif
        if( mrc == NULL ) {
                fprintf( stderr, "<RCVR> ABORT:  unable to initialise RMr\n" );
                exit( 1 );
diff --git a/test/app_test/run_call_test.ksh b/test/app_test/run_call_test.ksh
new file mode 100644 (file)
index 0000000..08562ed
--- /dev/null
@@ -0,0 +1,230 @@
+#!/usr/bin/env ksh
+# :vi ts=4 sw=4 noet :
+#==================================================================================
+#    Copyright (c) 2019 Nokia
+#    Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
+
+# ---------------------------------------------------------------------------------
+#      Mnemonic:       run_call_test.ksh
+#      Abstract:       This is a simple script to set up and run the basic send/receive
+#                              processes for some library validation on top of nano/nng.
+#                              It should be possible to clone the repo, switch to this directory
+#                              and execute  'ksh run -B'  which will build RMr, make the sender and
+#                              recevier then  run the basic test.
+#
+#                              The call test drives three RMr based processes:
+#                                      caller which invokes rmr_mt_call() to send n messages
+#
+#                                      receiver which executes rmr_rts_msg() on each received
+#                                              message with type == 5.
+#      
+#                                      sender which sends n "pings" to the caller process
+#
+#                              The script assumes that all three proessess are running on the same
+#                              host/container such that setting up the route table with localhost:port
+#                              will work.
+#
+#                              The number of message, and the delay between each message may be given
+#                              on the command line. The number of threads may also be given. The
+#                              number of messages defines the number _each_ caller will sent (5000 
+#                              messages and three threads == 15,000 total messages.
+#
+#                              Example command line:
+#                                      ksh ./run_call_test.ksh         # default 10 messages at 1 msg/sec 3 threads
+#                                      ksh ./run_call_test.ksh -n 5000  -t 6 -d 400    # 5k messages, 6 threads, delay 400 mus
+#
+#      Date:           20 May 2019
+#      Author:         E. Scott Daniels
+# ---------------------------------------------------------------------------------
+
+
+# The sender and receiver are run asynch. Their exit statuses are captured in a
+# file in order for the 'main' to pick them up easily.
+#
+function run_sender {
+       if (( nano_sender ))
+       then
+               #       ./sender_nano $nmsg $delay
+               echo "nano is not supported"
+               exit 1
+       else
+               ./caller $nmsg $delay $nthreads
+       fi
+       echo $? >/tmp/PID$$.src         # must communicate state back via file b/c asynch
+}
+
+# start receiver listening for nmsgs from each thread
+function run_rcvr {
+       if (( nano_receiver ))
+       then
+               #./receiver_nano $(( nmsg * nthreads ))
+               echo "nano is not supported"
+               exit 1
+       else
+               ./mt_receiver $(( nmsg * nthreads ))            # we'll test with the RMr multi threaded receive function
+       fi
+       echo $? >/tmp/PID$$.rrc
+}
+
+# This will send 100 messages to the caller process. This is a test to verify that
+# threaded calling is not affected by normal messages and that normal messages can
+# be received concurrently.
+#
+function run_pinger {
+    RMR_RTG_SVC=9999 ./sender 100 100 4 3333 >/dev/NULL 2>&1  # send pings
+}
+
+
+# Generate a route table that is tailored to our needs of sender sending messages to
+# the caller, and caller sending mtype == 5 to the receiver.
+#
+function mk_rt {
+
+cat <<endKat >caller.rt
+# this is a specialised rt for caller testing. mtype 5 go to the
+# receiver, and all others go to the caller.
+
+newrt | start
+mse | 0 |  0 | localhost:43086
+mse | 1 | 10 | localhost:43086
+mse | 2 | 20 | localhost:43086
+rte | 3 | localhost:43086
+mse | 3 | 100 | localhost:43086        # special test to ensure that this does not affect previous entry
+rte | 4 | localhost:43086
+rte | 5 | localhost:4560
+rte | 6 | localhost:43086
+rte | 7 | localhost:43086
+rte | 8 | localhost:43086
+rte | 9 | localhost:43086
+rte | 10 | localhost:43086
+rte | 11 | localhost:43086
+rte | 12 | localhost:43086
+rte | 13 | localhost:43086
+
+newrt | end | 16
+endKat
+}
+
+# ---------------------------------------------------------
+
+nmsg=10                                                # total number of messages to be exchanged (-n value changes)
+delay=1000000                          # microsec sleep between msg 1,000,000 == 1s
+nano_sender=0                          # start nano version if set (-N)
+nano_receiver=0
+wait=1
+rebuild=0
+verbose=0
+nthreads=3
+dev_base=1                                     # -D turns off to allow this to run on installed libs
+
+nano_sender=0                          # mt-call is not supported in nano
+nano_receiver=0
+
+
+while [[ $1 == -* ]]
+do
+       case $1 in
+               -B)     rebuild=1;;
+               -d)     delay=$2; shift;;
+               -D)     dev_base=0;;
+               -n)     nmsg=$2; shift;;
+               -t)     nthreads=$2; shift;;
+               -v)     verbose=1;;
+
+               *)      echo "unrecognised option: $1"
+                       echo "usage: $0 [-B] [-d micro-sec-delay] [-n num-msgs] [-t num-threads]"
+                       echo "  -B forces a rebuild which will use .build"
+                       exit 1
+                       ;;
+       esac
+
+       shift
+done
+
+if (( verbose ))
+then
+       echo "2" >.verbose
+       export RMR_VCTL_FILE=".verbose"
+fi
+
+if (( rebuild ))
+then
+       build_path=../../.build
+       set -e
+       ksh ./rebuild.ksh
+       set +e
+else
+       build_path=${BUILD_PATH:-"../../.build"}        # we prefer .build at the root level, but allow user option
+
+       if [[ ! -d $build_path ]]
+       then
+               echo "cannot find build in: $build_path"
+               echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
+               exit 1
+       fi
+fi
+
+if (( dev_base ))                                                      # assume we are testing against what we've built, not what is installed
+then
+       if [[ -d $build_path/lib64 ]]
+       then
+               export LD_LIBRARY_PATH=$build_path:$build_path/lib64
+       else
+               export LD_LIBRARY_PATH=$build_path:$build_path/lib
+       fi
+else                                                                           # -D option gets us here to test an installed library
+       export LD_LIBRARY_PATH=/usr/local/lib
+       export LIBRARY_PATH=$LD_LIBRARY_PATH
+fi
+
+export RMR_SEED_RT=${RMR_SEED_RT:-./caller.rt}         # allow easy testing with different rt
+
+if [[ ! -f $RMR_SEED_RT ]]                                                     # special caller rt for three process setup
+then
+       mk_rt
+fi
+
+if [[ ! -f ./sender ]]
+then
+       if ! make >/dev/null 2>&1
+       then
+               echo "[FAIL] cannot find sender binary, and cannot make it.... humm?"
+               exit 1
+       fi
+fi
+
+run_rcvr &
+sleep 2                                # if caller starts faster than rcvr we can drop, so pause a bit
+run_sender &
+run_pinger &
+
+wait
+head -1 /tmp/PID$$.rrc | read rrc              # get pass/fail state from each
+head -1 /tmp/PID$$.src | read src
+
+if (( !! (src + rrc) ))
+then
+       echo "[FAIL] sender rc=$src  receiver rc=$rrc"
+else
+       echo "[PASS] sender rc=$src  receiver rc=$rrc"
+fi
+
+rm /tmp/PID$$.*
+rm -f .verbose
+
+exit $(( !! (src + rrc) ))
+
index 82a01a8..f716378 100644 (file)
@@ -34,6 +34,8 @@
 #include <string.h>
 #include <stdint.h>
 #include <netdb.h>
+#include <pthread.h>
+#include <semaphore.h>
 
 #include <nng/nng.h>
 #include <nng/protocol/pubsub0/pub.h>
 #include <nng/protocol/pipeline0/push.h>
 #include <nng/protocol/pipeline0/pull.h>
 
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
-#include "../src/nng/include/rmr_nng_private.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
+#include "rmr_nng_private.h"
 
 #define EMULATE_NNG
 #include "test_nng_em.c"
-#include "../src/nng/src/sr_nng_static.c"
+#include "sr_nng_static.c"
 
 #include "test_support.c"
 
index b2f1f7d..99452dc 100644 (file)
 #include <errno.h>
 #include <string.h>
 #include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
 
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
 
 
 int mbuf_api_test( ) {
index 573415e..fdc221a 100644 (file)
 #include <errno.h>
 #include <pthread.h>
 #include <ctype.h>
+#include <pthread.h>
+#include <semaphore.h>
 
 
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
 
-#include "../src/common/src/mbuf_api.c"                        // module under test
+#include "mbuf_api.c"
 
 #include "test_support.c"                                              // our private library of test tools
 #include "mbuf_api_static_test.c"                              // test functions
index e0c9b6e..bd2b428 100644 (file)
 #include <errno.h>
 #include <string.h>
 #include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
 
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
-//#include "../src/common/src/ring_static.c"
+#include "rmr.h"
+#include "rmr_agnostic.h"
 
 
 /*
index 5e1a81c..51b8260 100644 (file)
 #include <errno.h>
 #include <string.h>
 #include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
 
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
-#include "../src/common/src/ring_static.c"
+#include "rmr.h"
+#include "rmr_agnostic.h"
+#include "ring_static.c"
 
 #include "test_support.c"                                      // things like fail_if()
 #include "ring_static_test.c"                          // the actual tests
index 50c4266..c161f91 100644 (file)
 #include <stdint.h>
 #include <ctype.h>
 #include <sys/epoll.h>
+#include <pthread.h>
+#include <semaphore.h>
 
 #define DEBUG 1
 
 #include <nanomsg/nn.h>
-//#include <nng/protocol/pubsub0/pub.h>
-//#include <nng/protocol/pubsub0/sub.h>
-//#include <nng/protocol/pipeline0/push.h>
-//#include <nng/protocol/pipeline0/pull.h>
 
 #undef EMULATE_NNG
 #include "test_nng_em.c"                                                       // nng/nn emulation (before including things under test)
 
 
-#include "../src/common/include/rmr.h"                                 // things the users see
-#include "../src/common/include/rmr_symtab.h"
-#include "../src/common/include/rmr_agnostic.h"                        // transport agnostic header
-#include "../src/nanomsg/include/rmr_private.h"                        // transport specific
+#include "rmr.h"                                       // things the users see
+#include "rmr_symtab.h"
+#include "rmr_agnostic.h"                      // transport agnostic header
+#include "rmr_private.h"                       // transport specific
 
-#include "../src/common/src/symtab.c"
-#include "../src/nanomsg/src/rmr.c"
-#include "../src/common/src/mbuf_api.c"
-#include "../src/nanomsg/src/rtable_static.c"
+#include "symtab.c"
+#include "rmr.c"
+#include "mbuf_api.c"
+#include "rtable_static.c"
 
 static void gen_rt( uta_ctx_t* ctx );          // defined in sr_static_test, but used by a few others (eliminate order requirement below)
 
                                                                                        // specific test tools in this directory
 #include "test_support.c"                                      // things like fail_if()
                                                                                        // and finally....
-//#include "tools_static_test.c"                               // local test functions pulled directly because of static nature of things
-//#include "symtab_static_test.c"
-//#include "ring_static_test.c"
-//#include "rt_static_test.c"
-//#include "wormhole_static_test.c"
 #include "mbuf_api_static_test.c"
 
 #include "sr_nano_static_test.c"
index 2db1a20..0cb49a5 100644 (file)
 #include <errno.h>
 #include <string.h>
 #include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
 
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
 
 /*
        Send a 'burst' of messages to drive some send retry failures to increase RMr coverage
@@ -84,6 +86,22 @@ static void send_n_msgs( void* ctx, int n ) {
        }
 }
 
+/*
+       Refresh or allocate a message with some default values
+*/
+static rmr_mbuf_t* fresh_msg( void* ctx, rmr_mbuf_t* msg ) {
+       if( ! msg )  {
+               msg = rmr_alloc_msg( ctx, 2048 );
+       }
+
+       msg->mtype = 0;
+       msg->sub_id = -1;
+       msg->state = 0;
+       msg->len = 100;
+
+       return msg;
+}
+
 static int rmr_api_test( ) {
        int             errors = 0;
        void*   rmc;                            // route manager context
@@ -113,6 +131,7 @@ static int rmr_api_test( ) {
                errors += fail_if_nil( rmc, "rmr_init returned a nil pointer when driving for default port "  );
        }
 
+
        v = rmr_ready( rmc );           // unknown return; not checking at the moment
 
        msg = rmr_alloc_msg( NULL,  1024 );                                                                     // should return nil pointer
@@ -285,7 +304,7 @@ static int rmr_api_test( ) {
        errors += fail_if( i >= 40, "torcv_msg never returned a timeout "  );
 
 
-       // ---- trace things that are not a part of the mbuf_api functions and thus must be tested here
+       // ---- trace things that are not a part of the mbuf_api functions and thus must be tested here -------
        state = rmr_init_trace( NULL, 37 );                                             // coverage test nil context
        errors += fail_not_equal( state, 0, "attempt to initialise trace with nil context returned non-zero state (a) "  );
        errors += fail_if_equal( errno, 0, "attempt to initialise trace with nil context did not set errno as expected "  );
@@ -306,10 +325,110 @@ static int rmr_api_test( ) {
        em_send_failures = 0;
 
 
+       ((uta_ctx_t *)rmc)->shutdown = 1;
        rmr_close( NULL );                      // drive for coverage
        rmr_close( rmc );                       // no return to check; drive for coverage
 
 
+       // -- allocate a new context for mt-call and drive that stuff -----------------------------------------
+#ifdef EMULATE_NNG
+       msg = fresh_msg( rmc, msg );                                                    // ensure we have one with known contents
+
+       msg->state = 0;
+       msg = rmr_mt_call( rmc, msg, 3, 10 );                                                   // drive when not in mt setup
+       if( msg ) {
+               errors += fail_not_equal( msg->state, RMR_ERR_NOTSUPP, "rmr_mt_call did not set not supported error when not mt initialised" );
+       } else {
+               errors += fail_if_nil( msg, "rmr_mt_call returned nil pointer when not mt initialised" );
+       }
+       msg = fresh_msg( rmc, msg );
+
+       msg = rmr_mt_rcv( rmc, msg, 10 );                                                               // gen not supported error if ctx not set for mt
+       if( msg ) {
+               errors += fail_not_equal( msg->state, RMR_ERR_NOTSUPP, "rmr_mt_rcv did not return not supported state when mt not initialised" );
+       } else {
+               errors += fail_if_nil( msg, "nil pointer from rmr_mt_rcv when mt not initialised\n" );
+       }
+       msg = fresh_msg( rmc, msg );
+
+       msg->state = 0;
+       if( msg ) {
+               msg = rmr_mt_call( rmc, msg, 1000, 10 );                                                        // thread id out of range
+               errors += fail_if_equal( msg->state, 0, "rmr_mt_call did not set an error when given an invalid call-id" );
+       } else {
+               errors += fail_if_nil( msg, "rmr_mt_call returned a nil pointer when given an invalid call-id" );
+       }
+       msg = fresh_msg( rmc, msg );
+
+       state = init_mtcall( NULL );                                    // drive for coverage
+       errors += fail_not_equal( state, 0, "init_mtcall did not return false (a) when given a nil context pointer" );
+
+
+       if( (rmc = rmr_init( NULL, 1024, FL_NOTHREAD | RMRFL_MTCALL )) == NULL ) {                      // drive multi-call setup code without rtc thread
+               errors += fail_if_nil( rmc, "rmr_init returned a nil pointer when driving for mt-call setup "  );
+       }
+
+       gen_rt( rmc );                                                                  // must attach a route table so sends succeed
+
+       fprintf( stderr, "<INFO> enabling mt messages\n" );
+       em_set_rcvdelay( 1 );                                                   // force slow msg rate during mt testing
+       em_set_mtc_msgs( 1 );                                                   // emulated nngrcv will now generate messages with call-id and call flag
+
+       msg->state = 0;
+       msg = rmr_mt_call( NULL, msg, 3, 10 );                  // should timeout
+       if( msg ) {
+               errors += fail_if( msg->state == 0, "rmr_mt_call did not set message state when given message with nil context "  );
+       }
+       msg = fresh_msg( rmc, msg );
+
+       fprintf( stderr, "<INFO> invoking mt_call with timout == 2999\n" );
+       msg = rmr_mt_call( rmc, msg, 2, 2999 );                 // long timeout to drive time building code, should receive
+       if( msg ) {
+               if( msg->state != RMR_OK ) {
+                       fprintf( stderr, "<INFO> rmr_mt_call returned error in mbuf: %d\n", msg->state );
+               } else {
+                       errors += fail_not_nil( msg, "rmr_mt_call did not return a nil pointer on read timeout" );
+               }
+       }
+       msg = fresh_msg( rmc, msg );
+
+       msg = rmr_mt_rcv( NULL, NULL, 10 );
+       errors += fail_not_nil( msg, "rmr_mt_rcv returned a non-nil message when given nil message and nil context" );
+
+       fprintf( stderr, "<INFO> invoking mt_rcv with timout == 2999\n" );
+       msg = fresh_msg( rmc, msg );
+       msg = rmr_mt_rcv( rmc, msg, 2999 );
+       if( !msg ) {
+               errors += fail_if_nil( msg, "rmr_mt_rcv returned a nil message when given valid message and timeout of 29999" );
+       }
+       msg = fresh_msg( rmc, msg );
+
+       msg = rmr_mt_rcv( rmc, msg, -1 );
+       if( !msg ) {
+               errors += fail_if_nil( msg, "rmr_mt_rcv returned a nil message when given valid message unlimited timeout" );
+       }
+       msg = fresh_msg( rmc, msg );
+
+       fprintf( stderr, "<INFO> waiting 20.0 seconds for a known call xaction to arrive (%ld)\n", time( NULL ) );
+       snprintf( msg->xaction, 17, "%015d", 5 );               // we'll reset receive counter before calling mt_call so this will arrive again
+       em_set_rcvcount( 0 );
+       msg = rmr_mt_call( rmc, msg, 2, 15000 );                // we need about 10s to get the message with the 'slow rate'
+       if( msg ) {
+               errors += fail_not_equal( msg->state, RMR_OK, "mt_call with known xaction id bad state (a)" );
+       } else {
+               errors += fail_if_nil( msg, "mt_call with known xaction id returned nil message" );
+       }
+       fprintf( stderr, "<INFO> time check: %ld\n", time( NULL ) );
+               
+
+       em_set_mtc_msgs( 0 );                                                   // turn off 
+       em_set_rcvdelay( 0 );                                                   // full speed receive rate
+       ((uta_ctx_t *)rmc)->shutdown = 1;                               // force the mt-reciver attached to the context to stop
+#endif
+
+
+       // --------------- phew, done ------------------------------------------------------------------------------
+
        if( ! errors ) {
                fprintf( stderr, "<INFO> all RMr API tests pass\n" );
        }
index 5dbeb21..1a8df89 100644 (file)
@@ -51,6 +51,8 @@
 #include <stdint.h>
 #include <ctype.h>
 #include <sys/epoll.h>
+#include <pthread.h>
+#include <semaphore.h>
 
 #define DEBUG 1
 
 #include "test_nng_em.c"                                                       // nng/nn emulation (before including things under test)
 
 
-#include "../src/common/include/rmr.h"                                 // things the users see
-#include "../src/common/include/rmr_symtab.h"
-#include "../src/common/include/rmr_agnostic.h"                        // transport agnostic header
-#include "../src/nng/include/rmr_nng_private.h"                        // transport specific
+#include "rmr.h"                                       // things the users see
+#include "rmr_symtab.h"
+#include "rmr_agnostic.h"                      // transport agnostic header
+#include "rmr_nng_private.h"                   // transport specific
 
-#include "../src/common/src/symtab.c"
-#include "../src/nng/src/rmr_nng.c"
-#include "../src/common/src/mbuf_api.c"
+#include "symtab.c"
+#include "rmr_nng.c"
+#include "mbuf_api.c"
 
 static void gen_rt( uta_ctx_t* ctx );          // defined in sr_nng_static_test, but used by a few others (eliminate order requirement below)
 
index 90cad7e..49cff8a 100644 (file)
 #include <errno.h>
 #include <string.h>
 #include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
 
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
 
 typedef struct entry_info {
        int group;
index 972b767..13a216b 100644 (file)
 #include <errno.h>
 #include <string.h>
 #include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
 
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
 
 typedef struct entry_info {
        int group;
@@ -276,16 +278,12 @@ static int rt_test( ) {
 
        uta_fib( "no-suhch-file" );                     // drive some error checking for coverage
 
-/*
-       if( ctx ) {
-               if( ctx->rtg_addr ) {
-                       free( ctx->rtg_addr );
-               }
-               free( ctx );
-       }
-*/
 
-       state = uta_link2( "worm", NULL, NULL );
+       ep = (endpoint_t *) malloc( sizeof( *ep ) );
+       pthread_mutex_init( &ep->gate, NULL );
+       ep->name = strdup( "worm" );
+       ep->addr = NULL;
+       state = uta_link2( ep );
        errors += fail_if_true( state, "link2 did not return false when given nil pointers" );
 
        state = uta_epsock_rr( rt, 122, 0, NULL, NULL );
index 8e939da..14c678f 100644 (file)
 #include <errno.h>
 #include <string.h>
 #include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
 
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
 
 /*
        Generate a simple route table (for all but direct route table testing).
+       This table contains multiple tables inasmuch as a second update set of
+       records follows the initial set. 
 */
 static void gen_rt( uta_ctx_t* ctx ) {
        int             fd;
index c58b281..809deba 100644 (file)
 #include <errno.h>
 #include <string.h>
 #include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
 
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
 
 /*
        Generate a simple route table (for all but direct route table testing).
@@ -162,7 +164,7 @@ static int sr_nng_test() {
                mbuf = rmr_rcv_msg( ctx, NULL );
        }
 
-       size = 2048 - sizeof( uta_mhdr_t );             // emulated nng receive allocates 2K payloads
+       size = 2048 - em_hdr_size();            // emulated nng receive allocates 2K buffers -- subtract off header size
        state = rmr_payload_size( mbuf );
        errors += fail_not_equal( state, size, "payload size didn't return expected value" );   // receive should always give 4k buffer
 
index f0649c1..7d5a39c 100644 (file)
@@ -28,7 +28,7 @@
        Author:         E. Scott Daniels
 */
 
-#include "../src/common/include/rmr_symtab.h"
+#include "rmr_symtab.h"
 //  -- parent must include if needed #include "../src/common/src/symtab.c"
 
 #include "test_support.c"
index f39085c..b755e9c 100644 (file)
@@ -29,8 +29,8 @@
 
 #define NO_DUMMY_RMR 1                 // no dummy rmr functions; we don't pull in rmr.h or agnostic.h
 
-#include "../src/common/include/rmr_symtab.h"
-#include "../src/common/src/symtab.c"
+#include "rmr_symtab.h"
+#include "symtab.c"
 
 #include "test_support.c"
 
index f1976b9..c2101f5 100644 (file)
        Author:         E. Scott Daniels
 */
 
+
+#include "rmr.h"                               // we use some of rmr defs in building dummy messages, so we need these
+#include "rmr_agnostic.h"
+
 // ---------------------- emulated nng functions ---------------------------
 
 
 #ifndef _em_nn
 #define _em_nn
 
+#include <pthread.h>
+
 static int em_send_failures = 0;       // test programme can set this to emulate eagain send failures
 static int em_timeout = -1;                    // set by set socket option
+static int em_mtc_msgs = 0;                    // set to generate 'received' messages with mt-call header data
+static int return_value = 0;           // functions should return this value
+static int rcv_count = 0;                      // receive counter for transaction id to allow test to rest
+static int rcv_delay = 0;                      // forced delay before call to rcvmsg starts to work
+
+static int gates_ok = 0;
+static pthread_mutex_t rcv_gate;
+
 
 // ----------- epoll emulation ---------------------------------------------
 
@@ -95,17 +109,12 @@ struct em_msg {
        int32_t len1;                       // length of the tracing data
        int32_t len2;                       // length of data 1 (d1)
        int32_t len3;                       // length of data 2 (d2)
-
+       int32_t sub_id;                                         // subscription id (-1 invalid)
 };
 
-static int return_value = 0;
 
-//--------------------------------------------------------------------------
-#ifdef EMULATE_NNG
-struct nn_msghdr {
-       int boo;
-};
 
+// --  emulation control functions ------------------------------------------------------
 
 /*
        Test app can call this to have all emulated functions return failure instead
@@ -122,34 +131,106 @@ static int em_nng_foo() {
 }
 
 
+/*
+       Turns on/off the generation of multi-threaded call messages
+*/
+static int em_set_mtc_msgs( int state ) {
+       em_mtc_msgs = state;
+}
+
+/*
+       Returns the size of the header we inserted
+*/
+static int em_hdr_size() {
+       if( em_mtc_msgs ) {
+               return (int) sizeof( struct em_msg ) + 4;
+       }
+
+       return (int) sizeof( struct em_msg );
+}
+
+static void em_set_rcvcount( int v ) {
+       rcv_count = v;
+}
+
+static void em_set_rcvdelay( int v ) {
+       rcv_delay = v;
+}
+
+static void em_start() {
+       if( ! gates_ok ) {
+               pthread_mutex_init( &rcv_gate, NULL );
+               gates_ok = 1;
+       }
+}
+
+//--------------------------------------------------------------------------
+#ifdef EMULATE_NNG
+struct nn_msghdr {
+       int boo;
+};
+
+
 /*
        Receive message must allocate a new buffer and return the pointer into *m.
        Every 9 messages or so we'll simulate an old version message
+
+       If em_mtc_msgs is set, then we add a non-zero d1 field with
+       the call-id set to 2, and alternate the call flag
 */
 static int em_nng_recvmsg( nng_socket s, nng_msg ** m, int i ) {
+       static int call_flag = 0;
+
        void* b;
        struct em_msg* msg;
-       static int count = 0;                   // we'll simulate a message going in by dropping an rmr-ish msg with transaction id only
        int trace_size = 0;
+       int d1_size = 0;
+       unsigned char* d1;
+
+       if( rcv_delay > 0 ) {
+               sleep( rcv_delay );
+       }
 
-       //sleep( 1 );
+       if( em_mtc_msgs ) {
+               d1_size = 4;
+       }
 
        b = (void *) malloc( 2048 );
        if( m != NULL ) {
                memset( b, 0, 2048 );
+
                *m = (nng_msg *) b;
                msg = (struct em_msg *) b;
-               if( count % 10  == 9 ) {
-                       //msg->rmr_ver = htonl( MSG_VER );
-                       msg->rmr_ver = ALT_MSG_VER;             // emulate the bug in RMr v1
+               if( ! em_mtc_msgs  &&  (rcv_count % 10) == 9 ) {
+                       msg->rmr_ver = ALT_MSG_VER;                                                     // allow emulation the bug in RMr v1
                } else {
                        msg->rmr_ver = htonl( MSG_VER );
                }
+
                msg->mtype = htonl( 1 );
                msg->plen = htonl( 220 );
                msg->len0 = htonl( sizeof( struct em_msg ) );
                msg->len1 = htonl( trace_size );
-               snprintf( msg->xid, 32, "%015d", count++ );             // simple transaction id so we can test receive specific and ring stuff
+               msg->len2 = htonl( d1_size );
+               msg->len3 = htonl( 0 );
+
+               pthread_mutex_lock( &rcv_gate );        // hold lock to update counter/flag
+               if( em_mtc_msgs ) {
+                       d1 = DATA1_ADDR( msg );
+                       d1[0] = 2;                                                                      // simulated msgs always on chute 2
+                       if( call_flag ) {
+                               rcv_count++;
+                               msg->flags |= HFL_CALL_MSG;
+                       }
+                       if( rcv_delay > 0 ) {
+                               fprintf( stderr, "<EM>    count=%d flag=%d %02x \n", rcv_count, call_flag, msg->flags );
+                       }
+                       call_flag = !call_flag;
+               } else {
+                       rcv_count++;
+               }
+               pthread_mutex_unlock( &rcv_gate );
+               snprintf( msg->xid, 32, "%015d", rcv_count );           // simple transaction id so we can test receive specific and ring stuff
                snprintf( msg->src, 16, "localhost:4562" );             // set src id (unrealistic) so that rts() can be tested
        }
 
index eac19a6..6725e9b 100644 (file)
 #include <errno.h>
 #include <pthread.h>
 #include <ctype.h>
+#include <pthread.h>
+#include <semaphore.h>
 
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
 #include "test_support.c"              // our private library of test tools
 
 // ===== dummy context for tools testing so we don't have to pull in all of the nano/nng specific stuff =====
@@ -66,7 +68,7 @@ struct uta_ctx {
 };
 
 
-#include "../src/common/src/tools_static.c"
+#include "tools_static.c"
 
 
 int main( ) {
index 9953733..559f7d3 100755 (executable)
@@ -320,7 +320,7 @@ else
        fi
 fi
 
-export C_INCLUDE_PATH="../src/common/include:$C_INCLUDE_PATH"
+export C_INCLUDE_PATH="../src/rmr/common/include:$C_INCLUDE_PATH"
 
 module_cov_target=80
 builder="make -B %s"           # default to plain ole make
index f2d4bf6..06cb36c 100644 (file)
 #include <errno.h>
 #include <string.h>
 #include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
 
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
 
 
 /*