Add SI95 transport support
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rmr_nng.c
index bc3fe10..720f6ad 100644 (file)
@@ -1,4 +1,4 @@
-// : vi ts=4 sw=4 noet :
+// vim: ts=4 sw=4 noet :
 /*
 ==================================================================================
        Copyright (c) 2019 Nokia
@@ -250,6 +250,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        char*           hold_src;                       // we need the original source if send fails
        char*           hold_ip;                        // also must hold original ip
        int                     sock_ok = 0;            // true if we found a valid endpoint socket
+       endpoint_t*     ep;                                     // end point to track counts
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -270,10 +271,10 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 
        ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
 
-       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                        // src is always used first for rts
+       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );                   // src is always used first for rts
        if( ! sock_ok ) {
                if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
-                       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock );
+                       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
                }
                if( ! sock_ok ) {
                        msg->state = RMR_ERR_NOENDPT;
@@ -287,6 +288,21 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
+               if( ep != NULL ) {
+                       switch( msg->state ) {
+                               case RMR_OK:
+                                       ep->scounts[EPSC_GOOD]++;
+                                       break;
+                       
+                               case RMR_ERR_RETRY:
+                                       ep->scounts[EPSC_TRANS]++;
+                                       break;
+
+                               default:
+                                       ep->scounts[EPSC_FAIL]++;
+                                       break;
+                       }
+               }
                strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
                strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
                msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
@@ -438,7 +454,12 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
                if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
                fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
                        free( eps );
-                       return NULL;
+                       ctx->eps = NULL;
+                       if( old_msg != NULL ) {
+                               old_msg->state = RMR_ERR_INITFAILED;
+                               old_msg->tp_state = errno;
+                       }
+                       return old_msg;
                }
 
                eps->nng_fd = rmr_get_rcvfd( ctx );
@@ -448,7 +469,12 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
                if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
                fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
                        free( eps );
-                       return NULL;
+                       ctx->eps = NULL;
+                       if( old_msg != NULL ) {
+                               old_msg->state = RMR_ERR_INITFAILED;
+                               old_msg->tp_state = errno;
+                       }
+                       return old_msg;
                }
 
                ctx->eps = eps;
@@ -599,8 +625,8 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        int             state;
 
        if( ! announced ) {
-               fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
-                       RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+               fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+                       RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
 
@@ -883,11 +909,11 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
 
        chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
 
-       if( max_wait > 0 ) {
+       if( max_wait >= 0 ) {
                clock_gettime( CLOCK_REALTIME, &ts );   
 
                if( max_wait > 999 ) {
-                       seconds = (max_wait - 999)/1000;
+                       seconds = max_wait / 1000;
                        max_wait -= seconds * 1000;
                        ts.tv_sec += seconds;
                }
@@ -1000,11 +1026,11 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
        mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
 
-       if( max_wait > 0 ) {
+       if( max_wait >= 0 ) {
                clock_gettime( CLOCK_REALTIME, &ts );   
 
                if( max_wait > 999 ) {
-                       seconds = (max_wait - 999)/1000;
+                       seconds = max_wait / 1000;
                        max_wait -= seconds * 1000;
                        ts.tv_sec += seconds;
                }
@@ -1060,3 +1086,40 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
 
        return mbuf;
 }
+
+/*
+       Given an existing message buffer, reallocate the payload portion to
+       be at least new_len bytes.  The message header will remain such that
+       the caller may use the rmr_rts_msg() function to return a payload
+       to the sender. 
+
+       The mbuf passed in may or may not be reallocated and the caller must
+       use the returned pointer and should NOT assume that it can use the 
+       pointer passed in with the exceptions based on the clone flag.
+
+       If the clone flag is set, then a duplicated message, with larger payload
+       size, is allocated and returned.  The old_msg pointer in this situation is
+       still valid and must be explicitly freed by the application. If the clone 
+       message is not set (0), then any memory management of the old message is
+       handled by the function.
+
+       If the copy flag is set, the contents of the old message's payload is 
+       copied to the reallocated payload.  If the flag is not set, then the 
+       contents of the payload is undetermined.
+*/
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
+       if( old_msg == NULL ) {
+               return NULL;
+       }
+
+       return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
+}
+
+/*
+       The following functions are "dummies" as NNG has no concept of supporting
+       them, but are needed to resolve calls at link time.
+*/
+
+extern void rmr_set_fack( void* p ) {
+       return;
+}