Code Review
/
ric-plt
/
lib
/
rmr.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Fix large message bug in SI95 data callback
[ric-plt/lib/rmr.git]
/
src
/
rmr
/
nng
/
src
/
mt_call_nng_static.c
diff --git
a/src/rmr/nng/src/mt_call_nng_static.c
b/src/rmr/nng/src/mt_call_nng_static.c
index
2ef21f6
..
7820561
100644
(file)
--- a/
src/rmr/nng/src/mt_call_nng_static.c
+++ b/
src/rmr/nng/src/mt_call_nng_static.c
@@
-32,16
+32,21
@@
#include <semaphore.h>
static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
#include <semaphore.h>
static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
+ static int warned = 0;
chute_t* chute;
chute_t* chute;
- int state;
if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
rmr_free_msg( mbuf ); // drop if ring is full
if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
rmr_free_msg( mbuf ); // drop if ring is full
+ if( !warned ) {
+ fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
+ warned++;
+ }
+
+ return;
}
chute = &ctx->chutes[0];
}
chute = &ctx->chutes[0];
- chute->mbuf = mbuf;
- state = sem_post( &chute->barrier ); // tickle the ring monitor
+ sem_post( &chute->barrier ); // tickle the ring monitor
}
/*
}
/*
@@
-59,7
+64,8
@@
static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
The transaction ID in the message matches the expected ID in the chute,
then the message is given to the chute and the chute's semaphore is tickled.
The transaction ID in the message matches the expected ID in the chute,
then the message is given to the chute and the chute's semaphore is tickled.
- If none are true, the message is dropped.
+ If none are true, the message is queued on the normal receive queue and that
+ related semaphore is tickeled.
*/
static void* mt_receive( void* vctx ) {
uta_ctx_t* ctx;
*/
static void* mt_receive( void* vctx ) {
uta_ctx_t* ctx;
@@
-79,7
+85,7
@@
static void* mt_receive( void* vctx ) {
while( ! ctx->shutdown ) {
mbuf = rcv_msg( ctx, NULL );
while( ! ctx->shutdown ) {
mbuf = rcv_msg( ctx, NULL );
- if( mbuf != NULL && (hdr = (uta_mhdr_t *) mbuf->header) != NULL ) {
+ if( mbuf != NULL && (hdr = (uta_mhdr_t *) mbuf->header) != NULL
&& mbuf->payload != NULL
) {
if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue
queue_normal( ctx, mbuf );
} else {
if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue
queue_normal( ctx, mbuf );
} else {
@@
-91,13
+97,8
@@
static void* mt_receive( void* vctx ) {
queue_normal( ctx, mbuf );
} else {
chute = &ctx->chutes[call_id];
queue_normal( ctx, mbuf );
} else {
chute = &ctx->chutes[call_id];
- if( memcmp( mbuf->xaction, chute->expect, RMR_MAX_XID ) == 0 ) { // match
- chute->mbuf = mbuf;
- sem_post( &chute->barrier );
- } else {
- rmr_free_msg( mbuf );
- mbuf = NULL;
- }
+ chute->mbuf = mbuf;
+ sem_post( &chute->barrier ); // the call function can vet xaction id in their own thread
}
}
}
}
}
}