Add alarm generation when application is slow
[ric-plt/lib/rmr.git] / src / rmr / common / src / wormholes.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       wormholes.c
23         Abstract:       All functions (internal and external) needed to manage wormholes.
24
25                                 Wormholes allow a user application to send directly to an endpoint.
26                                 The application must first "open" the wormhole which allows us to
27                                 provide the application with an ID that can be used on a wh_send()
28                                 call. It also does the validation (future) which might not allow
29                                 the application to open any wormholes, or may allow them only to
30                                 specific targets.
31
32         Author:         E. Scott Daniels
33         Date:           13 February 2019
34 */
35
36 #include <unistd.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <strings.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <stdint.h>
43 #include <ctype.h>
44
45 #include "rmr.h"
46 #include "rmr_symtab.h"
47
48 /*
49 #ifdef NNG
50 #include <nng/nng.h>
51 #include <nng/protocol/pubsub0/pub.h>
52 #include <nng/protocol/pubsub0/sub.h>
53 #include <nng/protocol/pipeline0/push.h>
54 #include <nng/protocol/pipeline0/pull.h>
55
56 #include "rmr_nng_private.h"
57 #include "rt_generic_static.c"
58 #include "rtable_nng_static.c"
59 #include "sr_nng_static.c"
60
61 #else
62 #include <nanomsg/nn.h>
63 #include <nanomsg/tcp.h>
64 #include <nanomsg/pair.h>
65 #include <nanomsg/pipeline.h>
66 #include <nanomsg/pubsub.h>
67
68 #include "rmr_private.h"
69 #include "rt_generic_static.c"
70 #include "rtable_static.c"
71 #include "sr_static.c"
72 #endif
73 */
74
75 #include "tools_static.c"
76
77
78
79 // ----------------------- internal stuff -----------------------------------------------
80
81 /*
82         This function returns true if the current application is permitted to open a wormhole
83         to the desired target.
84
85         This is a place holder for future functionality.
86 */
87 static int wh_can_open( uta_ctx_t* ctx, char const* target ) {
88         return 1;
89 }
90
91 /*
92         Allocate and initialise the wormholes list; point context at it.
93 */
94 static int wh_init( uta_ctx_t* ctx ) {
95         wh_mgt_t*       whm;
96         size_t          alloc_sz;
97
98         if( ctx == NULL ) {
99                 errno = EINVAL;
100                 return 0;
101         }
102
103         if( ctx->wormholes != NULL ) {          // already allocated, do nothing but signal all is well
104                 return 1;
105         }
106
107         if( (whm  = malloc( sizeof( *whm ) )) == NULL ) {
108                 rmr_vlog( RMR_VL_ERR, "mem alloc failed for whm: alloc %d bytes\n", (int) sizeof( *whm ) );
109                 errno = ENOMEM;
110                 return 0;
111         }
112
113         whm->nalloc = 16;
114         alloc_sz = whm->nalloc * sizeof( endpoint_t );
115         if( (whm->eps = (endpoint_t **) malloc( alloc_sz )) == NULL ) {
116                 rmr_vlog( RMR_VL_ERR, "mem alloc failed: alloc %d bytes\n", (int) alloc_sz );
117                 free( whm );
118                 errno = ENOMEM;
119                 return 0;
120         }
121
122         memset( whm->eps, 0, alloc_sz );
123
124         ctx->wormholes = whm;
125         errno = 0;
126         return 1;
127 }
128
129 /*
130         Realloc the wormhole endpoint list.
131         Returns 0 if failure with errno set; !0 on success.
132 */
133 static int wh_extend( wh_mgt_t* whm ) {
134         int i;
135         int j;
136         size_t  alloc_sz;
137
138         i = whm->nalloc;                // starting point for initialisation after realloc
139         whm->nalloc += 16;
140
141         alloc_sz = whm->nalloc * sizeof( endpoint_t );
142         if( (whm->eps = (endpoint_t **) realloc( whm->eps, alloc_sz )) == NULL ) {
143                 errno = ENOMEM;
144                 return 0;
145         }
146
147         for( j = 0; j < 16; j++ ) {
148                 whm->eps[i++] = NULL;                   // must init the new stuff
149         }
150
151         errno = 0;
152         return 1;
153 }
154
155 /*
156         Mostly for leak analysis during testing.
157 */
158 static void wh_nuke( uta_ctx_t* ctx ) {
159         if( ctx == NULL ) {
160                 return;
161         }
162
163         if( ctx->wormholes ) {
164                 if( ctx->wormholes->eps ) {
165                         free( ctx->wormholes->eps );
166                 }
167                 free( ctx->wormholes );
168         }
169
170         ctx->wormholes = NULL;
171 }
172
173 // ----------------------- visible stuff  ------------------------------------------------
174
175 /*
176         Opens a direct wormhole connection to the named target. Target is expected to be
177         either hostname:port or IP:port.  If we don't have an endpoint in our hash, we'll
178         create one.
179         Unlike 'regular' connections to endpoints which are connected on the first send
180         attempt, when a wormhole is opened we connect immediatly. In the NNG world this
181         could result in a delay and immediate failure. With nanomsg the failure may not
182         be detected as the connect doesn't block.
183 */
184 extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ) {
185         endpoint_t*     ep;                             // endpoint that represents the target
186         uta_ctx_t*      ctx = NULL;
187         rmr_whid_t      whid = -1;              // wormhole id is the index into the list
188         wh_mgt_t*       whm;                    // easy reference to wh mgt stuff
189         int                     i;
190         route_table_t*  rt;                     // the currently active route table
191
192
193         if( (ctx = (uta_ctx_t *) vctx) == NULL || target == NULL || *target == 0 ) {
194                 errno = EINVAL;
195                 return whid;
196         }
197
198         if( ! wh_can_open( ctx, target ) )  {
199                 errno = EACCES;
200                 return whid;
201         }
202
203         if( ctx->wormholes == NULL ) {
204                 if( ! wh_init( ctx ) ) {                                        // first call, we need to set things up
205                         return whid;                                                    // fail with errno set by init
206                 }
207         }
208
209         whm = ctx->wormholes;
210
211         rt = get_rt( ctx );                                                                             // get and raise ref counter
212         ep = rt_ensure_ep( rt, target );                                                // get pointer to ep if there, create new if not
213         release_rt( ctx, rt );                                                                  // release use counter
214         if( ep == NULL ) {
215                 rmr_vlog( RMR_VL_ERR, "wormhole_open: ensure ep returned bad: target=(%s)\n", target );
216                 return -1;                      // ensure sets errno
217         }
218
219         whid = whm->nalloc;
220         for( i = 0; i < whm->nalloc; i++ ) {                            // look for a pointer to the ep, and find first open spot
221                 if( whid == whm->nalloc && !whm->eps[i] ) {
222                         whid = i;                                                                       // save first open slot should we need it
223                 }
224
225                 if( whm->eps[i] == ep ) {
226                         if(  whm->eps[i]->open ) {                                      // we know about it and it's open
227                                 return i;                                                               // just send back the reference
228                         }
229
230                         whid = i;                                                                       // have it, but not open, reopen
231                         break;
232                 }
233         }
234
235         if( whid >= whm->nalloc ) {
236                 if( ! wh_extend( whm  ) ) {                                     // add some; whid will point to the right place
237                         return -1;
238                 }
239         }
240
241         if( !rt_link2_ep( ctx, ep ) ) {                 // start a connection if not already open
242                 errno = ECONNREFUSED;
243                 return -1;
244         }
245
246         whm->eps[whid] = ep;
247         return whid;
248 }
249
250
251 /*
252         Send a message directly to an open wormhole.
253         As with the other send functions in RMr, we return a new zero copy buffer for the
254         user application to fill in.
255 */
256 extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg ) {
257         uta_ctx_t*      ctx;
258         endpoint_t*     ep;                             // enpoint that wormhole ID references
259         wh_mgt_t *whm;
260         char* d1;                                       // point at the call-id in the header
261
262         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
263                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
264                 if( msg != NULL ) {
265                         msg->state = RMR_ERR_BADARG;
266                         errno = EINVAL;                                                                                 // must ensure it's not eagain
267                 }
268                 return msg;
269         }
270
271         msg->state = RMR_OK;
272
273         if( (whm = ctx->wormholes) == NULL ) {
274                 errno = EINVAL;                                                                                         // no wormholes open
275                 msg->state = RMR_ERR_NOWHOPEN;
276                 return msg;
277         }
278
279         if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) {
280                 errno = EINVAL;                                                                                         // no wormholes open
281                 msg->state = RMR_ERR_WHID;
282                 return msg;
283         }
284
285         errno = 0;
286         if( msg->header == NULL ) {
287                 rmr_vlog( RMR_VL_ERR, "rmr_wh_send_msg: message had no header\n" );
288                 msg->state = RMR_ERR_NOHDR;
289                 errno = EBADMSG;                                                                                // must ensure it's not eagain
290                 return msg;
291         }
292
293         d1 = DATA1_ADDR( msg->header );
294         d1[D1_CALLID_IDX] = NO_CALL_ID;                                                         // must blot out so it doesn't queue on a chute at the other end
295
296         ep = whm->eps[whid];
297         if( ! ep->open ) {
298                 rmr_wh_open( ctx, ep->name );
299         }
300         return send2ep( ctx, ep, msg );                                                 // send directly to the endpoint
301 }
302
303 /*
304         Send a message directly to an open wormhole and then block until a response has
305         been received.  The return is the same as for rmr_call(); the received buffer
306         or nil if no response was received.
307 */
308 extern rmr_mbuf_t* rmr_wh_call( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg, int call_id, int max_wait ) {
309         uta_ctx_t*      ctx;
310         endpoint_t*     ep;                             // enpoint that wormhole ID references
311         wh_mgt_t *whm;
312         char* d1;                                       // point at the call-id in the header
313
314         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
315                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
316                 if( msg != NULL ) {
317                         msg->state = RMR_ERR_BADARG;
318                         errno = EINVAL;                                                                                 // must ensure it's not eagain
319                 }
320                 return msg;
321         }
322
323         msg->state = RMR_OK;
324
325         if( (whm = ctx->wormholes) == NULL ) {
326                 errno = EINVAL;                                                                                         // no wormholes open
327                 msg->state = RMR_ERR_NOWHOPEN;
328                 return msg;
329         }
330
331         if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) {
332                 errno = EINVAL;
333                 msg->state = RMR_ERR_WHID;
334                 return msg;
335         }
336
337         errno = 0;
338         if( msg->header == NULL ) {
339                 rmr_vlog( RMR_VL_ERR, "rmr_wh_call: message had no header\n" );
340                 msg->state = RMR_ERR_NOHDR;
341                 errno = EBADMSG;                                                                                // must ensure it's not eagain
342                 return msg;
343         }
344
345         ep = whm->eps[whid];
346         if( ep != NULL ) {
347                 if( ! ep->open ) {
348                         rmr_wh_open( ctx, ep->name );
349                 }
350                 return mt_call( vctx, msg, call_id, max_wait, ep );                     // use main (internal) call to setup and block
351         }
352
353         msg->state = RMR_ERR_NOENDPT;
354         return msg;
355 }
356
357 /*
358         This will "close" a wormhole.  We don't actually drop the session as that might be needed
359         by others, but we do pull the ep reference from the list.
360 */
361 extern void rmr_wh_close( void* vctx, int whid ) {
362         uta_ctx_t*      ctx;
363         wh_mgt_t *whm;
364
365         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
366                 return;
367         }
368
369         if( (whm = ctx->wormholes) == NULL || whm->eps == NULL ) {
370                 return;
371         }
372
373         if( whid >= whm->nalloc || whid < 0 ) {
374                 return;
375         }
376
377         if( whm->eps[whid] == NULL ) {
378                 return;
379         }
380
381         whm->eps[whid] = NULL;
382 }
383
384 /*
385         Check the state of an endpoint that is associated with the wormhold ID
386         passed in. If the state is "open" then we return RMR_OK. Other possible
387         return codes:
388
389                 RMR_ERR_WHID        // wormhole id was invalid
390                 RMR_ERR_NOENDPT     // the endpoint connection is not open
391                 RMR_ERR_BADARG          // context or other arg was invalid
392                 RMR_ERR_NOWHOPEN        // wormhole(s) have not been initalised
393
394 */
395 extern int rmr_wh_state( void* vctx, rmr_whid_t whid ) {
396         uta_ctx_t*      ctx;
397         wh_mgt_t*       whm;                    // easy reference to wh mgt stuff
398         endpoint_t*     ep;                             // enpoint that wormhole ID references
399
400         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {              // bad stuff, bail fast
401                 errno = EINVAL;
402                 return RMR_ERR_BADARG;
403         }
404
405         if( (whm = ctx->wormholes) == NULL ) {
406                 errno = EINVAL;                                                                                         // no wormholes open
407                 return RMR_ERR_NOWHOPEN;
408         }
409
410         if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) {
411                 errno = EINVAL;
412                 return RMR_ERR_WHID;
413         }
414
415         errno = 0;
416
417         if( (ep = whm->eps[whid]) != NULL ) {
418                 return ep->open ? RMR_OK : RMR_ERR_NOENDPT;
419         }
420
421         return RMR_ERR_NOENDPT;
422 }