feat(API): Add subscription id and source retrieval
[ric-plt/lib/rmr.git] / src / nng / src / rtable_nng_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rtable_nng_static.c
23         Abstract:       Route table management functions which depend on the underlying
24                                 transport mechanism and thus cannot be included with the generic
25                                 route table functions.
26
27                                 This module is designed to be included by any module (main) needing
28                                 the static/private stuff.
29
30         Author:         E. Scott Daniels
31         Date:           29 November 2018
32 */
33
34 #ifndef rtable_static_c
35 #define rtable_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47
48
49 // -----------------------------------------------------------------------------------------------------
50
51 /*
52         Establish a TCP connection to the indicated target (IP address).
53         Target assumed to be address:port.  The new socket is returned via the
54         user supplied pointer so that a success/fail code is returned directly.
55         Return value is 0 (false) on failure, 1 (true)  on success.
56 */
57 static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
58         char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
59         char*           addr;
60         int                     state = FALSE;
61
62         if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
63                 fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
64                 return FALSE;
65         }
66
67         if( nn_sock == NULL ) {
68                 errno = EINVAL;
69                 return FALSE;
70         }
71
72         if( nng_push0_open( nn_sock ) != 0 ) {                  // and assign the mode
73                 fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
74                 return FALSE;
75         }
76
77         snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
78         if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
79                 fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
80                 nng_close( *nn_sock );
81                 return FALSE;
82         }
83                 
84         nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMAXT, 2000 );             // cap backoff on retries to reasonable amount (2s)
85         nng_dialer_setopt_ms( *dialer,  NNG_OPT_RECONNMINT, 100 );              // start retry 100m after last failure with 2s cap
86
87         if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) {                                            // can fail immediatly (unlike nanomsg)
88                 fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
89                 nng_close( *nn_sock );
90                 return FALSE;
91         }
92
93         if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
94
95         return TRUE;
96 }
97
98 /*
99         This provides a protocol independent mechanism for establishing the connection to an endpoint.
100         Return is true (1) if the link was opened; false on error.
101 */
102 static int rt_link2_ep( endpoint_t* ep ) {
103         if( ep == NULL ) {
104                 return FALSE;
105         }
106
107         if( ep->open )  {                       // already open, do nothing
108                 return TRUE;
109         }
110
111         ep->open =  uta_link2( ep->addr, &ep->nn_sock, &ep->dialer );
112         return ep->open;
113 }
114
115
116 /*
117         Add an endpoint to a route table entry for the group given. If the endpoint isn't in the 
118         hash we add it and create the endpoint struct.
119
120         The caller must supply the specific route table (we assume it will be pending, but they
121         could live on the edge and update the active one, though that's not at all a good idea).
122 */
123 extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  ) {
124         endpoint_t*     ep;
125         rrgroup_t* rrg;                         // pointer at group to update
126
127         if( ! rte || ! rt ) {
128                 fprintf( stderr, "[WARN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
129                 return NULL;
130         }
131
132         if( rte->nrrgroups <= group ) {
133                 fprintf( stderr, "[WARN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
134                 return NULL;
135         }
136
137         if( (rrg = rte->rrgroups[group]) == NULL ) {
138                 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
139                         fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
140                         return NULL;
141                 }
142                 memset( rrg, 0, sizeof( *rrg ) );
143
144                 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
145                         fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
146                         return NULL;
147                 }
148                 memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
149
150                 rte->rrgroups[group] = rrg;
151
152                 rrg->ep_idx = 0;                                                // next endpoint to send to
153                 rrg->nused = 0;                                                 // number populated
154                 rrg->nendpts = MAX_EP_GROUP;                    // number allocated
155         }
156
157         ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
158         /*
159         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
160                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
161                         fprintf( stderr, "uta: [WARN] malloc failed for endpoint creation: %s\n", ep_name );
162                         return NULL;
163                 }
164
165                 ep->open = 0;                                   // not connected
166                 ep->addr = uta_h2ip( ep_name );
167                 ep->name = strdup( ep_name );
168
169                 rmr_sym_put( rt->hash, ep_name, 1, ep );
170         }
171         */
172
173         if( rrg != NULL ) {
174                 if( rrg->nused >= rrg->nendpts ) {
175                         // future: reallocate
176                         fprintf( stderr, "[WARN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
177                         return NULL;
178                 }
179
180                 rrg->epts[rrg->nused] = ep;
181                 rrg->nused++;
182         }
183
184         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
185         return ep;
186 }
187
188
189 /*
190         Given a name, find the nano socket needed to send to it. Returns the socket via
191         the user pointer passed in and sets the return value to true (1). If the
192         endpoint cannot be found false (0) is returned.
193 */
194 static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ) {
195         endpoint_t* ep;
196         int state = FALSE;
197
198         if( rt == NULL ) {
199                 return FALSE;
200         }
201
202
203         ep =  rmr_sym_get( rt->hash, ep_name, 1 );
204         if( ep == NULL ) {
205                 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
206                 return FALSE;
207         }
208
209         if( ! ep->open )  {                                                                             // not open -- connect now
210                 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
211                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
212                         ep->addr = uta_h2ip( ep->name );
213                 }
214                 if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) {                // find entry in table and create link
215                         state = TRUE;
216                         ep->open = TRUE;
217                         *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
218                 }
219                 if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
220         } else {
221                 *nn_sock = ep->nn_sock;
222                 state = TRUE;
223         }       
224
225         return state;
226 }
227
228 /*
229         Make a round robin selection within a round robin group for a route table
230         entry. Returns the nanomsg socket if there is a rte for the message
231         type, and group is defined. Socket is returned via pointer in the parm
232         list (nn_sock).
233
234         The group is the group number to select from.
235
236         The user supplied (via pointer to) integer 'more' will be set if there are
237         additional groups beyond the one selected. This allows the caller to
238         to easily iterate over the group list -- more is set when the group should
239         be incremented and the function invoked again. Groups start at 0.
240
241         The return value is true (>0) if the socket was found and *nn_sock was updated
242         and false (0) if there is no associated socket for the msg type, group combination.
243         We return the index+1 from the round robin table on success so that we can verify
244         during test that different entries are being seleted; we cannot depend on the nng
245         socket being different as we could with nano.
246 */
247 static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more, nng_socket* nn_sock ) {
248         rtable_ent_t* rte;                      // matching rt entry
249         endpoint_t*     ep;                             // seected end point
250         int  state = FALSE;                     // processing state
251         int dummy;
252         rrgroup_t* rrg;
253
254
255         if( ! more ) {                          // eliminate cheks each time we need to user
256                 more = &dummy;
257         }
258
259         if( ! nn_sock ) {                       // user didn't supply a pointer
260                 errno = EINVAL;
261                 *more = 0;
262                 return FALSE;
263         }
264
265         if( rt == NULL ) {
266                 *more = 0;
267                 return FALSE;
268         }
269
270         if( (rte = rmr_sym_pull( rt->hash, mtype )) == NULL ) {
271                 *more = 0;
272                 //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %d\n", mtype );
273                 return FALSE;
274         }
275
276         if( group < 0 || group >= rte->nrrgroups ) {
277                 //if( DEBUG ) fprintf( stderr, ">>>> group out of range: mtype=%d group=%d max=%d\n", mtype, group, rte->nrrgroups );
278                 *more = 0;
279                 return FALSE;
280         }
281
282         if( (rrg = rte->rrgroups[group]) == NULL ) {
283                 //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %d\n", mtype );
284                 *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
285                 return FALSE;
286         }
287
288         *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
289
290         switch( rrg->nused ) {
291                 case 0:                         // nothing allocated, just punt
292                         //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" );
293                         return FALSE;
294
295                 case 1:                         // exactly one, no rr to deal with and more is not possible even if fanout > 1
296                         //*nn_sock = rrg->epts[0]->nn_sock;
297                         ep = rrg->epts[0];
298                         //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with one choice in group \n" );
299                         state = TRUE;
300                         break;
301         
302                 default:                                                                                // need to pick one and adjust rr counts
303                         ep = rrg->epts[rrg->ep_idx++];                          // select next endpoint
304                         //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
305                         if( rrg->ep_idx >= rrg->nused ) {
306                                 rrg->ep_idx = 0;
307                         }
308                         state = rrg->ep_idx+1;
309                         break;
310         }
311
312         if( state ) {                                                                           // end point selected, open if not, get socket either way
313                 if( ! ep->open ) {                                                              // not connected
314                         if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
315                                 ep->addr = uta_h2ip( ep->name );
316                         }
317
318                         if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) {                // find entry in table and create link
319                                 ep->open = TRUE;
320                                 *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
321                         } else {
322                                 state = FALSE;
323                         }
324                         if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
325                 } else {
326                         *nn_sock = ep->nn_sock;
327                 }
328         }
329
330         return state;
331 }
332
333 #endif