From: E. Scott Daniels Date: Mon, 13 May 2019 17:28:39 +0000 (+0000) Subject: fix(send): Add second key lookup if sub-id is set X-Git-Tag: 1.0.31~18 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=907fbf43104b1670b7374bf1a4b22096977774bf;p=ric-plt%2Flib%2Frmr.git fix(send): Add second key lookup if sub-id is set When finding an endpoint based on mtype+sub_id a second lookup using just mtype is now performed if the mtype/sub_id combination did not map to an endpoint. This allows for "auto correction" if the user application does not reset (clear) the sub_id when a mtype should not be sent with one. Fixed broken unit test for rmr_call() Change-Id: I5056e3d0a477d5a4272031ad8b2046e3ffca7c8a Signed-off-by: E. Scott Daniels --- diff --git a/CMakeLists.txt b/CMakeLists.txt index 21ea46a..51639d2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,7 +23,7 @@ cmake_minimum_required( VERSION 3.5 ) set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this set( minor_version "0" ) -set( patch_level "21" ) +set( patch_level "22" ) set( install_root "${CMAKE_INSTALL_PREFIX}" ) set( install_lib "lib" ) diff --git a/src/nanomsg/src/rmr.c b/src/nanomsg/src/rmr.c index 392feb7..e61664b 100644 --- a/src/nanomsg/src/rmr.c +++ b/src/nanomsg/src/rmr.c @@ -244,6 +244,7 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { rmr_mbuf_t* clone_m; // cloned message for an nth send uint64_t key; // lookup key is now subid and mtype int max_rt = 1000; + int altk_ok = 0; // ok to retry with alt key when true if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -266,18 +267,29 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { group = 0; // always start with group 0 key = build_rt_key( msg->sub_id, msg->mtype ); // what we need to find the route table entry + if( msg->sub_id != UNSET_SUBID ) 
{ // if sub id set, allow retry with just mtype if no endpoint when sub-id used + altk_ok = 1; + } + while( send_again ) { max_rt = 1000; nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again ); // round robin select endpoint; again set if mult groups - if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n", - msg->mtype, send_again, group, nn_sock, msg->len ); - group++; + if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d ak_ok=%d\n", + msg->mtype, send_again, group, nn_sock, msg->len, altk_ok ); if( nn_sock < 0 ) { + if( altk_ok ) { // ok to retry with alternate key + key = build_rt_key( UNSET_SUBID, msg->mtype ); // build key with just mtype and retry + send_again = 1; + altk_ok = 0; + continue; + } + msg->state = RMR_ERR_NOENDPT; errno = ENXIO; // must ensure it's not eagain return msg; // caller can resend (maybe) or free } + group++; if( send_again ) { clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available diff --git a/src/nng/src/rmr_nng.c b/src/nng/src/rmr_nng.c index ba7c852..6240d52 100644 --- a/src/nng/src/rmr_nng.c +++ b/src/nng/src/rmr_nng.c @@ -191,6 +191,11 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) { more than one group of endpoints defined, then the message will be sent in round robin fashion to one endpoint in each group. + An endpoint will be looked up in the route table using the message type and + the subscription id. If the subscription id is "UNSET_SUBID", then only the + message type is used. If the initial lookup, with a subid, fails, then a + second lookup using just the mtype is tried. + CAUTION: this is a non-blocking send. If the message cannot be sent, then it will return with an error and errno set to eagain. 
If the send is a limited fanout, then the returned status is the status of the last @@ -205,6 +210,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { rmr_mbuf_t* clone_m; // cloned message for an nth send int sock_ok; // got a valid socket from round robin select uint64_t key; // mtype or sub-id/mtype sym table key + int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -231,18 +237,29 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { group = 0; // always start with group 0 key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry + if( msg->sub_id != UNSET_SUBID ) { + altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry + } while( send_again ) { sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups - if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n", - msg->mtype, send_again, group, msg->len, sock_ok ); - group++; + if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n", + msg->mtype, send_again, group, msg->len, sock_ok, altk_ok ); if( ! 
sock_ok ) { + if( altk_ok ) { // we can try with the alternate (no sub-id) key + altk_ok = 0; + key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again + send_again = 1; // ensure we don't exit the while + continue; + } + msg->state = RMR_ERR_NOENDPT; errno = ENXIO; // must ensure it's not eagain return msg; // caller can resend (maybe) or free } + group++; + if( send_again ) { clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len ); diff --git a/test/app_test/run_rr_test.ksh b/test/app_test/run_rr_test.ksh index 3727cc2..4a3fd1c 100644 --- a/test/app_test/run_rr_test.ksh +++ b/test/app_test/run_rr_test.ksh @@ -46,9 +46,9 @@ function run_sender { export RMR_RTG_SVC=8990 if (( $nano_sender )) then - ./sender_nano $(( nmsg * nrcvrs )) $delay 1 + ./sender_nano $(( nmsg * nrcvrs )) $delay $max_mtype else - ./sender $(( nmsg * nrcvrs )) $delay 1 + ./sender $(( nmsg * nrcvrs )) $delay $max_mtype fi echo $? 
>/tmp/PID$$.src # must communicate state back via file b/c asynch } @@ -85,15 +85,15 @@ function set_rt { rte |0 | $endpoints |0 rte |1 | $endpoints |10 mse |2 | 20 | $endpoints # new style mtype/subid entry - rte |3 | $endpoints |0 - rte |4 | $endpoints |0 - rte |5 | $endpoints |0 - rte |6 | $endpoints |0 - rte |7 | $endpoints |0 - rte |8 | $endpoints |0 - rte |9 | $endpoints |0 - rte |10 | $endpoints |0 - rte |11 | $endpoints |0 + rte |3 | $endpoints | -1 + rte |4 | $endpoints | -1 + rte |5 | $endpoints | -1 + rte |6 | $endpoints | -1 + rte |7 | $endpoints | -1 + rte |8 | $endpoints | -1 + rte |9 | $endpoints | -1 + rte |10 | $endpoints | -1 + rte |11 | $endpoints | -1 newrt |end endKat @@ -114,6 +114,7 @@ nano_receiver=0 wait=1 rebuild=0 verbose=0 +max_mtype=1 # causes all msgs to go with type 1; use -m to set up, but likely harder to validate nrcvrs=3 # this is sane, but -r allows it to be set up while [[ $1 == -* ]] @@ -121,6 +122,7 @@ do case $1 in -B) rebuild=1;; -d) delay=$2; shift;; + -m) max_mtype=$2; shift;; -N) nano_sender=1 nano_receiver=1 ;; @@ -138,7 +140,6 @@ do shift done - if (( verbose )) then echo "2" >.verbose diff --git a/test/rmr_nng_api_static_test.c b/test/rmr_nng_api_static_test.c index e0dc4d8..2db1a20 100644 --- a/test/rmr_nng_api_static_test.c +++ b/test/rmr_nng_api_static_test.c @@ -222,22 +222,23 @@ static int rmr_api_test( ) { errors += fail_if( errno != 0, "rmr_rts_msg did not reset errno " ); - snprintf( msg->xaction, 17, "%015d", 16 ); // dummy transaction id (emulation generates, this should arrive after a few calls to recv) msg->state = 0; msg = rmr_call( NULL, msg ); errors += fail_if( msg->state == 0, "rmr_call did not set message state when given message with nil context " ); + snprintf( msg->xaction, 17, "%015d", 16 ); // dummy transaction id (emulation generates, this should arrive after a few calls to recv) msg->mtype = 0; msg = rmr_call( rmc, msg ); // this call should return a message as we can anticipate a dummy 
message in + msg->sub_id = -1; + msg = rmr_call( rmc, msg ); // dummy nng/nano function will sequentially add xactions and should match our '16' errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to succeed " ); if( msg ) { errors += fail_not_equal( msg->state, RMR_OK, "rmr_call did not properly set state on successful return " ); errors += fail_not_equal( errno, 0, "rmr_call did not properly set errno (a) on successful return " ); } - snprintf( wbuf, 17, "%015d", 14 ); // if we call receive we should find this in the first 15 tries - for( i = 0; i < 16; i++ ) { + snprintf( wbuf, 17, "%015d", 14 ); // while waiting, the queued messages should have #14, so issue a few receives looking for it + for( i = 0; i < 16; i++ ) { // it should be in the first 15 msg = rmr_rcv_msg( rmc, msg ); if( msg ) { if( strcmp( wbuf, msg->xaction ) == 0 ) { // found the queued message @@ -254,15 +255,10 @@ static int rmr_api_test( ) { if( ! msg ) { msg = rmr_alloc_msg( rmc, 2048 ); // something buggered above; get a new one } - msg = rmr_call( rmc, msg ); // make a call that we never expect a response on - errors += fail_if_nil( msg, "rmr_call returned a non-nil message on call expected not to receive a response " ); - if( msg ) { - errors += fail_if_equal( msg->state, RMR_OK, "rmr_call did not properly set state on queued message receive " ); - errors += fail_if( errno == 0, "rmr_call did not properly set errno on queued message receivesuccessful " ); - } - - msg = rmr_call( rmc, msg ); // this should "timeout" because the message xaction id won't ever appear again - errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to fail " ); + msg->mtype = 0; + msg->sub_id = -1; + msg = rmr_call( rmc, msg ); // make a call that we never expect a response on (nil pointer back) + errors += fail_not_nil( msg, "rmr_call returned a nil message on call expected not to receive a response " ); errors += fail_if( errno == 0, "rmr_call did not set errno on 
failure " ); rmr_free_msg( NULL ); // drive for coverage; nothing to check