RIC:1060: Change in PTL
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
index ec86a82..c3483d8 100644 (file)
@@ -1,8 +1,8 @@
-       // : vi ts=4 sw=4 noet2
+// : vi ts=4 sw=4 noet:
 /*
 ==================================================================================
-       Copyright (c) 2020 Nokia
-       Copyright (c) 2018-2020 AT&T Intellectual Property.
+       Copyright (c) 2020-2021 Nokia
+       Copyright (c) 2018-2021 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
 
 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
        static  time_t last_warning = 0;
-       static  long dcount = 0;
+       //static        long dcount = 0;
 
        chute_t*        chute;
 
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
-               dcount++;
+               //dcount++;
+               ctx->dcount++;
+               ctx->acc_dcount++;
                if( time( NULL ) > last_warning + 60 ) {                        // issue warning no more frequently than every 60 sec
-                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %ld msgs dropped since last warning\n", dcount );
+                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount );
                        last_warning = time( NULL );
-                       dcount = 0;
+                       ctx->dcount = 0;
                }
 
                return;
        }
-
+       ctx->acc_ecount++;
        chute = &ctx->chutes[0];
        sem_post( &chute->barrier );                                                            // tickle the ring monitor
 }
@@ -75,6 +77,17 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
                }
        }
 
+       // cross-check that header length indicators are not longer than actual message
+       uta_mhdr_t* hdr_check = (uta_mhdr_t*)(((char *) raw_msg) + TP_HDR_LEN);
+        uint32_t header_len=(uint32_t)RMR_HDR_LEN(hdr_check);
+        uint32_t payload_len=(uint32_t)ntohl(hdr_check->plen);
+        if (header_len+TP_HDR_LEN+payload_len> msg_size) {
+                rmr_vlog( RMR_VL_ERR, "Message dropped because %u + %u + %u > %u\n", header_len, payload_len, TP_HDR_LEN, msg_size);
+                free (raw_msg);
+                return;
+        }
+
+
        if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
                mbuf->tp_buf = raw_msg;
                mbuf->rts_fd = sender_fd;
@@ -197,8 +210,13 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                        river->accum = (char *) malloc( river->nbytes );
                        river->ipt = 0;
                } else {
-                       // future -- sync to next marker
-                       river->ipt = 0;                                         // insert point
+                       if( river->state == RS_RESET ) {
+                               // future -- reset not implemented
+                               return SI_RET_OK;
+                       } else {
+                               // future -- sync to next marker
+                               river->ipt = 0;                                         // insert point
+                       }
                }
        }
 
@@ -236,6 +254,12 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                        } else {
                                river->msg_size = extract_mlen( &buf[bidx] );                   // pull from buf as it's all there; it will copy later
                        }
+
+                        if( river->msg_size < 0) { // addressing RIC-989
+                                river->state=RS_RESET;
+                               return SI_RET_OK;
+                        }
+
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
 
                        if( river->msg_size > river->nbytes ) {                                         // message bigger than app max size; grab huge buffer