X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fnng%2Fsrc%2Frmr_nng.c;h=d9c4c061c3759cea494b3d3260bcb2df642461c1;hb=7d1b7b0de82414ad25ebd323b2913de72ab448c1;hp=bc3fe107cba3a476cbc35e70a04ea10ffe6e2f3b;hpb=dfe7b622b128e7bfb4a5e1f7e0afdb84e6001d14;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/nng/src/rmr_nng.c b/src/rmr/nng/src/rmr_nng.c index bc3fe10..d9c4c06 100644 --- a/src/rmr/nng/src/rmr_nng.c +++ b/src/rmr/nng/src/rmr_nng.c @@ -1,4 +1,4 @@ -// : vi ts=4 sw=4 noet : +// vim: ts=4 sw=4 noet : /* ================================================================================== Copyright (c) 2019 Nokia @@ -250,6 +250,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { char* hold_src; // we need the original source if send fails char* hold_ip; // also must hold original ip int sock_ok = 0; // true if we found a valid endpoint socket + endpoint_t* ep; // end point to track counts if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -270,10 +271,10 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // src is always used first for rts + sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // src is always used first for rts if( ! sock_ok ) { if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock ); + sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep ); } if( ! sock_ok ) { msg->state = RMR_ERR_NOENDPT; @@ -287,6 +288,21 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours msg = send_msg( ctx, msg, nn_sock, -1 ); if( msg ) { + if( ep != NULL ) { + switch( msg->state ) { + case RMR_OK: + ep->scounts[EPSC_GOOD]++; + break; + + case RMR_ERR_RETRY: + ep->scounts[EPSC_TRANS]++; + break; + + default: + ep->scounts[EPSC_FAIL]++; + break; + } + } strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source @@ -438,7 +454,12 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) { fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno ); free( eps ); - return NULL; + ctx->eps = NULL; + if( old_msg != NULL ) { + old_msg->state = RMR_ERR_INITFAILED; + old_msg->tp_state = errno; + } + return old_msg; } eps->nng_fd = rmr_get_rcvfd( ctx ); @@ -448,7 +469,12 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) { fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) ); free( eps ); - return NULL; + ctx->eps = NULL; + if( old_msg != NULL ) { + old_msg->state = RMR_ERR_INITFAILED; + old_msg->tp_state = errno; + } + return old_msg; } ctx->eps = eps; @@ -599,8 +625,8 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { int state; if( ! announced ) { - fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n", - RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); + fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", + RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; } @@ -883,11 +909,11 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { chute = &ctx->chutes[0]; // chute 0 used only for its semaphore - if( max_wait > 0 ) { + if( max_wait >= 0 ) { clock_gettime( CLOCK_REALTIME, &ts ); if( max_wait > 999 ) { - seconds = (max_wait - 999)/1000; + seconds = max_wait / 1000; max_wait -= seconds * 1000; ts.tv_sec += seconds; } @@ -1000,11 +1026,11 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend - if( max_wait > 0 ) { + if( max_wait >= 0 ) { clock_gettime( CLOCK_REALTIME, &ts ); if( max_wait > 999 ) { - seconds = (max_wait - 999)/1000; + seconds = max_wait / 1000; max_wait -= seconds * 1000; ts.tv_sec += seconds; }