X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fnng%2Fsrc%2Fsr_nng_static.c;h=3435cb30915ae1fb4d0904e9a69ebd3e186377fb;hb=c06c626ddf4f45a28a3db3f1afbb7ac87160045f;hp=c04690d4a7f8deec6ff5548b963f04dde2d4bff5;hpb=6511ac74cdc367a94bffeb3743624775acd52c5b;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/nng/src/sr_nng_static.c b/src/rmr/nng/src/sr_nng_static.c index c04690d..3435cb3 100644 --- a/src/rmr/nng/src/sr_nng_static.c +++ b/src/rmr/nng/src/sr_nng_static.c @@ -1,4 +1,4 @@ - // : vi ts=4 sw=4 noet 2 +// vim: ts=4 sw=4 noet : /* ================================================================================== Copyright (c) 2019 Nokia @@ -42,6 +42,10 @@ into the message, and sets errno to something that might be useful. If we don't have a specific RMr state, then we return the default (e.g. receive failed). + + The addition of the connection shut error code to the switch requires + that the NNG version at commit e618abf8f3db2a94269a (or after) be + used for compiling RMR. */ static inline int xlate_nng_state( int state, int def_state ) { @@ -81,6 +85,7 @@ static inline int xlate_nng_state( int state, int def_state ) { state = def_state; break; + case NNG_ECONNSHUT: // new error with nng commit e618abf8f3db2a94269a79c8901a51148d48fcc2 (Sept 2019) case NNG_ECLOSED: errno = EBADFD; // file des not in a good state for the operation state = def_state; @@ -525,7 +530,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock uta_mhdr_t* hdr; int nng_flags = NNG_FLAG_NONBLOCK; // if we need to set any nng flags (zc buffer) add it to this int spin_retries = 1000; // if eagain/timeout we'll spin, at max, this many times before giving up the CPU - int tr_len; // trace len in sending message so we alloc new message with same trace size + int tr_len; // trace len in sending message so we alloc new message with same trace sizes // future: ensure that application did not overrun the XID buffer; last byte must be 0 @@ -540,6 +545,11 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC ); } + if( retries == 0 ) { + spin_retries = 100; + retries++; + } + errno = 0; msg->state = RMR_OK; if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer @@ -629,6 +639,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock */ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { + endpoint_t* ep; // end point that we're attempting to send to rtable_ent_t* rte; // the route table entry which matches the message key nng_socket nn_sock; // endpoint socket for send uta_ctx_t* ctx; @@ -651,7 +662,7 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't if( msg->header == NULL ) { - fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" ); + fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" ); msg->state = RMR_ERR_NOHDR; errno = EBADMSG; // must ensure it's not eagain msg->tp_state = errno; @@ -675,7 +686,7 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { send_again = 1; // force loop entry group = 0; // always start with group 0 while( send_again ) { - sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock ); // select endpt from rr group and set again if more groups + sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n", msg->flags, msg->mtype, send_again, group, msg->len, sock_ok ); @@ -714,6 +725,22 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { } } } + + if( ep != NULL && msg != NULL ) { + switch( msg->state ) { + case RMR_OK: + ep->scounts[EPSC_GOOD]++; + break; + + case RMR_ERR_RETRY: + ep->scounts[EPSC_TRANS]++; + break; + + default: + ep->scounts[EPSC_FAIL]++; + break; + } + } } else { if( ctx->flags & CTXFL_WARN ) { fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );