feat(routing): Support session based routing 84/84/1
authorE. Scott Daniels <daniels@research.att.com>
Tue, 23 Apr 2019 18:24:25 +0000 (18:24 +0000)
committerE. Scott Daniels <daniels@research.att.com>
Thu, 25 Apr 2019 15:53:25 +0000 (15:53 +0000)
The session id field in a message buffer is now used
directly for routing.

Change-Id: I3634c97588b11172db964b2d06c96c317d8b8ae3
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Routing table entry changes to pick up subid

Change-Id: If08dc21aae4acaab350ba75a8854ad2f24007b03
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Fix unit test for rmr_call

It was not properly setting message type and now that RMr
ensures that invalid message type is set by default on a newly
created message this was causing unit test to fail.

Change-Id: I50f08d1038ea7fca2a070cdd949657bfbc25f3fd
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Add application level tests

Added round robin and multi group application level
test scripts.

Change-Id: Ic6aebaf3bc1edb763decc7fd0aebb09df116f20c
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
NNG based sub-id support added

Change-Id: I0d36b55bb90a315ba94c9476df88e2c7eac6c383
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Correct bug in app test script

Change-Id: I5b4a9f32aa1bc2907f320b8ad4628e0948062904
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Nano sub-id changes and unit test updates

Change-Id: Ia69f2fb33de3bbee2f33f9a4c5def779c872e52c
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change nil-sub_id key to high order key

If there is no sub-id, then the key is based only on the
message type, but to allow for a sub-id == 0 the key
when there is no subscription id must be set to
0xffffffff00000000 + msg type.

New version for deb is 1.0.19

Change-Id: I55f89d368466a0137fdea99410c76ba72e1923ab
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
26 files changed:
CMakeLists.txt
src/common/include/rmr.h
src/common/include/rmr_agnostic.h
src/common/include/rmr_symtab.h
src/common/src/rt_generic_static.c
src/common/src/symtab.c
src/nanomsg/include/rmr_private.h
src/nanomsg/src/rmr.c
src/nanomsg/src/rtable_static.c
src/nanomsg/src/sr_static.c
src/nng/include/rmr_nng_private.h
src/nng/src/rmr_nng.c
src/nng/src/rtable_nng_static.c
src/nng/src/sr_nng_static.c
test/app_test/rebuild.ksh [new file with mode: 0644]
test/app_test/receiver.c
test/app_test/rt.mask
test/app_test/run_app_test.ksh
test/app_test/run_multi_test.ksh [new file with mode: 0644]
test/app_test/run_rr_test.ksh [new file with mode: 0644]
test/app_test/sender.c
test/rmr_nng_api_static_test.c
test/rt_static_test.c
test/sr_nng_static_test.c
test/unit_test.ksh
test/wormhole_static_test.c

index 2782147..365d370 100644 (file)
@@ -24,7 +24,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
 
 set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
-set( patch_level "18" )
+set( patch_level "19" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_lib "lib" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_lib "lib" )
index 6bfe075..e6c1660 100644 (file)
@@ -47,6 +47,8 @@ extern "C" {
 
 #define RMR_DEF_SIZE           0               // pass as size to have msg allocation use the default msg size
 
 
 #define RMR_DEF_SIZE           0               // pass as size to have msg allocation use the default msg size
 
+#define RMR_VOID_MSGTYPE       (-1)    // unset/invalid message type and sub id
+#define RMR_VOID_SUBID         (-1)
 
 #define RMR_OK                         0               // state is good
 #define RMR_ERR_BADARG         1               // argument passd to function was unusable
 
 #define RMR_OK                         0               // state is good
 #define RMR_ERR_BADARG         1               // argument passd to function was unusable
index a23cff7..06da060 100644 (file)
@@ -230,6 +230,7 @@ static int ie_test( void* r, int i_factor, long inserts );
 
 
 // ----- route table generic static things ---------
 
 
 // ----- route table generic static things ---------
+static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
 static char* uta_fib( char* fname );
 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
 static char* uta_fib( char* fname );
@@ -237,7 +238,7 @@ static route_table_t* uta_rt_init( );
 static route_table_t* uta_rt_clone( route_table_t* srt );
 static void uta_rt_drop( route_table_t* rt );
 static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
 static route_table_t* uta_rt_clone( route_table_t* srt );
 static void uta_rt_drop( route_table_t* rt );
 static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
-static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups );
+static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
index 7d4d18c..74c5aa3 100644 (file)
@@ -28,6 +28,8 @@
 #ifndef _rmr_symtab_h
 #define _rmr_symtab_h
 
 #ifndef _rmr_symtab_h
 #define _rmr_symtab_h
 
+#include <netdb.h>
+
 /* --------- symtab ---------------- */
 #define UT_FL_NOCOPY 0x00          /* use user pointer */
 #define UT_FL_COPY 0x01            /* make a copy of the string data */
 /* --------- symtab ---------------- */
 #define UT_FL_NOCOPY 0x00          /* use user pointer */
 #define UT_FL_COPY 0x01            /* make a copy of the string data */
@@ -39,12 +41,12 @@ extern void rmr_sym_clear( void *s );
 extern void rmr_sym_dump( void *s );
 extern void *rmr_sym_alloc( int size );
 extern void rmr_sym_del( void *s, const char *name, unsigned int class );
 extern void rmr_sym_dump( void *s );
 extern void *rmr_sym_alloc( int size );
 extern void rmr_sym_del( void *s, const char *name, unsigned int class );
-extern void *rmr_sym_ndel( void *vtable, int key );
+extern void *rmr_sym_ndel( void *vtable, uint64_t key );
 extern void rmr_sym_free( void *vtable );
 extern void *rmr_sym_get( void *s,  const char *name, unsigned int class );
 extern int rmr_sym_put( void *s,  const char *name, unsigned int class, void *val );
 extern void rmr_sym_free( void *vtable );
 extern void *rmr_sym_get( void *s,  const char *name, unsigned int class );
 extern int rmr_sym_put( void *s,  const char *name, unsigned int class, void *val );
-extern int rmr_sym_map( void *s,  unsigned int key, void *val );
-extern void *rmr_sym_pull(  void *vtable, int key );
+extern int rmr_sym_map( void *s,  uint64_t key, void *val );
+extern void *rmr_sym_pull(  void *vtable, uint64_t key );
 extern void rmr_sym_stats( void *s, int level );
 extern void rmr_sym_foreach_class( void *vst, unsigned int class, void (* user_fun)( void*, void*, const char*, void*, void* ), void *user_data );
 
 extern void rmr_sym_stats( void *s, int level );
 extern void rmr_sym_foreach_class( void *vst, unsigned int class, void (* user_fun)( void*, void*, const char*, void*, void* ), void *user_data );
 
index 4dd9344..5bbacbd 100644 (file)
@@ -44,6 +44,7 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <unistd.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <unistd.h>
+#include <netdb.h>
 
 
 /*
 
 
 /*
@@ -56,13 +57,43 @@ typedef struct thing_list {
        void** things;
 } thing_list_t;
 
        void** things;
 } thing_list_t;
 
+// ------------------------------------------------------------------------------------------------
+
+
+/*
+       Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
+       must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
+*/
+static char* clip( char* buf ) {
+       char*   tok;
+
+       while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
+               buf++;
+       }
+
+       if( (tok = strchr( buf, '#' )) != NULL ) {
+               if( tok == buf ) {
+                       return buf;                                     // just push back; leading comment sym handled there
+               }
+
+               if( isspace( *(tok-1) ) ) {
+                       *tok = 0;
+               }
+       }
+
+       for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
+       *(tok+1) = 0;
+
+       return buf;
+}
+
 
 /*
        Given a message type create a route table entry and add to the hash keyed on the
        message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
        is the number of group slots to allocate in the entry.
 */
 
 /*
        Given a message type create a route table entry and add to the hash keyed on the
        message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
        is the number of group slots to allocate in the entry.
 */
-static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups ) {
+static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
        rtable_ent_t* rte;
 
        if( rt == NULL ) {
        rtable_ent_t* rte;
 
        if( rt == NULL ) {
@@ -87,12 +118,67 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups )
        memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
        rte->nrrgroups = nrrgroups;
 
        memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
        rte->nrrgroups = nrrgroups;
 
-       rmr_sym_map( rt->hash, mtype, rte );                                                    // add to hash using numeric mtype as key
+       rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
 
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: mt=%d groups=%d\n", mtype, nrrgroups );
+       if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lu groups=%d\n", key, nrrgroups );
        return rte;
 }
 
        return rte;
 }
 
+/*
+       This accepts partially parsed information from a record sent by route manager or read from
+       a file such that:
+               ts_field is the msg-type,sender field
+               subid is the integer subscription id
+               rr_field is the endpoint information for round robening message over
+
+       If all goes well, this will add an RTE to the table under construction.
+
+       The ts_field is checked to see if we should ingest this record. We ingest if one of
+       these is true:
+               there is no sender info (a generic entry for all)
+               there is sender and our host:port matches one of the senders
+               the sender info is an IP address that matches one of our IP addresses
+*/
+static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
+       rtable_ent_t*   rte;            // route table entry added
+       char*   tok;
+       int             ntoks;
+       uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
+       char*   tokens[128];
+       char*   gtokens[64];
+       int             i;
+       int             ngtoks;                         // number of tokens in the group list
+       int             grp;                            // index into group list
+
+       ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
+       rr_field = clip( rr_field );
+
+       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
+               (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
+               has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
+
+                       key = build_rt_key( subid, atoi( ts_field ) );
+
+                       if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lu\n", ts_field, subid, key );
+
+                       if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
+                               rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
+
+                               for( grp = 0; grp < ngtoks; grp++ ) {
+                                       if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
+                                               for( i = 0; i < ntoks; i++ ) {
+                                                       if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG]    add endpoint  %s\n", ts_field );
+                                                       uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
+                                               }
+                                       }
+                               }
+                       }
+               } else {
+                       if( DEBUG || (vlevel > 2) )
+                               fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
+               }
+}
+
 /*
        Parse a single record recevied from the route table generator, or read
        from a static route table file.  Start records cause a new table to
 /*
        Parse a single record recevied from the route table generator, or read
        from a static route table file.  Start records cause a new table to
@@ -100,6 +186,11 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups )
        entry records are added to the currenly 'in progress' table, and an
        end record causes the in progress table to be finalised and the
        currently active table is replaced.
        entry records are added to the currenly 'in progress' table, and an
        end record causes the in progress table to be finalised and the
        currently active table is replaced.
+
+       We expect one of several types:
+               newrt|{start|end}
+               rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
+               mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
 */
 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
        int i;
 */
 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
        int i;
@@ -129,6 +220,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                break;
 
                        case 'n':                                                                                               // newrt|{start|end}
                                break;
 
                        case 'n':                                                                                               // newrt|{start|end}
+                               tokens[1] = clip( tokens[1] );
                                if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
                                        if( ctx->new_rtable ) {
                                                uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
                                if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
                                        if( ctx->new_rtable ) {
                                                uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
@@ -155,33 +247,29 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                }
                                break;
 
                                }
                                break;
 
+                       case 'm':                                       // assume mse entry
+                               if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
+                                       break;
+                               }
+
+                               if( ntoks < 4 ) {
+                                       if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
+                                       break;
+                               }
+
+                               build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
+                               break;
+
                        case 'r':                                       // assume rt entry
                                if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
                                        break;
                                }
 
                        case 'r':                                       // assume rt entry
                                if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
                                        break;
                                }
 
-                               if( ((tok = strchr( tokens[1], ',' )) == NULL ) ||                                      // no sender names
-                                       (uta_has_str( tokens[1],  ctx->my_name, ',', 127) >= 0) ||              // our name isn't in the list
-                                       has_myip( tokens[1], ctx->ip_list, ',', 127 ) ) {                               // the list has one of our IP addresses
-
-                                       if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s\n", tokens[1] );
-
-                                       if( (ngtoks = uta_tokenise( tokens[2], gtokens, 64, ';' )) > 0 ) {                                      // split last field by groups first
-                                               rte = uta_add_rte( ctx->new_rtable, atoi( tokens[1] ), ngtoks );                        // get/create entry for message type
-                                               for( grp = 0; grp < ngtoks; grp++ ) {
-                                                       if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
-                                                               for( i = 0; i < ntoks; i++ ) {
-                                                                       if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG]    add endpoint  %s\n", tokens[i] );
-                                                                       uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
-                                                               }
-                                                       }
-                                               }
-                                       }
+                               if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
+                                       build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
                                } else {
                                } else {
-                                       if( DEBUG || (vlevel > 2) )
-                                               fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
+                                       build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
                                }
                                }
-
                                break;
 
                        default:
                                break;
 
                        default:
@@ -463,4 +551,21 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
 }
 
 
 }
 
 
+/*
+       Given a session id and message type build a key that can be used to look up the rte in the route
+       table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
+*/
+static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
+       uint64_t key;
+
+       if( sub_id == UNSET_SUBID ) {
+               key = 0xffffffff00000000 | mtype;
+       } else {
+               key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
+       }
+
+       return key;
+}
+
+
 #endif
 #endif
index a1a792d..31f150f 100644 (file)
@@ -28,6 +28,7 @@ Abstract:     Symbol table -- slightly streamlined from it's original 2000 version
                        Things changed for the Ric Msg implemention (Nov 2018):
                                - no concept of copy/free of the user data (functions removed)
                                - add ability to support an integer key (class 0)
                        Things changed for the Ric Msg implemention (Nov 2018):
                                - no concept of copy/free of the user data (functions removed)
                                - add ability to support an integer key (class 0)
+                                 Numeric key is an unsigned, 64bit key.
                                - externally visible names given a rmr_ extension as it's being
                                  incorporated into the RIC msg routing library and will be
                                  available to user applications.
                                - externally visible names given a rmr_ extension as it's being
                                  incorporated into the RIC msg routing library and will be
                                  available to user applications.
@@ -46,6 +47,7 @@ Mod:          2016 23 Feb - converted Symtab refs so that caller need only a
 #include <string.h>
 #include <stdlib.h>
 #include <memory.h>
 #include <string.h>
 #include <stdlib.h>
 #include <memory.h>
+#include <netdb.h>
 
 #include "rmr_symtab.h"
 
 
 #include "rmr_symtab.h"
 
@@ -56,7 +58,7 @@ typedef struct Sym_ele
        struct Sym_ele *next;   /* pointer at next element in list */
        struct Sym_ele *prev;   /* larger table, easier deletes */
        const char *name;               /* symbol name */
        struct Sym_ele *next;   /* pointer at next element in list */
        struct Sym_ele *prev;   /* larger table, easier deletes */
        const char *name;               /* symbol name */
-       unsigned int    nkey;   // the numeric key
+       uint64_t nkey;                  // the numeric key
        void *val;              /* user data associated with name */
        unsigned long mcount;   /* modificaitons to value */
        unsigned long rcount;   /* references to symbol */
        void *val;              /* user data associated with name */
        unsigned long mcount;   /* modificaitons to value */
        unsigned long rcount;   /* references to symbol */
@@ -136,11 +138,11 @@ static inline int same( unsigned int c1, unsigned int c2, const char *s1, const
        much the same.
 */
 static int putin( Sym_tab *table, const char *name, unsigned int class, void *val ) {
        much the same.
 */
 static int putin( Sym_tab *table, const char *name, unsigned int class, void *val ) {
-       Sym_ele *eptr;          /* pointer into hash table */
+       Sym_ele *eptr;                  /* pointer into hash table */
        Sym_ele **sym_tab;      /* pointer into hash table */
        Sym_ele **sym_tab;      /* pointer into hash table */
-       int hv;                  /* hash value */
-       int rc = 0;              /* assume it existed */
-       unsigned int    nkey = 0;       // numeric key if class == 0
+       int hv;                 /* hash value */
+       int rc = 0;             /* assume it existed */
+       uint64_t nkey = 0;              // numeric key if class == 0
 
        sym_tab = table->symlist;
 
 
        sym_tab = table->symlist;
 
@@ -148,7 +150,7 @@ static int putin( Sym_tab *table, const char *name, unsigned int class, void *va
                hv = sym_hash( name, table->size );             // hash it
                for( eptr=sym_tab[hv]; eptr && ! same( class, eptr->class, eptr->name, name); eptr=eptr->next );
        } else {
                hv = sym_hash( name, table->size );             // hash it
                for( eptr=sym_tab[hv]; eptr && ! same( class, eptr->class, eptr->name, name); eptr=eptr->next );
        } else {
-               nkey = *((int *) name);
+               nkey = *((uint64_t *) name);
                hv = nkey % table->size;                                        // just hash the number
                for( eptr=sym_tab[hv]; eptr && eptr->nkey != nkey; eptr=eptr->next );
        }
                hv = nkey % table->size;                                        // just hash the number
                for( eptr=sym_tab[hv]; eptr && eptr->nkey != nkey; eptr=eptr->next );
        }
@@ -237,7 +239,7 @@ extern void rmr_sym_dump( void *vtable )
                        if( eptr->val && eptr->class ) {
                                fprintf( stderr, "key=%s val@=%p\n", eptr->name, eptr->val );
                        } else {
                        if( eptr->val && eptr->class ) {
                                fprintf( stderr, "key=%s val@=%p\n", eptr->name, eptr->val );
                        } else {
-                               fprintf( stderr, "nkey=%d val@=%p\n", eptr->nkey, eptr->val );
+                               fprintf( stderr, "nkey=%lu val@=%p\n", (unsigned long) eptr->nkey, eptr->val );
                        }
                }
        }
                        }
                }
        }
@@ -284,9 +286,9 @@ extern void rmr_sym_del( void *vtable, const char *name, unsigned int class )
 {
        Sym_tab *table;
        Sym_ele **sym_tab;
 {
        Sym_tab *table;
        Sym_ele **sym_tab;
-       Sym_ele *eptr;    /* pointer into hash table */
-       int hv;                  /* hash value */
-       unsigned int nkey;              // class 0, name points to integer not string
+       Sym_ele *eptr;                  /* pointer into hash table */
+       int hv;                 /* hash value */
+       uint64_t        nkey;           // class 0, name points to integer not string
 
        table = (Sym_tab *) vtable;
        sym_tab = table->symlist;
 
        table = (Sym_tab *) vtable;
        sym_tab = table->symlist;
@@ -306,7 +308,7 @@ extern void rmr_sym_del( void *vtable, const char *name, unsigned int class )
 /*
        Delete element by numberic key.
 */
 /*
        Delete element by numberic key.
 */
-extern void *rmr_sym_ndel(  void *vtable, int key ) {
+extern void *rmr_sym_ndel(  void *vtable, uint64_t key ) {
        rmr_sym_del( vtable, (const char *) &key, 0 );
 }
 
        rmr_sym_del( vtable, (const char *) &key, 0 );
 }
 
@@ -317,7 +319,7 @@ extern void *rmr_sym_get( void *vtable, const char *name, unsigned int class )
        Sym_ele **sym_tab;
        Sym_ele *eptr;                  // element from table
        int hv;                 // hash value of key
        Sym_ele **sym_tab;
        Sym_ele *eptr;                  // element from table
        int hv;                 // hash value of key
-       unsigned int nkey;              // numeric key if class 0
+       uint64_t nkey;                  // numeric key if class 0
 
        table = (Sym_tab *) vtable;
        sym_tab = table->symlist;
 
        table = (Sym_tab *) vtable;
        sym_tab = table->symlist;
@@ -326,7 +328,7 @@ extern void *rmr_sym_get( void *vtable, const char *name, unsigned int class )
                hv = sym_hash( name, table->size );
                for(eptr=sym_tab[hv]; eptr &&  ! same(class, eptr->class, eptr->name, name); eptr=eptr->next );
        } else {
                hv = sym_hash( name, table->size );
                for(eptr=sym_tab[hv]; eptr &&  ! same(class, eptr->class, eptr->name, name); eptr=eptr->next );
        } else {
-               nkey = *((int *) name);
+               nkey = *((uint64_t *) name);
                hv = nkey % table->size;                        // just hash the number
                for( eptr=sym_tab[hv]; eptr && eptr->nkey != nkey; eptr=eptr->next );
        }
                hv = nkey % table->size;                        // just hash the number
                for( eptr=sym_tab[hv]; eptr && eptr->nkey != nkey; eptr=eptr->next );
        }
@@ -343,7 +345,7 @@ extern void *rmr_sym_get( void *vtable, const char *name, unsigned int class )
 /*
        Retrieve the data referenced by a numerical key.
 */
 /*
        Retrieve the data referenced by a numerical key.
 */
-extern void *rmr_sym_pull(  void *vtable, int key ) {
+extern void *rmr_sym_pull(  void *vtable, uint64_t key ) {
        return rmr_sym_get( vtable, (const char *) &key, 0 );
 }
 
        return rmr_sym_get( vtable, (const char *) &key, 0 );
 }
 
@@ -366,11 +368,11 @@ extern int rmr_sym_put( void *vtable, const char *name, unsigned int class, void
 }
 
 /*
 }
 
 /*
-       Add a new entry assuming that the key is an unsigned integer.
+       Add a new entry assuming that the key is an unsigned, 64 bit, integer.
 
        Returns 1 if new, 0 if existed
 */
 
        Returns 1 if new, 0 if existed
 */
-extern int rmr_sym_map( void *vtable, unsigned int key, void *val ) {
+extern int rmr_sym_map( void *vtable, uint64_t key, void *val ) {
        Sym_tab *table;
 
        table = (Sym_tab *) vtable;
        Sym_tab *table;
 
        table = (Sym_tab *) vtable;
@@ -407,7 +409,7 @@ extern void rmr_sym_stats( void *vtable, int level )
                                        if( eptr->class  ) {                                    // a string key
                                                fprintf( stderr, "sym: (%d) key=%s val@=%p ref=%ld mod=%lu\n", i, eptr->name, eptr->val, eptr->rcount, eptr->mcount );
                                        } else {
                                        if( eptr->class  ) {                                    // a string key
                                                fprintf( stderr, "sym: (%d) key=%s val@=%p ref=%ld mod=%lu\n", i, eptr->name, eptr->val, eptr->rcount, eptr->mcount );
                                        } else {
-                                               fprintf( stderr, "sym: (%d) key=%d val@=%p ref=%ld mod=%lu\n", i, eptr->nkey, eptr->val, eptr->rcount, eptr->mcount );
+                                               fprintf( stderr, "sym: (%d) key=%lu val@=%p ref=%ld mod=%lu\n", i, (unsigned long) eptr->nkey, eptr->val, eptr->rcount, eptr->mcount );
                                        }
                                }
                        }
                                        }
                                }
                        }
@@ -434,7 +436,7 @@ extern void rmr_sym_stats( void *vtable, int level )
                        if( eptr->class ) {
                                fprintf( stderr, "\t%s\n", eptr->name );
                        } else {
                        if( eptr->class ) {
                                fprintf( stderr, "\t%s\n", eptr->name );
                        } else {
-                               fprintf( stderr, "\t%d (numeric key)\n", eptr->nkey );
+                               fprintf( stderr, "\t%lu (numeric key)\n", (unsigned long) eptr->nkey );
                        }
                }
        }
                        }
                }
        }
index dde00ae..7d78382 100644 (file)
@@ -97,7 +97,7 @@ static int uta_link2( char* target );
 static int rt_link2_ep( endpoint_t* ep );
 static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
 static int uta_epsock_byname( route_table_t* rt, char* ep_name );
 static int rt_link2_ep( endpoint_t* ep );
 static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
 static int uta_epsock_byname( route_table_t* rt, char* ep_name );
-static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more );
+static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more );
 
 // ------ msg ------------------------------------------------
 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int tr_size );
 
 // ------ msg ------------------------------------------------
 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int tr_size );
@@ -107,48 +107,6 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock );
 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
 
 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock );
 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
 
-
-
-/*
-// --- message ring --------------------------
-static void* uta_mk_ring( int size );
-static void uta_ring_free( void* vr );
-static inline void* uta_ring_extract( void* vr );
-static inline int uta_ring_insert( void* vr, void* new_data );
-
-// --- message and context management --------
-static int ie_test( void* r, int i_factor, long inserts );
-static void free_ctx( uta_ctx_t* ctx );
-static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state );
-static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  );
-static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
-
-// ----- route table static things ---------
-static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
-static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
-static char* uta_fib( char* fname );
-static route_table_t* uta_rt_init( );
-static route_table_t* uta_rt_clone( route_table_t* srt );
-static void uta_rt_drop( route_table_t* rt );
-static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
-static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups );
-static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
-static int uta_epsock_byname( route_table_t* rt, char* ep_name );
-static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more );
-static void read_static_rt( uta_ctx_t* ctx, int vlevel );
-static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
-static void* rtc( void* vctx );
-static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );
-
-
-// --- tools_static protos ------------------
-static int uta_tokenise( char* buf, char** tokens, int max, char sep );
-static char* uta_h2ip( char const* hname );
-static int uta_link2( char* target );
-static int uta_lookup_rtg( uta_ctx_t* ctx );
-static int uta_has_str( char const* buf, char const* str, char sep, int max );
-*/
-
 static int rt_link2_ep( endpoint_t* ep );
 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
 
 static int rt_link2_ep( endpoint_t* ep );
 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
 
index dcf7e67..e67402f 100644 (file)
@@ -242,6 +242,8 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        int     group;                                  // selected group to get socket for
        int send_again;                         // true if the message must be sent again
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
        int     group;                                  // selected group to get socket for
        int send_again;                         // true if the message must be sent again
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
+       uint64_t key;                           // lookup key is now subid and mtype
+       int max_rt = 1000;
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -263,8 +265,10 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
 
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
 
+       key = build_rt_key( msg->sub_id, msg->mtype );                  // what we need to find the route table entry
        while( send_again ) {
        while( send_again ) {
-               nn_sock = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again );         // round robin select endpoint; again set if mult groups
+               max_rt = 1000;
+               nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again );                // round robin select endpoint; again set if mult groups
                if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n",
                                msg->mtype, send_again, group, nn_sock, msg->len );
                group++;
                if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n",
                                msg->mtype, send_again, group, nn_sock, msg->len );
                group++;
@@ -277,18 +281,21 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
 
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
-                       if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
+                       if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d sub_id=%d len=%d\n", msg->mtype, msg->sub_id, msg->len );
                        msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
                        msg = send_msg( ctx, msg, nn_sock );                                    // do the hard work, msg should be nil on success
                        msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
                        msg = send_msg( ctx, msg, nn_sock );                                    // do the hard work, msg should be nil on success
-                       /*
-                       if( msg ) {
-                               // error do we need to count successes/errors, how to report some success, esp if last fails?
+                       while( max_rt > 0 &&  msg && msg->state == RMR_ERR_RETRY ) {
+                               msg = send_msg( ctx, msg, nn_sock );
+                               max_rt--;
                        }
                        }
-                       */
 
                        msg = clone_m;                                                                                  // clone will be the next to send
                } else {
                        msg = send_msg( ctx, msg, nn_sock );                                    // send the last, and allocate a new buffer; drops the clone if it was
 
                        msg = clone_m;                                                                                  // clone will be the next to send
                } else {
                        msg = send_msg( ctx, msg, nn_sock );                                    // send the last, and allocate a new buffer; drops the clone if it was
+                       while( max_rt > 0 &&  msg && msg->state == RMR_ERR_RETRY ) {
+                               msg = send_msg( ctx, msg, nn_sock );
+                               max_rt--;
+                       }
                }
        }
 
                }
        }
 
index 3ebad2d..cfcb27e 100644 (file)
@@ -58,13 +58,13 @@ static int uta_link2( char* target ) {
 
        nn_sock = nn_socket( AF_SP, NN_PUSH );          // the socket we'll use to connect to the target
        if( nn_sock < 0 ) {
 
        nn_sock = nn_socket( AF_SP, NN_PUSH );          // the socket we'll use to connect to the target
        if( nn_sock < 0 ) {
-               fprintf( stderr, "[WARN] rmr: link2: unable to create socket for link to target: %s: %d\n", target, errno );
+               fprintf( stderr, "[WARN] rmr: link2: unable to create socket for link to target: %s: %d\n\n\n", target, errno );
                return -1;
        }
 
        snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
        if( nn_connect( nn_sock, conn_info ) < 0 ) {                                                    // connect failed
                return -1;
        }
 
        snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
        if( nn_connect( nn_sock, conn_info ) < 0 ) {                                                    // connect failed
-               fprintf( stderr, "[WARN] rmr: link2: unable to create link to target: %s: %d\n", target, errno );
+               fprintf( stderr, "[WARN] rmr: link2: unable to create link to target: %s: %d\n\n\n", target, errno );
                nn_close( nn_sock );
                return -1;
        }
                nn_close( nn_sock );
                return -1;
        }
@@ -138,6 +138,7 @@ static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
                }
 
                ep->nn_sock = -1;                                       // not connected
                }
 
                ep->nn_sock = -1;                                       // not connected
+               ep->open = 0;
                ep->addr = uta_h2ip( ep_name );
                ep->name = strdup( ep_name );
 
                ep->addr = uta_h2ip( ep_name );
                ep->name = strdup( ep_name );
 
@@ -203,14 +204,15 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name ) {
        invoke this function again to make a selection against that group. If there
        are no more groups, more is set to 0.
 */
        invoke this function again to make a selection against that group. If there
        are no more groups, more is set to 0.
 */
-static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more ) {
+static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more ) {
        rtable_ent_t* rte;                      // matching rt entry
        endpoint_t*     ep;                             // seected end point
        rtable_ent_t* rte;                      // matching rt entry
        endpoint_t*     ep;                             // seected end point
-       int nn_sock = -1;
+       int nn_sock = -2;
        int dummy;
        rrgroup_t* rrg;
 
        int dummy;
        rrgroup_t* rrg;
 
-       if( ! more ) {                          // eliminate cheks each time we need to user
+
+       if( ! more ) {                          // eliminate checks each time we need to use
                more = &dummy;
        }
 
                more = &dummy;
        }
 
@@ -219,20 +221,20 @@ static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more ) {
                return -1;
        }
 
                return -1;
        }
 
