X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Fsr_si_static.c;h=b56b6dc77820ac6b577c3f8fc9748e9b636f71fd;hb=5ec64c5253b3b7611ec69cc1487989fae45eca26;hp=798fe261c6d2a54a50e1ba6fddb782f5506ce77c;hpb=e15b1382dc618f6832957e67dc638b5e8f3f414d;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/sr_si_static.c b/src/rmr/si/src/sr_si_static.c index 798fe26..b56b6dc 100644 --- a/src/rmr/si/src/sr_si_static.c +++ b/src/rmr/si/src/sr_si_static.c @@ -126,6 +126,7 @@ static inline void insert_mlen( uint32_t len, char* buf ) { blen++; *blen = htonl( len ); // new systems want a converted integer + memset( &buf[TP_SZFIELD_LEN], 0, 4 ); // clear to prevent future conversion issues buf[TP_SZFIELD_LEN-1] = TP_SZ_MARKER; // marker to flag this is generated by a new message } @@ -208,8 +209,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s msg->state = state; // fill in caller's state (likely the state of the last operation) msg->flags = MFL_ZEROCOPY; // this is a zerocopy sendable message msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on eaach :( - strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); - strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC ); + zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); + zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC ); if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags ); @@ -351,7 +352,8 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) { v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version switch( ntohl( v1hdr->rmr_ver ) ) { case 1: - memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header + hdr = nm->header; + memcpy( hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header nm->payload = (void *) v1hdr + sizeof( *v1hdr ); break; @@ -422,6 +424,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) { v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version switch( ntohl( v1hdr->rmr_ver ) ) { case 1: + v1hdr = nm->header; memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header nm->payload = (void *) v1hdr + sizeof( *v1hdr ); break; @@ -552,11 +555,11 @@ static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, SET_HDR_LEN( nm->header ); if( copy ) { // if we need to copy the old payload too - if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize ); memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize ); } else { // just need to copy header - if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) ); memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) ); } ref_tpbuf( nm, mlen ); // set payload and other pointers in the message to the new tp buffer @@ -620,8 +623,8 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source - strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours - strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC ); + zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours + zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC ); } if( retries == 0 ) { @@ -721,6 +724,7 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { int sock_ok; // got a valid socket from round robin select char* d1; int ok_sends = 0; // track number of ok sends + route_table_t* rt; // active route table if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -745,7 +749,9 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { max_to = ctx->send_retries; // convert to retries } - if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key + rt = get_rt( ctx ); // get active route table and up ref count + if( (rte = uta_get_rte( rt, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key + release_rt( ctx, rt ); rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id ); msg->state = RMR_ERR_NOENDPT; errno = ENXIO; // must ensure it's not eagain @@ -759,7 +765,7 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { if( rte->nrrgroups > 0 ) { // this is a round robin entry if groups are listed sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups } else { - sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep ); + sock_ok = epsock_meid( ctx, rt, msg, &nn_sock, &ep ); send_again = 0; } @@ -772,6 +778,7 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { if( send_again ) { clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available if( clone_m == NULL ) { + release_rt( ctx, rt ); msg->state = RMR_ERR_SENDFAILED; errno = ENOMEM; msg->tp_state = errno; @@ -805,7 +812,7 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { case RMR_OK: ep->scounts[EPSC_GOOD]++; break; - + case RMR_ERR_RETRY: ep->scounts[EPSC_TRANS]++; break; @@ -823,6 +830,8 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { } } + release_rt( ctx, rt ); // we can safely dec the ref counter now + if( msg ) { // call functions don't get a buffer back, so a nil check is required msg->flags &= ~MFL_NOALLOC; // must return with this flag off if( ok_sends ) { // multiple rr-groups and one was successful; report ok