Code Review
/
ric-plt
/
lib
/
rmr.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
fixing RMR messages with negative size
[ric-plt/lib/rmr.git]
/
src
/
rmr
/
si
/
src
/
mt_call_si_static.c
diff --git
a/src/rmr/si/src/mt_call_si_static.c
b/src/rmr/si/src/mt_call_si_static.c
index
27c707b
..
78a1039
100644
(file)
--- a/
src/rmr/si/src/mt_call_si_static.c
+++ b/
src/rmr/si/src/mt_call_si_static.c
@@
-1,8
+1,8
@@
- // : vi ts=4 sw=4 noet2
+// : vi ts=4 sw=4 noet:
/*
==================================================================================
/*
==================================================================================
- Copyright (c) 2020 Nokia
- Copyright (c) 2018-202
0
AT&T Intellectual Property.
+ Copyright (c) 2020
-2021
Nokia
+ Copyright (c) 2018-202
1
AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@
-44,15
+44,16
@@
static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
rmr_free_msg( mbuf ); // drop if ring is full
//dcount++;
ctx->dcount++;
rmr_free_msg( mbuf ); // drop if ring is full
//dcount++;
ctx->dcount++;
+ ctx->acc_dcount++;
if( time( NULL ) > last_warning + 60 ) { // issue warning no more frequently than every 60 sec
if( time( NULL ) > last_warning + 60 ) { // issue warning no more frequently than every 60 sec
- rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %
l
d msgs dropped since last warning\n", ctx->dcount );
+ rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount );
last_warning = time( NULL );
ctx->dcount = 0;
}
return;
}
last_warning = time( NULL );
ctx->dcount = 0;
}
return;
}
-
+ ctx->acc_ecount++;
chute = &ctx->chutes[0];
sem_post( &chute->barrier ); // tickle the ring monitor
}
chute = &ctx->chutes[0];
sem_post( &chute->barrier ); // tickle the ring monitor
}
@@
-198,8
+199,13
@@
static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
river->accum = (char *) malloc( river->nbytes );
river->ipt = 0;
} else {
river->accum = (char *) malloc( river->nbytes );
river->ipt = 0;
} else {
- // future -- sync to next marker
- river->ipt = 0; // insert point
+ if( river->state == RS_RESET ) {
+ // future -- reset not implemented
+ return SI_RET_OK;
+ } else {
+ // future -- sync to next marker
+ river->ipt = 0; // insert point
+ }
}
}
}
}
@@
-237,6
+243,12
@@
static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
} else {
river->msg_size = extract_mlen( &buf[bidx] ); // pull from buf as it's all there; it will copy later
}
} else {
river->msg_size = extract_mlen( &buf[bidx] ); // pull from buf as it's all there; it will copy later
}
+
+ if( river->msg_size < 0) { // addressing RIC-989
+ river->state=RS_RESET;
+ return SI_RET_OK;
+ }
+
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer