feat(routing): Support session based routing 84/84/1
authorE. Scott Daniels <daniels@research.att.com>
Tue, 23 Apr 2019 18:24:25 +0000 (18:24 +0000)
committerE. Scott Daniels <daniels@research.att.com>
Thu, 25 Apr 2019 15:53:25 +0000 (15:53 +0000)
The session id field in a message buffer is now used
directly for routing.

Change-Id: I3634c97588b11172db964b2d06c96c317d8b8ae3
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Routing table entry changes to pick up subid

Change-Id: If08dc21aae4acaab350ba75a8854ad2f24007b03
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Fix unit test for rmr_call

It was not properly setting message type and now that RMr
ensures that invalid message type is set by default on a newly
created message this was causing unit test to fail.

Change-Id: I50f08d1038ea7fca2a070cdd949657bfbc25f3fd
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Add application level tests

Added round robin and multi group application level
test scripts.

Change-Id: Ic6aebaf3bc1edb763decc7fd0aebb09df116f20c
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
NNG based sub-id support added

Change-Id: I0d36b55bb90a315ba94c9476df88e2c7eac6c383
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Correct bug in app test script

Change-Id: I5b4a9f32aa1bc2907f320b8ad4628e0948062904
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Nano sub-id changes and unit test updates

Change-Id: Ia69f2fb33de3bbee2f33f9a4c5def779c872e52c
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change nil-sub_id key to high order key

If there is no sub-id, then the key is based only on the
message type, but to allow for a sub-id == 0 the key
when there is no subscription id must be set to
0xffffffff00000000 + msg type.

New version for deb is 1.0.19

Change-Id: I55f89d368466a0137fdea99410c76ba72e1923ab
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
26 files changed:
CMakeLists.txt
src/common/include/rmr.h
src/common/include/rmr_agnostic.h
src/common/include/rmr_symtab.h
src/common/src/rt_generic_static.c
src/common/src/symtab.c
src/nanomsg/include/rmr_private.h
src/nanomsg/src/rmr.c
src/nanomsg/src/rtable_static.c
src/nanomsg/src/sr_static.c
src/nng/include/rmr_nng_private.h
src/nng/src/rmr_nng.c
src/nng/src/rtable_nng_static.c
src/nng/src/sr_nng_static.c
test/app_test/rebuild.ksh [new file with mode: 0644]
test/app_test/receiver.c
test/app_test/rt.mask
test/app_test/run_app_test.ksh
test/app_test/run_multi_test.ksh [new file with mode: 0644]
test/app_test/run_rr_test.ksh [new file with mode: 0644]
test/app_test/sender.c
test/rmr_nng_api_static_test.c
test/rt_static_test.c
test/sr_nng_static_test.c
test/unit_test.ksh
test/wormhole_static_test.c

index 2782147..365d370 100644 (file)
@@ -24,7 +24,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
-set( patch_level "18" )
+set( patch_level "19" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_lib "lib" )
index 6bfe075..e6c1660 100644 (file)
@@ -47,6 +47,8 @@ extern "C" {
 
 #define RMR_DEF_SIZE           0               // pass as size to have msg allocation use the default msg size
 
+#define RMR_VOID_MSGTYPE       (-1)    // unset/invalid message type and sub id
+#define RMR_VOID_SUBID         (-1)
 
 #define RMR_OK                         0               // state is good
 #define RMR_ERR_BADARG         1               // argument passd to function was unusable
index a23cff7..06da060 100644 (file)
@@ -230,6 +230,7 @@ static int ie_test( void* r, int i_factor, long inserts );
 
 
 // ----- route table generic static things ---------
+static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
 static char* uta_fib( char* fname );
@@ -237,7 +238,7 @@ static route_table_t* uta_rt_init( );
 static route_table_t* uta_rt_clone( route_table_t* srt );
 static void uta_rt_drop( route_table_t* rt );
 static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
-static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups );
+static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
index 7d4d18c..74c5aa3 100644 (file)
@@ -28,6 +28,8 @@
 #ifndef _rmr_symtab_h
 #define _rmr_symtab_h
 
+#include <netdb.h>
+
 /* --------- symtab ---------------- */
 #define UT_FL_NOCOPY 0x00          /* use user pointer */
 #define UT_FL_COPY 0x01            /* make a copy of the string data */
@@ -39,12 +41,12 @@ extern void rmr_sym_clear( void *s );
 extern void rmr_sym_dump( void *s );
 extern void *rmr_sym_alloc( int size );
 extern void rmr_sym_del( void *s, const char *name, unsigned int class );
-extern void *rmr_sym_ndel( void *vtable, int key );
+extern void *rmr_sym_ndel( void *vtable, uint64_t key );
 extern void rmr_sym_free( void *vtable );
 extern void *rmr_sym_get( void *s,  const char *name, unsigned int class );
 extern int rmr_sym_put( void *s,  const char *name, unsigned int class, void *val );
-extern int rmr_sym_map( void *s,  unsigned int key, void *val );
-extern void *rmr_sym_pull(  void *vtable, int key );
+extern int rmr_sym_map( void *s,  uint64_t key, void *val );
+extern void *rmr_sym_pull(  void *vtable, uint64_t key );
 extern void rmr_sym_stats( void *s, int level );
 extern void rmr_sym_foreach_class( void *vst, unsigned int class, void (* user_fun)( void*, void*, const char*, void*, void* ), void *user_data );
 
index 4dd9344..5bbacbd 100644 (file)
@@ -44,6 +44,7 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <unistd.h>
+#include <netdb.h>
 
 
 /*
@@ -56,13 +57,43 @@ typedef struct thing_list {
        void** things;
 } thing_list_t;
 
+// ------------------------------------------------------------------------------------------------
+
+
+/*
+       Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
+       must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
+*/
+static char* clip( char* buf ) {
+       char*   tok;
+
+       while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
+               buf++;
+       }
+
+       if( (tok = strchr( buf, '#' )) != NULL ) {
+               if( tok == buf ) {
+                       return buf;                                     // just push back; leading comment sym handled there
+               }
+
+               if( isspace( *(tok-1) ) ) {
+                       *tok = 0;
+               }
+       }
+
+       for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
+       *(tok+1) = 0;
+
+       return buf;
+}
+
 
 /*
        Given a message type create a route table entry and add to the hash keyed on the
        message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
        is the number of group slots to allocate in the entry.
 */
-static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups ) {
+static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
        rtable_ent_t* rte;
 
        if( rt == NULL ) {
@@ -87,12 +118,67 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups )
        memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
        rte->nrrgroups = nrrgroups;
 
-       rmr_sym_map( rt->hash, mtype, rte );                                                    // add to hash using numeric mtype as key
+       rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: mt=%d groups=%d\n", mtype, nrrgroups );
+       if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lu groups=%d\n", key, nrrgroups );
        return rte;
 }
 
+/*
+       This accepts partially parsed information from a record sent by route manager or read from
+       a file such that:
+               ts_field is the msg-type,sender field
+               subid is the integer subscription id
+               rr_field is the endpoint information for round robening message over
+
+       If all goes well, this will add an RTE to the table under construction.
+
+       The ts_field is checked to see if we should ingest this record. We ingest if one of
+       these is true:
+               there is no sender info (a generic entry for all)
+               there is sender and our host:port matches one of the senders
+               the sender info is an IP address that matches one of our IP addresses
+*/
+static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
+       rtable_ent_t*   rte;            // route table entry added
+       char*   tok;
+       int             ntoks;
+       uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
+       char*   tokens[128];
+       char*   gtokens[64];
+       int             i;
+       int             ngtoks;                         // number of tokens in the group list
+       int             grp;                            // index into group list
+
+       ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
+       rr_field = clip( rr_field );
+
+       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
+               (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
+               has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
+
+                       key = build_rt_key( subid, atoi( ts_field ) );
+
+                       if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lu\n", ts_field, subid, key );
+
+                       if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
+                               rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
+
+                               for( grp = 0; grp < ngtoks; grp++ ) {
+                                       if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
+                                               for( i = 0; i < ntoks; i++ ) {
+                                                       if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG]    add endpoint  %s\n", ts_field );
+                                                       uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
+                                               }
+                                       }
+                               }
+                       }
+               } else {
+                       if( DEBUG || (vlevel > 2) )
+                               fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
+               }
+}
+
 /*
        Parse a single record recevied from the route table generator, or read
        from a static route table file.  Start records cause a new table to
@@ -100,6 +186,11 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups )
        entry records are added to the currenly 'in progress' table, and an
        end record causes the in progress table to be finalised and the
        currently active table is replaced.
+
+       We expect one of several types:
+               newrt|{start|end}
+               rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
+               mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
 */
 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
        int i;
@@ -129,6 +220,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                break;
 
                        case 'n':                                                                                               // newrt|{start|end}
+                               tokens[1] = clip( tokens[1] );
                                if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
                                        if( ctx->new_rtable ) {
                                                uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
@@ -155,33 +247,29 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                }
                                break;
 
+                       case 'm':                                       // assume mse entry
+                               if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
+                                       break;
+                               }
+
+                               if( ntoks < 4 ) {
+                                       if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
+                                       break;
+                               }
+
+                               build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
+                               break;
+
                        case 'r':                                       // assume rt entry
                                if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
                                        break;
                                }
 
-                               if( ((tok = strchr( tokens[1], ',' )) == NULL ) ||                                      // no sender names
-                                       (uta_has_str( tokens[1],  ctx->my_name, ',', 127) >= 0) ||              // our name isn't in the list
-                                       has_myip( tokens[1], ctx->ip_list, ',', 127 ) ) {                               // the list has one of our IP addresses
-
-                                       if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s\n", tokens[1] );
-
-                                       if( (ngtoks = uta_tokenise( tokens[2], gtokens, 64, ';' )) > 0 ) {                                      // split last field by groups first
-                                               rte = uta_add_rte( ctx->new_rtable, atoi( tokens[1] ), ngtoks );                        // get/create entry for message type
-                                               for( grp = 0; grp < ngtoks; grp++ ) {
-                                                       if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
-                                                               for( i = 0; i < ntoks; i++ ) {
-                                                                       if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG]    add endpoint  %s\n", tokens[i] );
-                                                                       uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
-                                                               }
-                                                       }
-                                               }
-                                       }
+                               if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
+                                       build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
                                } else {
-                                       if( DEBUG || (vlevel > 2) )
-                                               fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
+                                       build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
                                }
-
                                break;
 
                        default:
@@ -463,4 +551,21 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
 }
 
 
+/*
+       Given a session id and message type build a key that can be used to look up the rte in the route
+       table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
+*/
+static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
+       uint64_t key;
+
+       if( sub_id == UNSET_SUBID ) {
+               key = 0xffffffff00000000 | mtype;
+       } else {
+               key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
+       }
+
+       return key;
+}
+
+
 #endif
index a1a792d..31f150f 100644 (file)
@@ -28,6 +28,7 @@ Abstract:     Symbol table -- slightly streamlined from it's original 2000 version
                        Things changed for the Ric Msg implemention (Nov 2018):
                                - no concept of copy/free of the user data (functions removed)
                                - add ability to support an integer key (class 0)
+                                 Numeric key is an unsigned, 64bit key.
                                - externally visible names given a rmr_ extension as it's being
                                  incorporated into the RIC msg routing library and will be
                                  available to user applications.
@@ -46,6 +47,7 @@ Mod:          2016 23 Feb - converted Symtab refs so that caller need only a
 #include <string.h>
 #include <stdlib.h>
 #include <memory.h>
+#include <netdb.h>
 
 #include "rmr_symtab.h"
 
@@ -56,7 +58,7 @@ typedef struct Sym_ele
        struct Sym_ele *next;   /* pointer at next element in list */
        struct Sym_ele *prev;   /* larger table, easier deletes */
        const char *name;               /* symbol name */
-       unsigned int    nkey;   // the numeric key
+       uint64_t nkey;                  // the numeric key
        void *val;              /* user data associated with name */
        unsigned long mcount;   /* modificaitons to value */
        unsigned long rcount;   /* references to symbol */
@@ -136,11 +138,11 @@ static inline int same( unsigned int c1, unsigned int c2, const char *s1, const
        much the same.
 */
 static int putin( Sym_tab *table, const char *name, unsigned int class, void *val ) {
-       Sym_ele *eptr;          /* pointer into hash table */
+       Sym_ele *eptr;                  /* pointer into hash table */
        Sym_ele **sym_tab;      /* pointer into hash table */
-       int hv;                  /* hash value */
-       int rc = 0;              /* assume it existed */
-       unsigned int    nkey = 0;       // numeric key if class == 0
+       int hv;                 /* hash value */
+       int rc = 0;             /* assume it existed */
+       uint64_t nkey = 0;              // numeric key if class == 0
 
        sym_tab = table->symlist;
 
@@ -148,7 +150,7 @@ static int putin( Sym_tab *table, const char *name, unsigned int class, void *va
                hv = sym_hash( name, table->size );             // hash it
                for( eptr=sym_tab[hv]; eptr && ! same( class, eptr->class, eptr->name, name); eptr=eptr->next );
        } else {
-               nkey = *((int *) name);
+               nkey = *((uint64_t *) name);
                hv = nkey % table->size;                                        // just hash the number
                for( eptr=sym_tab[hv]; eptr && eptr->nkey != nkey; eptr=eptr->next );
        }
@@ -237,7 +239,7 @@ extern void rmr_sym_dump( void *vtable )
                        if( eptr->val && eptr->class ) {
                                fprintf( stderr, "key=%s val@=%p\n", eptr->name, eptr->val );
                        } else {
-                               fprintf( stderr, "nkey=%d val@=%p\n", eptr->nkey, eptr->val );
+                               fprintf( stderr, "nkey=%lu val@=%p\n", (unsigned long) eptr->nkey, eptr->val );
                        }
                }
        }
@@ -284,9 +286,9 @@ extern void rmr_sym_del( void *vtable, const char *name, unsigned int class )
 {
        Sym_tab *table;
        Sym_ele **sym_tab;
-       Sym_ele *eptr;    /* pointer into hash table */
-       int hv;                  /* hash value */
-       unsigned int nkey;              // class 0, name points to integer not string
+       Sym_ele *eptr;                  /* pointer into hash table */
+       int hv;                 /* hash value */
+       uint64_t        nkey;           // class 0, name points to integer not string
 
        table = (Sym_tab *) vtable;
        sym_tab = table->symlist;
@@ -306,7 +308,7 @@ extern void rmr_sym_del( void *vtable, const char *name, unsigned int class )
 /*
        Delete element by numberic key.
 */
-extern void *rmr_sym_ndel(  void *vtable, int key ) {
+extern void *rmr_sym_ndel(  void *vtable, uint64_t key ) {
        rmr_sym_del( vtable, (const char *) &key, 0 );
 }
 
@@ -317,7 +319,7 @@ extern void *rmr_sym_get( void *vtable, const char *name, unsigned int class )
        Sym_ele **sym_tab;
        Sym_ele *eptr;                  // element from table
        int hv;                 // hash value of key
-       unsigned int nkey;              // numeric key if class 0
+       uint64_t nkey;                  // numeric key if class 0
 
        table = (Sym_tab *) vtable;
        sym_tab = table->symlist;
@@ -326,7 +328,7 @@ extern void *rmr_sym_get( void *vtable, const char *name, unsigned int class )
                hv = sym_hash( name, table->size );
                for(eptr=sym_tab[hv]; eptr &&  ! same(class, eptr->class, eptr->name, name); eptr=eptr->next );
        } else {
-               nkey = *((int *) name);
+               nkey = *((uint64_t *) name);
                hv = nkey % table->size;                        // just hash the number
                for( eptr=sym_tab[hv]; eptr && eptr->nkey != nkey; eptr=eptr->next );
        }
@@ -343,7 +345,7 @@ extern void *rmr_sym_get( void *vtable, const char *name, unsigned int class )
 /*
        Retrieve the data referenced by a numerical key.
 */
-extern void *rmr_sym_pull(  void *vtable, int key ) {
+extern void *rmr_sym_pull(  void *vtable, uint64_t key ) {
        return rmr_sym_get( vtable, (const char *) &key, 0 );
 }
 
@@ -366,11 +368,11 @@ extern int rmr_sym_put( void *vtable, const char *name, unsigned int class, void
 }
 
 /*
-       Add a new entry assuming that the key is an unsigned integer.
+       Add a new entry assuming that the key is an unsigned, 64 bit, integer.
 
        Returns 1 if new, 0 if existed
 */
-extern int rmr_sym_map( void *vtable, unsigned int key, void *val ) {
+extern int rmr_sym_map( void *vtable, uint64_t key, void *val ) {
        Sym_tab *table;
 
        table = (Sym_tab *) vtable;
@@ -407,7 +409,7 @@ extern void rmr_sym_stats( void *vtable, int level )
                                        if( eptr->class  ) {                                    // a string key
                                                fprintf( stderr, "sym: (%d) key=%s val@=%p ref=%ld mod=%lu\n", i, eptr->name, eptr->val, eptr->rcount, eptr->mcount );
                                        } else {
-                                               fprintf( stderr, "sym: (%d) key=%d val@=%p ref=%ld mod=%lu\n", i, eptr->nkey, eptr->val, eptr->rcount, eptr->mcount );
+                                               fprintf( stderr, "sym: (%d) key=%lu val@=%p ref=%ld mod=%lu\n", i, (unsigned long) eptr->nkey, eptr->val, eptr->rcount, eptr->mcount );
                                        }
                                }
                        }
@@ -434,7 +436,7 @@ extern void rmr_sym_stats( void *vtable, int level )
                        if( eptr->class ) {
                                fprintf( stderr, "\t%s\n", eptr->name );
                        } else {
-                               fprintf( stderr, "\t%d (numeric key)\n", eptr->nkey );
+                               fprintf( stderr, "\t%lu (numeric key)\n", (unsigned long) eptr->nkey );
                        }
                }
        }
index dde00ae..7d78382 100644 (file)
@@ -97,7 +97,7 @@ static int uta_link2( char* target );
 static int rt_link2_ep( endpoint_t* ep );
 static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
 static int uta_epsock_byname( route_table_t* rt, char* ep_name );
-static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more );
+static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more );
 
 // ------ msg ------------------------------------------------
 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int tr_size );
@@ -107,48 +107,6 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock );
 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
 
-
-
-/*
-// --- message ring --------------------------
-static void* uta_mk_ring( int size );
-static void uta_ring_free( void* vr );
-static inline void* uta_ring_extract( void* vr );
-static inline int uta_ring_insert( void* vr, void* new_data );
-
-// --- message and context management --------
-static int ie_test( void* r, int i_factor, long inserts );
-static void free_ctx( uta_ctx_t* ctx );
-static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state );
-static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  );
-static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
-
-// ----- route table static things ---------
-static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
-static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
-static char* uta_fib( char* fname );
-static route_table_t* uta_rt_init( );
-static route_table_t* uta_rt_clone( route_table_t* srt );
-static void uta_rt_drop( route_table_t* rt );
-static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
-static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups );
-static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
-static int uta_epsock_byname( route_table_t* rt, char* ep_name );
-static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more );
-static void read_static_rt( uta_ctx_t* ctx, int vlevel );
-static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
-static void* rtc( void* vctx );
-static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );
-
-
-// --- tools_static protos ------------------
-static int uta_tokenise( char* buf, char** tokens, int max, char sep );
-static char* uta_h2ip( char const* hname );
-static int uta_link2( char* target );
-static int uta_lookup_rtg( uta_ctx_t* ctx );
-static int uta_has_str( char const* buf, char const* str, char sep, int max );
-*/
-
 static int rt_link2_ep( endpoint_t* ep );
 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
 
index dcf7e67..e67402f 100644 (file)
@@ -242,6 +242,8 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        int     group;                                  // selected group to get socket for
        int send_again;                         // true if the message must be sent again
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
+       uint64_t key;                           // lookup key is now subid and mtype
+       int max_rt = 1000;
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -263,8 +265,10 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
 
+       key = build_rt_key( msg->sub_id, msg->mtype );                  // what we need to find the route table entry
        while( send_again ) {
-               nn_sock = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again );         // round robin select endpoint; again set if mult groups
+               max_rt = 1000;
+               nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again );                // round robin select endpoint; again set if mult groups
                if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n",
                                msg->mtype, send_again, group, nn_sock, msg->len );
                group++;
@@ -277,18 +281,21 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
-                       if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
+                       if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d sub_id=%d len=%d\n", msg->mtype, msg->sub_id, msg->len );
                        msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
                        msg = send_msg( ctx, msg, nn_sock );                                    // do the hard work, msg should be nil on success
-                       /*
-                       if( msg ) {
-                               // error do we need to count successes/errors, how to report some success, esp if last fails?
+                       while( max_rt > 0 &&  msg && msg->state == RMR_ERR_RETRY ) {
+                               msg = send_msg( ctx, msg, nn_sock );
+                               max_rt--;
                        }
-                       */
 
                        msg = clone_m;                                                                                  // clone will be the next to send
                } else {
                        msg = send_msg( ctx, msg, nn_sock );                                    // send the last, and allocate a new buffer; drops the clone if it was
+                       while( max_rt > 0 &&  msg && msg->state == RMR_ERR_RETRY ) {
+                               msg = send_msg( ctx, msg, nn_sock );
+                               max_rt--;
+                       }
                }
        }
 
index 3ebad2d..cfcb27e 100644 (file)
@@ -58,13 +58,13 @@ static int uta_link2( char* target ) {
 
        nn_sock = nn_socket( AF_SP, NN_PUSH );          // the socket we'll use to connect to the target
        if( nn_sock < 0 ) {
-               fprintf( stderr, "[WARN] rmr: link2: unable to create socket for link to target: %s: %d\n", target, errno );
+               fprintf( stderr, "[WARN] rmr: link2: unable to create socket for link to target: %s: %d\n\n\n", target, errno );
                return -1;
        }
 
        snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
        if( nn_connect( nn_sock, conn_info ) < 0 ) {                                                    // connect failed
-               fprintf( stderr, "[WARN] rmr: link2: unable to create link to target: %s: %d\n", target, errno );
+               fprintf( stderr, "[WARN] rmr: link2: unable to create link to target: %s: %d\n\n\n", target, errno );
                nn_close( nn_sock );
                return -1;
        }
@@ -138,6 +138,7 @@ static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
                }
 
                ep->nn_sock = -1;                                       // not connected
+               ep->open = 0;
                ep->addr = uta_h2ip( ep_name );
                ep->name = strdup( ep_name );
 
@@ -203,14 +204,15 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name ) {
        invoke this function again to make a selection against that group. If there
        are no more groups, more is set to 0.
 */
-static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more ) {
+static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more ) {
        rtable_ent_t* rte;                      // matching rt entry
        endpoint_t*     ep;                             // seected end point
-       int nn_sock = -1;
+       int nn_sock = -2;
        int dummy;
        rrgroup_t* rrg;
 
-       if( ! more ) {                          // eliminate cheks each time we need to user
+
+       if( ! more ) {                          // eliminate checks each time we need to use
                more = &dummy;
        }
 
@@ -219,20 +221,20 @@ static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more ) {
                return -1;
        }
 
-       if( (rte = rmr_sym_pull( rt->hash, mtype )) == NULL ) {
+       if( (rte = rmr_sym_pull( rt->hash, key )) == NULL ) {
                *more = 0;
-               //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %d\n", mtype );
+               //if( DEBUG ) fprintf( stderr, "#### >>> rte not found for type key=%lu\n", key );
                return -1;
        }
 
        if( group < 0 || group >= rte->nrrgroups ) {
-               //if( DEBUG ) fprintf( stderr, ">>>> group out of range: mtype=%d group=%d max=%d\n", mtype, group, rte->nrrgroups );
+               //if( DEBUG ) fprintf( stderr, ">>>> group out of range: key=%lu group=%d max=%d\n", key, group, rte->nrrgroups );
                *more = 0;
                return -1;
        }
 
        if( (rrg = rte->rrgroups[group]) == NULL ) {
-               //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %d\n", mtype );
+               //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %lu\n", key );
                *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
                return -1;
        }
@@ -241,10 +243,10 @@ static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more ) {
 
        switch( rrg->nused ) {
                case 0:                         // nothing allocated, just punt
-                       //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" );
+                       //if( DEBUG )
                        return -1;
 
-               case 1:                         // exactly one, no rr to deal with and more is not possible even if fanout > 1
+               case 1:                         // exactly one, no rr to deal with
                        nn_sock = rrg->epts[0]->nn_sock;
                        ep = rrg->epts[0];
                        break;
@@ -258,7 +260,7 @@ static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more ) {
                        break;
        }
 
-       if( ! ep->open ) {                              // not connected
+       if( ep && ! ep->open ) {                                // not connected
                if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
                        ep->addr = uta_h2ip( ep->name );
                }
index 77a0e64..18acdda 100644 (file)
@@ -98,6 +98,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                exit( 1 );
        }
 
+       memset( msg->header, 0, sizeof( uta_mhdr_t ) );                 // must ensure that header portion of tpbuf is 0
+       msg->tp_buf = msg->header;
        hdr = (uta_mhdr_t *) msg->header;
        hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
        hdr->sub_id = htonl( UNSET_SUBID );
@@ -108,6 +110,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
 
        msg->len = 0;                                                                                   // length of data in the payload
        msg->alloc_len = mlen;                                                                  // length of allocated payload
+       msg->sub_id = UNSET_SUBID;
+       msg->mtype = UNSET_MSGTYPE;
        msg->payload = PAYLOAD_ADDR( hdr );                                             // point at the payload in transport
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
@@ -323,6 +327,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
 
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
+       //fprintf( stderr, ">>>>>> sending to %d %d\n", nn_sock, msg->mtype );
        hdr = (uta_mhdr_t *) msg->header;
        hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub-id in network byte order for transport
        hdr->sub_id = htonl( msg->sub_id );
index b810bdb..6bca91f 100644 (file)
@@ -104,7 +104,7 @@ static void free_ctx( uta_ctx_t* ctx );
 static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer );
 static int rt_link2_ep( endpoint_t* ep );
 static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock );
-static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more, nng_socket* nn_sock );
+static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock );
 static inline int xlate_nng_state( int state, int def_state );
 
 
index 70542f8..ba7c852 100644 (file)
@@ -204,6 +204,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        int send_again;                         // true if the message must be sent again
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
        int sock_ok;                            // got a valid socket from round robin select
+       uint64_t key;                           // mtype or sub-id/mtype sym table key
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -229,8 +230,9 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
 
+       key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
        while( send_again ) {
-               sock_ok = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again, &nn_sock );               // round robin sel epoint; again set if mult groups
+               sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
                if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n",
                                msg->mtype, send_again, group, msg->len, sock_ok );
                group++;
index d6d3490..6122f69 100644 (file)
@@ -231,7 +231,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
        during test that different entries are being seleted; we cannot depend on the nng
        socket being different as we could with nano.
 */
-static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more, nng_socket* nn_sock ) {
+static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) {
        rtable_ent_t* rte;                      // matching rt entry
        endpoint_t*     ep;                             // seected end point
        int  state = FALSE;                     // processing state
@@ -254,20 +254,20 @@ static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more, nn
                return FALSE;
        }
 
-       if( (rte = rmr_sym_pull( rt->hash, mtype )) == NULL ) {
+       if( (rte = rmr_sym_pull( rt->hash, key )) == NULL ) {
                *more = 0;
-               //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %d\n", mtype );
+               //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %lu\n", key );
                return FALSE;
        }
 
        if( group < 0 || group >= rte->nrrgroups ) {
-               //if( DEBUG ) fprintf( stderr, ">>>> group out of range: mtype=%d group=%d max=%d\n", mtype, group, rte->nrrgroups );
+               //if( DEBUG ) fprintf( stderr, ">>>> group out of range: key=%lu group=%d max=%d\n", key, group, rte->nrrgroups );
                *more = 0;
                return FALSE;
        }
 
        if( (rrg = rte->rrgroups[group]) == NULL ) {
-               //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %d\n", mtype );
+               //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type key=%lu\n", key );
                *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
                return FALSE;
        }
index e1a2fa5..7c17589 100644 (file)
@@ -147,6 +147,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        }
        msg->len = 0;                                                                                   // length of data in the payload
        msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
+       msg->sub_id = UNSET_SUBID;
+       msg->mtype = UNSET_MSGTYPE;
        msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
@@ -175,6 +177,8 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
 
        memset( msg, 0, sizeof( *msg ) );
 
+       msg->sub_id = UNSET_SUBID;
+       msg->mtype = UNSET_MSGTYPE;
        msg->tp_buf = NULL;
        msg->header = NULL;
        msg->len = -1;                                                                                  // no payload; invalid len
diff --git a/test/app_test/rebuild.ksh b/test/app_test/rebuild.ksh
new file mode 100644 (file)
index 0000000..04367ef
--- /dev/null
@@ -0,0 +1,46 @@
+#!/usr/bin/env ksh
+# :vi ts=4 sw=4 noet :
+#==================================================================================
+#    Copyright (c) 2019 Nokia
+#    Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
+
+# ---------------------------------------------------------------------------------
+#      Mnemonic:       rebuild.ksh
+#      Abstract:       This is a simple script that will cause RMr to be rebuilt. It 
+#                              may be invoked by any of the run_* scripts in this directory.
+#
+#      Date:           24 April 2019
+#      Author:         E. Scott Daniels
+# ---------------------------------------------------------------------------------
+
+
+build_path=../../.build
+
+(
+       set -e
+       mkdir -p $build_path
+       cd ${build_path%/*}                             # cd barfs on ../../.build, so we do this
+       cd ${build_path##*/}
+       cmake ..
+       make package
+)
+if (( $? != 0 ))
+then
+       echo "build failed"
+       exit 1
+fi
+
index dd639ce..ed2450b 100644 (file)
@@ -99,12 +99,15 @@ int main( int argc, char** argv ) {
        long good = 0;                                          // good palyload buffers
        long bad = 0;                                           // payload buffers which were not correct
        long bad_tr = 0;                                        // trace buffers that were not correct
+       long bad_sid = 0;                                       // bad subscription ids
        long timeout = 0;
        char*   data;
-       char    wbuf[1024];                                     // we'll pull trace data into here
        int             nmsgs = 10;                                     // number of messages to stop after (argv[1] overrides)
        int             rt_count = 0;                           // retry count
        long ack_count = 0;                                     // number of acks sent
+       int             count_bins[11];                         // histogram bins based on msg type (0-10)
+       char    wbuf[1024];                                     // we'll pull trace data into here, and use as general working buffer
+       char    sbuf[128];                                      // short buffer
 
        data = getenv( "RMR_RTG_SVC" );
        if( data == NULL ) {
@@ -118,6 +121,8 @@ int main( int argc, char** argv ) {
                listen_port = argv[2];
        }
 
+       memset( count_bins, 0, sizeof( count_bins ) );
+
        fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
 
        mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );   // start your engines!
@@ -165,11 +170,21 @@ int main( int argc, char** argv ) {
                                }
                                count++;                                                                        // messages received for stats output
 
-                               if( msg->mtype == 5 ) {                                         // send an ack; sender will count but not process, so data in message is moot
-                                       msg = rmr_rts_msg( mrc, msg );                                                          // we don't try to resend if this returns retry
+                               if( msg->mtype < 3 ) {                                                  // count number of properly set subscription id
+                                       if( msg->sub_id != msg->mtype * 10 ) {
+                                               bad_sid++;
+                                       }
+                               }
+
+                               if( msg->mtype >= 0 && msg->mtype <= 10 ) {
+                                       count_bins[msg->mtype]++;
+                               }
+
+                               if( msg->mtype == 5 ) {                                                                                 // send an ack; sender will count but not process, so data in message is moot
+                                       msg = rmr_rts_msg( mrc, msg );
                                        rt_count = 1000;
                                        while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {           // to work right in nano we need this :(
-                                               if( ack_count < 1 ) {                                                                   // need to connect, so hard wait
+                                               if( ack_count < 1 ) {                                                                   // 1st ack, so we need to connect, and we'll wait for that
                                                        sleep( 1 );
                                                }
                                                rt_count--;
@@ -189,7 +204,16 @@ int main( int argc, char** argv ) {
                }
        }
 
-       fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  bad-trace=%ld\n", !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr );
+       wbuf[0] = 0;
+       for( i = 0; i < 11; i++ ) {
+               snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
+               strcat( wbuf, sbuf );
+       }
+
+       fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
+       fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  bad-trace=%ld bad-sub_id=%ld\n", 
+               !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr, bad_sid );
+
        sleep( 2 );                                                                     // let any outbound acks flow before closing
 
        rmr_close( mrc );
index 80d139f..5fab235 100644 (file)
@@ -1,21 +1,24 @@
 
 # This is a 'mask' such that the run command can generate with the 
-# host name for the sender.
+# host name for the sender. (not needed after RMr 1.0.18)
 
-newrt|start
-rte|0|localhost:4560
-rte|1|localhost:4560
-rte|2|localhost:4560
-rte|3|localhost:4560
-rte|4|localhost:4560
-rte|5|localhost:4560
-rte|6|localhost:4560
-rte|7|localhost:4560
-rte|8|localhost:4560
-rte|9|localhost:4560
-rte|10|localhost:4560
-rte|11|localhost:4560
-rte|12|localhost:4560
-rte|13|localhost:4560
-rte|999|%%hostname%%:43086
-newrt|end
+newrt | start
+mse | 0 |  0 | localhost:4560
+mse | 1 | 10 | localhost:4560
+mse | 2 | 20 | localhost:4560
+rte | 3 | localhost:4560
+mse | 3 | 100 | localhost:4560 # special test to ensure that this does not affect previous entry
+rte | 4 | localhost:4560
+rte | 5 | localhost:4560
+rte | 6 | localhost:4560
+rte | 7 | localhost:4560
+rte | 8 | localhost:4560
+rte | 9 | localhost:4560
+rte | 10 | localhost:4560
+rte | 11 | localhost:4560
+rte | 12 | localhost:4560
+rte | 13 | localhost:4560
+
+# this entry isn't needed after RMr 1.0.18
+rte | 999 | %%hostname%%:43086
+newrt | end
index 7ad4934..3c4f20e 100644 (file)
@@ -73,6 +73,7 @@ nano_sender=0                         # start nano version if set (-N)
 nano_receiver=0
 wait=1
 rebuild=0
+verbose=0
 
 while [[ $1 == -* ]]
 do
@@ -83,6 +84,7 @@ do
                        nano_receiver=1
                        ;;
                -n)     nmsg=$2; shift;;
+               -v)     verbose=1;;
 
                *)      echo "unrecognised option: $1"
                        echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]"
@@ -94,23 +96,18 @@ do
        shift
 done
 
+if (( verbose ))
+then
+       echo "2" >.verbose
+       export RMR_VCTL_FILE=".verbose"
+fi
+
 if (( rebuild )) 
 then
        build_path=../../.build
-
-       (
-               set -e
-               mkdir -p $build_path
-               cd ${build_path%/*}                             # cd barfs on ../../.build, so we do this
-               cd ${build_path##*/}
-               cmake ..
-               make package
-       )
-       if (( $? != 0 ))
-       then
-               echo "build failed"
-               exit 1
-       fi
+       set -e
+       ksh ./rebuild.ksh
+       set +e
 else
        build_path=${BUILD_PATH:-"../../.build"}        # we prefer .build at the root level, but allow user option
 
@@ -137,6 +134,7 @@ then
 fi
 
 run_rcvr &
+sleep 2                                # if sender starts faster than rcvr we can drop, so pause a bit
 run_sender &
 
 wait
@@ -151,6 +149,7 @@ else
 fi
 
 rm /tmp/PID$$.*
+rm -f .verbose
 
 exit $(( !! (src + rrc) ))
 
diff --git a/test/app_test/run_multi_test.ksh b/test/app_test/run_multi_test.ksh
new file mode 100644 (file)
index 0000000..9bfc59b
--- /dev/null
@@ -0,0 +1,208 @@
+#!/usr/bin/env ksh
+# :vi ts=4 sw=4 noet :
+#==================================================================================
+#    Copyright (c) 2019 Nokia
+#    Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
+
+# ---------------------------------------------------------------------------------
+#      Mnemonic:       run_multi_test.ksh
+#      Abstract:       This is a simple script to set up and run the basic send/receive
+#                              processes for some library validation on top of nano/nng. This 
+#                              particular tests starts several receivers and creates a route table
+#                              which causes messages to be sent to all receivers in parallel
+#                              (forcing message cloning internally in RMr).
+#                              It should be possible to clone the repo, switch to this directory
+#                              and execute  'ksh run -B'  which will build RMr, make the sender and
+#                              recevier then  run the basic test.
+#
+#                              Example command line:
+#                                      ksh ./run_multi_test.ksh                # default 10 messages at 1 msg/sec
+#                                      ksh ./run_multi_test.ksh -N    # default but with nanomsg lib
+#                                      ksh ./run_multi_test.ksh -d 100 -n 10000 # send 10k messages with 100ms delay between
+#
+#      Date:           24 April 2019
+#      Author:         E. Scott Daniels
+# ---------------------------------------------------------------------------------
+
+
+# The sender and receivers are run asynch. Their exit statuses are captured in a
+# file in order for the 'main' to pick them up easily.
+#
+function run_sender {
+       if (( $nano_sender ))
+       then
+               ./sender_nano $nmsg $delay
+       else
+               ./sender $nmsg $delay
+       fi
+       echo $? >/tmp/PID$$.src         # must communicate state back via file b/c asynch
+}
+
+# $1 is the instance so we can keep logs separate
+function run_rcvr {
+       typeset port
+
+       port=$(( 4460 + ${1:-0} ))
+       export RMR_RTG_SVC=$(( 9990 + $1 ))
+       if (( $nano_receiver ))
+       then
+               ./receiver_nano $nmsg $port
+       else
+               ./receiver $nmsg $port
+       fi
+       echo $? >/tmp/PID$$.$1.rrc
+}
+
+#      Drop a contrived route table in such that the sender sends each message to n
+#      receivers.
+#
+function set_rt {
+       typeset port=4460
+       typeset groups="localhost:4460"
+       for (( i=1; i < ${1:-3}; i++ ))
+       do
+               groups="$groups;localhost:$((port+i))"
+       done
+
+       cat <<endKat >multi.rt
+               newrt | start
+               mse |0 | 0 | $groups
+               mse |1 | 10 | $groups
+               mse |2 | 20 | $groups
+               rte |3 | $groups 
+               rte |4 | $groups 
+               rte |5 | $groups
+               rte |6 | $groups
+               rte |7 | $groups
+               rte |8 | $groups
+               rte |9 | $groups
+               rte |10 | $groups
+               rte |11 | $groups
+               newrt | end
+endKat
+
+}
+
+# ---------------------------------------------------------
+
+if [[ ! -f local.rt ]]         # we need the real host name in the local.rt; build one from mask if not there
+then
+       hn=$(hostname)
+       sed "s!%%hostname%%!$hn!" rt.mask >local.rt
+fi
+
+nmsg=10                                                # total number of messages to be exchanged (-n value changes)
+delay=1000000                          # microsec sleep between msg 1,000,000 == 1s
+nano_sender=0                          # start nano version if set (-N)
+nano_receiver=0
+wait=1
+rebuild=0
+verbose=0
+nrcvrs=3                                       # this is sane, but -r allows it to be set up
+
+while [[ $1 == -* ]]
+do
+       case $1 in 
+               -B)     rebuild=1;;
+               -d)     delay=$2; shift;;
+               -N)     nano_sender=1
+                       nano_receiver=1
+                       ;;
+               -n)     nmsg=$2; shift;;
+               -r)     nrcvrs=$2; shift;;
+               -v)     verbose=1;;
+
+               *)      echo "unrecognised option: $1"
+                       echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]"
+                       echo "  -B forces a rebuild which will use .build"
+                       exit 1
+                       ;;
+       esac
+
+       shift
+done
+
+if (( verbose ))
+then
+       echo "2" >.verbose
+       export RMR_VCTL_FILE=".verbose"
+fi
+
+if (( rebuild )) 
+then
+       build_path=../../.build         # if we rebuild we can insist that it is in .build :)
+       set -e
+       ksh ./rebuild.ksh
+       set +e
+else
+       build_path=${BUILD_PATH:-"../../.build"}        # we prefer .build at the root level, but allow user option
+
+       if [[ ! -d $build_path ]]
+       then
+               echo "cannot find build in: $build_path"
+               echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
+               exit 1
+       fi
+fi
+
+export LD_LIBRARY_PATH=$build_path:$build_path/lib
+export LIBRARY_PATH=$LD_LIBRARY_PATH
+export RMR_SEED_RT=./multi.rt
+
+set_rt $nrcvrs                                         # set up the rt for n receivers
+
+if [[ ! -f ./sender ]]
+then
+       if ! make >/dev/null 2>&1
+       then
+               echo "[FAIL] cannot find sender binary, and cannot make it.... humm?"
+               exit 1
+       fi
+fi
+
+for (( i=0; i < nrcvrs; i++ ))         # start the receivers with an instance number
+do
+       run_rcvr $i &
+done
+
+sleep 2                                # let receivers init so we don't shoot at an empty target
+run_sender &
+
+wait
+
+
+for (( i=0; i < nrcvrs; i++ ))         # collect return codes
+do
+       head -1 /tmp/PID$$.$i.rrc | read x
+       (( rrc += x ))
+done
+
+head -1 /tmp/PID$$.src | read src
+
+if (( !! (src + rrc) ))
+then
+       echo "[FAIL] sender rc=$src  receiver rc=$rrc"
+else
+       echo "[PASS] sender rc=$src  receiver rc=$rrc"
+       rm -f multi.rt
+fi
+
+rm /tmp/PID$$.*
+rm -f .verbose
+
+exit $(( !! (src + rrc) ))
+
diff --git a/test/app_test/run_rr_test.ksh b/test/app_test/run_rr_test.ksh
new file mode 100644 (file)
index 0000000..2e52aa6
--- /dev/null
@@ -0,0 +1,211 @@
+#!/usr/bin/env ksh
+# :vi ts=4 sw=4 noet :
+#==================================================================================
+#    Copyright (c) 2019 Nokia
+#    Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
+
+# ---------------------------------------------------------------------------------
+#      Mnemonic:       run_multi_test.ksh
+#      Abstract:       This is a simple script to set up and run the basic send/receive
+#                              processes for some library validation on top of nano/nng. This 
+#                              particular tests starts several receivers and creates a route table
+#                              which causes messages to be sent round robin to all of the receivers.
+#                              The number of messages command line parameter (-n) will be the number
+#                              of messages that each receiver should expect; the sender will be asked
+#                              to send r times that many messages so that as they are round robbined
+#                              each receiver should get the same number of messages.
+#
+#                              Example command line:
+#                                      ksh ./run_rr_test.ksh           # default 10 messages at 1 msg/sec
+#                                      ksh ./run_rr_test.ksh -N    # default but with nanomsg lib
+#                                      ksh ./run_rr_test.ksh -d 100 -n 10000 # send 10k messages with 100ms delay between
+#
+#      Date:           24 April 2019
+#      Author:         E. Scott Daniels
+# ---------------------------------------------------------------------------------
+
+
+# The sender and receivers are run asynch. Their exit statuses are captured in a
+# file in order for the 'main' to pick them up easily.
+#
+function run_sender {
+       export RMR_RTG_SVC=8990
+       if (( $nano_sender ))
+       then
+               ./sender_nano $(( nmsg * nrcvrs )) $delay 1
+       else
+               ./sender $(( nmsg * nrcvrs ))  $delay 1
+       fi
+       echo $? >/tmp/PID$$.src         # must communicate state back via file b/c asynch
+}
+
+# $1 is the instance so we can keep logs separate
+function run_rcvr {
+       typeset port
+
+       port=$(( 4560 + ${1:-0} ))
+       export RMR_RTG_SVC=$(( 9990 + $1 ))
+       if (( $nano_receiver ))
+       then
+               ./receiver_nano $nmsg $port
+       else
+               ./receiver $nmsg $port
+       fi
+       echo $? >/tmp/PID$$.$1.rrc
+}
+
+#
+#      Drop a contrived route table in such that the sender sends each message to n
+#      receivers.
+#
+function set_rt {
+       typeset port=4560
+       typeset endpoints="localhost:4560"
+       for (( i=1; i < ${1:-3}; i++ ))
+       do
+               endpoints="$endpoints,localhost:$((port+i))"
+       done
+
+       cat <<endKat >rr.rt
+               newrt |start
+               rte |0 | $endpoints  |0
+               rte |1 | $endpoints  |10
+               mse |2 | 20 | $endpoints                # new style mtype/subid entry
+               rte |3 | $endpoints  |0
+               rte |4 | $endpoints  |0
+               rte |5 | $endpoints  |0
+               rte |6 | $endpoints  |0
+               rte |7 | $endpoints  |0
+               rte |8 | $endpoints  |0
+               rte |9 | $endpoints  |0
+               rte |10 | $endpoints  |0
+               rte |11 | $endpoints  |0
+               newrt |end
+endKat
+
+}
+
+# ---------------------------------------------------------
+
+if [[ ! -f local.rt ]]         # we need the real host name in the local.rt; build one from mask if not there
+then
+       hn=$(hostname)
+       sed "s!%%hostname%%!$hn!" rt.mask >local.rt
+fi
+
+nmsg=10                                                # total number of messages to be exchanged (-n value changes)
+delay=1000                                     # microsec sleep between msg 1,000,000 == 1s (shorter than others b/c/ we are sending to multiple)
+nano_sender=0                          # start nano version if set (-N)
+nano_receiver=0
+wait=1
+rebuild=0
+verbose=0
+nrcvrs=3                                       # this is sane, but -r allows it to be set up
+
+while [[ $1 == -* ]]
+do
+       case $1 in 
+               -B)     rebuild=1;;
+               -d)     delay=$2; shift;;
+               -N)     nano_sender=1
+                       nano_receiver=1
+                       ;;
+               -n)     nmsg=$2; shift;;
+               -r)     nrcvrs=$2; shift;;
+               -v)     verbose=1;;
+
+               *)      echo "unrecognised option: $1"
+                       echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]"
+                       echo "  -B forces a rebuild which will use .build"
+                       exit 1
+                       ;;
+       esac
+
+       shift
+done
+
+
+if (( verbose ))
+then
+       echo "2" >.verbose
+       export RMR_VCTL_FILE=".verbose"
+fi
+
+if (( rebuild )) 
+then
+       build_path=../../.build
+       set -e
+       ksh ./rebuild.ksh
+       set +e
+else
+       build_path=${BUILD_PATH:-"../../.build"}        # we prefer .build at the root level, but allow user option
+
+       if [[ ! -d $build_path ]]
+       then
+               echo "cannot find build in: $build_path"
+               echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
+               exit 1
+       fi
+fi
+
+export LD_LIBRARY_PATH=$build_path:$build_path/lib
+export LIBRARY_PATH=$LD_LIBRARY_PATH
+export RMR_SEED_RT=./rr.rt
+
+set_rt $nrcvrs
+
+if [[ ! -f ./sender ]]
+then
+       if ! make >/dev/null 2>&1
+       then
+               echo "[FAIL] cannot find sender binary, and cannot make it.... humm?"
+               exit 1
+       fi
+fi
+
+for (( i=0; i < nrcvrs; i++ ))         # start the receivers with an instance number
+do
+       run_rcvr $i &
+done
+
+sleep 2                                        # wait to start sender else we might send before receivers up and drop messages
+run_sender &
+
+wait
+
+
+for (( i=0; i < nrcvrs; i++ ))         # collect return codes
+do
+       head -1 /tmp/PID$$.$i.rrc | read x
+       (( rrc += x ))
+done
+
+head -1 /tmp/PID$$.src | read src
+
+if (( !! (src + rrc) ))
+then
+       echo "[FAIL] sender rc=$src  receiver rc=$rrc"
+else
+       echo "[PASS] sender rc=$src  receiver rc=$rrc"
+fi
+
+rm /tmp/PID$$.*
+rm -f .verbose
+rm -f rr.rt
+
+exit $(( !! (src + rrc) ))
+
index 3602d4f..ac7d869 100644 (file)
                                will give up and fail.
 
 
-                               Message types will vary between 1 and 10, so the route table must
-                               be set up to support those message types.
+                               Message types will vary between 0 and 9, so the route table must
+                               be set up to support those message types. Further, for message types
+                               0, 1 and 2, the subscription ID will be set to type x 10, so the route
+                               table must be set to include the sub-id for those types in order for
+                               the messages to reach their destination.
 
                                Message format is:
                                        ck1 ck2|<msg-txt><nil>
                                Ck2 is the simple check sum of the trace data which is a nil terminated
                                series of bytes.
 
-                               Parms:  argv[1] == nmsgs; argv[2] == delay; argv[3] == listen port
+                               Parms:  argv[1] == number of msgs to send (10)
+                                               argv[2] == delay                (mu-seconds, 1000000 default)
+                                               argv[3] == max msg type (not inclusive; default 10)
+                                               argv[4] == listen port
 
                                Sender will send for at most 20 seconds, so if nmsgs and delay extend
                                beyond that period the total number of messages sent will be less
@@ -75,16 +81,17 @@ static int sum( char* str ) {
 
 int main( int argc, char** argv ) {
        void* mrc;                                                      // msg router context
-       struct epoll_event events[1];                   // list of events to give to epoll
-       struct epoll_event epe;                 // event definition for event to listen to
+       struct  epoll_event events[1];                  // list of events to give to epoll
+       struct  epoll_event epe;                                // event definition for event to listen to
        int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
-       int rcv_fd;                                                     // file des that NNG tickles -- give this to epoll to listen on
-       int nready;                                                             // number of events ready for receive
+       int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
+       int             nready;                                                 // number of events ready for receive
        rmr_mbuf_t*             sbuf;                                   // send buffer
        rmr_mbuf_t*             rbuf;                                   // received buffer
-       int     count = 0;
-       int     rt_count = 0;                                           // number of messages requiring a spin retry
-       int     rcvd_count = 0;
+       int             count = 0;
+       int             rt_count = 0;                                   // number of messages requiring a spin retry
+       int             rcvd_count = 0;
+       int             fail_count = 0;                                 // # of failure sends after first successful send
        char*   listen_port = "43086";
        int             mtype = 0;
        int             stats_freq = 100;
@@ -94,6 +101,7 @@ int main( int argc, char** argv ) {
        long    timeout = 0;
        int             delay = 100000;                                 // usec between send attempts
        int             nmsgs = 10;                                             // number of messages to send
+       int             max_mt = 10;                                    // reset point for message type
 
        if( argc > 1 ) {
                nmsgs = atoi( argv[1] );
@@ -102,7 +110,10 @@ int main( int argc, char** argv ) {
                delay = atoi( argv[2] );
        }
        if( argc > 3 ) {
-               listen_port = argv[3];
+               max_mt = atoi( argv[3] );
+       }
+       if( argc > 4 ) {
+               listen_port = argv[4];
        }
 
        fprintf( stderr, "<SNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
@@ -150,13 +161,19 @@ int main( int argc, char** argv ) {
 
        timeout = time( NULL ) + 20;
 
-       while( count < nmsgs ) {                                                                // we send 10 messages after the first message is successful
+       while( count < nmsgs ) {                                                                // we send n messages after the first message is successful
                snprintf( trace, 100, "%lld", (long long) time( NULL ) );
                rmr_set_trace( sbuf, trace, strlen( trace ) + 1 );
                snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer!", count, trace, rand() );
                snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
 
                sbuf->mtype = mtype;                                                    // fill in the message bits
+               if( mtype < 3 ) {
+                       sbuf->sub_id = mtype * 10;
+               } else {
+                       sbuf->sub_id = -1;
+               }
+
                sbuf->len =  strlen( sbuf->payload ) + 1;               // our receiver likely wants a nice acsii-z string
                sbuf->state = 0;
                sbuf = rmr_send_msg( mrc, sbuf );                               // send it (send returns an empty payload on success, or the original payload on fail/retry)
@@ -167,7 +184,13 @@ int main( int argc, char** argv ) {
                                while( sbuf->state == RMR_ERR_RETRY ) {                 // soft failure (device busy?) retry
                                        sbuf = rmr_send_msg( mrc, sbuf );                       // retry send until it's good (simple test; real programmes should do better)
                                }
-                               successful = 1;
+                               if( sbuf->state == RMR_OK ) {
+                                       successful = 1;                                                         // indicates only that we sent one successful message, not the current state
+                               } else {
+                                       if( successful ) {
+                                               fail_count++;                                                   // count failures after first successful message
+                                       }
+                               }
                                break;
 
                        case RMR_OK:
@@ -175,15 +198,19 @@ int main( int argc, char** argv ) {
                                break;
 
                        default:
+                               if( successful ) {
+                                       fail_count++;                                                   // count failures after first successful message
+                               }
                                // some error (not connected likely), don't count this
+                               //sleep( 1 );
                                break;
                }
 
                if( successful ) {                              // once we have a message that was sent, start to increase things
                        count++;
                        mtype++;
-                       if( mtype > 10 ) {                      // if large number of sends don't require infinite rt entries :)
-                               mtype = 1;
+                       if( mtype >= max_mt ) {                 // if large number of sends don't require infinite rt entries :)
+                               mtype = 0;
                        }
                }
 
@@ -237,7 +264,7 @@ int main( int argc, char** argv ) {
                        }
                }
 
-       fprintf( stderr, "<SNDR> [%s] sent %d messages   received %d acks retries=%d\n", count == nmsgs ? "PASS" : "FAIL",  count, rcvd_count, rt_count );
+       fprintf( stderr, "<SNDR> [%s] sent=%d  rcvd-acks=%d  failures=%d retries=%d\n", count == nmsgs ? "PASS" : "FAIL",  count, rcvd_count, fail_count, rt_count );
        rmr_close( mrc );
 
        return !( count == nmsgs );
index e86c6f5..cefc09f 100644 (file)
@@ -139,6 +139,10 @@ static int rmr_api_test( ) {
        msg2 = rmr_send_msg( NULL, NULL );                      // drive for coverage
        errors += fail_not_nil( msg2, "send_msg returned msg pointer when given a nil message and context" );
 
+       msg->state = 0;
+       msg = rmr_send_msg( NULL, msg );
+       errors += fail_if( msg->state == 0, "rmr_send_msg did not set msg state when msg given with nil context" );
+
        // --- sends will fail with a no endpoint error until a dummy route table is set, so we test fail case first.
        msg->len = 100;
        msg->mtype = 1;
@@ -194,17 +198,28 @@ static int rmr_api_test( ) {
        rmr_rts_msg( rmc, NULL );
        errors += fail_if( errno == 0, "rmr_rts_msg did not set errno when given a nil message" );
 
+       msg->state = 0;
+       msg = rmr_rts_msg( NULL, msg );                 // should set state in msg
+       errors += fail_if_equal( msg->state, 0, "rmr_rts_msg did not set state when given valid message but no context" );
+       
+
        msg = rmr_rts_msg( rmc, msg );                  // return the buffer to the sender
        errors += fail_if_nil( msg, "rmr_rts_msg did not return a message pointer" );
        errors += fail_if( errno != 0, "rmr_rts_msg did not reset errno" );
 
 
        snprintf( msg->xaction, 17, "%015d", 16 );              // dummy transaction id (emulation generates, this should arrive after a few calls to recv)
+
+       msg->state = 0;
+       msg = rmr_call( NULL, msg );
+       errors += fail_if( msg->state == 0, "rmr_call did not set message state when given message with nil context" );
+
+       msg->mtype = 0;
        msg = rmr_call( rmc, msg );                                             // this call should return a message as we can anticipate a dummy message in
        errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to succeed" );
        if( msg ) {
                errors += fail_not_equal( msg->state, RMR_OK, "rmr_call did not properly set state on successful return" );
-               errors += fail_if( errno != 0, "rmr_call did not properly set errno on successful return" );
+               errors += fail_not_equal( errno, 0, "rmr_call did not properly set errno (a) on successful return" );
        }
 
        snprintf( wbuf, 17, "%015d", 14 );                              // if we call receive we should find this in the first 15 tries
@@ -240,6 +255,11 @@ static int rmr_api_test( ) {
        rmr_free_msg( msg2 );
 
 
+       msg2 = rmr_torcv_msg( NULL, NULL, 10 );
+       errors += fail_not_nil( msg2, "rmr_torcv_msg returned a pointer when given nil information" );
+       msg2 = rmr_torcv_msg( rmc, NULL, 10 );
+       errors += fail_if_nil( msg2, "rmr_torcv_msg did not return a message pointer when given a nil old msg" );
+
        // ---  test timeout receive; our dummy epoll function will return 1 ready on first call and 0 ready (timeout emulation) on second
        //              however we must drain the swamp (queue) first, so run until we get a timeout error, or 20 and report error if we get to 20.
        msg = NULL;
index 129eec1..710c75e 100644 (file)
@@ -228,5 +228,15 @@ static int rt_test( ) {
        }
 */
 
+       state = uta_link2( "worm", NULL, NULL );
+       errors += fail_if_true( state, "link2 did not return false when given nil pointers" );
+    
+       state = uta_epsock_rr( rt, 122, 0, NULL, NULL );
+       errors += fail_if_true( state, "uta_epsock_rr returned bad state when given nil socket pointer" );
+
+       rt = uta_rt_init( );                                                                            // get us a route table
+       state = uta_epsock_rr( rt, 0, -1, NULL, &nn_sock );
+       errors += fail_if_true( state, "uta_epsock_rr returned bad state (true) when given negative group number" );
+
        return !!errors;                        // 1 or 0 regardless of count
 }
index d477391..4997706 100644 (file)
@@ -46,14 +46,21 @@ static void gen_rt( uta_ctx_t* ctx ) {
        char*   rt_stuff;               // strings for the route table
 
        rt_stuff =
+               "newrt|end\n"                                                           // end of table check before start of table found
+               "# comment to drive full comment test\n"
+               "\n"                                                                            // handle blank lines
+               "   \n"                                                                         // handle blank lines
+           "mse|4|10|localhost:4561\n"                                 // entry before start message
+           "rte|4|localhost:4561\n"                                    // entry before start message
                "newrt|start\n"                                                         // false start to drive detection
                "xxx|badentry to drive default case"
                "newrt|start\n"
-           "rte|0|localhost:4560,localhost:4562\n"
+           "rte|0|localhost:4560,localhost:4562\n"                                     // these are legitimate entries for our testing
            "rte|1|localhost:4562;localhost:4561,localhost:4569\n"
-           "rte|2|localhost:4562\n"
-           "rte|4|localhost:4561\n"
-               "rte|5|localhost:4563\n"
+           "rte|2|localhost:4562| 10\n"                                                                // new subid at end
+           "mse|4|10|localhost:4561\n"                                                                 // new msg/subid specifier rec
+           "mse|4|localhost:4561\n"                                                                    // new mse entry with less than needed fields
+               "   rte|   5   |localhost:4563    #garbage comment\n"           // tests white space cleanup
            "rte|6|localhost:4562\n"
                "newrt|end\n";
 
@@ -92,6 +99,7 @@ static int sr_nng_test() {
        nng_socket nn_dummy_sock;                                       // dummy needed to drive send
        int             size;
        int             i;
+       void*   p;
 
        //ctx = rmr_init( "tcp:4360", 2048, 0 );                                // do NOT call init -- that starts the rtc thread which isn't good here
        ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) );              // alloc the context manually
@@ -104,6 +112,10 @@ static int sr_nng_test() {
        uta_lookup_rtg( ctx );
 
        gen_rt( ctx );                                                          // forces a static load with some known info since we don't start the rtc()
+       gen_rt( ctx );                                                          // force a second load to test cloning
+
+       p = rt_ensure_ep( NULL, "foo" );                                // drive for coverage
+       errors += fail_not_nil( p,  "rt_ensure_ep did not return nil when given nil route table" );
 
        state = rmr_ready( NULL );
        errors += fail_if_true( state, "reported ready when given a nil context" );
index 21848af..74bc048 100755 (executable)
@@ -267,6 +267,7 @@ verbose=0
 show_all=1                                     # show all things -F sets to show failures only
 strict=0                                       # -s (strict) will set; when off, coverage state ignored in final pass/fail
 show_output=0                          # show output from each test execution (-S)
+quiet=0
 
 while [[ $1 == "-"* ]]
 do
@@ -285,6 +286,7 @@ do
                -s)     strict=1;;                                      # coverage counts toward pass/fail state
                -S)     show_output=1;;                         # test output shown even on success
                -v)     (( verbose++ ));;
+               -q)     quiet=1;;                                       # less chatty when spilling error log files
 
                -h)     usage; exit 0;;
                --help) usage; exit 0;;
@@ -353,7 +355,12 @@ do
        if ! ./${tfile%.c} >/tmp/PID$$.log 2>&1
        then
                echo "[FAIL] unit test failed for: $tfile"
-               cat /tmp/PID$$.log
+               if (( quiet )) 
+               then
+                       grep "^<" /tmp/PID$$.log        # in quiet mode just dump <...> messages which are assumed from the test programme not appl
+               else
+                       cat /tmp/PID$$.log
+               fi
                (( ut_errors++ ))                               # cause failure even if not in strict mode
                continue                                                # skip coverage tests for this
        else
index 9cb8d82..f2d4bf6 100644 (file)
@@ -48,6 +48,7 @@ static int worm_test( ) {
        char    wbuf[1024];
        int errors = 0;                 // number errors found
        int     i;
+       void* p;
 
        rmr_mbuf_t*     mbuf;           // mbuf to send to peer
        int             whid = -1;
@@ -100,11 +101,22 @@ static int worm_test( ) {
        whid = rmr_wh_open( ctx, "localhost:21961" );
        errors += fail_not_equal( whid, 3, "attempt to fill in a hole didn't return expected" );
 
-       rmr_wh_send_msg( NULL, 0, NULL );                       // tests for coverage
-       rmr_wh_send_msg( ctx, 0, NULL );
+       p = rmr_wh_send_msg( NULL, 0, NULL );                   // tests for coverage
+       fail_not_nil( p, "wh_send_msg returned a pointer when given nil context and message" );
+
+       p = rmr_wh_send_msg( ctx, 0, NULL );
+       fail_not_nil( p, "wh_send_msg returned a pointer when given nil message with valid context" );
 
        mbuf = rmr_alloc_msg( ctx, 2048 );                      // get an muf to pass round
        errors += fail_if_nil( mbuf, "unable to allocate mbuf for send tests (giving up on send tests)" );
+
+       mbuf->state = 0;
+       mbuf = rmr_wh_send_msg( NULL, 0, mbuf );
+       if( mbuf ) {
+               fail_if_equal( mbuf->state, 0, "wh_send_msg returned a zero state when given a nil context" );
+       }
+       fail_if_nil( mbuf, "wh_send_msg returned a nil message buffer when given a nil context"  );
+
        while( mbuf ) {
                if( !(mbuf = rmr_wh_send_msg( ctx, 50, mbuf )) ) {              // test for coverage
                        errors += fail_if_nil( mbuf, "send didn't return an mbuf (skip rest of send tests)" );