set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this
set( minor_version "0" )
-set( patch_level "25" )
+set( patch_level "26" )
set( install_root "${CMAKE_INSTALL_PREFIX}" )
set( install_lib "lib" )
rmr_tralloc_msg.3
rmr_get_trlen.3
rmr_get_src.3
+ rmr_mt_call.3
+ rmr_mt_rcv.3
)
# empty list of roff/troff input files we generated
which must be allocated, possibly, prior to the application knowing the actual size of
a specific message.
+&space
+&ital(Flags) allows for selection of some RMr options at the time of initialisation.
+These are set by ORing &cw(RMRFL_) constants from the RMr header file. Currently the
+following flags are supported:
+
+&half_space
+&beg_dlist(1i : &bold_font )
+&ditem(RMRFL_NONE)
+ No flags are set.
+
+&half_space
+&ditem(RMRFL_NOTHREAD)
+ The route table collector thread is not to be started. This should only be used
+ by the route table generator application if it is based on RMr.
+
+&half_space
+&ditem(RMRFL_MTCALL)
+ Enable multi-threaded call support.
+&end_dlist
+
+&h3(Multi-threaded Calling)
+The support for an application to issue a &ital(blocking call) by the &cw(rmr_call()) function
+was limited such that only user applications which were operating in a single thread
+could safely use the function.
+Further, timeouts were message count based and not time unit based.
+Multi-threaded call support adds the ability for a user application with multiple threads
+to invoke a blocking call function with the guarentee that the correct response message
+is delivered to the thread.
+The additional support is implemented with the &ital( rmr_mt_call() ) and &ital( rmr_mt_rcv() )
+function calls.
+&space
+
+Multi-threaded call support requires the user application to specifically enable it
+when RMr is initialised.
+This is necessary because a second, dedicated, receiver thread must be started, and
+requires all messages to be examined and queued by this thread.
+The additional overhead is minimal, queuing information is all in the RMr message
+header, but as an additional process is necessary the user application must "opt in"
+to this approach.
+
&space
&h2(ENVIRONMENT)
As a part of the initialisation process &cw(rmr_init) will look into the available
rmr_call(3),
rmr_free_msg(3),
rmr_get_rcvfd(3),
+rmr_mt_call(3),
+rmr_mt_rcv(3),
rmr_payload_size(3),
rmr_send_msg(3),
rmr_rcv_msg(3),
--- /dev/null
+.if false
+==================================================================================
+ Copyright (c) 2019 Nokia
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+.fi
+
+
+.if false
+ Mnemonic rmr_mt_call_man.xfm
+ Abstract The manual page for the rmr multi-threaded call function.
+ Author E. Scott Daniels
+ Date 24 May 2019
+.fi
+
+.** if formatting with tfm, the roff.im will cause roff output to be generated
+.** if formatting with pfm, then pretty postscript will be generated
+.gv e LIB lib
+.if pfm
+ .im &{lib}/generic_ps.im
+.ei
+ .gv e OUTPUT_RST use_rst
+ .if .ev &use_rst 1 =
+ .im &{lib}/rst.im
+ .ei
+ .im &{lib}/roff.im
+ .fi
+.fi
+
+&line_len(6i)
+
+&h1(RMR Library Functions)
+&h2(NAME)
+ rmr_mt_call
+
+&h2(SYNOPSIS )
+&indent
+&ex_start
+#include <rmr/rmr.h>
+
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* msg, int id, int timeout );
+&ex_end
+&uindent
+
+&h2(DESCRIPTION)
+The &cw(rmr_mt_call) function sends the user application message to a remote
+endpoint, and waits for a corresponding response message before returning
+control to the user application.
+The user application supplies a completed message buffer, as it would for
+a &cw(rmr_send_msg) call, but unlike with a send, the buffer returned will have
+the response from the application that received the message.
+The thread invoking the &ital( rmr_mt_call()) will block until a message arrives
+or until &ital(timeout) milliseconds has passed; which ever comes first.
+Using a timeout value of zero (0) will cause the thread to block without a timeout.
+
+&space
+The &ital( id ) supplied as the third parameter is an integer in the range of 2 through
+255 inclusive.
+This is a caller defined "thread number" and is used to match the response message
+with the correct user application thread.
+If the ID value is not in the proper range, the attempt to make the call will fail.
+
+&space
+Messages which are received while waiting for the response are queued on a &ital(normal)
+receive queue and will be delivered to the user application with the next invocation
+of &itaL( rmr_mt_rcv() ) or &ital( rmr_rvv_msg().)
+by RMR, and are returned to the user application when &cw(rmr_rcv_msg) is
+invoked.
+These messages are returned in th order received, one per call to &cw(rmr_rcv_msg.)
+
+&space
+NOTE: Currently the multi-threaded functions are supported only when the NNG
+transport mechanism is being used. It will not be possible to link a programme
+using the nanomsg version of the library when references to this function are
+present.
+
+&h3(The Transaction ID)
+The user application is responsible for setting the value of the transaction ID field
+before invoking &ital(rmr_mt_call.)
+The transaction ID is a &cw(RMR_MAX_XID) byte field that is used to match the
+response message when it arrives.
+RMr will compare &bold(all) of the bytes in the field, so the caller must ensure
+that they are set correctly to avoid missing the response message.
+(The application which returns the response message is also expected to ensure that
+the return buffer has the matching transaction ID. This can be done transparently if
+the application uses the &ital( rmr_rts_msg() ) function and does not adjust the
+transaction ID.
+
+
+&h2(RETURN VALUE)
+The &cw(rmr_mt_call) function returns a pointer to a message buffer with the state set to reflect
+the overall state of call processing.
+If the state is &cw(RMR_OK) then the buffer contains the response message; otherwise
+the state indicates the error encountered while attempting to send the message.
+
+&space
+If no response message is received when the timeout period has expired, a nil pointer
+will be returned (NULL).
+
+&h2(ERRORS)
+These values are reflected in the state field of the returned message.
+
+&half_space
+&beg_dlist(.75i : ^&bold_font )
+&di(RMR_OK) The call was successful and the message buffer references the response message.
+
+&half_space
+&di(RMR_ERR_BADARG) An argument passed to the function was invalid.
+
+&half_space
+&di(RMR_ERR_CALLFAILED) The call failed and the value of &ital(errno,) as described below,
+ should be checked for the specific reason.
+
+&half_space
+&di(RMR_ERR_NOENDPT) An endpoint associated with the message type could not be found in the
+ route table.
+
+&half_space
+&di(RMR_ERR_RETRY) The underlying transport mechanism was unable to accept the message
+ for sending. The user application can retry the call operation if appropriate to
+ do so.
+
+&end_dlist
+
+&space
+The global "variable" &ital(errno) will be set to one of the following values if the
+overall call processing was not successful.
+&half_space
+
+&beg_dlist(.75i : ^&bold_font )
+&di(ETIMEDOUT) Too many messages were queued before receiving the expected response
+
+&half_space
+&di(ENOBUFS) The queued message ring is full, messages were dropped
+
+&half_space
+&di(EINVAL) A parameter was not valid
+
+&half_space
+&di(EAGAIN) The underlying message system wsa interrupted or the device was busy;
+ the message was &bold(not) sent, and user application should call
+ this function with the message again.
+&end_dlist
+
+&h2(EXAMPLE)
+The following code bit shows one way of using the &cw(rmr_mt_call) function, and illustrates
+how the transaction ID must be set.
+
+&space
+&ex_start
+ int retries_left = 5; // max retries on dev not available
+ static rmr_mbuf_t* mbuf = NULL; // response msg
+ msg_t* pm; // private message (payload)
+
+ // get a send buffer and reference the payload
+ mbuf = rmr_alloc_msg( mr, RMR_MAX_RCV_BYTES );
+ pm = (msg_t*) mbuf->payload;
+
+ // generate an xaction ID and fill in payload with data and msg type
+ rmr_bytes2xact( mbuf, xid, RMR_MAX_XID );
+ snprintf( pm->req, sizeof( pm->req ), "{ \"req\": \"num users\"}" );
+ mbuf->mtype = MT_USR_RESP;
+
+ msg = rmr_mt_call( mr, msg, my_id, 100 ); // wait up to 100ms
+ if( ! msg ) { // probably a timeout and no msg received
+ return NULL; // let errno trickle up
+ }
+
+ if( mbuf->state != RMR_OK ) {
+ while( retries_left-- > 0 && // loop as long as eagain
+ mbuf->state == RMR_ERR_RETRY &&
+ (msg = rmr_mt_call( mr, msg )) != NULL &&
+ mbuf->state != RMR_OK ) {
+
+ usleep( retry_delay );
+ }
+
+ if( mbuf == NULL || mbuf->state != RMR_OK ) {
+ rmr_free_msg( mbuf ); // safe if nil
+ return NULL;
+ }
+ }
+
+ // do something with mbuf
+&ex_end
+
+
+&h2(SEE ALSO )
+.ju off
+rmr_alloc_msg(3),
+rmr_free_msg(3),
+rmr_init(3),
+rmr_mt_rcv(3),
+rmr_payload_size(3),
+rmr_send_msg(3),
+rmr_rcv_msg(3),
+rmr_rcv_specific(3),
+rmr_rts_msg(3),
+rmr_ready(3),
+rmr_fib(3),
+rmr_has_str(3),
+rmr_tokenise(3),
+rmr_mk_ring(3),
+rmr_ring_free(3)
+.ju on
+
+
+.qu
+
--- /dev/null
+.if false
+==================================================================================
+ Copyright (c) 2019 Nokia
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+.fi
+.if false
+ Mnemonic rmr_mt_rcv_man.xfm
+ Abstract The manual page for the rmr_mt_rcv function.
+ Author E. Scott Daniels
+ Date 24 May 2019
+.fi
+
+.** if formatting with tfm, the roff.im will cause roff output to be generated
+.** if formatting with pfm, then pretty postscript will be generated
+.gv e LIB lib
+.if pfm
+ .im &{lib}/generic_ps.im
+.ei
+ .gv e OUTPUT_RST use_rst
+ .if .ev &use_rst 1 =
+ .im &{lib}/rst.im
+ .ei
+ .im &{lib}/roff.im
+ .fi
+.fi
+
+&line_len(6i)
+
+&h1(RMR Library Functions)
+&h2(NAME)
+ rmr_mt_rcv
+
+&h2(SYNOPSIS )
+&indent
+&ex_start
+#include <rmr/rmr.h>
+
+rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* old_msg, int timeout );
+&ex_end
+&uindent
+
+&h2(DESCRIPTION)
+The &cw(rmr_mt_rcv) function blocks until a message is received, or the timeout
+period (milliseconds) has passed.
+The result is an RMr message buffer which references a received message.
+In the case of a timeout the state will be reflected in an "empty buffer" (if old_msg
+was not nil, or simply with the return of a nil pointer.
+If a timeout value of zero (0) is given, then the function will block until
+the next message received.
+
+&space
+The &ital(vctx) pointer is the pointer returned by the &cw(rmr_init) function.
+&ital(Old_msg) is a pointer to a previously used message buffer or NULL.
+The ability to reuse message buffers helps to avoid alloc/free cycles in the
+user application.
+When no buffer is available to supply, the receive function will allocate one.
+
+&space
+The &ital(old_msg) parameter allows the user to pass a previously generated RMr
+message back to RMr for reuse.
+Optionally, the user application may pass a nil pointer if no reusable message
+is available.
+When a timeout occurs, and old_msg was not nil, the state will be returned
+by returning a pointer to the old message with the state set.
+
+&space
+It is possible to use the &ital(rmr_rcv_msg()) function instead of this function.
+Doing so might be advantagous if the user programme does not always start the
+multi-threaded mode and the use of &ital(rmr_rcv_msg()) would make the flow of
+the code more simple.
+The advantags of using this function are the ability to set a timeout without
+using epoll, and a small performance gain (if multi-threaded mode is enabled, and the
+&ital(rmr_rcv_msg()) function is used, it simply invokes this function without
+a timeout value, thus there is the small cost of a second call that results).
+Similarly, the &ital(rmr_torcv_msg()) call can be used when in multi-threaded
+mode with the same "pass through" overhead to using this function directly.
+
+&space
+NOTE: Currently the multi-threaded functions are supported only when the NNG
+transport mechanism is being used. It will not be possible to link a programme
+using the nanomsg version of the library when references to this function are
+present.
+
+&h2(RETURN VALUE)
+When a message is received before the timeout period expires, a pointer to the
+RMr message buffer which describes the message is returned.
+This will, with a high probability, be a different message buffer than &ital(old_msg;)
+the user application should not continue to use &ital(old_msg) after it is passed
+to this function.
+
+&space
+In the event of a timeout the return value will be the old msg with the state set,
+or a nil pointer if no old message was provided.
+
+&h2(ERRORS)
+The &ital(state) field in the message buffer will be set to one of the following
+values:
+&space
+
+&beg_dlist(.75i : ^&bold_font )
+&di(RMR_OK) The message was received without error.
+
+&half_space
+&di(RMR_ERR_BADARG) A parameter passed to the function was not valid (e.g. a nil pointer).
+
+indicate either &cw(RMR_OK) or
+&cw(RMR_ERR_EMPTY) if an empty message was received.
+
+&half_space
+&di(RMR_ERR_EMPTY) The message received had no associated data. The length of the
+ message will be 0.
+
+&half_space
+&di(RMR_ERR_NOTSUPP) The multi-threaded option was not enabled when RMr was
+initialised. See the man page for &ital(rmr_init() ) for details.
+
+&half_space
+&di(RMR_ERR_RCVFAILED) A hard error occurred preventing the receive from completing.
+&end_dlist
+
+
+When a nil pointer is returned, or any other state value was set in the message
+buffer, &cw(errno) will be set to one of the following:
+&space
+
+&beg_dlist(.75i : ^&bold_font )
+&di(INVAL) Parameter(s) passed to the function were not valid.
+
+&half_space
+&di(EBADF) The underlying message transport is unable to process the request.
+
+&half_space
+&di(ENOTSUP) The underlying message transport is unable to process the request.
+
+&half_space
+&di(EFSM) The underlying message transport is unable to process the request.
+
+&half_space
+&di(EAGAIN) The underlying message transport is unable to process the request.
+
+&half_space
+&di(EINTR) The underlying message transport is unable to process the request.
+
+&half_space
+&di(ETIMEDOUT) The underlying message transport is unable to process the request.
+
+&half_space
+&di(ETERM) The underlying message transport is unable to process the request.
+&end_dlist
+
+&h2(EXAMPLE)
+&space
+&ex_start
+ rmr_mbuf_t* mbuf = NULL; // received msg
+
+ msg = rmr_mt_recv( mr, mbuf, 100 ); // wait up to 100ms
+ if( msg != NULL ) {
+ switch( msg->state ) {
+ case RMR_OK:
+ printf( "got a good message\n" );
+ break;
+
+ case RMR_ERR_EMPTY:
+ printf( "received timed out\n" );
+ break;
+
+ default:
+ printf( "receive error: %d\n", mbuf->state );
+ break;
+ }
+ } else {
+ printf( "receive timeout (nil)\n" );
+ }
+&ex_end
+
+&h2(SEE ALSO )
+.ju off
+rmr_alloc_msg(3),
+rmr_call(3),
+rmr_free_msg(3),
+rmr_get_rcvfd(3),
+rmr_init(3),
+rmr_mk_ring(3),
+rmr_mt_call(3),
+rmr_payload_size(3),
+rmr_send_msg(3),
+rmr_torcv_msg(3),
+rmr_rcv_specific(3),
+rmr_rts_msg(3),
+rmr_ready(3),
+rmr_ring_free(3),
+rmr_torcv_msg(3)
+.ju on
+
+
+.qu
+
rmr_rcv_specific(3),
rmr_rts_msg(3),
rmr_ready(3),
-rmr_ring_free(3)
+rmr_ring_free(3),
+rmr_torcv_msg(3)
.ju on
#define RIC_UNDEFINED -1
// ---------------------------------------------------------
+#define RIC_SCTP_CONNECTION_FAILURE 1080
+
#define RIC_SUB_REQ 12010
#define RIC_SUB_RESP 12011
#define RIC_SUB_FAILURE 12012
#define RIC_X2_RESET 10070
#define RIC_X2_RESET_RESP 10071
+
#define RIC_ENDC_X2_SETUP_REQ 10360
#define RIC_ENDC_X2_SETUP_RESP 10361
#define RIC_ENDC_X2_SETUP_FAILURE 10362
// various flags for function calls
#define RMRFL_NONE 0x00 // no flags
-#define RMRFL_AUTO_ALLOC 0x01 // send auto allocates a zerocopy buffer
+#define RMRFL_NOTHREAD 0x01 // do not start an additional route collector thread
+#define RMRFL_MTCALL 0x02 // set up multi-threaded call support (rmr_init)
+#define RMRFL_AUTO_ALLOC 0x03 // send auto allocates a zerocopy buffer
#define RMR_DEF_SIZE 0 // pass as size to have msg allocation use the default msg size
#define RMR_ERR_UNSET 13 // the message hasn't been populated with a transport buffer
#define RMR_ERR_TRUNC 14 // received message likely truncated
#define RMR_ERR_INITFAILED 15 // initialisation of something (probably message) failed
+#define RMR_ERR_NOTSUPP 16 // the request is not supported, or RMr was not initialised for the request
#define RMR_WH_CONNECTED(a) (a>=0) // for now whid is integer; it could be pointer at some future date
extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg );
extern void rmr_wh_close( void* vctx, int whid );
+// ----- mt call support --------------------------------------------------------------------------------
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait );
+extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait );
// ----- msg buffer operations (no context needed) ------------------------------------------------------
extern int rmr_bytes2meid( rmr_mbuf_t* mbuf, unsigned char const* src, int len );
// internal flags, must be > than UFLAG_MASK
//#define IFL_....
+#define CFL_MTC_ENABLED 0x01 // multi-threaded call is enabled
+
// msg buffer flags
#define MFL_ZEROCOPY 0x01 // the message is an allocated zero copy message and can be sent.
#define MFL_NOALLOC 0x02 // send should NOT allocate a new buffer before returning
#define MAX_EP_GROUP 32 // max number of endpoints in a group
#define MAX_RTG_MSG_SZ 2048 // max expected message size from route generator
+#define MAX_CALL_ID 255 // largest call ID that is supported
//#define DEF_RTG_MSGID "" // default to pick up all messages from rtg
#define DEF_RTG_PORT "tcp:4561" // default port that we accept rtg connections on
#define RMR_D1_LEN(h) (ntohl(((uta_mhdr_t *)h)->len2))
#define RMR_D2_LEN(h) (ntohl(((uta_mhdr_t *)h)->len3))
+// CAUTION: if using an offset with a header pointer, the pointer MUST be cast to void* before adding the offset!
#define TRACE_OFFSET(h) ((ntohl(((uta_mhdr_t *)h)->len0)))
#define DATA1_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
#define DATA2_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
#define SET_HDR_D1_LEN(h,l) (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
#define SET_HDR_D2_LEN(h,l) (((uta_mhdr_t *)h)->len3=htonl((int32_t)l))
+ // index of things in the d1 data space
+#define D1_CALLID_IDX 0 // the call-id to match on return
+
+#define NO_CALL_ID 0 // no call id associated with the message (normal queue)
#define V1_PAYLOAD_OFFSET(h) (sizeof(uta_v1mhdr_t))
// v2 header flags
#define HFL_HAS_TRACE 0x01 // Trace data is populated
#define HFL_SUBID 0x02 // subscription ID is populated
+#define HFL_CALL_MSG 0x04 // msg sent via blocking call
/*
Message header; interpreted by the other side, but never seen by
} ring_t;
+// --------- multi-threaded call things -----------------------------------------
+/*
+ A chute provides a return path for a received message that a thread has blocked
+ on. The receive thread will set the mbuf pointer and tickler the barrier to
+ signal to the call thread that data is ready.
+*/
+typedef struct chute {
+ rmr_mbuf_t* mbuf; // pointer to message buffer received
+ sem_t barrier; // semaphore that the thread is waiting on
+ unsigned char expect[RMR_MAX_XID]; // the expected transaction ID
+} chute_t;
+
// -------------- common static prototypes --------------------------------------
#include <string.h>
#include <unistd.h>
#include <stdio.h>
+#include <semaphore.h>
#include "rmr.h" // things the users see
#include "rmr_agnostic.h" // agnostic things (must be included before private)
}
hdr = (uta_mhdr_t *) msg->header;
+ if( !hdr ) {
+ errno = EINVAL;
+ return 0;
+ }
+
len = RMR_TR_LEN( hdr );
if( len != size ) { // different sized trace data, must realloc the buffer
--- /dev/null
+// :vi sw=4 ts=4 noet:
+/*
+==================================================================================
+ Copyright (c) 2019 Nokia
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+/*
+ Mnemonic: mt_call_static.c
+ Abstract: Static multi-threaded call support.
+
+ Author: E. Scott Daniels
+ Date: 17 May 2019
+*/
+
+#ifndef mtcall_static_c
+#define mtcall_static_c
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <errno.h>
+#include <string.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <time.h>
+
+#include <semaphore.h>
+
+
+/*
+ Initialises the chutes that are then hung off of the context.
+
+ Returns > 0 on success; 0 on failure with errno set.
+*/
+static int init_mtcall( uta_ctx_t* ctx ) {
+ int rc = 1; // return code 1== good.
+ int i;
+ chute_t* chutes;
+
+ if( ctx == NULL ) {
+ errno = EINVAL;
+ return 0;
+ }
+
+ chutes = ctx->chutes = (chute_t *) malloc( sizeof( chute_t ) * (MAX_CALL_ID+1) );
+ if( chutes == NULL ) {
+ return 0;
+ }
+
+ for( i = 0; i < MAX_CALL_ID; i++ ) { // initialise all of the semaphores
+ chutes[i].mbuf = NULL;
+ if( sem_init( &chutes[i].barrier, 0, 0 ) < 0 ) {
+ fprintf( stderr, "[ERR] rmr: unable to initialise mt call chute [%d]: %s\n", i, strerror( errno ) );
+ rc = -1;
+ }
+ }
+
+
+ return rc;
+}
+
+#endif
case 'u': // update current table, not a total replacement
tokens[1] = clip( tokens[1] );
if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
+ if( ctx->new_rtable == NULL ) { // update table not in progress
+ break;
+ }
+
if( ntoks >2 ) {
if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
return NULL;
}
- ep->open = 0; // not connected
+ ep->open = 0; // not connected
ep->addr = uta_h2ip( ep_name );
ep->name = strdup( ep_name );
+ pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
rmr_sym_put( rt->hash, ep_name, 1, ep );
}
char* addr; // address used for connection
int nn_sock; // the nano-msg socket to write to for this entry
int open; // true if we've established the connection
+ pthread_mutex_t gate; // must be able to serialise some transport level functions on the ep
};
/*
#include <stdint.h>
#include <time.h>
#include <arpa/inet.h>
+#include <semaphore.h>
#include <nanomsg/nn.h>
#include <nanomsg/tcp.h>
nng_socket nn_sock; // the nano-msg socket to write to for this entry
nng_dialer dialer; // the connection specific information (retry timout etc)
int open; // set to true if we've connected as socket cannot be checked directly)
+ pthread_mutex_t gate; // we must serialise when we open/link to the endpoint
};
/*
int nrtele; // number of elements in the routing table
int send_retries; // number of retries send_msg() should attempt if eagain/timeout indicated by nng
int trace_data_len; // number of bytes to allocate in header for trace data
- int d1_len; // extra header data 1 length (future)
+ int d1_len; // extra header data 1 length
int d2_len; // extra header data 2 length (future)
nng_socket nn_sock; // our general listen socket
route_table_t* rtable; // the active route table
route_table_t* new_rtable; // route table under construction
if_addrs_t* ip_list; // list manager of the IP addresses that are on our known interfaces
void* mring; // ring where msgs are queued while waiting for a call response msg
+ chute_t* chutes;
char* rtg_addr; // addr/port of the route table generation publisher
int rtg_port; // the port that the rtg listens on
epoll_stuff_t* eps; // epoll information needed for the rcv with timeout call
pthread_t rtc_th; // thread info for the rtc listener
+ pthread_t mtc_th; // thread info for the multi-thread call receive process
};
static void free_ctx( uta_ctx_t* ctx );
// --- rt table things ---------------------------
-static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer );
+static int uta_link2( endpoint_t* ep );
static int rt_link2_ep( endpoint_t* ep );
static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock );
static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock );
--- /dev/null
+// : vi ts=4 sw=4 noet :
+/*
+==================================================================================
+ Copyright (c) 2019 Nokia
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+/*
+ Mnemonic: mt_call_nng_static.c
+ Abstract: Static funcitons related to the multi-threaded call feature
+ which are NNG specific.
+
+ Author: E. Scott Daniels
+ Date: 20 May 2019
+*/
+
+#ifndef _mtcall_nng_static_c
+#define _mtcall_nng_static_c
+#include <semaphore.h>
+
+static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
+ chute_t* chute;
+ int state;
+
+ if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
+ rmr_free_msg( mbuf ); // drop if ring is full
+ }
+
+ chute = &ctx->chutes[0];
+ chute->mbuf = mbuf;
+ state = sem_post( &chute->barrier ); // tickle the ring monitor
+}
+
+/*
+ This is expected to execute in a separate thread. It is responsible for
+ _all_ receives and queues them on the appropriate ring, or chute.
+
+ The "state" of the message is checked which determines where the message
+ is delivered.
+
+ Flags indicate that the message is a call generated message, then
+ the message is queued on the normal receive ring.
+
+ Chute ID is == 0, then the message is queued on the normal receive ring.
+
+ The transaction ID in the message matches the expected ID in the chute,
+ then the message is given to the chute and the chute's semaphore is tickled.
+
+ If none are true, the message is dropped.
+*/
+static void* mt_receive( void* vctx ) {
+ uta_ctx_t* ctx;
+ uta_mhdr_t* hdr; // header of the message received
+ rmr_mbuf_t* mbuf; // msg received
+ unsigned char* d1; // pointer at d1 data ([0] is the call_id)
+ chute_t* chute;
+ unsigned int call_id; // the id assigned to the call generated message
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ if( DEBUG ) fprintf( stderr, "rmr mt_receive: bad parms, thread not started\n" );
+ return NULL;
+ }
+
+ fprintf( stderr, "[INFO] rmr mt_receiver is spinning\n" );
+
+ while( ! ctx->shutdown ) {
+ mbuf = rcv_msg( ctx, NULL );
+
+ if( mbuf != NULL && (hdr = (uta_mhdr_t *) mbuf->header) != NULL ) {
+ if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue
+ queue_normal( ctx, mbuf );
+ } else {
+ if( RMR_D1_LEN( hdr ) <= 0 ) { // no call-id data; just queue
+ queue_normal( ctx, mbuf );
+ } else {
+ d1 = DATA1_ADDR( hdr );
+ if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) { // call_id not set, just queue
+ queue_normal( ctx, mbuf );
+ } else {
+ chute = &ctx->chutes[call_id];
+ if( memcmp( mbuf->xaction, chute->expect, RMR_MAX_XID ) == 0 ) { // match
+ chute->mbuf = mbuf;
+ sem_post( &chute->barrier );
+ } else {
+ rmr_free_msg( mbuf );
+ mbuf = NULL;
+ }
+ }
+ }
+ }
+ } else {
+ if( ! mbuf ) { // very very unlikely, but prevent leaks
+ rmr_free_msg( mbuf );
+ }
+ }
+ }
+
+ return NULL;
+}
+
+#endif
#include <unistd.h>
#include <time.h>
#include <arpa/inet.h>
+#include <semaphore.h>
+#include <pthread.h>
#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include "tools_static.c"
#include "sr_nng_static.c" // send/receive static functions
#include "wormholes.c" // wormhole api externals and related static functions (must be LAST!)
+#include "mt_call_static.c"
+#include "mt_call_nng_static.c"
//------------------------------------------------------------------------------
}
/*
- send message with maximum timeout.
- Accept a message and send it to an endpoint based on message type.
- If NNG reports that the send attempt timed out, or should be retried,
- RMr will retry for approximately max_to microseconds; rounded to the next
- higher value of 10.
-
- Allocates a new message buffer for the next send. If a message type has
- more than one group of endpoints defined, then the message will be sent
- in round robin fashion to one endpoint in each group.
-
- An endpoint will be looked up in the route table using the message type and
- the subscription id. If the subscription id is "UNSET_SUBID", then only the
- message type is used. If the initial lookup, with a subid, fails, then a
- second lookup using just the mtype is tried.
-
- CAUTION: this is a non-blocking send. If the message cannot be sent, then
- it will return with an error and errno set to eagain. If the send is
- a limited fanout, then the returned status is the status of the last
- send attempt.
-
+ This is a wrapper to the real timeout send. We must wrap it now to ensure that
+ the call flag and call-id are reset
*/
extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
- nng_socket nn_sock; // endpoint socket for send
- uta_ctx_t* ctx;
- int group; // selected group to get socket for
- int send_again; // true if the message must be sent again
- rmr_mbuf_t* clone_m; // cloned message for an nth send
- int sock_ok; // got a valid socket from round robin select
- uint64_t key; // mtype or sub-id/mtype sym table key
- int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails
-
- if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
- errno = EINVAL; // if msg is null, this is their clue
- if( msg != NULL ) {
- msg->state = RMR_ERR_BADARG;
- errno = EINVAL; // must ensure it's not eagain
- }
- return msg;
- }
+ char* d1; // point at the call-id in the header
- errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
- if( msg->header == NULL ) {
- fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
- msg->state = RMR_ERR_NOHDR;
- errno = EBADMSG; // must ensure it's not eagain
- return msg;
- }
+ if( msg != NULL ) {
+ ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
- if( max_to < 0 ) {
- max_to = ctx->send_retries; // convert to retries
- }
+ d1 = DATA1_ADDR( msg->header );
+ d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
+ }
- send_again = 1; // force loop entry
- group = 0; // always start with group 0
-
- key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry
- if( msg->sub_id != UNSET_SUBID ) {
- altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
- }
- while( send_again ) {
- sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups
- if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
- msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
-
- if( ! sock_ok ) {
- if( altk_ok ) { // we can try with the alternate (no sub-id) key
- altk_ok = 0;
- key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again
- send_again = 1; // ensure we don't exit the while
- continue;
- }
-
- msg->state = RMR_ERR_NOENDPT;
- errno = ENXIO; // must ensure it's not eagain
- return msg; // caller can resend (maybe) or free
- }
-
- group++;
-
- if( send_again ) {
- clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
- if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
- msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer
- msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
- /*
- if( msg ) {
- // error do we need to count successes/errors, how to report some success, esp if last fails?
- }
- */
-
- msg = clone_m; // clone will be the next to send
- } else {
- msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
- }
- }
-
- return msg; // last message caries the status of last/only send attempt
+ return mtosend_msg( vctx, msg, max_to );
}
/*
See rmr_stimeout() for info on setting the default timeout.
*/
extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
- return rmr_mtosend_msg( vctx, msg, -1 ); // retries < uses default from ctx
+ char* d1; // point at the call-id in the header
+
+ if( msg != NULL ) {
+ ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
+
+ d1 = DATA1_ADDR( msg->header );
+ d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
+ }
+
+ return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0 uses default from ctx
}
/*
The caller must check for this and handle.
*/
extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
- nng_socket nn_sock; // endpoint socket for send
+ nng_socket nn_sock; // endpoint socket for send
uta_ctx_t* ctx;
- int state;
- uta_mhdr_t* hdr;
- char* hold_src; // we need the original source if send fails
- int sock_ok; // true if we found a valid endpoint socket
+ int state;
+ char* hold_src; // we need the original source if send fails
+ int sock_ok; // true if we found a valid endpoint socket
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
return msg;
}
+ ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // socket of specific endpoint
if( ! sock_ok ) {
msg->state = RMR_ERR_NOENDPT;
return msg; // preallocated msg can be reused since not given back to nn
}
- msg->state = RMR_OK; // ensure it is clear before send
- hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
- strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours
+ msg->state = RMR_OK; // ensure it is clear before send
+ hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
+ strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours
msg = send_msg( ctx, msg, nn_sock, -1 );
if( msg ) {
- strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID ); // always return original source so rts can be called again
- msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
+ strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID ); // always return original source so rts can be called again
+ msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
}
free( hold_src );
}
/*
+ If multi-threading call is turned on, this invokes that mechanism with the special call
+ id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original
+ behavour (described below) is carried out. This is safe to use when mt is enabled, but
+ the user app is invoking rmr_call() from only one thread, and the caller doesn't need
+ a flexible timeout.
+
+ On timeout this function will return a nil pointer. If the original message could not
+ be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
+
+ Original behavour:
Call sends the message based on message routing using the message type, and waits for a
response message to arrive with the same transaction id that was in the outgoing message.
If, while wiating for the expected response, messages are received which do not have the
EAGAIN -- the underlying message system wsa interrupted or the device was busy;
user should call this function with the message again.
-
- QUESTION: should user specify the number of messages to allow to queue?
*/
extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
uta_ctx_t* ctx;
return msg;
}
+ if( ctx->flags & CFL_MTC_ENABLED ) { // if multi threaded call is on, use that
+ return rmr_mt_call( vctx, msg, 1, 1000 ); // use the reserved call-id of 1 and wait up to 1 sec
+ }
+
memcpy( expected_id, msg->xaction, RMR_MAX_XID );
expected_id[RMR_MAX_XID] = 0; // ensure it's a string
if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
}
errno = 0;
+ if( ctx->flags & CFL_MTC_ENABLED ) { // must pop from ring with a semaphore dec first
+ return rmr_mt_rcv( ctx, old_msg, -1 );
+ }
+
qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued
if( qm != NULL ) {
if( old_msg ) {
return old_msg;
}
+ if( ctx->flags & CFL_MTC_ENABLED ) { // must pop from ring with a semaphore dec first
+ return rmr_mt_rcv( ctx, old_msg, ms_to );
+ }
+
qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued
if( qm != NULL ) {
if( old_msg ) {
memset( ctx, 0, sizeof( uta_ctx_t ) );
ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
- ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response
+ ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
+
+ if( flags & RMRFL_MTCALL ) { // mt call support is on, need bigger ring
+ ctx->mring = uta_mk_ring( 2048 ); // message ring filled by rcv thread
+ init_mtcall( ctx ); // set up call chutes
+ } else {
+ ctx->mring = uta_mk_ring( 128 ); // ring filled only on blocking call
+ }
ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
if( max_msg_size > 0 ) {
}
}
+ if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) { // mt call support is on, must start the listener thread if not running
+ ctx->flags |= CFL_MTC_ENABLED;
+ if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // kick the receiver
+ fprintf( stderr, "[WARN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
+ }
+
+ }
+
free( proto_port );
return (void *) ctx;
}
}
+// ----- multi-threaded call/receive support -------------------------------------------------
+/*
+ Blocks on the receive ring chute semaphore and then reads from the ring
+ when it is tickled. If max_wait is -1 then the function blocks until
+ a message is ready on the ring. Else max_wait is assumed to be the number
+ of millaseconds to wait before returning a timeout message.
+*/
+extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
+ uta_ctx_t* ctx;
+ uta_mhdr_t* hdr; // header in the transport buffer
+ chute_t* chute;
+ struct timespec ts; // time info if we have a timeout
+ long new_ms; // adjusted mu-sec
+ long seconds = 0; // max wait seconds
+ long nano_sec; // max wait xlated to nano seconds
+ int state;
+ rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ errno = EINVAL;
+ if( mbuf ) {
+ mbuf->state = RMR_ERR_BADARG;
+ }
+ return mbuf;
+ }
+
+ if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
+ errno = EINVAL;
+ if( mbuf != NULL ) {
+ mbuf->state = RMR_ERR_NOTSUPP;
+ }
+ return mbuf;
+ }
+
+ ombuf = mbuf;
+ if( ombuf ) {
+ ombuf->state = RMR_ERR_TIMEOUT; // preset if for failure
+ ombuf->len = 0;
+ }
+
+ chute = &ctx->chutes[0]; // chute 0 used only for its semaphore
+
+ if( max_wait > 0 ) {
+ clock_gettime( CLOCK_REALTIME, &ts );
+
+ if( max_wait > 999 ) {
+ seconds = (max_wait - 999)/1000;
+ max_wait -= seconds * 1000;
+ ts.tv_sec += seconds;
+ }
+ if( max_wait > 0 ) {
+ nano_sec = max_wait * 1000000;
+ ts.tv_nsec += nano_sec;
+ if( ts.tv_nsec > 999999999 ) {
+ ts.tv_nsec -= 999999999;
+ ts.tv_sec++;
+ }
+ }
+
+ seconds = 1; // use as flag later to invoked timed wait
+ }
+
+ errno = 0;
+ while( chute->mbuf == NULL && ! errno ) {
+ if( seconds ) {
+ state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
+ } else {
+ state = sem_wait( &chute->barrier );
+ }
+
+ if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
+ errno = 0;
+ }
+ }
+
+ if( state < 0 ) {
+ mbuf = ombuf; // return caller's buffer if they passed one in
+ } else {
+ if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
+ if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
+ if( mbuf ) {
+ mbuf->state = RMR_OK;
+
+ if( ombuf ) {
+ rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
+ }
+ } else {
+ mbuf = ombuf; // no buffer, return user's if there
+ }
+ }
+ }
+
+ return mbuf;
+}
+
+/*
+ Accept a message buffer and caller ID, send the message and then wait
+ for the receiver to tickle the semaphore letting us know that a message
+ has been received. The call_id is a value between 2 and 255, inclusive; if
+ it's not in this range an error will be returned. Max wait is the amount
+ of time in millaseconds that the call should block for. If 0 is given
+ then no timeout is set.
+
+ If the mt_call feature has not been initialised, then the attempt to use this
+ funciton will fail with RMR_ERR_NOTSUPP
+
+ If no matching message is received before the max_wait period expires, a
+ nil pointer is returned, and errno is set to ETIMEOUT. If any other error
+ occurs after the message has been sent, then a nil pointer is returned
+ with errno set to some other value.
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+ rmr_mbuf_t* ombuf; // original mbuf passed in
+ uta_ctx_t* ctx;
+ uta_mhdr_t* hdr; // header in the transport buffer
+ chute_t* chute;
+ unsigned char* d1; // d1 data in header
+ struct timespec ts; // time info if we have a timeout
+ long new_ms; // adjusted mu-sec
+ long seconds = 0; // max wait seconds
+ long nano_sec; // max wait xlated to nano seconds
+ int state;
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
+ errno = EINVAL;
+ if( mbuf ) {
+ mbuf->state = RMR_ERR_BADARG;
+ }
+ return mbuf;
+ }
+
+ if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
+ mbuf->state = RMR_ERR_NOTSUPP;
+ return mbuf;
+ }
+
+ if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
+ mbuf->state = RMR_ERR_BADARG;
+ return mbuf;
+ }
+
+ ombuf = mbuf; // save to return timeout status with
+
+ chute = &ctx->chutes[call_id];
+ if( chute->mbuf != NULL ) { // probably a delayed message that wasn't dropped
+ rmr_free_msg( chute->mbuf );
+ chute->mbuf = NULL;
+ }
+
+ hdr = (uta_mhdr_t *) mbuf->header;
+ hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call
+ memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for
+ d1 = DATA1_ADDR( hdr );
+ d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response
+ mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend
+
+ if( max_wait > 0 ) {
+ clock_gettime( CLOCK_REALTIME, &ts );
+
+ if( max_wait > 999 ) {
+ seconds = (max_wait - 999)/1000;
+ max_wait -= seconds * 1000;
+ ts.tv_sec += seconds;
+ }
+ if( max_wait > 0 ) {
+ nano_sec = max_wait * 1000000;
+ ts.tv_nsec += nano_sec;
+ if( ts.tv_nsec > 999999999 ) {
+ ts.tv_nsec -= 999999999;
+ ts.tv_sec++;
+ }
+ }
+
+ seconds = 1; // use as flag later to invoked timed wait
+ }
+
+ mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
+ if( mbuf ) {
+ if( mbuf->state != RMR_OK ) {
+ return mbuf; // timeout or unable to connect or no endpoint are most likely issues
+ }
+ }
+
+ errno = 0;
+ while( chute->mbuf == NULL && ! errno ) {
+ if( seconds ) {
+ state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
+ } else {
+ state = sem_wait( &chute->barrier );
+ }
+
+ if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
+ errno = 0;
+ }
+ }
+
+ if( state < 0 ) {
+ return NULL; // leave errno as set by sem wait call
+ }
+
+ mbuf = chute->mbuf;
+ mbuf->state = RMR_OK;
+ chute->mbuf = NULL;
+
+ return mbuf;
+}
Target assumed to be address:port. The new socket is returned via the
user supplied pointer so that a success/fail code is returned directly.
Return value is 0 (false) on failure, 1 (true) on success.
+
+ In order to support multi-threaded user applications we must hold a lock before
+ we attempt to create a dialer and connect. NNG is thread safe, but we can
+ get things into a bad state if we allow a collision here. The lock grab
+ only happens on the intial session setup.
*/
-static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
+//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
+static int uta_link2( endpoint_t* ep ) {
+ char* target;
+ nng_socket* nn_sock;
+ nng_dialer* dialer;
char conn_info[NNG_MAXADDRLEN]; // string to give to nano to make the connection
char* addr;
int state = FALSE;
+ if( ep == NULL ) {
+ return FALSE;
+ }
+
+ target = ep->addr;
+ nn_sock = &ep->nn_sock;
+ dialer = &ep->dialer;
+
if( target == NULL || (addr = strchr( target, ':' )) == NULL ) { // bad address:port
fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
return FALSE;
return FALSE;
}
+ pthread_mutex_lock( &ep->gate ); // grab the lock
+ if( ep->open ) {
+ pthread_mutex_unlock( &ep->gate );
+ return TRUE;
+ }
+
+
if( nng_push0_open( nn_sock ) != 0 ) { // and assign the mode
+ pthread_mutex_unlock( &ep->gate );
fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
return FALSE;
}
snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
+ pthread_mutex_unlock( &ep->gate );
fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
nng_close( *nn_sock );
return FALSE;
nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMINT, 100 ); // start retry 100m after last failure with 2s cap
if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) { // can fail immediatly (unlike nanomsg)
+ pthread_mutex_unlock( &ep->gate );
fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
nng_close( *nn_sock );
return FALSE;
if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
+ ep->open = TRUE; // must set before release
+ pthread_mutex_unlock( &ep->gate );
return TRUE;
}
return TRUE;
}
- ep->open = uta_link2( ep->addr, &ep->nn_sock, &ep->dialer );
+ uta_link2( ep );
return ep->open;
}
if( ep->addr == NULL ) { // name didn't resolve before, try again
ep->addr = uta_h2ip( ep->name );
}
- if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) { // find entry in table and create link
+ if( uta_link2( ep ) ) { // find entry in table and create link
state = TRUE;
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
ep->addr = uta_h2ip( ep->name );
}
- if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) { // find entry in table and create link
+ if( uta_link2( ep ) ) { // find entry in table and create link
ep->open = TRUE;
*nn_sock = ep->nn_sock; // pass socket back to caller
} else {
hdr->sub_id = htonl( UNSET_SUBID );
SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
- //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // no need until we start using them
- //SET_HDR_D2_LEN( hdr, ctx->d2_len );
+ SET_HDR_D1_LEN( hdr, ctx->d1_len );
+ //SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future
}
msg->len = 0; // length of data in the payload
msg->alloc_len = mlen; // length of allocated transport buffer
uta_mhdr_t* hdr;
uta_v1mhdr_t* v1hdr;
int tr_old_len; // tr size in new buffer
- int coffset; // an offset to something in the header for copy
nm = (rmr_mbuf_t *) malloc( sizeof *nm );
if( nm == NULL ) {
nm->payload = (void *) v1hdr + sizeof( *v1hdr );
break;
- default: // current message always caught here
+ default: // current message version always caught here
hdr = nm->header;
- memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data might have changed
- if( RMR_D1_LEN( hdr ) ) {
- coffset = DATA1_OFFSET( hdr ); // offset to d1
- memcpy( hdr + coffset, old_msg->header + coffset, RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
+ memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed
+ SET_HDR_TR_LEN( hdr, tr_len ); // must adjust trace len in new message before copy
+ if( RMR_D1_LEN( hdr ) ) {
+ memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
}
if( RMR_D2_LEN( hdr ) ) {
- coffset = DATA2_OFFSET( hdr ); // offset to d2
- memcpy( hdr + coffset, old_msg->header + coffset, RMR_D2_LEN( hdr ) ); // copy data2 and data2 if necessary
+ memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
}
- SET_HDR_TR_LEN( hdr, tr_len ); // MUST set before pointing payload
nm->payload = PAYLOAD_ADDR( hdr ); // directly at the payload
- SET_HDR_TR_LEN( hdr, tr_len ); // do NOT copy old trace data, just set the new header
break;
}
msg->len = 0;
msg->payload = NULL;
msg->xaction = NULL;
+ msg->tp_buf = NULL;
msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
return msg;
}
+/*
+ send message with maximum timeout.
+ Accept a message and send it to an endpoint based on message type.
+ If NNG reports that the send attempt timed out, or should be retried,
+ RMr will retry for approximately max_to microseconds; rounded to the next
+ higher value of 10.
+
+ Allocates a new message buffer for the next send. If a message type has
+ more than one group of endpoints defined, then the message will be sent
+ in round robin fashion to one endpoint in each group.
+
+ An endpoint will be looked up in the route table using the message type and
+ the subscription id. If the subscription id is "UNSET_SUBID", then only the
+ message type is used. If the initial lookup, with a subid, fails, then a
+ second lookup using just the mtype is tried.
+
+ CAUTION: this is a non-blocking send. If the message cannot be sent, then
+ it will return with an error and errno set to eagain. If the send is
+ a limited fanout, then the returned status is the status of the last
+ send attempt.
+
+*/
+static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
+ nng_socket nn_sock; // endpoint socket for send
+ uta_ctx_t* ctx;
+ int group; // selected group to get socket for
+ int send_again; // true if the message must be sent again
+ rmr_mbuf_t* clone_m; // cloned message for an nth send
+ int sock_ok; // got a valid socket from round robin select
+ uint64_t key; // mtype or sub-id/mtype sym table key
+ int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails
+ char* d1;
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
+ errno = EINVAL; // if msg is null, this is their clue
+ if( msg != NULL ) {
+ msg->state = RMR_ERR_BADARG;
+ errno = EINVAL; // must ensure it's not eagain
+ }
+ return msg;
+ }
+
+ errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
+ if( msg->header == NULL ) {
+ fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
+ msg->state = RMR_ERR_NOHDR;
+ errno = EBADMSG; // must ensure it's not eagain
+ return msg;
+ }
+
+ if( max_to < 0 ) {
+ max_to = ctx->send_retries; // convert to retries
+ }
+
+ send_again = 1; // force loop entry
+ group = 0; // always start with group 0
+
+ key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry
+ if( msg->sub_id != UNSET_SUBID ) {
+ altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
+ }
+ while( send_again ) {
+ sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups
+ if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
+ msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
+
+ if( ! sock_ok ) {
+ if( altk_ok ) { // we can try with the alternate (no sub-id) key
+ altk_ok = 0;
+ key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again
+ send_again = 1; // ensure we don't exit the while
+ continue;
+ }
+
+ msg->state = RMR_ERR_NOENDPT;
+ errno = ENXIO; // must ensure it's not eagain
+ return msg; // caller can resend (maybe) or free
+ }
+
+ group++;
+
+ if( send_again ) {
+ clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
+ if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
+ msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer
+ msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
+ /*
+ if( msg ) {
+ // error do we need to count successes/errors, how to report some success, esp if last fails?
+ }
+ */
+
+ msg = clone_m; // clone will be the next to send
+ } else {
+ msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
+ }
+ }
+
+ return msg; // last message caries the status of last/only send attempt
+}
+
+
/*
A generic wrapper to the real send to keep wormhole stuff agnostic.
We assume the wormhole function vetted the buffer so we don't have to.
#libs = ../build/librmr_nng.a -L ../build/lib -lnng -lpthread -lm
libs = -L ../build/lib -lnng -lpthread -lm
+ipaths = -I ../src/rmr/common/src/ -I ../src/rmr/common/include -I ../src/rmr/nng/include/ -I ../src/rmr/nng/src/ -I ../src/rmr/nanomsg/include/ -I ../src/rmr/nanomsg/src/
#sa_tests = sa_tools_test.o
$(CC) -g $< -c
%:: %.c
- $(CC) -I ../src/common/src/ -I ../src/common/include -I ../src/nng/include -I ../src/nanomsg/include $(coverage_opts) -fPIC -g $< -o $@ $(libs)
+ $(CC) $(ipaths) $(coverage_opts) -fPIC -g $< -o $@ $(libs)
# catch all
all:
# NOTE: this makefile assumes that RMr has been built using the directory .build
# at the top most repo directory (e.g. ../../.build). It can be changed
# if you need to by adding "build_path=<path>" to the make command line.
+# To use this makefile to build on a system where RMr is already installed
+# try: make build_path=/usr/local/lib
+#
+# By default we prefer the Korn shell (it's just better). If you really need
+# to build with a dfferent shell add "SHELL=path" to the command line:
+# make SHELL=/bin/bash
+#
.EXPORT_ALL_VARIABLES:
.ONESHELL:
#.SHELLFLAGS = -e # hosed on some flavours so keep it off
-SHELL = /bin/ksh
+SHELL ?= /bin/ksh
build_path ?= ../../.build
-header_path := $(shell find ../../.build -name 'rmr.h' |head -1 | sed 's!/rmr/.*!!' )
+header_path := $(shell find $(build_path) -name 'rmr.h' |head -1 | sed 's!/rmr/.*!!' )
C_INCLUDE_PATH := $(header_path)
LD_LIBRARY_PATH=$(build_path):$(build_path)/lib
LIBRARY_PATH = $(LD_LIBRARY_PATH)
# These programmes are designed to test some basic application level functions
-# from the perspective of two communicating processes.
+# from the perspective of two, or more, communicating processes.
.PHONY: all
-all: sender receiver sender_nano receiver_nano
+all: sender receiver sender_nano receiver_nano caller mt_receiver
receiver_nano: receiver.c
gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr -lnanomsg -lpthread -lm
receiver: receiver.c
gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+mt_receiver: receiver.c
+ gcc -I $${C_INCLUDE_PATH:-.} -DMTC $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+
sender_nano: sender.c
gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr -lnanomsg -lpthread -lm
sender: sender.c
gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+caller: caller.c
+ gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+
-.PHONY: clean
+# clean removes intermediates; nuke removes everything that can be built
+.PHONY: clean nuke
clean:
- rm -f sender sender_nano receiver receiver_nano *.o
+ rm -f *.o
+
+nuke: clean
+ rm -f sender sender_nano receiver receiver_nano caller mt_receiver
--- /dev/null
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+ Copyright (c) 2019 Nokia
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+/*
+ Mnemonic: caller.c
+ Abstract: This is a simple sender which will send a series of messages using
+ rmr_call(). N threads are started each sending the desired number
+ of messages and expecting an 'ack' for each. Each ack is examined
+ to verify that the thread id placed into the message matches (meaning
+ that the ack was delivered by RMr to the correct thread's chute.
+
+ In addition, the main thread listens for messages in order to verify
+ that a main or receiving thread can receive messages concurrently
+ while call acks are pending and being processed.
+
+ Message format is:
+ ck1 ck2|<msg-txt> @ tid<nil>
+
+ Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
+ Ck2 is the simple check sum of the trace data which is a nil terminated
+ series of bytes.
+ tid is the thread id assigned by the main thread.
+
+ Parms: argv[1] == number of msgs to send (10)
+ argv[2] == delay (mu-seconds, 1000000 default)
+ argv[3] == number of threads (3)
+ argv[4] == listen port
+
+ Sender will send for at most 20 seconds, so if nmsgs and delay extend
+ beyond that period the total number of messages sent will be less
+ than n.
+
+ Date: 18 April 2019
+ Author: E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include <time.h>
+#include <pthread.h>
+
+
+#include <rmr/rmr.h>
+
+#define TRACE_SIZE 40 // bytes in header to provide for trace junk
+
+/*
+ Thread data
+*/
+typedef struct tdata {
+ int id; // the id we'll pass to RMr mt-call function NOT the thread id
+ int n2send; // number of messages to send
+ int delay; // ms delay between messages
+ void* mrc; // RMr context
+ int state;
+} tdata_t;
+
+
+
+// --------------------------------------------------------------------------------
+
+
+static int sum( char* str ) {
+ int sum = 0;
+ int i = 0;
+
+ while( *str ) {
+ sum += *(str++) + i++;
+ }
+
+ return sum % 255;
+}
+
+
+
+/*
+ Executed as a thread, this puppy will generate calls to ensure that we get the
+ response back to the right thread, that we can handle threads, etc.
+*/
+static void* mk_calls( void* data ) {
+ tdata_t* control;
+ rmr_mbuf_t* sbuf; // send buffer
+ int count = 0;
+ int rt_count = 0; // number of messages requiring a spin retry
+ int ok_msg = 0; // received messages that were sent by us
+ int bad_msg = 0; // received messages that were sent by a different thread
+ int drops = 0;
+ int fail_count = 0; // # of failure sends after first successful send
+ int successful = 0; // set to true after we have a successful send
+ char wbuf[1024];
+ char xbuf[1024]; // build transaction string here
+ char trace[1024];
+ int xaction_id = 1;
+ char* tok;
+ int state = 0;
+
+ if( (control = (tdata_t *) data) == NULL ) {
+ fprintf( stderr, "thread data was nil; bailing out\n" );
+ }
+ //fprintf( stderr, "<THRD> thread started with parallel call id=%d sending=%d delay=%d\n", control->id, control->n2send, control->delay );
+
+ sbuf = rmr_alloc_msg( control->mrc, 512 ); // alloc first send buffer; subsequent buffers allcoated on send
+
+ memset( trace, 0, sizeof( trace ) );
+ while( count < control->n2send ) { // we send n messages after the first message is successful
+ snprintf( trace, 100, "%lld", (long long) time( NULL ) );
+ rmr_set_trace( sbuf, trace, TRACE_SIZE ); // fully populate so we dont cause a buffer realloc
+
+ snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer! @ %d", count, trace, rand(), control->id );
+ snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
+ snprintf( xbuf, 200, "%31d", xaction_id );
+ rmr_bytes2xact( sbuf, xbuf, 32 );
+
+ sbuf->mtype = 5; // mtype is always 5 as the test receiver acks just mtype 5 messages
+ sbuf->len = strlen( sbuf->payload ) + 1; // our receiver likely wants a nice acsii-z string
+ sbuf->state = 0;
+ sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 ); // send it (send returns an empty payload on success, or the original payload on fail/retry)
+
+ if( sbuf && sbuf->state == RMR_ERR_RETRY ) { // number of times we had to spin to send
+ rt_count++;
+ }
+ while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) { // send blocked; keep trying
+ sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 ); // call and wait up to 100ms for a response
+ }
+
+ if( sbuf != NULL ) {
+ switch( sbuf->state ) {
+ case RMR_OK: // we should have a buffer back from the sender here
+ successful = 1;
+ if( (tok = strchr( sbuf->payload, '@' )) != NULL ) {
+ if( atoi( tok+1 ) == control->id ) {
+ //fprintf( stderr, "<THRD> tid=%-2d ok ack\n", control->id );
+ ok_msg++;
+ } else {
+ bad_msg++;
+ //fprintf( stderr, "<THRD> tid=%-2d bad ack %s\n", control->id, sbuf->payload );
+ }
+ }
+ //fprintf( stderr, "<THRD> tid=%-2d call returned valid msg: %s\n", control->id, sbuf->payload );
+ // future -- verify that we see our ID at the end of the message
+ count++;
+ break;
+
+ default:
+ fprintf( stderr, "unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
+ sbuf = rmr_alloc_msg( control->mrc, 512 ); // allocate a sendable buffer
+ if( successful ) {
+ fail_count++; // count failures after first successful message
+ } else {
+ // some error (not connected likely), don't count this
+ sleep( 1 );
+ }
+ break;
+ }
+ } else {
+ //fprintf( stderr, "<THRD> tid=%-2d call finished, no sbuf\n", control->id );
+ sbuf = rmr_alloc_msg( control->mrc, 512 ); // loop expects an subf
+ drops++;
+ count++;
+ }
+
+ if( control->delay > 0 ) {
+ usleep( control->delay );
+ }
+ }
+
+ state = 1;
+ if( ok_msg < (control->n2send-1) || bad_msg > 0 ) { // allow one drop to pass
+ state = 0;
+ }
+ if( count < control->n2send ) {
+ state = 0;
+ }
+
+ control->state = -state; // signal inactive to main thread; -1 == pass, 0 == fail
+ fprintf( stderr, "<THRD> [%s] tid=%-2d sent=%d ok-acks=%d bad-acks=%d drops=%d failures=%d retries=%d\n",
+ state ? "PASS" : "FAIL", control->id, count, ok_msg, bad_msg, drops, fail_count, rt_count );
+
+
+ return NULL;
+}
+
+int main( int argc, char** argv ) {
+ void* mrc; // msg router context
+ rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow
+ struct epoll_event events[1]; // list of events to give to epoll
+ struct epoll_event epe; // event definition for event to listen to
+ int ep_fd = -1; // epoll's file des (given to epoll_wait)
+ int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on
+ int nready; // number of events ready for receive
+ char* listen_port = "43086";
+ long timeout = 0;
+ int delay = 100000; // usec between send attempts
+ int nmsgs = 10; // number of messages to send
+ int nthreads = 3;
+ tdata_t* cvs; // vector of control blocks
+ int i;
+ pthread_t* pt_info; // thread stuff
+ int failures = 0;
+ int pings = 0; // number of messages received on normal channel
+
+ if( argc > 1 ) {
+ nmsgs = atoi( argv[1] );
+ }
+ if( argc > 2 ) {
+ delay = atoi( argv[2] );
+ }
+ if( argc > 3 ) {
+ nthreads = atoi( argv[3] );
+ }
+ if( argc > 4 ) {
+ listen_port = argv[4];
+ }
+
+ fprintf( stderr, "<CALL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
+
+ if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) { // initialise with multi-threaded call enabled
+ fprintf( stderr, "<CALL> unable to initialise RMr\n" );
+ exit( 1 );
+ }
+
+ rmr_init_trace( mrc, TRACE_SIZE );
+
+ if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG
+ if( rcv_fd < 0 ) {
+ fprintf( stderr, "<CALL> unable to set up polling fd\n" );
+ exit( 1 );
+ }
+ if( (ep_fd = epoll_create1( 0 )) < 0 ) {
+ fprintf( stderr, "<CALL> [FAIL] unable to create epoll fd: %d\n", errno );
+ exit( 1 );
+ }
+ epe.events = EPOLLIN;
+ epe.data.fd = rcv_fd;
+
+ if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
+ fprintf( stderr, "<CALL> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
+ exit( 1 );
+ }
+ } else {
+ rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive
+ }
+
+
+ cvs = malloc( sizeof( tdata_t ) * nthreads );
+ pt_info = malloc( sizeof( pthread_t ) * nthreads );
+ if( cvs == NULL ) {
+ fprintf( stderr, "<CALL> unable to allocate control vector\n" );
+ exit( 1 );
+ }
+
+
+ timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much)
+ while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one
+ fprintf( stderr, "<CALL> waiting for rmr to show ready\n" );
+ sleep( 1 );
+
+ if( time( NULL ) > timeout ) {
+ fprintf( stderr, "<CALL> giving up\n" );
+ exit( 1 );
+ }
+ }
+ fprintf( stderr, "<CALL> rmr is ready; starting threads\n" );
+
+ for( i = 0; i < nthreads; i++ ) {
+ cvs[i].mrc = mrc;
+ cvs[i].id = i + 2; // we pass this as the call-id to rmr, so must be >1
+ cvs[i].delay = delay;
+ cvs[i].n2send = nmsgs;
+ cvs[i].state = 1;
+
+ pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] ); // kick a thread
+ }
+
+ timeout = time( NULL ) + 20;
+ i = 0;
+ while( nthreads > 0 ) {
+ if( cvs[i].state < 1 ) { // states 0 or below indicate done. 0 == failure, -n == success
+ nthreads--;
+ if( cvs[i].state == 0 ) {
+ failures++;
+ }
+ i++;
+ } else {
+ // sleep( 1 );
+ rbuf = rmr_torcv_msg( mrc, rbuf, 1000 );
+ if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) {
+ pings++;
+ rmr_free_msg( rbuf );
+ rbuf = NULL;
+ }
+ }
+ if( time( NULL ) > timeout ) {
+ failures += nthreads;
+ fprintf( stderr, "<CALL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
+ break;
+ }
+ }
+
+ fprintf( stderr, "<CALL> [%s] failing threads=%d pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL", failures, pings );
+ rmr_close( mrc );
+
+ return failures > 0;
+}
+
RMR_SEED_RT -- path to the static routing table
RMR_RTG_SVC -- port to listen for RTG connections
+ Compile time options
+ if -DMTC is defined on the compile command, then RMr is initialised
+ with the multi-threaded receive thread rather than using the same
+ process receive function. All other functions in the receiver are
+ the same.
+
Date: 18 April 2019
Author: E. Scott Daniels
*/
fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
+#ifdef MTC
+ mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
+
+#else
mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
+#endif
if( mrc == NULL ) {
fprintf( stderr, "<RCVR> ABORT: unable to initialise RMr\n" );
exit( 1 );
--- /dev/null
+#!/usr/bin/env ksh
+# :vi ts=4 sw=4 noet :
+#==================================================================================
+# Copyright (c) 2019 Nokia
+# Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#==================================================================================
+#
+
+# ---------------------------------------------------------------------------------
+# Mnemonic: run_call_test.ksh
+# Abstract: This is a simple script to set up and run the basic send/receive
+# processes for some library validation on top of nano/nng.
+# It should be possible to clone the repo, switch to this directory
+# and execute 'ksh run -B' which will build RMr, make the sender and
+# recevier then run the basic test.
+#
+# The call test drives three RMr based processes:
+# caller which invokes rmr_mt_call() to send n messages
+#
+# receiver which executes rmr_rts_msg() on each received
+# message with type == 5.
+#
+# sender which sends n "pings" to the caller process
+#
+# The script assumes that all three proessess are running on the same
+# host/container such that setting up the route table with localhost:port
+# will work.
+#
+# The number of message, and the delay between each message may be given
+# on the command line. The number of threads may also be given. The
+# number of messages defines the number _each_ caller will sent (5000
+# messages and three threads == 15,000 total messages.
+#
+# Example command line:
+# ksh ./run_call_test.ksh # default 10 messages at 1 msg/sec 3 threads
+# ksh ./run_call_test.ksh -n 5000 -t 6 -d 400 # 5k messages, 6 threads, delay 400 mus
+#
+# Date: 20 May 2019
+# Author: E. Scott Daniels
+# ---------------------------------------------------------------------------------
+
+
+# The sender and receiver are run asynch. Their exit statuses are captured in a
+# file in order for the 'main' to pick them up easily.
+#
+function run_sender {
+ if (( nano_sender ))
+ then
+ # ./sender_nano $nmsg $delay
+ echo "nano is not supported"
+ exit 1
+ else
+ ./caller $nmsg $delay $nthreads
+ fi
+ echo $? >/tmp/PID$$.src # must communicate state back via file b/c asynch
+}
+
+# start receiver listening for nmsgs from each thread
+function run_rcvr {
+ if (( nano_receiver ))
+ then
+ #./receiver_nano $(( nmsg * nthreads ))
+ echo "nano is not supported"
+ exit 1
+ else
+ ./mt_receiver $(( nmsg * nthreads )) # we'll test with the RMr multi threaded receive function
+ fi
+ echo $? >/tmp/PID$$.rrc
+}
+
+# This will send 100 messages to the caller process. This is a test to verify that
+# threaded calling is not affected by normal messages and that normal messages can
+# be received concurrently.
+#
+function run_pinger {
+ RMR_RTG_SVC=9999 ./sender 100 100 4 3333 >/dev/NULL 2>&1 # send pings
+}
+
+
+# Generate a route table that is tailored to our needs of sender sending messages to
+# the caller, and caller sending mtype == 5 to the receiver.
+#
+function mk_rt {
+
+cat <<endKat >caller.rt
+# this is a specialised rt for caller testing. mtype 5 go to the
+# receiver, and all others go to the caller.
+
+newrt | start
+mse | 0 | 0 | localhost:43086
+mse | 1 | 10 | localhost:43086
+mse | 2 | 20 | localhost:43086
+rte | 3 | localhost:43086
+mse | 3 | 100 | localhost:43086 # special test to ensure that this does not affect previous entry
+rte | 4 | localhost:43086
+rte | 5 | localhost:4560
+rte | 6 | localhost:43086
+rte | 7 | localhost:43086
+rte | 8 | localhost:43086
+rte | 9 | localhost:43086
+rte | 10 | localhost:43086
+rte | 11 | localhost:43086
+rte | 12 | localhost:43086
+rte | 13 | localhost:43086
+
+newrt | end | 16
+endKat
+}
+
+# ---------------------------------------------------------
+
+nmsg=10 # total number of messages to be exchanged (-n value changes)
+delay=1000000 # microsec sleep between msg 1,000,000 == 1s
+nano_sender=0 # start nano version if set (-N)
+nano_receiver=0
+wait=1
+rebuild=0
+verbose=0
+nthreads=3
+dev_base=1 # -D turns off to allow this to run on installed libs
+
+nano_sender=0 # mt-call is not supported in nano
+nano_receiver=0
+
+
+while [[ $1 == -* ]]
+do
+ case $1 in
+ -B) rebuild=1;;
+ -d) delay=$2; shift;;
+ -D) dev_base=0;;
+ -n) nmsg=$2; shift;;
+ -t) nthreads=$2; shift;;
+ -v) verbose=1;;
+
+ *) echo "unrecognised option: $1"
+ echo "usage: $0 [-B] [-d micro-sec-delay] [-n num-msgs] [-t num-threads]"
+ echo " -B forces a rebuild which will use .build"
+ exit 1
+ ;;
+ esac
+
+ shift
+done
+
+if (( verbose ))
+then
+ echo "2" >.verbose
+ export RMR_VCTL_FILE=".verbose"
+fi
+
+if (( rebuild ))
+then
+ build_path=../../.build
+ set -e
+ ksh ./rebuild.ksh
+ set +e
+else
+ build_path=${BUILD_PATH:-"../../.build"} # we prefer .build at the root level, but allow user option
+
+ if [[ ! -d $build_path ]]
+ then
+ echo "cannot find build in: $build_path"
+ echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
+ exit 1
+ fi
+fi
+
+if (( dev_base )) # assume we are testing against what we've built, not what is installed
+then
+ if [[ -d $build_path/lib64 ]]
+ then
+ export LD_LIBRARY_PATH=$build_path:$build_path/lib64
+ else
+ export LD_LIBRARY_PATH=$build_path:$build_path/lib
+ fi
+else # -D option gets us here to test an installed library
+ export LD_LIBRARY_PATH=/usr/local/lib
+ export LIBRARY_PATH=$LD_LIBRARY_PATH
+fi
+
+export RMR_SEED_RT=${RMR_SEED_RT:-./caller.rt} # allow easy testing with different rt
+
+if [[ ! -f $RMR_SEED_RT ]] # special caller rt for three process setup
+then
+ mk_rt
+fi
+
+if [[ ! -f ./sender ]]
+then
+ if ! make >/dev/null 2>&1
+ then
+ echo "[FAIL] cannot find sender binary, and cannot make it.... humm?"
+ exit 1
+ fi
+fi
+
+run_rcvr &
+sleep 2 # if caller starts faster than rcvr we can drop, so pause a bit
+run_sender &
+run_pinger &
+
+wait
+head -1 /tmp/PID$$.rrc | read rrc # get pass/fail state from each
+head -1 /tmp/PID$$.src | read src
+
+if (( !! (src + rrc) ))
+then
+ echo "[FAIL] sender rc=$src receiver rc=$rrc"
+else
+ echo "[PASS] sender rc=$src receiver rc=$rrc"
+fi
+
+rm /tmp/PID$$.*
+rm -f .verbose
+
+exit $(( !! (src + rrc) ))
+
#include <string.h>
#include <stdint.h>
#include <netdb.h>
+#include <pthread.h>
+#include <semaphore.h>
#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pipeline0/push.h>
#include <nng/protocol/pipeline0/pull.h>
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
-#include "../src/nng/include/rmr_nng_private.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
+#include "rmr_nng_private.h"
#define EMULATE_NNG
#include "test_nng_em.c"
-#include "../src/nng/src/sr_nng_static.c"
+#include "sr_nng_static.c"
#include "test_support.c"
#include <errno.h>
#include <string.h>
#include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
int mbuf_api_test( ) {
#include <errno.h>
#include <pthread.h>
#include <ctype.h>
+#include <pthread.h>
+#include <semaphore.h>
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
-#include "../src/common/src/mbuf_api.c" // module under test
+#include "mbuf_api.c"
#include "test_support.c" // our private library of test tools
#include "mbuf_api_static_test.c" // test functions
#include <errno.h>
#include <string.h>
#include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
-//#include "../src/common/src/ring_static.c"
+#include "rmr.h"
+#include "rmr_agnostic.h"
/*
#include <errno.h>
#include <string.h>
#include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
-#include "../src/common/src/ring_static.c"
+#include "rmr.h"
+#include "rmr_agnostic.h"
+#include "ring_static.c"
#include "test_support.c" // things like fail_if()
#include "ring_static_test.c" // the actual tests
#include <stdint.h>
#include <ctype.h>
#include <sys/epoll.h>
+#include <pthread.h>
+#include <semaphore.h>
#define DEBUG 1
#include <nanomsg/nn.h>
-//#include <nng/protocol/pubsub0/pub.h>
-//#include <nng/protocol/pubsub0/sub.h>
-//#include <nng/protocol/pipeline0/push.h>
-//#include <nng/protocol/pipeline0/pull.h>
#undef EMULATE_NNG
#include "test_nng_em.c" // nng/nn emulation (before including things under test)
-#include "../src/common/include/rmr.h" // things the users see
-#include "../src/common/include/rmr_symtab.h"
-#include "../src/common/include/rmr_agnostic.h" // transport agnostic header
-#include "../src/nanomsg/include/rmr_private.h" // transport specific
+#include "rmr.h" // things the users see
+#include "rmr_symtab.h"
+#include "rmr_agnostic.h" // transport agnostic header
+#include "rmr_private.h" // transport specific
-#include "../src/common/src/symtab.c"
-#include "../src/nanomsg/src/rmr.c"
-#include "../src/common/src/mbuf_api.c"
-#include "../src/nanomsg/src/rtable_static.c"
+#include "symtab.c"
+#include "rmr.c"
+#include "mbuf_api.c"
+#include "rtable_static.c"
static void gen_rt( uta_ctx_t* ctx ); // defined in sr_static_test, but used by a few others (eliminate order requirement below)
// specific test tools in this directory
#include "test_support.c" // things like fail_if()
// and finally....
-//#include "tools_static_test.c" // local test functions pulled directly because of static nature of things
-//#include "symtab_static_test.c"
-//#include "ring_static_test.c"
-//#include "rt_static_test.c"
-//#include "wormhole_static_test.c"
#include "mbuf_api_static_test.c"
#include "sr_nano_static_test.c"
#include <errno.h>
#include <string.h>
#include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
/*
Send a 'burst' of messages to drive some send retry failures to increase RMr coverage
}
}
+/*
+ Refresh or allocate a message with some default values
+*/
+static rmr_mbuf_t* fresh_msg( void* ctx, rmr_mbuf_t* msg ) {
+ if( ! msg ) {
+ msg = rmr_alloc_msg( ctx, 2048 );
+ }
+
+ msg->mtype = 0;
+ msg->sub_id = -1;
+ msg->state = 0;
+ msg->len = 100;
+
+ return msg;
+}
+
static int rmr_api_test( ) {
int errors = 0;
void* rmc; // route manager context
errors += fail_if_nil( rmc, "rmr_init returned a nil pointer when driving for default port " );
}
+
v = rmr_ready( rmc ); // unknown return; not checking at the moment
msg = rmr_alloc_msg( NULL, 1024 ); // should return nil pointer
errors += fail_if( i >= 40, "torcv_msg never returned a timeout " );
- // ---- trace things that are not a part of the mbuf_api functions and thus must be tested here
+ // ---- trace things that are not a part of the mbuf_api functions and thus must be tested here -------
state = rmr_init_trace( NULL, 37 ); // coverage test nil context
errors += fail_not_equal( state, 0, "attempt to initialise trace with nil context returned non-zero state (a) " );
errors += fail_if_equal( errno, 0, "attempt to initialise trace with nil context did not set errno as expected " );
em_send_failures = 0;
+ ((uta_ctx_t *)rmc)->shutdown = 1;
rmr_close( NULL ); // drive for coverage
rmr_close( rmc ); // no return to check; drive for coverage
+ // -- allocate a new context for mt-call and drive that stuff -----------------------------------------
+#ifdef EMULATE_NNG
+ msg = fresh_msg( rmc, msg ); // ensure we have one with known contents
+
+ msg->state = 0;
+ msg = rmr_mt_call( rmc, msg, 3, 10 ); // drive when not in mt setup
+ if( msg ) {
+ errors += fail_not_equal( msg->state, RMR_ERR_NOTSUPP, "rmr_mt_call did not set not supported error when not mt initialised" );
+ } else {
+ errors += fail_if_nil( msg, "rmr_mt_call returned nil pointer when not mt initialised" );
+ }
+ msg = fresh_msg( rmc, msg );
+
+ msg = rmr_mt_rcv( rmc, msg, 10 ); // gen not supported error if ctx not set for mt
+ if( msg ) {
+ errors += fail_not_equal( msg->state, RMR_ERR_NOTSUPP, "rmr_mt_rcv did not return not supported state when mt not initialised" );
+ } else {
+ errors += fail_if_nil( msg, "nil pointer from rmr_mt_rcv when mt not initialised\n" );
+ }
+ msg = fresh_msg( rmc, msg );
+
+ msg->state = 0;
+ if( msg ) {
+ msg = rmr_mt_call( rmc, msg, 1000, 10 ); // thread id out of range
+ errors += fail_if_equal( msg->state, 0, "rmr_mt_call did not set an error when given an invalid call-id" );
+ } else {
+ errors += fail_if_nil( msg, "rmr_mt_call returned a nil pointer when given an invalid call-id" );
+ }
+ msg = fresh_msg( rmc, msg );
+
+ state = init_mtcall( NULL ); // drive for coverage
+ errors += fail_not_equal( state, 0, "init_mtcall did not return false (a) when given a nil context pointer" );
+
+
+ if( (rmc = rmr_init( NULL, 1024, FL_NOTHREAD | RMRFL_MTCALL )) == NULL ) { // drive multi-call setup code without rtc thread
+ errors += fail_if_nil( rmc, "rmr_init returned a nil pointer when driving for mt-call setup " );
+ }
+
+ gen_rt( rmc ); // must attach a route table so sends succeed
+
+ fprintf( stderr, "<INFO> enabling mt messages\n" );
+ em_set_rcvdelay( 1 ); // force slow msg rate during mt testing
+ em_set_mtc_msgs( 1 ); // emulated nngrcv will now generate messages with call-id and call flag
+
+ msg->state = 0;
+ msg = rmr_mt_call( NULL, msg, 3, 10 ); // should timeout
+ if( msg ) {
+ errors += fail_if( msg->state == 0, "rmr_mt_call did not set message state when given message with nil context " );
+ }
+ msg = fresh_msg( rmc, msg );
+
+ fprintf( stderr, "<INFO> invoking mt_call with timout == 2999\n" );
+ msg = rmr_mt_call( rmc, msg, 2, 2999 ); // long timeout to drive time building code, should receive
+ if( msg ) {
+ if( msg->state != RMR_OK ) {
+ fprintf( stderr, "<INFO> rmr_mt_call returned error in mbuf: %d\n", msg->state );
+ } else {
+ errors += fail_not_nil( msg, "rmr_mt_call did not return a nil pointer on read timeout" );
+ }
+ }
+ msg = fresh_msg( rmc, msg );
+
+ msg = rmr_mt_rcv( NULL, NULL, 10 );
+ errors += fail_not_nil( msg, "rmr_mt_rcv returned a non-nil message when given nil message and nil context" );
+
+ fprintf( stderr, "<INFO> invoking mt_rcv with timout == 2999\n" );
+ msg = fresh_msg( rmc, msg );
+ msg = rmr_mt_rcv( rmc, msg, 2999 );
+ if( !msg ) {
+ errors += fail_if_nil( msg, "rmr_mt_rcv returned a nil message when given valid message and timeout of 29999" );
+ }
+ msg = fresh_msg( rmc, msg );
+
+ msg = rmr_mt_rcv( rmc, msg, -1 );
+ if( !msg ) {
+ errors += fail_if_nil( msg, "rmr_mt_rcv returned a nil message when given valid message unlimited timeout" );
+ }
+ msg = fresh_msg( rmc, msg );
+
+ fprintf( stderr, "<INFO> waiting 20.0 seconds for a known call xaction to arrive (%ld)\n", time( NULL ) );
+ snprintf( msg->xaction, 17, "%015d", 5 ); // we'll reset receive counter before calling mt_call so this will arrive again
+ em_set_rcvcount( 0 );
+ msg = rmr_mt_call( rmc, msg, 2, 15000 ); // we need about 10s to get the message with the 'slow rate'
+ if( msg ) {
+ errors += fail_not_equal( msg->state, RMR_OK, "mt_call with known xaction id bad state (a)" );
+ } else {
+ errors += fail_if_nil( msg, "mt_call with known xaction id returned nil message" );
+ }
+ fprintf( stderr, "<INFO> time check: %ld\n", time( NULL ) );
+
+
+ em_set_mtc_msgs( 0 ); // turn off
+ em_set_rcvdelay( 0 ); // full speed receive rate
+ ((uta_ctx_t *)rmc)->shutdown = 1; // force the mt-reciver attached to the context to stop
+#endif
+
+
+ // --------------- phew, done ------------------------------------------------------------------------------
+
if( ! errors ) {
fprintf( stderr, "<INFO> all RMr API tests pass\n" );
}
#include <stdint.h>
#include <ctype.h>
#include <sys/epoll.h>
+#include <pthread.h>
+#include <semaphore.h>
#define DEBUG 1
#include "test_nng_em.c" // nng/nn emulation (before including things under test)
-#include "../src/common/include/rmr.h" // things the users see
-#include "../src/common/include/rmr_symtab.h"
-#include "../src/common/include/rmr_agnostic.h" // transport agnostic header
-#include "../src/nng/include/rmr_nng_private.h" // transport specific
+#include "rmr.h" // things the users see
+#include "rmr_symtab.h"
+#include "rmr_agnostic.h" // transport agnostic header
+#include "rmr_nng_private.h" // transport specific
-#include "../src/common/src/symtab.c"
-#include "../src/nng/src/rmr_nng.c"
-#include "../src/common/src/mbuf_api.c"
+#include "symtab.c"
+#include "rmr_nng.c"
+#include "mbuf_api.c"
static void gen_rt( uta_ctx_t* ctx ); // defined in sr_nng_static_test, but used by a few others (eliminate order requirement below)
#include <errno.h>
#include <string.h>
#include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
typedef struct entry_info {
int group;
#include <errno.h>
#include <string.h>
#include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
typedef struct entry_info {
int group;
uta_fib( "no-suhch-file" ); // drive some error checking for coverage
-/*
- if( ctx ) {
- if( ctx->rtg_addr ) {
- free( ctx->rtg_addr );
- }
- free( ctx );
- }
-*/
- state = uta_link2( "worm", NULL, NULL );
+ ep = (endpoint_t *) malloc( sizeof( *ep ) );
+ pthread_mutex_init( &ep->gate, NULL );
+ ep->name = strdup( "worm" );
+ ep->addr = NULL;
+ state = uta_link2( ep );
errors += fail_if_true( state, "link2 did not return false when given nil pointers" );
state = uta_epsock_rr( rt, 122, 0, NULL, NULL );
#include <errno.h>
#include <string.h>
#include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
/*
Generate a simple route table (for all but direct route table testing).
+ This table contains multiple tables inasmuch as a second update set of
+ records follows the initial set.
*/
static void gen_rt( uta_ctx_t* ctx ) {
int fd;
#include <errno.h>
#include <string.h>
#include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
/*
Generate a simple route table (for all but direct route table testing).
mbuf = rmr_rcv_msg( ctx, NULL );
}
- size = 2048 - sizeof( uta_mhdr_t ); // emulated nng receive allocates 2K payloads
+ size = 2048 - em_hdr_size(); // emulated nng receive allocates 2K buffers -- subtract off header size
state = rmr_payload_size( mbuf );
errors += fail_not_equal( state, size, "payload size didn't return expected value" ); // receive should always give 4k buffer
Author: E. Scott Daniels
*/
-#include "../src/common/include/rmr_symtab.h"
+#include "rmr_symtab.h"
// -- parent must include if needed #include "../src/common/src/symtab.c"
#include "test_support.c"
#define NO_DUMMY_RMR 1 // no dummy rmr functions; we don't pull in rmr.h or agnostic.h
-#include "../src/common/include/rmr_symtab.h"
-#include "../src/common/src/symtab.c"
+#include "rmr_symtab.h"
+#include "symtab.c"
#include "test_support.c"
Author: E. Scott Daniels
*/
+
+#include "rmr.h" // we use some of rmr defs in building dummy messages, so we need these
+#include "rmr_agnostic.h"
+
// ---------------------- emulated nng functions ---------------------------
#ifndef _em_nn
#define _em_nn
+#include <pthread.h>
+
static int em_send_failures = 0; // test programme can set this to emulate eagain send failures
static int em_timeout = -1; // set by set socket option
+static int em_mtc_msgs = 0; // set to generate 'received' messages with mt-call header data
+static int return_value = 0; // functions should return this value
+static int rcv_count = 0; // receive counter for transaction id to allow test to rest
+static int rcv_delay = 0; // forced delay before call to rcvmsg starts to work
+
+static int gates_ok = 0;
+static pthread_mutex_t rcv_gate;
+
// ----------- epoll emulation ---------------------------------------------
int32_t len1; // length of the tracing data
int32_t len2; // length of data 1 (d1)
int32_t len3; // length of data 2 (d2)
-
+ int32_t sub_id; // subscription id (-1 invalid)
};
-static int return_value = 0;
-//--------------------------------------------------------------------------
-#ifdef EMULATE_NNG
-struct nn_msghdr {
- int boo;
-};
+// -- emulation control functions ------------------------------------------------------
/*
Test app can call this to have all emulated functions return failure instead
}
+/*
+ Turns on/off the generation of multi-threaded call messages
+*/
+static int em_set_mtc_msgs( int state ) {
+ em_mtc_msgs = state;
+}
+
+/*
+ Returns the size of the header we inserted
+*/
+static int em_hdr_size() {
+ if( em_mtc_msgs ) {
+ return (int) sizeof( struct em_msg ) + 4;
+ }
+
+ return (int) sizeof( struct em_msg );
+}
+
+static void em_set_rcvcount( int v ) {
+ rcv_count = v;
+}
+
+static void em_set_rcvdelay( int v ) {
+ rcv_delay = v;
+}
+
+static void em_start() {
+ if( ! gates_ok ) {
+ pthread_mutex_init( &rcv_gate, NULL );
+ gates_ok = 1;
+ }
+}
+
+//--------------------------------------------------------------------------
+#ifdef EMULATE_NNG
+struct nn_msghdr {
+ int boo;
+};
+
+
/*
Receive message must allocate a new buffer and return the pointer into *m.
Every 9 messages or so we'll simulate an old version message
+
+ If em_mtc_msgs is set, then we add a non-zero d1 field with
+ the call-id set to 2, and alternate the call flag
*/
static int em_nng_recvmsg( nng_socket s, nng_msg ** m, int i ) {
+ static int call_flag = 0;
+
void* b;
struct em_msg* msg;
- static int count = 0; // we'll simulate a message going in by dropping an rmr-ish msg with transaction id only
int trace_size = 0;
+ int d1_size = 0;
+ unsigned char* d1;
+
+ if( rcv_delay > 0 ) {
+ sleep( rcv_delay );
+ }
- //sleep( 1 );
+ if( em_mtc_msgs ) {
+ d1_size = 4;
+ }
b = (void *) malloc( 2048 );
if( m != NULL ) {
memset( b, 0, 2048 );
+
*m = (nng_msg *) b;
msg = (struct em_msg *) b;
- if( count % 10 == 9 ) {
- //msg->rmr_ver = htonl( MSG_VER );
- msg->rmr_ver = ALT_MSG_VER; // emulate the bug in RMr v1
+ if( ! em_mtc_msgs && (rcv_count % 10) == 9 ) {
+ msg->rmr_ver = ALT_MSG_VER; // allow emulation the bug in RMr v1
} else {
msg->rmr_ver = htonl( MSG_VER );
}
+
msg->mtype = htonl( 1 );
msg->plen = htonl( 220 );
msg->len0 = htonl( sizeof( struct em_msg ) );
msg->len1 = htonl( trace_size );
- snprintf( msg->xid, 32, "%015d", count++ ); // simple transaction id so we can test receive specific and ring stuff
+ msg->len2 = htonl( d1_size );
+ msg->len3 = htonl( 0 );
+
+ pthread_mutex_lock( &rcv_gate ); // hold lock to update counter/flag
+ if( em_mtc_msgs ) {
+ d1 = DATA1_ADDR( msg );
+ d1[0] = 2; // simulated msgs always on chute 2
+ if( call_flag ) {
+ rcv_count++;
+ msg->flags |= HFL_CALL_MSG;
+ }
+ if( rcv_delay > 0 ) {
+ fprintf( stderr, "<EM> count=%d flag=%d %02x \n", rcv_count, call_flag, msg->flags );
+ }
+ call_flag = !call_flag;
+ } else {
+ rcv_count++;
+ }
+ pthread_mutex_unlock( &rcv_gate );
+ snprintf( msg->xid, 32, "%015d", rcv_count ); // simple transaction id so we can test receive specific and ring stuff
snprintf( msg->src, 16, "localhost:4562" ); // set src id (unrealistic) so that rts() can be tested
}
#include <errno.h>
#include <pthread.h>
#include <ctype.h>
+#include <pthread.h>
+#include <semaphore.h>
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
#include "test_support.c" // our private library of test tools
// ===== dummy context for tools testing so we don't have to pull in all of the nano/nng specific stuff =====
};
-#include "../src/common/src/tools_static.c"
+#include "tools_static.c"
int main( ) {
fi
fi
-export C_INCLUDE_PATH="../src/common/include:$C_INCLUDE_PATH"
+export C_INCLUDE_PATH="../src/rmr/common/include:$C_INCLUDE_PATH"
module_cov_target=80
builder="make -B %s" # default to plain ole make
#include <errno.h>
#include <string.h>
#include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
-#include "../src/common/include/rmr.h"
-#include "../src/common/include/rmr_agnostic.h"
+#include "rmr.h"
+#include "rmr_agnostic.h"
/*