-       if( (rte = rmr_sym_pull( rt->hash, mtype )) == NULL ) {
+       if( (rte = rmr_sym_pull( rt->hash, key )) == NULL ) {
                *more = 0;
                *more = 0;
-               //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %d\n", mtype );
+               //if( DEBUG ) fprintf( stderr, "#### >>> rte not found for type key=%lu\n", key );
                return -1;
        }
 
        if( group < 0 || group >= rte->nrrgroups ) {
                return -1;
        }
 
        if( group < 0 || group >= rte->nrrgroups ) {
-               //if( DEBUG ) fprintf( stderr, ">>>> group out of range: mtype=%d group=%d max=%d\n", mtype, group, rte->nrrgroups );
+               //if( DEBUG ) fprintf( stderr, ">>>> group out of range: key=%lu group=%d max=%d\n", key, group, rte->nrrgroups );
                *more = 0;
                return -1;
        }
 
        if( (rrg = rte->rrgroups[group]) == NULL ) {
                *more = 0;
                return -1;
        }
 
        if( (rrg = rte->rrgroups[group]) == NULL ) {
-               //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %d\n", mtype );
+               //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %lu\n", key );
                *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
                return -1;
        }
                *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
                return -1;
        }
@@ -241,10 +243,10 @@ static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more ) {
 
        switch( rrg->nused ) {
                case 0:                         // nothing allocated, just punt
 
        switch( rrg->nused ) {
                case 0:                         // nothing allocated, just punt
-                       //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" );
+                       //if( DEBUG )
                        return -1;
 
                        return -1;
 
-               case 1:                         // exactly one, no rr to deal with and more is not possible even if fanout > 1
+               case 1:                         // exactly one, no rr to deal with
                        nn_sock = rrg->epts[0]->nn_sock;
                        ep = rrg->epts[0];
                        break;
                        nn_sock = rrg->epts[0]->nn_sock;
                        ep = rrg->epts[0];
                        break;
@@ -258,7 +260,7 @@ static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more ) {
                        break;
        }
 
                        break;
        }
 
-       if( ! ep->open ) {                              // not connected
+       if( ep && ! ep->open ) {                                // not connected
                if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
                        ep->addr = uta_h2ip( ep->name );
                }
                if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
                        ep->addr = uta_h2ip( ep->name );
                }
index 77a0e64..18acdda 100644 (file)
@@ -98,6 +98,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                exit( 1 );
        }
 
                exit( 1 );
        }
 
+       memset( msg->header, 0, sizeof( uta_mhdr_t ) );                 // must ensure that header portion of tpbuf is 0
+       msg->tp_buf = msg->header;
        hdr = (uta_mhdr_t *) msg->header;
        hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
        hdr->sub_id = htonl( UNSET_SUBID );
        hdr = (uta_mhdr_t *) msg->header;
        hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
        hdr->sub_id = htonl( UNSET_SUBID );
@@ -108,6 +110,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
 
        msg->len = 0;                                                                                   // length of data in the payload
        msg->alloc_len = mlen;                                                                  // length of allocated payload
 
        msg->len = 0;                                                                                   // length of data in the payload
        msg->alloc_len = mlen;                                                                  // length of allocated payload
+       msg->sub_id = UNSET_SUBID;
+       msg->mtype = UNSET_MSGTYPE;
        msg->payload = PAYLOAD_ADDR( hdr );                                             // point at the payload in transport
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
        msg->payload = PAYLOAD_ADDR( hdr );                                             // point at the payload in transport
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
@@ -323,6 +327,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
 
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
 
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
+       //fprintf( stderr, ">>>>>> sending to %d %d\n", nn_sock, msg->mtype );
        hdr = (uta_mhdr_t *) msg->header;
        hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub-id in network byte order for transport
        hdr->sub_id = htonl( msg->sub_id );
        hdr = (uta_mhdr_t *) msg->header;
        hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub-id in network byte order for transport
        hdr->sub_id = htonl( msg->sub_id );
index b810bdb..6bca91f 100644 (file)
@@ -104,7 +104,7 @@ static void free_ctx( uta_ctx_t* ctx );
 static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer );
 static int rt_link2_ep( endpoint_t* ep );
 static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock );
 static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer );
 static int rt_link2_ep( endpoint_t* ep );
 static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock );
-static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more, nng_socket* nn_sock );
+static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock );
 static inline int xlate_nng_state( int state, int def_state );
 
 
 static inline int xlate_nng_state( int state, int def_state );
 
 
index 70542f8..ba7c852 100644 (file)
@@ -204,6 +204,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        int send_again;                         // true if the message must be sent again
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
        int sock_ok;                            // got a valid socket from round robin select
        int send_again;                         // true if the message must be sent again
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
        int sock_ok;                            // got a valid socket from round robin select
+       uint64_t key;                           // mtype or sub-id/mtype sym table key
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -229,8 +230,9 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
 
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
 
+       key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
        while( send_again ) {
        while( send_again ) {
-               sock_ok = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again, &nn_sock );               // round robin sel epoint; again set if mult groups
+               sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
                if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n",
                                msg->mtype, send_again, group, msg->len, sock_ok );
                group++;
                if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n",
                                msg->mtype, send_again, group, msg->len, sock_ok );
                group++;
index d6d3490..6122f69 100644 (file)
@@ -231,7 +231,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
        during test that different entries are being seleted; we cannot depend on the nng
        socket being different as we could with nano.
 */
        during test that different entries are being seleted; we cannot depend on the nng
        socket being different as we could with nano.
 */
-static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more, nng_socket* nn_sock ) {
+static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) {
        rtable_ent_t* rte;                      // matching rt entry
        endpoint_t*     ep;                             // seected end point
        int  state = FALSE;                     // processing state
        rtable_ent_t* rte;                      // matching rt entry
        endpoint_t*     ep;                             // seected end point
        int  state = FALSE;                     // processing state
@@ -254,20 +254,20 @@ static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more, nn
                return FALSE;
        }
 
                return FALSE;
        }
 
-       if( (rte = rmr_sym_pull( rt->hash, mtype )) == NULL ) {
+       if( (rte = rmr_sym_pull( rt->hash, key )) == NULL ) {
                *more = 0;
                *more = 0;
-               //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %d\n", mtype );
+               //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %lu\n", key );
                return FALSE;
        }
 
        if( group < 0 || group >= rte->nrrgroups ) {
                return FALSE;
        }
 
        if( group < 0 || group >= rte->nrrgroups ) {
-               //if( DEBUG ) fprintf( stderr, ">>>> group out of range: mtype=%d group=%d max=%d\n", mtype, group, rte->nrrgroups );
+               //if( DEBUG ) fprintf( stderr, ">>>> group out of range: key=%lu group=%d max=%d\n", key, group, rte->nrrgroups );
                *more = 0;
                return FALSE;
        }
 
        if( (rrg = rte->rrgroups[group]) == NULL ) {
                *more = 0;
                return FALSE;
        }
 
        if( (rrg = rte->rrgroups[group]) == NULL ) {
-               //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %d\n", mtype );
+               //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type key=%lu\n", key );
                *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
                return FALSE;
        }
                *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
                return FALSE;
        }
index e1a2fa5..7c17589 100644 (file)
@@ -147,6 +147,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        }
        msg->len = 0;                                                                                   // length of data in the payload
        msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
        }
        msg->len = 0;                                                                                   // length of data in the payload
        msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
+       msg->sub_id = UNSET_SUBID;
+       msg->mtype = UNSET_MSGTYPE;
        msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
        msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
@@ -175,6 +177,8 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
 
        memset( msg, 0, sizeof( *msg ) );
 
 
        memset( msg, 0, sizeof( *msg ) );
 
+       msg->sub_id = UNSET_SUBID;
+       msg->mtype = UNSET_MSGTYPE;
        msg->tp_buf = NULL;
        msg->header = NULL;
        msg->len = -1;                                                                                  // no payload; invalid len
        msg->tp_buf = NULL;
        msg->header = NULL;
        msg->len = -1;                                                                                  // no payload; invalid len
diff --git a/test/app_test/rebuild.ksh b/test/app_test/rebuild.ksh
new file mode 100644 (file)
index 0000000..04367ef
--- /dev/null
@@ -0,0 +1,46 @@
+#!/usr/bin/env ksh
+# :vi ts=4 sw=4 noet :
+#==================================================================================
+#    Copyright (c) 2019 Nokia
+#    Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
+
+# ---------------------------------------------------------------------------------
+#      Mnemonic:       rebuild.ksh
+#      Abstract:       This is a simple script that will cause RMr to be rebuilt. It 
+#                              may be invoked by any of the run_* scripts in this directory.
+#
+#      Date:           24 April 2019
+#      Author:         E. Scott Daniels
+# ---------------------------------------------------------------------------------
+
+
+build_path=../../.build
+
+(
+       set -e
+       mkdir -p $build_path
+       cd ${build_path%/*}                             # cd barfs on ../../.build, so we do this
+       cd ${build_path##*/}
+       cmake ..
+       make package
+)
+if (( $? != 0 ))
+then
+       echo "build failed"
+       exit 1
+fi
+
index dd639ce..ed2450b 100644 (file)
@@ -99,12 +99,15 @@ int main( int argc, char** argv ) {
        long good = 0;                                          // good palyload buffers
        long bad = 0;                                           // payload buffers which were not correct
        long bad_tr = 0;                                        // trace buffers that were not correct
        long good = 0;                                          // good palyload buffers
        long bad = 0;                                           // payload buffers which were not correct
        long bad_tr = 0;                                        // trace buffers that were not correct
+       long bad_sid = 0;                                       // bad subscription ids
        long timeout = 0;
        char*   data;
        long timeout = 0;
        char*   data;
-       char    wbuf[1024];                                     // we'll pull trace data into here
        int             nmsgs = 10;                                     // number of messages to stop after (argv[1] overrides)
        int             rt_count = 0;                           // retry count
        long ack_count = 0;                                     // number of acks sent
        int             nmsgs = 10;                                     // number of messages to stop after (argv[1] overrides)
        int             rt_count = 0;                           // retry count
        long ack_count = 0;                                     // number of acks sent
+       int             count_bins[11];                         // histogram bins based on msg type (0-10)
+       char    wbuf[1024];                                     // we'll pull trace data into here, and use as general working buffer
+       char    sbuf[128];                                      // short buffer
 
        data = getenv( "RMR_RTG_SVC" );
        if( data == NULL ) {
 
        data = getenv( "RMR_RTG_SVC" );
        if( data == NULL ) {
@@ -118,6 +121,8 @@ int main( int argc, char** argv ) {
                listen_port = argv[2];
        }
 
                listen_port = argv[2];
        }
 
+       memset( count_bins, 0, sizeof( count_bins ) );
+
        fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
 
        mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );   // start your engines!
        fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
 
        mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );   // start your engines!
@@ -165,11 +170,21 @@ int main( int argc, char** argv ) {
                                }
                                count++;                                                                        // messages received for stats output
 
                                }
                                count++;                                                                        // messages received for stats output
 
-                               if( msg->mtype == 5 ) {                                         // send an ack; sender will count but not process, so data in message is moot
-                                       msg = rmr_rts_msg( mrc, msg );                                                          // we don't try to resend if this returns retry
+                               if( msg->mtype < 3 ) {                                                  // count number of properly set subscription id
+                                       if( msg->sub_id != msg->mtype * 10 ) {
+                                               bad_sid++;
+                                       }
+                               }
+
+                               if( msg->mtype >= 0 && msg->mtype <= 10 ) {
+                                       count_bins[msg->mtype]++;
+                               }
+
+                               if( msg->mtype == 5 ) {                                                                                 // send an ack; sender will count but not process, so data in message is moot
+                                       msg = rmr_rts_msg( mrc, msg );
                                        rt_count = 1000;
                                        while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {           // to work right in nano we need this :(
                                        rt_count = 1000;
                                        while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {           // to work right in nano we need this :(
-                                               if( ack_count < 1 ) {                                                                   // need to connect, so hard wait
+                                               if( ack_count < 1 ) {                                                                   // 1st ack, so we need to connect, and we'll wait for that
                                                        sleep( 1 );
                                                }
                                                rt_count--;
                                                        sleep( 1 );
                                                }
                                                rt_count--;
@@ -189,7 +204,16 @@ int main( int argc, char** argv ) {
                }
        }
 
                }
        }
 
-       fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  bad-trace=%ld\n", !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr );
+       wbuf[0] = 0;
+       for( i = 0; i < 11; i++ ) {
+               snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
+               strcat( wbuf, sbuf );
+       }
+
+       fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
+       fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  bad-trace=%ld bad-sub_id=%ld\n", 
+               !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr, bad_sid );
+
        sleep( 2 );                                                                     // let any outbound acks flow before closing
 
        rmr_close( mrc );
        sleep( 2 );                                                                     // let any outbound acks flow before closing
 
        rmr_close( mrc );
index 80d139f..5fab235 100644 (file)
@@ -1,21 +1,24 @@
 
 # This is a 'mask' such that the run command can generate with the 
 
 # This is a 'mask' such that the run command can generate with the 
-# host name for the sender.
+# host name for the sender. (not needed after RMr 1.0.18)
 
 
-newrt|start
-rte|0|localhost:4560
-rte|1|localhost:4560
-rte|2|localhost:4560
-rte|3|localhost:4560
-rte|4|localhost:4560
-rte|5|localhost:4560
-rte|6|localhost:4560
-rte|7|localhost:4560
-rte|8|localhost:4560
-rte|9|localhost:4560
-rte|10|localhost:4560
-rte|11|localhost:4560
-rte|12|localhost:4560
-rte|13|localhost:4560
-rte|999|%%hostname%%:43086
-newrt|end
+newrt | start
+mse | 0 |  0 | localhost:4560
+mse | 1 | 10 | localhost:4560
+mse | 2 | 20 | localhost:4560
+rte | 3 | localhost:4560
+mse | 3 | 100 | localhost:4560 # special test to ensure that this does not affect previous entry
+rte | 4 | localhost:4560
+rte | 5 | localhost:4560
+rte | 6 | localhost:4560
+rte | 7 | localhost:4560
+rte | 8 | localhost:4560
+rte | 9 | localhost:4560
+rte | 10 | localhost:4560
+rte | 11 | localhost:4560
+rte | 12 | localhost:4560
+rte | 13 | localhost:4560
+
+# this entry isn't needed after RMr 1.0.18
+rte | 999 | %%hostname%%:43086
+newrt | end
index 7ad4934..3c4f20e 100644 (file)
@@ -73,6 +73,7 @@ nano_sender=0                         # start nano version if set (-N)
 nano_receiver=0
 wait=1
 rebuild=0
 nano_receiver=0
 wait=1
 rebuild=0
+verbose=0
 
 while [[ $1 == -* ]]
 do
 
 while [[ $1 == -* ]]
 do
@@ -83,6 +84,7 @@ do
                        nano_receiver=1
                        ;;
                -n)     nmsg=$2; shift;;
                        nano_receiver=1
                        ;;
                -n)     nmsg=$2; shift;;
+               -v)     verbose=1;;
 
                *)      echo "unrecognised option: $1"
                        echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]"
 
                *)      echo "unrecognised option: $1"
                        echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]"
@@ -94,23 +96,18 @@ do
        shift
 done
 
        shift
 done
 
+if (( verbose ))
+then
+       echo "2" >.verbose
+       export RMR_VCTL_FILE=".verbose"
+fi
+
 if (( rebuild )) 
 then
        build_path=../../.build
 if (( rebuild )) 
 then
        build_path=../../.build
-
-       (
-               set -e
-               mkdir -p $build_path
-               cd ${build_path%/*}                             # cd barfs on ../../.build, so we do this
-               cd ${build_path##*/}
-               cmake ..
-               make package
-       )
-       if (( $? != 0 ))
-       then
-               echo "build failed"
-               exit 1
-       fi
+       set -e
+       ksh ./rebuild.ksh
+       set +e
 else
        build_path=${BUILD_PATH:-"../../.build"}        # we prefer .build at the root level, but allow user option
 
 else
        build_path=${BUILD_PATH:-"../../.build"}        # we prefer .build at the root level, but allow user option
 
@@ -137,6 +134,7 @@ then
 fi
 
 run_rcvr &
 fi
 
 run_rcvr &
+sleep 2                                # if sender starts faster than rcvr we can drop, so pause a bit
 run_sender &
 
 wait
 run_sender &
 
 wait
@@ -151,6 +149,7 @@ else
 fi
 
 rm /tmp/PID$$.*
 fi
 
 rm /tmp/PID$$.*
+rm -f .verbose
 
 exit $(( !! (src + rrc) ))
 
 
 exit $(( !! (src + rrc) ))
 
