X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fnng%2Fsrc%2Frmr_nng.c;fp=src%2Frmr%2Fnng%2Fsrc%2Frmr_nng.c;h=e77c9f3b464906fa0c26034d66b19b68438d2700;hb=68c1ab2191d9959fde0bd275a560f7c9cf6df485;hp=0000000000000000000000000000000000000000;hpb=f7d44570f8de6e15f768e8e2d9b6061cd0bff11f;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/nng/src/rmr_nng.c b/src/rmr/nng/src/rmr_nng.c new file mode 100644 index 0000000..e77c9f3 --- /dev/null +++ b/src/rmr/nng/src/rmr_nng.c @@ -0,0 +1,821 @@ +// : vi ts=4 sw=4 noet : +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* + Mnemonic: rmr_nng.c + Abstract: This is the compile point for the nng version of the rmr + library (formarly known as uta, so internal function names + are likely still uta_*) + + With the exception of the symtab portion of the library, + RMr is built with a single compile so as to "hide" the + internal functions as statics. Because they interdepend + on each other, and CMake has issues with generating two + different wormhole objects from a single source, we just + pull it all together with a centralised comple using + includes. + + Future: the API functions at this point can be separated + into a common source module. + + Author: E. Scott Daniels + Date: 1 February 2019 +*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + + +#include "rmr.h" // things the users see +#include "rmr_agnostic.h" // agnostic things (must be included before private) +#include "rmr_nng_private.h" // things that we need too +#include "rmr_symtab.h" + +#include "ring_static.c" // message ring support +#include "rt_generic_static.c" // route table things not transport specific +#include "rtable_nng_static.c" // route table things -- transport specific +#include "rtc_static.c" // route table collector +#include "tools_static.c" +#include "sr_nng_static.c" // send/receive static functions +#include "wormholes.c" // wormhole api externals and related static functions (must be LAST!) + + +//------------------------------------------------------------------------------ + + +/* + Clean up a context. +*/ +static void free_ctx( uta_ctx_t* ctx ) { + if( ctx ) { + if( ctx->rtg_addr ) { + free( ctx->rtg_addr ); + } + } +} + +// --------------- public functions -------------------------------------------------------------------------- + +/* + Returns the size of the payload (bytes) that the msg buffer references. + Len in a message is the number of bytes which were received, or should + be transmitted, however, it is possible that the mbuf was allocated + with a larger payload space than the payload length indicates; this + function returns the absolute maximum space that the user has available + in the payload. On error (bad msg buffer) -1 is returned and errno should + indicate the rason. +*/ +extern int rmr_payload_size( rmr_mbuf_t* msg ) { + if( msg == NULL || msg->header == NULL ) { + errno = EINVAL; + return -1; + } + + errno = 0; + return msg->alloc_len - RMR_HDR_LEN( msg->header ); // allocated transport size less the header and other data bits +} + +/* + Allocates a send message as a zerocopy message allowing the underlying message protocol + to send the buffer without copy. +*/ +extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) { + uta_ctx_t* ctx; + rmr_mbuf_t* m; + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + return NULL; + } + + m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); // alloc with default trace data + return m; +} + + +/* + Allocates a send message as a zerocopy message allowing the underlying message protocol + to send the buffer without copy. In addition, a trace data field of tr_size will be + added and the supplied data coppied to the buffer before returning the message to + the caller. +*/ +extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) { + uta_ctx_t* ctx; + rmr_mbuf_t* m; + int state; + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + return NULL; + } + + m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size + if( m != NULL ) { + state = rmr_set_trace( m, data, tr_size ); // roll their data in + if( state != tr_size ) { + m->state = RMR_ERR_INITFAILED; + } + } + + return m; +} + +/* + This provides an external path to the realloc static function as it's called by an + outward facing mbuf api function. Used to reallocate a message with a different + trace data size. +*/ +extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) { + return realloc_msg( msg, new_tr_size ); +} + + +/* + Return the message to the available pool, or free it outright. +*/ +extern void rmr_free_msg( rmr_mbuf_t* mbuf ) { + if( mbuf == NULL ) { + return; + } + + if( mbuf->header ) { + if( mbuf->flags & MFL_ZEROCOPY ) { + //nng_free( (void *) mbuf->header, mbuf->alloc_len ); + if( mbuf->tp_buf ) { + nng_msg_free( mbuf->tp_buf ); + } + } + } + + free( mbuf ); +} + +/* + send message with maximum timeout. + Accept a message and send it to an endpoint based on message type. + If NNG reports that the send attempt timed out, or should be retried, + RMr will retry for approximately max_to microseconds; rounded to the next + higher value of 10. + + Allocates a new message buffer for the next send. If a message type has + more than one group of endpoints defined, then the message will be sent + in round robin fashion to one endpoint in each group. + + An endpoint will be looked up in the route table using the message type and + the subscription id. If the subscription id is "UNSET_SUBID", then only the + message type is used. If the initial lookup, with a subid, fails, then a + second lookup using just the mtype is tried. + + CAUTION: this is a non-blocking send. If the message cannot be sent, then + it will return with an error and errno set to eagain. If the send is + a limited fanout, then the returned status is the status of the last + send attempt. + +*/ +extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { + nng_socket nn_sock; // endpoint socket for send + uta_ctx_t* ctx; + int group; // selected group to get socket for + int send_again; // true if the message must be sent again + rmr_mbuf_t* clone_m; // cloned message for an nth send + int sock_ok; // got a valid socket from round robin select + uint64_t key; // mtype or sub-id/mtype sym table key + int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails + + if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast + errno = EINVAL; // if msg is null, this is their clue + if( msg != NULL ) { + msg->state = RMR_ERR_BADARG; + errno = EINVAL; // must ensure it's not eagain + } + return msg; + } + + errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't + if( msg->header == NULL ) { + fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" ); + msg->state = RMR_ERR_NOHDR; + errno = EBADMSG; // must ensure it's not eagain + return msg; + } + + if( max_to < 0 ) { + max_to = ctx->send_retries; // convert to retries + } + + send_again = 1; // force loop entry + group = 0; // always start with group 0 + + key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry + if( msg->sub_id != UNSET_SUBID ) { + altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry + } + while( send_again ) { + sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups + if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n", + msg->mtype, send_again, group, msg->len, sock_ok, altk_ok ); + + if( ! sock_ok ) { + if( altk_ok ) { // we can try with the alternate (no sub-id) key + altk_ok = 0; + key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again + send_again = 1; // ensure we don't exit the while + continue; + } + + msg->state = RMR_ERR_NOENDPT; + errno = ENXIO; // must ensure it's not eagain + return msg; // caller can resend (maybe) or free + } + + group++; + + if( send_again ) { + clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available + if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len ); + msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer + msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success + /* + if( msg ) { + // error do we need to count successes/errors, how to report some success, esp if last fails? + } + */ + + msg = clone_m; // clone will be the next to send + } else { + msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was + } + } + + return msg; // last message caries the status of last/only send attempt +} + +/* + Send with default max timeout as is set in the context. + See rmr_mtosend_msg() for more details on the parameters. + See rmr_stimeout() for info on setting the default timeout. +*/ +extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { + return rmr_mtosend_msg( vctx, msg, -1 ); // retries < uses default from ctx +} + +/* + Return to sender allows a message to be sent back to the endpoint where it originated. + The source information in the message is used to select the socket on which to write + the message rather than using the message type and round-robin selection. This + should return a message buffer with the state of the send operation set. On success + (state is RMR_OK, the caller may use the buffer for another receive operation), and on + error it can be passed back to this function to retry the send if desired. On error, + errno will liklely have the failure reason set by the nng send processing. + The following are possible values for the state in the message buffer: + + Message states returned: + RMR_ERR_BADARG - argument (context or msg) was nil or invalid + RMR_ERR_NOHDR - message did not have a header + RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined + RMR_ERR_SENDFAILED - send failed; errno has nano error code + RMR_ERR_RETRY - the reqest failed but should be retried (EAGAIN) + + A nil message as the return value is rare, and generally indicates some kind of horrible + failure. The value of errno might give a clue as to what is wrong. + + CAUTION: + Like send_msg(), this is non-blocking and will return the msg if there is an errror. + The caller must check for this and handle. +*/ +extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { + nng_socket nn_sock; // endpoint socket for send + uta_ctx_t* ctx; + int state; + uta_mhdr_t* hdr; + char* hold_src; // we need the original source if send fails + int sock_ok; // true if we found a valid endpoint socket + + if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast + errno = EINVAL; // if msg is null, this is their clue + if( msg != NULL ) { + msg->state = RMR_ERR_BADARG; + } + return msg; + } + + errno = 0; // at this point any bad state is in msg returned + if( msg->header == NULL ) { + fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" ); + msg->state = RMR_ERR_NOHDR; + return msg; + } + + sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // socket of specific endpoint + if( ! sock_ok ) { + msg->state = RMR_ERR_NOENDPT; + return msg; // preallocated msg can be reused since not given back to nn + } + + msg->state = RMR_OK; // ensure it is clear before send + hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to + strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours + msg = send_msg( ctx, msg, nn_sock, -1 ); + if( msg ) { + strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID ); // always return original source so rts can be called again + msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source + } + + free( hold_src ); + return msg; +} + +/* + Call sends the message based on message routing using the message type, and waits for a + response message to arrive with the same transaction id that was in the outgoing message. + If, while wiating for the expected response, messages are received which do not have the + desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the + order that they were received. + + Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK + to ensure that no error was encountered. If the state is UTA_BADARG, then the message + may be resent (likely the context pointer was nil). If the message is sent, but no + response is received, a nil message is returned with errno set to indicate the likley + issue: + ETIMEDOUT -- too many messages were queued before reciving the expected response + ENOBUFS -- the queued message ring is full, messages were dropped + EINVAL -- A parameter was not valid + EAGAIN -- the underlying message system wsa interrupted or the device was busy; + user should call this function with the message again. + + + QUESTION: should user specify the number of messages to allow to queue? +*/ +extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) { + uta_ctx_t* ctx; + unsigned char expected_id[RMR_MAX_XID+1]; // the transaction id in the message; we wait for response with same ID + + if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast + if( msg != NULL ) { + msg->state = RMR_ERR_BADARG; + } + return msg; + } + + memcpy( expected_id, msg->xaction, RMR_MAX_XID ); + expected_id[RMR_MAX_XID] = 0; // ensure it's a string + if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id ); + errno = 0; + msg->flags |= MFL_NOALLOC; // we don't need a new buffer from send + + msg = rmr_send_msg( ctx, msg ); + if( msg ) { // msg should be nil, if not there was a problem; return buffer to user + if( msg->state != RMR_ERR_RETRY ) { + msg->state = RMR_ERR_CALLFAILED; // errno not available to all wrappers; don't stomp if marked retry + } + return msg; + } + + return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 ); // wait for msg allowing 20 to queue ahead +} + +/* + The outward facing receive function. When invoked it will pop the oldest message + from the receive ring, if any are queued, and return it. If the ring is empty + then the receive function is invoked to wait for the next message to arrive (blocking). + + If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If + nil, a new one will be allocated. However, the caller should NOT expect to get the same + struct back (if a queued message is returned the message struct will be different). +*/ +extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) { + uta_ctx_t* ctx; + rmr_mbuf_t* qm; // message that was queued on the ring + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + if( old_msg != NULL ) { + old_msg->state = RMR_ERR_BADARG; + } + errno = EINVAL; + return old_msg; + } + errno = 0; + + qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued + if( qm != NULL ) { + if( old_msg ) { + rmr_free_msg( old_msg ); // future: push onto a free list??? + } + + return qm; + } + + return rcv_msg( ctx, old_msg ); // nothing queued, wait for one +} + +/* + This implements a receive with a timeout via epoll. Mostly this is for + wrappers as native C applications can use epoll directly and will not have + to depend on this. +*/ +extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { + struct epoll_stuff* eps; // convience pointer + uta_ctx_t* ctx; + rmr_mbuf_t* qm; // message that was queued on the ring + int nready; + rmr_mbuf_t* msg; + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + if( old_msg != NULL ) { + old_msg->state = RMR_ERR_BADARG; + } + errno = EINVAL; + return old_msg; + } + + qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued + if( qm != NULL ) { + if( old_msg ) { + rmr_free_msg( old_msg ); // future: push onto a free list??? + } + + return qm; + } + + if( (eps = ctx->eps) == NULL ) { // set up epoll on first call + eps = malloc( sizeof *eps ); + + if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) { + fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno ); + free( eps ); + return NULL; + } + + eps->nng_fd = rmr_get_rcvfd( ctx ); + eps->epe.events = EPOLLIN; + eps->epe.data.fd = eps->nng_fd; + + if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) { + fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) ); + free( eps ); + return NULL; + } + + ctx->eps = eps; + } + + if( old_msg ) { + msg = old_msg; + } else { + msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check + } + + if( ms_to < 0 ) { + ms_to = 0; + } + + nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to ); // block until something or timedout + if( nready <= 0 ) { // we only wait on ours, so we assume ready means it's ours + msg->state = RMR_ERR_TIMEOUT; + } else { + return rcv_msg( ctx, msg ); // receive it and return it + } + + return msg; // return empty message with state set +} + +/* + This blocks until the message with the 'expect' ID is received. Messages which are received + before the expected message are queued onto the message ring. The function will return + a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the + expected message is received. If the queued message ring fills a nil pointer is returned + and errno is set to ENOBUFS. + + Generally this will be invoked only by the call() function as it waits for a response, but + it is exposed to the user application as three is no reason not to. +*/ +extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) { + uta_ctx_t* ctx; + int queued = 0; // number we pushed into the ring + int exp_len = 0; // length of expected ID + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + if( msg != NULL ) { + msg->state = RMR_ERR_BADARG; + } + errno = EINVAL; + return msg; + } + + errno = 0; + + if( expect == NULL || ! *expect ) { // nothing expected if nil or empty string, just receive + return rmr_rcv_msg( ctx, msg ); + } + + exp_len = strlen( expect ); + if( exp_len > RMR_MAX_XID ) { + exp_len = RMR_MAX_XID; + } + if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect ); + + while( queued < allow2queue ) { + msg = rcv_msg( ctx, msg ); // hard wait for next + if( msg->state == RMR_OK ) { + if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it + if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued ); + return msg; + } + + if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full + if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" ); + errno = ENOBUFS; + return NULL; + } + + if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype ); + queued++; + msg = NULL; + } + } + + if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect ); + errno = ETIMEDOUT; + return NULL; +} + +// CAUTION: these are not supported as they must be set differently (between create and open) in NNG. +// until those details are worked out, these generate a warning. +/* + Set send timeout. The value time is assumed to be microseconds. The timeout is the + rough maximum amount of time that RMr will block on a send attempt when the underlying + mechnism indicates eagain or etimeedout. All other error conditions are reported + without this delay. Setting a timeout of 0 causes no retries to be attempted in + RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning, + but without issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us) + after every 10K send attempts until the time value is reached. Retries are abandoned + if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT. + + The default, if this function is not used, is 1; meaning that RMr will retry, but will + not enter a sleep. In all cases the caller should check the status in the message returned + after a send call. + + Returns -1 if the context was invalid; RMR_OK otherwise. +*/ +extern int rmr_set_stimeout( void* vctx, int time ) { + uta_ctx_t* ctx; + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + return -1; + } + + if( time < 0 ) { + time = 0; + } + + ctx->send_retries = time; + return RMR_OK; +} + +/* + Set receive timeout -- not supported in nng implementation +*/ +extern int rmr_set_rtimeout( void* vctx, int time ) { + fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" ); + return 0; +} + + +/* + This is the actual init workhorse. The user visible function meerly ensures that the + calling programme does NOT set any internal flags that are supported, and then + invokes this. Internal functions (the route table collector) which need additional + open ports without starting additional route table collectors, will invoke this + directly with the proper flag. +*/ +static void* init( char* uproto_port, int max_msg_size, int flags ) { + static int announced = 0; + uta_ctx_t* ctx = NULL; + char bind_info[NNG_MAXADDRLEN]; // bind info + char* proto = "tcp"; // pointer into the proto/port string user supplied + char* port; + char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined) + char* proto_port; + char wbuf[1024]; // work buffer + char* tok; // pointer at token in a buffer + int state; + + if( ! announced ) { + fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n", + RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); + announced = 1; + } + + errno = 0; + if( uproto_port == NULL ) { + proto_port = strdup( DEF_COMM_PORT ); + } else { + proto_port = strdup( uproto_port ); // so we can modify it + } + + if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) { + errno = ENOMEM; + return NULL; + } + memset( ctx, 0, sizeof( uta_ctx_t ) ); + + ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning + ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response + + ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh + if( max_msg_size > 0 ) { + ctx->max_plen = max_msg_size; + } + + // we're using a listener to get rtg updates, so we do NOT need this. + //uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors + + if( nng_pull0_open( &ctx->nn_sock ) != 0 ) { // and assign the mode + fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno ); + free_ctx( ctx ); + return NULL; + } + + if( (port = strchr( proto_port, ':' )) != NULL ) { + if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly + port++; + } else { + *(port++) = 0; // term proto string and point at port string + proto = proto_port; // user supplied proto so point at it rather than default + } + } else { + port = proto_port; // assume something like "1234" was passed + } + + if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) { + fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); + return NULL; + } + if( (tok = strchr( wbuf, '.' )) != NULL ) { + *tok = 0; // we don't keep domain portion + } + ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID ); + if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) { // our registered name is host:port + fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port ); + return NULL; + } + + ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons + + + + if( (interface = getenv( ENV_BIND_IF )) == NULL ) { + interface = "0.0.0.0"; + } + // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started + // rather than using this generic listen() call. + snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port ); + if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) { + fprintf( stderr, "[CRIT] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) ); + nng_close( ctx->nn_sock ); + free_ctx( ctx ); + return NULL; + } + + if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need an rtc + if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rt collector thread + fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) ); + } + } + + free( proto_port ); + return (void *) ctx; +} + +/* + Initialise the message routing environment. Flags are one of the UTAFL_ + constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol + (tcp) to be used, then :port is all that is needed. + + At the moment it seems that TCP really is the only viable protocol, but + we'll allow flexibility. + + The return value is a void pointer which must be passed to most uta functions. On + error, a nil pointer is returned and errno should be set. + + Flags: + No user flags supported (needed) at the moment, but this provides for extension + without drastically changing anything. The user should invoke with RMRFL_NONE to + avoid any misbehavour as there are internal flags which are suported +*/ +extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) { + return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off +} + +/* + This sets the default trace length which will be added to any message buffers + allocated. It can be set at any time, and if rmr_set_trace() is given a + trace len that is different than the default allcoated in a message, the message + will be resized. + + Returns 0 on failure and 1 on success. If failure, then errno will be set. +*/ +extern int rmr_init_trace( void* vctx, int tr_len ) { + uta_ctx_t* ctx; + + errno = 0; + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + return 0; + } + + ctx->trace_data_len = tr_len; + return 1; +} + +/* + Return true if routing table is initialised etc. and app can send/receive. +*/ +extern int rmr_ready( void* vctx ) { + uta_ctx_t *ctx; + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + return FALSE; + } + + if( ctx->rtable != NULL ) { + return TRUE; + } + + return FALSE; +} + +/* + Returns a file descriptor which can be used with epoll() to signal a receive + pending. The file descriptor should NOT be read from directly, nor closed, as NNG + does not support this. +*/ +extern int rmr_get_rcvfd( void* vctx ) { + uta_ctx_t* ctx; + int fd; + int state; + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + return -1; + } + + if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) { + fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) ); + return -1; + } + + return fd; +} + + +/* + Clean up things. + + There isn't an nng_flush() per se, but we can pause, generate + a context switch, which should allow the last sent buffer to + flow. There isn't exactly an nng_term/close either, so there + isn't much we can do. +*/ +extern void rmr_close( void* vctx ) { + uta_ctx_t *ctx; + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + return; + } + + ctx->shutdown = 1; + nng_close( ctx->nn_sock ); +} + + +