3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: All functions (internal and external) needed to manage wormholes.
25 Wormholes allow a user application to send directly to an endpoint.
26 The application must first "open" the wormhole which allows us to
27 provide the application with an ID that can be used on a wh_send()
28 call. It also does the validation (future) which might not allow
29 the application to open any wormholes, or may allow them only to
32 Author: E. Scott Daniels
33 Date: 13 February 2019
46 #include "rmr_symtab.h"
51 #include <nng/protocol/pubsub0/pub.h>
52 #include <nng/protocol/pubsub0/sub.h>
53 #include <nng/protocol/pipeline0/push.h>
54 #include <nng/protocol/pipeline0/pull.h>
56 #include "rmr_nng_private.h"
57 #include "rt_generic_static.c"
58 #include "rtable_nng_static.c"
59 #include "sr_nng_static.c"
62 #include <nanomsg/nn.h>
63 #include <nanomsg/tcp.h>
64 #include <nanomsg/pair.h>
65 #include <nanomsg/pipeline.h>
66 #include <nanomsg/pubsub.h>
68 #include "rmr_private.h"
69 #include "rt_generic_static.c"
70 #include "rtable_static.c"
71 #include "sr_static.c"
75 #include "tools_static.c"
79 // ----------------------- internal stuff -----------------------------------------------
82 This function returns true if the current application is permitted to open a wormhole
83 to the desired target.
85 This is a place holder for future functionality.
87 static int wh_can_open( uta_ctx_t* ctx, char const* target ) {
92 Allocate and initialise the wormholes list; point context at it.
94 static int wh_init( uta_ctx_t* ctx ) {
103 if( ctx->wormholes != NULL ) { // already allocated, do nothing but signal all is well
107 if( (whm = malloc( sizeof( *whm ) )) == NULL ) {
108 rmr_vlog( RMR_VL_ERR, "mem alloc failed for whm: alloc %d bytes\n", (int) sizeof( *whm ) );
114 alloc_sz = whm->nalloc * sizeof( endpoint_t );
115 if( (whm->eps = (endpoint_t **) malloc( alloc_sz )) == NULL ) {
116 rmr_vlog( RMR_VL_ERR, "mem alloc failed: alloc %d bytes\n", (int) alloc_sz );
122 memset( whm->eps, 0, alloc_sz );
124 ctx->wormholes = whm;
130 Realloc the wormhole endpoint list.
131 Returns 0 if failure with errno set; !0 on success.
133 static int wh_extend( wh_mgt_t* whm ) {
138 i = whm->nalloc; // starting point for initialisation after realloc
141 alloc_sz = whm->nalloc * sizeof( endpoint_t );
142 if( (whm->eps = (endpoint_t **) realloc( whm->eps, alloc_sz )) == NULL ) {
147 for( j = 0; j < 16; j++ ) {
148 whm->eps[i++] = NULL; // must init the new stuff
156 Mostly for leak analysis during testing.
158 static void wh_nuke( uta_ctx_t* ctx ) {
163 if( ctx->wormholes ) {
164 if( ctx->wormholes->eps ) {
165 free( ctx->wormholes->eps );
167 free( ctx->wormholes );
170 ctx->wormholes = NULL;
173 // ----------------------- visible stuff ------------------------------------------------
176 Opens a direct wormhole connection to the named target. Target is expected to be
177 either hostname:port or IP:port. If we don't have an endpoint in our hash, we'll
179 Unlike 'regular' connections to endpoints which are connected on the first send
180 attempt, when a wormhole is opened we connect immediatly. In the NNG world this
181 could result in a delay and immediate failure. With nanomsg the failure may not
182 be detected as the connect doesn't block.
184 extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ) {
185 endpoint_t* ep; // endpoint that represents the target
186 uta_ctx_t* ctx = NULL;
187 rmr_whid_t whid = -1; // wormhole id is the index into the list
188 wh_mgt_t* whm; // easy reference to wh mgt stuff
192 if( (ctx = (uta_ctx_t *) vctx) == NULL || target == NULL || *target == 0 ) {
197 if( ! wh_can_open( ctx, target ) ) {
202 if( ctx->wormholes == NULL ) {
203 if( ! wh_init( ctx ) ) { // first call, we need to set things up
204 return whid; // fail with errno set by init
208 whm = ctx->wormholes;
211 if( (ep = rt_ensure_ep( ctx->rtable, target )) == NULL ) { // get pointer to ep if there, create new if not
212 rmr_vlog( RMR_VL_ERR, "wormhole_open: ensure ep returned bad: target=(%s)\n", target );
213 return -1; // ensure sets errno
217 for( i = 0; i < whm->nalloc; i++ ) { // look for a pointer to the ep, and find first open spot
218 if( whid == whm->nalloc && !whm->eps[i] ) {
219 whid = i; // save first open slot should we need it
222 if( whm->eps[i] == ep ) {
223 if( whm->eps[i]->open ) { // we know about it and it's open
224 return i; // just send back the reference
227 whid = i; // have it, but not open, reopen
232 if( whid >= whm->nalloc ) {
233 if( ! wh_extend( whm ) ) { // add some; whid will point to the right place
238 if( !rt_link2_ep( ctx, ep ) ) { // start a connection if not already open
239 errno = ECONNREFUSED;
249 Send a message directly to an open wormhole.
250 As with the other send functions in RMr, we return a new zero copy buffer for the
251 user application to fill in.
253 extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg ) {
255 endpoint_t* ep; // enpoint that wormhole ID references
257 char* d1; // point at the call-id in the header
259 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
260 errno = EINVAL; // if msg is null, this is their clue
262 msg->state = RMR_ERR_BADARG;
263 errno = EINVAL; // must ensure it's not eagain
270 if( (whm = ctx->wormholes) == NULL ) {
271 errno = EINVAL; // no wormholes open
272 msg->state = RMR_ERR_NOWHOPEN;
276 if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) {
277 errno = EINVAL; // no wormholes open
278 msg->state = RMR_ERR_WHID;
283 if( msg->header == NULL ) {
284 rmr_vlog( RMR_VL_ERR, "rmr_wh_send_msg: message had no header\n" );
285 msg->state = RMR_ERR_NOHDR;
286 errno = EBADMSG; // must ensure it's not eagain
290 d1 = DATA1_ADDR( msg->header );
291 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
295 rmr_wh_open( ctx, ep->name );
297 return send2ep( ctx, ep, msg ); // send directly to the endpoint
301 This will "close" a wormhole. We don't actually drop the session as that might be needed
302 by others, but we do pull the ep reference from the list.
304 extern void rmr_wh_close( void* vctx, int whid ) {
308 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
312 if( (whm = ctx->wormholes) == NULL || whm->eps == NULL ) {
316 if( whid >= whm->nalloc || whid < 0 ) {
320 if( whm->eps[whid] == NULL ) {
324 whm->eps[whid] = NULL;