3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: All functions (internal and external) needed to manage wormholes.
25 Wormholes allow a user application to send directly to an endpoint.
26 The application must first "open" the wormhole which allows us to
27 provide the application with an ID that can be used on a wh_send()
28 call. It also does the validation (future) which might not allow
29 the application to open any wormholes, or may allow them only to
32 Author: E. Scott Daniels
33 Date: 13 February 2019
46 #include "rmr_symtab.h"
51 #include <nng/protocol/pubsub0/pub.h>
52 #include <nng/protocol/pubsub0/sub.h>
53 #include <nng/protocol/pipeline0/push.h>
54 #include <nng/protocol/pipeline0/pull.h>
56 #include "rmr_nng_private.h"
57 #include "rt_generic_static.c"
58 #include "rtable_nng_static.c"
59 #include "sr_nng_static.c"
62 #include <nanomsg/nn.h>
63 #include <nanomsg/tcp.h>
64 #include <nanomsg/pair.h>
65 #include <nanomsg/pipeline.h>
66 #include <nanomsg/pubsub.h>
68 #include "rmr_private.h"
69 #include "rt_generic_static.c"
70 #include "rtable_static.c"
71 #include "sr_static.c"
75 #include "tools_static.c"
79 // ----------------------- internal stuff -----------------------------------------------
82 This function returns true if the current application is permitted to open a wormhole
83 to the desired target.
85 This is a place holder for future functionality.
87 static int wh_can_open( uta_ctx_t* ctx, char const* target ) {
92 Allocate and initialise the wormholes list; point context at it.
94 static int wh_init( uta_ctx_t* ctx ) {
103 if( ctx->wormholes != NULL ) { // already allocated, do nothing but signal all is well
107 if( (whm = malloc( sizeof( *whm ) )) == NULL ) {
108 fprintf( stderr, "mem alloc failed for whm: alloc %d bytes\n", (int) sizeof( *whm ) );
114 alloc_sz = whm->nalloc * sizeof( endpoint_t );
115 if( (whm->eps = (endpoint_t **) malloc( alloc_sz )) == NULL ) {
116 fprintf( stderr, "mem alloc failed: alloc %d bytes\n", (int) alloc_sz );
122 memset( whm->eps, 0, alloc_sz );
124 ctx->wormholes = whm;
130 Realloc the wormhole endpoint list.
131 Returns 0 if failure with errno set; !0 on success.
133 static int wh_extend( wh_mgt_t* whm ) {
138 i = whm->nalloc; // starting point for initialisation after realloc
141 alloc_sz = whm->nalloc * sizeof( endpoint_t );
142 if( (whm->eps = (endpoint_t **) realloc( whm->eps, alloc_sz )) == NULL ) {
147 for( j = 0; j < 16; j++ ) {
148 whm->eps[i++] = NULL; // must init the new stuff
156 Mostly for leak analysis during testing.
158 static void wh_nuke( uta_ctx_t* ctx ) {
163 if( ctx->wormholes ) {
164 if( ctx->wormholes->eps ) {
165 free( ctx->wormholes->eps );
167 free( ctx->wormholes );
170 ctx->wormholes = NULL;
173 // ----------------------- visible stuff ------------------------------------------------
176 Opens a direct wormhole connection to the named target. Target is expected to be
177 either hostname:port or IP:port. If we don't have an endpoint in our hash, we'll
179 Unlike 'regular' connections to endpoints which are connected on the first send
180 attempt, when a wormhole is opened we connect immediately. In the NNG world this
181 could result in a delay and immediate failure. With nanomsg the failure may not
182 be detected as the connect doesn't block.
184 extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ) {
185 endpoint_t* ep; // endpoint that represents the target
186 uta_ctx_t* ctx = NULL;
187 rmr_whid_t whid = -1; // wormhole id is the index into the list
188 wh_mgt_t* whm; // easy reference to wh mgt stuff
192 if( (ctx = (uta_ctx_t *) vctx) == NULL || target == NULL || *target == 0 ) {
197 if( ! wh_can_open( ctx, target ) ) {
202 if( ctx->wormholes == NULL ) {
203 if( ! wh_init( ctx ) ) { // first call, we need to set things up
204 return whid; // fail with errno set by init
208 whm = ctx->wormholes;
211 if( (ep = rt_ensure_ep( ctx->rtable, target )) == NULL ) { // get pointer to ep if there, create new if not
212 fprintf( stderr, "ensure ep returned bad; setting no memory error\n" );
213 return -1; // ensure sets errno
217 for( i = 0; i < whm->nalloc; i++ ) { // look for a pointer to the ep, and find first open spot
218 if( whid == whm->nalloc && !whm->eps[i] ) {
219 whid = i; // save first open slot should we need it
222 if( whm->eps[i] == ep ) {
223 return i; // we're already pointing to it, just send it back again
227 if( whid >= whm->nalloc ) {
228 if( ! wh_extend( whm ) ) { // add some; whid will point to the right place
233 if( !rt_link2_ep( ep ) ) { // start a connection if open already
234 errno = ECONNREFUSED;
244 Send a message directly to an open wormhole.
245 As with the other send functions in RMr, we return a new zero copy buffer for the
246 user application to fill in.
248 extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg ) {
250 endpoint_t* ep; // enpoint that wormhole ID references
253 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
254 errno = EINVAL; // if msg is null, this is their clue
256 msg->state = RMR_ERR_BADARG;
257 errno = EINVAL; // must ensure it's not eagain
264 if( (whm = ctx->wormholes) == NULL ) {
265 errno = EINVAL; // no wormholes open
266 msg->state = RMR_ERR_NOWHOPEN;
270 if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) {
271 errno = EINVAL; // no wormholes open
272 msg->state = RMR_ERR_WHID;
276 errno = 0; // nng seems not to set errno any longer, so ensure it's clear
277 if( msg->header == NULL ) {
278 fprintf( stderr, "[ERR] rmr_wh_send_msg: message had no header\n" );
279 msg->state = RMR_ERR_NOHDR;
280 errno = EBADMSG; // must ensure it's not eagain
285 return send2ep( ctx, ep, msg ); // send directly to the endpoint
289 This will "close" a wormhole. We don't actually drop the session as that might be needed
290 by others, but we do pull the ep reference from the list.
292 extern void rmr_wh_close( void* vctx, int whid ) {
296 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
300 if( (whm = ctx->wormholes) == NULL || whm->eps == NULL ) {
304 if( whid >= whm->nalloc || whid < 0 ) {
308 if( whm->eps[whid] == NULL ) {
312 whm->eps[whid] = NULL;