Add wormhole state check function
[ric-plt/lib/rmr.git] / src / rmr / si / src / sr_si_static.c
index 4dd32ea..58e1e17 100644 (file)
@@ -143,7 +143,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
 
 /*
        memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only)   valgrind will complain about uninitalised use if we don't set
-       memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );                // NOT for production -- debugging eyecatcher
+       memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", TP_HDR_LEN );                // NOT for production -- debugging eyecatcher
 */
        alen = (int *) msg->tp_buf;
        *alen = mlen;                                           // FIX ME: need a stuct to go in these first bytes, not just dummy len
@@ -159,6 +159,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
        }
        msg->len = 0;                                                                                   // length of data in the payload
+       msg->cookie = 0x4942;
        msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
        msg->sub_id = UNSET_SUBID;
        msg->mtype = UNSET_MSGTYPE;
@@ -197,6 +198,7 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
 
        memset( msg, 0, sizeof( *msg ) );
 
+       msg->cookie = 0x4942;
        msg->sub_id = UNSET_SUBID;
        msg->mtype = UNSET_MSGTYPE;
        msg->tp_buf = NULL;
@@ -326,7 +328,7 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
-       nm->xaction = hdr->xid;                                                                 // reference xaction
+       nm->xaction = &hdr->xid[0];                                                             // reference xaction
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
        nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
        memcpy( nm->payload, old_msg->payload, old_msg->len );
@@ -404,7 +406,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
-       nm->xaction = hdr->xid;                                                                 // reference xaction
+       nm->xaction = &hdr->xid[0];                                                             // reference xaction
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
        nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
        memcpy( nm->payload, old_msg->payload, old_msg->len );
@@ -500,6 +502,7 @@ static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len,
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );    
        if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
                rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
+               free( nm );
                return NULL;
        }
 
@@ -588,6 +591,9 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
        msg->state = RMR_OK;
        do {
                tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
+               if( tot_len > msg->alloc_len ) {
+                       tot_len = msg->alloc_len;                                                                       // likely bad length from user :(
+               }
                *((int*) msg->tp_buf) = tot_len;
 
                if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
@@ -708,7 +714,12 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
        while( send_again ) {
-               sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
+               if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry if groups are listed
+                       sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
+               } else {
+                       sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
+                       send_again = 0;
+               }
 
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
                                msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );