Clean up a context.
*/
static void free_ctx( uta_ctx_t* ctx ) {
- if( ctx ) {
- if( ctx->rtg_addr ) {
- free( ctx->rtg_addr );
- }
+ if( ctx && ctx->rtg_addr ) {
+ free( ctx->rtg_addr );
}
}
Return the message to the available pool, or free it outright.
*/
extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
- //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
- //return;
-
if( mbuf == NULL ) {
return;
}
- if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) { // just queue, free if ring is full
+ if( mbuf->flags & MFL_HUGE || // don't cache oversized messages
+ ! mbuf->ring || // cant cache if no ring
+ ! uta_ring_insert( mbuf->ring, mbuf ) ) { // or ring is full
+
if( mbuf->tp_buf ) {
free( mbuf->tp_buf );
mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
int nn_sock; // endpoint socket for send
uta_ctx_t* ctx;
- int state;
char* hold_src; // we need the original source if send fails
char* hold_ip; // also must hold original ip
int sock_ok = 0; // true if we found a valid endpoint socket
port = proto_port; // assume something like "1234" was passed
}
- if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) { // must check port here -- if < 1 then we just start static file 'listener'
- if( atoi( tok ) < 1 ) {
- static_rtc = 1;
- }
+ if( (tok = getenv( "ENV_RTG_PORT" )) != NULL && atoi( tok ) < 1 ) { // must check here -- if < 1 then we just start static file 'listener'
+ static_rtc = 1;
}
if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
*/
extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
uta_ctx_t* ctx;
- uta_mhdr_t* hdr; // header in the transport buffer
chute_t* chute;
struct timespec ts; // time info if we have a timeout
long new_ms; // adjusted mu-sec