Allow RTS calls prior to initial route table load
[ric-plt/lib/rmr.git] / src / rmr / nng / src / mt_call_nng_static.c
index 325b313..d3a914a 100644 (file)
 #include <semaphore.h>
 
 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
 #include <semaphore.h>
 
 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
+       static  int warned = 0;
        chute_t*        chute;
        chute_t*        chute;
-       int                     state;
 
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
 
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
+               if( !warned ) {
+                       rmr_vlog( RMR_VL_WARN, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
+                       warned++;
+               }
+
+               return;
        }
 
        chute = &ctx->chutes[0];
        }
 
        chute = &ctx->chutes[0];
-       chute->mbuf = mbuf;
-       state = sem_post( &chute->barrier );                                                            // tickle the ring monitor
+       sem_post( &chute->barrier );                                                            // tickle the ring monitor
 }
 
 /*
 }
 
 /*
@@ -59,7 +64,8 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
                The transaction ID in the message matches the expected ID in the chute,
                then the message is given to the chute and the chute's semaphore is tickled.
 
                The transaction ID in the message matches the expected ID in the chute,
                then the message is given to the chute and the chute's semaphore is tickled.
 
-               If none are true, the message is dropped.
+               If none are true, the message is queued on the normal receive queue and that
+               related semaphore is tickeled.
 */
 static void* mt_receive( void* vctx ) {
        uta_ctx_t*              ctx;
 */
 static void* mt_receive( void* vctx ) {
        uta_ctx_t*              ctx;
@@ -74,12 +80,12 @@ static void* mt_receive( void* vctx ) {
                return NULL;
        }
 
                return NULL;
        }
 
-       fprintf( stderr, "[INFO] rmr mt_receiver is spinning\n" );
+       rmr_vlog( RMR_VL_INFO, "rmr mt_receiver is spinning\n" );
 
        while( ! ctx->shutdown ) {
                mbuf = rcv_msg( ctx, NULL );
 
 
        while( ! ctx->shutdown ) {
                mbuf = rcv_msg( ctx, NULL );
 
-               if( mbuf != NULL && (hdr = (uta_mhdr_t *) mbuf->header) != NULL ) {
+               if( mbuf != NULL && (hdr = (uta_mhdr_t *) mbuf->header) != NULL && mbuf->payload != NULL ) {
                        if( hdr->flags & HFL_CALL_MSG ) {                                       // call generated message; ignore call-id etc and queue
                                queue_normal( ctx, mbuf );
                        } else {
                        if( hdr->flags & HFL_CALL_MSG ) {                                       // call generated message; ignore call-id etc and queue
                                queue_normal( ctx, mbuf );
                        } else {