X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fnanomsg%2Fsrc%2Frmr.c;h=b57df3ae5b93b31b9b44dcb0687b66ba47ec84b8;hb=a012cf63dfdad3656c995cb06c316fd208c63b98;hp=29001176b0c650a8d8b54c3628ffbf2ac2aa7e0e;hpb=68c5cf1104e89f5c43786a3e48f5c6a1e757f59f;p=ric-plt%2Flib%2Frmr.git diff --git a/src/nanomsg/src/rmr.c b/src/nanomsg/src/rmr.c index 2900117..b57df3a 100644 --- a/src/nanomsg/src/rmr.c +++ b/src/nanomsg/src/rmr.c @@ -1,14 +1,14 @@ // :vi sw=4 ts=4 noet: /* ================================================================================== - Copyright (c) 2019 Nokia + Copyright (c) 2019 Nokia Copyright (c) 2018-2019 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -24,9 +24,9 @@ the older nanomsg messaging transport mehhanism. To "hide" internal functions the choice was made to implement them - all as static functions. This means that we include nearly + all as static functions. This means that we include nearly all of our modules here as 90% of the library is not visible to - the outside world. + the outside world. Author: E. Scott Daniels Date: 28 November 2018 @@ -80,8 +80,8 @@ static void free_ctx( uta_ctx_t* ctx ) { // --------------- public functions -------------------------------------------------------------------------- /* - Set the receive timeout to time. If time >1000 we assume the time is milliseconds, - else we assume seconds. Setting -1 is always block. + Set the receive timeout to time (ms). A value of 0 is the same as a non-blocking + receive and -1 is block for ever. Returns the nn value (0 on success <0 on error). */ extern int rmr_set_rtimeout( void* vctx, int time ) { @@ -92,11 +92,11 @@ extern int rmr_set_rtimeout( void* vctx, int time ) { return -1; } - if( time > 0 ) { - if( time < 1000 ) { - time = time * 1000; // assume seconds, nn wants ms - } - } + if( ctx->last_rto == time ) { + return 0; + } + + ctx->last_rto = time; return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) ); } @@ -108,7 +108,6 @@ extern int rmr_rcv_to( void* vctx, int time ) { return rmr_rcv_to( vctx, time ); } - /* Set the send timeout to time. If time >1000 we assume the time is milliseconds, else we assume seconds. Setting -1 is always block. @@ -123,10 +122,10 @@ extern int rmr_set_stimeout( void* vctx, int time ) { } if( time > 0 ) { - if( time < 1000 ) { + if( time < 1000 ) { time = time * 1000; // assume seconds, nn wants ms } - } + } return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) ); } @@ -142,7 +141,7 @@ extern int rmr_send_to( void* vctx, int time ) { Returns the size of the payload (bytes) that the msg buffer references. Len in a message is the number of bytes which were received, or should be transmitted, however, it is possible that the mbuf was allocated - with a larger payload space than the payload length indicates; this + with a larger payload space than the payload length indicates; this function returns the absolute maximum space that the user has available in the payload. On error (bad msg buffer) -1 is returned and errno should indicate the rason. @@ -169,10 +168,44 @@ extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) { return NULL; } - m = alloc_zcmsg( ctx, NULL, size, 0 ); + m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); return m; } +/* + Allocates a send message as a zerocopy message allowing the underlying message protocol + to send the buffer without copy. In addition, a trace data field of tr_size will be + added and the supplied data coppied to the buffer before returning the message to + the caller. +*/ +extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) { + uta_ctx_t* ctx; + rmr_mbuf_t* m; + int state; + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + return NULL; + } + + m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size + if( m != NULL ) { + state = rmr_set_trace( m, data, tr_size ); // roll their data in + if( state != tr_size ) { + m->state = RMR_ERR_INITFAILED; + } + } + + return m; +} + +/* + Need an external path to the realloc static function as it's called by an + outward facing mbuf api function. +*/ +extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) { + return realloc_msg( msg, new_tr_size ); +} + /* Return the message to the available pool, or free it outright. */ @@ -188,15 +221,15 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) { free( mbuf->header ); } } - + free( mbuf ); } /* - Accept a message and send it to an endpoint based on message type. + Accept a message and send it to an endpoint based on message type. Allocates a new message buffer for the next send. If a message type has more than one group of endpoints defined, then the message will be sent - in round robin fashion to one endpoint in each group. + in round robin fashion to one endpoint in each group. CAUTION: this is a non-blocking send. If the message cannot be sent, then it will return with an error and errno set to eagain. If the send is @@ -209,13 +242,16 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { int group; // selected group to get socket for int send_again; // true if the message must be sent again rmr_mbuf_t* clone_m; // cloned message for an nth send + uint64_t key; // lookup key is now subid and mtype + int max_rt = 1000; + int altk_ok = 0; // ok to retry with alt key when true if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue if( msg != NULL ) { msg->state = RMR_ERR_BADARG; errno = EINVAL; // must ensure it's not eagain - } + } return msg; } @@ -230,32 +266,48 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { send_again = 1; // force loop entry group = 0; // always start with group 0 + key = build_rt_key( msg->sub_id, msg->mtype ); // what we need to find the route table entry + if( msg->sub_id != UNSET_SUBID ) { // if sub id set, allow retry with just mtype if no endpoint when sub-id used + altk_ok = 1; + } + while( send_again ) { - nn_sock = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again ); // round robin select endpoint; again set if mult groups - if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n", - msg->mtype, send_again, group, nn_sock, msg->len ); - group++; + max_rt = 1000; + nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again ); // round robin select endpoint; again set if mult groups + if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d ak_ok=%d\n", + msg->mtype, send_again, group, nn_sock, msg->len, altk_ok ); if( nn_sock < 0 ) { + if( altk_ok ) { // ok to retry with alternate key + key = build_rt_key( UNSET_SUBID, msg->mtype ); // build key with just mtype and retry + send_again = 1; + altk_ok = 0; + continue; + } + msg->state = RMR_ERR_NOENDPT; errno = ENXIO; // must ensure it's not eagain return msg; // caller can resend (maybe) or free } + group++; if( send_again ) { clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available - if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len ); + if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d sub_id=%d len=%d\n", msg->mtype, msg->sub_id, msg->len ); msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer msg = send_msg( ctx, msg, nn_sock ); // do the hard work, msg should be nil on success - /* - if( msg ) { - // error do we need to count successes/errors, how to report some success, esp if last fails? - } - */ + while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) { + msg = send_msg( ctx, msg, nn_sock ); + max_rt--; + } msg = clone_m; // clone will be the next to send } else { msg = send_msg( ctx, msg, nn_sock ); // send the last, and allocate a new buffer; drops the clone if it was + while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) { + msg = send_msg( ctx, msg, nn_sock ); + max_rt--; + } } } @@ -263,9 +315,9 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { } /* - Return to sender allows a message to be sent back to the endpoint where it originated. + Return to sender allows a message to be sent back to the endpoint where it originated. The source information in the message is used to select the socket on which to write - the message rather than using the message type and round-robin selection. This + the message rather than using the message type and round-robin selection. This should return a message buffer with the state of the send operation set. On success (state is RMR_OK, the caller may use the buffer for another receive operation), and on error it can be passed back to this function to retry the send if desired. On error, @@ -297,7 +349,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { errno = EINVAL; // if msg is null, this is their clue if( msg != NULL ) { msg->state = RMR_ERR_BADARG; - } + } return msg; } @@ -335,7 +387,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK to ensure that no error was encountered. If the state is UTA_BADARG, then the message - may be resent (likely the context pointer was nil). If the message is sent, but no + may be resent (likely the context pointer was nil). If the message is sent, but no response is received, a nil message is returned with errno set to indicate the likley issue: ETIMEDOUT -- too many messages were queued before reciving the expected response @@ -354,7 +406,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) { if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast if( msg != NULL ) { msg->state = RMR_ERR_BADARG; - } + } return msg; } @@ -391,7 +443,7 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) { if( (ctx = (uta_ctx_t *) vctx) == NULL ) { if( old_msg != NULL ) { old_msg->state = RMR_ERR_BADARG; - } + } errno = EINVAL; return old_msg; } @@ -410,34 +462,41 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) { } /* - Receive with a timeout. This is a convenience function when sitting on top of - nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg(). + Receive with a timeout. This is a convenience function when sitting on top of + nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg(). */ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { - rmr_set_rtimeout( vctx, ms_to ); + uta_ctx_t* ctx; + + if( (ctx = (uta_ctx_t *) vctx) != NULL ) { + if( ctx->last_rto != ms_to ) { // avoid call overhead + rmr_set_rtimeout( vctx, ms_to ); + } + } + return rmr_rcv_msg( vctx, old_msg ); } /* This blocks until the message with the 'expect' ID is received. Messages which are received - before the expected message are queued onto the message ring. The function will return + before the expected message are queued onto the message ring. The function will return a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the expected message is received. If the queued message ring fills a nil pointer is returned and errno is set to ENOBUFS. - Generally this will be invoked only by the call() function as it waits for a response, but + Generally this will be invoked only by the call() function as it waits for a response, but it is exposed to the user application as three is no reason not to. */ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) { uta_ctx_t* ctx; int queued = 0; // number we pushed into the ring int exp_len = 0; // length of expected ID - + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { if( msg != NULL ) { msg->state = RMR_ERR_BADARG; - } + } errno = EINVAL; return msg; } @@ -497,12 +556,12 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { char* proto = "tcp"; // pointer into the proto/port string user supplied char* port; char* proto_port; - char wbuf[1024]; // work buffer + char wbuf[1024]; // work buffer char* tok; // pointer at token in a buffer int state; char* interface = NULL; // interface to bind to pulled from RMR_BIND_IF if set - fprintf( stderr, "[INFO] ric message routing library on nanomsg (%s %s.%s.%s built: %s)\n", + fprintf( stderr, "[INFO] ric message routing library on nanomsg (%s %s.%s.%s built: %s)\n", QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); errno = 0; @@ -520,6 +579,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response + ctx->last_rto = -2; // last receive timeout that was set; invalid value to force first to set ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t ); // default max buffer size if( max_msg_size > 0 ) { @@ -534,7 +594,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors - ctx->nn_sock = nn_socket( AF_SP, NN_PULL ); // our 'listen' socket should allow multiple senders to connect + ctx->nn_sock = nn_socket( AF_SP, NN_PULL ); // our 'listen' socket should allow multiple senders to connect if( ctx->nn_sock < 0 ) { fprintf( stderr, "[CRIT] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno ); free_ctx( ctx ); @@ -569,7 +629,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { interface = "0.0.0.0"; } snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port ); - if( nn_bind( ctx->nn_sock, bind_info ) < 0) { // bind and automatically accept client sessions + if( nn_bind( ctx->nn_sock, bind_info ) < 0) { // bind and automatically accept client sessions fprintf( stderr, "[CRIT] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) ); nn_close( ctx->nn_sock ); free_ctx( ctx ); @@ -586,10 +646,30 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { return (void *) ctx; } +/* + This sets the default trace length which will be added to any message buffers + allocated. It can be set at any time, and if rmr_set_trace() is given a + trace len that is different than the default allcoated in a message, the message + will be resized. + + Returns 0 on failure and 1 on success. If failure, then errno will be set. +*/ +extern int rmr_init_trace( void* vctx, int tr_len ) { + uta_ctx_t* ctx; + + errno = 0; + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + return 0; + } + + ctx->trace_data_len = tr_len; + return 1; +} /* Publicly facing initialisation function. Wrapper for the init() funcion above - as it needs to ensure internal flags are masked off before calling the + as it needs to ensure internal flags are masked off before calling the real workhorse. */ extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) { @@ -614,7 +694,7 @@ extern int rmr_ready( void* vctx ) { } /* - Provides a non-fatal (compile) interface for the nng only function. + Provides a non-fatal (compile) interface for the nng only function. Not supported on top of nano, so this always returns -1. */ extern int rmr_get_rcvfd( void* vctx ) { @@ -631,6 +711,6 @@ extern void rmr_close( void* vctx ) { if( (ctx = (uta_ctx_t *) vctx) == NULL ) { return; } - + nn_close( ctx->nn_sock ); }