3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: All functions (internal and external) needed to manage wormholes.
25 Wormholes allow a user application to send directly to an endpoint.
26 The application must first "open" the wormhole which allows us to
27 provide the application with an ID that can be used on a wh_send()
28 call. It also does the validation (future) which might not allow
29 the application to open any wormholes, or may allow them only to
32 Author: E. Scott Daniels
33 Date: 13 February 2019
46 #include "rmr_symtab.h"
51 #include <nng/protocol/pubsub0/pub.h>
52 #include <nng/protocol/pubsub0/sub.h>
53 #include <nng/protocol/pipeline0/push.h>
54 #include <nng/protocol/pipeline0/pull.h>
56 #include "rmr_nng_private.h"
57 #include "rt_generic_static.c"
58 #include "rtable_nng_static.c"
59 #include "sr_nng_static.c"
62 #include <nanomsg/nn.h>
63 #include <nanomsg/tcp.h>
64 #include <nanomsg/pair.h>
65 #include <nanomsg/pipeline.h>
66 #include <nanomsg/pubsub.h>
68 #include "rmr_private.h"
69 #include "rt_generic_static.c"
70 #include "rtable_static.c"
71 #include "sr_static.c"
75 #include "tools_static.c"
79 // ----------------------- internal stuff -----------------------------------------------
82 This function returns true if the current application is permitted to open a wormhole
83 to the desired target.
85 This is a place holder for future functionality.
87 static int wh_can_open( uta_ctx_t* ctx, char const* target ) { // consulted by rmr_wh_open() before the wormhole list is initialised; a false (0) result refuses the open
92 Allocate and initialise the wormholes list; point context at it.
94 static int wh_init( uta_ctx_t* ctx ) { // sets up ctx->wormholes on first use; rmr_wh_open() treats a zero return as failure
103 if( ctx->wormholes != NULL ) { // already allocated, do nothing but signal all is well
107 if( (whm = malloc( sizeof( *whm ) )) == NULL ) { // the management struct itself
108 rmr_vlog( RMR_VL_ERR, "mem alloc failed for whm: alloc %d bytes\n", (int) sizeof( *whm ) );
114 alloc_sz = whm->nalloc * sizeof( endpoint_t ); // NOTE(review): eps is an array of endpoint_t* (see cast on the next line) yet is sized by sizeof( endpoint_t ); sizeof( endpoint_t * ) looks like the intent -- confirm
115 if( (whm->eps = (endpoint_t **) malloc( alloc_sz )) == NULL ) { // slot array; a wormhole id is an index into it
116 rmr_vlog( RMR_VL_ERR, "mem alloc failed: alloc %d bytes\n", (int) alloc_sz );
122 memset( whm->eps, 0, alloc_sz ); // zero the whole slot array so no slot looks occupied
124 ctx->wormholes = whm; // point the context at the new list
130 Realloc the wormhole endpoint list.
131 Returns 0 if failure with errno set; !0 on success.
133 static int wh_extend( wh_mgt_t* whm ) { // grows whm->eps and NULLs the newly added slots
138 i = whm->nalloc; // starting point for initialisation after realloc
141 alloc_sz = whm->nalloc * sizeof( endpoint_t ); // NOTE(review): sized by sizeof( endpoint_t ) although eps is an endpoint_t** array -- confirm intent
142 if( (whm->eps = (endpoint_t **) realloc( whm->eps, alloc_sz )) == NULL ) { // NOTE(review): assigning realloc's result straight to whm->eps loses the old block when realloc fails
147 for( j = 0; j < 16; j++ ) { // NOTE(review): 16 presumably equals the growth step applied to nalloc -- confirm
148 whm->eps[i++] = NULL; // must init the new stuff
156 Mostly for leak analysis during testing.
158 static void wh_nuke( uta_ctx_t* ctx ) { // releases the wormhole list hanging off of ctx
163 if( ctx->wormholes ) {
164 if( ctx->wormholes->eps ) { // guard is not strictly needed: free( NULL ) is a no-op
165 free( ctx->wormholes->eps ); // release the slot array
167 free( ctx->wormholes ); // then the management struct
170 ctx->wormholes = NULL; // do not leave a dangling pointer in the context
173 // ----------------------- visible stuff ------------------------------------------------
176 Opens a direct wormhole connection to the named target. Target is expected to be
177 either hostname:port or IP:port. If we don't have an endpoint in our hash, we'll
179 Unlike 'regular' connections to endpoints which are connected on the first send
180 attempt, when a wormhole is opened we connect immediately. In the NNG world this
181 could result in a delay and immediate failure. With nanomsg the failure may not
182 be detected as the connect doesn't block.
184 extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ) { // returns the wormhole id (slot index) for target
185 endpoint_t* ep; // endpoint that represents the target
186 uta_ctx_t* ctx = NULL;
187 rmr_whid_t whid = -1; // wormhole id is the index into the list
188 wh_mgt_t* whm; // easy reference to wh mgt stuff
190 route_table_t* rt; // the currently active route table
193 if( (ctx = (uta_ctx_t *) vctx) == NULL || target == NULL || *target == 0 ) { // missing context, or NULL/empty target
198 if( ! wh_can_open( ctx, target ) ) { // permission hook said no
203 if( ctx->wormholes == NULL ) {
204 if( ! wh_init( ctx ) ) { // first call, we need to set things up
205 return whid; // fail with errno set by init
209 whm = ctx->wormholes;
211 rt = get_rt( ctx ); // get and raise ref counter
212 ep = rt_ensure_ep( rt, target ); // get pointer to ep if there, create new if not
213 release_rt( ctx, rt ); // release use counter
215 rmr_vlog( RMR_VL_ERR, "wormhole_open: ensure ep returned bad: target=(%s)\n", target );
216 return -1; // ensure sets errno
220 for( i = 0; i < whm->nalloc; i++ ) { // look for a pointer to the ep, and find first open spot
221 if( whid == whm->nalloc && !whm->eps[i] ) { // NOTE(review): relies on whid having been set to whm->nalloc as a "nothing found yet" marker; whid is declared as -1, so confirm that assignment precedes the loop
222 whid = i; // save first open slot should we need it
225 if( whm->eps[i] == ep ) { // this endpoint already owns a slot
226 if( whm->eps[i]->open ) { // we know about it and it's open
227 return i; // just send back the reference
230 whid = i; // have it, but not open, reopen
235 if( whid >= whm->nalloc ) { // presumably: neither a matching nor an empty slot was found
236 if( ! wh_extend( whm ) ) { // add some; whid will point to the right place
241 if( !rt_link2_ep( ctx, ep ) ) { // start a connection if not already open
242 errno = ECONNREFUSED; // the link attempt reported failure
252 Send a message directly to an open wormhole.
253 As with the other send functions in RMR, we return a new zero copy buffer for the
254 user application to fill in.
256 extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg ) { // sends msg to the endpoint held in wormhole slot whid
258 endpoint_t* ep; // endpoint that wormhole ID references
260 char* d1; // point at the call-id in the header
262 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
263 errno = EINVAL; // if msg is null, this is their clue
265 msg->state = RMR_ERR_BADARG; // NOTE(review): msg may be NULL in this branch; presumably guarded by a msg != NULL test -- confirm
266 errno = EINVAL; // must ensure it's not eagain
273 if( (whm = ctx->wormholes) == NULL ) {
274 errno = EINVAL; // no wormholes open
275 msg->state = RMR_ERR_NOWHOPEN;
279 if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) {
280 errno = EINVAL; // whid is out of range or its slot is empty
281 msg->state = RMR_ERR_WHID;
286 if( msg->header == NULL ) {
287 rmr_vlog( RMR_VL_ERR, "rmr_wh_send_msg: message had no header\n" );
288 msg->state = RMR_ERR_NOHDR;
289 errno = EBADMSG; // must ensure it's not eagain
293 d1 = DATA1_ADDR( msg->header );
294 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
298 rmr_wh_open( ctx, ep->name ); // NOTE(review): return value is ignored; presumably reached only when ep is not open -- confirm
300 return send2ep( ctx, ep, msg ); // send directly to the endpoint
304 Send a message directly to an open wormhole and then block until a response has
305 been received. The return is the same as for rmr_call(); the received buffer
306 or nil if no response was received.
308 extern rmr_mbuf_t* rmr_wh_call( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg, int call_id, int max_wait ) { // like rmr_wh_send_msg(), but hands off to mt_call() for the send and wait
310 endpoint_t* ep; // endpoint that wormhole ID references
312 char* d1; // point at the call-id in the header
314 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
315 errno = EINVAL; // if msg is null, this is their clue
317 msg->state = RMR_ERR_BADARG; // NOTE(review): msg may be NULL in this branch; presumably guarded by a msg != NULL test -- confirm
318 errno = EINVAL; // must ensure it's not eagain
325 if( (whm = ctx->wormholes) == NULL ) {
326 errno = EINVAL; // no wormholes open
327 msg->state = RMR_ERR_NOWHOPEN;
331 if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) { // whid is out of range or its slot is empty
333 msg->state = RMR_ERR_WHID;
338 if( msg->header == NULL ) {
339 rmr_vlog( RMR_VL_ERR, "rmr_wh_call: message had no header\n" );
340 msg->state = RMR_ERR_NOHDR;
341 errno = EBADMSG; // must ensure it's not eagain
348 rmr_wh_open( ctx, ep->name ); // NOTE(review): return value is ignored; presumably reached only when ep is not open -- confirm
350 return mt_call( vctx, msg, call_id, max_wait, ep ); // use main (internal) call to setup and block
353 msg->state = RMR_ERR_NOENDPT; // NOTE(review): follows a return, so presumably belongs to an alternate branch -- confirm
358 This will "close" a wormhole. We don't actually drop the session as that might be needed
359 by others, but we do pull the ep reference from the list.
361 extern void rmr_wh_close( void* vctx, int whid ) { // NOTE(review): whid is a plain int here but rmr_whid_t in the other wormhole functions
365 if( (ctx = (uta_ctx_t *) vctx) == NULL ) { // no context supplied
369 if( (whm = ctx->wormholes) == NULL || whm->eps == NULL ) { // wormhole list was never set up
373 if( whid >= whm->nalloc || whid < 0 ) { // id is outside of the slot array
377 if( whm->eps[whid] == NULL ) { // slot is already empty
381 whm->eps[whid] = NULL; // drop the reference; rmr_wh_open() treats a NULL slot as free
385 Check the state of an endpoint that is associated with the wormhole ID
386 passed in. If the state is "open" then we return RMR_OK. Other possible
389 RMR_ERR_WHID // wormhole id was invalid
390 RMR_ERR_NOENDPT // the endpoint connection is not open
391 RMR_ERR_BADARG // context or other arg was invalid
392 RMR_ERR_NOWHOPEN // wormhole(s) have not been initialised
395 extern int rmr_wh_state( void* vctx, rmr_whid_t whid ) { // reports whether the endpoint in wormhole slot whid is open
397 wh_mgt_t* whm; // easy reference to wh mgt stuff
398 endpoint_t* ep; // endpoint that wormhole ID references
400 if( (ctx = (uta_ctx_t *) vctx) == NULL ) { // bad stuff, bail fast
402 return RMR_ERR_BADARG;
405 if( (whm = ctx->wormholes) == NULL ) {
406 errno = EINVAL; // no wormholes open
407 return RMR_ERR_NOWHOPEN;
410 if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) { // whid is out of range or its slot is empty
417 if( (ep = whm->eps[whid]) != NULL ) { // NOTE(review): the preceding test already covers a NULL slot; if that branch returns, this check is always true -- confirm
418 return ep->open ? RMR_OK : RMR_ERR_NOENDPT;
421 return RMR_ERR_NOENDPT;