132873c61b38cb4befe33a007b657de852c7329c
[ric-plt/lib/rmr.git] / src / rmr / si / include / rmr_si_private.h
1 //  vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       uta_si_private.h
23         Abstract:       Private header information for the uta nng library functions.
24                                 These are structs which have specific NNG types; anything that
25                                 does not can be included in the common/rmr_agnostic.h header.
26
27         Author:         E. Scott Daniels
28         Date:           31 November 2018
29
30         Mods:           28 Feb 2019 -- moved the majority to the agnostic header.
31 */
32
33 #ifndef _uta_private_h
34 #define _uta_private_h
35
36 // if pmode is off we don't compile in some checks in hopes of speeding things up
37 #ifndef PARINOID_CHECKS
38 #       define PARINOID_CHECKS 0
39 #endif
40
41
42 // ---- si specific things  -------------------------------------------
43
44 #define TP_HDR_LEN      50              // bytes added to each message for transport needs
45
46                                                         // river states
47 #define RS_NEW          0               // river is unitialised
48 #define RS_GOOD         1               // flow is in progress
49 #define RS_RESET        2               // flow was interrupted; reset on next receive
50
51 #define SI_MAX_ADDR_LEN         512
52
53 /*
54         Manages a river of inbound bytes.
55 */
56 typedef struct {
57         int             state;          // RS_* constants
58         char*   accum;          // bytes being accumulated
59         int             nbytes;         // allocated size of accumulator
60         int             ipt;            // insertion point in accumulator
61         //int           max;            // size of accum
62         //int           expected;       // expected for a complete message
63         int             msg_size;       // size of the message being accumulated
64 } river_t;
65
66
67 /*
68         Callback context.
69 typedef struct {
70         uta_ctx_t*      ctx;
71         
72 } cbctx_t;
73 */
74
75 // ---------------------------- mainline rmr things ----------------
76
77
78 /*
79         Manages an endpoint. Type def for this is defined in agnostic.
80 */
81 struct endpoint {
82         char*   name;                   // end point name (symtab reference)
83         char*   proto;                  // connection proto (should only be TCP, but future might bring others)
84         char*   addr;                   // address used for connection
85         int             nn_sock;                // we'll keep calling it nn_ because it's less changes to the code
86         //nng_dialer    dialer;         // the connection specific information (retry timout etc)
87         int             open;                   // set to true if we've connected as socket cannot be checked directly)
88         pthread_mutex_t gate;   // we must serialise when we open/link to the endpoint
89         long long scounts[EPSC_SIZE];           // send counts (indexed by EPSCOUNT_* constants
90
91                                                         // SI specific things
92         int notify;                             // if we fail, we log once until a connection happens; notify if set
93 };
94
95 /*
96         Epoll information needed for the rmr_torcv_msg() funciton
97 */
98 typedef struct epoll_stuff {
99         struct epoll_event events[1];                           // wait on 1 possible events
100         struct epoll_event epe;                                         // event definition for event to listen to
101         int ep_fd;                                                                      // file des from nng
102         int poll_fd;                                                            // fd from nng
103 } epoll_stuff_t;
104
105 /*
106         Context describing our world. Should be returned to user programme on
107         call to initialise, and passed as first parm on all calls to other
108         visible functions.
109
110         The typedef is declared in the agnostic header.
111 */
112 struct uta_ctx {
113         char*   my_name;                        // dns name of this host to set in sender field of a message
114         char*   my_ip;                          // the ip address we _think_ we are using sent in src_ip of the message for rts
115         int     shutdown;                               // thread notification if we need to tell them to stop
116         int max_mlen;                           // max message length payload+header
117         int     max_plen;                               // max payload length
118         int     flags;                                  // CTXFL_ constants
119         int nrtele;                                     // number of elements in the routing table
120         int send_retries;                       // number of retries send_msg() should attempt if eagain/timeout indicated by nng
121         int     trace_data_len;                 // number of bytes to allocate in header for trace data
122         int d1_len;                                     // extra header data 1 length
123         int d2_len;                                     // extra header data 2 length   (future)
124         int     nn_sock;                                // our general listen socket
125         route_table_t* rtable;          // the active route table
126         route_table_t* old_rtable;      // the previously used rt, sits here to allow for draining
127         route_table_t* new_rtable;      // route table under construction
128         if_addrs_t*     ip_list;                // list manager of the IP addresses that are on our known interfaces
129         void*   mring;                          // ring where msgs are queued while waiting for a call response msg
130         chute_t*        chutes;
131
132         char*   rtg_addr;                       // addr/port of the route table generation publisher
133         int             rtg_port;                       // the port that the rtg listens on
134
135         wh_mgt_t*       wormholes;              // management of user opened wormholes
136         epoll_stuff_t*  eps;            // epoll information needed for the rcv with timeout call
137
138         pthread_t       rtc_th;                 // thread info for the rtc listener
139         pthread_t       mtc_th;                 // thread info for the multi-thread call receive process
140
141                                                                 // added for SI95 support
142         si_ctx_t*       si_ctx;                 // the socket context
143         int                     nrivers;                // allocated rivers
144         river_t*        rivers;                 // inbound flows (index is the socket fd)
145         int                     max_ibm;                // max size of an inbound message (river accum alloc size)
146         void*           zcb_mring;              // zero copy buffer mbuf ring
147 };
148
149 typedef uta_ctx_t uta_ctx;
150
151
152 /*
153         Static prototypes for functions located here. All common protos are in the
154         agnostic header file.
155 */
156
157 // --- initialisation and housekeeping -------
158 static void* init(  char* uproto_port, int max_msg_size, int flags );
159 static void free_ctx( uta_ctx_t* ctx );
160
161 // --- rt table things ---------------------------
162 static void uta_ep_failed( endpoint_t* ep );
163 static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep );
164 static int rt_link2_ep( void* vctx, endpoint_t* ep );
165 static rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt );
166 static inline int xlate_si_state( int state, int def_state );
167
168 // --- these have changes for si
169 static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx );
170 static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx );
171
172
173 // --- msg ---------------------------------------
174 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo );
175 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state );
176 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) ;
177 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  );
178 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
179 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
180 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  );
181 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
182
183 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries );
184
185
186 #endif