enhance(API): Add multi-threaded call
[ric-plt/lib/rmr.git] / test / test_nng_em.c
index f1976b9..c2101f5 100644 (file)
        Author:         E. Scott Daniels
 */
 
+
+#include "rmr.h"                               // we use some of rmr defs in building dummy messages, so we need these
+#include "rmr_agnostic.h"
+
 // ---------------------- emulated nng functions ---------------------------
 
 
 #ifndef _em_nn
 #define _em_nn
 
+#include <pthread.h>
+
 static int em_send_failures = 0;       // test programme can set this to emulate eagain send failures
 static int em_timeout = -1;                    // set by set socket option
+static int em_mtc_msgs = 0;                    // set to generate 'received' messages with mt-call header data
+static int return_value = 0;           // functions should return this value
+static int rcv_count = 0;                      // receive counter for transaction id to allow test to rest
+static int rcv_delay = 0;                      // forced delay before call to rcvmsg starts to work
+
+static int gates_ok = 0;
+static pthread_mutex_t rcv_gate;
+
 
 // ----------- epoll emulation ---------------------------------------------
 
@@ -95,17 +109,12 @@ struct em_msg {
        int32_t len1;                       // length of the tracing data
        int32_t len2;                       // length of data 1 (d1)
        int32_t len3;                       // length of data 2 (d2)
-
+       int32_t sub_id;                                         // subscription id (-1 invalid)
 };
 
-static int return_value = 0;
 
-//--------------------------------------------------------------------------
-#ifdef EMULATE_NNG
-struct nn_msghdr {
-       int boo;
-};
 
+// --  emulation control functions ------------------------------------------------------
 
 /*
        Test app can call this to have all emulated functions return failure instead
@@ -122,34 +131,106 @@ static int em_nng_foo() {
 }
 
 
+/*
+       Turns on/off the generation of multi-threaded call messages
+*/
+static int em_set_mtc_msgs( int state ) {
+       em_mtc_msgs = state;
+}
+
+/*
+       Returns the size of the header we inserted
+*/
+static int em_hdr_size() {
+       if( em_mtc_msgs ) {
+               return (int) sizeof( struct em_msg ) + 4;
+       }
+
+       return (int) sizeof( struct em_msg );
+}
+
+static void em_set_rcvcount( int v ) {
+       rcv_count = v;
+}
+
+static void em_set_rcvdelay( int v ) {
+       rcv_delay = v;
+}
+
+static void em_start() {
+       if( ! gates_ok ) {
+               pthread_mutex_init( &rcv_gate, NULL );
+               gates_ok = 1;
+       }
+}
+
+//--------------------------------------------------------------------------
+#ifdef EMULATE_NNG
+struct nn_msghdr {
+       int boo;
+};
+
+
 /*
        Receive message must allocate a new buffer and return the pointer into *m.
        Every 9 messages or so we'll simulate an old version message
+
+       If em_mtc_msgs is set, then we add a non-zero d1 field with
+       the call-id set to 2, and alternate the call flag
 */
 static int em_nng_recvmsg( nng_socket s, nng_msg ** m, int i ) {
+       static int call_flag = 0;
+
        void* b;
        struct em_msg* msg;
-       static int count = 0;                   // we'll simulate a message going in by dropping an rmr-ish msg with transaction id only
        int trace_size = 0;
+       int d1_size = 0;
+       unsigned char* d1;
+
+       if( rcv_delay > 0 ) {
+               sleep( rcv_delay );
+       }
 
-       //sleep( 1 );
+       if( em_mtc_msgs ) {
+               d1_size = 4;
+       }
 
        b = (void *) malloc( 2048 );
        if( m != NULL ) {
                memset( b, 0, 2048 );
+
                *m = (nng_msg *) b;
                msg = (struct em_msg *) b;
-               if( count % 10  == 9 ) {
-                       //msg->rmr_ver = htonl( MSG_VER );
-                       msg->rmr_ver = ALT_MSG_VER;             // emulate the bug in RMr v1
+               if( ! em_mtc_msgs  &&  (rcv_count % 10) == 9 ) {
+                       msg->rmr_ver = ALT_MSG_VER;                                                     // allow emulation the bug in RMr v1
                } else {
                        msg->rmr_ver = htonl( MSG_VER );
                }
+
                msg->mtype = htonl( 1 );
                msg->plen = htonl( 220 );
                msg->len0 = htonl( sizeof( struct em_msg ) );
                msg->len1 = htonl( trace_size );
-               snprintf( msg->xid, 32, "%015d", count++ );             // simple transaction id so we can test receive specific and ring stuff
+               msg->len2 = htonl( d1_size );
+               msg->len3 = htonl( 0 );
+
+               pthread_mutex_lock( &rcv_gate );        // hold lock to update counter/flag
+               if( em_mtc_msgs ) {
+                       d1 = DATA1_ADDR( msg );
+                       d1[0] = 2;                                                                      // simulated msgs always on chute 2
+                       if( call_flag ) {
+                               rcv_count++;
+                               msg->flags |= HFL_CALL_MSG;
+                       }
+                       if( rcv_delay > 0 ) {
+                               fprintf( stderr, "<EM>    count=%d flag=%d %02x \n", rcv_count, call_flag, msg->flags );
+                       }
+                       call_flag = !call_flag;
+               } else {
+                       rcv_count++;
+               }
+               pthread_mutex_unlock( &rcv_gate );
+               snprintf( msg->xid, 32, "%015d", rcv_count );           // simple transaction id so we can test receive specific and ring stuff
                snprintf( msg->src, 16, "localhost:4562" );             // set src id (unrealistic) so that rts() can be tested
        }