diff --git a/test/app_test/run_multi_test.ksh b/test/app_test/run_multi_test.ksh
new file mode 100644 (file)
index 0000000..9bfc59b
--- /dev/null
@@ -0,0 +1,208 @@
+#!/usr/bin/env ksh
+# :vi ts=4 sw=4 noet :
+#==================================================================================
+#    Copyright (c) 2019 Nokia
+#    Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
+
+# ---------------------------------------------------------------------------------
+#      Mnemonic:       run_multi_test.ksh
+#      Abstract:       This is a simple script to set up and run the basic send/receive
+#                              processes for some library validation on top of nano/nng. This 
+#                              particular tests starts several receivers and creates a route table
+#                              which causes messages to be sent to all receivers in parallel
+#                              (forcing message cloning internally in RMr).
+#                              It should be possible to clone the repo, switch to this directory
+#                              and execute  'ksh run -B'  which will build RMr, make the sender and
+#                              recevier then  run the basic test.
+#
+#                              Example command line:
+#                                      ksh ./run_multi_test.ksh                # default 10 messages at 1 msg/sec
+#                                      ksh ./run_multi_test.ksh -N    # default but with nanomsg lib
+#                                      ksh ./run_multi_test.ksh -d 100 -n 10000 # send 10k messages with 100ms delay between
+#
+#      Date:           24 April 2019
+#      Author:         E. Scott Daniels
+# ---------------------------------------------------------------------------------
+
+
+# The sender and receivers are run asynch. Their exit statuses are captured in a
+# file in order for the 'main' to pick them up easily.
+#
+function run_sender {
+       if (( $nano_sender ))
+       then
+               ./sender_nano $nmsg $delay
+       else
+               ./sender $nmsg $delay
+       fi
+       echo $? >/tmp/PID$$.src         # must communicate state back via file b/c asynch
+}
+
+# $1 is the instance so we can keep logs separate
+function run_rcvr {
+       typeset port
+
+       port=$(( 4460 + ${1:-0} ))
+       export RMR_RTG_SVC=$(( 9990 + $1 ))
+       if (( $nano_receiver ))
+       then
+               ./receiver_nano $nmsg $port
+       else
+               ./receiver $nmsg $port
+       fi
+       echo $? >/tmp/PID$$.$1.rrc
+}
+
+#      Drop a contrived route table in such that the sender sends each message to n
+#      receivers.
+#
+function set_rt {
+       typeset port=4460
+       typeset groups="localhost:4460"
+       for (( i=1; i < ${1:-3}; i++ ))
+       do
+               groups="$groups;localhost:$((port+i))"
+       done
+
+       cat <<endKat >multi.rt
+               newrt | start
+               mse |0 | 0 | $groups
+               mse |1 | 10 | $groups
+               mse |2 | 20 | $groups
+               rte |3 | $groups 
+               rte |4 | $groups 
+               rte |5 | $groups
+               rte |6 | $groups
+               rte |7 | $groups
+               rte |8 | $groups
+               rte |9 | $groups
+               rte |10 | $groups
+               rte |11 | $groups
+               newrt | end
+endKat
+
+}
+
+# ---------------------------------------------------------
+
+if [[ ! -f local.rt ]]         # we need the real host name in the local.rt; build one from mask if not there
+then
+       hn=$(hostname)
+       sed "s!%%hostname%%!$hn!" rt.mask >local.rt
+fi
+
+nmsg=10                                                # total number of messages to be exchanged (-n value changes)
+delay=1000000                          # microsec sleep between msg 1,000,000 == 1s
+nano_sender=0                          # start nano version if set (-N)
+nano_receiver=0
+wait=1
+rebuild=0
+verbose=0
+nrcvrs=3                                       # this is sane, but -r allows it to be set up
+
+while [[ $1 == -* ]]
+do
+       case $1 in 
+               -B)     rebuild=1;;
+               -d)     delay=$2; shift;;
+               -N)     nano_sender=1
+                       nano_receiver=1
+                       ;;
+               -n)     nmsg=$2; shift;;
+               -r)     nrcvrs=$2; shift;;
+               -v)     verbose=1;;
+
+               *)      echo "unrecognised option: $1"
+                       echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]"
+                       echo "  -B forces a rebuild which will use .build"
+                       exit 1
+                       ;;
+       esac
+
+       shift
+done
+
+if (( verbose ))
+then
+       echo "2" >.verbose
+       export RMR_VCTL_FILE=".verbose"
+fi
+
+if (( rebuild )) 
+then
+       build_path=../../.build         # if we rebuild we can insist that it is in .build :)
+       set -e
+       ksh ./rebuild.ksh
+       set +e
+else
+       build_path=${BUILD_PATH:-"../../.build"}        # we prefer .build at the root level, but allow user option
+
+       if [[ ! -d $build_path ]]
+       then
+               echo "cannot find build in: $build_path"
+               echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
+               exit 1
+       fi
+fi
+
+export LD_LIBRARY_PATH=$build_path:$build_path/lib
+export LIBRARY_PATH=$LD_LIBRARY_PATH
+export RMR_SEED_RT=./multi.rt
+
+set_rt $nrcvrs                                         # set up the rt for n receivers
+
+if [[ ! -f ./sender ]]
+then
+       if ! make >/dev/null 2>&1
+       then
+               echo "[FAIL] cannot find sender binary, and cannot make it.... humm?"
+               exit 1
+       fi
+fi
+
+for (( i=0; i < nrcvrs; i++ ))         # start the receivers with an instance number
+do
+       run_rcvr $i &
+done
+
+sleep 2                                # let receivers init so we don't shoot at an empty target
+run_sender &
+
+wait
+
+
+for (( i=0; i < nrcvrs; i++ ))         # collect return codes
+do
+       head -1 /tmp/PID$$.$i.rrc | read x
+       (( rrc += x ))
+done
+
+head -1 /tmp/PID$$.src | read src
+
+if (( !! (src + rrc) ))
+then
+       echo "[FAIL] sender rc=$src  receiver rc=$rrc"
+else
+       echo "[PASS] sender rc=$src  receiver rc=$rrc"
+       rm -f multi.rt
+fi
+
+rm /tmp/PID$$.*
+rm -f .verbose
+
+exit $(( !! (src + rrc) ))
+
diff --git a/test/app_test/run_rr_test.ksh b/test/app_test/run_rr_test.ksh
new file mode 100644 (file)
index 0000000..2e52aa6
--- /dev/null
@@ -0,0 +1,211 @@
+#!/usr/bin/env ksh
+# :vi ts=4 sw=4 noet :
+#==================================================================================
+#    Copyright (c) 2019 Nokia
+#    Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
+
+# ---------------------------------------------------------------------------------
+#      Mnemonic:       run_multi_test.ksh
+#      Abstract:       This is a simple script to set up and run the basic send/receive
+#                              processes for some library validation on top of nano/nng. This 
+#                              particular tests starts several receivers and creates a route table
+#                              which causes messages to be sent round robin to all of the receivers.
+#                              The number of messages command line parameter (-n) will be the number
+#                              of messages that each receiver should expect; the sender will be asked
+#                              to send r times that many messages so that as they are round robbined
+#                              each receiver should get the same number of messages.
+#
+#                              Example command line:
+#                                      ksh ./run_rr_test.ksh           # default 10 messages at 1 msg/sec
+#                                      ksh ./run_rr_test.ksh -N    # default but with nanomsg lib
+#                                      ksh ./run_rr_test.ksh -d 100 -n 10000 # send 10k messages with 100ms delay between
+#
+#      Date:           24 April 2019
+#      Author:         E. Scott Daniels
+# ---------------------------------------------------------------------------------
+
+
+# The sender and receivers are run asynch. Their exit statuses are captured in a
+# file in order for the 'main' to pick them up easily.
+#
+function run_sender {
+       export RMR_RTG_SVC=8990
+       if (( $nano_sender ))
+       then
+               ./sender_nano $(( nmsg * nrcvrs )) $delay 1
+       else
+               ./sender $(( nmsg * nrcvrs ))  $delay 1
+       fi
+       echo $? >/tmp/PID$$.src         # must communicate state back via file b/c asynch
+}
+
+# $1 is the instance so we can keep logs separate
+function run_rcvr {
+       typeset port
+
+       port=$(( 4560 + ${1:-0} ))
+       export RMR_RTG_SVC=$(( 9990 + $1 ))
+       if (( $nano_receiver ))
+       then
+               ./receiver_nano $nmsg $port
+       else
+               ./receiver $nmsg $port
+       fi
+       echo $? >/tmp/PID$$.$1.rrc
+}
+
+#
+#      Drop a contrived route table in such that the sender sends each message to n
+#      receivers.
+#
+function set_rt {
+       typeset port=4560
+       typeset endpoints="localhost:4560"
+       for (( i=1; i < ${1:-3}; i++ ))
+       do
+               endpoints="$endpoints,localhost:$((port+i))"
+       done
+
+       cat <<endKat >rr.rt
+               newrt |start
+               rte |0 | $endpoints  |0
+               rte |1 | $endpoints  |10
+               mse |2 | 20 | $endpoints                # new style mtype/subid entry
+               rte |3 | $endpoints  |0
+               rte |4 | $endpoints  |0
+               rte |5 | $endpoints  |0
+               rte |6 | $endpoints  |0
+               rte |7 | $endpoints  |0
+               rte |8 | $endpoints  |0
+               rte |9 | $endpoints  |0
+               rte |10 | $endpoints  |0
+               rte |11 | $endpoints  |0
+               newrt |end
+endKat
+
+}
+
+# ---------------------------------------------------------
+
+if [[ ! -f local.rt ]]         # we need the real host name in the local.rt; build one from mask if not there
+then
+       hn=$(hostname)
+       sed "s!%%hostname%%!$hn!" rt.mask >local.rt
+fi
+
+nmsg=10                                                # total number of messages to be exchanged (-n value changes)
+delay=1000                                     # microsec sleep between msg 1,000,000 == 1s (shorter than others b/c/ we are sending to multiple)
+nano_sender=0                          # start nano version if set (-N)
+nano_receiver=0
+wait=1
+rebuild=0
+verbose=0
+nrcvrs=3                                       # this is sane, but -r allows it to be set up
+
+while [[ $1 == -* ]]
+do
+       case $1 in 
+               -B)     rebuild=1;;
+               -d)     delay=$2; shift;;
+               -N)     nano_sender=1
+                       nano_receiver=1
+                       ;;
+               -n)     nmsg=$2; shift;;
+               -r)     nrcvrs=$2; shift;;
+               -v)     verbose=1;;
+
+               *)      echo "unrecognised option: $1"
+                       echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]"
+                       echo "  -B forces a rebuild which will use .build"
+                       exit 1
+                       ;;
+       esac
+
+       shift
+done
+
+
+if (( verbose ))
+then
+       echo "2" >.verbose
+       export RMR_VCTL_FILE=".verbose"
+fi
+
+if (( rebuild )) 
+then
+       build_path=../../.build
+       set -e
+       ksh ./rebuild.ksh
+       set +e
+else
+       build_path=${BUILD_PATH:-"../../.build"}        # we prefer .build at the root level, but allow user option
+
+       if [[ ! -d $build_path ]]
+       then
+               echo "cannot find build in: $build_path"
+               echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
+               exit 1
+       fi
+fi
+
+export LD_LIBRARY_PATH=$build_path:$build_path/lib
+export LIBRARY_PATH=$LD_LIBRARY_PATH
+export RMR_SEED_RT=./rr.rt
+
+set_rt $nrcvrs
+
+if [[ ! -f ./sender ]]
+then
+       if ! make >/dev/null 2>&1
+       then
+               echo "[FAIL] cannot find sender binary, and cannot make it.... humm?"
+               exit 1
+       fi
+fi
+
+for (( i=0; i < nrcvrs; i++ ))         # start the receivers with an instance number
+do
+       run_rcvr $i &
+done
+
+sleep 2                                        # wait to start sender else we might send before receivers up and drop messages
+run_sender &
+
+wait
+
+
+for (( i=0; i < nrcvrs; i++ ))         # collect return codes
+do
+       head -1 /tmp/PID$$.$i.rrc | read x
+       (( rrc += x ))
+done
+
+head -1 /tmp/PID$$.src | read src
+
+if (( !! (src + rrc) ))
+then
+       echo "[FAIL] sender rc=$src  receiver rc=$rrc"
+else
+       echo "[PASS] sender rc=$src  receiver rc=$rrc"
+fi
+
+rm /tmp/PID$$.*
+rm -f .verbose
+rm -f rr.rt
+
+exit $(( !! (src + rrc) ))
+
index 3602d4f..ac7d869 100644 (file)
                                will give up and fail.
 
 
                                will give up and fail.
 
 
-                               Message types will vary between 1 and 10, so the route table must
-                               be set up to support those message types.
+                               Message types will vary between 0 and 9, so the route table must
+                               be set up to support those message types. Further, for message types
+                               0, 1 and 2, the subscription ID will be set to type x 10, so the route
+                               table must be set to include the sub-id for those types in order for
+                               the messages to reach their destination.
 
                                Message format is:
                                        ck1 ck2|<msg-txt><nil>
 
                                Message format is:
                                        ck1 ck2|<msg-txt><nil>
                                Ck2 is the simple check sum of the trace data which is a nil terminated
                                series of bytes.
 
                                Ck2 is the simple check sum of the trace data which is a nil terminated
                                series of bytes.
 
-                               Parms:  argv[1] == nmsgs; argv[2] == delay; argv[3] == listen port
+                               Parms:  argv[1] == number of msgs to send (10)
+                                               argv[2] == delay                (mu-seconds, 1000000 default)
+                                               argv[3] == max msg type (not inclusive; default 10)
+                                               argv[4] == listen port
 
                                Sender will send for at most 20 seconds, so if nmsgs and delay extend
                                beyond that period the total number of messages sent will be less
 
                                Sender will send for at most 20 seconds, so if nmsgs and delay extend
                                beyond that period the total number of messages sent will be less
@@ -75,16 +81,17 @@ static int sum( char* str ) {
 
 int main( int argc, char** argv ) {
        void* mrc;                                                      // msg router context
 
 int main( int argc, char** argv ) {
        void* mrc;                                                      // msg router context
-       struct epoll_event events[1];                   // list of events to give to epoll
-       struct epoll_event epe;                 // event definition for event to listen to
+       struct  epoll_event events[1];                  // list of events to give to epoll
+       struct  epoll_event epe;                                // event definition for event to listen to
        int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
        int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
-       int rcv_fd;                                                     // file des that NNG tickles -- give this to epoll to listen on
-       int nready;                                                             // number of events ready for receive
+       int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
+       int             nready;                                                 // number of events ready for receive
        rmr_mbuf_t*             sbuf;                                   // send buffer
        rmr_mbuf_t*             rbuf;                                   // received buffer
        rmr_mbuf_t*             sbuf;                                   // send buffer
        rmr_mbuf_t*             rbuf;                                   // received buffer
-       int     count = 0;
-       int     rt_count = 0;                                           // number of messages requiring a spin retry
-       int     rcvd_count = 0;
+       int             count = 0;
+       int             rt_count = 0;                                   // number of messages requiring a spin retry
+       int             rcvd_count = 0;
+       int             fail_count = 0;                                 // # of failure sends after first successful send
        char*   listen_port = "43086";
        int             mtype = 0;
        int             stats_freq = 100;
        char*   listen_port = "43086";
        int             mtype = 0;
        int             stats_freq = 100;
@@ -94,6 +101,7 @@ int main( int argc, char** argv ) {
        long    timeout = 0;
        int             delay = 100000;                                 // usec between send attempts
        int             nmsgs = 10;                                             // number of messages to send
        long    timeout = 0;
        int             delay = 100000;                                 // usec between send attempts
        int             nmsgs = 10;                                             // number of messages to send
+       int             max_mt = 10;                                    // reset point for message type
 
        if( argc > 1 ) {
                nmsgs = atoi( argv[1] );
 
        if( argc > 1 ) {
                nmsgs = atoi( argv[1] );
@@ -102,7 +110,10 @@ int main( int argc, char** argv ) {
                delay = atoi( argv[2] );
        }
        if( argc > 3 ) {
                delay = atoi( argv[2] );
        }
        if( argc > 3 ) {
-               listen_port = argv[3];
+               max_mt = atoi( argv[3] );
+       }
+       if( argc > 4 ) {
+               listen_port = argv[4];
        }
 
        fprintf( stderr, "<SNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
        }
 
        fprintf( stderr, "<SNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
@@ -150,13 +161,19 @@ int main( int argc, char** argv ) {
 
        timeout = time( NULL ) + 20;
 
 
        timeout = time( NULL ) + 20;
 
-       while( count < nmsgs ) {                                                                // we send 10 messages after the first message is successful
+       while( count < nmsgs ) {                                                                // we send n messages after the first message is successful
                snprintf( trace, 100, "%lld", (long long) time( NULL ) );
                rmr_set_trace( sbuf, trace, strlen( trace ) + 1 );
                snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer!", count, trace, rand() );
                snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
 
                sbuf->mtype = mtype;                                                    // fill in the message bits
                snprintf( trace, 100, "%lld", (long long) time( NULL ) );
                rmr_set_trace( sbuf, trace, strlen( trace ) + 1 );
                snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer!", count, trace, rand() );
                snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
 
                sbuf->mtype = mtype;                                                    // fill in the message bits
+               if( mtype < 3 ) {
+                       sbuf->sub_id = mtype * 10;
+               } else {
+                       sbuf->sub_id = -1;
+               }
+
                sbuf->len =  strlen( sbuf->payload ) + 1;               // our receiver likely wants a nice acsii-z string
                sbuf->state = 0;
                sbuf = rmr_send_msg( mrc, sbuf );                               // send it (send returns an empty payload on success, or the original payload on fail/retry)
                sbuf->len =  strlen( sbuf->payload ) + 1;               // our receiver likely wants a nice acsii-z string
                sbuf->state = 0;
                sbuf = rmr_send_msg( mrc, sbuf );                               // send it (send returns an empty payload on success, or the original payload on fail/retry)
@@ -167,7 +184,13 @@ int main( int argc, char** argv ) {
                                while( sbuf->state == RMR_ERR_RETRY ) {                 // soft failure (device busy?) retry
                                        sbuf = rmr_send_msg( mrc, sbuf );                       // retry send until it's good (simple test; real programmes should do better)
                                }
                                while( sbuf->state == RMR_ERR_RETRY ) {                 // soft failure (device busy?) retry
                                        sbuf = rmr_send_msg( mrc, sbuf );                       // retry send until it's good (simple test; real programmes should do better)
                                }
-                               successful = 1;
+                               if( sbuf->state == RMR_OK ) {
+                                       successful = 1;                                                         // indicates only that we sent one successful message, not the current state
+                               } else {
+                                       if( successful ) {
+                                               fail_count++;                                                   // count failures after first successful message
+                                       }
+                               }
                                break;
 
                        case RMR_OK:
                                break;
 
                        case RMR_OK:
@@ -175,15 +198,19 @@ int main( int argc, char** argv ) {
                                break;
 
                        default:
                                break;
 
                        default:
+                               if( successful ) {
+                                       fail_count++;                                                   // count failures after first successful message
+                               }
                                // some error (not connected likely), don't count this
                                // some error (not connected likely), don't count this
+                               //sleep( 1 );
                                break;
                }
 
                if( successful ) {                              // once we have a message that was sent, start to increase things
                        count++;
                        mtype++;
                                break;
                }
 
                if( successful ) {                              // once we have a message that was sent, start to increase things
                        count++;
                        mtype++;
-                       if( mtype > 10 ) {                      // if large number of sends don't require infinite rt entries :)
-                               mtype = 1;
+                       if( mtype >= max_mt ) {                 // if large number of sends don't require infinite rt entries :)
+                               mtype = 0;
                        }
                }
 
                        }
                }
 
@@ -237,7 +264,7 @@ int main( int argc, char** argv ) {
                        }
                }
 
                        }
                }
 
-       fprintf( stderr, "<SNDR> [%s] sent %d messages   received %d acks retries=%d\n", count == nmsgs ? "PASS" : "FAIL",  count, rcvd_count, rt_count );
+       fprintf( stderr, "<SNDR> [%s] sent=%d  rcvd-acks=%d  failures=%d retries=%d\n", count == nmsgs ? "PASS" : "FAIL",  count, rcvd_count, fail_count, rt_count );
        rmr_close( mrc );
 
        return !( count == nmsgs );
        rmr_close( mrc );
 
        return !( count == nmsgs );
index e86c6f5..cefc09f 100644 (file)
@@ -139,6 +139,10 @@ static int rmr_api_test( ) {
        msg2 = rmr_send_msg( NULL, NULL );                      // drive for coverage
        errors += fail_not_nil( msg2, "send_msg returned msg pointer when given a nil message and context" );
 
        msg2 = rmr_send_msg( NULL, NULL );                      // drive for coverage
        errors += fail_not_nil( msg2, "send_msg returned msg pointer when given a nil message and context" );
 
+       msg->state = 0;
+       msg = rmr_send_msg( NULL, msg );
+       errors += fail_if( msg->state == 0, "rmr_send_msg did not set msg state when msg given with nil context" );
+
        // --- sends will fail with a no endpoint error until a dummy route table is set, so we test fail case first.
        msg->len = 100;
        msg->mtype = 1;
        // --- sends will fail with a no endpoint error until a dummy route table is set, so we test fail case first.
        msg->len = 100;
        msg->mtype = 1;
@@ -194,17 +198,28 @@ static int rmr_api_test( ) {
        rmr_rts_msg( rmc, NULL );
        errors += fail_if( errno == 0, "rmr_rts_msg did not set errno when given a nil message" );
 
        rmr_rts_msg( rmc, NULL );
        errors += fail_if( errno == 0, "rmr_rts_msg did not set errno when given a nil message" );
 
+       msg->state = 0;
+       msg = rmr_rts_msg( NULL, msg );                 // should set state in msg
+       errors += fail_if_equal( msg->state, 0, "rmr_rts_msg did not set state when given valid message but no context" );
+       
+
        msg = rmr_rts_msg( rmc, msg );                  // return the buffer to the sender
        errors += fail_if_nil( msg, "rmr_rts_msg did not return a message pointer" );
        errors += fail_if( errno != 0, "rmr_rts_msg did not reset errno" );
 
 
        snprintf( msg->xaction, 17, "%015d", 16 );              // dummy transaction id (emulation generates, this should arrive after a few calls to recv)
        msg = rmr_rts_msg( rmc, msg );                  // return the buffer to the sender
        errors += fail_if_nil( msg, "rmr_rts_msg did not return a message pointer" );
        errors += fail_if( errno != 0, "rmr_rts_msg did not reset errno" );
 
 
        snprintf( msg->xaction, 17, "%015d", 16 );              // dummy transaction id (emulation generates, this should arrive after a few calls to recv)
+
+       msg->state = 0;
+       msg = rmr_call( NULL, msg );
+       errors += fail_if( msg->state == 0, "rmr_call did not set message state when given message with nil context" );
+
+       msg->mtype = 0;
        msg = rmr_call( rmc, msg );                                             // this call should return a message as we can anticipate a dummy message in
        errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to succeed" );
        if( msg ) {
                errors += fail_not_equal( msg->state, RMR_OK, "rmr_call did not properly set state on successful return" );
        msg = rmr_call( rmc, msg );                                             // this call should return a message as we can anticipate a dummy message in
        errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to succeed" );
        if( msg ) {
                errors += fail_not_equal( msg->state, RMR_OK, "rmr_call did not properly set state on successful return" );
-               errors += fail_if( errno != 0, "rmr_call did not properly set errno on successful return" );
+               errors += fail_not_equal( errno, 0, "rmr_call did not properly set errno (a) on successful return" );
        }
 
        snprintf( wbuf, 17, "%015d", 14 );                              // if we call receive we should find this in the first 15 tries
        }
 
        snprintf( wbuf, 17, "%015d", 14 );                              // if we call receive we should find this in the first 15 tries
@@ -240,6 +255,11 @@ static int rmr_api_test( ) {
        rmr_free_msg( msg2 );
 
 
        rmr_free_msg( msg2 );
 
 
+       msg2 = rmr_torcv_msg( NULL, NULL, 10 );
+       errors += fail_not_nil( msg2, "rmr_torcv_msg returned a pointer when given nil information" );
+       msg2 = rmr_torcv_msg( rmc, NULL, 10 );
+       errors += fail_if_nil( msg2, "rmr_torcv_msg did not return a message pointer when given a nil old msg" );
+
        // ---  test timeout receive; our dummy epoll function will return 1 ready on first call and 0 ready (timeout emulation) on second
        //              however we must drain the swamp (queue) first, so run until we get a timeout error, or 20 and report error if we get to 20.
        msg = NULL;
        // ---  test timeout receive; our dummy epoll function will return 1 ready on first call and 0 ready (timeout emulation) on second
        //              however we must drain the swamp (queue) first, so run until we get a timeout error, or 20 and report error if we get to 20.
        msg = NULL;
index 129eec1..710c75e 100644 (file)
@@ -228,5 +228,15 @@ static int rt_test( ) {
        }
 */
 
        }
 */
 
+       state = uta_link2( "worm", NULL, NULL );
+       errors += fail_if_true( state, "link2 did not return false when given nil pointers" );
+    
+       state = uta_epsock_rr( rt, 122, 0, NULL, NULL );
+       errors += fail_if_true( state, "uta_epsock_rr returned bad state when given nil socket pointer" );
+
+       rt = uta_rt_init( );                                                                            // get us a route table
+       state = uta_epsock_rr( rt, 0, -1, NULL, &nn_sock );
+       errors += fail_if_true( state, "uta_epsock_rr returned bad state (true) when given negative group number" );
+
        return !!errors;                        // 1 or 0 regardless of count
 }
        return !!errors;                        // 1 or 0 regardless of count
 }
index d477391..4997706 100644 (file)
@@ -46,14 +46,21 @@ static void gen_rt( uta_ctx_t* ctx ) {
        char*   rt_stuff;               // strings for the route table
 
        rt_stuff =
        char*   rt_stuff;               // strings for the route table
 
        rt_stuff =
+               "newrt|end\n"                                                           // end of table check before start of table found
+               "# comment to drive full comment test\n"
+               "\n"                                                                            // handle blank lines
+               "   \n"                                                                         // handle blank lines
+           "mse|4|10|localhost:4561\n"                                 // entry before start message
+           "rte|4|localhost:4561\n"                                    // entry before start message
                "newrt|start\n"                                                         // false start to drive detection
                "xxx|badentry to drive default case"
                "newrt|start\n"
                "newrt|start\n"                                                         // false start to drive detection
                "xxx|badentry to drive default case"
                "newrt|start\n"
-           "rte|0|localhost:4560,localhost:4562\n"
+           "rte|0|localhost:4560,localhost:4562\n"                                     // these are legitimate entries for our testing
            "rte|1|localhost:4562;localhost:4561,localhost:4569\n"
            "rte|1|localhost:4562;localhost:4561,localhost:4569\n"
-           "rte|2|localhost:4562\n"
-           "rte|4|localhost:4561\n"
-               "rte|5|localhost:4563\n"
+           "rte|2|localhost:4562| 10\n"                                                                // new subid at end
+           "mse|4|10|localhost:4561\n"                                                                 // new msg/subid specifier rec
+           "mse|4|localhost:4561\n"                                                                    // new mse entry with less than needed fields
+               "   rte|   5   |localhost:4563    #garbage comment\n"           // tests white space cleanup
            "rte|6|localhost:4562\n"
                "newrt|end\n";
 
            "rte|6|localhost:4562\n"
                "newrt|end\n";
 
@@ -92,6 +99,7 @@ static int sr_nng_test() {
        nng_socket nn_dummy_sock;                                       // dummy needed to drive send
        int             size;
        int             i;
        nng_socket nn_dummy_sock;                                       // dummy needed to drive send
        int             size;
        int             i;
+       void*   p;
 
        //ctx = rmr_init( "tcp:4360", 2048, 0 );                                // do NOT call init -- that starts the rtc thread which isn't good here
        ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) );              // alloc the context manually
 
        //ctx = rmr_init( "tcp:4360", 2048, 0 );                                // do NOT call init -- that starts the rtc thread which isn't good here
        ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) );              // alloc the context manually
@@ -104,6 +112,10 @@ static int sr_nng_test() {
        uta_lookup_rtg( ctx );
 
        gen_rt( ctx );                                                          // forces a static load with some known info since we don't start the rtc()
        uta_lookup_rtg( ctx );
 
        gen_rt( ctx );                                                          // forces a static load with some known info since we don't start the rtc()
+       gen_rt( ctx );                                                          // force a second load to test cloning
+
+       p = rt_ensure_ep( NULL, "foo" );                                // drive for coverage
+       errors += fail_not_nil( p,  "rt_ensure_ep did not return nil when given nil route table" );
 
        state = rmr_ready( NULL );
        errors += fail_if_true( state, "reported ready when given a nil context" );
 
        state = rmr_ready( NULL );
        errors += fail_if_true( state, "reported ready when given a nil context" );
index 21848af..74bc048 100755 (executable)
@@ -267,6 +267,7 @@ verbose=0
 show_all=1                                     # show all things -F sets to show failures only
 strict=0                                       # -s (strict) will set; when off, coverage state ignored in final pass/fail
 show_output=0                          # show output from each test execution (-S)
 show_all=1                                     # show all things -F sets to show failures only
 strict=0                                       # -s (strict) will set; when off, coverage state ignored in final pass/fail
 show_output=0                          # show output from each test execution (-S)
+quiet=0
 
 while [[ $1 == "-"* ]]
 do
 
 while [[ $1 == "-"* ]]
 do
@@ -285,6 +286,7 @@ do
                -s)     strict=1;;                                      # coverage counts toward pass/fail state
                -S)     show_output=1;;                         # test output shown even on success
                -v)     (( verbose++ ));;
                -s)     strict=1;;                                      # coverage counts toward pass/fail state
                -S)     show_output=1;;                         # test output shown even on success
                -v)     (( verbose++ ));;
+               -q)     quiet=1;;                                       # less chatty when spilling error log files
 
                -h)     usage; exit 0;;
                --help) usage; exit 0;;
 
                -h)     usage; exit 0;;
                --help) usage; exit 0;;
@@ -353,7 +355,12 @@ do
        if ! ./${tfile%.c} >/tmp/PID$$.log 2>&1
        then
                echo "[FAIL] unit test failed for: $tfile"
        if ! ./${tfile%.c} >/tmp/PID$$.log 2>&1
        then
                echo "[FAIL] unit test failed for: $tfile"
-               cat /tmp/PID$$.log
+               if (( quiet )) 
+               then
+                       grep "^<" /tmp/PID$$.log        # in quiet mode just dump <...> messages which are assumed from the test programme not appl
+               else
+                       cat /tmp/PID$$.log
+               fi
                (( ut_errors++ ))                               # cause failure even if not in strict mode
                continue                                                # skip coverage tests for this
        else
                (( ut_errors++ ))                               # cause failure even if not in strict mode
                continue                                                # skip coverage tests for this
        else
index 9cb8d82..f2d4bf6 100644 (file)
@@ -48,6 +48,7 @@ static int worm_test( ) {
        char    wbuf[1024];
        int errors = 0;                 // number errors found
        int     i;
        char    wbuf[1024];
        int errors = 0;                 // number errors found
        int     i;
+       void* p;
 
        rmr_mbuf_t*     mbuf;           // mbuf to send to peer
        int             whid = -1;
 
        rmr_mbuf_t*     mbuf;           // mbuf to send to peer
        int             whid = -1;
@@ -100,11 +101,22 @@ static int worm_test( ) {
        whid = rmr_wh_open( ctx, "localhost:21961" );
        errors += fail_not_equal( whid, 3, "attempt to fill in a hole didn't return expected" );
 
        whid = rmr_wh_open( ctx, "localhost:21961" );
        errors += fail_not_equal( whid, 3, "attempt to fill in a hole didn't return expected" );
 
-       rmr_wh_send_msg( NULL, 0, NULL );                       // tests for coverage
-       rmr_wh_send_msg( ctx, 0, NULL );
+       p = rmr_wh_send_msg( NULL, 0, NULL );                   // tests for coverage
+       fail_not_nil( p, "wh_send_msg returned a pointer when given nil context and message" );
+
+       p = rmr_wh_send_msg( ctx, 0, NULL );
+       fail_not_nil( p, "wh_send_msg returned a pointer when given nil message with valid context" );
 
        mbuf = rmr_alloc_msg( ctx, 2048 );                      // get an muf to pass round
        errors += fail_if_nil( mbuf, "unable to allocate mbuf for send tests (giving up on send tests)" );
 
        mbuf = rmr_alloc_msg( ctx, 2048 );                      // get an muf to pass round
        errors += fail_if_nil( mbuf, "unable to allocate mbuf for send tests (giving up on send tests)" );
+
+       mbuf->state = 0;
+       mbuf = rmr_wh_send_msg( NULL, 0, mbuf );
+       if( mbuf ) {
+               fail_if_equal( mbuf->state, 0, "wh_send_msg returned a zero state when given a nil context" );
+       }
+       fail_if_nil( mbuf, "wh_send_msg returned a nil message buffer when given a nil context"  );
+
        while( mbuf ) {
                if( !(mbuf = rmr_wh_send_msg( ctx, 50, mbuf )) ) {              // test for coverage
                        errors += fail_if_nil( mbuf, "send didn't return an mbuf (skip rest of send tests)" );
        while( mbuf ) {
                if( !(mbuf = rmr_wh_send_msg( ctx, 50, mbuf )) ) {              // test for coverage
                        errors += fail_if_nil( mbuf, "send didn't return an mbuf (skip rest of send tests)" );