fixing RMR messages with negative size
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
index 27c707b..78a1039 100644 (file)
@@ -1,8 +1,8 @@
-       // : vi ts=4 sw=4 noet2
+// : vi ts=4 sw=4 noet:
 /*
 ==================================================================================
-       Copyright (c) 2020 Nokia
-       Copyright (c) 2018-2020 AT&T Intellectual Property.
+       Copyright (c) 2020-2021 Nokia
+       Copyright (c) 2018-2021 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
@@ -44,15 +44,16 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
                //dcount++;
                ctx->dcount++;
+               ctx->acc_dcount++;
                if( time( NULL ) > last_warning + 60 ) {                        // issue warning no more frequently than every 60 sec
-                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %ld msgs dropped since last warning\n", ctx->dcount );
+                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount );
                        last_warning = time( NULL );
                        ctx->dcount = 0;
                }
 
                return;
        }
-
+       ctx->acc_ecount++;
        chute = &ctx->chutes[0];
        sem_post( &chute->barrier );                                                            // tickle the ring monitor
 }
@@ -198,8 +199,13 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                        river->accum = (char *) malloc( river->nbytes );
                        river->ipt = 0;
                } else {
-                       // future -- sync to next marker
-                       river->ipt = 0;                                         // insert point
+                       if( river->state == RS_RESET ) {
+                               // future -- reset not implemented
+                               return SI_RET_OK;
+                       } else {
+                               // future -- sync to next marker
+                               river->ipt = 0;                                         // insert point
+                       }
                }
        }
 
@@ -237,6 +243,12 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                        } else {
                                river->msg_size = extract_mlen( &buf[bidx] );                   // pull from buf as it's all there; it will copy later
                        }
+
+                        if( river->msg_size < 0) { // addressing RIC-989
+                                river->state=RS_RESET;
+                               return SI_RET_OK;
+                        }
+
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
 
                        if( river->msg_size > river->nbytes ) {                                         // message bigger than app max size; grab huge buffer