From 412d53dfa2f9b5b56a448797d0dfec3b0f11f666 Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Mon, 20 May 2019 20:00:52 +0000 Subject: [PATCH] enhance(API): Add multi-threaded call Change-Id: I2d7c9abd2aabe4c2f05ba0935acaeb4f3fd8bb94 Signed-off-by: E. Scott Daniels Tweaks to call based on testing Change-Id: I2cda8652ba045bf411bda77e64d2a92de8d2c0f2 Signed-off-by: E. Scott Daniels Add nng call static module Change-Id: I5e964078ae346b25cc283ea32239271ea69fa55e Signed-off-by: E. Scott Daniels Add locking round connect Change-Id: Icf7a9c691385f199107b746c34c32f8f83b1ef04 Signed-off-by: E. Scott Daniels Tweaks based on testing; move mtrcv to static module Change-Id: I75c38a9eeb34991da92fdb6f655b11d7f600d8a8 Signed-off-by: E. Scott Daniels Unit test changes Change-Id: I7c1d3dcbe8802ce459a63b762c3ad3b8abeb7a61 Signed-off-by: E. Scott Daniels Beef up unit tests to passing after discount, all >70% before discount Change-Id: I0e34052c142cfea77053512aac68008c3af49694 Signed-off-by: E. Scott Daniels Update application oriented tests to include mt-call Change-Id: I0939abf96008ed7fde9640a070d09e683c0d5dea Signed-off-by: E. Scott Daniels Fix possible nil pointer problem Change-Id: I55e911761d54b9fc7500c121de018b485913763e Signed-off-by: E. Scott Daniels Tweaks resulting from testing Change-Id: Iaa6fb4d2719a39dbe209e17cc793c2341a477043 Signed-off-by: E. Scott Daniels Add info message to make it obvious when mt-receive is enabled Change-Id: I6dd3cd5ad01d5cf2a09dda87ce6fdd0bc5a4670c Signed-off-by: E. Scott Daniels Add man pages for new mt functions Change-Id: Ia52e5d71502bcebe6af65024602dc32ca8877b52 Signed-off-by: E. Scott Daniels Add man pages to CMake Change-Id: I60ff3a753d9249c8797d9141dd3ce2b5b55752b8 Signed-off-by: E. Scott Daniels Update CM version Change-Id: I9e0f031f4e8a0ccba8ae7c788a6d9bc6dcb0f15b Signed-off-by: E. Scott Daniels --- CMakeLists.txt | 2 +- doc/CMakeLists.txt | 2 + doc/src/man/rmr_init.3.xfm | 42 ++++ doc/src/man/rmr_mt_call.3.xfm | 221 +++++++++++++++++ doc/src/man/rmr_mt_rcv.3.xfm | 210 ++++++++++++++++ doc/src/man/rmr_rcv_msg.3.xfm | 3 +- src/rmr/common/include/RIC_message_types.h | 3 + src/rmr/common/include/rmr.h | 8 +- src/rmr/common/include/rmr_agnostic.h | 21 ++ src/rmr/common/src/mbuf_api.c | 6 + src/rmr/common/src/mt_call_static.c | 80 ++++++ src/rmr/common/src/rt_generic_static.c | 7 +- src/rmr/nanomsg/include/rmr_private.h | 1 + src/rmr/nanomsg/src/rmr.c | 1 + src/rmr/nng/include/rmr_nng_private.h | 7 +- src/rmr/nng/src/mt_call_nng_static.c | 114 +++++++++ src/rmr/nng/src/rmr_nng.c | 383 +++++++++++++++++++++-------- src/rmr/nng/src/rtable_nng_static.c | 37 ++- src/rmr/nng/src/sr_nng_static.c | 123 ++++++++- test/Makefile | 3 +- test/app_test/Makefile | 29 ++- test/app_test/caller.c | 326 ++++++++++++++++++++++++ test/app_test/receiver.c | 11 + test/app_test/run_call_test.ksh | 230 +++++++++++++++++ test/hdr_static_test.c | 10 +- test/mbuf_api_static_test.c | 6 +- test/mbuf_api_test.c | 8 +- test/ring_static_test.c | 7 +- test/ring_test.c | 8 +- test/rmr_nano_test.c | 27 +- test/rmr_nng_api_static_test.c | 125 +++++++++- test/rmr_nng_test.c | 16 +- test/rt_nano_static_test.c | 6 +- test/rt_static_test.c | 20 +- test/sr_nano_static_test.c | 8 +- test/sr_nng_static_test.c | 8 +- test/symtab_static_test.c | 2 +- test/symtab_test.c | 4 +- test/test_nng_em.c | 107 +++++++- test/tools_test.c | 8 +- test/unit_test.ksh | 2 +- test/wormhole_static_test.c | 6 +- 42 files changed, 2031 insertions(+), 217 deletions(-) create mode 100644 doc/src/man/rmr_mt_call.3.xfm create mode 100644 doc/src/man/rmr_mt_rcv.3.xfm create mode 100644 src/rmr/common/src/mt_call_static.c create mode 100644 src/rmr/nng/src/mt_call_nng_static.c create mode 100644 test/app_test/caller.c create mode 100644 test/app_test/run_call_test.ksh diff --git a/CMakeLists.txt b/CMakeLists.txt index cf887cf..4199147 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,7 +23,7 @@ cmake_minimum_required( VERSION 3.5 ) set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this set( minor_version "0" ) -set( patch_level "25" ) +set( patch_level "26" ) set( install_root "${CMAKE_INSTALL_PREFIX}" ) set( install_lib "lib" ) diff --git a/doc/CMakeLists.txt b/doc/CMakeLists.txt index b4f4d8a..b092fdc 100644 --- a/doc/CMakeLists.txt +++ b/doc/CMakeLists.txt @@ -80,6 +80,8 @@ if( BUILD_DOC ) rmr_tralloc_msg.3 rmr_get_trlen.3 rmr_get_src.3 + rmr_mt_call.3 + rmr_mt_rcv.3 ) # empty list of roff/troff input files we generated diff --git a/doc/src/man/rmr_init.3.xfm b/doc/src/man/rmr_init.3.xfm index 408c954..83a0f18 100644 --- a/doc/src/man/rmr_init.3.xfm +++ b/doc/src/man/rmr_init.3.xfm @@ -66,6 +66,46 @@ The value of &ital(max_msg_size) will be used when allocating zero copy send buf which must be allocated, possibly, prior to the application knowing the actual size of a specific message. +&space +&ital(Flags) allows for selection of some RMr options at the time of initialisation. +These are set by ORing &cw(RMRFL_) constants from the RMr header file. Currently the +following flags are supported: + +&half_space +&beg_dlist(1i : &bold_font ) +&ditem(RMRFL_NONE) + No flags are set. + +&half_space +&ditem(RMRFL_NOTHREAD) + The route table collector thread is not to be started. This should only be used + by the route table generator application if it is based on RMr. + +&half_space +&ditem(RMRFL_MTCALL) + Enable multi-threaded call support. +&end_dlist + +&h3(Multi-threaded Calling) +The support for an application to issue a &ital(blocking call) by the &cw(rmr_call()) function +was limited such that only user applications which were operating in a single thread +could safely use the function. +Further, timeouts were message count based and not time unit based. +Multi-threaded call support adds the ability for a user application with multiple threads +to invoke a blocking call function with the guarentee that the correct response message +is delivered to the thread. +The additional support is implemented with the &ital( rmr_mt_call() ) and &ital( rmr_mt_rcv() ) +function calls. +&space + +Multi-threaded call support requires the user application to specifically enable it +when RMr is initialised. +This is necessary because a second, dedicated, receiver thread must be started, and +requires all messages to be examined and queued by this thread. +The additional overhead is minimal, queuing information is all in the RMr message +header, but as an additional process is necessary the user application must "opt in" +to this approach. + &space &h2(ENVIRONMENT) As a part of the initialisation process &cw(rmr_init) will look into the available @@ -117,6 +157,8 @@ rmr_alloc_msg(3), rmr_call(3), rmr_free_msg(3), rmr_get_rcvfd(3), +rmr_mt_call(3), +rmr_mt_rcv(3), rmr_payload_size(3), rmr_send_msg(3), rmr_rcv_msg(3), diff --git a/doc/src/man/rmr_mt_call.3.xfm b/doc/src/man/rmr_mt_call.3.xfm new file mode 100644 index 0000000..6cbfc24 --- /dev/null +++ b/doc/src/man/rmr_mt_call.3.xfm @@ -0,0 +1,221 @@ +.if false +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +.fi + + +.if false + Mnemonic rmr_mt_call_man.xfm + Abstract The manual page for the rmr multi-threaded call function. + Author E. Scott Daniels + Date 24 May 2019 +.fi + +.** if formatting with tfm, the roff.im will cause roff output to be generated +.** if formatting with pfm, then pretty postscript will be generated +.gv e LIB lib +.if pfm + .im &{lib}/generic_ps.im +.ei + .gv e OUTPUT_RST use_rst + .if .ev &use_rst 1 = + .im &{lib}/rst.im + .ei + .im &{lib}/roff.im + .fi +.fi + +&line_len(6i) + +&h1(RMR Library Functions) +&h2(NAME) + rmr_mt_call + +&h2(SYNOPSIS ) +&indent +&ex_start +#include + +extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* msg, int id, int timeout ); +&ex_end +&uindent + +&h2(DESCRIPTION) +The &cw(rmr_mt_call) function sends the user application message to a remote +endpoint, and waits for a corresponding response message before returning +control to the user application. +The user application supplies a completed message buffer, as it would for +a &cw(rmr_send_msg) call, but unlike with a send, the buffer returned will have +the response from the application that received the message. +The thread invoking the &ital( rmr_mt_call()) will block until a message arrives +or until &ital(timeout) milliseconds has passed; which ever comes first. +Using a timeout value of zero (0) will cause the thread to block without a timeout. + +&space +The &ital( id ) supplied as the third parameter is an integer in the range of 2 through +255 inclusive. +This is a caller defined "thread number" and is used to match the response message +with the correct user application thread. +If the ID value is not in the proper range, the attempt to make the call will fail. + +&space +Messages which are received while waiting for the response are queued on a &ital(normal) +receive queue and will be delivered to the user application with the next invocation +of &itaL( rmr_mt_rcv() ) or &ital( rmr_rvv_msg().) +by RMR, and are returned to the user application when &cw(rmr_rcv_msg) is +invoked. +These messages are returned in th order received, one per call to &cw(rmr_rcv_msg.) + +&space +NOTE: Currently the multi-threaded functions are supported only when the NNG +transport mechanism is being used. It will not be possible to link a programme +using the nanomsg version of the library when references to this function are +present. + +&h3(The Transaction ID) +The user application is responsible for setting the value of the transaction ID field +before invoking &ital(rmr_mt_call.) +The transaction ID is a &cw(RMR_MAX_XID) byte field that is used to match the +response message when it arrives. +RMr will compare &bold(all) of the bytes in the field, so the caller must ensure +that they are set correctly to avoid missing the response message. +(The application which returns the response message is also expected to ensure that +the return buffer has the matching transaction ID. This can be done transparently if +the application uses the &ital( rmr_rts_msg() ) function and does not adjust the +transaction ID. + + +&h2(RETURN VALUE) +The &cw(rmr_mt_call) function returns a pointer to a message buffer with the state set to reflect +the overall state of call processing. +If the state is &cw(RMR_OK) then the buffer contains the response message; otherwise +the state indicates the error encountered while attempting to send the message. + +&space +If no response message is received when the timeout period has expired, a nil pointer +will be returned (NULL). + +&h2(ERRORS) +These values are reflected in the state field of the returned message. + +&half_space +&beg_dlist(.75i : ^&bold_font ) +&di(RMR_OK) The call was successful and the message buffer references the response message. + +&half_space +&di(RMR_ERR_BADARG) An argument passed to the function was invalid. + +&half_space +&di(RMR_ERR_CALLFAILED) The call failed and the value of &ital(errno,) as described below, + should be checked for the specific reason. + +&half_space +&di(RMR_ERR_NOENDPT) An endpoint associated with the message type could not be found in the + route table. + +&half_space +&di(RMR_ERR_RETRY) The underlying transport mechanism was unable to accept the message + for sending. The user application can retry the call operation if appropriate to + do so. + +&end_dlist + +&space +The global "variable" &ital(errno) will be set to one of the following values if the +overall call processing was not successful. +&half_space + +&beg_dlist(.75i : ^&bold_font ) +&di(ETIMEDOUT) Too many messages were queued before receiving the expected response + +&half_space +&di(ENOBUFS) The queued message ring is full, messages were dropped + +&half_space +&di(EINVAL) A parameter was not valid + +&half_space +&di(EAGAIN) The underlying message system wsa interrupted or the device was busy; + the message was &bold(not) sent, and user application should call + this function with the message again. +&end_dlist + +&h2(EXAMPLE) +The following code bit shows one way of using the &cw(rmr_mt_call) function, and illustrates +how the transaction ID must be set. + +&space +&ex_start + int retries_left = 5; // max retries on dev not available + static rmr_mbuf_t* mbuf = NULL; // response msg + msg_t* pm; // private message (payload) + + // get a send buffer and reference the payload + mbuf = rmr_alloc_msg( mr, RMR_MAX_RCV_BYTES ); + pm = (msg_t*) mbuf->payload; + + // generate an xaction ID and fill in payload with data and msg type + rmr_bytes2xact( mbuf, xid, RMR_MAX_XID ); + snprintf( pm->req, sizeof( pm->req ), "{ \"req\": \"num users\"}" ); + mbuf->mtype = MT_USR_RESP; + + msg = rmr_mt_call( mr, msg, my_id, 100 ); // wait up to 100ms + if( ! msg ) { // probably a timeout and no msg received + return NULL; // let errno trickle up + } + + if( mbuf->state != RMR_OK ) { + while( retries_left-- > 0 && // loop as long as eagain + mbuf->state == RMR_ERR_RETRY && + (msg = rmr_mt_call( mr, msg )) != NULL && + mbuf->state != RMR_OK ) { + + usleep( retry_delay ); + } + + if( mbuf == NULL || mbuf->state != RMR_OK ) { + rmr_free_msg( mbuf ); // safe if nil + return NULL; + } + } + + // do something with mbuf +&ex_end + + +&h2(SEE ALSO ) +.ju off +rmr_alloc_msg(3), +rmr_free_msg(3), +rmr_init(3), +rmr_mt_rcv(3), +rmr_payload_size(3), +rmr_send_msg(3), +rmr_rcv_msg(3), +rmr_rcv_specific(3), +rmr_rts_msg(3), +rmr_ready(3), +rmr_fib(3), +rmr_has_str(3), +rmr_tokenise(3), +rmr_mk_ring(3), +rmr_ring_free(3) +.ju on + + +.qu + diff --git a/doc/src/man/rmr_mt_rcv.3.xfm b/doc/src/man/rmr_mt_rcv.3.xfm new file mode 100644 index 0000000..7c83f18 --- /dev/null +++ b/doc/src/man/rmr_mt_rcv.3.xfm @@ -0,0 +1,210 @@ +.if false +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +.fi +.if false + Mnemonic rmr_mt_rcv_man.xfm + Abstract The manual page for the rmr_mt_rcv function. + Author E. Scott Daniels + Date 24 May 2019 +.fi + +.** if formatting with tfm, the roff.im will cause roff output to be generated +.** if formatting with pfm, then pretty postscript will be generated +.gv e LIB lib +.if pfm + .im &{lib}/generic_ps.im +.ei + .gv e OUTPUT_RST use_rst + .if .ev &use_rst 1 = + .im &{lib}/rst.im + .ei + .im &{lib}/roff.im + .fi +.fi + +&line_len(6i) + +&h1(RMR Library Functions) +&h2(NAME) + rmr_mt_rcv + +&h2(SYNOPSIS ) +&indent +&ex_start +#include + +rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* old_msg, int timeout ); +&ex_end +&uindent + +&h2(DESCRIPTION) +The &cw(rmr_mt_rcv) function blocks until a message is received, or the timeout +period (milliseconds) has passed. +The result is an RMr message buffer which references a received message. +In the case of a timeout the state will be reflected in an "empty buffer" (if old_msg +was not nil, or simply with the return of a nil pointer. +If a timeout value of zero (0) is given, then the function will block until +the next message received. + +&space +The &ital(vctx) pointer is the pointer returned by the &cw(rmr_init) function. +&ital(Old_msg) is a pointer to a previously used message buffer or NULL. +The ability to reuse message buffers helps to avoid alloc/free cycles in the +user application. +When no buffer is available to supply, the receive function will allocate one. + +&space +The &ital(old_msg) parameter allows the user to pass a previously generated RMr +message back to RMr for reuse. +Optionally, the user application may pass a nil pointer if no reusable message +is available. +When a timeout occurs, and old_msg was not nil, the state will be returned +by returning a pointer to the old message with the state set. + +&space +It is possible to use the &ital(rmr_rcv_msg()) function instead of this function. +Doing so might be advantagous if the user programme does not always start the +multi-threaded mode and the use of &ital(rmr_rcv_msg()) would make the flow of +the code more simple. +The advantags of using this function are the ability to set a timeout without +using epoll, and a small performance gain (if multi-threaded mode is enabled, and the +&ital(rmr_rcv_msg()) function is used, it simply invokes this function without +a timeout value, thus there is the small cost of a second call that results). +Similarly, the &ital(rmr_torcv_msg()) call can be used when in multi-threaded +mode with the same "pass through" overhead to using this function directly. + +&space +NOTE: Currently the multi-threaded functions are supported only when the NNG +transport mechanism is being used. It will not be possible to link a programme +using the nanomsg version of the library when references to this function are +present. + +&h2(RETURN VALUE) +When a message is received before the timeout period expires, a pointer to the +RMr message buffer which describes the message is returned. +This will, with a high probability, be a different message buffer than &ital(old_msg;) +the user application should not continue to use &ital(old_msg) after it is passed +to this function. + +&space +In the event of a timeout the return value will be the old msg with the state set, +or a nil pointer if no old message was provided. + +&h2(ERRORS) +The &ital(state) field in the message buffer will be set to one of the following +values: +&space + +&beg_dlist(.75i : ^&bold_font ) +&di(RMR_OK) The message was received without error. + +&half_space +&di(RMR_ERR_BADARG) A parameter passed to the function was not valid (e.g. a nil pointer). + +indicate either &cw(RMR_OK) or +&cw(RMR_ERR_EMPTY) if an empty message was received. + +&half_space +&di(RMR_ERR_EMPTY) The message received had no associated data. The length of the + message will be 0. + +&half_space +&di(RMR_ERR_NOTSUPP) The multi-threaded option was not enabled when RMr was +initialised. See the man page for &ital(rmr_init() ) for details. + +&half_space +&di(RMR_ERR_RCVFAILED) A hard error occurred preventing the receive from completing. +&end_dlist + + +When a nil pointer is returned, or any other state value was set in the message +buffer, &cw(errno) will be set to one of the following: +&space + +&beg_dlist(.75i : ^&bold_font ) +&di(INVAL) Parameter(s) passed to the function were not valid. + +&half_space +&di(EBADF) The underlying message transport is unable to process the request. + +&half_space +&di(ENOTSUP) The underlying message transport is unable to process the request. + +&half_space +&di(EFSM) The underlying message transport is unable to process the request. + +&half_space +&di(EAGAIN) The underlying message transport is unable to process the request. + +&half_space +&di(EINTR) The underlying message transport is unable to process the request. + +&half_space +&di(ETIMEDOUT) The underlying message transport is unable to process the request. + +&half_space +&di(ETERM) The underlying message transport is unable to process the request. +&end_dlist + +&h2(EXAMPLE) +&space +&ex_start + rmr_mbuf_t* mbuf = NULL; // received msg + + msg = rmr_mt_recv( mr, mbuf, 100 ); // wait up to 100ms + if( msg != NULL ) { + switch( msg->state ) { + case RMR_OK: + printf( "got a good message\n" ); + break; + + case RMR_ERR_EMPTY: + printf( "received timed out\n" ); + break; + + default: + printf( "receive error: %d\n", mbuf->state ); + break; + } + } else { + printf( "receive timeout (nil)\n" ); + } +&ex_end + +&h2(SEE ALSO ) +.ju off +rmr_alloc_msg(3), +rmr_call(3), +rmr_free_msg(3), +rmr_get_rcvfd(3), +rmr_init(3), +rmr_mk_ring(3), +rmr_mt_call(3), +rmr_payload_size(3), +rmr_send_msg(3), +rmr_torcv_msg(3), +rmr_rcv_specific(3), +rmr_rts_msg(3), +rmr_ready(3), +rmr_ring_free(3), +rmr_torcv_msg(3) +.ju on + + +.qu + diff --git a/doc/src/man/rmr_rcv_msg.3.xfm b/doc/src/man/rmr_rcv_msg.3.xfm index 58809fd..b432f9d 100644 --- a/doc/src/man/rmr_rcv_msg.3.xfm +++ b/doc/src/man/rmr_rcv_msg.3.xfm @@ -119,7 +119,8 @@ rmr_torcv_msg(3), rmr_rcv_specific(3), rmr_rts_msg(3), rmr_ready(3), -rmr_ring_free(3) +rmr_ring_free(3), +rmr_torcv_msg(3) .ju on diff --git a/src/rmr/common/include/RIC_message_types.h b/src/rmr/common/include/RIC_message_types.h index 92e2602..403a770 100644 --- a/src/rmr/common/include/RIC_message_types.h +++ b/src/rmr/common/include/RIC_message_types.h @@ -33,6 +33,8 @@ #define RIC_UNDEFINED -1 // --------------------------------------------------------- +#define RIC_SCTP_CONNECTION_FAILURE 1080 + #define RIC_SUB_REQ 12010 #define RIC_SUB_RESP 12011 #define RIC_SUB_FAILURE 12012 @@ -57,6 +59,7 @@ #define RIC_X2_RESET 10070 #define RIC_X2_RESET_RESP 10071 + #define RIC_ENDC_X2_SETUP_REQ 10360 #define RIC_ENDC_X2_SETUP_RESP 10361 #define RIC_ENDC_X2_SETUP_FAILURE 10362 diff --git a/src/rmr/common/include/rmr.h b/src/rmr/common/include/rmr.h index e6c1660..bed5f2a 100644 --- a/src/rmr/common/include/rmr.h +++ b/src/rmr/common/include/rmr.h @@ -43,7 +43,9 @@ extern "C" { // various flags for function calls #define RMRFL_NONE 0x00 // no flags -#define RMRFL_AUTO_ALLOC 0x01 // send auto allocates a zerocopy buffer +#define RMRFL_NOTHREAD 0x01 // do not start an additional route collector thread +#define RMRFL_MTCALL 0x02 // set up multi-threaded call support (rmr_init) +#define RMRFL_AUTO_ALLOC 0x03 // send auto allocates a zerocopy buffer #define RMR_DEF_SIZE 0 // pass as size to have msg allocation use the default msg size @@ -66,6 +68,7 @@ extern "C" { #define RMR_ERR_UNSET 13 // the message hasn't been populated with a transport buffer #define RMR_ERR_TRUNC 14 // received message likely truncated #define RMR_ERR_INITFAILED 15 // initialisation of something (probably message) failed +#define RMR_ERR_NOTSUPP 16 // the request is not supported, or RMr was not initialised for the request #define RMR_WH_CONNECTED(a) (a>=0) // for now whid is integer; it could be pointer at some future date @@ -117,6 +120,9 @@ extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ); extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg ); extern void rmr_wh_close( void* vctx, int whid ); +// ----- mt call support -------------------------------------------------------------------------------- +extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ); +extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ); // ----- msg buffer operations (no context needed) ------------------------------------------------------ extern int rmr_bytes2meid( rmr_mbuf_t* mbuf, unsigned char const* src, int len ); diff --git a/src/rmr/common/include/rmr_agnostic.h b/src/rmr/common/include/rmr_agnostic.h index 418ebbc..0d4458a 100644 --- a/src/rmr/common/include/rmr_agnostic.h +++ b/src/rmr/common/include/rmr_agnostic.h @@ -60,6 +60,8 @@ typedef struct uta_ctx uta_ctx_t; // internal flags, must be > than UFLAG_MASK //#define IFL_.... +#define CFL_MTC_ENABLED 0x01 // multi-threaded call is enabled + // msg buffer flags #define MFL_ZEROCOPY 0x01 // the message is an allocated zero copy message and can be sent. #define MFL_NOALLOC 0x02 // send should NOT allocate a new buffer before returning @@ -68,6 +70,7 @@ typedef struct uta_ctx uta_ctx_t; #define MAX_EP_GROUP 32 // max number of endpoints in a group #define MAX_RTG_MSG_SZ 2048 // max expected message size from route generator +#define MAX_CALL_ID 255 // largest call ID that is supported //#define DEF_RTG_MSGID "" // default to pick up all messages from rtg #define DEF_RTG_PORT "tcp:4561" // default port that we accept rtg connections on @@ -83,6 +86,7 @@ typedef struct uta_ctx uta_ctx_t; #define RMR_D1_LEN(h) (ntohl(((uta_mhdr_t *)h)->len2)) #define RMR_D2_LEN(h) (ntohl(((uta_mhdr_t *)h)->len3)) +// CAUTION: if using an offset with a header pointer, the pointer MUST be cast to void* before adding the offset! #define TRACE_OFFSET(h) ((ntohl(((uta_mhdr_t *)h)->len0))) #define DATA1_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)) #define DATA2_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)) @@ -98,12 +102,17 @@ typedef struct uta_ctx uta_ctx_t; #define SET_HDR_D1_LEN(h,l) (((uta_mhdr_t *)h)->len2=htonl((int32_t)l)) #define SET_HDR_D2_LEN(h,l) (((uta_mhdr_t *)h)->len3=htonl((int32_t)l)) + // index of things in the d1 data space +#define D1_CALLID_IDX 0 // the call-id to match on return + +#define NO_CALL_ID 0 // no call id associated with the message (normal queue) #define V1_PAYLOAD_OFFSET(h) (sizeof(uta_v1mhdr_t)) // v2 header flags #define HFL_HAS_TRACE 0x01 // Trace data is populated #define HFL_SUBID 0x02 // subscription ID is populated +#define HFL_CALL_MSG 0x04 // msg sent via blocking call /* Message header; interpreted by the other side, but never seen by @@ -212,6 +221,18 @@ typedef struct ring { } ring_t; +// --------- multi-threaded call things ----------------------------------------- +/* + A chute provides a return path for a received message that a thread has blocked + on. The receive thread will set the mbuf pointer and tickler the barrier to + signal to the call thread that data is ready. +*/ +typedef struct chute { + rmr_mbuf_t* mbuf; // pointer to message buffer received + sem_t barrier; // semaphore that the thread is waiting on + unsigned char expect[RMR_MAX_XID]; // the expected transaction ID +} chute_t; + // -------------- common static prototypes -------------------------------------- diff --git a/src/rmr/common/src/mbuf_api.c b/src/rmr/common/src/mbuf_api.c index e801ac9..65979de 100644 --- a/src/rmr/common/src/mbuf_api.c +++ b/src/rmr/common/src/mbuf_api.c @@ -34,6 +34,7 @@ #include #include #include +#include #include "rmr.h" // things the users see #include "rmr_agnostic.h" // agnostic things (must be included before private) @@ -246,6 +247,11 @@ extern int rmr_set_trace( rmr_mbuf_t* msg, unsigned const char* data, int size ) } hdr = (uta_mhdr_t *) msg->header; + if( !hdr ) { + errno = EINVAL; + return 0; + } + len = RMR_TR_LEN( hdr ); if( len != size ) { // different sized trace data, must realloc the buffer diff --git a/src/rmr/common/src/mt_call_static.c b/src/rmr/common/src/mt_call_static.c new file mode 100644 index 0000000..ee3cda2 --- /dev/null +++ b/src/rmr/common/src/mt_call_static.c @@ -0,0 +1,80 @@ +// :vi sw=4 ts=4 noet: +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* + Mnemonic: mt_call_static.c + Abstract: Static multi-threaded call support. + + Author: E. Scott Daniels + Date: 17 May 2019 +*/ + +#ifndef mtcall_static_c +#define mtcall_static_c + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + + +/* + Initialises the chutes that are then hung off of the context. + + Returns > 0 on success; 0 on failure with errno set. +*/ +static int init_mtcall( uta_ctx_t* ctx ) { + int rc = 1; // return code 1== good. + int i; + chute_t* chutes; + + if( ctx == NULL ) { + errno = EINVAL; + return 0; + } + + chutes = ctx->chutes = (chute_t *) malloc( sizeof( chute_t ) * (MAX_CALL_ID+1) ); + if( chutes == NULL ) { + return 0; + } + + for( i = 0; i < MAX_CALL_ID; i++ ) { // initialise all of the semaphores + chutes[i].mbuf = NULL; + if( sem_init( &chutes[i].barrier, 0, 0 ) < 0 ) { + fprintf( stderr, "[ERR] rmr: unable to initialise mt call chute [%d]: %s\n", i, strerror( errno ) ); + rc = -1; + } + } + + + return rc; +} + +#endif diff --git a/src/rmr/common/src/rt_generic_static.c b/src/rmr/common/src/rt_generic_static.c index 64cf47f..7add33c 100644 --- a/src/rmr/common/src/rt_generic_static.c +++ b/src/rmr/common/src/rt_generic_static.c @@ -413,6 +413,10 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { case 'u': // update current table, not a total replacement tokens[1] = clip( tokens[1] ); if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building + if( ctx->new_rtable == NULL ) { // update table not in progress + break; + } + if( ntoks >2 ) { if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n", @@ -809,9 +813,10 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) { return NULL; } - ep->open = 0; // not connected + ep->open = 0; // not connected ep->addr = uta_h2ip( ep_name ); ep->name = strdup( ep_name ); + pthread_mutex_init( &ep->gate, NULL ); // init with default attrs rmr_sym_put( rt->hash, ep_name, 1, ep ); } diff --git a/src/rmr/nanomsg/include/rmr_private.h b/src/rmr/nanomsg/include/rmr_private.h index 7d78382..8e60d63 100644 --- a/src/rmr/nanomsg/include/rmr_private.h +++ b/src/rmr/nanomsg/include/rmr_private.h @@ -42,6 +42,7 @@ struct endpoint { char* addr; // address used for connection int nn_sock; // the nano-msg socket to write to for this entry int open; // true if we've established the connection + pthread_mutex_t gate; // must be able to serialise some transport level functions on the ep }; /* diff --git a/src/rmr/nanomsg/src/rmr.c b/src/rmr/nanomsg/src/rmr.c index b57df3a..93e4f7a 100644 --- a/src/rmr/nanomsg/src/rmr.c +++ b/src/rmr/nanomsg/src/rmr.c @@ -44,6 +44,7 @@ #include #include #include +#include #include #include diff --git a/src/rmr/nng/include/rmr_nng_private.h b/src/rmr/nng/include/rmr_nng_private.h index 6bca91f..2349ce6 100644 --- a/src/rmr/nng/include/rmr_nng_private.h +++ b/src/rmr/nng/include/rmr_nng_private.h @@ -43,6 +43,7 @@ struct endpoint { nng_socket nn_sock; // the nano-msg socket to write to for this entry nng_dialer dialer; // the connection specific information (retry timout etc) int open; // set to true if we've connected as socket cannot be checked directly) + pthread_mutex_t gate; // we must serialise when we open/link to the endpoint }; /* @@ -71,7 +72,7 @@ struct uta_ctx { int nrtele; // number of elements in the routing table int send_retries; // number of retries send_msg() should attempt if eagain/timeout indicated by nng int trace_data_len; // number of bytes to allocate in header for trace data - int d1_len; // extra header data 1 length (future) + int d1_len; // extra header data 1 length int d2_len; // extra header data 2 length (future) nng_socket nn_sock; // our general listen socket route_table_t* rtable; // the active route table @@ -79,6 +80,7 @@ struct uta_ctx { route_table_t* new_rtable; // route table under construction if_addrs_t* ip_list; // list manager of the IP addresses that are on our known interfaces void* mring; // ring where msgs are queued while waiting for a call response msg + chute_t* chutes; char* rtg_addr; // addr/port of the route table generation publisher int rtg_port; // the port that the rtg listens on @@ -87,6 +89,7 @@ struct uta_ctx { epoll_stuff_t* eps; // epoll information needed for the rcv with timeout call pthread_t rtc_th; // thread info for the rtc listener + pthread_t mtc_th; // thread info for the multi-thread call receive process }; @@ -101,7 +104,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ); static void free_ctx( uta_ctx_t* ctx ); // --- rt table things --------------------------- -static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ); +static int uta_link2( endpoint_t* ep ); static int rt_link2_ep( endpoint_t* ep ); static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ); static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ); diff --git a/src/rmr/nng/src/mt_call_nng_static.c b/src/rmr/nng/src/mt_call_nng_static.c new file mode 100644 index 0000000..2ef21f6 --- /dev/null +++ b/src/rmr/nng/src/mt_call_nng_static.c @@ -0,0 +1,114 @@ +// : vi ts=4 sw=4 noet : +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* + Mnemonic: mt_call_nng_static.c + Abstract: Static funcitons related to the multi-threaded call feature + which are NNG specific. + + Author: E. Scott Daniels + Date: 20 May 2019 +*/ + +#ifndef _mtcall_nng_static_c +#define _mtcall_nng_static_c +#include + +static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) { + chute_t* chute; + int state; + + if( ! uta_ring_insert( ctx->mring, mbuf ) ) { + rmr_free_msg( mbuf ); // drop if ring is full + } + + chute = &ctx->chutes[0]; + chute->mbuf = mbuf; + state = sem_post( &chute->barrier ); // tickle the ring monitor +} + +/* + This is expected to execute in a separate thread. It is responsible for + _all_ receives and queues them on the appropriate ring, or chute. + + The "state" of the message is checked which determines where the message + is delivered. + + Flags indicate that the message is a call generated message, then + the message is queued on the normal receive ring. + + Chute ID is == 0, then the message is queued on the normal receive ring. + + The transaction ID in the message matches the expected ID in the chute, + then the message is given to the chute and the chute's semaphore is tickled. + + If none are true, the message is dropped. +*/ +static void* mt_receive( void* vctx ) { + uta_ctx_t* ctx; + uta_mhdr_t* hdr; // header of the message received + rmr_mbuf_t* mbuf; // msg received + unsigned char* d1; // pointer at d1 data ([0] is the call_id) + chute_t* chute; + unsigned int call_id; // the id assigned to the call generated message + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + if( DEBUG ) fprintf( stderr, "rmr mt_receive: bad parms, thread not started\n" ); + return NULL; + } + + fprintf( stderr, "[INFO] rmr mt_receiver is spinning\n" ); + + while( ! ctx->shutdown ) { + mbuf = rcv_msg( ctx, NULL ); + + if( mbuf != NULL && (hdr = (uta_mhdr_t *) mbuf->header) != NULL ) { + if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue + queue_normal( ctx, mbuf ); + } else { + if( RMR_D1_LEN( hdr ) <= 0 ) { // no call-id data; just queue + queue_normal( ctx, mbuf ); + } else { + d1 = DATA1_ADDR( hdr ); + if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) { // call_id not set, just queue + queue_normal( ctx, mbuf ); + } else { + chute = &ctx->chutes[call_id]; + if( memcmp( mbuf->xaction, chute->expect, RMR_MAX_XID ) == 0 ) { // match + chute->mbuf = mbuf; + sem_post( &chute->barrier ); + } else { + rmr_free_msg( mbuf ); + mbuf = NULL; + } + } + } + } + } else { + if( ! mbuf ) { // very very unlikely, but prevent leaks + rmr_free_msg( mbuf ); + } + } + } + + return NULL; +} + +#endif diff --git a/src/rmr/nng/src/rmr_nng.c b/src/rmr/nng/src/rmr_nng.c index e77c9f3..582cbb0 100644 --- a/src/rmr/nng/src/rmr_nng.c +++ b/src/rmr/nng/src/rmr_nng.c @@ -50,6 +50,8 @@ #include #include #include +#include +#include #include #include @@ -70,6 +72,8 @@ #include "tools_static.c" #include "sr_nng_static.c" // send/receive static functions #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!) +#include "mt_call_static.c" +#include "mt_call_nng_static.c" //------------------------------------------------------------------------------ @@ -181,103 +185,20 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) { } /* - send message with maximum timeout. - Accept a message and send it to an endpoint based on message type. - If NNG reports that the send attempt timed out, or should be retried, - RMr will retry for approximately max_to microseconds; rounded to the next - higher value of 10. - - Allocates a new message buffer for the next send. If a message type has - more than one group of endpoints defined, then the message will be sent - in round robin fashion to one endpoint in each group. - - An endpoint will be looked up in the route table using the message type and - the subscription id. If the subscription id is "UNSET_SUBID", then only the - message type is used. If the initial lookup, with a subid, fails, then a - second lookup using just the mtype is tried. - - CAUTION: this is a non-blocking send. If the message cannot be sent, then - it will return with an error and errno set to eagain. If the send is - a limited fanout, then the returned status is the status of the last - send attempt. - + This is a wrapper to the real timeout send. We must wrap it now to ensure that + the call flag and call-id are reset */ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { - nng_socket nn_sock; // endpoint socket for send - uta_ctx_t* ctx; - int group; // selected group to get socket for - int send_again; // true if the message must be sent again - rmr_mbuf_t* clone_m; // cloned message for an nth send - int sock_ok; // got a valid socket from round robin select - uint64_t key; // mtype or sub-id/mtype sym table key - int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails - - if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast - errno = EINVAL; // if msg is null, this is their clue - if( msg != NULL ) { - msg->state = RMR_ERR_BADARG; - errno = EINVAL; // must ensure it's not eagain - } - return msg; - } + char* d1; // point at the call-id in the header - errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't - if( msg->header == NULL ) { - fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" ); - msg->state = RMR_ERR_NOHDR; - errno = EBADMSG; // must ensure it's not eagain - return msg; - } + if( msg != NULL ) { + ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off - if( max_to < 0 ) { - max_to = ctx->send_retries; // convert to retries - } + d1 = DATA1_ADDR( msg->header ); + d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end + } - send_again = 1; // force loop entry - group = 0; // always start with group 0 - - key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry - if( msg->sub_id != UNSET_SUBID ) { - altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry - } - while( send_again ) { - sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups - if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n", - msg->mtype, send_again, group, msg->len, sock_ok, altk_ok ); - - if( ! sock_ok ) { - if( altk_ok ) { // we can try with the alternate (no sub-id) key - altk_ok = 0; - key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again - send_again = 1; // ensure we don't exit the while - continue; - } - - msg->state = RMR_ERR_NOENDPT; - errno = ENXIO; // must ensure it's not eagain - return msg; // caller can resend (maybe) or free - } - - group++; - - if( send_again ) { - clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available - if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len ); - msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer - msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success - /* - if( msg ) { - // error do we need to count successes/errors, how to report some success, esp if last fails? - } - */ - - msg = clone_m; // clone will be the next to send - } else { - msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was - } - } - - return msg; // last message caries the status of last/only send attempt + return mtosend_msg( vctx, msg, max_to ); } /* @@ -286,7 +207,16 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { See rmr_stimeout() for info on setting the default timeout. */ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { - return rmr_mtosend_msg( vctx, msg, -1 ); // retries < uses default from ctx + char* d1; // point at the call-id in the header + + if( msg != NULL ) { + ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off + + d1 = DATA1_ADDR( msg->header ); + d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end + } + + return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0 uses default from ctx } /* @@ -314,12 +244,11 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { The caller must check for this and handle. */ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { - nng_socket nn_sock; // endpoint socket for send + nng_socket nn_sock; // endpoint socket for send uta_ctx_t* ctx; - int state; - uta_mhdr_t* hdr; - char* hold_src; // we need the original source if send fails - int sock_ok; // true if we found a valid endpoint socket + int state; + char* hold_src; // we need the original source if send fails + int sock_ok; // true if we found a valid endpoint socket if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -336,19 +265,20 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { return msg; } + ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // socket of specific endpoint if( ! sock_ok ) { msg->state = RMR_ERR_NOENDPT; return msg; // preallocated msg can be reused since not given back to nn } - msg->state = RMR_OK; // ensure it is clear before send - hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to - strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours + msg->state = RMR_OK; // ensure it is clear before send + hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to + strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours msg = send_msg( ctx, msg, nn_sock, -1 ); if( msg ) { - strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID ); // always return original source so rts can be called again - msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source + strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID ); // always return original source so rts can be called again + msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source } free( hold_src ); @@ -356,6 +286,16 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { } /* + If multi-threading call is turned on, this invokes that mechanism with the special call + id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original + behavour (described below) is carried out. This is safe to use when mt is enabled, but + the user app is invoking rmr_call() from only one thread, and the caller doesn't need + a flexible timeout. + + On timeout this function will return a nil pointer. If the original message could not + be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status. + + Original behavour: Call sends the message based on message routing using the message type, and waits for a response message to arrive with the same transaction id that was in the outgoing message. If, while wiating for the expected response, messages are received which do not have the @@ -373,8 +313,6 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { EAGAIN -- the underlying message system wsa interrupted or the device was busy; user should call this function with the message again. - - QUESTION: should user specify the number of messages to allow to queue? */ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) { uta_ctx_t* ctx; @@ -387,6 +325,10 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) { return msg; } + if( ctx->flags & CFL_MTC_ENABLED ) { // if multi threaded call is on, use that + return rmr_mt_call( vctx, msg, 1, 1000 ); // use the reserved call-id of 1 and wait up to 1 sec + } + memcpy( expected_id, msg->xaction, RMR_MAX_XID ); expected_id[RMR_MAX_XID] = 0; // ensure it's a string if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id ); @@ -426,6 +368,10 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) { } errno = 0; + if( ctx->flags & CFL_MTC_ENABLED ) { // must pop from ring with a semaphore dec first + return rmr_mt_rcv( ctx, old_msg, -1 ); + } + qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued if( qm != NULL ) { if( old_msg ) { @@ -458,6 +404,10 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { return old_msg; } + if( ctx->flags & CFL_MTC_ENABLED ) { // must pop from ring with a semaphore dec first + return rmr_mt_rcv( ctx, old_msg, ms_to ); + } + qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued if( qm != NULL ) { if( old_msg ) { @@ -650,7 +600,14 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { memset( ctx, 0, sizeof( uta_ctx_t ) ); ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning - ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response + ctx->d1_len = 4; // data1 space in header -- 4 bytes for now + + if( flags & RMRFL_MTCALL ) { // mt call support is on, need bigger ring + ctx->mring = uta_mk_ring( 2048 ); // message ring filled by rcv thread + init_mtcall( ctx ); // set up call chutes + } else { + ctx->mring = uta_mk_ring( 128 ); // ring filled only on blocking call + } ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh if( max_msg_size > 0 ) { @@ -713,6 +670,14 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { } } + if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) { // mt call support is on, must start the listener thread if not running + ctx->flags |= CFL_MTC_ENABLED; + if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // kick the receiver + fprintf( stderr, "[WARN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) ); + } + + } + free( proto_port ); return (void *) ctx; } @@ -818,4 +783,210 @@ extern void rmr_close( void* vctx ) { } +// ----- multi-threaded call/receive support ------------------------------------------------- +/* + Blocks on the receive ring chute semaphore and then reads from the ring + when it is tickled. If max_wait is -1 then the function blocks until + a message is ready on the ring. Else max_wait is assumed to be the number + of millaseconds to wait before returning a timeout message. +*/ +extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { + uta_ctx_t* ctx; + uta_mhdr_t* hdr; // header in the transport buffer + chute_t* chute; + struct timespec ts; // time info if we have a timeout + long new_ms; // adjusted mu-sec + long seconds = 0; // max wait seconds + long nano_sec; // max wait xlated to nano seconds + int state; + rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + if( mbuf ) { + mbuf->state = RMR_ERR_BADARG; + } + return mbuf; + } + + if( ! (ctx->flags & CFL_MTC_ENABLED) ) { + errno = EINVAL; + if( mbuf != NULL ) { + mbuf->state = RMR_ERR_NOTSUPP; + } + return mbuf; + } + + ombuf = mbuf; + if( ombuf ) { + ombuf->state = RMR_ERR_TIMEOUT; // preset if for failure + ombuf->len = 0; + } + + chute = &ctx->chutes[0]; // chute 0 used only for its semaphore + + if( max_wait > 0 ) { + clock_gettime( CLOCK_REALTIME, &ts ); + + if( max_wait > 999 ) { + seconds = (max_wait - 999)/1000; + max_wait -= seconds * 1000; + ts.tv_sec += seconds; + } + if( max_wait > 0 ) { + nano_sec = max_wait * 1000000; + ts.tv_nsec += nano_sec; + if( ts.tv_nsec > 999999999 ) { + ts.tv_nsec -= 999999999; + ts.tv_sec++; + } + } + + seconds = 1; // use as flag later to invoked timed wait + } + + errno = 0; + while( chute->mbuf == NULL && ! errno ) { + if( seconds ) { + state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout + } else { + state = sem_wait( &chute->barrier ); + } + + if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit + errno = 0; + } + } + + if( state < 0 ) { + mbuf = ombuf; // return caller's buffer if they passed one in + } else { + if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" ); + if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued + if( mbuf ) { + mbuf->state = RMR_OK; + + if( ombuf ) { + rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring + } + } else { + mbuf = ombuf; // no buffer, return user's if there + } + } + } + + return mbuf; +} + +/* + Accept a message buffer and caller ID, send the message and then wait + for the receiver to tickle the semaphore letting us know that a message + has been received. The call_id is a value between 2 and 255, inclusive; if + it's not in this range an error will be returned. Max wait is the amount + of time in millaseconds that the call should block for. If 0 is given + then no timeout is set. + + If the mt_call feature has not been initialised, then the attempt to use this + funciton will fail with RMR_ERR_NOTSUPP + + If no matching message is received before the max_wait period expires, a + nil pointer is returned, and errno is set to ETIMEOUT. If any other error + occurs after the message has been sent, then a nil pointer is returned + with errno set to some other value. +*/ +extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) { + rmr_mbuf_t* ombuf; // original mbuf passed in + uta_ctx_t* ctx; + uta_mhdr_t* hdr; // header in the transport buffer + chute_t* chute; + unsigned char* d1; // d1 data in header + struct timespec ts; // time info if we have a timeout + long new_ms; // adjusted mu-sec + long seconds = 0; // max wait seconds + long nano_sec; // max wait xlated to nano seconds + int state; + + if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) { + errno = EINVAL; + if( mbuf ) { + mbuf->state = RMR_ERR_BADARG; + } + return mbuf; + } + + if( ! (ctx->flags & CFL_MTC_ENABLED) ) { + mbuf->state = RMR_ERR_NOTSUPP; + return mbuf; + } + + if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them + mbuf->state = RMR_ERR_BADARG; + return mbuf; + } + + ombuf = mbuf; // save to return timeout status with + + chute = &ctx->chutes[call_id]; + if( chute->mbuf != NULL ) { // probably a delayed message that wasn't dropped + rmr_free_msg( chute->mbuf ); + chute->mbuf = NULL; + } + + hdr = (uta_mhdr_t *) mbuf->header; + hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call + memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for + d1 = DATA1_ADDR( hdr ); + d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response + mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend + + if( max_wait > 0 ) { + clock_gettime( CLOCK_REALTIME, &ts ); + + if( max_wait > 999 ) { + seconds = (max_wait - 999)/1000; + max_wait -= seconds * 1000; + ts.tv_sec += seconds; + } + if( max_wait > 0 ) { + nano_sec = max_wait * 1000000; + ts.tv_nsec += nano_sec; + if( ts.tv_nsec > 999999999 ) { + ts.tv_nsec -= 999999999; + ts.tv_sec++; + } + } + + seconds = 1; // use as flag later to invoked timed wait + } + + mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success! + if( mbuf ) { + if( mbuf->state != RMR_OK ) { + return mbuf; // timeout or unable to connect or no endpoint are most likely issues + } + } + + errno = 0; + while( chute->mbuf == NULL && ! errno ) { + if( seconds ) { + state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout + } else { + state = sem_wait( &chute->barrier ); + } + + if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit + errno = 0; + } + } + + if( state < 0 ) { + return NULL; // leave errno as set by sem wait call + } + + mbuf = chute->mbuf; + mbuf->state = RMR_OK; + chute->mbuf = NULL; + + return mbuf; +} diff --git a/src/rmr/nng/src/rtable_nng_static.c b/src/rmr/nng/src/rtable_nng_static.c index 6122f69..d903de3 100644 --- a/src/rmr/nng/src/rtable_nng_static.c +++ b/src/rmr/nng/src/rtable_nng_static.c @@ -53,12 +53,29 @@ Target assumed to be address:port. The new socket is returned via the user supplied pointer so that a success/fail code is returned directly. Return value is 0 (false) on failure, 1 (true) on success. + + In order to support multi-threaded user applications we must hold a lock before + we attempt to create a dialer and connect. NNG is thread safe, but we can + get things into a bad state if we allow a collision here. The lock grab + only happens on the intial session setup. */ -static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) { +//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) { +static int uta_link2( endpoint_t* ep ) { + char* target; + nng_socket* nn_sock; + nng_dialer* dialer; char conn_info[NNG_MAXADDRLEN]; // string to give to nano to make the connection char* addr; int state = FALSE; + if( ep == NULL ) { + return FALSE; + } + + target = ep->addr; + nn_sock = &ep->nn_sock; + dialer = &ep->dialer; + if( target == NULL || (addr = strchr( target, ':' )) == NULL ) { // bad address:port fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "" : target ); return FALSE; @@ -69,13 +86,22 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) { return FALSE; } + pthread_mutex_lock( &ep->gate ); // grab the lock + if( ep->open ) { + pthread_mutex_unlock( &ep->gate ); + return TRUE; + } + + if( nng_push0_open( nn_sock ) != 0 ) { // and assign the mode + pthread_mutex_unlock( &ep->gate ); fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target ); return FALSE; } snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target ); if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) { + pthread_mutex_unlock( &ep->gate ); fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno ); nng_close( *nn_sock ); return FALSE; @@ -85,6 +111,7 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) { nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMINT, 100 ); // start retry 100m after last failure with 2s cap if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) { // can fail immediatly (unlike nanomsg) + pthread_mutex_unlock( &ep->gate ); fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) ); nng_close( *nn_sock ); return FALSE; @@ -92,6 +119,8 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) { if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target ); + ep->open = TRUE; // must set before release + pthread_mutex_unlock( &ep->gate ); return TRUE; } @@ -108,7 +137,7 @@ static int rt_link2_ep( endpoint_t* ep ) { return TRUE; } - ep->open = uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ); + uta_link2( ep ); return ep->open; } @@ -198,7 +227,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s if( ep->addr == NULL ) { // name didn't resolve before, try again ep->addr = uta_h2ip( ep->name ); } - if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) { // find entry in table and create link + if( uta_link2( ep ) ) { // find entry in table and create link state = TRUE; ep->open = TRUE; *nn_sock = ep->nn_sock; // pass socket back to caller @@ -302,7 +331,7 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, ep->addr = uta_h2ip( ep->name ); } - if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) { // find entry in table and create link + if( uta_link2( ep ) ) { // find entry in table and create link ep->open = TRUE; *nn_sock = ep->nn_sock; // pass socket back to caller } else { diff --git a/src/rmr/nng/src/sr_nng_static.c b/src/rmr/nng/src/sr_nng_static.c index 7c17589..da4dfbd 100644 --- a/src/rmr/nng/src/sr_nng_static.c +++ b/src/rmr/nng/src/sr_nng_static.c @@ -142,8 +142,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s hdr->sub_id = htonl( UNSET_SUBID ); SET_HDR_LEN( hdr ); // ensure these are converted to net byte order SET_HDR_TR_LEN( hdr, ctx->trace_data_len ); - //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // no need until we start using them - //SET_HDR_D2_LEN( hdr, ctx->d2_len ); + SET_HDR_D1_LEN( hdr, ctx->d1_len ); + //SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future } msg->len = 0; // length of data in the payload msg->alloc_len = mlen; // length of allocated transport buffer @@ -322,7 +322,6 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) { uta_mhdr_t* hdr; uta_v1mhdr_t* v1hdr; int tr_old_len; // tr size in new buffer - int coffset; // an offset to something in the header for copy nm = (rmr_mbuf_t *) malloc( sizeof *nm ); if( nm == NULL ) { @@ -349,22 +348,19 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) { nm->payload = (void *) v1hdr + sizeof( *v1hdr ); break; - default: // current message always caught here + default: // current message version always caught here hdr = nm->header; - memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data might have changed - if( RMR_D1_LEN( hdr ) ) { - coffset = DATA1_OFFSET( hdr ); // offset to d1 - memcpy( hdr + coffset, old_msg->header + coffset, RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary + memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed + SET_HDR_TR_LEN( hdr, tr_len ); // must adjust trace len in new message before copy + if( RMR_D1_LEN( hdr ) ) { + memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary } if( RMR_D2_LEN( hdr ) ) { - coffset = DATA2_OFFSET( hdr ); // offset to d2 - memcpy( hdr + coffset, old_msg->header + coffset, RMR_D2_LEN( hdr ) ); // copy data2 and data2 if necessary + memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) ); } - SET_HDR_TR_LEN( hdr, tr_len ); // MUST set before pointing payload nm->payload = PAYLOAD_ADDR( hdr ); // directly at the payload - SET_HDR_TR_LEN( hdr, tr_len ); // do NOT copy old trace data, just set the new header break; } @@ -427,6 +423,7 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { msg->len = 0; msg->payload = NULL; msg->xaction = NULL; + msg->tp_buf = NULL; msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) { @@ -590,6 +587,108 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock return msg; } +/* + send message with maximum timeout. + Accept a message and send it to an endpoint based on message type. + If NNG reports that the send attempt timed out, or should be retried, + RMr will retry for approximately max_to microseconds; rounded to the next + higher value of 10. + + Allocates a new message buffer for the next send. If a message type has + more than one group of endpoints defined, then the message will be sent + in round robin fashion to one endpoint in each group. + + An endpoint will be looked up in the route table using the message type and + the subscription id. If the subscription id is "UNSET_SUBID", then only the + message type is used. If the initial lookup, with a subid, fails, then a + second lookup using just the mtype is tried. + + CAUTION: this is a non-blocking send. If the message cannot be sent, then + it will return with an error and errno set to eagain. If the send is + a limited fanout, then the returned status is the status of the last + send attempt. + +*/ +static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { + nng_socket nn_sock; // endpoint socket for send + uta_ctx_t* ctx; + int group; // selected group to get socket for + int send_again; // true if the message must be sent again + rmr_mbuf_t* clone_m; // cloned message for an nth send + int sock_ok; // got a valid socket from round robin select + uint64_t key; // mtype or sub-id/mtype sym table key + int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails + char* d1; + + if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast + errno = EINVAL; // if msg is null, this is their clue + if( msg != NULL ) { + msg->state = RMR_ERR_BADARG; + errno = EINVAL; // must ensure it's not eagain + } + return msg; + } + + errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't + if( msg->header == NULL ) { + fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" ); + msg->state = RMR_ERR_NOHDR; + errno = EBADMSG; // must ensure it's not eagain + return msg; + } + + if( max_to < 0 ) { + max_to = ctx->send_retries; // convert to retries + } + + send_again = 1; // force loop entry + group = 0; // always start with group 0 + + key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry + if( msg->sub_id != UNSET_SUBID ) { + altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry + } + while( send_again ) { + sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups + if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n", + msg->mtype, send_again, group, msg->len, sock_ok, altk_ok ); + + if( ! sock_ok ) { + if( altk_ok ) { // we can try with the alternate (no sub-id) key + altk_ok = 0; + key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again + send_again = 1; // ensure we don't exit the while + continue; + } + + msg->state = RMR_ERR_NOENDPT; + errno = ENXIO; // must ensure it's not eagain + return msg; // caller can resend (maybe) or free + } + + group++; + + if( send_again ) { + clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available + if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len ); + msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer + msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success + /* + if( msg ) { + // error do we need to count successes/errors, how to report some success, esp if last fails? + } + */ + + msg = clone_m; // clone will be the next to send + } else { + msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was + } + } + + return msg; // last message caries the status of last/only send attempt +} + + /* A generic wrapper to the real send to keep wormhole stuff agnostic. We assume the wormhole function vetted the buffer so we don't have to. diff --git a/test/Makefile b/test/Makefile index d6c2c64..db4655a 100644 --- a/test/Makefile +++ b/test/Makefile @@ -24,6 +24,7 @@ coverage_opts = -ftest-coverage -fprofile-arcs #libs = ../build/librmr_nng.a -L ../build/lib -lnng -lpthread -lm libs = -L ../build/lib -lnng -lpthread -lm +ipaths = -I ../src/rmr/common/src/ -I ../src/rmr/common/include -I ../src/rmr/nng/include/ -I ../src/rmr/nng/src/ -I ../src/rmr/nanomsg/include/ -I ../src/rmr/nanomsg/src/ #sa_tests = sa_tools_test.o @@ -31,7 +32,7 @@ libs = -L ../build/lib -lnng -lpthread -lm $(CC) -g $< -c %:: %.c - $(CC) -I ../src/common/src/ -I ../src/common/include -I ../src/nng/include -I ../src/nanomsg/include $(coverage_opts) -fPIC -g $< -o $@ $(libs) + $(CC) $(ipaths) $(coverage_opts) -fPIC -g $< -o $@ $(libs) # catch all all: diff --git a/test/app_test/Makefile b/test/app_test/Makefile index f05f49e..07748ee 100644 --- a/test/app_test/Makefile +++ b/test/app_test/Makefile @@ -19,25 +19,32 @@ # NOTE: this makefile assumes that RMr has been built using the directory .build # at the top most repo directory (e.g. ../../.build). It can be changed # if you need to by adding "build_path=" to the make command line. +# To use this makefile to build on a system where RMr is already installed +# try: make build_path=/usr/local/lib +# +# By default we prefer the Korn shell (it's just better). If you really need +# to build with a dfferent shell add "SHELL=path" to the command line: +# make SHELL=/bin/bash +# .EXPORT_ALL_VARIABLES: .ONESHELL: #.SHELLFLAGS = -e # hosed on some flavours so keep it off -SHELL = /bin/ksh +SHELL ?= /bin/ksh build_path ?= ../../.build -header_path := $(shell find ../../.build -name 'rmr.h' |head -1 | sed 's!/rmr/.*!!' ) +header_path := $(shell find $(build_path) -name 'rmr.h' |head -1 | sed 's!/rmr/.*!!' ) C_INCLUDE_PATH := $(header_path) LD_LIBRARY_PATH=$(build_path):$(build_path)/lib LIBRARY_PATH = $(LD_LIBRARY_PATH) # These programmes are designed to test some basic application level functions -# from the perspective of two communicating processes. +# from the perspective of two, or more, communicating processes. .PHONY: all -all: sender receiver sender_nano receiver_nano +all: sender receiver sender_nano receiver_nano caller mt_receiver receiver_nano: receiver.c gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr -lnanomsg -lpthread -lm @@ -45,13 +52,23 @@ receiver_nano: receiver.c receiver: receiver.c gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm +mt_receiver: receiver.c + gcc -I $${C_INCLUDE_PATH:-.} -DMTC $< -g -o $@ -lrmr_nng -lnng -lpthread -lm + sender_nano: sender.c gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr -lnanomsg -lpthread -lm sender: sender.c gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm +caller: caller.c + gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm + -.PHONY: clean +# clean removes intermediates; nuke removes everything that can be built +.PHONY: clean nuke clean: - rm -f sender sender_nano receiver receiver_nano *.o + rm -f *.o + +nuke: clean + rm -f sender sender_nano receiver receiver_nano caller mt_receiver diff --git a/test/app_test/caller.c b/test/app_test/caller.c new file mode 100644 index 0000000..7b7336a --- /dev/null +++ b/test/app_test/caller.c @@ -0,0 +1,326 @@ +// :vim ts=4 sw=4 noet: +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* + Mnemonic: caller.c + Abstract: This is a simple sender which will send a series of messages using + rmr_call(). N threads are started each sending the desired number + of messages and expecting an 'ack' for each. Each ack is examined + to verify that the thread id placed into the message matches (meaning + that the ack was delivered by RMr to the correct thread's chute. + + In addition, the main thread listens for messages in order to verify + that a main or receiving thread can receive messages concurrently + while call acks are pending and being processed. + + Message format is: + ck1 ck2| @ tid + + Ck1 is the simple check sum of the msg-text (NOT includeing ) + Ck2 is the simple check sum of the trace data which is a nil terminated + series of bytes. + tid is the thread id assigned by the main thread. + + Parms: argv[1] == number of msgs to send (10) + argv[2] == delay (mu-seconds, 1000000 default) + argv[3] == number of threads (3) + argv[4] == listen port + + Sender will send for at most 20 seconds, so if nmsgs and delay extend + beyond that period the total number of messages sent will be less + than n. + + Date: 18 April 2019 + Author: E. Scott Daniels +*/ + +#include +#include +#include +#include +#include +#include +#include +#include + + +#include + +#define TRACE_SIZE 40 // bytes in header to provide for trace junk + +/* + Thread data +*/ +typedef struct tdata { + int id; // the id we'll pass to RMr mt-call function NOT the thread id + int n2send; // number of messages to send + int delay; // ms delay between messages + void* mrc; // RMr context + int state; +} tdata_t; + + + +// -------------------------------------------------------------------------------- + + +static int sum( char* str ) { + int sum = 0; + int i = 0; + + while( *str ) { + sum += *(str++) + i++; + } + + return sum % 255; +} + + + +/* + Executed as a thread, this puppy will generate calls to ensure that we get the + response back to the right thread, that we can handle threads, etc. +*/ +static void* mk_calls( void* data ) { + tdata_t* control; + rmr_mbuf_t* sbuf; // send buffer + int count = 0; + int rt_count = 0; // number of messages requiring a spin retry + int ok_msg = 0; // received messages that were sent by us + int bad_msg = 0; // received messages that were sent by a different thread + int drops = 0; + int fail_count = 0; // # of failure sends after first successful send + int successful = 0; // set to true after we have a successful send + char wbuf[1024]; + char xbuf[1024]; // build transaction string here + char trace[1024]; + int xaction_id = 1; + char* tok; + int state = 0; + + if( (control = (tdata_t *) data) == NULL ) { + fprintf( stderr, "thread data was nil; bailing out\n" ); + } + //fprintf( stderr, " thread started with parallel call id=%d sending=%d delay=%d\n", control->id, control->n2send, control->delay ); + + sbuf = rmr_alloc_msg( control->mrc, 512 ); // alloc first send buffer; subsequent buffers allcoated on send + + memset( trace, 0, sizeof( trace ) ); + while( count < control->n2send ) { // we send n messages after the first message is successful + snprintf( trace, 100, "%lld", (long long) time( NULL ) ); + rmr_set_trace( sbuf, trace, TRACE_SIZE ); // fully populate so we dont cause a buffer realloc + + snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer! @ %d", count, trace, rand(), control->id ); + snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf ); + snprintf( xbuf, 200, "%31d", xaction_id ); + rmr_bytes2xact( sbuf, xbuf, 32 ); + + sbuf->mtype = 5; // mtype is always 5 as the test receiver acks just mtype 5 messages + sbuf->len = strlen( sbuf->payload ) + 1; // our receiver likely wants a nice acsii-z string + sbuf->state = 0; + sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 ); // send it (send returns an empty payload on success, or the original payload on fail/retry) + + if( sbuf && sbuf->state == RMR_ERR_RETRY ) { // number of times we had to spin to send + rt_count++; + } + while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) { // send blocked; keep trying + sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 ); // call and wait up to 100ms for a response + } + + if( sbuf != NULL ) { + switch( sbuf->state ) { + case RMR_OK: // we should have a buffer back from the sender here + successful = 1; + if( (tok = strchr( sbuf->payload, '@' )) != NULL ) { + if( atoi( tok+1 ) == control->id ) { + //fprintf( stderr, " tid=%-2d ok ack\n", control->id ); + ok_msg++; + } else { + bad_msg++; + //fprintf( stderr, " tid=%-2d bad ack %s\n", control->id, sbuf->payload ); + } + } + //fprintf( stderr, " tid=%-2d call returned valid msg: %s\n", control->id, sbuf->payload ); + // future -- verify that we see our ID at the end of the message + count++; + break; + + default: + fprintf( stderr, "unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno ); + sbuf = rmr_alloc_msg( control->mrc, 512 ); // allocate a sendable buffer + if( successful ) { + fail_count++; // count failures after first successful message + } else { + // some error (not connected likely), don't count this + sleep( 1 ); + } + break; + } + } else { + //fprintf( stderr, " tid=%-2d call finished, no sbuf\n", control->id ); + sbuf = rmr_alloc_msg( control->mrc, 512 ); // loop expects an subf + drops++; + count++; + } + + if( control->delay > 0 ) { + usleep( control->delay ); + } + } + + state = 1; + if( ok_msg < (control->n2send-1) || bad_msg > 0 ) { // allow one drop to pass + state = 0; + } + if( count < control->n2send ) { + state = 0; + } + + control->state = -state; // signal inactive to main thread; -1 == pass, 0 == fail + fprintf( stderr, " [%s] tid=%-2d sent=%d ok-acks=%d bad-acks=%d drops=%d failures=%d retries=%d\n", + state ? "PASS" : "FAIL", control->id, count, ok_msg, bad_msg, drops, fail_count, rt_count ); + + + return NULL; +} + +int main( int argc, char** argv ) { + void* mrc; // msg router context + rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow + struct epoll_event events[1]; // list of events to give to epoll + struct epoll_event epe; // event definition for event to listen to + int ep_fd = -1; // epoll's file des (given to epoll_wait) + int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on + int nready; // number of events ready for receive + char* listen_port = "43086"; + long timeout = 0; + int delay = 100000; // usec between send attempts + int nmsgs = 10; // number of messages to send + int nthreads = 3; + tdata_t* cvs; // vector of control blocks + int i; + pthread_t* pt_info; // thread stuff + int failures = 0; + int pings = 0; // number of messages received on normal channel + + if( argc > 1 ) { + nmsgs = atoi( argv[1] ); + } + if( argc > 2 ) { + delay = atoi( argv[2] ); + } + if( argc > 3 ) { + nthreads = atoi( argv[3] ); + } + if( argc > 4 ) { + listen_port = argv[4]; + } + + fprintf( stderr, " listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay ); + + if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) { // initialise with multi-threaded call enabled + fprintf( stderr, " unable to initialise RMr\n" ); + exit( 1 ); + } + + rmr_init_trace( mrc, TRACE_SIZE ); + + if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG + if( rcv_fd < 0 ) { + fprintf( stderr, " unable to set up polling fd\n" ); + exit( 1 ); + } + if( (ep_fd = epoll_create1( 0 )) < 0 ) { + fprintf( stderr, " [FAIL] unable to create epoll fd: %d\n", errno ); + exit( 1 ); + } + epe.events = EPOLLIN; + epe.data.fd = rcv_fd; + + if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) { + fprintf( stderr, " [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) ); + exit( 1 ); + } + } else { + rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive + } + + + cvs = malloc( sizeof( tdata_t ) * nthreads ); + pt_info = malloc( sizeof( pthread_t ) * nthreads ); + if( cvs == NULL ) { + fprintf( stderr, " unable to allocate control vector\n" ); + exit( 1 ); + } + + + timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much) + while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one + fprintf( stderr, " waiting for rmr to show ready\n" ); + sleep( 1 ); + + if( time( NULL ) > timeout ) { + fprintf( stderr, " giving up\n" ); + exit( 1 ); + } + } + fprintf( stderr, " rmr is ready; starting threads\n" ); + + for( i = 0; i < nthreads; i++ ) { + cvs[i].mrc = mrc; + cvs[i].id = i + 2; // we pass this as the call-id to rmr, so must be >1 + cvs[i].delay = delay; + cvs[i].n2send = nmsgs; + cvs[i].state = 1; + + pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] ); // kick a thread + } + + timeout = time( NULL ) + 20; + i = 0; + while( nthreads > 0 ) { + if( cvs[i].state < 1 ) { // states 0 or below indicate done. 0 == failure, -n == success + nthreads--; + if( cvs[i].state == 0 ) { + failures++; + } + i++; + } else { + // sleep( 1 ); + rbuf = rmr_torcv_msg( mrc, rbuf, 1000 ); + if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) { + pings++; + rmr_free_msg( rbuf ); + rbuf = NULL; + } + } + if( time( NULL ) > timeout ) { + failures += nthreads; + fprintf( stderr, " timeout waiting for threads to finish; %d were not finished\n", nthreads ); + break; + } + } + + fprintf( stderr, " [%s] failing threads=%d pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL", failures, pings ); + rmr_close( mrc ); + + return failures > 0; +} + diff --git a/test/app_test/receiver.c b/test/app_test/receiver.c index ed2450b..e016f9e 100644 --- a/test/app_test/receiver.c +++ b/test/app_test/receiver.c @@ -47,6 +47,12 @@ RMR_SEED_RT -- path to the static routing table RMR_RTG_SVC -- port to listen for RTG connections + Compile time options + if -DMTC is defined on the compile command, then RMr is initialised + with the multi-threaded receive thread rather than using the same + process receive function. All other functions in the receiver are + the same. + Date: 18 April 2019 Author: E. Scott Daniels */ @@ -125,7 +131,12 @@ int main( int argc, char** argv ) { fprintf( stderr, " listening on port: %s for a max of %d messages\n", listen_port, nmsgs ); +#ifdef MTC + mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode + +#else mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines! +#endif if( mrc == NULL ) { fprintf( stderr, " ABORT: unable to initialise RMr\n" ); exit( 1 ); diff --git a/test/app_test/run_call_test.ksh b/test/app_test/run_call_test.ksh new file mode 100644 index 0000000..08562ed --- /dev/null +++ b/test/app_test/run_call_test.ksh @@ -0,0 +1,230 @@ +#!/usr/bin/env ksh +# :vi ts=4 sw=4 noet : +#================================================================================== +# Copyright (c) 2019 Nokia +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#================================================================================== +# + +# --------------------------------------------------------------------------------- +# Mnemonic: run_call_test.ksh +# Abstract: This is a simple script to set up and run the basic send/receive +# processes for some library validation on top of nano/nng. +# It should be possible to clone the repo, switch to this directory +# and execute 'ksh run -B' which will build RMr, make the sender and +# recevier then run the basic test. +# +# The call test drives three RMr based processes: +# caller which invokes rmr_mt_call() to send n messages +# +# receiver which executes rmr_rts_msg() on each received +# message with type == 5. +# +# sender which sends n "pings" to the caller process +# +# The script assumes that all three proessess are running on the same +# host/container such that setting up the route table with localhost:port +# will work. +# +# The number of message, and the delay between each message may be given +# on the command line. The number of threads may also be given. The +# number of messages defines the number _each_ caller will sent (5000 +# messages and three threads == 15,000 total messages. +# +# Example command line: +# ksh ./run_call_test.ksh # default 10 messages at 1 msg/sec 3 threads +# ksh ./run_call_test.ksh -n 5000 -t 6 -d 400 # 5k messages, 6 threads, delay 400 mus +# +# Date: 20 May 2019 +# Author: E. Scott Daniels +# --------------------------------------------------------------------------------- + + +# The sender and receiver are run asynch. Their exit statuses are captured in a +# file in order for the 'main' to pick them up easily. +# +function run_sender { + if (( nano_sender )) + then + # ./sender_nano $nmsg $delay + echo "nano is not supported" + exit 1 + else + ./caller $nmsg $delay $nthreads + fi + echo $? >/tmp/PID$$.src # must communicate state back via file b/c asynch +} + +# start receiver listening for nmsgs from each thread +function run_rcvr { + if (( nano_receiver )) + then + #./receiver_nano $(( nmsg * nthreads )) + echo "nano is not supported" + exit 1 + else + ./mt_receiver $(( nmsg * nthreads )) # we'll test with the RMr multi threaded receive function + fi + echo $? >/tmp/PID$$.rrc +} + +# This will send 100 messages to the caller process. This is a test to verify that +# threaded calling is not affected by normal messages and that normal messages can +# be received concurrently. +# +function run_pinger { + RMR_RTG_SVC=9999 ./sender 100 100 4 3333 >/dev/NULL 2>&1 # send pings +} + + +# Generate a route table that is tailored to our needs of sender sending messages to +# the caller, and caller sending mtype == 5 to the receiver. +# +function mk_rt { + +cat <caller.rt +# this is a specialised rt for caller testing. mtype 5 go to the +# receiver, and all others go to the caller. + +newrt | start +mse | 0 | 0 | localhost:43086 +mse | 1 | 10 | localhost:43086 +mse | 2 | 20 | localhost:43086 +rte | 3 | localhost:43086 +mse | 3 | 100 | localhost:43086 # special test to ensure that this does not affect previous entry +rte | 4 | localhost:43086 +rte | 5 | localhost:4560 +rte | 6 | localhost:43086 +rte | 7 | localhost:43086 +rte | 8 | localhost:43086 +rte | 9 | localhost:43086 +rte | 10 | localhost:43086 +rte | 11 | localhost:43086 +rte | 12 | localhost:43086 +rte | 13 | localhost:43086 + +newrt | end | 16 +endKat +} + +# --------------------------------------------------------- + +nmsg=10 # total number of messages to be exchanged (-n value changes) +delay=1000000 # microsec sleep between msg 1,000,000 == 1s +nano_sender=0 # start nano version if set (-N) +nano_receiver=0 +wait=1 +rebuild=0 +verbose=0 +nthreads=3 +dev_base=1 # -D turns off to allow this to run on installed libs + +nano_sender=0 # mt-call is not supported in nano +nano_receiver=0 + + +while [[ $1 == -* ]] +do + case $1 in + -B) rebuild=1;; + -d) delay=$2; shift;; + -D) dev_base=0;; + -n) nmsg=$2; shift;; + -t) nthreads=$2; shift;; + -v) verbose=1;; + + *) echo "unrecognised option: $1" + echo "usage: $0 [-B] [-d micro-sec-delay] [-n num-msgs] [-t num-threads]" + echo " -B forces a rebuild which will use .build" + exit 1 + ;; + esac + + shift +done + +if (( verbose )) +then + echo "2" >.verbose + export RMR_VCTL_FILE=".verbose" +fi + +if (( rebuild )) +then + build_path=../../.build + set -e + ksh ./rebuild.ksh + set +e +else + build_path=${BUILD_PATH:-"../../.build"} # we prefer .build at the root level, but allow user option + + if [[ ! -d $build_path ]] + then + echo "cannot find build in: $build_path" + echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this" + exit 1 + fi +fi + +if (( dev_base )) # assume we are testing against what we've built, not what is installed +then + if [[ -d $build_path/lib64 ]] + then + export LD_LIBRARY_PATH=$build_path:$build_path/lib64 + else + export LD_LIBRARY_PATH=$build_path:$build_path/lib + fi +else # -D option gets us here to test an installed library + export LD_LIBRARY_PATH=/usr/local/lib + export LIBRARY_PATH=$LD_LIBRARY_PATH +fi + +export RMR_SEED_RT=${RMR_SEED_RT:-./caller.rt} # allow easy testing with different rt + +if [[ ! -f $RMR_SEED_RT ]] # special caller rt for three process setup +then + mk_rt +fi + +if [[ ! -f ./sender ]] +then + if ! make >/dev/null 2>&1 + then + echo "[FAIL] cannot find sender binary, and cannot make it.... humm?" + exit 1 + fi +fi + +run_rcvr & +sleep 2 # if caller starts faster than rcvr we can drop, so pause a bit +run_sender & +run_pinger & + +wait +head -1 /tmp/PID$$.rrc | read rrc # get pass/fail state from each +head -1 /tmp/PID$$.src | read src + +if (( !! (src + rrc) )) +then + echo "[FAIL] sender rc=$src receiver rc=$rrc" +else + echo "[PASS] sender rc=$src receiver rc=$rrc" +fi + +rm /tmp/PID$$.* +rm -f .verbose + +exit $(( !! (src + rrc) )) + diff --git a/test/hdr_static_test.c b/test/hdr_static_test.c index 82a01a8..f716378 100644 --- a/test/hdr_static_test.c +++ b/test/hdr_static_test.c @@ -34,6 +34,8 @@ #include #include #include +#include +#include #include #include @@ -41,13 +43,13 @@ #include #include -#include "../src/common/include/rmr.h" -#include "../src/common/include/rmr_agnostic.h" -#include "../src/nng/include/rmr_nng_private.h" +#include "rmr.h" +#include "rmr_agnostic.h" +#include "rmr_nng_private.h" #define EMULATE_NNG #include "test_nng_em.c" -#include "../src/nng/src/sr_nng_static.c" +#include "sr_nng_static.c" #include "test_support.c" diff --git a/test/mbuf_api_static_test.c b/test/mbuf_api_static_test.c index b2f1f7d..99452dc 100644 --- a/test/mbuf_api_static_test.c +++ b/test/mbuf_api_static_test.c @@ -34,9 +34,11 @@ #include #include #include +#include +#include -#include "../src/common/include/rmr.h" -#include "../src/common/include/rmr_agnostic.h" +#include "rmr.h" +#include "rmr_agnostic.h" int mbuf_api_test( ) { diff --git a/test/mbuf_api_test.c b/test/mbuf_api_test.c index 573415e..fdc221a 100644 --- a/test/mbuf_api_test.c +++ b/test/mbuf_api_test.c @@ -39,12 +39,14 @@ #include #include #include +#include +#include -#include "../src/common/include/rmr.h" -#include "../src/common/include/rmr_agnostic.h" +#include "rmr.h" +#include "rmr_agnostic.h" -#include "../src/common/src/mbuf_api.c" // module under test +#include "mbuf_api.c" #include "test_support.c" // our private library of test tools #include "mbuf_api_static_test.c" // test functions diff --git a/test/ring_static_test.c b/test/ring_static_test.c index e0c9b6e..bd2b428 100644 --- a/test/ring_static_test.c +++ b/test/ring_static_test.c @@ -34,10 +34,11 @@ #include #include #include +#include +#include -#include "../src/common/include/rmr.h" -#include "../src/common/include/rmr_agnostic.h" -//#include "../src/common/src/ring_static.c" +#include "rmr.h" +#include "rmr_agnostic.h" /* diff --git a/test/ring_test.c b/test/ring_test.c index 5e1a81c..51b8260 100644 --- a/test/ring_test.c +++ b/test/ring_test.c @@ -35,10 +35,12 @@ #include #include #include +#include +#include -#include "../src/common/include/rmr.h" -#include "../src/common/include/rmr_agnostic.h" -#include "../src/common/src/ring_static.c" +#include "rmr.h" +#include "rmr_agnostic.h" +#include "ring_static.c" #include "test_support.c" // things like fail_if() #include "ring_static_test.c" // the actual tests diff --git a/test/rmr_nano_test.c b/test/rmr_nano_test.c index 50c4266..c161f91 100644 --- a/test/rmr_nano_test.c +++ b/test/rmr_nano_test.c @@ -54,39 +54,32 @@ #include #include #include +#include +#include #define DEBUG 1 #include -//#include -//#include -//#include -//#include #undef EMULATE_NNG #include "test_nng_em.c" // nng/nn emulation (before including things under test) -#include "../src/common/include/rmr.h" // things the users see -#include "../src/common/include/rmr_symtab.h" -#include "../src/common/include/rmr_agnostic.h" // transport agnostic header -#include "../src/nanomsg/include/rmr_private.h" // transport specific +#include "rmr.h" // things the users see +#include "rmr_symtab.h" +#include "rmr_agnostic.h" // transport agnostic header +#include "rmr_private.h" // transport specific -#include "../src/common/src/symtab.c" -#include "../src/nanomsg/src/rmr.c" -#include "../src/common/src/mbuf_api.c" -#include "../src/nanomsg/src/rtable_static.c" +#include "symtab.c" +#include "rmr.c" +#include "mbuf_api.c" +#include "rtable_static.c" static void gen_rt( uta_ctx_t* ctx ); // defined in sr_static_test, but used by a few others (eliminate order requirement below) // specific test tools in this directory #include "test_support.c" // things like fail_if() // and finally.... -//#include "tools_static_test.c" // local test functions pulled directly because of static nature of things -//#include "symtab_static_test.c" -//#include "ring_static_test.c" -//#include "rt_static_test.c" -//#include "wormhole_static_test.c" #include "mbuf_api_static_test.c" #include "sr_nano_static_test.c" diff --git a/test/rmr_nng_api_static_test.c b/test/rmr_nng_api_static_test.c index 2db1a20..0cb49a5 100644 --- a/test/rmr_nng_api_static_test.c +++ b/test/rmr_nng_api_static_test.c @@ -57,9 +57,11 @@ #include #include #include +#include +#include -#include "../src/common/include/rmr.h" -#include "../src/common/include/rmr_agnostic.h" +#include "rmr.h" +#include "rmr_agnostic.h" /* Send a 'burst' of messages to drive some send retry failures to increase RMr coverage @@ -84,6 +86,22 @@ static void send_n_msgs( void* ctx, int n ) { } } +/* + Refresh or allocate a message with some default values +*/ +static rmr_mbuf_t* fresh_msg( void* ctx, rmr_mbuf_t* msg ) { + if( ! msg ) { + msg = rmr_alloc_msg( ctx, 2048 ); + } + + msg->mtype = 0; + msg->sub_id = -1; + msg->state = 0; + msg->len = 100; + + return msg; +} + static int rmr_api_test( ) { int errors = 0; void* rmc; // route manager context @@ -113,6 +131,7 @@ static int rmr_api_test( ) { errors += fail_if_nil( rmc, "rmr_init returned a nil pointer when driving for default port " ); } + v = rmr_ready( rmc ); // unknown return; not checking at the moment msg = rmr_alloc_msg( NULL, 1024 ); // should return nil pointer @@ -285,7 +304,7 @@ static int rmr_api_test( ) { errors += fail_if( i >= 40, "torcv_msg never returned a timeout " ); - // ---- trace things that are not a part of the mbuf_api functions and thus must be tested here + // ---- trace things that are not a part of the mbuf_api functions and thus must be tested here ------- state = rmr_init_trace( NULL, 37 ); // coverage test nil context errors += fail_not_equal( state, 0, "attempt to initialise trace with nil context returned non-zero state (a) " ); errors += fail_if_equal( errno, 0, "attempt to initialise trace with nil context did not set errno as expected " ); @@ -306,10 +325,110 @@ static int rmr_api_test( ) { em_send_failures = 0; + ((uta_ctx_t *)rmc)->shutdown = 1; rmr_close( NULL ); // drive for coverage rmr_close( rmc ); // no return to check; drive for coverage + // -- allocate a new context for mt-call and drive that stuff ----------------------------------------- +#ifdef EMULATE_NNG + msg = fresh_msg( rmc, msg ); // ensure we have one with known contents + + msg->state = 0; + msg = rmr_mt_call( rmc, msg, 3, 10 ); // drive when not in mt setup + if( msg ) { + errors += fail_not_equal( msg->state, RMR_ERR_NOTSUPP, "rmr_mt_call did not set not supported error when not mt initialised" ); + } else { + errors += fail_if_nil( msg, "rmr_mt_call returned nil pointer when not mt initialised" ); + } + msg = fresh_msg( rmc, msg ); + + msg = rmr_mt_rcv( rmc, msg, 10 ); // gen not supported error if ctx not set for mt + if( msg ) { + errors += fail_not_equal( msg->state, RMR_ERR_NOTSUPP, "rmr_mt_rcv did not return not supported state when mt not initialised" ); + } else { + errors += fail_if_nil( msg, "nil pointer from rmr_mt_rcv when mt not initialised\n" ); + } + msg = fresh_msg( rmc, msg ); + + msg->state = 0; + if( msg ) { + msg = rmr_mt_call( rmc, msg, 1000, 10 ); // thread id out of range + errors += fail_if_equal( msg->state, 0, "rmr_mt_call did not set an error when given an invalid call-id" ); + } else { + errors += fail_if_nil( msg, "rmr_mt_call returned a nil pointer when given an invalid call-id" ); + } + msg = fresh_msg( rmc, msg ); + + state = init_mtcall( NULL ); // drive for coverage + errors += fail_not_equal( state, 0, "init_mtcall did not return false (a) when given a nil context pointer" ); + + + if( (rmc = rmr_init( NULL, 1024, FL_NOTHREAD | RMRFL_MTCALL )) == NULL ) { // drive multi-call setup code without rtc thread + errors += fail_if_nil( rmc, "rmr_init returned a nil pointer when driving for mt-call setup " ); + } + + gen_rt( rmc ); // must attach a route table so sends succeed + + fprintf( stderr, " enabling mt messages\n" ); + em_set_rcvdelay( 1 ); // force slow msg rate during mt testing + em_set_mtc_msgs( 1 ); // emulated nngrcv will now generate messages with call-id and call flag + + msg->state = 0; + msg = rmr_mt_call( NULL, msg, 3, 10 ); // should timeout + if( msg ) { + errors += fail_if( msg->state == 0, "rmr_mt_call did not set message state when given message with nil context " ); + } + msg = fresh_msg( rmc, msg ); + + fprintf( stderr, " invoking mt_call with timout == 2999\n" ); + msg = rmr_mt_call( rmc, msg, 2, 2999 ); // long timeout to drive time building code, should receive + if( msg ) { + if( msg->state != RMR_OK ) { + fprintf( stderr, " rmr_mt_call returned error in mbuf: %d\n", msg->state ); + } else { + errors += fail_not_nil( msg, "rmr_mt_call did not return a nil pointer on read timeout" ); + } + } + msg = fresh_msg( rmc, msg ); + + msg = rmr_mt_rcv( NULL, NULL, 10 ); + errors += fail_not_nil( msg, "rmr_mt_rcv returned a non-nil message when given nil message and nil context" ); + + fprintf( stderr, " invoking mt_rcv with timout == 2999\n" ); + msg = fresh_msg( rmc, msg ); + msg = rmr_mt_rcv( rmc, msg, 2999 ); + if( !msg ) { + errors += fail_if_nil( msg, "rmr_mt_rcv returned a nil message when given valid message and timeout of 29999" ); + } + msg = fresh_msg( rmc, msg ); + + msg = rmr_mt_rcv( rmc, msg, -1 ); + if( !msg ) { + errors += fail_if_nil( msg, "rmr_mt_rcv returned a nil message when given valid message unlimited timeout" ); + } + msg = fresh_msg( rmc, msg ); + + fprintf( stderr, " waiting 20.0 seconds for a known call xaction to arrive (%ld)\n", time( NULL ) ); + snprintf( msg->xaction, 17, "%015d", 5 ); // we'll reset receive counter before calling mt_call so this will arrive again + em_set_rcvcount( 0 ); + msg = rmr_mt_call( rmc, msg, 2, 15000 ); // we need about 10s to get the message with the 'slow rate' + if( msg ) { + errors += fail_not_equal( msg->state, RMR_OK, "mt_call with known xaction id bad state (a)" ); + } else { + errors += fail_if_nil( msg, "mt_call with known xaction id returned nil message" ); + } + fprintf( stderr, " time check: %ld\n", time( NULL ) ); + + + em_set_mtc_msgs( 0 ); // turn off + em_set_rcvdelay( 0 ); // full speed receive rate + ((uta_ctx_t *)rmc)->shutdown = 1; // force the mt-reciver attached to the context to stop +#endif + + + // --------------- phew, done ------------------------------------------------------------------------------ + if( ! errors ) { fprintf( stderr, " all RMr API tests pass\n" ); } diff --git a/test/rmr_nng_test.c b/test/rmr_nng_test.c index 5dbeb21..1a8df89 100644 --- a/test/rmr_nng_test.c +++ b/test/rmr_nng_test.c @@ -51,6 +51,8 @@ #include #include #include +#include +#include #define DEBUG 1 @@ -64,14 +66,14 @@ #include "test_nng_em.c" // nng/nn emulation (before including things under test) -#include "../src/common/include/rmr.h" // things the users see -#include "../src/common/include/rmr_symtab.h" -#include "../src/common/include/rmr_agnostic.h" // transport agnostic header -#include "../src/nng/include/rmr_nng_private.h" // transport specific +#include "rmr.h" // things the users see +#include "rmr_symtab.h" +#include "rmr_agnostic.h" // transport agnostic header +#include "rmr_nng_private.h" // transport specific -#include "../src/common/src/symtab.c" -#include "../src/nng/src/rmr_nng.c" -#include "../src/common/src/mbuf_api.c" +#include "symtab.c" +#include "rmr_nng.c" +#include "mbuf_api.c" static void gen_rt( uta_ctx_t* ctx ); // defined in sr_nng_static_test, but used by a few others (eliminate order requirement below) diff --git a/test/rt_nano_static_test.c b/test/rt_nano_static_test.c index 90cad7e..49cff8a 100644 --- a/test/rt_nano_static_test.c +++ b/test/rt_nano_static_test.c @@ -34,9 +34,11 @@ #include #include #include +#include +#include -#include "../src/common/include/rmr.h" -#include "../src/common/include/rmr_agnostic.h" +#include "rmr.h" +#include "rmr_agnostic.h" typedef struct entry_info { int group; diff --git a/test/rt_static_test.c b/test/rt_static_test.c index 972b767..13a216b 100644 --- a/test/rt_static_test.c +++ b/test/rt_static_test.c @@ -34,9 +34,11 @@ #include #include #include +#include +#include -#include "../src/common/include/rmr.h" -#include "../src/common/include/rmr_agnostic.h" +#include "rmr.h" +#include "rmr_agnostic.h" typedef struct entry_info { int group; @@ -276,16 +278,12 @@ static int rt_test( ) { uta_fib( "no-suhch-file" ); // drive some error checking for coverage -/* - if( ctx ) { - if( ctx->rtg_addr ) { - free( ctx->rtg_addr ); - } - free( ctx ); - } -*/ - state = uta_link2( "worm", NULL, NULL ); + ep = (endpoint_t *) malloc( sizeof( *ep ) ); + pthread_mutex_init( &ep->gate, NULL ); + ep->name = strdup( "worm" ); + ep->addr = NULL; + state = uta_link2( ep ); errors += fail_if_true( state, "link2 did not return false when given nil pointers" ); state = uta_epsock_rr( rt, 122, 0, NULL, NULL ); diff --git a/test/sr_nano_static_test.c b/test/sr_nano_static_test.c index 8e939da..14c678f 100644 --- a/test/sr_nano_static_test.c +++ b/test/sr_nano_static_test.c @@ -34,12 +34,16 @@ #include #include #include +#include +#include -#include "../src/common/include/rmr.h" -#include "../src/common/include/rmr_agnostic.h" +#include "rmr.h" +#include "rmr_agnostic.h" /* Generate a simple route table (for all but direct route table testing). + This table contains multiple tables inasmuch as a second update set of + records follows the initial set. */ static void gen_rt( uta_ctx_t* ctx ) { int fd; diff --git a/test/sr_nng_static_test.c b/test/sr_nng_static_test.c index c58b281..809deba 100644 --- a/test/sr_nng_static_test.c +++ b/test/sr_nng_static_test.c @@ -34,9 +34,11 @@ #include #include #include +#include +#include -#include "../src/common/include/rmr.h" -#include "../src/common/include/rmr_agnostic.h" +#include "rmr.h" +#include "rmr_agnostic.h" /* Generate a simple route table (for all but direct route table testing). @@ -162,7 +164,7 @@ static int sr_nng_test() { mbuf = rmr_rcv_msg( ctx, NULL ); } - size = 2048 - sizeof( uta_mhdr_t ); // emulated nng receive allocates 2K payloads + size = 2048 - em_hdr_size(); // emulated nng receive allocates 2K buffers -- subtract off header size state = rmr_payload_size( mbuf ); errors += fail_not_equal( state, size, "payload size didn't return expected value" ); // receive should always give 4k buffer diff --git a/test/symtab_static_test.c b/test/symtab_static_test.c index f0649c1..7d5a39c 100644 --- a/test/symtab_static_test.c +++ b/test/symtab_static_test.c @@ -28,7 +28,7 @@ Author: E. Scott Daniels */ -#include "../src/common/include/rmr_symtab.h" +#include "rmr_symtab.h" // -- parent must include if needed #include "../src/common/src/symtab.c" #include "test_support.c" diff --git a/test/symtab_test.c b/test/symtab_test.c index f39085c..b755e9c 100644 --- a/test/symtab_test.c +++ b/test/symtab_test.c @@ -29,8 +29,8 @@ #define NO_DUMMY_RMR 1 // no dummy rmr functions; we don't pull in rmr.h or agnostic.h -#include "../src/common/include/rmr_symtab.h" -#include "../src/common/src/symtab.c" +#include "rmr_symtab.h" +#include "symtab.c" #include "test_support.c" diff --git a/test/test_nng_em.c b/test/test_nng_em.c index f1976b9..c2101f5 100644 --- a/test/test_nng_em.c +++ b/test/test_nng_em.c @@ -29,14 +29,28 @@ Author: E. Scott Daniels */ + +#include "rmr.h" // we use some of rmr defs in building dummy messages, so we need these +#include "rmr_agnostic.h" + // ---------------------- emulated nng functions --------------------------- #ifndef _em_nn #define _em_nn +#include + static int em_send_failures = 0; // test programme can set this to emulate eagain send failures static int em_timeout = -1; // set by set socket option +static int em_mtc_msgs = 0; // set to generate 'received' messages with mt-call header data +static int return_value = 0; // functions should return this value +static int rcv_count = 0; // receive counter for transaction id to allow test to rest +static int rcv_delay = 0; // forced delay before call to rcvmsg starts to work + +static int gates_ok = 0; +static pthread_mutex_t rcv_gate; + // ----------- epoll emulation --------------------------------------------- @@ -95,17 +109,12 @@ struct em_msg { int32_t len1; // length of the tracing data int32_t len2; // length of data 1 (d1) int32_t len3; // length of data 2 (d2) - + int32_t sub_id; // subscription id (-1 invalid) }; -static int return_value = 0; -//-------------------------------------------------------------------------- -#ifdef EMULATE_NNG -struct nn_msghdr { - int boo; -}; +// -- emulation control functions ------------------------------------------------------ /* Test app can call this to have all emulated functions return failure instead @@ -122,34 +131,106 @@ static int em_nng_foo() { } +/* + Turns on/off the generation of multi-threaded call messages +*/ +static int em_set_mtc_msgs( int state ) { + em_mtc_msgs = state; +} + +/* + Returns the size of the header we inserted +*/ +static int em_hdr_size() { + if( em_mtc_msgs ) { + return (int) sizeof( struct em_msg ) + 4; + } + + return (int) sizeof( struct em_msg ); +} + +static void em_set_rcvcount( int v ) { + rcv_count = v; +} + +static void em_set_rcvdelay( int v ) { + rcv_delay = v; +} + +static void em_start() { + if( ! gates_ok ) { + pthread_mutex_init( &rcv_gate, NULL ); + gates_ok = 1; + } +} + +//-------------------------------------------------------------------------- +#ifdef EMULATE_NNG +struct nn_msghdr { + int boo; +}; + + /* Receive message must allocate a new buffer and return the pointer into *m. Every 9 messages or so we'll simulate an old version message + + If em_mtc_msgs is set, then we add a non-zero d1 field with + the call-id set to 2, and alternate the call flag */ static int em_nng_recvmsg( nng_socket s, nng_msg ** m, int i ) { + static int call_flag = 0; + void* b; struct em_msg* msg; - static int count = 0; // we'll simulate a message going in by dropping an rmr-ish msg with transaction id only int trace_size = 0; + int d1_size = 0; + unsigned char* d1; + + if( rcv_delay > 0 ) { + sleep( rcv_delay ); + } - //sleep( 1 ); + if( em_mtc_msgs ) { + d1_size = 4; + } b = (void *) malloc( 2048 ); if( m != NULL ) { memset( b, 0, 2048 ); + *m = (nng_msg *) b; msg = (struct em_msg *) b; - if( count % 10 == 9 ) { - //msg->rmr_ver = htonl( MSG_VER ); - msg->rmr_ver = ALT_MSG_VER; // emulate the bug in RMr v1 + if( ! em_mtc_msgs && (rcv_count % 10) == 9 ) { + msg->rmr_ver = ALT_MSG_VER; // allow emulation the bug in RMr v1 } else { msg->rmr_ver = htonl( MSG_VER ); } + msg->mtype = htonl( 1 ); msg->plen = htonl( 220 ); msg->len0 = htonl( sizeof( struct em_msg ) ); msg->len1 = htonl( trace_size ); - snprintf( msg->xid, 32, "%015d", count++ ); // simple transaction id so we can test receive specific and ring stuff + msg->len2 = htonl( d1_size ); + msg->len3 = htonl( 0 ); + + pthread_mutex_lock( &rcv_gate ); // hold lock to update counter/flag + if( em_mtc_msgs ) { + d1 = DATA1_ADDR( msg ); + d1[0] = 2; // simulated msgs always on chute 2 + if( call_flag ) { + rcv_count++; + msg->flags |= HFL_CALL_MSG; + } + if( rcv_delay > 0 ) { + fprintf( stderr, " count=%d flag=%d %02x \n", rcv_count, call_flag, msg->flags ); + } + call_flag = !call_flag; + } else { + rcv_count++; + } + pthread_mutex_unlock( &rcv_gate ); + snprintf( msg->xid, 32, "%015d", rcv_count ); // simple transaction id so we can test receive specific and ring stuff snprintf( msg->src, 16, "localhost:4562" ); // set src id (unrealistic) so that rts() can be tested } diff --git a/test/tools_test.c b/test/tools_test.c index eac19a6..6725e9b 100644 --- a/test/tools_test.c +++ b/test/tools_test.c @@ -35,9 +35,11 @@ #include #include #include +#include +#include -#include "../src/common/include/rmr.h" -#include "../src/common/include/rmr_agnostic.h" +#include "rmr.h" +#include "rmr_agnostic.h" #include "test_support.c" // our private library of test tools // ===== dummy context for tools testing so we don't have to pull in all of the nano/nng specific stuff ===== @@ -66,7 +68,7 @@ struct uta_ctx { }; -#include "../src/common/src/tools_static.c" +#include "tools_static.c" int main( ) { diff --git a/test/unit_test.ksh b/test/unit_test.ksh index 9953733..559f7d3 100755 --- a/test/unit_test.ksh +++ b/test/unit_test.ksh @@ -320,7 +320,7 @@ else fi fi -export C_INCLUDE_PATH="../src/common/include:$C_INCLUDE_PATH" +export C_INCLUDE_PATH="../src/rmr/common/include:$C_INCLUDE_PATH" module_cov_target=80 builder="make -B %s" # default to plain ole make diff --git a/test/wormhole_static_test.c b/test/wormhole_static_test.c index f2d4bf6..06cb36c 100644 --- a/test/wormhole_static_test.c +++ b/test/wormhole_static_test.c @@ -34,9 +34,11 @@ #include #include #include +#include +#include -#include "../src/common/include/rmr.h" -#include "../src/common/include/rmr_agnostic.h" +#include "rmr.h" +#include "rmr_agnostic.h" /* -- 2.16.6