X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Fsr_si_static.c;h=4f8672888206d5b0c580fb904fbebcbc63c64429;hb=8c6756d9d6f94beca0bc382f97383ca5e79d16c7;hp=119b88ec4aedce3296b030e2556cd1e8ca4524c3;hpb=fcea3951d44de0cc55d33c5e114487abe79d3406;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/sr_si_static.c b/src/rmr/si/src/sr_si_static.c index 119b88e..4f86728 100644 --- a/src/rmr/si/src/sr_si_static.c +++ b/src/rmr/si/src/sr_si_static.c @@ -1,8 +1,8 @@ // vim: ts=4 sw=4 noet : /* ================================================================================== - Copyright (c) 2019-2020 Nokia - Copyright (c) 2018-2020 AT&T Intellectual Property. + Copyright (c) 2019-2021 Nokia + Copyright (c) 2018-2021 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -281,27 +281,14 @@ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) { msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN; v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version - - if( v1hdr->rmr_ver == 1 ) { // bug in verion 1 didn't encode the version in network byte order - ver = 1; - v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message - } else { - ver = ntohl( v1hdr->rmr_ver ); - } + ver = ntohl( v1hdr->rmr_ver ); switch( ver ) { - case 1: - msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger) - msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits) - msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above) + // version 1 is deprecated case 1: + // version 2 is deprecated case 2: - msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area - msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message - msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order - msg->sub_id = UNSET_SUBID; // type 1 messages didn't have this - msg->state = RMR_OK; - hlen = sizeof( uta_v1mhdr_t ); - break; + case 3: + // fall-through default: // current version always lands here hdr = (uta_mhdr_t *) msg->header; @@ -351,12 +338,11 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) { nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN; v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version switch( ntohl( v1hdr->rmr_ver ) ) { - case 1: - hdr = nm->header; - memcpy( hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header - nm->payload = (void *) v1hdr + sizeof( *v1hdr ); - break; + // version 1 deprecated case 1: + // version 2 deprecated + case 3: + // fall-through default: // current message always caught here hdr = nm->header; memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header )); // copy complete header, trace and other data @@ -423,11 +409,10 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) { v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version switch( ntohl( v1hdr->rmr_ver ) ) { - case 1: - v1hdr = nm->header; - memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header - nm->payload = (void *) v1hdr + sizeof( *v1hdr ); - break; + // version 1 not supported + // version 2 not supported + case 3: + // fall-through default: // current message version always caught here hdr = nm->header; @@ -554,7 +539,7 @@ static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN; // point at the new header and copy from old SET_HDR_LEN( nm->header ); - if( copy ) { // if we need to copy the old payload too + if( copy != 0 ) { // if we need to copy the old payload too memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) ); if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize ); } else { // just need to copy header @@ -581,19 +566,6 @@ static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, return nm; } -/* - For SI95 based transport all receives are driven through the threaded - ring and thus this function should NOT be called. If it is we will panic - and abort straight away. -*/ -static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { - -fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort! rcv_msg called and it shouldn't be\n" ); -exit( 1 ); - - return NULL; -} - /* This does the hard work of actually sending the message to the given socket. On success, a new message struct is returned. On error, the original msg is returned with the state @@ -673,11 +645,12 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r rmr_free_msg( msg ); // not wanting a meessage back, trash this one return NULL; } - } else { // send failed -- return original message - if( msg->state == 98 ) { // FIX ME: this is just broken, but needs SI changes to work correctly for us + } else { // send failed or would block -- return original message + if( state == SI_ERR_BLOCKED || errno == EAGAIN ) { errno = EAGAIN; - msg->state = RMR_ERR_RETRY; // errno will have nano reason + msg->state = RMR_ERR_RETRY; } else { + rmr_vlog( RMR_VL_WARN, "send failed: mt=%d errno=%d %s\n", msg->mtype, errno, strerror( errno ) ); msg->state = RMR_ERR_SENDFAILED; } @@ -724,6 +697,7 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { int sock_ok; // got a valid socket from round robin select char* d1; int ok_sends = 0; // track number of ok sends + route_table_t* rt; // active route table if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -748,7 +722,9 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { max_to = ctx->send_retries; // convert to retries } - if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key + rt = get_rt( ctx ); // get active route table and up ref count + if( (rte = uta_get_rte( rt, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key + release_rt( ctx, rt ); rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id ); msg->state = RMR_ERR_NOENDPT; errno = ENXIO; // must ensure it's not eagain @@ -762,7 +738,7 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { if( rte->nrrgroups > 0 ) { // this is a round robin entry if groups are listed sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups } else { - sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep ); + sock_ok = epsock_meid( ctx, rt, msg, &nn_sock, &ep ); send_again = 0; } @@ -775,6 +751,7 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { if( send_again ) { clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available if( clone_m == NULL ) { + release_rt( ctx, rt ); msg->state = RMR_ERR_SENDFAILED; errno = ENOMEM; msg->tp_state = errno; @@ -803,21 +780,8 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { } } - if( ep != NULL && msg != NULL ) { - switch( msg->state ) { - case RMR_OK: - ep->scounts[EPSC_GOOD]++; - break; - - case RMR_ERR_RETRY: - ep->scounts[EPSC_TRANS]++; - break; - - default: - ep->scounts[EPSC_FAIL]++; - uta_ep_failed( ep ); // sending to ep failed; set up to reconnect - break; - } + if( msg != NULL ) { + incr_ep_counts( msg->state, ep ); } } else { if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id ); @@ -826,6 +790,8 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { } } + release_rt( ctx, rt ); // we can safely dec the ref counter now + if( msg ) { // call functions don't get a buffer back, so a nil check is required msg->flags &= ~MFL_NOALLOC; // must return with this flag off if( ok_sends ) { // multiple rr-groups and one was successful; report ok