enhance(API): Add multi-threaded call
[ric-plt/lib/rmr.git] / src / rmr / nng / src / sr_nng_static.c
index 7c17589..da4dfbd 100644 (file)
@@ -142,8 +142,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                hdr->sub_id = htonl( UNSET_SUBID );
                SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
                SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
-               //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // no need until we start using them
-               //SET_HDR_D2_LEN( hdr, ctx->d2_len );
+               SET_HDR_D1_LEN( hdr, ctx->d1_len );
+               //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
        }
        msg->len = 0;                                                                                   // length of data in the payload
        msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
@@ -322,7 +322,6 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
        uta_mhdr_t* hdr;
        uta_v1mhdr_t* v1hdr;
        int     tr_old_len;                     // tr size in new buffer
-       int     coffset;                        // an offset to something in the header for copy
 
        nm = (rmr_mbuf_t *) malloc( sizeof *nm );
        if( nm == NULL ) {
@@ -349,22 +348,19 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
                        nm->payload = (void *) v1hdr + sizeof( *v1hdr );
                        break;
 
-               default:                                                                                        // current message always caught  here
+               default:                                                                                        // current message version always caught  here
                        hdr = nm->header;
-                       memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data might have changed
-                       if( RMR_D1_LEN( hdr )  ) {
-                               coffset = DATA1_OFFSET( hdr );                                                                                          // offset to d1
-                               memcpy( hdr + coffset, old_msg->header + coffset, RMR_D1_LEN( hdr ) );          // copy data1 and data2 if necessary
+                       memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
+                       SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
 
+                       if( RMR_D1_LEN( hdr )  ) {
+                               memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
                        }
                        if( RMR_D2_LEN( hdr )  ) {
-                               coffset = DATA2_OFFSET( hdr );                                                                                          // offset to d2
-                               memcpy( hdr + coffset, old_msg->header + coffset, RMR_D2_LEN( hdr ) );          // copy data2 and data2 if necessary
+                               memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
                        }
 
-                       SET_HDR_TR_LEN( hdr, tr_len );                                                                          // MUST set before pointing payload
                        nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
-                       SET_HDR_TR_LEN( hdr, tr_len );                                                                          // do NOT copy old trace data, just set the new header
                        break;
        }
 
@@ -427,6 +423,7 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        msg->len = 0;
        msg->payload = NULL;
        msg->xaction = NULL;
+       msg->tp_buf = NULL;
 
        msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
        if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
@@ -590,6 +587,108 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
        return msg;
 }
 
+/*
+       send message with maximum timeout.
+       Accept a message and send it to an endpoint based on message type.
+       If NNG reports that the send attempt timed out, or should be retried,
+       RMr will retry for approximately max_to microseconds; rounded to the next
+       higher value of 10.
+
+       Allocates a new message buffer for the next send. If a message type has
+       more than one group of endpoints defined, then the message will be sent
+       in round robin fashion to one endpoint in each group.
+
+       An endpoint will be looked up in the route table using the message type and
+       the subscription id. If the subscription id is "UNSET_SUBID", then only the
+       message type is used.  If the initial lookup, with a subid, fails, then a
+       second lookup using just the mtype is tried.
+
+       CAUTION: this is a non-blocking send.  If the message cannot be sent, then
+               it will return with an error and errno set to eagain. If the send is
+               a limited fanout, then the returned status is the status of the last
+               send attempt.
+
+*/
+static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
+       nng_socket      nn_sock;                        // endpoint socket for send
+       uta_ctx_t*      ctx;
+       int                     group;                          // selected group to get socket for
+       int                     send_again;                     // true if the message must be sent again
+       rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
+       int                     sock_ok;                        // got a valid socket from round robin select
+       uint64_t         key;                           // mtype or sub-id/mtype sym table key
+       int                     altk_ok = 0;            // set true if we can lookup on alternate key if mt/sid lookup fails
+       char*           d1;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
+               errno = EINVAL;                                                                                         // if msg is null, this is their clue
+               if( msg != NULL ) {
+                       msg->state = RMR_ERR_BADARG;
+                       errno = EINVAL;                                                                                 // must ensure it's not eagain
+               }
+               return msg;
+       }
+
+       errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
+       if( msg->header == NULL ) {
+               fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
+               msg->state = RMR_ERR_NOHDR;
+               errno = EBADMSG;                                                                                        // must ensure it's not eagain
+               return msg;
+       }
+
+       if( max_to < 0 ) {
+               max_to = ctx->send_retries;             // convert to retries
+       }
+
+       send_again = 1;                                                                                 // force loop entry
+       group = 0;                                                                                              // always start with group 0
+
+       key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
+       if( msg->sub_id != UNSET_SUBID ) {
+               altk_ok = 1;                                                                            // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
+       }
+       while( send_again ) {
+               sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
+               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
+                               msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
+
+               if( ! sock_ok ) {
+                       if( altk_ok ) {                                                                                 // we can try with the alternate (no sub-id) key
+                               altk_ok = 0;
+                               key = build_rt_key( UNSET_SUBID, msg->mtype );          // build with just the mtype and try again
+                               send_again = 1;                                                                         // ensure we don't exit the while
+                               continue;
+                       }
+
+                       msg->state = RMR_ERR_NOENDPT;
+                       errno = ENXIO;                                                                                  // must ensure it's not eagain
+                       return msg;                                                                                             // caller can resend (maybe) or free
+               }
+
+               group++;
+
+               if( send_again ) {
+                       clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
+                       if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
+                       msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
+                       msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
+                       /*
+                       if( msg ) {
+                               // error do we need to count successes/errors, how to report some success, esp if last fails?
+                       }
+                       */
+
+                       msg = clone_m;                                                                                  // clone will be the next to send
+               } else {
+                       msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
+               }
+       }
+
+       return msg;                                                                     // last message caries the status of last/only send attempt
+}
+
+
 /*
        A generic wrapper to the real send to keep wormhole stuff agnostic.
        We assume the wormhole function vetted the buffer so we don't have to.