Correct bug in timeout receive
[ric-plt/lib/rmr.git] / src / rmr / nng / src / mt_call_nng_static.c
index 325b313..7820561 100644 (file)
 #include <semaphore.h>
 
 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
 #include <semaphore.h>
 
 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
+       static  int warned = 0;
        chute_t*        chute;
        chute_t*        chute;
-       int                     state;
 
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
 
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
+               if( !warned ) {
+                       fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
+                       warned++;
+               }
+
+               return;
        }
 
        chute = &ctx->chutes[0];
        }
 
        chute = &ctx->chutes[0];
-       chute->mbuf = mbuf;
-       state = sem_post( &chute->barrier );                                                            // tickle the ring monitor
+       sem_post( &chute->barrier );                                                            // tickle the ring monitor
 }
 
 /*
 }
 
 /*
@@ -59,7 +64,8 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
                The transaction ID in the message matches the expected ID in the chute,
                then the message is given to the chute and the chute's semaphore is tickled.
 
                The transaction ID in the message matches the expected ID in the chute,
                then the message is given to the chute and the chute's semaphore is tickled.
 
-               If none are true, the message is dropped.
+               If none are true, the message is queued on the normal receive queue and that
+               related semaphore is tickeled.
 */
 static void* mt_receive( void* vctx ) {
        uta_ctx_t*              ctx;
 */
 static void* mt_receive( void* vctx ) {
        uta_ctx_t*              ctx;
@@ -79,7 +85,7 @@ static void* mt_receive( void* vctx ) {
        while( ! ctx->shutdown ) {
                mbuf = rcv_msg( ctx, NULL );
 
        while( ! ctx->shutdown ) {
                mbuf = rcv_msg( ctx, NULL );
 
-               if( mbuf != NULL && (hdr = (uta_mhdr_t *) mbuf->header) != NULL ) {
+               if( mbuf != NULL && (hdr = (uta_mhdr_t *) mbuf->header) != NULL && mbuf->payload != NULL ) {
                        if( hdr->flags & HFL_CALL_MSG ) {                                       // call generated message; ignore call-id etc and queue
                                queue_normal( ctx, mbuf );
                        } else {
                        if( hdr->flags & HFL_CALL_MSG ) {                                       // call generated message; ignore call-id etc and queue
                                queue_normal( ctx, mbuf );
                        } else {