Revert RTS to use unidirectional connection
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rtable_nng_static.c
index 7cd3e20..2cec490 100644 (file)
@@ -106,14 +106,14 @@ static int uta_link2( endpoint_t* ep ) {
 
        if( nng_push0_open( nn_sock ) != 0 ) {                  // and assign the mode
                pthread_mutex_unlock( &ep->gate );
-               fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
+               rmr_vlog( RMR_VL_CRIT, "rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
                return FALSE;
        }
 
        snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
        if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
                pthread_mutex_unlock( &ep->gate );
-               fprintf( stderr, "[WRN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
+               rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
                nng_close( *nn_sock );
                return FALSE;
        }
@@ -123,12 +123,12 @@ static int uta_link2( endpoint_t* ep ) {
 
        if( (state = nng_dialer_start( *dialer, flags )) != 0 ) {                                               // can fail immediatly (unlike nanomsg)
                pthread_mutex_unlock( &ep->gate );
-               fprintf( stderr, "[WRN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
+               rmr_vlog( RMR_VL_WARN, "rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
                nng_close( *nn_sock );
                return FALSE;
        }
 
-       if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
+       if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_link2l: dial was successful: %s\n", target );
 
        ep->open = TRUE;                                                // must set before release
        pthread_mutex_unlock( &ep->gate );
@@ -138,8 +138,10 @@ static int uta_link2( endpoint_t* ep ) {
 /*
        This provides a protocol independent mechanism for establishing the connection to an endpoint.
        Return is true (1) if the link was opened; false on error.
+
+       For some flavours, the context is needed by this function, but not for nng.
 */
-static int rt_link2_ep( endpoint_t* ep ) {
+static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
        if( ep == NULL ) {
                return FALSE;
        }
@@ -165,24 +167,24 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        rrgroup_t* rrg;                         // pointer at group to update
 
        if( ! rte || ! rt ) {
-               fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
+               rmr_vlog( RMR_VL_WARN, "uda_add_ep didn't get a valid rt and/or rte pointer\n" );
                return NULL;
        }
 
        if( rte->nrrgroups <= group ) {
-               fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
+               rmr_vlog( RMR_VL_WARN, "uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
                return NULL;
        }
 
        if( (rrg = rte->rrgroups[group]) == NULL ) {
                if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
-                       fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
+                       rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
                        return NULL;
                }
                memset( rrg, 0, sizeof( *rrg ) );
 
                if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
-                       fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
+                       rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
                        return NULL;
                }
                memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
@@ -199,7 +201,7 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        if( rrg != NULL ) {
                if( rrg->nused >= rrg->nendpts ) {
                        // future: reallocate
-                       fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
+                       rmr_vlog( RMR_VL_WARN, "endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
                        return NULL;
                }
 
@@ -207,7 +209,7 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
                rrg->nused++;
        }
 
-       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
+       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
        return ep;
 }
 
@@ -230,14 +232,14 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
                *uepp = ep;
        }
        if( ep == NULL ) {
-               if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name );
                if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
                        return FALSE;
                }
        }
 
        if( ! ep->open )  {                                                                             // not open -- connect now
-               if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name );
                if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
                        ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                }
@@ -246,7 +248,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
                        ep->open = TRUE;
                        *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
                }
-               if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
        } else {
                *nn_sock = ep->nn_sock;
                state = TRUE;
@@ -353,7 +355,7 @@ static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* n
                        } else {
                                state = FALSE;
                        }
-                       if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
                } else {
                        *nn_sock = ep->nn_sock;
                }
@@ -419,4 +421,56 @@ static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
        return rs;
 }
 
+/*
+       Given a message, use the meid field to find the owner endpoint for the meid.
+       The owner ep is then used to extract the socket through which the message
+       is sent. This returns TRUE if we found a socket and it was written to the
+       nn_sock pointer; false if we didn't.
+
+       We've been told that the meid is a string, thus we count on it being a nil
+       terminated set of bytes.
+*/
+static int epsock_meid( route_table_t *rtable, rmr_mbuf_t* msg, nng_socket* nn_sock, endpoint_t** uepp ) {
+       endpoint_t*     ep;                             // seected end point
+       int     state = FALSE;                  // processing state
+       char*   meid;
+
+
+       errno = 0;
+       if( ! nn_sock || msg == NULL || rtable == NULL ) {                      // missing stuff; bail fast
+               errno = EINVAL;
+               return FALSE;
+       }
+
+       meid = ((uta_mhdr_t *) msg->header)->meid;
+
+       if( (ep = get_meid_owner( rtable, meid )) == NULL ) {
+               if( uepp != NULL ) {                                                            // caller needs refernce to endpoint too
+                       *uepp = NULL;
+               }
+
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
+               return FALSE;
+       }
+
+       state = TRUE;
+       if( ! ep->open ) {                                                              // not connected
+               if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
+                       ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
+               }
+
+               if( uta_link2( ep ) ) {                                         // find entry in table and create link
+                       ep->open = TRUE;
+                       *nn_sock = ep->nn_sock;                                 // pass socket back to caller
+               } else {
+                       state = FALSE;
+               }
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
+       } else {
+               *nn_sock = ep->nn_sock;
+       }
+
+       return state;
+}
+
 #endif