Merge "Added new message type constants"
[ric-plt/lib/rmr.git] / src / rmr / common / src / wormholes.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       wormholes.c
23         Abstract:       All functions (internal and external) needed to manage wormholes.
24
25                                 Wormholes allow a user application to send directly to an endpoint.
26                                 The application must first "open" the wormhole which allows us to
27                                 provide the application with an ID that can be used on a wh_send()
28                                 call. It also does the validation (future) which might not allow
29                                 the application to open any wormholes, or may allow them only to
30                                 specific targets.
31
32         Author:         E. Scott Daniels
33         Date:           13 February 2019
34 */
35
36 #include <unistd.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <strings.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <stdint.h>
43 #include <ctype.h>
44
45 #include "rmr.h"
46 #include "rmr_symtab.h"
47
48 /*
49 #ifdef NNG
50 #include <nng/nng.h>
51 #include <nng/protocol/pubsub0/pub.h>
52 #include <nng/protocol/pubsub0/sub.h>
53 #include <nng/protocol/pipeline0/push.h>
54 #include <nng/protocol/pipeline0/pull.h>
55
56 #include "rmr_nng_private.h"
57 #include "rt_generic_static.c"
58 #include "rtable_nng_static.c"
59 #include "sr_nng_static.c"
60
61 #else
62 #include <nanomsg/nn.h>
63 #include <nanomsg/tcp.h>
64 #include <nanomsg/pair.h>
65 #include <nanomsg/pipeline.h>
66 #include <nanomsg/pubsub.h>
67
68 #include "rmr_private.h"
69 #include "rt_generic_static.c"
70 #include "rtable_static.c"
71 #include "sr_static.c"
72 #endif
73 */
74
75 #include "tools_static.c"
76
77
78
79 // ----------------------- internal stuff -----------------------------------------------
80
81 /*
82         This function returns true if the current application is permitted to open a wormhole
83         to the desired target.
84
85         This is a place holder for future functionality.
86 */
87 static int wh_can_open( uta_ctx_t* ctx, char const* target ) {
88         return 1;
89 }
90
91 /*
92         Allocate and initialise the wormholes list; point context at it.
93 */
94 static int wh_init( uta_ctx_t* ctx ) {
95         wh_mgt_t*       whm;
96         size_t          alloc_sz;
97
98         if( ctx == NULL ) {
99                 errno = EINVAL;
100                 return 0;
101         }
102
103         if( ctx->wormholes != NULL ) {          // already allocated, do nothing but signal all is well
104                 return 1;
105         }
106
107         if( (whm  = malloc( sizeof( *whm ) )) == NULL ) {
108                 fprintf( stderr, "mem alloc failed for whm: alloc %d bytes\n", (int) sizeof( *whm ) );
109                 errno = ENOMEM;
110                 return 0;
111         }
112
113         whm->nalloc = 16;
114         alloc_sz = whm->nalloc * sizeof( endpoint_t );
115         if( (whm->eps = (endpoint_t **) malloc( alloc_sz )) == NULL ) {
116                 fprintf( stderr, "mem alloc failed: alloc %d bytes\n", (int) alloc_sz );
117                 free( whm );
118                 errno = ENOMEM;
119                 return 0;
120         }
121
122         memset( whm->eps, 0, alloc_sz );
123
124         ctx->wormholes = whm;
125         errno = 0;
126         return 1;
127 }
128
129 /*
130         Realloc the wormhole endpoint list.
131         Returns 0 if failure with errno set; !0 on success.
132 */
133 static int wh_extend( wh_mgt_t* whm ) {
134         int i;
135         int j;
136         size_t  alloc_sz;
137
138         i = whm->nalloc;                // starting point for initialisation after realloc
139         whm->nalloc += 16;
140
141         alloc_sz = whm->nalloc * sizeof( endpoint_t );
142         if( (whm->eps = (endpoint_t **) realloc( whm->eps, alloc_sz )) == NULL ) {
143                 errno = ENOMEM;
144                 return 0;
145         }
146
147         for( j = 0; j < 16; j++ ) {
148                 whm->eps[i++] = NULL;                   // must init the new stuff
149         }
150
151         errno = 0;
152         return 1;
153 }
154
155 /*
156         Mostly for leak analysis during testing.
157 */
158 static void wh_nuke( uta_ctx_t* ctx ) {
159         if( ctx == NULL ) {
160                 return;
161         }
162
163         if( ctx->wormholes ) {
164                 if( ctx->wormholes->eps ) {
165                         free( ctx->wormholes->eps );
166                 }
167                 free( ctx->wormholes );
168         }
169
170         ctx->wormholes = NULL;
171 }
172
173 // ----------------------- visible stuff  ------------------------------------------------
174
175 /*
176         Opens a direct wormhole connection to the named target. Target is expected to be
177         either hostname:port or IP:port.  If we don't have an endpoint in our hash, we'll
178         create one.
179         Unlike 'regular' connections to endpoints which are connected on the first send
180         attempt, when a wormhole is opened we connect immediatly. In the NNG world this
181         could result in a delay and immediate failure. With nanomsg the failure may not
182         be detected as the connect doesn't block.
183 */
184 extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ) {
185         endpoint_t*     ep;                             // endpoint that represents the target
186         uta_ctx_t*      ctx = NULL;
187         rmr_whid_t      whid = -1;              // wormhole id is the index into the list
188         wh_mgt_t*       whm;                    // easy reference to wh mgt stuff
189         int                     i;
190
191
192         if( (ctx = (uta_ctx_t *) vctx) == NULL || target == NULL || *target == 0 ) {
193                 errno = EINVAL;
194                 return whid;
195         }
196
197         if( ! wh_can_open( ctx, target ) )  {
198                 errno = EACCES;
199                 return whid;
200         }
201
202         if( ctx->wormholes == NULL ) {
203                 if( ! wh_init( ctx ) ) {                                        // first call, we need to set things up
204                         return whid;                                                    // fail with errno set by init
205                 }
206         }
207
208         whm = ctx->wormholes;
209
210
211         if( (ep = rt_ensure_ep( ctx->rtable, target )) == NULL ) {              // get pointer to ep if there, create new if not
212                 fprintf( stderr, "ensure ep returned bad; setting no memory error\n" );
213                 return -1;                      // ensure sets errno
214         }
215
216         whid = whm->nalloc;
217         for( i = 0; i < whm->nalloc; i++ ) {                            // look for a pointer to the ep, and find first open spot
218                 if( whid == whm->nalloc && !whm->eps[i] ) {
219                         whid = i;                                                                       // save first open slot should we need it
220                 }
221
222                 if( whm->eps[i] == ep ) {
223                         return i;                                                       // we're already pointing to it, just send it back again
224                 }
225         }
226
227         if( whid >= whm->nalloc ) {
228                 if( ! wh_extend( whm  ) ) {                                     // add some; whid will point to the right place
229                         return -1;
230                 }
231         }
232
233         if( !rt_link2_ep( ep ) ) {                      // start a connection if open already
234                 errno = ECONNREFUSED;
235                 return -1;
236         }
237
238         whm->eps[whid] = ep;
239         return whid;
240 }
241
242
243 /*
244         Send a message directly to an open wormhole.
245         As with the other send functions in RMr, we return a new zero copy buffer for the
246         user application to fill in.
247 */
248 extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg ) {
249         uta_ctx_t*      ctx;
250         endpoint_t*     ep;                             // enpoint that wormhole ID references
251         wh_mgt_t *whm;
252
253         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
254                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
255                 if( msg != NULL ) {
256                         msg->state = RMR_ERR_BADARG;
257                         errno = EINVAL;                                                                                 // must ensure it's not eagain
258                 }
259                 return msg;
260         }
261
262         msg->state = RMR_OK;
263
264         if( (whm = ctx->wormholes) == NULL ) {
265                 errno = EINVAL;                                                                                         // no wormholes open
266                 msg->state = RMR_ERR_NOWHOPEN;
267                 return msg;
268         }
269
270         if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) {
271                 errno = EINVAL;                                                                                         // no wormholes open
272                 msg->state = RMR_ERR_WHID;
273                 return msg;
274         }
275
276         errno = 0;                                                                                                      // nng seems not to set errno any longer, so ensure it's clear
277         if( msg->header == NULL ) {
278                 fprintf( stderr, "[ERR] rmr_wh_send_msg: message had no header\n" );
279                 msg->state = RMR_ERR_NOHDR;
280                 errno = EBADMSG;                                                                                // must ensure it's not eagain
281                 return msg;
282         }
283
284         ep = whm->eps[whid];
285         return send2ep( ctx, ep, msg );                                                 // send directly to the endpoint
286 }
287
288 /*
289         This will "close" a wormhole.  We don't actually drop the session as that might be needed
290         by others, but we do pull the ep reference from the list.
291 */
292 extern void rmr_wh_close( void* vctx, int whid ) {
293         uta_ctx_t*      ctx;
294         wh_mgt_t *whm;
295
296         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
297                 return;
298         }
299
300         if( (whm = ctx->wormholes) == NULL || whm->eps == NULL ) {
301                 return;
302         }
303
304         if( whid >= whm->nalloc || whid < 0 ) {
305                 return;
306         }
307
308         if( whm->eps[whid] == NULL ) {
309                 return;
310         }
311
312         whm->eps[whid] = NULL;
313 }