New API added for debugging rmr rx queue
[ric-plt/lib/rmr.git] / src / rmr / si / include / rmr_si_private.h
1 //  vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       uta_si_private.h
23         Abstract:       Private header information for the uta nng library functions.
24                                 These are structs which have specific NNG types; anything that
25                                 does not can be included in the common/rmr_agnostic.h header.
26
27         Author:         E. Scott Daniels
28         Date:           31 November 2018
29
30         Mods:           28 Feb 2019 -- moved the majority to the agnostic header.
31 */
32
33 #ifndef _uta_private_h
34 #define _uta_private_h
35
36 // if pmode is off we don't compile in some checks in hopes of speeding things up
37 #ifndef PARANOID_CHECKS
38 #       define PARANOID_CHECKS 0
39 #endif
40
41
42 // ---- si specific things  -------------------------------------------
43
44 #define TP_HDR_LEN      50              // bytes added to each message for transport needs
45
46                                                         // river states
47 #define RS_NEW          0               // river is unitialised
48 #define RS_GOOD         1               // flow is in progress
49 #define RS_RESET        2               // flow was interrupted; reset on next receive
50
51 #define RF_NOTIFIED     0x01    // notification made about river issue
52 #define RF_DROP         0x02    // this message is large and being dropped
53
54 #define TP_SZFIELD_LEN  ((sizeof(uint32_t)*2)+1)        // number of bytes needed for msg size in transport header
55 #define TP_SZ_MARKER    '$'                                                     // marker indicating net byte order used
56
57
58 #define SI_MAX_ADDR_LEN         512
59 #define MAX_RIVERS                      1024    // max number of directly mapped rivers
60
61 /*
62         Manages a river of inbound bytes.
63 */
64 typedef struct {
65         int             state;          // RS_* constants
66         char*   accum;          // bytes being accumulated
67         int             nbytes;         // allocated size of accumulator
68         int             ipt;            // insertion point in accumulator
69         int             msg_size;       // size of the message being accumulated
70         int             flags;          // RF_* constants
71 } river_t;
72
73
74 /*
75         Callback context.
76 typedef struct {
77         uta_ctx_t*      ctx;
78
79 } cbctx_t;
80 */
81
82 // ---------------------------- mainline rmr things ----------------
83
84
85 /*
86         Manages an endpoint. Type def for this is defined in agnostic.
87 */
88 struct endpoint {
89         char*   name;                   // end point name (symtab reference)
90         char*   proto;                  // connection proto (should only be TCP, but future might bring others)
91         char*   addr;                   // address used for connection
92         int             nn_sock;                // we'll keep calling it nn_ because it's less changes to the code
93         //nng_dialer    dialer;         // the connection specific information (retry timout etc)
94         int             open;                   // set to true if we've connected as socket cannot be checked directly)
95         pthread_mutex_t gate;   // we must serialise when we open/link to the endpoint
96         long long scounts[EPSC_SIZE];           // send counts (indexed by EPSCOUNT_* constants
97
98                                                         // SI specific things
99         int notify;                             // if we fail, we log once until a connection happens; notify if set
100 };
101
102 /*
103         Epoll information needed for the rmr_torcv_msg() funciton
104 */
105 typedef struct epoll_stuff {
106         struct epoll_event events[1];                           // wait on 1 possible events
107         struct epoll_event epe;                                         // event definition for event to listen to
108         int ep_fd;                                                                      // file des from nng
109         int poll_fd;                                                            // fd from nng
110 } epoll_stuff_t;
111
112 /*
113         Context describing our world. Should be returned to user programme on
114         call to initialise, and passed as first parm on all calls to other
115         visible functions.
116
117         The typedef is declared in the agnostic header.
118 */
119 struct uta_ctx {
120         char*   my_name;                        // dns name of this host to set in sender field of a message
121         char*   my_ip;                          // the ip address we _think_ we are using sent in src_ip of the message for rts
122         int     shutdown;                               // thread notification if we need to tell them to stop
123         int max_mlen;                           // max message length payload+header
124         int     max_plen;                               // max payload length
125         int     flags;                                  // CFL_ constants
126         int nrtele;                                     // number of elements in the routing table
127         int send_retries;                       // number of retries send_msg() should attempt if eagain/timeout indicated by nng
128         int     trace_data_len;                 // number of bytes to allocate in header for trace data
129         int d1_len;                                     // extra header data 1 length
130         int d2_len;                                     // extra header data 2 length   (future)
131         int     nn_sock;                                // our general listen socket
132         int rtable_ready;                       // set to true when rt is received or loaded
133         int snarf_rt_fd;                        // the file des where we save the last rt from RM
134         int dcount;                                     // drop counter when app is slow
135
136         uint64_t acc_dcount;            // accumulated drop counter when app is slow
137         uint64_t acc_ecount;            // accumulated enqueue counter
138
139         char*   seed_rt_fname;          // the static/seed route table; name captured at start
140         route_table_t* rtable;          // the active route table
141         route_table_t* old_rtable;      // the previously used rt, sits here to allow for draining
142         route_table_t* new_rtable;      // route table under construction
143         if_addrs_t*     ip_list;                // list manager of the IP addresses that are on our known interfaces
144         void*   mring;                          // ring where msgs are queued while waiting for a call response msg
145         chute_t*        chutes;
146
147         char*   rtg_addr;                       // addr/port of the route table generation publisher
148         int             rtg_port;                       // the port that the rtg listens on
149
150         wh_mgt_t*       wormholes;              // management of user opened wormholes
151         epoll_stuff_t*  eps;            // epoll information needed for the rcv with timeout call
152
153         pthread_t       rtc_th;                 // thread info for the rtc listener
154         pthread_t       mtc_th;                 // thread info for the multi-thread call receive process
155
156                                                                 // added for route manager request/states
157         rmr_whid_t      rtg_whid;               // wormhole id to the route manager for acks/requests
158         char*           table_id;               // table ID of the route table load in progress
159
160                                                                 // added for SI95 support
161         si_ctx_t*       si_ctx;                 // the socket context
162         int                     nrivers;                // allocated rivers
163         river_t*        rivers;                 // inbound flows (index is the socket fd)
164         void*           river_hash;             // flows with fd values > nrivers must be mapped through the hash
165         int                     max_ibm;                // max size of an inbound message (river accum alloc size)
166         void*           zcb_mring;              // zero copy buffer mbuf ring
167         void*           fd2ep;                          // the symtab mapping file des to endpoints for cleanup on disconnect
168         void*           ephash;                         // hash  host:port or ip:port to endpoint struct
169
170         pthread_mutex_t *fd2ep_gate;    // we must gate add/deletes to the fd2 symtab
171         pthread_mutex_t *rtgate;                // master gate for accessing/moving route tables
172 };
173
174 typedef uta_ctx_t uta_ctx;
175
176
177 /*
178         Static prototypes for functions located here. All common protos are in the
179         agnostic header file.
180 */
181
182 // --- initialisation and housekeeping -------
183 static void* init(  char* uproto_port, int max_msg_size, int flags );
184 static void free_ctx( uta_ctx_t* ctx );
185
186 // --- rt table things ---------------------------
187 static void uta_ep_failed( endpoint_t* ep );
188 static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep );
189
190 static int rt_link2_ep( void* vctx, endpoint_t* ep );
191 static rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt );
192 static inline int xlate_si_state( int state, int def_state );
193
194 // --- these have changes for si
195 static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp );
196 //static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx );
197 static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp );
198
199
200 // --- msg ---------------------------------------
201 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo );
202 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state );
203 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) ;
204 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  );
205 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
206 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  );
207 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
208
209 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries );
210
211 // ---- fd to endpoint translation ------------------------------
212 static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd );
213 static endpoint_t*  fd2ep_get( uta_ctx_t* ctx, int fd );
214 static void fd2ep_init( uta_ctx_t* ctx );
215 static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep );
216
217 // ------ misc ---------------------------------------------------
218 static inline void incr_ep_counts( int state, endpoint_t* ep );         // must declare for static includes, but after headers
219
220 #endif