3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: All functions (internal and external) needed to manage wormholes.
25 Wormholes allow a user application to send directly to an endpoint.
26 The application must first "open" the wormhole which allows us to
27 provide the application with an ID that can be used on a wh_send()
28 call. It also does the validation (future) which might not allow
29 the application to open any wormholes, or may allow them only to specific targets.
32 Author: E. Scott Daniels
33 Date: 13 February 2019
46 #include "rmr_symtab.h"
51 #include <nng/protocol/pubsub0/pub.h>
52 #include <nng/protocol/pubsub0/sub.h>
53 #include <nng/protocol/pipeline0/push.h>
54 #include <nng/protocol/pipeline0/pull.h>
56 #include "rmr_nng_private.h"
57 #include "rt_generic_static.c"
58 #include "rtable_nng_static.c"
59 #include "sr_nng_static.c"
62 #include <nanomsg/nn.h>
63 #include <nanomsg/tcp.h>
64 #include <nanomsg/pair.h>
65 #include <nanomsg/pipeline.h>
66 #include <nanomsg/pubsub.h>
68 #include "rmr_private.h"
69 #include "rt_generic_static.c"
70 #include "rtable_static.c"
71 #include "sr_static.c"
75 #include "tools_static.c"
79 // ----------------------- internal stuff -----------------------------------------------
82 This function returns true if the current application is permitted to open a wormhole
83 to the desired target.
85 This is a place holder for future functionality.
// Permission gate consulted by rmr_wh_open() before a wormhole is set up: rmr_wh_open()
// treats a zero (false) return from this function as "not permitted" and fails the open.
//   ctx    - context of the application requesting the wormhole
//   target - name of the endpoint the application wants to reach
// NOTE(review): the body of this function is not visible in this listing. The description
// above calls it a placeholder for future functionality, so it presumably permits every
// request -- confirm against the complete source.
87 static int wh_can_open( uta_ctx_t* ctx, char const* target ) {
92 Allocate and initialise the wormholes list; point context at it.
94 static int wh_init( uta_ctx_t* ctx ) {
103 if( ctx->wormholes != NULL ) { // already allocated, do nothing but signal all is well
107 if( (whm = malloc( sizeof( *whm ) )) == NULL ) {
108 rmr_vlog( RMR_VL_ERR, "mem alloc failed for whm: alloc %d bytes\n", (int) sizeof( *whm ) );
114 alloc_sz = whm->nalloc * sizeof( endpoint_t );
115 if( (whm->eps = (endpoint_t **) malloc( alloc_sz )) == NULL ) {
116 rmr_vlog( RMR_VL_ERR, "mem alloc failed: alloc %d bytes\n", (int) alloc_sz );
122 memset( whm->eps, 0, alloc_sz );
124 ctx->wormholes = whm;
130 Realloc the wormhole endpoint list.
131 Returns 0 if failure with errno set; !0 on success.
133 static int wh_extend( wh_mgt_t* whm ) {
138 i = whm->nalloc; // starting point for initialisation after realloc
141 alloc_sz = whm->nalloc * sizeof( endpoint_t );
142 if( (whm->eps = (endpoint_t **) realloc( whm->eps, alloc_sz )) == NULL ) {
147 for( j = 0; j < 16; j++ ) {
148 whm->eps[i++] = NULL; // must init the new stuff
156 Mostly for leak analysis during testing.
158 static void wh_nuke( uta_ctx_t* ctx ) {
163 if( ctx->wormholes ) {
164 if( ctx->wormholes->eps ) {
165 free( ctx->wormholes->eps );
167 free( ctx->wormholes );
170 ctx->wormholes = NULL;
173 // ----------------------- visible stuff ------------------------------------------------
176 Opens a direct wormhole connection to the named target. Target is expected to be
177 either hostname:port or IP:port. If we don't have an endpoint in our hash, we'll create one.
179 Unlike 'regular' connections to endpoints which are connected on the first send
180 attempt, when a wormhole is opened we connect immediately. In the NNG world this
181 could result in a delay and immediate failure. With nanomsg the failure may not
182 be detected as the connect doesn't block.
// Open a wormhole to `target`, or hand back the existing one. The wormhole id returned is
// the index of the target's endpoint in ctx->wormholes->eps. On the failure paths visible
// here the return is -1 (either whid's initial value or a literal -1).
//   vctx   - RMR context, used as a uta_ctx_t*; must not be NULL
//   target - name of the endpoint to connect to; must be a non-empty string
// NOTE(review): this listing omits lines of the function (the declaration of `i`, several
// branch bodies, and the statements after the rt_link2_ep() check), so the comments below
// describe only what is shown.
184 extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ) {
185 endpoint_t* ep; // endpoint that represents the target
186 uta_ctx_t* ctx = NULL;
187 rmr_whid_t whid = -1; // wormhole id is the index into the list
188 wh_mgt_t* whm; // easy reference to wh mgt stuff
// reject a missing context and a NULL or empty target
192 if( (ctx = (uta_ctx_t *) vctx) == NULL || target == NULL || *target == 0 ) {
// permission gate: a false return from wh_can_open() means this open is refused
197 if( ! wh_can_open( ctx, target ) ) {
// the wormhole management block is created lazily, on the first open for this context
202 if( ctx->wormholes == NULL ) {
203 if( ! wh_init( ctx ) ) { // first call, we need to set things up
204 return whid; // fail with errno set by init
208 whm = ctx->wormholes;
211 if( (ep = rt_ensure_ep( ctx->rtable, target )) == NULL ) { // get pointer to ep if there, create new if not
212 rmr_vlog( RMR_VL_ERR, "wormhole_open: ensure ep returned bad: target=(%s)\n", target );
213 return -1; // ensure sets errno
// Single pass over the list: remember the first empty slot, and stop caring about empty
// slots once the endpoint itself is found.
// NOTE(review): whid is declared as -1, but the test below uses whid == whm->nalloc to mean
// "no empty slot remembered yet"; that presumes whid is set to whm->nalloc before the loop
// in a line not shown here -- confirm.
217 for( i = 0; i < whm->nalloc; i++ ) { // look for a pointer to the ep, and find first open spot
218 if( whid == whm->nalloc && !whm->eps[i] ) {
219 whid = i; // save first open slot should we need it
222 if( whm->eps[i] == ep ) {
223 if( whm->eps[i]->open ) { // we know about it and it's open
224 return i; // just send back the reference
227 whid = i; // have it, but not open, reopen
// no slot was chosen by the scan, so the list must grow before whid can be used as an index
232 if( whid >= whm->nalloc ) {
233 if( ! wh_extend( whm ) ) { // add some; whid will point to the right place
// a failed link is reported to the caller as a refused connection
238 if( !rt_link2_ep( ctx, ep ) ) { // start a connection if not already open
239 errno = ECONNREFUSED;
249 Send a message directly to an open wormhole.
250 As with the other send functions in RMr, we return a new zero copy buffer for the
251 user application to fill in.
// Send `msg` straight to the endpoint referenced by wormhole id `whid`, bypassing route
// selection. On the validation failures shown, the reason is recorded in msg->state and
// errno; on the send path the return value is whatever send2ep() returns.
//   vctx - RMR context, used as a uta_ctx_t*; must not be NULL
//   whid - wormhole id; must index a non-NULL slot in the context's endpoint list
//   msg  - message to send; must not be NULL and must have a header
// NOTE(review): this listing omits lines of the function (declarations of ctx and whm, the
// return statements of the failure branches, and the assignment of `ep`); the comments
// below describe only what is shown.
253 extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg ) {
255 endpoint_t* ep; // endpoint that wormhole ID references
257 char* d1; // point at the call-id in the header
259 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
260 errno = EINVAL; // if msg is null, this is their clue
// NOTE(review): msg can be NULL in this branch; the store below is presumably guarded by a
// msg != NULL test on a line not shown -- confirm.
262 msg->state = RMR_ERR_BADARG;
263 errno = EINVAL; // must ensure it's not eagain
270 if( (whm = ctx->wormholes) == NULL ) {
271 errno = EINVAL; // no wormholes open
272 msg->state = RMR_ERR_NOWHOPEN;
276 if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) {
277 errno = EINVAL; // wormhole id is out of range or its slot is empty
278 msg->state = RMR_ERR_WHID;
283 if( msg->header == NULL ) {
284 rmr_vlog( RMR_VL_ERR, "rmr_wh_send_msg: message had no header\n" );
285 msg->state = RMR_ERR_NOHDR;
286 errno = EBADMSG; // must ensure it's not eagain
290 d1 = DATA1_ADDR( msg->header );
291 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
// NOTE(review): the result of rmr_wh_open() is ignored, so a failed (re)open is discovered
// only through the send2ep() result; the condition guarding this call is not shown.
295 rmr_wh_open( ctx, ep->name );
297 return send2ep( ctx, ep, msg ); // send directly to the endpoint
301 Send a message directly to an open wormhole and then block until a response has
302 been received. The return is the same as for rmr_call(); the received buffer
303 or nil if no response was received.
// Send `msg` straight to the endpoint referenced by wormhole id `whid` and wait for the
// response by handing off to mt_call(). On the validation failures shown, the reason is
// recorded in msg->state and errno; otherwise the return value is whatever mt_call() returns.
//   vctx     - RMR context, used as a uta_ctx_t*; must not be NULL
//   whid     - wormhole id; must index a non-NULL slot in the context's endpoint list
//   msg      - message to send; must not be NULL and must have a header
//   call_id  - passed through to mt_call() unchanged
//   max_wait - passed through to mt_call() unchanged
// NOTE(review): this listing omits lines of the function (declarations of ctx and whm, the
// return statements of the failure branches, and the assignment of `ep`); the comments
// below describe only what is shown.
305 extern rmr_mbuf_t* rmr_wh_call( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg, int call_id, int max_wait ) {
307 endpoint_t* ep; // endpoint that wormhole ID references
309 char* d1; // point at the call-id in the header
311 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
312 errno = EINVAL; // if msg is null, this is their clue
// NOTE(review): msg can be NULL in this branch; the store below is presumably guarded by a
// msg != NULL test on a line not shown -- confirm.
314 msg->state = RMR_ERR_BADARG;
315 errno = EINVAL; // must ensure it's not eagain
322 if( (whm = ctx->wormholes) == NULL ) {
323 errno = EINVAL; // no wormholes open
324 msg->state = RMR_ERR_NOWHOPEN;
// wormhole id is out of range or its slot is empty
328 if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) {
330 msg->state = RMR_ERR_WHID;
335 if( msg->header == NULL ) {
336 rmr_vlog( RMR_VL_ERR, "rmr_wh_call: message had no header\n" );
337 msg->state = RMR_ERR_NOHDR;
338 errno = EBADMSG; // must ensure it's not eagain
// NOTE(review): the result of rmr_wh_open() is ignored, so a failed (re)open is discovered
// only through the mt_call() result; the condition guarding this call is not shown.
345 rmr_wh_open( ctx, ep->name );
347 return mt_call( vctx, msg, call_id, max_wait, ep ); // use main (internal) call to setup and block
// NOTE(review): this store follows a return, so it can only execute on an alternative path
// (presumably a conditional-compilation branch) that this listing does not show -- confirm.
350 msg->state = RMR_ERR_NOENDPT;
355 This will "close" a wormhole. We don't actually drop the session as that might be needed
356 by others, but we do pull the ep reference from the list.
358 extern void rmr_wh_close( void* vctx, int whid ) {
362 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
366 if( (whm = ctx->wormholes) == NULL || whm->eps == NULL ) {
370 if( whid >= whm->nalloc || whid < 0 ) {
374 if( whm->eps[whid] == NULL ) {
378 whm->eps[whid] = NULL;
382 Check the state of an endpoint that is associated with the wormhole ID
383 passed in. If the state is "open" then we return RMR_OK. Other possible return values:
386 RMR_ERR_WHID // wormhole id was invalid
387 RMR_ERR_NOENDPT // the endpoint connection is not open
388 RMR_ERR_BADARG // context or other arg was invalid
389 RMR_ERR_NOWHOPEN // wormhole(s) have not been initialised
// Report whether the endpoint behind wormhole id `whid` is currently open.
//   vctx - RMR context, used as a uta_ctx_t*
//   whid - wormhole id to check
// Returns, as far as the visible lines show:
//   RMR_ERR_BADARG    when vctx is NULL
//   RMR_ERR_NOWHOPEN  when the context has no wormhole list (errno set to EINVAL)
//   RMR_OK            when the endpoint's `open` flag is set
//   RMR_ERR_NOENDPT   when the endpoint exists but is not open
// NOTE(review): this listing omits lines of the function (the declaration of ctx and the
// body of the wormhole id check); the comments below describe only what is shown.
392 extern int rmr_wh_state( void* vctx, rmr_whid_t whid ) {
394 wh_mgt_t* whm; // easy reference to wh mgt stuff
395 endpoint_t* ep; // endpoint that wormhole ID references
397 if( (ctx = (uta_ctx_t *) vctx) == NULL ) { // bad stuff, bail fast
399 return RMR_ERR_BADARG;
402 if( (whm = ctx->wormholes) == NULL ) {
403 errno = EINVAL; // no wormholes open
404 return RMR_ERR_NOWHOPEN;
// wormhole id is out of range or its slot is empty; the body of this branch is not shown
// (presumably it returns RMR_ERR_WHID, per the description above -- confirm)
407 if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) {
// NOTE(review): if the branch above returns, eps[whid] is already known to be non-NULL
// here, which would make this test always true and the final return unreachable -- confirm.
414 if( (ep = whm->eps[whid]) != NULL ) {
415 return ep->open ? RMR_OK : RMR_ERR_NOENDPT;
418 return RMR_ERR_NOENDPT;