Add ability for RMR to request route table
[ric-plt/lib/rmr.git] / src / rmr / nng / src / sr_nng_static.c
index a4e2410..dd978a4 100644 (file)
@@ -126,7 +126,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        if( msg == NULL ) {
                msg = (rmr_mbuf_t *) malloc( sizeof *msg );
                if( msg == NULL ) {
-                       fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
+                       rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
                        exit( 1 );
                }
        } else {
@@ -136,7 +136,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        memset( msg, 0, sizeof( *msg ) );
 
        if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
-               fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
+               rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
                abort( );                                                                                       // toss out a core file for this
        }
 
@@ -161,7 +161,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
        strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
 
-       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
+       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
 
        return msg;
 }
@@ -177,7 +177,7 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
 
        msg = (rmr_mbuf_t *) malloc( sizeof *msg );
        if( msg == NULL ) {
-               fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
+               rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
                exit( 1 );
        }
 
@@ -275,14 +275,14 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
 
        nm = (rmr_mbuf_t *) malloc( sizeof *nm );
        if( nm == NULL ) {
-               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
+               rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
                exit( 1 );
        }
        memset( nm, 0, sizeof( *nm ) );
 
        mlen = old_msg->alloc_len;                                                                              // length allocated before
        if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
-               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
+               rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
                exit( 1 );
        }
 
@@ -306,7 +306,7 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
        nm->sub_id = old_msg->sub_id;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
-       if( DEBUG ) fprintf( stderr, "[DBUG] clone values: mty=%d sid=%d len=%d alloc=%d\n", nm->mtype, nm->sub_id, nm->len, nm->alloc_len );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "clone values: mty=%d sid=%d len=%d alloc=%d\n", nm->mtype, nm->sub_id, nm->len, nm->alloc_len );
 
        nm->xaction = hdr->xid;                                                                 // reference xaction
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
@@ -332,7 +332,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
 
        nm = (rmr_mbuf_t *) malloc( sizeof *nm );
        if( nm == NULL ) {
-               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
+               rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
                exit( 1 );
        }
        memset( nm, 0, sizeof( *nm ) );
@@ -341,9 +341,9 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
        tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
 
        mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
-       if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
        if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
-               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
+               rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
                exit( 1 );
        }
 
@@ -432,7 +432,7 @@ static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len,
        old_len = old_msg->len;
        old_psize = old_msg->alloc_len - RMR_HDR_LEN( old_msg->header );                                // allocated transport size less the header and other data bits
        if( !clone  && payload_len <= old_psize ) {                                                             // old message is large enough, nothing to do
-               if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
                return old_msg;
        }
 
@@ -440,12 +440,12 @@ static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len,
        old_tp_buf = old_msg->tp_buf;
 
        if( clone ) {
-               if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: cloning message\n" );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
                free_tp = 0;
 
                nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
                if( nm == NULL ) {
-                       fprintf( stderr, "[CRI] rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
+                       rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
                        return NULL;
                }
                memset( nm, 0, sizeof( *nm ) );
@@ -456,9 +456,9 @@ static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len,
        omhdr = old_msg->header;
        mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] reallocate for payload increase. new message size: %d\n", (int) mlen );    
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );    
        if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
-               fprintf( stderr, "[CRI] rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
+               rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
                return NULL;
        }
 
@@ -466,10 +466,10 @@ static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len,
        SET_HDR_LEN( nm->header );
 
        if( copy ) {                                                                                                                            // if we need to copy the old payload too
-               if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
                memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
        } else {                                                                                                                                        // just need to copy header
-               if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
                memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
        }
 
@@ -561,7 +561,7 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
                hdr = (uta_mhdr_t *) msg->header;
                msg->flags |= MFL_ADDSRC;                                       // turn on so if user app tries to send this buffer we reset src
 
-               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
                                msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
        } else {
                msg->state = RMR_ERR_EMPTY;
@@ -615,7 +615,7 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        msg->payload = msg->header;                                     // payload is the whole thing; no header
        msg->xaction = NULL;
 
-       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
+       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
 
        return msg;
 }
@@ -714,7 +714,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
                        msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED );         // xlate to our state and set errno
                }
 
-               if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
        }
 
        return msg;
@@ -783,7 +783,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
 
        if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
                if( ctx->flags & CTXFL_WARN ) {
-                       fprintf( stderr, "[WARN] no endpoint for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
+                       rmr_vlog( RMR_VL_WARN, "no endpoint for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
                }
                msg->state = RMR_ERR_NOENDPT;
                errno = ENXIO;                                                                          // must ensure it's not eagain
@@ -794,9 +794,14 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
        while( send_again ) {
-               sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep );              // select endpt from rr group and set again if more groups
+               if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry
+                       sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep );              // select endpt from rr group and set again if more groups
+               } else {
+                       sock_ok = epsock_meid( ctx->rtable, msg, &nn_sock, &ep );
+                       send_again = 0;
+               }
 
-               if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
                                msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
 
                group++;
@@ -809,12 +814,12 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                                        errno = ENOMEM;
                                        msg->tp_state = errno;
                                        if( ctx->flags & CTXFL_WARN ) {
-                                               fprintf( stderr, "[WARN] unable to clone message for multiple rr-group send\n" );
+                                               rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
                                        }
                                        return msg;
                                }
 
-                               if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
                                msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
                                msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
        
@@ -829,7 +834,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                                msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
                                if( DEBUG ) {
                                        if( msg == NULL ) {
-                                               fprintf( stderr, "[DBUG] mtosend_msg:  send returned nil message!\n" );         
+                                               rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );         
                                        }
                                }
                        }
@@ -851,7 +856,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                        }
                } else {
                        if( ctx->flags & CTXFL_WARN ) {
-                               fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
+                               rmr_vlog( RMR_VL_WARN, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
                        }
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;
@@ -864,7 +869,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                        msg->state = RMR_OK;
                }
        
-               if( DEBUG ) fprintf( stderr, "[DBUG] final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
        
                msg->tp_state = errno;
        }