enhance(API): Add multi-threaded call
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rmr_nng.c
index e77c9f3..582cbb0 100644 (file)
@@ -50,6 +50,8 @@
 #include <unistd.h>
 #include <time.h>
 #include <arpa/inet.h>
+#include <semaphore.h>
+#include <pthread.h>
 
 #include <nng/nng.h>
 #include <nng/protocol/pubsub0/pub.h>
@@ -70,6 +72,8 @@
 #include "tools_static.c"
 #include "sr_nng_static.c"                     // send/receive static functions
 #include "wormholes.c"                         // wormhole api externals and related static functions (must be LAST!)
+#include "mt_call_static.c"
+#include "mt_call_nng_static.c"
 
 
 //------------------------------------------------------------------------------
@@ -181,103 +185,20 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
 }
 
 /*
-       send message with maximum timeout.
-       Accept a message and send it to an endpoint based on message type.
-       If NNG reports that the send attempt timed out, or should be retried,
-       RMr will retry for approximately max_to microseconds; rounded to the next
-       higher value of 10.
-
-       Allocates a new message buffer for the next send. If a message type has
-       more than one group of endpoints defined, then the message will be sent
-       in round robin fashion to one endpoint in each group.
-
-       An endpoint will be looked up in the route table using the message type and
-       the subscription id. If the subscription id is "UNSET_SUBID", then only the
-       message type is used.  If the initial lookup, with a subid, fails, then a
-       second lookup using just the mtype is tried.
-
-       CAUTION: this is a non-blocking send.  If the message cannot be sent, then
-               it will return with an error and errno set to eagain. If the send is
-               a limited fanout, then the returned status is the status of the last
-               send attempt.
-
+       This is a wrapper to the real timeout send. We must wrap it now to ensure that
+       the call flag and call-id are reset
 */
 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
-       nng_socket nn_sock;                     // endpoint socket for send
-       uta_ctx_t*      ctx;
-       int     group;                                  // selected group to get socket for
-       int send_again;                         // true if the message must be sent again
-       rmr_mbuf_t*     clone_m;                // cloned message for an nth send
-       int sock_ok;                            // got a valid socket from round robin select
-       uint64_t key;                           // mtype or sub-id/mtype sym table key
-       int     altk_ok = 0;                    // set true if we can lookup on alternate key if mt/sid lookup fails
-
-       if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
-               errno = EINVAL;                                                                                         // if msg is null, this is their clue
-               if( msg != NULL ) {
-                       msg->state = RMR_ERR_BADARG;
-                       errno = EINVAL;                                                                                 // must ensure it's not eagain
-               }
-               return msg;
-       }
+       char* d1;                                                                                                                       // point at the call-id in the header
 
-       errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
-       if( msg->header == NULL ) {
-               fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
-               msg->state = RMR_ERR_NOHDR;
-               errno = EBADMSG;                                                                                        // must ensure it's not eagain
-               return msg;
-       }
+       if( msg != NULL ) {
+               ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
 
-       if( max_to < 0 ) {
-               max_to = ctx->send_retries;             // convert to retries
-       }
+               d1 = DATA1_ADDR( msg->header );
+               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
+       }       
 
-       send_again = 1;                                                                                 // force loop entry
-       group = 0;                                                                                              // always start with group 0
-
-       key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
-       if( msg->sub_id != UNSET_SUBID ) {
-               altk_ok = 1;                                                                            // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
-       }
-       while( send_again ) {
-               sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
-               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
-                               msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
-
-               if( ! sock_ok ) {
-                       if( altk_ok ) {                                                                                 // we can try with the alternate (no sub-id) key
-                               altk_ok = 0;
-                               key = build_rt_key( UNSET_SUBID, msg->mtype );          // build with just the mtype and try again
-                               send_again = 1;                                                                         // ensure we don't exit the while
-                               continue;
-                       }
-
-                       msg->state = RMR_ERR_NOENDPT;
-                       errno = ENXIO;                                                                                  // must ensure it's not eagain
-                       return msg;                                                                                             // caller can resend (maybe) or free
-               }
-
-               group++;
-
-               if( send_again ) {
-                       clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
-                       if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
-                       msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
-                       msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
-                       /*
-                       if( msg ) {
-                               // error do we need to count successes/errors, how to report some success, esp if last fails?
-                       }
-                       */
-
-                       msg = clone_m;                                                                                  // clone will be the next to send
-               } else {
-                       msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
-               }
-       }
-
-       return msg;                                                                     // last message caries the status of last/only send attempt
+       return mtosend_msg( vctx, msg, max_to );
 }
 
 /*
@@ -286,7 +207,16 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        See rmr_stimeout() for info on setting the default timeout.
 */
 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
-       return rmr_mtosend_msg( vctx, msg,  -1 );                       // retries <  uses default from ctx
+       char* d1;                                                                                                               // point at the call-id in the header
+
+       if( msg != NULL ) {
+               ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
+
+               d1 = DATA1_ADDR( msg->header );
+               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
+       }       
+
+       return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
 }
 
 /*
@@ -314,12 +244,11 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
                The caller must check for this and handle.
 */
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
-       nng_socket nn_sock;                     // endpoint socket for send
+       nng_socket      nn_sock;                        // endpoint socket for send
        uta_ctx_t*      ctx;
-       int state;
-       uta_mhdr_t*     hdr;
-       char*   hold_src;                       // we need the original source if send fails
-       int             sock_ok;                        // true if we found a valid endpoint socket
+       int                     state;
+       char*           hold_src;                       // we need the original source if send fails
+       int                     sock_ok;                        // true if we found a valid endpoint socket
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -336,19 +265,20 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                return msg;
        }
 
+       ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
        sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                        // socket of specific endpoint
        if( ! sock_ok ) {
                msg->state = RMR_ERR_NOENDPT;
                return msg;                                                     // preallocated msg can be reused since not given back to nn
        }
 
-       msg->state = RMR_OK;                                    // ensure it is clear before send
-       hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                                 // the dest where we're returning the message to
-       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                        // must overlay the source to be ours
+       msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
+       hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
+       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );        // must overlay the source to be ours
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );                    // always return original source so rts can be called again
-               msg->flags |= MFL_ADDSRC;                                                                                                       // if msg given to send() it must add source
+               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );    // always return original source so rts can be called again
+               msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
        }
 
        free( hold_src );
@@ -356,6 +286,16 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 }
 
 /*
+       If multi-threading call is turned on, this invokes that mechanism with the special call
+       id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
+       behavour (described below) is carried out.  This is safe to use when mt is enabled, but
+       the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
+       a flexible timeout.
+
+       On timeout this function will return a nil pointer. If the original message could not
+       be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
+
+       Original behavour:
        Call sends the message based on message routing using the message type, and waits for a
        response message to arrive with the same transaction id that was in the outgoing message.
        If, while wiating for the expected response,  messages are received which do not have the
@@ -373,8 +313,6 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
                                        user should call this function with the message again.
 
-
-       QUESTION:  should user specify the number of messages to allow to queue?
 */
 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
        uta_ctx_t*              ctx;
@@ -387,6 +325,10 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
                return msg;
        }
 
+       if( ctx->flags & CFL_MTC_ENABLED ) {                            // if multi threaded call is on, use that
+               return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
+       }
+
        memcpy( expected_id, msg->xaction, RMR_MAX_XID );
        expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
        if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
@@ -426,6 +368,10 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
        }
        errno = 0;
 
+       if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
+               return rmr_mt_rcv( ctx, old_msg, -1 );
+       }
+
        qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
        if( qm != NULL ) {
                if( old_msg ) {
@@ -458,6 +404,10 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
                return old_msg;
        }
 
+       if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
+               return rmr_mt_rcv( ctx, old_msg, ms_to );
+       }
+
        qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
        if( qm != NULL ) {
                if( old_msg ) {
@@ -650,7 +600,14 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
-       ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
+       ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
+
+       if( flags & RMRFL_MTCALL ) {                                    // mt call support is on, need bigger ring
+               ctx->mring = uta_mk_ring( 2048 );                       // message ring filled by rcv thread
+               init_mtcall( ctx );                                                     // set up call chutes
+       } else {
+               ctx->mring = uta_mk_ring( 128 );                        // ring filled only on blocking call
+       }
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
        if( max_msg_size > 0 ) {
@@ -713,6 +670,14 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                }
        }
 
+       if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) {      // mt call support is on, must start the listener thread if not running
+               ctx->flags |= CFL_MTC_ENABLED;
+               if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // kick the receiver
+                       fprintf( stderr, "[WARN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
+               }
+               
+       }
+
        free( proto_port );
        return (void *) ctx;
 }
@@ -818,4 +783,210 @@ extern void rmr_close( void* vctx ) {
 }
 
 
+// ----- multi-threaded call/receive support -------------------------------------------------
 
+/*
+       Blocks on the receive ring chute semaphore and then reads from the ring
+       when it is tickled.  If max_wait is -1 then the function blocks until
+       a message is ready on the ring. Else max_wait is assumed to be the number
+       of millaseconds to wait before returning a timeout message.
+*/
+extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
+       uta_ctx_t*      ctx;
+       uta_mhdr_t*     hdr;                    // header in the transport buffer
+       chute_t*        chute;
+       struct timespec ts;                     // time info if we have a timeout
+       long    new_ms;                         // adjusted mu-sec
+       long    seconds = 0;            // max wait seconds
+       long    nano_sec;                       // max wait xlated to nano seconds
+       int             state;
+       rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
+       
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               if( mbuf ) {
+                       mbuf->state = RMR_ERR_BADARG;
+               }
+               return mbuf;
+       }
+
+       if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
+               errno = EINVAL;
+               if( mbuf != NULL ) {
+                       mbuf->state = RMR_ERR_NOTSUPP;
+               }
+               return mbuf;
+       }
+
+       ombuf = mbuf;
+       if( ombuf ) {
+               ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
+               ombuf->len = 0;
+       }
+
+       chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
+       
+       if( max_wait > 0 ) {
+               clock_gettime( CLOCK_REALTIME, &ts );   
+
+               if( max_wait > 999 ) {
+                       seconds = (max_wait - 999)/1000;
+                       max_wait -= seconds * 1000;
+                       ts.tv_sec += seconds;
+               }
+               if( max_wait > 0 ) {
+                       nano_sec = max_wait * 1000000;
+                       ts.tv_nsec += nano_sec;
+                       if( ts.tv_nsec > 999999999 ) {
+                               ts.tv_nsec -= 999999999;
+                               ts.tv_sec++;
+                       }
+               }
+
+               seconds = 1;                                                                                                    // use as flag later to invoked timed wait
+       }
+
+       errno = 0;
+       while( chute->mbuf == NULL && ! errno ) {
+               if( seconds ) {
+                       state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
+               } else {
+                       state = sem_wait( &chute->barrier );
+               }
+
+               if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
+                       errno = 0;
+               }
+       }
+
+       if( state < 0 ) {
+               mbuf = ombuf;                           // return caller's buffer if they passed one in
+       } else {
+               if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
+               if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
+                       if( mbuf ) {
+                               mbuf->state = RMR_OK;
+
+                               if( ombuf ) {
+                                       rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
+                               }
+                       } else {
+                               mbuf = ombuf;                           // no buffer, return user's if there
+                       }
+               }
+       }
+
+       return mbuf;
+}
+
+/*
+       Accept a message buffer and caller ID, send the message and then wait
+       for the receiver to tickle the semaphore letting us know that a message
+       has been received. The call_id is a value between 2 and 255, inclusive; if
+       it's not in this range an error will be returned. Max wait is the amount
+       of time in millaseconds that the call should block for. If 0 is given
+       then no timeout is set.
+
+       If the mt_call feature has not been initialised, then the attempt to use this
+       funciton will fail with RMR_ERR_NOTSUPP
+
+       If no matching message is received before the max_wait period expires, a
+       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
+       occurs after the message has been sent, then a nil pointer is returned
+       with errno set to some other value.
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+       rmr_mbuf_t* ombuf;                      // original mbuf passed in
+       uta_ctx_t*      ctx;
+       uta_mhdr_t*     hdr;                    // header in the transport buffer
+       chute_t*        chute;
+       unsigned char*  d1;                     // d1 data in header
+       struct timespec ts;                     // time info if we have a timeout
+       long    new_ms;                         // adjusted mu-sec
+       long    seconds = 0;            // max wait seconds
+       long    nano_sec;                       // max wait xlated to nano seconds
+       int             state;
+       
+       if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
+               errno = EINVAL;
+               if( mbuf ) {
+                       mbuf->state = RMR_ERR_BADARG;
+               }
+               return mbuf;
+       }
+
+       if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
+               mbuf->state = RMR_ERR_NOTSUPP;
+               return mbuf;
+       }
+
+       if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
+               mbuf->state = RMR_ERR_BADARG;
+               return mbuf;
+       }
+
+       ombuf = mbuf;                                                                                                   // save to return timeout status with
+
+       chute = &ctx->chutes[call_id];
+       if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
+               rmr_free_msg( chute->mbuf );
+               chute->mbuf = NULL;
+       }
+       
+       hdr = (uta_mhdr_t *) mbuf->header;
+       hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
+       memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
+       d1 = DATA1_ADDR( hdr );
+       d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
+       mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
+
+       if( max_wait > 0 ) {
+               clock_gettime( CLOCK_REALTIME, &ts );   
+
+               if( max_wait > 999 ) {
+                       seconds = (max_wait - 999)/1000;
+                       max_wait -= seconds * 1000;
+                       ts.tv_sec += seconds;
+               }
+               if( max_wait > 0 ) {
+                       nano_sec = max_wait * 1000000;
+                       ts.tv_nsec += nano_sec;
+                       if( ts.tv_nsec > 999999999 ) {
+                               ts.tv_nsec -= 999999999;
+                               ts.tv_sec++;
+                       }
+               }
+
+               seconds = 1;                                                                            // use as flag later to invoked timed wait
+       }
+
+       mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
+       if( mbuf ) {
+               if( mbuf->state != RMR_OK ) {
+                       return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
+               }
+       }
+
+       errno = 0;
+       while( chute->mbuf == NULL && ! errno ) {
+               if( seconds ) {
+                       state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
+               } else {
+                       state = sem_wait( &chute->barrier );
+               }
+
+               if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
+                       errno = 0;
+               }
+       }
+
+       if( state < 0 ) {
+               return NULL;                                    // leave errno as set by sem wait call
+       }
+
+       mbuf = chute->mbuf;
+       mbuf->state = RMR_OK;
+       chute->mbuf = NULL;
+
+       return mbuf;
+}