From a41c6f5f26b3a44009f4aff3df3f83b9a79ace01 Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Tue, 23 Apr 2019 18:24:25 +0000 Subject: [PATCH] feat(routing): Support session based routing The session id field in a message buffer is now used directly for routing. Change-Id: I3634c97588b11172db964b2d06c96c317d8b8ae3 Signed-off-by: E. Scott Daniels Routing table entry changes to pick up subid Change-Id: If08dc21aae4acaab350ba75a8854ad2f24007b03 Signed-off-by: E. Scott Daniels Fix unit test for rmr_call It was not properly setting message type and now that RMr ensures that invalid message type is set by default on a newly created message this was causing unit test to fail. Change-Id: I50f08d1038ea7fca2a070cdd949657bfbc25f3fd Signed-off-by: E. Scott Daniels Add application level tests Added round robin and multi group application level test scripts. Change-Id: Ic6aebaf3bc1edb763decc7fd0aebb09df116f20c Signed-off-by: E. Scott Daniels NNG based sub-id support added Change-Id: I0d36b55bb90a315ba94c9476df88e2c7eac6c383 Signed-off-by: E. Scott Daniels Correct bug in app test script Change-Id: I5b4a9f32aa1bc2907f320b8ad4628e0948062904 Signed-off-by: E. Scott Daniels Nano sub-id changes and unit test updates Change-Id: Ia69f2fb33de3bbee2f33f9a4c5def779c872e52c Signed-off-by: E. Scott Daniels Change nil-sub_id key to high order key If there is no sub-id, then the key is based only on the message type, but to allow for a sub-id == 0 the key when there is no subscription id must be set to 0xffffffff00000000 + msg type. New version for deb is 1.0.19 Change-Id: I55f89d368466a0137fdea99410c76ba72e1923ab Signed-off-by: E. Scott Daniels --- CMakeLists.txt | 2 +- src/common/include/rmr.h | 2 + src/common/include/rmr_agnostic.h | 3 +- src/common/include/rmr_symtab.h | 8 +- src/common/src/rt_generic_static.c | 151 ++++++++++++++++++++++---- src/common/src/symtab.c | 38 +++---- src/nanomsg/include/rmr_private.h | 44 +------- src/nanomsg/src/rmr.c | 19 ++-- src/nanomsg/src/rtable_static.c | 26 ++--- src/nanomsg/src/sr_static.c | 5 + src/nng/include/rmr_nng_private.h | 2 +- src/nng/src/rmr_nng.c | 4 +- src/nng/src/rtable_nng_static.c | 10 +- src/nng/src/sr_nng_static.c | 4 + test/app_test/rebuild.ksh | 46 ++++++++ test/app_test/receiver.c | 34 +++++- test/app_test/rt.mask | 39 +++---- test/app_test/run_app_test.ksh | 27 +++-- test/app_test/run_multi_test.ksh | 208 ++++++++++++++++++++++++++++++++++++ test/app_test/run_rr_test.ksh | 211 +++++++++++++++++++++++++++++++++++++ test/app_test/sender.c | 59 ++++++++--- test/rmr_nng_api_static_test.c | 22 +++- test/rt_static_test.c | 10 ++ test/sr_nng_static_test.c | 20 +++- test/unit_test.ksh | 9 +- test/wormhole_static_test.c | 16 ++- 26 files changed, 844 insertions(+), 175 deletions(-) create mode 100644 test/app_test/rebuild.ksh create mode 100644 test/app_test/run_multi_test.ksh create mode 100644 test/app_test/run_rr_test.ksh diff --git a/CMakeLists.txt b/CMakeLists.txt index 2782147..365d370 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,7 +24,7 @@ cmake_minimum_required( VERSION 3.5 ) set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this set( minor_version "0" ) -set( patch_level "18" ) +set( patch_level "19" ) set( install_root "${CMAKE_INSTALL_PREFIX}" ) set( install_lib "lib" ) diff --git a/src/common/include/rmr.h b/src/common/include/rmr.h index 6bfe075..e6c1660 100644 --- a/src/common/include/rmr.h +++ b/src/common/include/rmr.h @@ -47,6 +47,8 @@ extern "C" { #define RMR_DEF_SIZE 0 // pass as size to have msg allocation use the default msg size +#define RMR_VOID_MSGTYPE (-1) // unset/invalid message type and sub id +#define RMR_VOID_SUBID (-1) #define RMR_OK 0 // state is good #define RMR_ERR_BADARG 1 // argument passd to function was unusable diff --git a/src/common/include/rmr_agnostic.h b/src/common/include/rmr_agnostic.h index a23cff7..06da060 100644 --- a/src/common/include/rmr_agnostic.h +++ b/src/common/include/rmr_agnostic.h @@ -230,6 +230,7 @@ static int ie_test( void* r, int i_factor, long inserts ); // ----- route table generic static things --------- +static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ); static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ); static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ); static char* uta_fib( char* fname ); @@ -237,7 +238,7 @@ static route_table_t* uta_rt_init( ); static route_table_t* uta_rt_clone( route_table_t* srt ); static void uta_rt_drop( route_table_t* rt ); static endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group ); -static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups ); +static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ); static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ); static void read_static_rt( uta_ctx_t* ctx, int vlevel ); static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ); diff --git a/src/common/include/rmr_symtab.h b/src/common/include/rmr_symtab.h index 7d4d18c..74c5aa3 100644 --- a/src/common/include/rmr_symtab.h +++ b/src/common/include/rmr_symtab.h @@ -28,6 +28,8 @@ #ifndef _rmr_symtab_h #define _rmr_symtab_h +#include + /* --------- symtab ---------------- */ #define UT_FL_NOCOPY 0x00 /* use user pointer */ #define UT_FL_COPY 0x01 /* make a copy of the string data */ @@ -39,12 +41,12 @@ extern void rmr_sym_clear( void *s ); extern void rmr_sym_dump( void *s ); extern void *rmr_sym_alloc( int size ); extern void rmr_sym_del( void *s, const char *name, unsigned int class ); -extern void *rmr_sym_ndel( void *vtable, int key ); +extern void *rmr_sym_ndel( void *vtable, uint64_t key ); extern void rmr_sym_free( void *vtable ); extern void *rmr_sym_get( void *s, const char *name, unsigned int class ); extern int rmr_sym_put( void *s, const char *name, unsigned int class, void *val ); -extern int rmr_sym_map( void *s, unsigned int key, void *val ); -extern void *rmr_sym_pull( void *vtable, int key ); +extern int rmr_sym_map( void *s, uint64_t key, void *val ); +extern void *rmr_sym_pull( void *vtable, uint64_t key ); extern void rmr_sym_stats( void *s, int level ); extern void rmr_sym_foreach_class( void *vst, unsigned int class, void (* user_fun)( void*, void*, const char*, void*, void* ), void *user_data ); diff --git a/src/common/src/rt_generic_static.c b/src/common/src/rt_generic_static.c index 4dd9344..5bbacbd 100644 --- a/src/common/src/rt_generic_static.c +++ b/src/common/src/rt_generic_static.c @@ -44,6 +44,7 @@ #include #include #include +#include /* @@ -56,13 +57,43 @@ typedef struct thing_list { void** things; } thing_list_t; +// ------------------------------------------------------------------------------------------------ + + +/* + Little diddy to trim whitespace and trailing comments. Like shell, trailing comments + must be at the start of a word (i.e. must be immediatly preceeded by whitespace). +*/ +static char* clip( char* buf ) { + char* tok; + + while( *buf && isspace( *buf ) ) { // skip leading whitespace + buf++; + } + + if( (tok = strchr( buf, '#' )) != NULL ) { + if( tok == buf ) { + return buf; // just push back; leading comment sym handled there + } + + if( isspace( *(tok-1) ) ) { + *tok = 0; + } + } + + for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too + *(tok+1) = 0; + + return buf; +} + /* Given a message type create a route table entry and add to the hash keyed on the message type. Once in the hash, endpoints can be added with uta_add_ep. Size is the number of group slots to allocate in the entry. */ -static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups ) { +static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) { rtable_ent_t* rte; if( rt == NULL ) { @@ -87,12 +118,67 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups ) memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups ); rte->nrrgroups = nrrgroups; - rmr_sym_map( rt->hash, mtype, rte ); // add to hash using numeric mtype as key + rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key - if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: mt=%d groups=%d\n", mtype, nrrgroups ); + if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lu groups=%d\n", key, nrrgroups ); return rte; } +/* + This accepts partially parsed information from a record sent by route manager or read from + a file such that: + ts_field is the msg-type,sender field + subid is the integer subscription id + rr_field is the endpoint information for round robening message over + + If all goes well, this will add an RTE to the table under construction. + + The ts_field is checked to see if we should ingest this record. We ingest if one of + these is true: + there is no sender info (a generic entry for all) + there is sender and our host:port matches one of the senders + the sender info is an IP address that matches one of our IP addresses +*/ +static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) { + rtable_ent_t* rte; // route table entry added + char* tok; + int ntoks; + uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype + char* tokens[128]; + char* gtokens[64]; + int i; + int ngtoks; // number of tokens in the group list + int grp; // index into group list + + ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments + rr_field = clip( rr_field ); + + if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all) + (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list + has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses + + key = build_rt_key( subid, atoi( ts_field ) ); + + if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lu\n", ts_field, subid, key ); + + if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups + rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key + + for( grp = 0; grp < ngtoks; grp++ ) { + if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) { + for( i = 0; i < ntoks; i++ ) { + if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint %s\n", ts_field ); + uta_add_ep( ctx->new_rtable, rte, tokens[i], grp ); + } + } + } + } + } else { + if( DEBUG || (vlevel > 2) ) + fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] ); + } +} + /* Parse a single record recevied from the route table generator, or read from a static route table file. Start records cause a new table to @@ -100,6 +186,11 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups ) entry records are added to the currenly 'in progress' table, and an end record causes the in progress table to be finalised and the currently active table is replaced. + + We expect one of several types: + newrt|{start|end} + rte|[,sender]|[;,...] + mse|[,sender]||[;,...] */ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { int i; @@ -129,6 +220,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { break; case 'n': // newrt|{start|end} + tokens[1] = clip( tokens[1] ); if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building if( ctx->new_rtable ) { uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced @@ -155,33 +247,29 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { } break; + case 'm': // assume mse entry + if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently + break; + } + + if( ntoks < 4 ) { + if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks ); + break; + } + + build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel ); + break; + case 'r': // assume rt entry if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently break; } - if( ((tok = strchr( tokens[1], ',' )) == NULL ) || // no sender names - (uta_has_str( tokens[1], ctx->my_name, ',', 127) >= 0) || // our name isn't in the list - has_myip( tokens[1], ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses - - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s\n", tokens[1] ); - - if( (ngtoks = uta_tokenise( tokens[2], gtokens, 64, ';' )) > 0 ) { // split last field by groups first - rte = uta_add_rte( ctx->new_rtable, atoi( tokens[1] ), ngtoks ); // get/create entry for message type - for( grp = 0; grp < ngtoks; grp++ ) { - if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) { - for( i = 0; i < ntoks; i++ ) { - if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint %s\n", tokens[i] ); - uta_add_ep( ctx->new_rtable, rte, tokens[i], grp ); - } - } - } - } + if( ntoks > 3 ) { // assume new entry with subid last + build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel ); } else { - if( DEBUG || (vlevel > 2) ) - fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] ); + build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id } - break; default: @@ -463,4 +551,21 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) { } +/* + Given a session id and message type build a key that can be used to look up the rte in the route + table hash. Sub_id is expected to be -1 if there is no session id associated with the entry. +*/ +static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) { + uint64_t key; + + if( sub_id == UNSET_SUBID ) { + key = 0xffffffff00000000 | mtype; + } else { + key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff); + } + + return key; +} + + #endif diff --git a/src/common/src/symtab.c b/src/common/src/symtab.c index a1a792d..31f150f 100644 --- a/src/common/src/symtab.c +++ b/src/common/src/symtab.c @@ -28,6 +28,7 @@ Abstract: Symbol table -- slightly streamlined from it's original 2000 version Things changed for the Ric Msg implemention (Nov 2018): - no concept of copy/free of the user data (functions removed) - add ability to support an integer key (class 0) + Numeric key is an unsigned, 64bit key. - externally visible names given a rmr_ extension as it's being incorporated into the RIC msg routing library and will be available to user applications. @@ -46,6 +47,7 @@ Mod: 2016 23 Feb - converted Symtab refs so that caller need only a #include #include #include +#include #include "rmr_symtab.h" @@ -56,7 +58,7 @@ typedef struct Sym_ele struct Sym_ele *next; /* pointer at next element in list */ struct Sym_ele *prev; /* larger table, easier deletes */ const char *name; /* symbol name */ - unsigned int nkey; // the numeric key + uint64_t nkey; // the numeric key void *val; /* user data associated with name */ unsigned long mcount; /* modificaitons to value */ unsigned long rcount; /* references to symbol */ @@ -136,11 +138,11 @@ static inline int same( unsigned int c1, unsigned int c2, const char *s1, const much the same. */ static int putin( Sym_tab *table, const char *name, unsigned int class, void *val ) { - Sym_ele *eptr; /* pointer into hash table */ + Sym_ele *eptr; /* pointer into hash table */ Sym_ele **sym_tab; /* pointer into hash table */ - int hv; /* hash value */ - int rc = 0; /* assume it existed */ - unsigned int nkey = 0; // numeric key if class == 0 + int hv; /* hash value */ + int rc = 0; /* assume it existed */ + uint64_t nkey = 0; // numeric key if class == 0 sym_tab = table->symlist; @@ -148,7 +150,7 @@ static int putin( Sym_tab *table, const char *name, unsigned int class, void *va hv = sym_hash( name, table->size ); // hash it for( eptr=sym_tab[hv]; eptr && ! same( class, eptr->class, eptr->name, name); eptr=eptr->next ); } else { - nkey = *((int *) name); + nkey = *((uint64_t *) name); hv = nkey % table->size; // just hash the number for( eptr=sym_tab[hv]; eptr && eptr->nkey != nkey; eptr=eptr->next ); } @@ -237,7 +239,7 @@ extern void rmr_sym_dump( void *vtable ) if( eptr->val && eptr->class ) { fprintf( stderr, "key=%s val@=%p\n", eptr->name, eptr->val ); } else { - fprintf( stderr, "nkey=%d val@=%p\n", eptr->nkey, eptr->val ); + fprintf( stderr, "nkey=%lu val@=%p\n", (unsigned long) eptr->nkey, eptr->val ); } } } @@ -284,9 +286,9 @@ extern void rmr_sym_del( void *vtable, const char *name, unsigned int class ) { Sym_tab *table; Sym_ele **sym_tab; - Sym_ele *eptr; /* pointer into hash table */ - int hv; /* hash value */ - unsigned int nkey; // class 0, name points to integer not string + Sym_ele *eptr; /* pointer into hash table */ + int hv; /* hash value */ + uint64_t nkey; // class 0, name points to integer not string table = (Sym_tab *) vtable; sym_tab = table->symlist; @@ -306,7 +308,7 @@ extern void rmr_sym_del( void *vtable, const char *name, unsigned int class ) /* Delete element by numberic key. */ -extern void *rmr_sym_ndel( void *vtable, int key ) { +extern void *rmr_sym_ndel( void *vtable, uint64_t key ) { rmr_sym_del( vtable, (const char *) &key, 0 ); } @@ -317,7 +319,7 @@ extern void *rmr_sym_get( void *vtable, const char *name, unsigned int class ) Sym_ele **sym_tab; Sym_ele *eptr; // element from table int hv; // hash value of key - unsigned int nkey; // numeric key if class 0 + uint64_t nkey; // numeric key if class 0 table = (Sym_tab *) vtable; sym_tab = table->symlist; @@ -326,7 +328,7 @@ extern void *rmr_sym_get( void *vtable, const char *name, unsigned int class ) hv = sym_hash( name, table->size ); for(eptr=sym_tab[hv]; eptr && ! same(class, eptr->class, eptr->name, name); eptr=eptr->next ); } else { - nkey = *((int *) name); + nkey = *((uint64_t *) name); hv = nkey % table->size; // just hash the number for( eptr=sym_tab[hv]; eptr && eptr->nkey != nkey; eptr=eptr->next ); } @@ -343,7 +345,7 @@ extern void *rmr_sym_get( void *vtable, const char *name, unsigned int class ) /* Retrieve the data referenced by a numerical key. */ -extern void *rmr_sym_pull( void *vtable, int key ) { +extern void *rmr_sym_pull( void *vtable, uint64_t key ) { return rmr_sym_get( vtable, (const char *) &key, 0 ); } @@ -366,11 +368,11 @@ extern int rmr_sym_put( void *vtable, const char *name, unsigned int class, void } /* - Add a new entry assuming that the key is an unsigned integer. + Add a new entry assuming that the key is an unsigned, 64 bit, integer. Returns 1 if new, 0 if existed */ -extern int rmr_sym_map( void *vtable, unsigned int key, void *val ) { +extern int rmr_sym_map( void *vtable, uint64_t key, void *val ) { Sym_tab *table; table = (Sym_tab *) vtable; @@ -407,7 +409,7 @@ extern void rmr_sym_stats( void *vtable, int level ) if( eptr->class ) { // a string key fprintf( stderr, "sym: (%d) key=%s val@=%p ref=%ld mod=%lu\n", i, eptr->name, eptr->val, eptr->rcount, eptr->mcount ); } else { - fprintf( stderr, "sym: (%d) key=%d val@=%p ref=%ld mod=%lu\n", i, eptr->nkey, eptr->val, eptr->rcount, eptr->mcount ); + fprintf( stderr, "sym: (%d) key=%lu val@=%p ref=%ld mod=%lu\n", i, (unsigned long) eptr->nkey, eptr->val, eptr->rcount, eptr->mcount ); } } } @@ -434,7 +436,7 @@ extern void rmr_sym_stats( void *vtable, int level ) if( eptr->class ) { fprintf( stderr, "\t%s\n", eptr->name ); } else { - fprintf( stderr, "\t%d (numeric key)\n", eptr->nkey ); + fprintf( stderr, "\t%lu (numeric key)\n", (unsigned long) eptr->nkey ); } } } diff --git a/src/nanomsg/include/rmr_private.h b/src/nanomsg/include/rmr_private.h index dde00ae..7d78382 100644 --- a/src/nanomsg/include/rmr_private.h +++ b/src/nanomsg/include/rmr_private.h @@ -97,7 +97,7 @@ static int uta_link2( char* target ); static int rt_link2_ep( endpoint_t* ep ); static endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group ); static int uta_epsock_byname( route_table_t* rt, char* ep_name ); -static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more ); +static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more ); // ------ msg ------------------------------------------------ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int tr_size ); @@ -107,48 +107,6 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ); static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ); static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ); - - -/* -// --- message ring -------------------------- -static void* uta_mk_ring( int size ); -static void uta_ring_free( void* vr ); -static inline void* uta_ring_extract( void* vr ); -static inline int uta_ring_insert( void* vr, void* new_data ); - -// --- message and context management -------- -static int ie_test( void* r, int i_factor, long inserts ); -static void free_ctx( uta_ctx_t* ctx ); -static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ); -static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ); -static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ); - -// ----- route table static things --------- -static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ); -static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ); -static char* uta_fib( char* fname ); -static route_table_t* uta_rt_init( ); -static route_table_t* uta_rt_clone( route_table_t* srt ); -static void uta_rt_drop( route_table_t* rt ); -static endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group ); -static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups ); -static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ); -static int uta_epsock_byname( route_table_t* rt, char* ep_name ); -static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more ); -static void read_static_rt( uta_ctx_t* ctx, int vlevel ); -static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ); -static void* rtc( void* vctx ); -static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ); - - -// --- tools_static protos ------------------ -static int uta_tokenise( char* buf, char** tokens, int max, char sep ); -static char* uta_h2ip( char const* hname ); -static int uta_link2( char* target ); -static int uta_lookup_rtg( uta_ctx_t* ctx ); -static int uta_has_str( char const* buf, char const* str, char sep, int max ); -*/ - static int rt_link2_ep( endpoint_t* ep ); static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ); diff --git a/src/nanomsg/src/rmr.c b/src/nanomsg/src/rmr.c index dcf7e67..e67402f 100644 --- a/src/nanomsg/src/rmr.c +++ b/src/nanomsg/src/rmr.c @@ -242,6 +242,8 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { int group; // selected group to get socket for int send_again; // true if the message must be sent again rmr_mbuf_t* clone_m; // cloned message for an nth send + uint64_t key; // lookup key is now subid and mtype + int max_rt = 1000; if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -263,8 +265,10 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { send_again = 1; // force loop entry group = 0; // always start with group 0 + key = build_rt_key( msg->sub_id, msg->mtype ); // what we need to find the route table entry while( send_again ) { - nn_sock = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again ); // round robin select endpoint; again set if mult groups + max_rt = 1000; + nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again ); // round robin select endpoint; again set if mult groups if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n", msg->mtype, send_again, group, nn_sock, msg->len ); group++; @@ -277,18 +281,21 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { if( send_again ) { clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available - if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len ); + if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d sub_id=%d len=%d\n", msg->mtype, msg->sub_id, msg->len ); msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer msg = send_msg( ctx, msg, nn_sock ); // do the hard work, msg should be nil on success - /* - if( msg ) { - // error do we need to count successes/errors, how to report some success, esp if last fails? + while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) { + msg = send_msg( ctx, msg, nn_sock ); + max_rt--; } - */ msg = clone_m; // clone will be the next to send } else { msg = send_msg( ctx, msg, nn_sock ); // send the last, and allocate a new buffer; drops the clone if it was + while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) { + msg = send_msg( ctx, msg, nn_sock ); + max_rt--; + } } } diff --git a/src/nanomsg/src/rtable_static.c b/src/nanomsg/src/rtable_static.c index 3ebad2d..cfcb27e 100644 --- a/src/nanomsg/src/rtable_static.c +++ b/src/nanomsg/src/rtable_static.c @@ -58,13 +58,13 @@ static int uta_link2( char* target ) { nn_sock = nn_socket( AF_SP, NN_PUSH ); // the socket we'll use to connect to the target if( nn_sock < 0 ) { - fprintf( stderr, "[WARN] rmr: link2: unable to create socket for link to target: %s: %d\n", target, errno ); + fprintf( stderr, "[WARN] rmr: link2: unable to create socket for link to target: %s: %d\n\n\n", target, errno ); return -1; } snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target ); if( nn_connect( nn_sock, conn_info ) < 0 ) { // connect failed - fprintf( stderr, "[WARN] rmr: link2: unable to create link to target: %s: %d\n", target, errno ); + fprintf( stderr, "[WARN] rmr: link2: unable to create link to target: %s: %d\n\n\n", target, errno ); nn_close( nn_sock ); return -1; } @@ -138,6 +138,7 @@ static endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n } ep->nn_sock = -1; // not connected + ep->open = 0; ep->addr = uta_h2ip( ep_name ); ep->name = strdup( ep_name ); @@ -203,14 +204,15 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name ) { invoke this function again to make a selection against that group. If there are no more groups, more is set to 0. */ -static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more ) { +static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more ) { rtable_ent_t* rte; // matching rt entry endpoint_t* ep; // seected end point - int nn_sock = -1; + int nn_sock = -2; int dummy; rrgroup_t* rrg; - if( ! more ) { // eliminate cheks each time we need to user + + if( ! more ) { // eliminate checks each time we need to use more = &dummy; } @@ -219,20 +221,20 @@ static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more ) { return -1; } - if( (rte = rmr_sym_pull( rt->hash, mtype )) == NULL ) { + if( (rte = rmr_sym_pull( rt->hash, key )) == NULL ) { *more = 0; - //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %d\n", mtype ); + //if( DEBUG ) fprintf( stderr, "#### >>> rte not found for type key=%lu\n", key ); return -1; } if( group < 0 || group >= rte->nrrgroups ) { - //if( DEBUG ) fprintf( stderr, ">>>> group out of range: mtype=%d group=%d max=%d\n", mtype, group, rte->nrrgroups ); + //if( DEBUG ) fprintf( stderr, ">>>> group out of range: key=%lu group=%d max=%d\n", key, group, rte->nrrgroups ); *more = 0; return -1; } if( (rrg = rte->rrgroups[group]) == NULL ) { - //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %d\n", mtype ); + //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %lu\n", key ); *more = 0; // groups are inserted contig, so nothing should be after a nil pointer return -1; } @@ -241,10 +243,10 @@ static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more ) { switch( rrg->nused ) { case 0: // nothing allocated, just punt - //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" ); + //if( DEBUG ) return -1; - case 1: // exactly one, no rr to deal with and more is not possible even if fanout > 1 + case 1: // exactly one, no rr to deal with nn_sock = rrg->epts[0]->nn_sock; ep = rrg->epts[0]; break; @@ -258,7 +260,7 @@ static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more ) { break; } - if( ! ep->open ) { // not connected + if( ep && ! ep->open ) { // not connected if( ep->addr == NULL ) { // name didn't resolve before, try again ep->addr = uta_h2ip( ep->name ); } diff --git a/src/nanomsg/src/sr_static.c b/src/nanomsg/src/sr_static.c index 77a0e64..18acdda 100644 --- a/src/nanomsg/src/sr_static.c +++ b/src/nanomsg/src/sr_static.c @@ -98,6 +98,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s exit( 1 ); } + memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // must ensure that header portion of tpbuf is 0 + msg->tp_buf = msg->header; hdr = (uta_mhdr_t *) msg->header; hdr->rmr_ver = htonl( RMR_MSG_VER ); // current version hdr->sub_id = htonl( UNSET_SUBID ); @@ -108,6 +110,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s msg->len = 0; // length of data in the payload msg->alloc_len = mlen; // length of allocated payload + msg->sub_id = UNSET_SUBID; + msg->mtype = UNSET_MSGTYPE; msg->payload = PAYLOAD_ADDR( hdr ); // point at the payload in transport msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area msg->state = state; // fill in caller's state (likely the state of the last operation) @@ -323,6 +327,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) { // future: ensure that application did not overrun the XID buffer; last byte must be 0 + //fprintf( stderr, ">>>>>> sending to %d %d\n", nn_sock, msg->mtype ); hdr = (uta_mhdr_t *) msg->header; hdr->mtype = htonl( msg->mtype ); // stash type/len/sub-id in network byte order for transport hdr->sub_id = htonl( msg->sub_id ); diff --git a/src/nng/include/rmr_nng_private.h b/src/nng/include/rmr_nng_private.h index b810bdb..6bca91f 100644 --- a/src/nng/include/rmr_nng_private.h +++ b/src/nng/include/rmr_nng_private.h @@ -104,7 +104,7 @@ static void free_ctx( uta_ctx_t* ctx ); static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ); static int rt_link2_ep( endpoint_t* ep ); static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ); -static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more, nng_socket* nn_sock ); +static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ); static inline int xlate_nng_state( int state, int def_state ); diff --git a/src/nng/src/rmr_nng.c b/src/nng/src/rmr_nng.c index 70542f8..ba7c852 100644 --- a/src/nng/src/rmr_nng.c +++ b/src/nng/src/rmr_nng.c @@ -204,6 +204,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { int send_again; // true if the message must be sent again rmr_mbuf_t* clone_m; // cloned message for an nth send int sock_ok; // got a valid socket from round robin select + uint64_t key; // mtype or sub-id/mtype sym table key if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -229,8 +230,9 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { send_again = 1; // force loop entry group = 0; // always start with group 0 + key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry while( send_again ) { - sock_ok = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups + sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n", msg->mtype, send_again, group, msg->len, sock_ok ); group++; diff --git a/src/nng/src/rtable_nng_static.c b/src/nng/src/rtable_nng_static.c index d6d3490..6122f69 100644 --- a/src/nng/src/rtable_nng_static.c +++ b/src/nng/src/rtable_nng_static.c @@ -231,7 +231,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s during test that different entries are being seleted; we cannot depend on the nng socket being different as we could with nano. */ -static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more, nng_socket* nn_sock ) { +static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) { rtable_ent_t* rte; // matching rt entry endpoint_t* ep; // seected end point int state = FALSE; // processing state @@ -254,20 +254,20 @@ static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more, nn return FALSE; } - if( (rte = rmr_sym_pull( rt->hash, mtype )) == NULL ) { + if( (rte = rmr_sym_pull( rt->hash, key )) == NULL ) { *more = 0; - //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %d\n", mtype ); + //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %lu\n", key ); return FALSE; } if( group < 0 || group >= rte->nrrgroups ) { - //if( DEBUG ) fprintf( stderr, ">>>> group out of range: mtype=%d group=%d max=%d\n", mtype, group, rte->nrrgroups ); + //if( DEBUG ) fprintf( stderr, ">>>> group out of range: key=%lu group=%d max=%d\n", key, group, rte->nrrgroups ); *more = 0; return FALSE; } if( (rrg = rte->rrgroups[group]) == NULL ) { - //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %d\n", mtype ); + //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type key=%lu\n", key ); *more = 0; // groups are inserted contig, so nothing should be after a nil pointer return FALSE; } diff --git a/src/nng/src/sr_nng_static.c b/src/nng/src/sr_nng_static.c index e1a2fa5..7c17589 100644 --- a/src/nng/src/sr_nng_static.c +++ b/src/nng/src/sr_nng_static.c @@ -147,6 +147,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s } msg->len = 0; // length of data in the payload msg->alloc_len = mlen; // length of allocated transport buffer + msg->sub_id = UNSET_SUBID; + msg->mtype = UNSET_MSGTYPE; msg->payload = PAYLOAD_ADDR( hdr ); // point to payload (past all header junk) msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area msg->state = state; // fill in caller's state (likely the state of the last operation) @@ -175,6 +177,8 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) { memset( msg, 0, sizeof( *msg ) ); + msg->sub_id = UNSET_SUBID; + msg->mtype = UNSET_MSGTYPE; msg->tp_buf = NULL; msg->header = NULL; msg->len = -1; // no payload; invalid len diff --git a/test/app_test/rebuild.ksh b/test/app_test/rebuild.ksh new file mode 100644 index 0000000..04367ef --- /dev/null +++ b/test/app_test/rebuild.ksh @@ -0,0 +1,46 @@ +#!/usr/bin/env ksh +# :vi ts=4 sw=4 noet : +#================================================================================== +# Copyright (c) 2019 Nokia +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#================================================================================== +# + +# --------------------------------------------------------------------------------- +# Mnemonic: rebuild.ksh +# Abstract: This is a simple script that will cause RMr to be rebuilt. It +# may be invoked by any of the run_* scripts in this directory. +# +# Date: 24 April 2019 +# Author: E. Scott Daniels +# --------------------------------------------------------------------------------- + + +build_path=../../.build + +( + set -e + mkdir -p $build_path + cd ${build_path%/*} # cd barfs on ../../.build, so we do this + cd ${build_path##*/} + cmake .. + make package +) +if (( $? != 0 )) +then + echo "build failed" + exit 1 +fi + diff --git a/test/app_test/receiver.c b/test/app_test/receiver.c index dd639ce..ed2450b 100644 --- a/test/app_test/receiver.c +++ b/test/app_test/receiver.c @@ -99,12 +99,15 @@ int main( int argc, char** argv ) { long good = 0; // good palyload buffers long bad = 0; // payload buffers which were not correct long bad_tr = 0; // trace buffers that were not correct + long bad_sid = 0; // bad subscription ids long timeout = 0; char* data; - char wbuf[1024]; // we'll pull trace data into here int nmsgs = 10; // number of messages to stop after (argv[1] overrides) int rt_count = 0; // retry count long ack_count = 0; // number of acks sent + int count_bins[11]; // histogram bins based on msg type (0-10) + char wbuf[1024]; // we'll pull trace data into here, and use as general working buffer + char sbuf[128]; // short buffer data = getenv( "RMR_RTG_SVC" ); if( data == NULL ) { @@ -118,6 +121,8 @@ int main( int argc, char** argv ) { listen_port = argv[2]; } + memset( count_bins, 0, sizeof( count_bins ) ); + fprintf( stderr, " listening on port: %s for a max of %d messages\n", listen_port, nmsgs ); mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines! @@ -165,11 +170,21 @@ int main( int argc, char** argv ) { } count++; // messages received for stats output - if( msg->mtype == 5 ) { // send an ack; sender will count but not process, so data in message is moot - msg = rmr_rts_msg( mrc, msg ); // we don't try to resend if this returns retry + if( msg->mtype < 3 ) { // count number of properly set subscription id + if( msg->sub_id != msg->mtype * 10 ) { + bad_sid++; + } + } + + if( msg->mtype >= 0 && msg->mtype <= 10 ) { + count_bins[msg->mtype]++; + } + + if( msg->mtype == 5 ) { // send an ack; sender will count but not process, so data in message is moot + msg = rmr_rts_msg( mrc, msg ); rt_count = 1000; while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :( - if( ack_count < 1 ) { // need to connect, so hard wait + if( ack_count < 1 ) { // 1st ack, so we need to connect, and we'll wait for that sleep( 1 ); } rt_count--; @@ -189,7 +204,16 @@ int main( int argc, char** argv ) { } } - fprintf( stderr, " [%s] %ld messages; good=%ld acked=%ld bad=%ld bad-trace=%ld\n", !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr ); + wbuf[0] = 0; + for( i = 0; i < 11; i++ ) { + snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] ); + strcat( wbuf, sbuf ); + } + + fprintf( stderr, " mtype histogram: %s\n", wbuf ); + fprintf( stderr, " [%s] %ld messages; good=%ld acked=%ld bad=%ld bad-trace=%ld bad-sub_id=%ld\n", + !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr, bad_sid ); + sleep( 2 ); // let any outbound acks flow before closing rmr_close( mrc ); diff --git a/test/app_test/rt.mask b/test/app_test/rt.mask index 80d139f..5fab235 100644 --- a/test/app_test/rt.mask +++ b/test/app_test/rt.mask @@ -1,21 +1,24 @@ # This is a 'mask' such that the run command can generate with the -# host name for the sender. +# host name for the sender. (not needed after RMr 1.0.18) -newrt|start -rte|0|localhost:4560 -rte|1|localhost:4560 -rte|2|localhost:4560 -rte|3|localhost:4560 -rte|4|localhost:4560 -rte|5|localhost:4560 -rte|6|localhost:4560 -rte|7|localhost:4560 -rte|8|localhost:4560 -rte|9|localhost:4560 -rte|10|localhost:4560 -rte|11|localhost:4560 -rte|12|localhost:4560 -rte|13|localhost:4560 -rte|999|%%hostname%%:43086 -newrt|end +newrt | start +mse | 0 | 0 | localhost:4560 +mse | 1 | 10 | localhost:4560 +mse | 2 | 20 | localhost:4560 +rte | 3 | localhost:4560 +mse | 3 | 100 | localhost:4560 # special test to ensure that this does not affect previous entry +rte | 4 | localhost:4560 +rte | 5 | localhost:4560 +rte | 6 | localhost:4560 +rte | 7 | localhost:4560 +rte | 8 | localhost:4560 +rte | 9 | localhost:4560 +rte | 10 | localhost:4560 +rte | 11 | localhost:4560 +rte | 12 | localhost:4560 +rte | 13 | localhost:4560 + +# this entry isn't needed after RMr 1.0.18 +rte | 999 | %%hostname%%:43086 +newrt | end diff --git a/test/app_test/run_app_test.ksh b/test/app_test/run_app_test.ksh index 7ad4934..3c4f20e 100644 --- a/test/app_test/run_app_test.ksh +++ b/test/app_test/run_app_test.ksh @@ -73,6 +73,7 @@ nano_sender=0 # start nano version if set (-N) nano_receiver=0 wait=1 rebuild=0 +verbose=0 while [[ $1 == -* ]] do @@ -83,6 +84,7 @@ do nano_receiver=1 ;; -n) nmsg=$2; shift;; + -v) verbose=1;; *) echo "unrecognised option: $1" echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]" @@ -94,23 +96,18 @@ do shift done +if (( verbose )) +then + echo "2" >.verbose + export RMR_VCTL_FILE=".verbose" +fi + if (( rebuild )) then build_path=../../.build - - ( - set -e - mkdir -p $build_path - cd ${build_path%/*} # cd barfs on ../../.build, so we do this - cd ${build_path##*/} - cmake .. - make package - ) - if (( $? != 0 )) - then - echo "build failed" - exit 1 - fi + set -e + ksh ./rebuild.ksh + set +e else build_path=${BUILD_PATH:-"../../.build"} # we prefer .build at the root level, but allow user option @@ -137,6 +134,7 @@ then fi run_rcvr & +sleep 2 # if sender starts faster than rcvr we can drop, so pause a bit run_sender & wait @@ -151,6 +149,7 @@ else fi rm /tmp/PID$$.* +rm -f .verbose exit $(( !! (src + rrc) )) diff --git a/test/app_test/run_multi_test.ksh b/test/app_test/run_multi_test.ksh new file mode 100644 index 0000000..9bfc59b --- /dev/null +++ b/test/app_test/run_multi_test.ksh @@ -0,0 +1,208 @@ +#!/usr/bin/env ksh +# :vi ts=4 sw=4 noet : +#================================================================================== +# Copyright (c) 2019 Nokia +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#================================================================================== +# + +# --------------------------------------------------------------------------------- +# Mnemonic: run_multi_test.ksh +# Abstract: This is a simple script to set up and run the basic send/receive +# processes for some library validation on top of nano/nng. This +# particular tests starts several receivers and creates a route table +# which causes messages to be sent to all receivers in parallel +# (forcing message cloning internally in RMr). +# It should be possible to clone the repo, switch to this directory +# and execute 'ksh run -B' which will build RMr, make the sender and +# recevier then run the basic test. +# +# Example command line: +# ksh ./run_multi_test.ksh # default 10 messages at 1 msg/sec +# ksh ./run_multi_test.ksh -N # default but with nanomsg lib +# ksh ./run_multi_test.ksh -d 100 -n 10000 # send 10k messages with 100ms delay between +# +# Date: 24 April 2019 +# Author: E. Scott Daniels +# --------------------------------------------------------------------------------- + + +# The sender and receivers are run asynch. Their exit statuses are captured in a +# file in order for the 'main' to pick them up easily. +# +function run_sender { + if (( $nano_sender )) + then + ./sender_nano $nmsg $delay + else + ./sender $nmsg $delay + fi + echo $? >/tmp/PID$$.src # must communicate state back via file b/c asynch +} + +# $1 is the instance so we can keep logs separate +function run_rcvr { + typeset port + + port=$(( 4460 + ${1:-0} )) + export RMR_RTG_SVC=$(( 9990 + $1 )) + if (( $nano_receiver )) + then + ./receiver_nano $nmsg $port + else + ./receiver $nmsg $port + fi + echo $? >/tmp/PID$$.$1.rrc +} + +# Drop a contrived route table in such that the sender sends each message to n +# receivers. +# +function set_rt { + typeset port=4460 + typeset groups="localhost:4460" + for (( i=1; i < ${1:-3}; i++ )) + do + groups="$groups;localhost:$((port+i))" + done + + cat <multi.rt + newrt | start + mse |0 | 0 | $groups + mse |1 | 10 | $groups + mse |2 | 20 | $groups + rte |3 | $groups + rte |4 | $groups + rte |5 | $groups + rte |6 | $groups + rte |7 | $groups + rte |8 | $groups + rte |9 | $groups + rte |10 | $groups + rte |11 | $groups + newrt | end +endKat + +} + +# --------------------------------------------------------- + +if [[ ! -f local.rt ]] # we need the real host name in the local.rt; build one from mask if not there +then + hn=$(hostname) + sed "s!%%hostname%%!$hn!" rt.mask >local.rt +fi + +nmsg=10 # total number of messages to be exchanged (-n value changes) +delay=1000000 # microsec sleep between msg 1,000,000 == 1s +nano_sender=0 # start nano version if set (-N) +nano_receiver=0 +wait=1 +rebuild=0 +verbose=0 +nrcvrs=3 # this is sane, but -r allows it to be set up + +while [[ $1 == -* ]] +do + case $1 in + -B) rebuild=1;; + -d) delay=$2; shift;; + -N) nano_sender=1 + nano_receiver=1 + ;; + -n) nmsg=$2; shift;; + -r) nrcvrs=$2; shift;; + -v) verbose=1;; + + *) echo "unrecognised option: $1" + echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]" + echo " -B forces a rebuild which will use .build" + exit 1 + ;; + esac + + shift +done + +if (( verbose )) +then + echo "2" >.verbose + export RMR_VCTL_FILE=".verbose" +fi + +if (( rebuild )) +then + build_path=../../.build # if we rebuild we can insist that it is in .build :) + set -e + ksh ./rebuild.ksh + set +e +else + build_path=${BUILD_PATH:-"../../.build"} # we prefer .build at the root level, but allow user option + + if [[ ! -d $build_path ]] + then + echo "cannot find build in: $build_path" + echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this" + exit 1 + fi +fi + +export LD_LIBRARY_PATH=$build_path:$build_path/lib +export LIBRARY_PATH=$LD_LIBRARY_PATH +export RMR_SEED_RT=./multi.rt + +set_rt $nrcvrs # set up the rt for n receivers + +if [[ ! -f ./sender ]] +then + if ! make >/dev/null 2>&1 + then + echo "[FAIL] cannot find sender binary, and cannot make it.... humm?" + exit 1 + fi +fi + +for (( i=0; i < nrcvrs; i++ )) # start the receivers with an instance number +do + run_rcvr $i & +done + +sleep 2 # let receivers init so we don't shoot at an empty target +run_sender & + +wait + + +for (( i=0; i < nrcvrs; i++ )) # collect return codes +do + head -1 /tmp/PID$$.$i.rrc | read x + (( rrc += x )) +done + +head -1 /tmp/PID$$.src | read src + +if (( !! (src + rrc) )) +then + echo "[FAIL] sender rc=$src receiver rc=$rrc" +else + echo "[PASS] sender rc=$src receiver rc=$rrc" + rm -f multi.rt +fi + +rm /tmp/PID$$.* +rm -f .verbose + +exit $(( !! (src + rrc) )) + diff --git a/test/app_test/run_rr_test.ksh b/test/app_test/run_rr_test.ksh new file mode 100644 index 0000000..2e52aa6 --- /dev/null +++ b/test/app_test/run_rr_test.ksh @@ -0,0 +1,211 @@ +#!/usr/bin/env ksh +# :vi ts=4 sw=4 noet : +#================================================================================== +# Copyright (c) 2019 Nokia +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#================================================================================== +# + +# --------------------------------------------------------------------------------- +# Mnemonic: run_multi_test.ksh +# Abstract: This is a simple script to set up and run the basic send/receive +# processes for some library validation on top of nano/nng. This +# particular tests starts several receivers and creates a route table +# which causes messages to be sent round robin to all of the receivers. +# The number of messages command line parameter (-n) will be the number +# of messages that each receiver should expect; the sender will be asked +# to send r times that many messages so that as they are round robbined +# each receiver should get the same number of messages. +# +# Example command line: +# ksh ./run_rr_test.ksh # default 10 messages at 1 msg/sec +# ksh ./run_rr_test.ksh -N # default but with nanomsg lib +# ksh ./run_rr_test.ksh -d 100 -n 10000 # send 10k messages with 100ms delay between +# +# Date: 24 April 2019 +# Author: E. Scott Daniels +# --------------------------------------------------------------------------------- + + +# The sender and receivers are run asynch. Their exit statuses are captured in a +# file in order for the 'main' to pick them up easily. +# +function run_sender { + export RMR_RTG_SVC=8990 + if (( $nano_sender )) + then + ./sender_nano $(( nmsg * nrcvrs )) $delay 1 + else + ./sender $(( nmsg * nrcvrs )) $delay 1 + fi + echo $? >/tmp/PID$$.src # must communicate state back via file b/c asynch +} + +# $1 is the instance so we can keep logs separate +function run_rcvr { + typeset port + + port=$(( 4560 + ${1:-0} )) + export RMR_RTG_SVC=$(( 9990 + $1 )) + if (( $nano_receiver )) + then + ./receiver_nano $nmsg $port + else + ./receiver $nmsg $port + fi + echo $? >/tmp/PID$$.$1.rrc +} + +# +# Drop a contrived route table in such that the sender sends each message to n +# receivers. +# +function set_rt { + typeset port=4560 + typeset endpoints="localhost:4560" + for (( i=1; i < ${1:-3}; i++ )) + do + endpoints="$endpoints,localhost:$((port+i))" + done + + cat <rr.rt + newrt |start + rte |0 | $endpoints |0 + rte |1 | $endpoints |10 + mse |2 | 20 | $endpoints # new style mtype/subid entry + rte |3 | $endpoints |0 + rte |4 | $endpoints |0 + rte |5 | $endpoints |0 + rte |6 | $endpoints |0 + rte |7 | $endpoints |0 + rte |8 | $endpoints |0 + rte |9 | $endpoints |0 + rte |10 | $endpoints |0 + rte |11 | $endpoints |0 + newrt |end +endKat + +} + +# --------------------------------------------------------- + +if [[ ! -f local.rt ]] # we need the real host name in the local.rt; build one from mask if not there +then + hn=$(hostname) + sed "s!%%hostname%%!$hn!" rt.mask >local.rt +fi + +nmsg=10 # total number of messages to be exchanged (-n value changes) +delay=1000 # microsec sleep between msg 1,000,000 == 1s (shorter than others b/c/ we are sending to multiple) +nano_sender=0 # start nano version if set (-N) +nano_receiver=0 +wait=1 +rebuild=0 +verbose=0 +nrcvrs=3 # this is sane, but -r allows it to be set up + +while [[ $1 == -* ]] +do + case $1 in + -B) rebuild=1;; + -d) delay=$2; shift;; + -N) nano_sender=1 + nano_receiver=1 + ;; + -n) nmsg=$2; shift;; + -r) nrcvrs=$2; shift;; + -v) verbose=1;; + + *) echo "unrecognised option: $1" + echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]" + echo " -B forces a rebuild which will use .build" + exit 1 + ;; + esac + + shift +done + + +if (( verbose )) +then + echo "2" >.verbose + export RMR_VCTL_FILE=".verbose" +fi + +if (( rebuild )) +then + build_path=../../.build + set -e + ksh ./rebuild.ksh + set +e +else + build_path=${BUILD_PATH:-"../../.build"} # we prefer .build at the root level, but allow user option + + if [[ ! -d $build_path ]] + then + echo "cannot find build in: $build_path" + echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this" + exit 1 + fi +fi + +export LD_LIBRARY_PATH=$build_path:$build_path/lib +export LIBRARY_PATH=$LD_LIBRARY_PATH +export RMR_SEED_RT=./rr.rt + +set_rt $nrcvrs + +if [[ ! -f ./sender ]] +then + if ! make >/dev/null 2>&1 + then + echo "[FAIL] cannot find sender binary, and cannot make it.... humm?" + exit 1 + fi +fi + +for (( i=0; i < nrcvrs; i++ )) # start the receivers with an instance number +do + run_rcvr $i & +done + +sleep 2 # wait to start sender else we might send before receivers up and drop messages +run_sender & + +wait + + +for (( i=0; i < nrcvrs; i++ )) # collect return codes +do + head -1 /tmp/PID$$.$i.rrc | read x + (( rrc += x )) +done + +head -1 /tmp/PID$$.src | read src + +if (( !! (src + rrc) )) +then + echo "[FAIL] sender rc=$src receiver rc=$rrc" +else + echo "[PASS] sender rc=$src receiver rc=$rrc" +fi + +rm /tmp/PID$$.* +rm -f .verbose +rm -f rr.rt + +exit $(( !! (src + rrc) )) + diff --git a/test/app_test/sender.c b/test/app_test/sender.c index 3602d4f..ac7d869 100644 --- a/test/app_test/sender.c +++ b/test/app_test/sender.c @@ -32,8 +32,11 @@ will give up and fail. - Message types will vary between 1 and 10, so the route table must - be set up to support those message types. + Message types will vary between 0 and 9, so the route table must + be set up to support those message types. Further, for message types + 0, 1 and 2, the subscription ID will be set to type x 10, so the route + table must be set to include the sub-id for those types in order for + the messages to reach their destination. Message format is: ck1 ck2| @@ -42,7 +45,10 @@ Ck2 is the simple check sum of the trace data which is a nil terminated series of bytes. - Parms: argv[1] == nmsgs; argv[2] == delay; argv[3] == listen port + Parms: argv[1] == number of msgs to send (10) + argv[2] == delay (mu-seconds, 1000000 default) + argv[3] == max msg type (not inclusive; default 10) + argv[4] == listen port Sender will send for at most 20 seconds, so if nmsgs and delay extend beyond that period the total number of messages sent will be less @@ -75,16 +81,17 @@ static int sum( char* str ) { int main( int argc, char** argv ) { void* mrc; // msg router context - struct epoll_event events[1]; // list of events to give to epoll - struct epoll_event epe; // event definition for event to listen to + struct epoll_event events[1]; // list of events to give to epoll + struct epoll_event epe; // event definition for event to listen to int ep_fd = -1; // epoll's file des (given to epoll_wait) - int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on - int nready; // number of events ready for receive + int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on + int nready; // number of events ready for receive rmr_mbuf_t* sbuf; // send buffer rmr_mbuf_t* rbuf; // received buffer - int count = 0; - int rt_count = 0; // number of messages requiring a spin retry - int rcvd_count = 0; + int count = 0; + int rt_count = 0; // number of messages requiring a spin retry + int rcvd_count = 0; + int fail_count = 0; // # of failure sends after first successful send char* listen_port = "43086"; int mtype = 0; int stats_freq = 100; @@ -94,6 +101,7 @@ int main( int argc, char** argv ) { long timeout = 0; int delay = 100000; // usec between send attempts int nmsgs = 10; // number of messages to send + int max_mt = 10; // reset point for message type if( argc > 1 ) { nmsgs = atoi( argv[1] ); @@ -102,7 +110,10 @@ int main( int argc, char** argv ) { delay = atoi( argv[2] ); } if( argc > 3 ) { - listen_port = argv[3]; + max_mt = atoi( argv[3] ); + } + if( argc > 4 ) { + listen_port = argv[4]; } fprintf( stderr, " listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay ); @@ -150,13 +161,19 @@ int main( int argc, char** argv ) { timeout = time( NULL ) + 20; - while( count < nmsgs ) { // we send 10 messages after the first message is successful + while( count < nmsgs ) { // we send n messages after the first message is successful snprintf( trace, 100, "%lld", (long long) time( NULL ) ); rmr_set_trace( sbuf, trace, strlen( trace ) + 1 ); snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer!", count, trace, rand() ); snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf ); sbuf->mtype = mtype; // fill in the message bits + if( mtype < 3 ) { + sbuf->sub_id = mtype * 10; + } else { + sbuf->sub_id = -1; + } + sbuf->len = strlen( sbuf->payload ) + 1; // our receiver likely wants a nice acsii-z string sbuf->state = 0; sbuf = rmr_send_msg( mrc, sbuf ); // send it (send returns an empty payload on success, or the original payload on fail/retry) @@ -167,7 +184,13 @@ int main( int argc, char** argv ) { while( sbuf->state == RMR_ERR_RETRY ) { // soft failure (device busy?) retry sbuf = rmr_send_msg( mrc, sbuf ); // retry send until it's good (simple test; real programmes should do better) } - successful = 1; + if( sbuf->state == RMR_OK ) { + successful = 1; // indicates only that we sent one successful message, not the current state + } else { + if( successful ) { + fail_count++; // count failures after first successful message + } + } break; case RMR_OK: @@ -175,15 +198,19 @@ int main( int argc, char** argv ) { break; default: + if( successful ) { + fail_count++; // count failures after first successful message + } // some error (not connected likely), don't count this + //sleep( 1 ); break; } if( successful ) { // once we have a message that was sent, start to increase things count++; mtype++; - if( mtype > 10 ) { // if large number of sends don't require infinite rt entries :) - mtype = 1; + if( mtype >= max_mt ) { // if large number of sends don't require infinite rt entries :) + mtype = 0; } } @@ -237,7 +264,7 @@ int main( int argc, char** argv ) { } } - fprintf( stderr, " [%s] sent %d messages received %d acks retries=%d\n", count == nmsgs ? "PASS" : "FAIL", count, rcvd_count, rt_count ); + fprintf( stderr, " [%s] sent=%d rcvd-acks=%d failures=%d retries=%d\n", count == nmsgs ? "PASS" : "FAIL", count, rcvd_count, fail_count, rt_count ); rmr_close( mrc ); return !( count == nmsgs ); diff --git a/test/rmr_nng_api_static_test.c b/test/rmr_nng_api_static_test.c index e86c6f5..cefc09f 100644 --- a/test/rmr_nng_api_static_test.c +++ b/test/rmr_nng_api_static_test.c @@ -139,6 +139,10 @@ static int rmr_api_test( ) { msg2 = rmr_send_msg( NULL, NULL ); // drive for coverage errors += fail_not_nil( msg2, "send_msg returned msg pointer when given a nil message and context" ); + msg->state = 0; + msg = rmr_send_msg( NULL, msg ); + errors += fail_if( msg->state == 0, "rmr_send_msg did not set msg state when msg given with nil context" ); + // --- sends will fail with a no endpoint error until a dummy route table is set, so we test fail case first. msg->len = 100; msg->mtype = 1; @@ -194,17 +198,28 @@ static int rmr_api_test( ) { rmr_rts_msg( rmc, NULL ); errors += fail_if( errno == 0, "rmr_rts_msg did not set errno when given a nil message" ); + msg->state = 0; + msg = rmr_rts_msg( NULL, msg ); // should set state in msg + errors += fail_if_equal( msg->state, 0, "rmr_rts_msg did not set state when given valid message but no context" ); + + msg = rmr_rts_msg( rmc, msg ); // return the buffer to the sender errors += fail_if_nil( msg, "rmr_rts_msg did not return a message pointer" ); errors += fail_if( errno != 0, "rmr_rts_msg did not reset errno" ); snprintf( msg->xaction, 17, "%015d", 16 ); // dummy transaction id (emulation generates, this should arrive after a few calls to recv) + + msg->state = 0; + msg = rmr_call( NULL, msg ); + errors += fail_if( msg->state == 0, "rmr_call did not set message state when given message with nil context" ); + + msg->mtype = 0; msg = rmr_call( rmc, msg ); // this call should return a message as we can anticipate a dummy message in errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to succeed" ); if( msg ) { errors += fail_not_equal( msg->state, RMR_OK, "rmr_call did not properly set state on successful return" ); - errors += fail_if( errno != 0, "rmr_call did not properly set errno on successful return" ); + errors += fail_not_equal( errno, 0, "rmr_call did not properly set errno (a) on successful return" ); } snprintf( wbuf, 17, "%015d", 14 ); // if we call receive we should find this in the first 15 tries @@ -240,6 +255,11 @@ static int rmr_api_test( ) { rmr_free_msg( msg2 ); + msg2 = rmr_torcv_msg( NULL, NULL, 10 ); + errors += fail_not_nil( msg2, "rmr_torcv_msg returned a pointer when given nil information" ); + msg2 = rmr_torcv_msg( rmc, NULL, 10 ); + errors += fail_if_nil( msg2, "rmr_torcv_msg did not return a message pointer when given a nil old msg" ); + // --- test timeout receive; our dummy epoll function will return 1 ready on first call and 0 ready (timeout emulation) on second // however we must drain the swamp (queue) first, so run until we get a timeout error, or 20 and report error if we get to 20. msg = NULL; diff --git a/test/rt_static_test.c b/test/rt_static_test.c index 129eec1..710c75e 100644 --- a/test/rt_static_test.c +++ b/test/rt_static_test.c @@ -228,5 +228,15 @@ static int rt_test( ) { } */ + state = uta_link2( "worm", NULL, NULL ); + errors += fail_if_true( state, "link2 did not return false when given nil pointers" ); + + state = uta_epsock_rr( rt, 122, 0, NULL, NULL ); + errors += fail_if_true( state, "uta_epsock_rr returned bad state when given nil socket pointer" ); + + rt = uta_rt_init( ); // get us a route table + state = uta_epsock_rr( rt, 0, -1, NULL, &nn_sock ); + errors += fail_if_true( state, "uta_epsock_rr returned bad state (true) when given negative group number" ); + return !!errors; // 1 or 0 regardless of count } diff --git a/test/sr_nng_static_test.c b/test/sr_nng_static_test.c index d477391..4997706 100644 --- a/test/sr_nng_static_test.c +++ b/test/sr_nng_static_test.c @@ -46,14 +46,21 @@ static void gen_rt( uta_ctx_t* ctx ) { char* rt_stuff; // strings for the route table rt_stuff = + "newrt|end\n" // end of table check before start of table found + "# comment to drive full comment test\n" + "\n" // handle blank lines + " \n" // handle blank lines + "mse|4|10|localhost:4561\n" // entry before start message + "rte|4|localhost:4561\n" // entry before start message "newrt|start\n" // false start to drive detection "xxx|badentry to drive default case" "newrt|start\n" - "rte|0|localhost:4560,localhost:4562\n" + "rte|0|localhost:4560,localhost:4562\n" // these are legitimate entries for our testing "rte|1|localhost:4562;localhost:4561,localhost:4569\n" - "rte|2|localhost:4562\n" - "rte|4|localhost:4561\n" - "rte|5|localhost:4563\n" + "rte|2|localhost:4562| 10\n" // new subid at end + "mse|4|10|localhost:4561\n" // new msg/subid specifier rec + "mse|4|localhost:4561\n" // new mse entry with less than needed fields + " rte| 5 |localhost:4563 #garbage comment\n" // tests white space cleanup "rte|6|localhost:4562\n" "newrt|end\n"; @@ -92,6 +99,7 @@ static int sr_nng_test() { nng_socket nn_dummy_sock; // dummy needed to drive send int size; int i; + void* p; //ctx = rmr_init( "tcp:4360", 2048, 0 ); // do NOT call init -- that starts the rtc thread which isn't good here ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) ); // alloc the context manually @@ -104,6 +112,10 @@ static int sr_nng_test() { uta_lookup_rtg( ctx ); gen_rt( ctx ); // forces a static load with some known info since we don't start the rtc() + gen_rt( ctx ); // force a second load to test cloning + + p = rt_ensure_ep( NULL, "foo" ); // drive for coverage + errors += fail_not_nil( p, "rt_ensure_ep did not return nil when given nil route table" ); state = rmr_ready( NULL ); errors += fail_if_true( state, "reported ready when given a nil context" ); diff --git a/test/unit_test.ksh b/test/unit_test.ksh index 21848af..74bc048 100755 --- a/test/unit_test.ksh +++ b/test/unit_test.ksh @@ -267,6 +267,7 @@ verbose=0 show_all=1 # show all things -F sets to show failures only strict=0 # -s (strict) will set; when off, coverage state ignored in final pass/fail show_output=0 # show output from each test execution (-S) +quiet=0 while [[ $1 == "-"* ]] do @@ -285,6 +286,7 @@ do -s) strict=1;; # coverage counts toward pass/fail state -S) show_output=1;; # test output shown even on success -v) (( verbose++ ));; + -q) quiet=1;; # less chatty when spilling error log files -h) usage; exit 0;; --help) usage; exit 0;; @@ -353,7 +355,12 @@ do if ! ./${tfile%.c} >/tmp/PID$$.log 2>&1 then echo "[FAIL] unit test failed for: $tfile" - cat /tmp/PID$$.log + if (( quiet )) + then + grep "^<" /tmp/PID$$.log # in quiet mode just dump <...> messages which are assumed from the test programme not appl + else + cat /tmp/PID$$.log + fi (( ut_errors++ )) # cause failure even if not in strict mode continue # skip coverage tests for this else diff --git a/test/wormhole_static_test.c b/test/wormhole_static_test.c index 9cb8d82..f2d4bf6 100644 --- a/test/wormhole_static_test.c +++ b/test/wormhole_static_test.c @@ -48,6 +48,7 @@ static int worm_test( ) { char wbuf[1024]; int errors = 0; // number errors found int i; + void* p; rmr_mbuf_t* mbuf; // mbuf to send to peer int whid = -1; @@ -100,11 +101,22 @@ static int worm_test( ) { whid = rmr_wh_open( ctx, "localhost:21961" ); errors += fail_not_equal( whid, 3, "attempt to fill in a hole didn't return expected" ); - rmr_wh_send_msg( NULL, 0, NULL ); // tests for coverage - rmr_wh_send_msg( ctx, 0, NULL ); + p = rmr_wh_send_msg( NULL, 0, NULL ); // tests for coverage + fail_not_nil( p, "wh_send_msg returned a pointer when given nil context and message" ); + + p = rmr_wh_send_msg( ctx, 0, NULL ); + fail_not_nil( p, "wh_send_msg returned a pointer when given nil message with valid context" ); mbuf = rmr_alloc_msg( ctx, 2048 ); // get an muf to pass round errors += fail_if_nil( mbuf, "unable to allocate mbuf for send tests (giving up on send tests)" ); + + mbuf->state = 0; + mbuf = rmr_wh_send_msg( NULL, 0, mbuf ); + if( mbuf ) { + fail_if_equal( mbuf->state, 0, "wh_send_msg returned a zero state when given a nil context" ); + } + fail_if_nil( mbuf, "wh_send_msg returned a nil message buffer when given a nil context" ); + while( mbuf ) { if( !(mbuf = rmr_wh_send_msg( ctx, 50, mbuf )) ) { // test for coverage errors += fail_if_nil( mbuf, "send didn't return an mbuf (skip rest of send tests)" ); -- 2.16.6