Correct excessive TCP connection bug
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
index 1b31bf4..ec86a82 100644 (file)
@@ -1,4 +1,4 @@
-// : vi ts=4 sw=4 noet:
+       // : vi ts=4 sw=4 noet2
 /*
 ==================================================================================
        Copyright (c) 2020 Nokia
 #include <semaphore.h>
 
 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
-       static  int warned = 0;
+       static  time_t last_warning = 0;
+       static  long dcount = 0;
+
        chute_t*        chute;
 
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
-               if( !warned ) {
-                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
-                       warned++;
+               dcount++;
+               if( time( NULL ) > last_warning + 60 ) {                        // issue warning no more frequently than every 60 sec
+                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %ld msgs dropped since last warning\n", dcount );
+                       last_warning = time( NULL );
+                       dcount = 0;
                }
 
                return;
@@ -96,6 +100,8 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
                                }
                        }
                }
+       } else {
+               free( raw_msg );
        }
 }
 
@@ -153,7 +159,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        unsigned char*  old_accum;              // old accumulator reference should we need to realloc
        int                             bidx = 0;               // transport buffer index
        int                             remain;                 // bytes in transport buf that need to be moved
-       int*                    mlen;                   // pointer to spot in buffer for conversion to int
+       int*                    mlen;                   // pointer to spot in buffer for conversion to int
        int                             need;                   // bytes needed for something
        int                             i;
 
@@ -161,22 +167,31 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                        return SI_RET_OK;
                }
-
-               if( fd >= ctx->nrivers || fd < 0 ) {
-                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
-                       return SI_RET_OK;
-               }
        } else {
                ctx = (uta_ctx_t *) vctx;
        }
 
-       if( buflen <= 0 ) {
+       if( buflen <= 0 || fd < 0 ) {                   // no buffer or invalid fd
                return SI_RET_OK;
        }
 
-       river = &ctx->rivers[fd];
+       if( fd >= ctx->nrivers ) {
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+               if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
+                       river = (river_t *) malloc( sizeof( *river ) );
+                       memset( river, 0, sizeof( *river ) );
+                       rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
+                       river->state = RS_NEW;
+               }
+       } else {
+               river = &ctx->rivers[fd];                               // quick index for fd values < MAX_FD
+       }
+
        if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
                if( river->state == RS_NEW ) {
+                       if( river->accum != NULL ) {
+                               free( river->accum );
+                       }
                        memset( river, 0, sizeof( *river ) );
                        river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // start with what user said would be the "normal" max inbound msg size
                        river->accum = (char *) malloc( river->nbytes );
@@ -281,17 +296,36 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
 static int mt_disc_cb( void* vctx, int fd ) {
        uta_ctx_t*      ctx;
        endpoint_t*     ep;
+       river_t*        river = NULL;
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                return SI_RET_OK;
        }
 
+       if( fd < ctx->nrivers && fd >= 0 ) {
+               river = &ctx->rivers[fd];
+       } else {
+               if( fd > 0 ) {
+                       river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
+               }
+       }
+
+       if( river != NULL ) {
+               river->state = RS_NEW;                  // if one connects here later; ensure it's new
+               if( river->accum != NULL ) {
+                       free( river->accum );
+                       river->accum = NULL;
+                       river->state = RS_NEW;          // force realloc if the fd is used again
+               }
+       }
+
        ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
        if( ep != NULL ) {
-       pthread_mutex_lock( &ep->gate );            // wise to lock this
+               pthread_mutex_lock( &ep->gate );            // wise to lock this
                ep->open = FALSE;
                ep->nn_sock = -1;
-       pthread_mutex_unlock( &ep->gate );
+               pthread_mutex_unlock( &ep->gate );
        }
 
        return SI_RET_OK;
@@ -326,7 +360,7 @@ static void* mt_receive( void* vctx ) {
                return NULL;
        }
 
-       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+       rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld  registering SI95 data callback and waiting\n", (long long) pthread_self() );
 
        SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
        SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects