X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fnng%2Fsrc%2Frmr_nng.c;fp=src%2Fnng%2Fsrc%2Frmr_nng.c;h=6240d528415a7f9c094a7d5fe2f6b05873087576;hb=907fbf43104b1670b7374bf1a4b22096977774bf;hp=ba7c852f99461466602e003e830dd74baebca7e2;hpb=6d3e334fe1980c1a3581f3d8fd26d7ec7e8d60f7;p=ric-plt%2Flib%2Frmr.git diff --git a/src/nng/src/rmr_nng.c b/src/nng/src/rmr_nng.c index ba7c852..6240d52 100644 --- a/src/nng/src/rmr_nng.c +++ b/src/nng/src/rmr_nng.c @@ -191,6 +191,11 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) { more than one group of endpoints defined, then the message will be sent in round robin fashion to one endpoint in each group. + An endpoint will be looked up in the route table using the message type and + the subscription id. If the subscription id is "UNSET_SUBID", then only the + message type is used. If the initial lookup, with a subid, fails, then a + second lookup using just the mtype is tried. + CAUTION: this is a non-blocking send. If the message cannot be sent, then it will return with an error and errno set to eagain. If the send is a limited fanout, then the returned status is the status of the last @@ -205,6 +210,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { rmr_mbuf_t* clone_m; // cloned message for an nth send int sock_ok; // got a valid socket from round robin select uint64_t key; // mtype or sub-id/mtype sym table key + int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -231,18 +237,29 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { group = 0; // always start with group 0 key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry + if( msg->sub_id != UNSET_SUBID ) { + altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry + } while( send_again ) { sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups - if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n", - msg->mtype, send_again, group, msg->len, sock_ok ); - group++; + if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n", + msg->mtype, send_again, group, msg->len, sock_ok, altk_ok ); if( ! sock_ok ) { + if( altk_ok ) { // we can try with the alternate (no sub-id) key + altk_ok = 0; + key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again + send_again = 1; // ensure we don't exit the while + continue; + } + msg->state = RMR_ERR_NOENDPT; errno = ENXIO; // must ensure it's not eagain return msg; // caller can resend (maybe) or free } + group++; + if( send_again ) { clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );