Fix SI address and initialistion bugs
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
index 9c67dc3..17014a6 100644 (file)
@@ -204,7 +204,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
 
                d1 = DATA1_ADDR( msg->header );
-               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
+               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                 // must blot out so it doesn't queue on a chute at the other end
        }
 
        return mtosend_msg( vctx, msg, max_to );
@@ -555,7 +555,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
 
        if( ! announced ) {
-               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/f mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
                        RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
@@ -674,7 +674,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
                if( ctx->my_ip == NULL ) {
                        rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
-                       strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
+                       ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
                }
        }
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
@@ -697,7 +697,10 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                return NULL;
        }
 
-       if( flags & FL_NOTHREAD ) {                                     // thread set to off; no rout table collector started (could be called by the rtc thread itself)
+                                                                                               // finish all flag setting before threads to keep helgrind quiet
+       ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
+
+       if( flags & RMRFL_NOTHREAD ) {                          // thread set to off; no route table collector started (could be called by the rtc thread itself)
                ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // creates an empty route table so that wormholes still can be used
        } else {
                if( static_rtc ) {
@@ -711,7 +714,6 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                }
        }
 
-       ctx->flags |= CFL_MTC_ENABLED;                                                                                          // for SI threaded receiver is the only way
        if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
                rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
        }
@@ -873,6 +875,9 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                        }
                }
 
+               if( mbuf != NULL ) {
+                       mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
+               }
                return mbuf;
        }
 
@@ -917,6 +922,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
                        mbuf->state = RMR_OK;
+                       mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
 
                        if( ombuf ) {
                                rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
@@ -933,23 +939,19 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        return mbuf;
 }
 
-/*
-       Accept a message buffer and caller ID, send the message and then wait
-       for the receiver to tickle the semaphore letting us know that a message
-       has been received. The call_id is a value between 2 and 255, inclusive; if
-       it's not in this range an error will be returned. Max wait is the amount
-       of time in millaseconds that the call should block for. If 0 is given
-       then no timeout is set.
 
-       If the mt_call feature has not been initialised, then the attempt to use this
-       funciton will fail with RMR_ERR_NOTSUPP
 
-       If no matching message is received before the max_wait period expires, a
-       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
-       occurs after the message has been sent, then a nil pointer is returned
-       with errno set to some other value.
+
+/*
+       This is the work horse for the multi-threaded call() function. It supports
+       both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
+       for for rmr_mt_call() modulo the caveat below.
+
+       If endpoint is given, then we assume that we're not doing normal route table
+       routing and that we should send directly to that endpoint (probably worm
+       hole).
 */
-extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
        rmr_mbuf_t* ombuf;                      // original mbuf passed in
        uta_ctx_t*      ctx;
        uta_mhdr_t*     hdr;                    // header in the transport buffer
@@ -1017,7 +1019,11 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
                seconds = 1;                                                                            // use as flag later to invoked timed wait
        }
 
-       mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
+       if( ep == NULL ) {                                                                              // normal routing
+               mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
+       } else {
+               mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
+       }
        if( mbuf ) {
                if( mbuf->state != RMR_OK ) {
                        mbuf->tp_state = errno;
@@ -1052,12 +1058,37 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        }
 
        mbuf = chute->mbuf;
-       mbuf->state = RMR_OK;
+       if( mbuf != NULL ) {
+               mbuf->state = RMR_OK;
+       }
        chute->mbuf = NULL;
 
        return mbuf;
 }
 
+/*
+       Accept a message buffer and caller ID, send the message and then wait
+       for the receiver to tickle the semaphore letting us know that a message
+       has been received. The call_id is a value between 2 and 255, inclusive; if
+       it's not in this range an error will be returned. Max wait is the amount
+       of time in millaseconds that the call should block for. If 0 is given
+       then no timeout is set.
+
+       If the mt_call feature has not been initialised, then the attempt to use this
+       funciton will fail with RMR_ERR_NOTSUPP
+
+       If no matching message is received before the max_wait period expires, a
+       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
+       occurs after the message has been sent, then a nil pointer is returned
+       with errno set to some other value.
+
+       This is now just an outward facing wrapper so we can support wormhole calls.
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+       return mt_call( vctx, mbuf, call_id, max_wait, NULL );
+}
+
+
 /*
        Given an existing message buffer, reallocate the payload portion to
        be at least new_len bytes.  The message header will remain such that