When finding an endpoint based on mtype+sub_id, a second lookup
using just the mtype is now performed if the mtype/sub_id combination
does not map to an endpoint. This allows for "auto correction"
when the user application fails to reset (clear) the sub_id on a
message whose mtype should be sent without one.
Fixed broken unit test for rmr_call()
Change-Id: I5056e3d0a477d5a4272031ad8b2046e3ffca7c8a
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this
set( minor_version "0" )
set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this
set( minor_version "0" )
set( install_root "${CMAKE_INSTALL_PREFIX}" )
set( install_lib "lib" )
set( install_root "${CMAKE_INSTALL_PREFIX}" )
set( install_lib "lib" )
rmr_mbuf_t* clone_m; // cloned message for an nth send
uint64_t key; // lookup key is now subid and mtype
int max_rt = 1000;
rmr_mbuf_t* clone_m; // cloned message for an nth send
uint64_t key; // lookup key is now subid and mtype
int max_rt = 1000;
+ int altk_ok = 0; // ok to retry with alt key when true
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
group = 0; // always start with group 0
key = build_rt_key( msg->sub_id, msg->mtype ); // what we need to find the route table entry
group = 0; // always start with group 0
key = build_rt_key( msg->sub_id, msg->mtype ); // what we need to find the route table entry
+ if( msg->sub_id != UNSET_SUBID ) { // if sub id set, allow retry with just mtype if no endpoint when sub-id used
+ altk_ok = 1;
+ }
+
while( send_again ) {
max_rt = 1000;
nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again ); // round robin select endpoint; again set if mult groups
while( send_again ) {
max_rt = 1000;
nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again ); // round robin select endpoint; again set if mult groups
- if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n",
- msg->mtype, send_again, group, nn_sock, msg->len );
- group++;
+ if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d ak_ok=%d\n",
+ msg->mtype, send_again, group, nn_sock, msg->len, altk_ok );
+ if( altk_ok ) { // ok to retry with alternate key
+ key = build_rt_key( UNSET_SUBID, msg->mtype ); // build key with just mtype and retry
+ send_again = 1;
+ altk_ok = 0;
+ continue;
+ }
+
msg->state = RMR_ERR_NOENDPT;
errno = ENXIO; // must ensure it's not eagain
return msg; // caller can resend (maybe) or free
}
msg->state = RMR_ERR_NOENDPT;
errno = ENXIO; // must ensure it's not eagain
return msg; // caller can resend (maybe) or free
}
if( send_again ) {
clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
if( send_again ) {
clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
more than one group of endpoints defined, then the message will be sent
in round robin fashion to one endpoint in each group.
more than one group of endpoints defined, then the message will be sent
in round robin fashion to one endpoint in each group.
+ An endpoint will be looked up in the route table using the message type and
+ the subscription id. If the subscription id is "UNSET_SUBID", then only the
+ message type is used. If the initial lookup, with a subid, fails, then a
+ second lookup using just the mtype is tried.
+
CAUTION: this is a non-blocking send. If the message cannot be sent, then
it will return with an error and errno set to EAGAIN. If the send is
a limited fanout, then the returned status is the status of the last
CAUTION: this is a non-blocking send. If the message cannot be sent, then
it will return with an error and errno set to EAGAIN. If the send is
a limited fanout, then the returned status is the status of the last
rmr_mbuf_t* clone_m; // cloned message for an nth send
int sock_ok; // got a valid socket from round robin select
uint64_t key; // mtype or sub-id/mtype sym table key
rmr_mbuf_t* clone_m; // cloned message for an nth send
int sock_ok; // got a valid socket from round robin select
uint64_t key; // mtype or sub-id/mtype sym table key
+ int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
group = 0; // always start with group 0
key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry
group = 0; // always start with group 0
key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry
+ if( msg->sub_id != UNSET_SUBID ) {
+ altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
+ }
while( send_again ) {
sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups
while( send_again ) {
sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups
- if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n",
- msg->mtype, send_again, group, msg->len, sock_ok );
- group++;
+ if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
+ msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
+ if( altk_ok ) { // we can try with the alternate (no sub-id) key
+ altk_ok = 0;
+ key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again
+ send_again = 1; // ensure we don't exit the while
+ continue;
+ }
+
msg->state = RMR_ERR_NOENDPT;
errno = ENXIO; // must ensure it's not eagain
return msg; // caller can resend (maybe) or free
}
msg->state = RMR_ERR_NOENDPT;
errno = ENXIO; // must ensure it's not eagain
return msg; // caller can resend (maybe) or free
}
if( send_again ) {
clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
if( send_again ) {
clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
export RMR_RTG_SVC=8990
if (( $nano_sender ))
then
export RMR_RTG_SVC=8990
if (( $nano_sender ))
then
- ./sender_nano $(( nmsg * nrcvrs )) $delay 1
+ ./sender_nano $(( nmsg * nrcvrs )) $delay $max_mtype
- ./sender $(( nmsg * nrcvrs )) $delay 1
+ ./sender $(( nmsg * nrcvrs )) $delay $max_mtype
fi
echo $? >/tmp/PID$$.src # must communicate state back via file b/c asynch
}
fi
echo $? >/tmp/PID$$.src # must communicate state back via file b/c asynch
}
rte |0 | $endpoints |0
rte |1 | $endpoints |10
mse |2 | 20 | $endpoints # new style mtype/subid entry
rte |0 | $endpoints |0
rte |1 | $endpoints |10
mse |2 | 20 | $endpoints # new style mtype/subid entry
- rte |3 | $endpoints |0
- rte |4 | $endpoints |0
- rte |5 | $endpoints |0
- rte |6 | $endpoints |0
- rte |7 | $endpoints |0
- rte |8 | $endpoints |0
- rte |9 | $endpoints |0
- rte |10 | $endpoints |0
- rte |11 | $endpoints |0
+ rte |3 | $endpoints | -1
+ rte |4 | $endpoints | -1
+ rte |5 | $endpoints | -1
+ rte |6 | $endpoints | -1
+ rte |7 | $endpoints | -1
+ rte |8 | $endpoints | -1
+ rte |9 | $endpoints | -1
+ rte |10 | $endpoints | -1
+ rte |11 | $endpoints | -1
wait=1
rebuild=0
verbose=0
wait=1
rebuild=0
verbose=0
+max_mtype=1 # default causes all msgs to go with type 1; use -m to change it, but results are likely harder to validate
nrcvrs=3 # this is sane, but -r allows it to be set up
while [[ $1 == -* ]]
nrcvrs=3 # this is sane, but -r allows it to be set up
while [[ $1 == -* ]]
case $1 in
-B) rebuild=1;;
-d) delay=$2; shift;;
case $1 in
-B) rebuild=1;;
-d) delay=$2; shift;;
+ -m) max_mtype=$2; shift;;
-N) nano_sender=1
nano_receiver=1
;;
-N) nano_sender=1
nano_receiver=1
;;
if (( verbose ))
then
echo "2" >.verbose
if (( verbose ))
then
echo "2" >.verbose
errors += fail_if( errno != 0, "rmr_rts_msg did not reset errno " );
errors += fail_if( errno != 0, "rmr_rts_msg did not reset errno " );
- snprintf( msg->xaction, 17, "%015d", 16 ); // dummy transaction id (emulation generates, this should arrive after a few calls to recv)
msg->state = 0;
msg = rmr_call( NULL, msg );
errors += fail_if( msg->state == 0, "rmr_call did not set message state when given message with nil context " );
msg->state = 0;
msg = rmr_call( NULL, msg );
errors += fail_if( msg->state == 0, "rmr_call did not set message state when given message with nil context " );
+ snprintf( msg->xaction, 17, "%015d", 16 ); // dummy transaction id (emulation generates, this should arrive after a few calls to recv)
- msg = rmr_call( rmc, msg ); // this call should return a message as we can anticipate a dummy message in
+ msg->sub_id = -1;
+ msg = rmr_call( rmc, msg ); // dummy nng/nano function will sequentially add xactions, and one should match our '16'
errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to succeed " );
if( msg ) {
errors += fail_not_equal( msg->state, RMR_OK, "rmr_call did not properly set state on successful return " );
errors += fail_not_equal( errno, 0, "rmr_call did not properly set errno (a) on successful return " );
}
errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to succeed " );
if( msg ) {
errors += fail_not_equal( msg->state, RMR_OK, "rmr_call did not properly set state on successful return " );
errors += fail_not_equal( errno, 0, "rmr_call did not properly set errno (a) on successful return " );
}
- snprintf( wbuf, 17, "%015d", 14 ); // if we call receive we should find this in the first 15 tries
- for( i = 0; i < 16; i++ ) {
+ snprintf( wbuf, 17, "%015d", 14 ); // while waiting, the queued messages should have #14, so issue a few receives looking for it
+ for( i = 0; i < 16; i++ ) { // it should be in the first 15
msg = rmr_rcv_msg( rmc, msg );
if( msg ) {
if( strcmp( wbuf, msg->xaction ) == 0 ) { // found the queued message
msg = rmr_rcv_msg( rmc, msg );
if( msg ) {
if( strcmp( wbuf, msg->xaction ) == 0 ) { // found the queued message
if( ! msg ) {
msg = rmr_alloc_msg( rmc, 2048 ); // something buggered above; get a new one
}
if( ! msg ) {
msg = rmr_alloc_msg( rmc, 2048 ); // something buggered above; get a new one
}
- msg = rmr_call( rmc, msg ); // make a call that we never expect a response on
- errors += fail_if_nil( msg, "rmr_call returned a non-nil message on call expected not to receive a response " );
- if( msg ) {
- errors += fail_if_equal( msg->state, RMR_OK, "rmr_call did not properly set state on queued message receive " );
- errors += fail_if( errno == 0, "rmr_call did not properly set errno on queued message receivesuccessful " );
- }
-
- msg = rmr_call( rmc, msg ); // this should "timeout" because the message xaction id won't ever appear again
- errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to fail " );
+ msg->mtype = 0;
+ msg->sub_id = -1;
+ msg = rmr_call( rmc, msg ); // make a call that we never expect a response on; a nil pointer should come back
+ errors += fail_not_nil( msg, "rmr_call returned a non-nil message on call expected not to receive a response " );
errors += fail_if( errno == 0, "rmr_call did not set errno on failure " );
rmr_free_msg( NULL ); // drive for coverage; nothing to check
errors += fail_if( errno == 0, "rmr_call did not set errno on failure " );
rmr_free_msg( NULL ); // drive for coverage; nothing to check