X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fnanomsg%2Fsrc%2Frmr.c;h=b57df3ae5b93b31b9b44dcb0687b66ba47ec84b8;hb=a012cf63dfdad3656c995cb06c316fd208c63b98;hp=e67402f5c8ebe46d1f42cfa4eda8120069144088;hpb=a41c6f5f26b3a44009f4aff3df3f83b9a79ace01;p=ric-plt%2Flib%2Frmr.git diff --git a/src/nanomsg/src/rmr.c b/src/nanomsg/src/rmr.c index e67402f..b57df3a 100644 --- a/src/nanomsg/src/rmr.c +++ b/src/nanomsg/src/rmr.c @@ -244,6 +244,7 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { rmr_mbuf_t* clone_m; // cloned message for an nth send uint64_t key; // lookup key is now subid and mtype int max_rt = 1000; + int altk_ok = 0; // ok to retry with alt key when true if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -266,18 +267,29 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { group = 0; // always start with group 0 key = build_rt_key( msg->sub_id, msg->mtype ); // what we need to find the route table entry + if( msg->sub_id != UNSET_SUBID ) { // if sub id set, allow retry with just mtype if no endpoint when sub-id used + altk_ok = 1; + } + while( send_again ) { max_rt = 1000; nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again ); // round robin select endpoint; again set if mult groups - if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n", - msg->mtype, send_again, group, nn_sock, msg->len ); - group++; + if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d ak_ok=%d\n", + msg->mtype, send_again, group, nn_sock, msg->len, altk_ok ); if( nn_sock < 0 ) { + if( altk_ok ) { // ok to retry with alternate key + key = build_rt_key( UNSET_SUBID, msg->mtype ); // build key with just mtype and retry + send_again = 1; + altk_ok = 0; + continue; + } + msg->state = RMR_ERR_NOENDPT; errno = ENXIO; // must ensure it's not eagain return msg; // caller can resend (maybe) or free } + group++; if( send_again ) { clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available @@ -634,6 +646,26 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { return (void *) ctx; } +/* + This sets the default trace length which will be added to any message buffers + allocated. It can be set at any time, and if rmr_set_trace() is given a + trace len that is different than the default allcoated in a message, the message + will be resized. + + Returns 0 on failure and 1 on success. If failure, then errno will be set. +*/ +extern int rmr_init_trace( void* vctx, int tr_len ) { + uta_ctx_t* ctx; + + errno = 0; + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + return 0; + } + + ctx->trace_data_len = tr_len; + return 1; +} /* Publicly facing initialisation function. Wrapper for the init() funcion above