Add the wormhole call function
[ric-plt/lib/rmr.git] / src / rmr / common / src / wormholes.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       wormholes.c
23         Abstract:       All functions (internal and external) needed to manage wormholes.
24
25                                 Wormholes allow a user application to send directly to an endpoint.
26                                 The application must first "open" the wormhole which allows us to
27                                 provide the application with an ID that can be used on a wh_send()
28                                 call. It also does the validation (future) which might not allow
29                                 the application to open any wormholes, or may allow them only to
30                                 specific targets.
31
32         Author:         E. Scott Daniels
33         Date:           13 February 2019
34 */
35
36 #include <unistd.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <strings.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <stdint.h>
43 #include <ctype.h>
44
45 #include "rmr.h"
46 #include "rmr_symtab.h"
47
48 /*
49 #ifdef NNG
50 #include <nng/nng.h>
51 #include <nng/protocol/pubsub0/pub.h>
52 #include <nng/protocol/pubsub0/sub.h>
53 #include <nng/protocol/pipeline0/push.h>
54 #include <nng/protocol/pipeline0/pull.h>
55
56 #include "rmr_nng_private.h"
57 #include "rt_generic_static.c"
58 #include "rtable_nng_static.c"
59 #include "sr_nng_static.c"
60
61 #else
62 #include <nanomsg/nn.h>
63 #include <nanomsg/tcp.h>
64 #include <nanomsg/pair.h>
65 #include <nanomsg/pipeline.h>
66 #include <nanomsg/pubsub.h>
67
68 #include "rmr_private.h"
69 #include "rt_generic_static.c"
70 #include "rtable_static.c"
71 #include "sr_static.c"
72 #endif
73 */
74
75 #include "tools_static.c"
76
77
78
79 // ----------------------- internal stuff -----------------------------------------------
80
81 /*
82         This function returns true if the current application is permitted to open a wormhole
83         to the desired target.
84
85         This is a place holder for future functionality.
86 */
87 static int wh_can_open( uta_ctx_t* ctx, char const* target ) {
88         return 1;
89 }
90
91 /*
92         Allocate and initialise the wormholes list; point context at it.
93 */
94 static int wh_init( uta_ctx_t* ctx ) {
95         wh_mgt_t*       whm;
96         size_t          alloc_sz;
97
98         if( ctx == NULL ) {
99                 errno = EINVAL;
100                 return 0;
101         }
102
103         if( ctx->wormholes != NULL ) {          // already allocated, do nothing but signal all is well
104                 return 1;
105         }
106
107         if( (whm  = malloc( sizeof( *whm ) )) == NULL ) {
108                 rmr_vlog( RMR_VL_ERR, "mem alloc failed for whm: alloc %d bytes\n", (int) sizeof( *whm ) );
109                 errno = ENOMEM;
110                 return 0;
111         }
112
113         whm->nalloc = 16;
114         alloc_sz = whm->nalloc * sizeof( endpoint_t );
115         if( (whm->eps = (endpoint_t **) malloc( alloc_sz )) == NULL ) {
116                 rmr_vlog( RMR_VL_ERR, "mem alloc failed: alloc %d bytes\n", (int) alloc_sz );
117                 free( whm );
118                 errno = ENOMEM;
119                 return 0;
120         }
121
122         memset( whm->eps, 0, alloc_sz );
123
124         ctx->wormholes = whm;
125         errno = 0;
126         return 1;
127 }
128
129 /*
130         Realloc the wormhole endpoint list.
131         Returns 0 if failure with errno set; !0 on success.
132 */
133 static int wh_extend( wh_mgt_t* whm ) {
134         int i;
135         int j;
136         size_t  alloc_sz;
137
138         i = whm->nalloc;                // starting point for initialisation after realloc
139         whm->nalloc += 16;
140
141         alloc_sz = whm->nalloc * sizeof( endpoint_t );
142         if( (whm->eps = (endpoint_t **) realloc( whm->eps, alloc_sz )) == NULL ) {
143                 errno = ENOMEM;
144                 return 0;
145         }
146
147         for( j = 0; j < 16; j++ ) {
148                 whm->eps[i++] = NULL;                   // must init the new stuff
149         }
150
151         errno = 0;
152         return 1;
153 }
154
155 /*
156         Mostly for leak analysis during testing.
157 */
158 static void wh_nuke( uta_ctx_t* ctx ) {
159         if( ctx == NULL ) {
160                 return;
161         }
162
163         if( ctx->wormholes ) {
164                 if( ctx->wormholes->eps ) {
165                         free( ctx->wormholes->eps );
166                 }
167                 free( ctx->wormholes );
168         }
169
170         ctx->wormholes = NULL;
171 }
172
173 // ----------------------- visible stuff  ------------------------------------------------
174
175 /*
176         Opens a direct wormhole connection to the named target. Target is expected to be
177         either hostname:port or IP:port.  If we don't have an endpoint in our hash, we'll
178         create one.
179         Unlike 'regular' connections to endpoints which are connected on the first send
180         attempt, when a wormhole is opened we connect immediatly. In the NNG world this
181         could result in a delay and immediate failure. With nanomsg the failure may not
182         be detected as the connect doesn't block.
183 */
184 extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ) {
185         endpoint_t*     ep;                             // endpoint that represents the target
186         uta_ctx_t*      ctx = NULL;
187         rmr_whid_t      whid = -1;              // wormhole id is the index into the list
188         wh_mgt_t*       whm;                    // easy reference to wh mgt stuff
189         int                     i;
190
191
192         if( (ctx = (uta_ctx_t *) vctx) == NULL || target == NULL || *target == 0 ) {
193                 errno = EINVAL;
194                 return whid;
195         }
196
197         if( ! wh_can_open( ctx, target ) )  {
198                 errno = EACCES;
199                 return whid;
200         }
201
202         if( ctx->wormholes == NULL ) {
203                 if( ! wh_init( ctx ) ) {                                        // first call, we need to set things up
204                         return whid;                                                    // fail with errno set by init
205                 }
206         }
207
208         whm = ctx->wormholes;
209
210
211         if( (ep = rt_ensure_ep( ctx->rtable, target )) == NULL ) {              // get pointer to ep if there, create new if not
212                 rmr_vlog( RMR_VL_ERR, "wormhole_open: ensure ep returned bad: target=(%s)\n", target );
213                 return -1;                      // ensure sets errno
214         }
215
216         whid = whm->nalloc;
217         for( i = 0; i < whm->nalloc; i++ ) {                            // look for a pointer to the ep, and find first open spot
218                 if( whid == whm->nalloc && !whm->eps[i] ) {
219                         whid = i;                                                                       // save first open slot should we need it
220                 }
221
222                 if( whm->eps[i] == ep ) {
223                         if(  whm->eps[i]->open ) {                                      // we know about it and it's open
224                                 return i;                                                               // just send back the reference
225                         }
226
227                         whid = i;                                                                       // have it, but not open, reopen
228                         break;
229                 }
230         }
231
232         if( whid >= whm->nalloc ) {
233                 if( ! wh_extend( whm  ) ) {                                     // add some; whid will point to the right place
234                         return -1;
235                 }
236         }
237
238         if( !rt_link2_ep( ctx, ep ) ) {                 // start a connection if not already open
239                 errno = ECONNREFUSED;
240                 return -1;
241         }
242
243         whm->eps[whid] = ep;
244         return whid;
245 }
246
247
248 /*
249         Send a message directly to an open wormhole.
250         As with the other send functions in RMr, we return a new zero copy buffer for the
251         user application to fill in.
252 */
253 extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg ) {
254         uta_ctx_t*      ctx;
255         endpoint_t*     ep;                             // enpoint that wormhole ID references
256         wh_mgt_t *whm;
257         char* d1;                                       // point at the call-id in the header
258
259         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
260                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
261                 if( msg != NULL ) {
262                         msg->state = RMR_ERR_BADARG;
263                         errno = EINVAL;                                                                                 // must ensure it's not eagain
264                 }
265                 return msg;
266         }
267
268         msg->state = RMR_OK;
269
270         if( (whm = ctx->wormholes) == NULL ) {
271                 errno = EINVAL;                                                                                         // no wormholes open
272                 msg->state = RMR_ERR_NOWHOPEN;
273                 return msg;
274         }
275
276         if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) {
277                 errno = EINVAL;                                                                                         // no wormholes open
278                 msg->state = RMR_ERR_WHID;
279                 return msg;
280         }
281
282         errno = 0;
283         if( msg->header == NULL ) {
284                 rmr_vlog( RMR_VL_ERR, "rmr_wh_send_msg: message had no header\n" );
285                 msg->state = RMR_ERR_NOHDR;
286                 errno = EBADMSG;                                                                                // must ensure it's not eagain
287                 return msg;
288         }
289
290         d1 = DATA1_ADDR( msg->header );
291         d1[D1_CALLID_IDX] = NO_CALL_ID;                                                         // must blot out so it doesn't queue on a chute at the other end
292
293         ep = whm->eps[whid];
294         if( ! ep->open ) {
295                 rmr_wh_open( ctx, ep->name );
296         }
297         return send2ep( ctx, ep, msg );                                                 // send directly to the endpoint
298 }
299
300 /*
301         Send a message directly to an open wormhole and then block until a response has
302         been received.  The return is the same as for rmr_call(); the received buffer
303         or nil if no response was received.
304 */
305 extern rmr_mbuf_t* rmr_wh_call( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg, int call_id, int max_wait ) {
306         uta_ctx_t*      ctx;
307         endpoint_t*     ep;                             // enpoint that wormhole ID references
308         wh_mgt_t *whm;
309         char* d1;                                       // point at the call-id in the header
310
311         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
312                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
313                 if( msg != NULL ) {
314                         msg->state = RMR_ERR_BADARG;
315                         errno = EINVAL;                                                                                 // must ensure it's not eagain
316                 }
317                 return msg;
318         }
319
320         msg->state = RMR_OK;
321
322         if( (whm = ctx->wormholes) == NULL ) {
323                 errno = EINVAL;                                                                                         // no wormholes open
324                 msg->state = RMR_ERR_NOWHOPEN;
325                 return msg;
326         }
327
328         if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) {
329                 errno = EINVAL;
330                 msg->state = RMR_ERR_WHID;
331                 return msg;
332         }
333
334         errno = 0;
335         if( msg->header == NULL ) {
336                 rmr_vlog( RMR_VL_ERR, "rmr_wh_call: message had no header\n" );
337                 msg->state = RMR_ERR_NOHDR;
338                 errno = EBADMSG;                                                                                // must ensure it's not eagain
339                 return msg;
340         }
341
342         ep = whm->eps[whid];
343         if( ep != NULL ) {
344                 if( ! ep->open ) {
345                         rmr_wh_open( ctx, ep->name );
346                 }
347                 return mt_call( vctx, msg, call_id, max_wait, ep );                     // use main (internal) call to setup and block
348         }
349
350         msg->state = RMR_ERR_NOENDPT;
351         return msg;
352 }
353
354 /*
355         This will "close" a wormhole.  We don't actually drop the session as that might be needed
356         by others, but we do pull the ep reference from the list.
357 */
358 extern void rmr_wh_close( void* vctx, int whid ) {
359         uta_ctx_t*      ctx;
360         wh_mgt_t *whm;
361
362         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
363                 return;
364         }
365
366         if( (whm = ctx->wormholes) == NULL || whm->eps == NULL ) {
367                 return;
368         }
369
370         if( whid >= whm->nalloc || whid < 0 ) {
371                 return;
372         }
373
374         if( whm->eps[whid] == NULL ) {
375                 return;
376         }
377
378         whm->eps[whid] = NULL;
379 }
380
381 /*
382         Check the state of an endpoint that is associated with the wormhold ID
383         passed in. If the state is "open" then we return RMR_OK. Other possible
384         return codes:
385
386                 RMR_ERR_WHID        // wormhole id was invalid
387                 RMR_ERR_NOENDPT     // the endpoint connection is not open
388                 RMR_ERR_BADARG          // context or other arg was invalid
389                 RMR_ERR_NOWHOPEN        // wormhole(s) have not been initalised
390
391 */
392 extern int rmr_wh_state( void* vctx, rmr_whid_t whid ) {
393         uta_ctx_t*      ctx;
394         wh_mgt_t*       whm;                    // easy reference to wh mgt stuff
395         endpoint_t*     ep;                             // enpoint that wormhole ID references
396
397         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {              // bad stuff, bail fast
398                 errno = EINVAL;
399                 return RMR_ERR_BADARG;
400         }
401
402         if( (whm = ctx->wormholes) == NULL ) {
403                 errno = EINVAL;                                                                                         // no wormholes open
404                 return RMR_ERR_NOWHOPEN;
405         }
406
407         if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) {
408                 errno = EINVAL;
409                 return RMR_ERR_WHID;
410         }
411
412         errno = 0;
413
414         if( (ep = whm->eps[whid]) != NULL ) {
415                 return ep->open ? RMR_OK : RMR_ERR_NOENDPT;
416         }
417
418         return RMR_ERR_NOENDPT;
419 }