X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Fnng%2Fsrc%2Frmr_nng.c;h=e77c9f3b464906fa0c26034d66b19b68438d2700;hb=a012cf63dfdad3656c995cb06c316fd208c63b98;hp=a3fcd8977b7c7628c42b9669f0b08c3c20b6b9c8;hpb=d710957ed5d73bf2da2ceea3f5a1a3c509275c30;p=ric-plt%2Flib%2Frmr.git diff --git a/src/nng/src/rmr_nng.c b/src/nng/src/rmr_nng.c index a3fcd89..e77c9f3 100644 --- a/src/nng/src/rmr_nng.c +++ b/src/nng/src/rmr_nng.c @@ -1,14 +1,14 @@ // : vi ts=4 sw=4 noet : /* ================================================================================== - Copyright (c) 2019 Nokia + Copyright (c) 2019 Nokia Copyright (c) 2018-2019 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -20,17 +20,17 @@ /* Mnemonic: rmr_nng.c - Abstract: This is the compile point for the nng version of the rmr + Abstract: This is the compile point for the nng version of the rmr library (formarly known as uta, so internal function names are likely still uta_*) - With the exception of the symtab portion of the library, - RMr is built with a single compile so as to "hide" the + With the exception of the symtab portion of the library, + RMr is built with a single compile so as to "hide" the internal functions as statics. Because they interdepend on each other, and CMake has issues with generating two different wormhole objects from a single source, we just pull it all together with a centralised comple using - includes. + includes. Future: the API functions at this point can be separated into a common source module. @@ -92,7 +92,7 @@ static void free_ctx( uta_ctx_t* ctx ) { Returns the size of the payload (bytes) that the msg buffer references. Len in a message is the number of bytes which were received, or should be transmitted, however, it is possible that the mbuf was allocated - with a larger payload space than the payload length indicates; this + with a larger payload space than the payload length indicates; this function returns the absolute maximum space that the user has available in the payload. On error (bad msg buffer) -1 is returned and errno should indicate the rason. @@ -176,26 +176,31 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) { } } } - + free( mbuf ); } /* send message with maximum timeout. - Accept a message and send it to an endpoint based on message type. + Accept a message and send it to an endpoint based on message type. If NNG reports that the send attempt timed out, or should be retried, RMr will retry for approximately max_to microseconds; rounded to the next higher value of 10. Allocates a new message buffer for the next send. If a message type has more than one group of endpoints defined, then the message will be sent - in round robin fashion to one endpoint in each group. + in round robin fashion to one endpoint in each group. + + An endpoint will be looked up in the route table using the message type and + the subscription id. If the subscription id is "UNSET_SUBID", then only the + message type is used. If the initial lookup, with a subid, fails, then a + second lookup using just the mtype is tried. CAUTION: this is a non-blocking send. If the message cannot be sent, then it will return with an error and errno set to eagain. If the send is a limited fanout, then the returned status is the status of the last send attempt. - + */ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { nng_socket nn_sock; // endpoint socket for send @@ -204,13 +209,15 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { int send_again; // true if the message must be sent again rmr_mbuf_t* clone_m; // cloned message for an nth send int sock_ok; // got a valid socket from round robin select + uint64_t key; // mtype or sub-id/mtype sym table key + int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue if( msg != NULL ) { msg->state = RMR_ERR_BADARG; errno = EINVAL; // must ensure it's not eagain - } + } return msg; } @@ -229,18 +236,30 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { send_again = 1; // force loop entry group = 0; // always start with group 0 + key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry + if( msg->sub_id != UNSET_SUBID ) { + altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry + } while( send_again ) { - sock_ok = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups - if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n", - msg->mtype, send_again, group, msg->len, sock_ok ); - group++; + sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups + if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n", + msg->mtype, send_again, group, msg->len, sock_ok, altk_ok ); if( ! sock_ok ) { + if( altk_ok ) { // we can try with the alternate (no sub-id) key + altk_ok = 0; + key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again + send_again = 1; // ensure we don't exit the while + continue; + } + msg->state = RMR_ERR_NOENDPT; errno = ENXIO; // must ensure it's not eagain return msg; // caller can resend (maybe) or free } + group++; + if( send_again ) { clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len ); @@ -249,7 +268,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { /* if( msg ) { // error do we need to count successes/errors, how to report some success, esp if last fails? - } + } */ msg = clone_m; // clone will be the next to send @@ -262,7 +281,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { } /* - Send with default max timeout as is set in the context. + Send with default max timeout as is set in the context. See rmr_mtosend_msg() for more details on the parameters. See rmr_stimeout() for info on setting the default timeout. */ @@ -271,9 +290,9 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { } /* - Return to sender allows a message to be sent back to the endpoint where it originated. + Return to sender allows a message to be sent back to the endpoint where it originated. The source information in the message is used to select the socket on which to write - the message rather than using the message type and round-robin selection. This + the message rather than using the message type and round-robin selection. This should return a message buffer with the state of the send operation set. On success (state is RMR_OK, the caller may use the buffer for another receive operation), and on error it can be passed back to this function to retry the send if desired. On error, @@ -306,7 +325,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { errno = EINVAL; // if msg is null, this is their clue if( msg != NULL ) { msg->state = RMR_ERR_BADARG; - } + } return msg; } @@ -345,7 +364,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK to ensure that no error was encountered. If the state is UTA_BADARG, then the message - may be resent (likely the context pointer was nil). If the message is sent, but no + may be resent (likely the context pointer was nil). If the message is sent, but no response is received, a nil message is returned with errno set to indicate the likley issue: ETIMEDOUT -- too many messages were queued before reciving the expected response @@ -364,7 +383,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) { if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast if( msg != NULL ) { msg->state = RMR_ERR_BADARG; - } + } return msg; } @@ -401,7 +420,7 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) { if( (ctx = (uta_ctx_t *) vctx) == NULL ) { if( old_msg != NULL ) { old_msg->state = RMR_ERR_BADARG; - } + } errno = EINVAL; return old_msg; } @@ -420,8 +439,8 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) { } /* - This implements a receive with a timeout via epoll. Mostly this is for - wrappers as native C applications can use epoll directly and will not have + This implements a receive with a timeout via epoll. Mostly this is for + wrappers as native C applications can use epoll directly and will not have to depend on this. */ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { @@ -434,7 +453,7 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { if( (ctx = (uta_ctx_t *) vctx) == NULL ) { if( old_msg != NULL ) { old_msg->state = RMR_ERR_BADARG; - } + } errno = EINVAL; return old_msg; } @@ -449,23 +468,23 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { } if( (eps = ctx->eps) == NULL ) { // set up epoll on first call - eps = malloc( sizeof *eps ); + eps = malloc( sizeof *eps ); - if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) { - fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno ); + if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) { + fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno ); free( eps ); return NULL; - } + } eps->nng_fd = rmr_get_rcvfd( ctx ); eps->epe.events = EPOLLIN; - eps->epe.data.fd = eps->nng_fd; + eps->epe.data.fd = eps->nng_fd; - if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) { - fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) ); + if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) { + fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) ); free( eps ); return NULL; - } + } ctx->eps = eps; } @@ -492,23 +511,23 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { /* This blocks until the message with the 'expect' ID is received. Messages which are received - before the expected message are queued onto the message ring. The function will return + before the expected message are queued onto the message ring. The function will return a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the expected message is received. If the queued message ring fills a nil pointer is returned and errno is set to ENOBUFS. - Generally this will be invoked only by the call() function as it waits for a response, but + Generally this will be invoked only by the call() function as it waits for a response, but it is exposed to the user application as three is no reason not to. */ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) { uta_ctx_t* ctx; int queued = 0; // number we pushed into the ring int exp_len = 0; // length of expected ID - + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { if( msg != NULL ) { msg->state = RMR_ERR_BADARG; - } + } errno = EINVAL; return msg; } @@ -553,12 +572,12 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, // CAUTION: these are not supported as they must be set differently (between create and open) in NNG. // until those details are worked out, these generate a warning. /* - Set send timeout. The value time is assumed to be microseconds. The timeout is the + Set send timeout. The value time is assumed to be microseconds. The timeout is the rough maximum amount of time that RMr will block on a send attempt when the underlying mechnism indicates eagain or etimeedout. All other error conditions are reported without this delay. Setting a timeout of 0 causes no retries to be attempted in RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning, - but without issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us) + but without issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us) after every 10K send attempts until the time value is reached. Retries are abandoned if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT. @@ -594,9 +613,9 @@ extern int rmr_set_rtimeout( void* vctx, int time ) { /* This is the actual init workhorse. The user visible function meerly ensures that the - calling programme does NOT set any internal flags that are supported, and then + calling programme does NOT set any internal flags that are supported, and then invokes this. Internal functions (the route table collector) which need additional - open ports without starting additional route table collectors, will invoke this + open ports without starting additional route table collectors, will invoke this directly with the proper flag. */ static void* init( char* uproto_port, int max_msg_size, int flags ) { @@ -607,12 +626,12 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { char* port; char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined) char* proto_port; - char wbuf[1024]; // work buffer + char wbuf[1024]; // work buffer char* tok; // pointer at token in a buffer int state; if( ! announced ) { - fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n", + fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n", RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; } @@ -711,7 +730,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { Flags: No user flags supported (needed) at the moment, but this provides for extension - without drastically changing anything. The user should invoke with RMRFL_NONE to + without drastically changing anything. The user should invoke with RMRFL_NONE to avoid any misbehavour as there are internal flags which are suported */ extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) { @@ -720,7 +739,7 @@ extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) { /* This sets the default trace length which will be added to any message buffers - allocated. It can be set at any time, and if rmr_set_trace() is given a + allocated. It can be set at any time, and if rmr_set_trace() is given a trace len that is different than the default allcoated in a message, the message will be resized. @@ -757,9 +776,9 @@ extern int rmr_ready( void* vctx ) { } /* - Returns a file descriptor which can be used with epoll() to signal a receive - pending. The file descriptor should NOT be read from directly, nor closed, as NNG - does not support this. + Returns a file descriptor which can be used with epoll() to signal a receive + pending. The file descriptor should NOT be read from directly, nor closed, as NNG + does not support this. */ extern int rmr_get_rcvfd( void* vctx ) { uta_ctx_t* ctx; @@ -783,7 +802,7 @@ extern int rmr_get_rcvfd( void* vctx ) { Clean up things. There isn't an nng_flush() per se, but we can pause, generate - a context switch, which should allow the last sent buffer to + a context switch, which should allow the last sent buffer to flow. There isn't exactly an nng_term/close either, so there isn't much we can do. */