X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fnng%2Fsrc%2Frmr_nng.c;h=e77c9f3b464906fa0c26034d66b19b68438d2700;hb=refs%2Fchanges%2F64%2F164%2F1;hp=af8ce6fdc11e4087e50fa0677739af2426894442;hpb=68c5cf1104e89f5c43786a3e48f5c6a1e757f59f;p=ric-plt%2Flib%2Frmr.git diff --git a/src/nng/src/rmr_nng.c b/src/nng/src/rmr_nng.c index af8ce6f..e77c9f3 100644 --- a/src/nng/src/rmr_nng.c +++ b/src/nng/src/rmr_nng.c @@ -1,14 +1,14 @@ // : vi ts=4 sw=4 noet : /* ================================================================================== - Copyright (c) 2019 Nokia + Copyright (c) 2019 Nokia Copyright (c) 2018-2019 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -20,17 +20,17 @@ /* Mnemonic: rmr_nng.c - Abstract: This is the compile point for the nng version of the rmr + Abstract: This is the compile point for the nng version of the rmr library (formarly known as uta, so internal function names are likely still uta_*) - With the exception of the symtab portion of the library, - RMr is built with a single compile so as to "hide" the + With the exception of the symtab portion of the library, + RMr is built with a single compile so as to "hide" the internal functions as statics. Because they interdepend on each other, and CMake has issues with generating two different wormhole objects from a single source, we just pull it all together with a centralised comple using - includes. + includes. Future: the API functions at this point can be separated into a common source module. 
@@ -92,7 +92,7 @@ static void free_ctx( uta_ctx_t* ctx ) { Returns the size of the payload (bytes) that the msg buffer references. Len in a message is the number of bytes which were received, or should be transmitted, however, it is possible that the mbuf was allocated - with a larger payload space than the payload length indicates; this + with a larger payload space than the payload length indicates; this function returns the absolute maximum space that the user has available in the payload. On error (bad msg buffer) -1 is returned and errno should indicate the rason. @@ -119,10 +119,47 @@ extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) { return NULL; } - m = alloc_zcmsg( ctx, NULL, size, 0 ); + m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); // alloc with default trace data return m; } + +/* + Allocates a send message as a zerocopy message allowing the underlying message protocol + to send the buffer without copy. In addition, a trace data field of tr_size will be + added and the supplied data copied to the buffer before returning the message to + the caller. +*/ +extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) { + uta_ctx_t* ctx; + rmr_mbuf_t* m; + int state; + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + return NULL; + } + + m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size + if( m != NULL ) { + state = rmr_set_trace( m, data, tr_size ); // roll their data in + if( state != tr_size ) { + m->state = RMR_ERR_INITFAILED; + } + } + + return m; +} + +/* + This provides an external path to the realloc static function as it's called by an + outward facing mbuf api function. Used to reallocate a message with a different + trace data size. +*/ +extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) { + return realloc_msg( msg, new_tr_size ); +} + + /* Return the message to the available pool, or free it outright. 
*/ @@ -139,26 +176,31 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) { } } } - + free( mbuf ); } /* send message with maximum timeout. - Accept a message and send it to an endpoint based on message type. + Accept a message and send it to an endpoint based on message type. If NNG reports that the send attempt timed out, or should be retried, RMr will retry for approximately max_to microseconds; rounded to the next higher value of 10. Allocates a new message buffer for the next send. If a message type has more than one group of endpoints defined, then the message will be sent - in round robin fashion to one endpoint in each group. + in round robin fashion to one endpoint in each group. + + An endpoint will be looked up in the route table using the message type and + the subscription id. If the subscription id is "UNSET_SUBID", then only the + message type is used. If the initial lookup, with a subid, fails, then a + second lookup using just the mtype is tried. CAUTION: this is a non-blocking send. If the message cannot be sent, then it will return with an error and errno set to eagain. If the send is a limited fanout, then the returned status is the status of the last send attempt. 
- + */ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { nng_socket nn_sock; // endpoint socket for send @@ -167,13 +209,15 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { int send_again; // true if the message must be sent again rmr_mbuf_t* clone_m; // cloned message for an nth send int sock_ok; // got a valid socket from round robin select + uint64_t key; // mtype or sub-id/mtype sym table key + int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue if( msg != NULL ) { msg->state = RMR_ERR_BADARG; errno = EINVAL; // must ensure it's not eagain - } + } return msg; } @@ -192,18 +236,30 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { send_again = 1; // force loop entry group = 0; // always start with group 0 + key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry + if( msg->sub_id != UNSET_SUBID ) { + altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry + } while( send_again ) { - sock_ok = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups - if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n", - msg->mtype, send_again, group, msg->len, sock_ok ); - group++; + sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups + if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n", + msg->mtype, send_again, group, msg->len, sock_ok, altk_ok ); if( ! 
sock_ok ) { + if( altk_ok ) { // we can try with the alternate (no sub-id) key + altk_ok = 0; + key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again + send_again = 1; // ensure we don't exit the while + continue; + } + msg->state = RMR_ERR_NOENDPT; errno = ENXIO; // must ensure it's not eagain return msg; // caller can resend (maybe) or free } + group++; + if( send_again ) { clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len ); @@ -212,7 +268,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { /* if( msg ) { // error do we need to count successes/errors, how to report some success, esp if last fails? - } + } */ msg = clone_m; // clone will be the next to send @@ -225,7 +281,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { } /* - Send with default max timeout as is set in the context. + Send with default max timeout as is set in the context. See rmr_mtosend_msg() for more details on the parameters. See rmr_stimeout() for info on setting the default timeout. */ @@ -234,9 +290,9 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { } /* - Return to sender allows a message to be sent back to the endpoint where it originated. + Return to sender allows a message to be sent back to the endpoint where it originated. The source information in the message is used to select the socket on which to write - the message rather than using the message type and round-robin selection. This + the message rather than using the message type and round-robin selection. This should return a message buffer with the state of the send operation set. On success (state is RMR_OK, the caller may use the buffer for another receive operation), and on error it can be passed back to this function to retry the send if desired. 
On error, @@ -269,7 +325,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { errno = EINVAL; // if msg is null, this is their clue if( msg != NULL ) { msg->state = RMR_ERR_BADARG; - } + } return msg; } @@ -308,7 +364,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK to ensure that no error was encountered. If the state is UTA_BADARG, then the message - may be resent (likely the context pointer was nil). If the message is sent, but no + may be resent (likely the context pointer was nil). If the message is sent, but no response is received, a nil message is returned with errno set to indicate the likley issue: ETIMEDOUT -- too many messages were queued before reciving the expected response @@ -327,7 +383,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) { if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast if( msg != NULL ) { msg->state = RMR_ERR_BADARG; - } + } return msg; } @@ -364,7 +420,7 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) { if( (ctx = (uta_ctx_t *) vctx) == NULL ) { if( old_msg != NULL ) { old_msg->state = RMR_ERR_BADARG; - } + } errno = EINVAL; return old_msg; } @@ -383,8 +439,8 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) { } /* - This implements a receive with a timeout via epoll. Mostly this is for - wrappers as native C applications can use epoll directly and will not have + This implements a receive with a timeout via epoll. Mostly this is for + wrappers as native C applications can use epoll directly and will not have to depend on this. 
*/ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { @@ -397,7 +453,7 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { if( (ctx = (uta_ctx_t *) vctx) == NULL ) { if( old_msg != NULL ) { old_msg->state = RMR_ERR_BADARG; - } + } errno = EINVAL; return old_msg; } @@ -412,23 +468,23 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { } if( (eps = ctx->eps) == NULL ) { // set up epoll on first call - eps = malloc( sizeof *eps ); + eps = malloc( sizeof *eps ); - if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) { - fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno ); + if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) { + fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno ); free( eps ); return NULL; - } + } eps->nng_fd = rmr_get_rcvfd( ctx ); eps->epe.events = EPOLLIN; - eps->epe.data.fd = eps->nng_fd; + eps->epe.data.fd = eps->nng_fd; - if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) { - fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) ); + if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) { + fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) ); free( eps ); return NULL; - } + } ctx->eps = eps; } @@ -436,7 +492,7 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { if( old_msg ) { msg = old_msg; } else { - msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK ); // will abort on failure, no need to check + msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check } if( ms_to < 0 ) { @@ -455,23 +511,23 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { /* This blocks until the message with the 'expect' ID is received. Messages which are received - before the expected message are queued onto the message ring. 
The function will return + before the expected message are queued onto the message ring. The function will return a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the expected message is received. If the queued message ring fills a nil pointer is returned and errno is set to ENOBUFS. - Generally this will be invoked only by the call() function as it waits for a response, but + Generally this will be invoked only by the call() function as it waits for a response, but it is exposed to the user application as three is no reason not to. */ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) { uta_ctx_t* ctx; int queued = 0; // number we pushed into the ring int exp_len = 0; // length of expected ID - + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { if( msg != NULL ) { msg->state = RMR_ERR_BADARG; - } + } errno = EINVAL; return msg; } @@ -516,12 +572,12 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, // CAUTION: these are not supported as they must be set differently (between create and open) in NNG. // until those details are worked out, these generate a warning. /* - Set send timeout. The value time is assumed to be microseconds. The timeout is the + Set send timeout. The value time is assumed to be microseconds. The timeout is the rough maximum amount of time that RMr will block on a send attempt when the underlying mechnism indicates eagain or etimeedout. All other error conditions are reported without this delay. Setting a timeout of 0 causes no retries to be attempted in RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning, - but without issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us) + but without issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us) after every 10K send attempts until the time value is reached. 
Retries are abandoned if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT. @@ -557,9 +613,9 @@ extern int rmr_set_rtimeout( void* vctx, int time ) { /* This is the actual init workhorse. The user visible function meerly ensures that the - calling programme does NOT set any internal flags that are supported, and then + calling programme does NOT set any internal flags that are supported, and then invokes this. Internal functions (the route table collector) which need additional - open ports without starting additional route table collectors, will invoke this + open ports without starting additional route table collectors, will invoke this directly with the proper flag. */ static void* init( char* uproto_port, int max_msg_size, int flags ) { @@ -570,12 +626,12 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { char* port; char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined) char* proto_port; - char wbuf[1024]; // work buffer + char wbuf[1024]; // work buffer char* tok; // pointer at token in a buffer int state; if( ! announced ) { - fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n", + fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n", RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; } @@ -674,13 +730,34 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { Flags: No user flags supported (needed) at the moment, but this provides for extension - without drastically changing anything. The user should invoke with RMRFL_NONE to + without drastically changing anything. 
The user should invoke with RMRFL_NONE to avoid any misbehavour as there are internal flags which are suported */ extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) { return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off } +/* + This sets the default trace length which will be added to any message buffers + allocated. It can be set at any time, and if rmr_set_trace() is given a + trace len that is different than the default allocated in a message, the message + will be resized. + + Returns 0 on failure and 1 on success. If failure, then errno will be set. +*/ +extern int rmr_init_trace( void* vctx, int tr_len ) { + uta_ctx_t* ctx; + + errno = 0; + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + return 0; + } + + ctx->trace_data_len = tr_len; + return 1; +} + /* Return true if routing table is initialised etc. and app can send/receive. */ @@ -699,9 +776,9 @@ extern int rmr_ready( void* vctx ) { } /* - Returns a file descriptor which can be used with epoll() to signal a receive - pending. The file descriptor should NOT be read from directly, nor closed, as NNG - does not support this. + Returns a file descriptor which can be used with epoll() to signal a receive + pending. The file descriptor should NOT be read from directly, nor closed, as NNG + does not support this. */ extern int rmr_get_rcvfd( void* vctx ) { uta_ctx_t* ctx; @@ -725,7 +802,7 @@ extern int rmr_get_rcvfd( void* vctx ) { Clean up things. There isn't an nng_flush() per se, but we can pause, generate - a context switch, which should allow the last sent buffer to + a context switch, which should allow the last sent buffer to flow. There isn't exactly an nng_term/close either, so there isn't much we can do. */