feature(routes): Add rtable update
[ric-plt/lib/rmr.git] / src / nanomsg / src / rmr.c
index 3fe4247..b57df3a 100644 (file)
@@ -1,14 +1,14 @@
 // :vi sw=4 ts=4 noet:
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia 
+       Copyright (c) 2019 Nokia
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
-       http://www.apache.org/licenses/LICENSE-2.0
+          http://www.apache.org/licenses/LICENSE-2.0
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,9 +24,9 @@
                                the older nanomsg messaging transport mehhanism.
 
                                To "hide" internal functions the choice was made to implement them
-                               all as static functions. This means that we include nearly 
+                               all as static functions. This means that we include nearly
                                all of our modules here as 90% of the library is not visible to
-                               the outside world. 
+                               the outside world.
 
        Author:         E. Scott Daniels
        Date:           28 November 2018
@@ -80,8 +80,8 @@ static void free_ctx( uta_ctx_t* ctx ) {
 // --------------- public functions --------------------------------------------------------------------------
 
 /*
-       Set the receive timeout to time. If time >1000 we assume the time is milliseconds,
-       else we assume seconds. Setting -1 is always block.
+       Set the receive timeout to time (ms). A value of 0 is the same as a non-blocking
+       receive and -1 is block for ever.
        Returns the nn value (0 on success <0 on error).
 */
 extern int rmr_set_rtimeout( void* vctx, int time ) {
@@ -92,11 +92,11 @@ extern int rmr_set_rtimeout( void* vctx, int time ) {
                return -1;
        }
 
-       if( time > 0 ) {
-               if( time < 1000 ) {     
-                       time = time * 1000;                     // assume seconds, nn wants ms
-               }
-       } 
+       if( ctx->last_rto == time ) {
+               return 0;
+       }
+
+       ctx->last_rto = time;
 
        return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
 }
@@ -108,7 +108,6 @@ extern int rmr_rcv_to( void* vctx, int time ) {
        return rmr_rcv_to( vctx, time );
 }
 
-
 /*
        Set the send timeout to time. If time >1000 we assume the time is milliseconds,
        else we assume seconds. Setting -1 is always block.
@@ -123,10 +122,10 @@ extern int rmr_set_stimeout( void* vctx, int time ) {
        }
 
        if( time > 0 ) {
-               if( time < 1000 ) {     
+               if( time < 1000 ) {
                        time = time * 1000;                     // assume seconds, nn wants ms
                }
-       } 
+       }
 
        return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) );
 }
@@ -142,7 +141,7 @@ extern int rmr_send_to( void* vctx, int time ) {
        Returns the size of the payload (bytes) that the msg buffer references.
        Len in a message is the number of bytes which were received, or should
        be transmitted, however, it is possible that the mbuf was allocated
-       with a larger payload space than the payload length indicates; this 
+       with a larger payload space than the payload length indicates; this
        function returns the absolute maximum space that the user has available
        in the payload. On error (bad msg buffer) -1 is returned and errno should
        indicate the rason.
@@ -154,7 +153,7 @@ extern int rmr_payload_size( rmr_mbuf_t* msg ) {
        }
 
        errno = 0;
-       return msg->alloc_len - sizeof( uta_mhdr_t );                                           // figure size should we not have a msg buffer
+       return msg->alloc_len - RMR_HDR_LEN( msg->header );                     // transport buffer less header and other data bits
 }
 
 /*
@@ -169,10 +168,44 @@ extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
                return NULL;
        }
 
-       m = alloc_zcmsg( ctx, NULL, size, 0 );
+       m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );
        return  m;
 }
 
+/*
+       Allocates a send message as a zerocopy message allowing the underlying message protocol
+       to send the buffer without copy. In addition, a trace data field of tr_size will be
+       added and the supplied data coppied to the buffer before returning the message to
+       the caller.
+*/
+extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
+       uta_ctx_t*      ctx;
+       rmr_mbuf_t*     m;
+       int state;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return NULL;
+       }
+
+       m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
+       if( m != NULL ) {
+               state = rmr_set_trace( m, data, tr_size );                              // roll their data in
+               if( state != tr_size ) {
+                       m->state = RMR_ERR_INITFAILED;
+               }
+       }
+
+       return  m;
+}
+
+/*
+       Need an external path to the realloc static function as it's called by an
+       outward facing mbuf api function.
+*/
+extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
+       return realloc_msg( msg, new_tr_size );
+}
+
 /*
        Return the message to the available pool, or free it outright.
 */
@@ -188,15 +221,15 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
                        free( mbuf->header );
                }
        }
