set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this
set( minor_version "0" )
-set( patch_level "21" )
+set( patch_level "22" )
set( install_root "${CMAKE_INSTALL_PREFIX}" )
set( install_lib "lib" )
rmr_mbuf_t* clone_m; // cloned message for an nth send
uint64_t key; // lookup key is now subid and mtype
int max_rt = 1000;
+ int altk_ok = 0; // ok to retry with alt key when true
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
group = 0; // always start with group 0
key = build_rt_key( msg->sub_id, msg->mtype ); // what we need to find the route table entry
+ if( msg->sub_id != UNSET_SUBID ) { // if sub id set, allow retry with just mtype if no endpoint when sub-id used
+ altk_ok = 1;
+ }
+
while( send_again ) {
max_rt = 1000;
nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again ); // round robin select endpoint; again set if mult groups
- if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n",
- msg->mtype, send_again, group, nn_sock, msg->len );
- group++;
+ if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d ak_ok=%d\n",
+ msg->mtype, send_again, group, nn_sock, msg->len, altk_ok );
if( nn_sock < 0 ) {
+ if( altk_ok ) { // ok to retry with alternate key
+ key = build_rt_key( UNSET_SUBID, msg->mtype ); // build key with just mtype and retry
+ send_again = 1;
+ altk_ok = 0;
+ continue;
+ }
+
msg->state = RMR_ERR_NOENDPT;
errno = ENXIO; // must ensure it's not eagain
return msg; // caller can resend (maybe) or free
}
+ group++;
if( send_again ) {
clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
more than one group of endpoints defined, then the message will be sent
in round robin fashion to one endpoint in each group.
+ An endpoint will be looked up in the route table using the message type and
+ the subscription id. If the subscription id is "UNSET_SUBID", then only the
+ message type is used. If the initial lookup, with a subid, fails, then a
+ second lookup using just the mtype is tried.
+
CAUTION: this is a non-blocking send. If the message cannot be sent, then
it will return with an error and errno set to eagain. If the send is
a limited fanout, then the returned status is the status of the last
rmr_mbuf_t* clone_m; // cloned message for an nth send
int sock_ok; // got a valid socket from round robin select
uint64_t key; // mtype or sub-id/mtype sym table key
+ int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
group = 0; // always start with group 0
key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry
+ if( msg->sub_id != UNSET_SUBID ) {
+ altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
+ }
while( send_again ) {
sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups
- if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n",
- msg->mtype, send_again, group, msg->len, sock_ok );
- group++;
+ if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
+ msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
if( ! sock_ok ) {
+ if( altk_ok ) { // we can try with the alternate (no sub-id) key
+ altk_ok = 0;
+ key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again
+ send_again = 1; // ensure we don't exit the while
+ continue;
+ }
+
msg->state = RMR_ERR_NOENDPT;
errno = ENXIO; // must ensure it's not eagain
return msg; // caller can resend (maybe) or free
}
+ group++;
+
if( send_again ) {
clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
export RMR_RTG_SVC=8990
if (( $nano_sender ))
then
- ./sender_nano $(( nmsg * nrcvrs )) $delay 1
+ ./sender_nano $(( nmsg * nrcvrs )) $delay $max_mtype
else
- ./sender $(( nmsg * nrcvrs )) $delay 1
+ ./sender $(( nmsg * nrcvrs )) $delay $max_mtype
fi
echo $? >/tmp/PID$$.src # must communicate state back via file b/c asynch
}
rte |0 | $endpoints |0
rte |1 | $endpoints |10
mse |2 | 20 | $endpoints # new style mtype/subid entry
- rte |3 | $endpoints |0
- rte |4 | $endpoints |0
- rte |5 | $endpoints |0
- rte |6 | $endpoints |0
- rte |7 | $endpoints |0
- rte |8 | $endpoints |0
- rte |9 | $endpoints |0
- rte |10 | $endpoints |0
- rte |11 | $endpoints |0
+ rte |3 | $endpoints | -1
+ rte |4 | $endpoints | -1
+ rte |5 | $endpoints | -1
+ rte |6 | $endpoints | -1
+ rte |7 | $endpoints | -1
+ rte |8 | $endpoints | -1
+ rte |9 | $endpoints | -1
+ rte |10 | $endpoints | -1
+ rte |11 | $endpoints | -1
newrt |end
endKat
wait=1
rebuild=0
verbose=0
+max_mtype=1 # causes all msgs to go with type 1; use -M to set up, but likely harder to validate
nrcvrs=3 # this is sane, but -r allows it to be set up
while [[ $1 == -* ]]
case $1 in
-B) rebuild=1;;
-d) delay=$2; shift;;
+ -m) max_mtype=$2; shift;;
-N) nano_sender=1
nano_receiver=1
;;
shift
done
-
if (( verbose ))
then
echo "2" >.verbose
errors += fail_if( errno != 0, "rmr_rts_msg did not reset errno " );
- snprintf( msg->xaction, 17, "%015d", 16 ); // dummy transaction id (emulation generates, this should arrive after a few calls to recv)
msg->state = 0;
msg = rmr_call( NULL, msg );
errors += fail_if( msg->state == 0, "rmr_call did not set message state when given message with nil context " );
+ snprintf( msg->xaction, 17, "%015d", 16 ); // dummy transaction id (emulation generates, this should arrive after a few calls to recv)
msg->mtype = 0;
- msg = rmr_call( rmc, msg ); // this call should return a message as we can anticipate a dummy message in
+ msg->sub_id = -1;
+ msg = rmr_call( rmc, msg ); // dummy nng/nano function will sequentually add xactions and should match or '16'
errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to succeed " );
if( msg ) {
errors += fail_not_equal( msg->state, RMR_OK, "rmr_call did not properly set state on successful return " );
errors += fail_not_equal( errno, 0, "rmr_call did not properly set errno (a) on successful return " );
}
- snprintf( wbuf, 17, "%015d", 14 ); // if we call receive we should find this in the first 15 tries
- for( i = 0; i < 16; i++ ) {
+ snprintf( wbuf, 17, "%015d", 14 ); // while waiting, the queued messages should have #14, so issue a few receives looking for it
+ for( i = 0; i < 16; i++ ) { // it should be in the first 15
msg = rmr_rcv_msg( rmc, msg );
if( msg ) {
if( strcmp( wbuf, msg->xaction ) == 0 ) { // found the queued message
if( ! msg ) {
msg = rmr_alloc_msg( rmc, 2048 ); // something buggered above; get a new one
}
- msg = rmr_call( rmc, msg ); // make a call that we never expect a response on
- errors += fail_if_nil( msg, "rmr_call returned a non-nil message on call expected not to receive a response " );
- if( msg ) {
- errors += fail_if_equal( msg->state, RMR_OK, "rmr_call did not properly set state on queued message receive " );
- errors += fail_if( errno == 0, "rmr_call did not properly set errno on queued message receivesuccessful " );
- }
-
- msg = rmr_call( rmc, msg ); // this should "timeout" because the message xaction id won't ever appear again
- errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to fail " );
+ msg->mtype = 0;
+ msg->sub_id = -1;
+ msg = rmr_call( rmc, msg ); // make a call that we never expect a response on (nil pointer back)
+ errors += fail_not_nil( msg, "rmr_call returned a nil message on call expected not to receive a response " );
errors += fail_if( errno == 0, "rmr_call did not set errno on failure " );
rmr_free_msg( NULL ); // drive for coverage; nothing to check