X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fnanomsg%2Fsrc%2Frmr.c;fp=src%2Fnanomsg%2Fsrc%2Frmr.c;h=e67402f5c8ebe46d1f42cfa4eda8120069144088;hb=a41c6f5f26b3a44009f4aff3df3f83b9a79ace01;hp=dcf7e67ffde4a11668dfbee500a4045ab3e78cd4;hpb=8790bf0c4f4f08fd05853afa67e211112b344a42;p=ric-plt%2Flib%2Frmr.git diff --git a/src/nanomsg/src/rmr.c b/src/nanomsg/src/rmr.c index dcf7e67..e67402f 100644 --- a/src/nanomsg/src/rmr.c +++ b/src/nanomsg/src/rmr.c @@ -242,6 +242,8 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { int group; // selected group to get socket for int send_again; // true if the message must be sent again rmr_mbuf_t* clone_m; // cloned message for an nth send + uint64_t key; // lookup key is now subid and mtype + int max_rt = 1000; if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -263,8 +265,10 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { send_again = 1; // force loop entry group = 0; // always start with group 0 + key = build_rt_key( msg->sub_id, msg->mtype ); // what we need to find the route table entry while( send_again ) { - nn_sock = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again ); // round robin select endpoint; again set if mult groups + max_rt = 1000; + nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again ); // round robin select endpoint; again set if mult groups if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n", msg->mtype, send_again, group, nn_sock, msg->len ); group++; @@ -277,18 +281,21 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { if( send_again ) { clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available - if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len ); + if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d sub_id=%d len=%d\n", msg->mtype, msg->sub_id, msg->len ); msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer msg = send_msg( ctx, msg, nn_sock ); // do the hard work, msg should be nil on success - /* - if( msg ) { - // error do we need to count successes/errors, how to report some success, esp if last fails? + while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) { + msg = send_msg( ctx, msg, nn_sock ); + max_rt--; } - */ msg = clone_m; // clone will be the next to send } else { msg = send_msg( ctx, msg, nn_sock ); // send the last, and allocate a new buffer; drops the clone if it was + while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) { + msg = send_msg( ctx, msg, nn_sock ); + max_rt--; + } } }