Remove bad includes from SI code
[ric-plt/lib/rmr.git] / src / rmr / nanomsg / src / rtable_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rtable_static.c
23         Abstract:       Route table management functions.
24         Author:         E. Scott Daniels
25         Date:           29 November 2018
26 */
27
28 #ifndef rtable_static_c
29 #define rtable_static_c
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <netdb.h>
34 #include <errno.h>
35 #include <string.h>
36 #include <errno.h>
37 #include <fcntl.h>
38 #include <sys/types.h>
39 #include <sys/stat.h>
40 #include <unistd.h>
41
42
43
44 /*
45         Establish a TCP connection to the indicated target (IP address).
46         Target assumed to be address:port. Requires a separate nano socket;
47         the socket number (for future sends) is returned or -1 on error.
48 */
49 static int uta_link2( char* target ) {
50         char    conn_info[NN_SOCKADDR_MAX];     // string to give to nano to make the connection
51         int             nn_sock;                                        // the nano socket for this link
52         char*   addr;
53
54         if( target == NULL  ||  (addr = strchr( target, ':' )) == NULL ) {              // bad address:port
55                 fprintf( stderr, "[INFO] rmr: rmr_link2: unable to create link: invalid target: %s\n", target == NULL ? "<nil>" : target );
56                 return -1;
57         }
58
59         nn_sock = nn_socket( AF_SP, NN_PUSH );          // the socket we'll use to connect to the target
60         if( nn_sock < 0 ) {
61                 fprintf( stderr, "[WRN] rmr: link2: unable to create socket for link to target: %s: %d\n\n\n", target, errno );
62                 return -1;
63         }
64
65         snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
66         if( nn_connect( nn_sock, conn_info ) < 0 ) {                                                    // connect failed
67                 fprintf( stderr, "[WRN] rmr: link2: unable to create link to target: %s: %d\n\n\n", target, errno );
68                 nn_close( nn_sock );
69                 return -1;
70         }
71
72         return nn_sock;
73 }
74
75 /*
76         This provides a protocol independent mechanism for establishing the connection to an endpoint.
77         Returns true on success; false otherwise.
78 */
79 static int rt_link2_ep( endpoint_t* ep ) {
80         if( ep == NULL ) {
81                 return FALSE;
82         }
83
84         if( ep->open ) {
85                 return TRUE;
86         }
87
88         ep->nn_sock =  uta_link2( ep->addr ) >= 0;                      // open if a valid socket returned
89         ep->open = ep->nn_sock >= 0;
90         return ep->open;
91 }
92
93 /*
94         Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
95         hash we add it and create the endpoint struct.
96
97         The caller must supply the specific route table (we assume it will be pending, but they
98         could live on the edge and update the active one, though that's not at all a good idea).
99 */
100 static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  ) {
101         endpoint_t*     ep;
102         rrgroup_t* rrg;                         // pointer at group to update
103
104         if( ! rte || ! rt ) {
105                 fprintf( stderr, "[WRN] rmr_add_ep didn't get a valid rt and/or rte pointer\n" );
106                 return NULL;
107         }
108
109         if( rte->nrrgroups <= group ) {
110                 fprintf( stderr, "[WRN] rmr_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
111                 return NULL;
112         }
113
114         if( (rrg = rte->rrgroups[group]) == NULL ) {
115                 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
116                         fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
117                         return NULL;
118                 }
119                 memset( rrg, 0, sizeof( *rrg ) );
120
121                 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
122                         fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
123                         return NULL;
124                 }
125                 memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
126
127                 rte->rrgroups[group] = rrg;
128
129                 rrg->ep_idx = 0;                                                // next to send to
130                 rrg->nused = 0;                                                 // number populated
131                 rrg->nendpts = MAX_EP_GROUP;                    // number allocated
132         }
133
134         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
135                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
136                         fprintf( stderr, "uta: [WRN] malloc failed for endpoint creation: %s\n", ep_name );
137                         return NULL;
138                 }
139
140                 ep->nn_sock = -1;                                       // not connected
141                 ep->open = 0;
142                 ep->addr = uta_h2ip( ep_name );
143                 ep->name = strdup( ep_name );
144
145                 rmr_sym_put( rt->hash, ep_name, 1, ep );
146         }
147
148         if( rrg != NULL ) {
149                 if( rrg->nused >= rrg->nendpts ) {
150                         // future: reallocate
151                         fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
152                         return NULL;
153                 }
154
155                 rrg->epts[rrg->nused] = ep;
156                 rrg->nused++;
157         }
158
159         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
160         return ep;
161 }
162
163 /*
164         Given a name, find the nano socket needed to send to it. Returns the socket number if
165         found; -1 on error.
166 */
167 static int uta_epsock_byname( route_table_t* rt, char* ep_name ) {
168         endpoint_t* ep;
169
170         if( rt == NULL ) {
171                 return -1;
172         }
173
174         ep =  rmr_sym_get( rt->hash, ep_name, 1 );
175         if( ep == NULL ) {
176                 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
177                         return -1;
178                 }
179         }
180
181         if( !ep->open  ) {                                                              // not connected; must connect now
182                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
183                         ep->addr = uta_h2ip( ep->name );
184                 }
185                 ep->nn_sock = uta_link2( ep->addr );
186                 ep->open = ep->nn_sock >= 0;
187                 if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", ep->nn_sock >= 0 ? "[OK]" : "[FAIL]", ep->name );
188         }
189
190         return ep->nn_sock;
191 }
192
193 /*
194         Make a round robin selection within a round robin group for a route table
195         entry. Returns the nanomsg socket number if there is a rte for the message
196         type, and group is defined, else returns -1.
197
198         The group is the group number to select from.
199
200         The user supplied integer 'more' will be set if there are additional groups
201         defined to the matching route table entry which have a higher group number.
202         This assumes the caller is making a sequential pass across groups starting
203         with group 0. If more is set, the caller may increase the group number and
204         invoke this function again to make a selection against that group. If there
205         are no more groups, more is set to 0.
206
207         NOTE:   The round robin selection index increment might collide with other
208                 threads if multiple threads are attempting to send to the same round
209                 robin group; the consequences are small and avoid locking. The only side
210                 effect is either sending two messages in a row to, or skipping, an endpoint.
211                 Both of these, in the grand scheme of things, is minor compared to the
212                 overhead of grabbing a lock on each call.
213 */
214 static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more ) {
215         rtable_ent_t* rte;                      // matching rt entry
216         endpoint_t*     ep;                             // seected end point
217         int nn_sock = -2;
218         int dummy;
219         rrgroup_t* rrg;
220         int     idx;
221
222
223         if( ! more ) {                          // eliminate checks each time we need to use
224                 more = &dummy;
225         }
226
227         if( rt == NULL ) {
228                 *more = 0;
229                 return -1;
230         }
231
232         if( (rte = rmr_sym_pull( rt->hash, key )) == NULL ) {
233                 *more = 0;
234                 //if( DEBUG ) fprintf( stderr, "#### >>> rte not found for type key=%lu\n", key );
235                 return -1;
236         }
237
238         if( group < 0 || group >= rte->nrrgroups ) {
239                 //if( DEBUG ) fprintf( stderr, ">>>> group out of range: key=%lu group=%d max=%d\n", key, group, rte->nrrgroups );
240                 *more = 0;
241                 return -1;
242         }
243
244         if( (rrg = rte->rrgroups[group]) == NULL ) {
245                 //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %lu\n", key );
246                 *more = 0;                                      // groups are inserted contig, so nothing should be after a nil pointer
247                 return -1;
248         }
249
250         *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
251
252         switch( rrg->nused ) {
253                 case 0:                         // nothing allocated, just punt
254                         //if( DEBUG )
255                         return -1;
256
257                 case 1:                         // exactly one, no rr to deal with
258                         nn_sock = rrg->epts[0]->nn_sock;
259                         ep = rrg->epts[0];
260                         break;
261
262                 default:                                                                                // need to pick one and adjust rr counts
263                         idx = rrg->ep_idx++ % rrg->nused;                       // see note above
264                         ep = rrg->epts[idx];
265                         nn_sock = ep->nn_sock;
266                         break;
267         }
268
269         if( ep && ! ep->open ) {                                // not connected
270                 if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
271                         ep->addr = uta_h2ip( ep->name );
272                 }
273                 ep->nn_sock = nn_sock = uta_link2( ep->addr );
274                 ep->open = ep->nn_sock >= 0;
275                 if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection state to %s: %s\n", ep->name, nn_sock >= 0 ? "[OK]" : "[FAIL]" );
276         }
277
278         return nn_sock;
279 }
280
281
282 #endif