-       
+
        free( mbuf );
 }
 
 /*
-       Accept a message and send it to an endpoint based on message type.      
+       Accept a message and send it to an endpoint based on message type.
        Allocates a new message buffer for the next send. If a message type has
        more than one group of endpoints defined, then the message will be sent
-       in round robin fashion to one endpoint in each group. 
+       in round robin fashion to one endpoint in each group.
 
        CAUTION: this is a non-blocking send.  If the message cannot be sent, then
                it will return with an error and errno set to eagain. If the send is
@@ -209,13 +242,16 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        int     group;                                  // selected group to get socket for
        int send_again;                         // true if the message must be sent again
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
+       uint64_t key;                           // lookup key is now subid and mtype
+       int max_rt = 1000;
+       int     altk_ok = 0;                    // ok to retry with alt key when true
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
                        errno = EINVAL;                                                                                 // must ensure it's not eagain
-               }                       
+               }
                return msg;
        }
 
@@ -230,32 +266,48 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
 
+       key = build_rt_key( msg->sub_id, msg->mtype );                  // what we need to find the route table entry
+       if( msg->sub_id != UNSET_SUBID ) {                                              // if sub id set, allow retry with just mtype if no endpoint when sub-id used
+               altk_ok = 1;
+       }
+
        while( send_again ) {
-               nn_sock = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again );         // round robin select endpoint; again set if mult groups
-               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n", 
-                               msg->mtype, send_again, group, nn_sock, msg->len );
-               group++;
+               max_rt = 1000;
+               nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again );                // round robin select endpoint; again set if mult groups
+               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d ak_ok=%d\n",
+                               msg->mtype, send_again, group, nn_sock, msg->len, altk_ok );
 
                if( nn_sock < 0 ) {
+                       if( altk_ok ) {                                                                                 // ok to retry with alternate key
+                               key = build_rt_key( UNSET_SUBID, msg->mtype );          // build key with just mtype and retry
+                               send_again = 1;
+                               altk_ok = 0;
+                               continue;
+                       }
+
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;                                                                                  // must ensure it's not eagain
                        return msg;                                                                                             // caller can resend (maybe) or free
                }
+               group++;
 
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
-                       if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
+                       if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d sub_id=%d len=%d\n", msg->mtype, msg->sub_id, msg->len );
                        msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
                        msg = send_msg( ctx, msg, nn_sock );                                    // do the hard work, msg should be nil on success
-                       /*
-                       if( msg ) {
-                               // error do we need to count successes/errors, how to report some success, esp if last fails?
-                       } 
-                       */
+                       while( max_rt > 0 &&  msg && msg->state == RMR_ERR_RETRY ) {
+                               msg = send_msg( ctx, msg, nn_sock );
+                               max_rt--;
+                       }
 
                        msg = clone_m;                                                                                  // clone will be the next to send
                } else {
                        msg = send_msg( ctx, msg, nn_sock );                                    // send the last, and allocate a new buffer; drops the clone if it was
+                       while( max_rt > 0 &&  msg && msg->state == RMR_ERR_RETRY ) {
+                               msg = send_msg( ctx, msg, nn_sock );
+                               max_rt--;
+                       }
                }
        }
 
@@ -263,9 +315,9 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 }
 
 /*
-       Return to sender allows a message to be sent back to the endpoint where it originated. 
+       Return to sender allows a message to be sent back to the endpoint where it originated.
        The source information in the message is used to select the socket on which to write
-       the message rather than using the message type and round-robin selection. This 
+       the message rather than using the message type and round-robin selection. This
        should return a message buffer with the state of the send operation set. On success
        (state is RMR_OK, the caller may use the buffer for another receive operation), and on
        error it can be passed back to this function to retry the send if desired. On error,
@@ -297,7 +349,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
-               }                       
+               }
                return msg;
        }
 
@@ -335,7 +387,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 
        Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
        to ensure that no error was encountered. If the state is UTA_BADARG, then the message
-       may be resent (likely the context pointer was nil).  If the message is sent, but no 
+       may be resent (likely the context pointer was nil).  If the message is sent, but no
        response is received, a nil message is returned with errno set to indicate the likley
        issue:
                ETIMEDOUT -- too many messages were queued before reciving the expected response
@@ -354,7 +406,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
-               }                       
+               }
                return msg;
        }
 
@@ -391,7 +443,7 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
-               }                       
+               }
                errno = EINVAL;
                return old_msg;
        }
@@ -410,34 +462,41 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
 }
 
 /*
-       Receive with a timeout.  This is a convenience function when sitting on top of 
-       nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg(). 
+       Receive with a timeout.  This is a convenience function when sitting on top of
+       nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg().
 */
 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
-       rmr_set_rtimeout( vctx, ms_to );
+       uta_ctx_t*      ctx;
+
+       if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
+               if( ctx->last_rto != ms_to ) {                                                  // avoid call overhead
+                       rmr_set_rtimeout( vctx, ms_to );
+               }
+       }
+
        return rmr_rcv_msg( vctx, old_msg );
 }
 
 
 /*
        This blocks until the message with the 'expect' ID is received. Messages which are received
-       before the expected message are queued onto the message ring.  The function will return 
+       before the expected message are queued onto the message ring.  The function will return
        a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
        expected message is received. If the queued message ring fills a nil pointer is returned
        and errno is set to ENOBUFS.
 
-       Generally this will be invoked only by the call() function as it waits for a response, but 
+       Generally this will be invoked only by the call() function as it waits for a response, but
        it is exposed to the user application as three is no reason not to.
 */
 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
        uta_ctx_t*      ctx;
        int     queued = 0;                             // number we pushed into the ring
        int     exp_len = 0;                    // length of expected ID
-       
+
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
-               }                       
+               }
                errno = EINVAL;
                return msg;
        }
@@ -497,12 +556,12 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) {
        char*   proto = "tcp";                          // pointer into the proto/port string user supplied
        char*   port;
        char*   proto_port;
-       char    wbuf[1024];                                     // work buffer 
+       char    wbuf[1024];                                     // work buffer
        char*   tok;                                            // pointer at token in a buffer
        int             state;
        char*   interface = NULL;                       // interface to bind to pulled from RMR_BIND_IF if set
 
-       fprintf( stderr, "[INFO] ric message routing library on nanomsg (%s %s.%s.%s built: %s)\n", 
+       fprintf( stderr, "[INFO] ric message routing library on nanomsg (%s %s.%s.%s built: %s)\n",
                        QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
 
        errno = 0;
@@ -520,6 +579,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) {
 
 
        ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
+       ctx->last_rto = -2;                                                             // last receive timeout that was set; invalid value to force first to set
 
        ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t );               // default max buffer size
        if( max_msg_size > 0 ) {
@@ -534,7 +594,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) {
 
        uta_lookup_rtg( ctx );                                                  // attempt to fill in rtg info; rtc will handle missing values/errors
 
-    ctx->nn_sock = nn_socket( AF_SP, NN_PULL );                // our 'listen' socket should allow multiple senders to connect
+       ctx->nn_sock = nn_socket( AF_SP, NN_PULL );             // our 'listen' socket should allow multiple senders to connect
        if( ctx->nn_sock < 0 ) {
                fprintf( stderr, "[CRIT] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno );
                free_ctx( ctx );
@@ -569,7 +629,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) {
                interface = "0.0.0.0";
        }
        snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
-    if( nn_bind( ctx->nn_sock, bind_info ) < 0) {                      // bind and automatically accept client sessions
+       if( nn_bind( ctx->nn_sock, bind_info ) < 0) {                   // bind and automatically accept client sessions
                fprintf( stderr, "[CRIT] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) );
                nn_close( ctx->nn_sock );
                free_ctx( ctx );
@@ -586,10 +646,30 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) {
        return (void *) ctx;
 }
 
+/*
+       This sets the default trace length which will be added to any message buffers
+       allocated.  It can be set at any time, and if rmr_set_trace() is given a
+       trace len that is different than the default allcoated in a message, the message
+       will be resized.
+
+       Returns 0 on failure and 1 on success. If failure, then errno will be set.
+*/
+extern int rmr_init_trace( void* vctx, int tr_len ) {
+       uta_ctx_t* ctx;
+
+       errno = 0;
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               return 0;
+       }
+
+       ctx->trace_data_len = tr_len;
+       return 1;
+}
 
 /*
        Publicly facing initialisation function. Wrapper for the init() funcion above
-       as it needs to ensure internal flags are masked off before calling the 
+       as it needs to ensure internal flags are masked off before calling the
        real workhorse.
 */
 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
@@ -614,7 +694,7 @@ extern int rmr_ready( void* vctx ) {
 }
 
 /*
-       Provides a non-fatal (compile) interface for the nng only function. 
+       Provides a non-fatal (compile) interface for the nng only function.
        Not supported on top of nano, so this always returns -1.
 */
 extern int rmr_get_rcvfd( void* vctx ) {
@@ -631,6 +711,6 @@ extern void rmr_close( void* vctx ) {
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                return;
        }
-       
+
        nn_close( ctx->nn_sock );
 }