X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Fsr_si_static.c;h=b56b6dc77820ac6b577c3f8fc9748e9b636f71fd;hb=9c2f0c74adb03a21646742702813b6ba4a4ae288;hp=703e80cde827a5f648c5551cbd694475405033bf;hpb=5200efe1e6dd13b1e1241ce623c4978151be34e8;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/sr_si_static.c b/src/rmr/si/src/sr_si_static.c index 703e80c..b56b6dc 100644 --- a/src/rmr/si/src/sr_si_static.c +++ b/src/rmr/si/src/sr_si_static.c @@ -21,7 +21,7 @@ /* Mnemonic: sr_si_static.c Abstract: These are static send/receive primatives which (sadly) - differ based on the underlying protocol (nng vs nanomsg). + differ based on the underlying protocol (nng vs SI95). Split from rmr_nng.c for easier wormhole support. Author: E. Scott Daniels @@ -31,23 +31,35 @@ #ifndef _sr_si_static_c #define _sr_si_static_c -#include -#include -#include -#include -#include +static void dump_n( char *p, char* label, int n ) { + int i; + int j; + int t = 0; + int rows; -static void dump_40( char *p, char* label ) { - int i; + if( label ) { + fprintf( stderr, "[DUMP] %s p=%p %d bytes\n", label, p, n ); + } + + rows = (n/16) + ((n % 16) ? 1 : 0); - if( label ) - fprintf( stderr, ">>>>> %s p=%p\n", label, p ); + for( j = 0; j < rows; j++ ) { + fprintf( stderr, "[DUMP] %04x: ", j * 16 ); - for( i = 0; i < 40; i++ ) { - fprintf( stderr, "%02x ", (unsigned char) *(p+i) ); + for( i = 0; t < n && i < 16; i++, t++ ) { + fprintf( stderr, "%02x ", (unsigned char) *p ); + p++; + } + fprintf( stderr, "\n" ); } - fprintf( stderr, "\n" ); +} + +/* + backwards compatability. +*/ +static void dump_40( char *p, char* label ) { + dump_n( p, label, 40 ); } /* @@ -58,7 +70,7 @@ static void dump_40( char *p, char* label ) { The addition of the connection shut error code to the switch requires that the NNG version at commit e618abf8f3db2a94269a (or after) be - used for compiling RMR. + used for compiling RMR. */ static inline int xlate_si_state( int state, int def_state ) { @@ -98,6 +110,26 @@ static inline int xlate_si_state( int state, int def_state ) { return state; } +/* + Given a message size and a buffer (assumed to be TP_SZFIELD_LEN or larger) + this will put in the size such that it is compatable with old versions + of RMR (that expect the message size to not be in network byte order) + and with new versions that do. See extract function in mt_call_si_static.c + for details on what ends up in the buffer. +*/ +static inline void insert_mlen( uint32_t len, char* buf ) { + uint32_t* blen; // pointer into buffer where we'll add the len + + blen = (uint32_t *) buf; // old systems expect an unconverted integer + *blen = len; + + blen++; + *blen = htonl( len ); // new systems want a converted integer + + memset( &buf[TP_SZFIELD_LEN], 0, 4 ); // clear to prevent future conversion issues + buf[TP_SZFIELD_LEN-1] = TP_SZ_MARKER; // marker to flag this is generated by a new message +} + /* Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc a new message struct as well. Size is the size of the zc buffer to allocate (not @@ -126,7 +158,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) { msg = (rmr_mbuf_t *) malloc( sizeof *msg ); if( msg == NULL ) { - fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" ); + rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" ); return NULL; // we used to exit -- that seems wrong } memset( msg, 0, sizeof( *msg ) ); // tp_buffer will be allocated below @@ -135,24 +167,27 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s msg->alloc_len = 0; // force tp_buffer realloc below if( msg->tp_buf ) { free( msg->tp_buf ); + msg->tp_buf = NULL; } } else { mlen = msg->alloc_len; // msg given, allocate the same size as before } } + msg->rts_fd = -1; // must force to be invalid; not a received message that can be returned if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) { - fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen ); + rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen ); abort( ); // toss out a core file for this } -/* - memset( msg->tp_buf, 0, mlen ); // NOT for production (debug only) valgrind will complain about uninitalised use if we don't set - memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 ); // NOT for production -- debugging eyecatcher -*/ - alen = (int *) msg->tp_buf; - *alen = mlen; // FIX ME: need a stuct to go in these first bytes, not just dummy len + if( DEBUG ) { + // for speed we don't do this in production; for testing valgrind will complain about uninitialised use if not set + memset( msg->tp_buf, 0, mlen ); + memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 ); // do NOT use a $ in this string! + } + + insert_mlen( (uint32_t) mlen, msg->tp_buf ); // this will likely be overwriten on send to shirnk msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN; memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area @@ -165,18 +200,19 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s //SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future } msg->len = 0; // length of data in the payload + msg->cookie = 0x4942; msg->alloc_len = mlen; // length of allocated transport buffer (caller size + rmr header) msg->sub_id = UNSET_SUBID; msg->mtype = UNSET_MSGTYPE; msg->payload = PAYLOAD_ADDR( hdr ); // point to payload (past all header junk) msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area msg->state = state; // fill in caller's state (likely the state of the last operation) - msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message + msg->flags = MFL_ZEROCOPY; // this is a zerocopy sendable message msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on eaach :( - strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); - strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC ); + zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); + zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC ); - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags ); return msg; } @@ -196,13 +232,14 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) { } } else { if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) { - fprintf( stderr, "[CRI] rmr_alloc_mbuf: cannot get memory for message\n" ); + rmr_vlog( RMR_VL_CRIT, "rmr_alloc_mbuf: cannot get memory for message\n" ); return NULL; // this used to exit, but that seems wrong } } memset( msg, 0, sizeof( *msg ) ); + msg->cookie = 0x4942; msg->sub_id = UNSET_SUBID; msg->mtype = UNSET_MSGTYPE; msg->tp_buf = NULL; @@ -300,14 +337,14 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) { nm = (rmr_mbuf_t *) malloc( sizeof *nm ); if( nm == NULL ) { - fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" ); + rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" ); exit( 1 ); } memset( nm, 0, sizeof( *nm ) ); mlen = old_msg->alloc_len; // length allocated before if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) { - fprintf( stderr, "[CRI] rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen ); + rmr_vlog( RMR_VL_CRIT, "rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen ); abort(); } @@ -315,7 +352,8 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) { v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version switch( ntohl( v1hdr->rmr_ver ) ) { case 1: - memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header + hdr = nm->header; + memcpy( hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header nm->payload = (void *) v1hdr + sizeof( *v1hdr ); break; @@ -332,7 +370,7 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) { nm->len = old_msg->len; // length of data in the payload nm->alloc_len = mlen; // length of allocated payload - nm->xaction = hdr->xid; // reference xaction + nm->xaction = &hdr->xid[0]; // reference xaction nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation) nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message memcpy( nm->payload, old_msg->payload, old_msg->len ); @@ -358,7 +396,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) { nm = (rmr_mbuf_t *) malloc( sizeof *nm ); if( nm == NULL ) { - fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" ); + rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" ); exit( 1 ); } memset( nm, 0, sizeof( *nm ) ); @@ -367,23 +405,26 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) { tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment - if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len ); tpb_len = mlen + TP_HDR_LEN; if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) { - fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM ); + rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM ); exit( 1 ); } - memset( nm->tp_buf, 0, tpb_len ); - memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 ); // DEBUGGING - alen = (int *) nm->tp_buf; - *alen = tpb_len; // FIX ME: need a stuct to go in these first bytes, not just dummy len + if( DEBUG ) { + memset( nm->tp_buf, 0, tpb_len ); + memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 ); // DEBUGGING do NOT use $ in this string!! + } + + insert_mlen( (uint32_t) tpb_len, nm->tp_buf ); // this len will likely be reset on send to shrink nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN; v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version switch( ntohl( v1hdr->rmr_ver ) ) { case 1: + v1hdr = nm->header; memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header nm->payload = (void *) v1hdr + sizeof( *v1hdr ); break; @@ -410,7 +451,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) { nm->len = old_msg->len; // length of data in the payload nm->alloc_len = mlen; // length of allocated payload - nm->xaction = hdr->xid; // reference xaction + nm->xaction = &hdr->xid[0]; // reference xaction nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation) nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message memcpy( nm->payload, old_msg->payload, old_msg->len ); @@ -419,62 +460,138 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) { } /* - For SI95 based transport all receives are driven through the threaded - ring and thus this function should NOT be called. If it is we will panic - and abort straight away. + Realloc the message such that the payload is at least payload_len bytes. + The clone and copy options affect what portion of the original payload is copied to + the reallocated message, and whether or not the original payload is lost after the + reallocation process has finished. + + copy == true + The entire payload from the original message will be coppied to the reallocated + payload. + + copy == false + Only the header (preserving return to sender information, message type, etc) + is preserved after reallocation; the payload used lengrh is set to 0 and the + payload is NOT initialised/cleared. + + clone == true + The orignal message is preserved and a completely new message buffer and payload + are allocated (even if the size given is the same). A pointer to the new message + buffer is returned and it is the user application's responsibility to manage the + old buffer (e.g. free when not needed). + + clone == false + The old payload will be lost after reallocation. The message buffer pointer which + is returned will likely reference the same structure (don't depend on that). + + + CAUTION: + If the message is not a message which was received, the mtype, sub-id, length values in the + RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting + mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently + resetting information in the mbuffer, this funciton will reset the mbuf values from the current + settings and NOT from the copied RMR header in transport buffer. */ -static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { +static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) { + rmr_mbuf_t* nm = NULL; // new message buffer when cloning + size_t mlen; + uta_mhdr_t* omhdr; // old message header + int tr_old_len; // tr size in new buffer + int old_psize = 0; // size of payload in the message passed in (alloc size - tp header and rmr header lengths) + int hdr_len = 0; // length of RMR and transport headers in old msg + void* old_tp_buf; // pointer to the old tp buffer + int free_tp = 1; // free the transport buffer (old) when done (when not cloning) + int old_mt; // msg type and sub-id from the message passed in + int old_sid; + int old_len; + int old_rfd; // rts file descriptor from old message + + if( old_msg == NULL || payload_len <= 0 ) { + errno = EINVAL; + return NULL; + } -fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort! rcv_msg called and it shouldn't be\n" ); -exit( 1 ); + old_mt = old_msg->mtype; // preserve mbuf info + old_sid = old_msg->sub_id; + old_len = old_msg->len; + old_rfd = old_msg->rts_fd; - return NULL; -} + old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN); // user payload size in orig message -/* - Receives a 'raw' message from a non-RMr sender (no header expected). The returned - message buffer cannot be used to send, and the length information may or may - not be correct (it is set to the length received which might be more than the - bytes actually in the payload). + if( !clone && payload_len <= old_psize ) { // not cloning and old is large enough; nothing to do + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len ); + return old_msg; + } - Mostly this supports the route table collector, but could be extended with an - API external function. -*/ -static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { - return NULL; -/* -FIXME: not implemented yet - int state; - rmr_mbuf_t* msg = NULL; // msg received - size_t rsize; // nng needs to write back the size received... grrr + hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN; // with SI we manage the transport header; must include in len + old_tp_buf = old_msg->tp_buf; - if( old_msg ) { - msg = old_msg; + if( clone ) { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" ); + free_tp = 0; + + nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) ); + if( nm == NULL ) { + rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) ); + return NULL; + } + memset( nm, 0, sizeof( *nm ) ); + nm->rts_fd = old_rfd; // this is managed only in the mbuf; dup now } else { - msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check + nm = old_msg; } - //msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received - if( (msg->state = xlate_si_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) { - return msg; + omhdr = old_msg->header; + mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize); // must have larger in case copy is true + + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen ); + if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) { + rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen ); + free( nm ); + return NULL; } - rsize = nng_msg_len( msg->tp_buf ); - // do NOT use ref_tpbuf() here! Must fill these in manually. - msg->header = nng_msg_body( msg->tp_buf ); - msg->len = rsize; // len is the number of bytes received - msg->alloc_len = rsize; - msg->mtype = UNSET_MSGTYPE; // raw message has no type - msg->sub_id = UNSET_SUBID; // nor a subscription id - msg->state = RMR_OK; - msg->flags = MFL_RAW; - msg->payload = msg->header; // payload is the whole thing; no header - msg->xaction = NULL; + nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN; // point at the new header and copy from old + SET_HDR_LEN( nm->header ); - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len ); + if( copy ) { // if we need to copy the old payload too + memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize ); + } else { // just need to copy header + memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) ); + } - return msg; + ref_tpbuf( nm, mlen ); // set payload and other pointers in the message to the new tp buffer + + if( !copy ) { + nm->mtype = -1; // didn't copy payload, so mtype, sub-id, and rts fd are invalid + nm->sub_id = -1; + nm->len = 0; // and len is 0 + } else { + nm->len = old_len; // we must force these to avoid losing info if msg wasn't a received message + nm->mtype = old_mt; + nm->sub_id = old_sid; + } + + if( free_tp ) { + free( old_tp_buf ); // we did not clone, so free b/c no references + } + + return nm; +} + +/* + For SI95 based transport all receives are driven through the threaded + ring and thus this function should NOT be called. If it is we will panic + and abort straight away. */ +static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { + +fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort! rcv_msg called and it shouldn't be\n" ); +exit( 1 ); + + return NULL; } /* @@ -487,7 +604,7 @@ FIXME: not implemented yet Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer validation has been done prior. - When msg->state is not ok, this function must set tp_state in the message as some API + When msg->state is not ok, this function must set tp_state in the message as some API fucntions return the message directly and do not propigate errno into the message. */ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) { @@ -506,8 +623,8 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source - strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours - strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC ); + zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours + zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC ); } if( retries == 0 ) { @@ -519,13 +636,16 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r msg->state = RMR_OK; do { tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN; // we only send what was used + header lengths - *((int*) msg->tp_buf) = tot_len; + if( tot_len > msg->alloc_len ) { + tot_len = msg->alloc_len; // likely bad length from user :( + } + insert_mlen( tot_len, msg->tp_buf ); // shrink to fit - if( DEBUG > 1 ) fprintf( stderr, "[DEBUG] send_msg: ending %d (%x) bytes usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries ); if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" ); if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) { - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] send_msg: error!! sent state=%d\n", state ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: error!! sent state=%d\n", state ); msg->state = state; if( retries > 0 && state == SI_ERR_BLOCKED ) { if( --spin_retries <= 0 ) { // don't give up the processor if we don't have to @@ -539,7 +659,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r state = 0; // don't loop } } else { - if( DEBUG > 2 ) fprintf( stderr, "[DBUG] sent OK state=%d\n", state ); + if( DEBUG > 2 ) rmr_vlog( RMR_VL_DEBUG, "sent OK state=%d\n", state ); state = 0; msg->state = RMR_OK; hdr = NULL; @@ -561,7 +681,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r msg->state = RMR_ERR_SENDFAILED; } - if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) ); } return msg; @@ -583,8 +703,8 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r message type is used. If the initial lookup, with a subid, fails, then a second lookup using just the mtype is tried. - When msg->state is not OK, this function must set tp_state in the message as - some API fucntions return the message directly and do not propigate errno into + When msg->state is not OK, this function must set tp_state in the message as + some API fucntions return the message directly and do not propigate errno into the message. CAUTION: this is a non-blocking send. If the message cannot be sent, then @@ -604,6 +724,7 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { int sock_ok; // got a valid socket from round robin select char* d1; int ok_sends = 0; // track number of ok sends + route_table_t* rt; // active route table if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -628,10 +749,10 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { max_to = ctx->send_retries; // convert to retries } - if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key - if( ctx->flags & CTXFL_WARN ) { - fprintf( stderr, "[WARN] no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id ); - } + rt = get_rt( ctx ); // get active route table and up ref count + if( (rte = uta_get_rte( rt, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key + release_rt( ctx, rt ); + rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id ); msg->state = RMR_ERR_NOENDPT; errno = ENXIO; // must ensure it's not eagain msg->tp_state = errno; @@ -641,9 +762,14 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { send_again = 1; // force loop entry group = 0; // always start with group 0 while( send_again ) { - sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx ); // select endpt from rr group and set again if more groups + if( rte->nrrgroups > 0 ) { // this is a round robin entry if groups are listed + sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups + } else { + sock_ok = epsock_meid( ctx, rt, msg, &nn_sock, &ep ); + send_again = 0; + } - if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n", + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n", msg->flags, msg->mtype, send_again, group, msg->len, sock_ok ); group++; @@ -652,31 +778,31 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { if( send_again ) { clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available if( clone_m == NULL ) { + release_rt( ctx, rt ); msg->state = RMR_ERR_SENDFAILED; errno = ENOMEM; msg->tp_state = errno; - if( ctx->flags & CTXFL_WARN ) { - fprintf( stderr, "[WARN] unable to clone message for multiple rr-group send\n" ); - } + rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" ); return msg; } - if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len ); msg->flags |= MFL_NOALLOC; // keep send from allocating a new message; we have a clone to use msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success - + if( msg != NULL ) { // returned message indicates send error of some sort rmr_free_msg( msg ); // must ditchone; pick msg so we don't have to unfiddle flags msg = clone_m; } else { ok_sends++; msg = clone_m; // clone will be the next to send + msg->state = RMR_OK; } } else { msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was if( DEBUG ) { if( msg == NULL ) { - fprintf( stderr, "[DBUG] mtosend_msg: send returned nil message!\n" ); + rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: send returned nil message!\n" ); } } } @@ -686,7 +812,7 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { case RMR_OK: ep->scounts[EPSC_GOOD]++; break; - + case RMR_ERR_RETRY: ep->scounts[EPSC_TRANS]++; break; @@ -698,24 +824,22 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { } } } else { -/* - if( ctx->flags & CTXFL_WARN ) { - fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id ); - } -*/ + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id ); msg->state = RMR_ERR_NOENDPT; errno = ENXIO; } } + release_rt( ctx, rt ); // we can safely dec the ref counter now + if( msg ) { // call functions don't get a buffer back, so a nil check is required msg->flags &= ~MFL_NOALLOC; // must return with this flag off if( ok_sends ) { // multiple rr-groups and one was successful; report ok msg->state = RMR_OK; } - - if( DEBUG ) fprintf( stderr, "[DBUG] final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state ); - + + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state ); + msg->tp_state = errno; }