X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Fmt_call_si_static.c;h=73d776aedc1d795fb047f960741da6ee82059b4b;hb=11838bcf76f3614384459cb56e2ce80dea788cef;hp=1b31bf41df2af017ccb0403de661fe3d0b652430;hpb=ce1c741c01e8387cb095dac5e36a4d8ad91d006d;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c index 1b31bf4..73d776a 100644 --- a/src/rmr/si/src/mt_call_si_static.c +++ b/src/rmr/si/src/mt_call_si_static.c @@ -1,8 +1,8 @@ // : vi ts=4 sw=4 noet: /* ================================================================================== - Copyright (c) 2020 Nokia - Copyright (c) 2018-2020 AT&T Intellectual Property. + Copyright (c) 2020-2021 Nokia + Copyright (c) 2018-2021 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -35,14 +35,19 @@ #include static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) { - static int warned = 0; + static time_t last_warning = 0; + //static long dcount = 0; + chute_t* chute; if( ! uta_ring_insert( ctx->mring, mbuf ) ) { rmr_free_msg( mbuf ); // drop if ring is full - if( !warned ) { - rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" ); - warned++; + //dcount++; + ctx->dcount++; + if( time( NULL ) > last_warning + 60 ) { // issue warning no more frequently than every 60 sec + rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount ); + last_warning = time( NULL ); + ctx->dcount = 0; } return; @@ -96,6 +101,8 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd } } } + } else { + free( raw_msg ); } } @@ -153,7 +160,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { unsigned char* old_accum; // old accumulator reference should we need to realloc int bidx = 0; // transport buffer index int remain; // bytes in transport buf that need to be moved - int* mlen; // pointer to spot in buffer for conversion to int + int* mlen; // pointer to spot in buffer for conversion to int int need; // bytes needed for something int i; @@ -161,22 +168,31 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { if( (ctx = (uta_ctx_t *) vctx) == NULL ) { return SI_RET_OK; } - - if( fd >= ctx->nrivers || fd < 0 ) { - if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers ); - return SI_RET_OK; - } } else { ctx = (uta_ctx_t *) vctx; } - if( buflen <= 0 ) { + if( buflen <= 0 || fd < 0 ) { // no buffer or invalid fd return SI_RET_OK; } - river = &ctx->rivers[fd]; + if( fd >= ctx->nrivers ) { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers ); + if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) { + river = (river_t *) malloc( sizeof( *river ) ); + memset( river, 0, sizeof( *river ) ); + rmr_sym_map( ctx->river_hash, (uint64_t) fd, river ); + river->state = RS_NEW; + } + } else { + river = &ctx->rivers[fd]; // quick index for fd values < MAX_FD + } + if( river->state != RS_GOOD ) { // all states which aren't good require reset first if( river->state == RS_NEW ) { + if( river->accum != NULL ) { + free( river->accum ); + } memset( river, 0, sizeof( *river ) ); river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // start with what user said would be the "normal" max inbound msg size river->accum = (char *) malloc( river->nbytes ); @@ -281,17 +297,36 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { static int mt_disc_cb( void* vctx, int fd ) { uta_ctx_t* ctx; endpoint_t* ep; + river_t* river = NULL; if( (ctx = (uta_ctx_t *) vctx) == NULL ) { return SI_RET_OK; } + if( fd < ctx->nrivers && fd >= 0 ) { + river = &ctx->rivers[fd]; + } else { + if( fd > 0 ) { + river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd ); + } + } + + if( river != NULL ) { + river->state = RS_NEW; // if one connects here later; ensure it's new + if( river->accum != NULL ) { + free( river->accum ); + river->accum = NULL; + river->state = RS_NEW; // force realloc if the fd is used again + } + } + ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash if( ep != NULL ) { - pthread_mutex_lock( &ep->gate ); // wise to lock this + pthread_mutex_lock( &ep->gate ); // wise to lock this ep->open = FALSE; ep->nn_sock = -1; - pthread_mutex_unlock( &ep->gate ); + pthread_mutex_unlock( &ep->gate ); } return SI_RET_OK; @@ -326,7 +361,7 @@ static void* mt_receive( void* vctx ) { return NULL; } - if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" ); + rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld registering SI95 data callback and waiting\n", (long long) pthread_self() ); SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects