X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=inline;f=src%2Fnanomsg%2Fsrc%2Frmr.c;h=b57df3ae5b93b31b9b44dcb0687b66ba47ec84b8;hb=a012cf63dfdad3656c995cb06c316fd208c63b98;hp=dcf7e67ffde4a11668dfbee500a4045ab3e78cd4;hpb=8790bf0c4f4f08fd05853afa67e211112b344a42;p=ric-plt%2Flib%2Frmr.git diff --git a/src/nanomsg/src/rmr.c b/src/nanomsg/src/rmr.c index dcf7e67..b57df3a 100644 --- a/src/nanomsg/src/rmr.c +++ b/src/nanomsg/src/rmr.c @@ -242,6 +242,9 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { int group; // selected group to get socket for int send_again; // true if the message must be sent again rmr_mbuf_t* clone_m; // cloned message for an nth send + uint64_t key; // lookup key is now subid and mtype + int max_rt = 1000; + int altk_ok = 0; // ok to retry with alt key when true if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -263,32 +266,48 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { send_again = 1; // force loop entry group = 0; // always start with group 0 + key = build_rt_key( msg->sub_id, msg->mtype ); // what we need to find the route table entry + if( msg->sub_id != UNSET_SUBID ) { // if sub id set, allow retry with just mtype if no endpoint when sub-id used + altk_ok = 1; + } + while( send_again ) { - nn_sock = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again ); // round robin select endpoint; again set if mult groups - if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n", - msg->mtype, send_again, group, nn_sock, msg->len ); - group++; + max_rt = 1000; + nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again ); // round robin select endpoint; again set if mult groups + if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d ak_ok=%d\n", + msg->mtype, send_again, group, nn_sock, msg->len, altk_ok ); if( nn_sock < 0 ) { + if( altk_ok ) { // ok to retry with alternate key + key = build_rt_key( UNSET_SUBID, msg->mtype ); // build key with just mtype and retry + send_again = 1; + altk_ok = 0; + continue; + } + msg->state = RMR_ERR_NOENDPT; errno = ENXIO; // must ensure it's not eagain return msg; // caller can resend (maybe) or free } + group++; if( send_again ) { clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available - if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len ); + if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d sub_id=%d len=%d\n", msg->mtype, msg->sub_id, msg->len ); msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer msg = send_msg( ctx, msg, nn_sock ); // do the hard work, msg should be nil on success - /* - if( msg ) { - // error do we need to count successes/errors, how to report some success, esp if last fails? + while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) { + msg = send_msg( ctx, msg, nn_sock ); + max_rt--; } - */ msg = clone_m; // clone will be the next to send } else { msg = send_msg( ctx, msg, nn_sock ); // send the last, and allocate a new buffer; drops the clone if it was + while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) { + msg = send_msg( ctx, msg, nn_sock ); + max_rt--; + } } } @@ -627,6 +646,26 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { return (void *) ctx; } +/* + This sets the default trace length which will be added to any message buffers + allocated. It can be set at any time, and if rmr_set_trace() is given a + trace len that is different than the default allcoated in a message, the message + will be resized. + + Returns 0 on failure and 1 on success. If failure, then errno will be set. +*/ +extern int rmr_init_trace( void* vctx, int tr_len ) { + uta_ctx_t* ctx; + + errno = 0; + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + return 0; + } + + ctx->trace_data_len = tr_len; + return 1; +} /* Publicly facing initialisation function. Wrapper for the init() funcion above