1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: uta_si_private.h
23 Abstract: Private header information for the uta nng library functions.
24 These are structs which have specific NNG types; anything that
25 does not can be included in the common/rmr_agnostic.h header.
27 Author: E. Scott Daniels
28 Date: 31 November 2018
30 Mods: 28 Feb 2019 -- moved the majority to the agnostic header.
33 #ifndef _uta_private_h
34 #define _uta_private_h
36 // if pmode is off we don't compile in some checks in hopes of speeding things up
37 #ifndef PARINOID_CHECKS
38 # define PARINOID_CHECKS 0
42 // ---- si specific things -------------------------------------------
44 #define TP_HDR_LEN 50 // bytes added to each message for transport needs
47 #define RS_NEW 0 // river state: river is uninitialised
48 #define RS_GOOD 1 // river state: flow is in progress
49 #define RS_RESET 2 // river state: flow was interrupted; reset on next receive
51 #define RF_NOTIFIED 0x01 // river flag (bit): notification made about river issue
52 #define RF_DROP 0x02 // river flag (bit): this message is large and being dropped
54 #define SI_MAX_ADDR_LEN 512 // NOTE(review): presumably the max length of an address string -- confirm against users
57 Manages a river of inbound bytes.
60 int state; // RS_* constants
61 char* accum; // bytes being accumulated
62 int nbytes; // allocated size of accumulator
63 int ipt; // insertion point in accumulator
64 int msg_size; // size of the message being accumulated
65 int flags; // RF_* constants
77 // ---------------------------- mainline rmr things ----------------
81 Manages an endpoint. Type def for this is defined in agnostic.
84 char* name; // end point name (symtab reference)
85 char* proto; // connection proto (should only be TCP, but future might bring others)
86 char* addr; // address used for connection
87 int nn_sock; // we'll keep calling it nn_ because it's less changes to the code
88 //nng_dialer dialer; // the connection specific information (retry timeout etc)
89 int open; // set to true if we've connected (the socket cannot be checked directly)
90 pthread_mutex_t gate; // we must serialise when we open/link to the endpoint
91 long long scounts[EPSC_SIZE]; // send counts (indexed by EPSCOUNT_* constants)
94 int notify; // if we fail, we log once until a connection happens; notify if set
98 Epoll information needed for the rmr_torcv_msg() function
100 typedef struct epoll_stuff {
101 struct epoll_event events[1]; // wait on 1 possible events
102 struct epoll_event epe; // event definition for event to listen to
103 int ep_fd; // file des from nng
104 int poll_fd; // fd from nng
108 Context describing our world. Should be returned to user programme on
109 call to initialise, and passed as first parm on all calls to other
112 The typedef is declared in the agnostic header.
115 char* my_name; // dns name of this host to set in sender field of a message
116 char* my_ip; // the ip address we _think_ we are using sent in src_ip of the message for rts
117 int shutdown; // thread notification if we need to tell them to stop
118 int max_mlen; // max message length payload+header
119 int max_plen; // max payload length
120 int flags; // CTXFL_ constants
121 int nrtele; // number of elements in the routing table
122 int send_retries; // number of retries send_msg() should attempt if eagain/timeout indicated by nng
123 int trace_data_len; // number of bytes to allocate in header for trace data
124 int d1_len; // extra header data 1 length
125 int d2_len; // extra header data 2 length (future)
126 int nn_sock; // our general listen socket
127 route_table_t* rtable; // the active route table
128 route_table_t* old_rtable; // the previously used rt, sits here to allow for draining
129 route_table_t* new_rtable; // route table under construction
130 if_addrs_t* ip_list; // list manager of the IP addresses that are on our known interfaces
131 void* mring; // ring where msgs are queued while waiting for a call response msg
134 char* rtg_addr; // addr/port of the route table generation publisher
135 int rtg_port; // the port that the rtg listens on
137 wh_mgt_t* wormholes; // management of user opened wormholes
138 epoll_stuff_t* eps; // epoll information needed for the rcv with timeout call
140 pthread_t rtc_th; // thread info for the rtc listener
141 pthread_t mtc_th; // thread info for the multi-thread call receive process
143 // added for SI95 support
144 si_ctx_t* si_ctx; // the socket context
145 int nrivers; // allocated rivers
146 river_t* rivers; // inbound flows (index is the socket fd)
147 int max_ibm; // max size of an inbound message (river accum alloc size)
148 void* zcb_mring; // zero copy buffer mbuf ring
149 void* fd2ep; // the symtab mapping file des to endpoints for cleanup on disconnect
152 typedef uta_ctx_t uta_ctx; // alias: lets the context type also be written without the _t suffix
156 Static prototypes for functions located here. All common protos are in the
157 agnostic header file.
160 // --- initialisation and housekeeping -------
161 static void* init( char* uproto_port, int max_msg_size, int flags ); // NOTE(review): presumably builds the context and returns it as an opaque pointer -- confirm
162 static void free_ctx( uta_ctx_t* ctx ); // NOTE(review): presumably releases what init() set up in ctx -- confirm
164 // --- rt table things ---------------------------
165 static void uta_ep_failed( endpoint_t* ep ); // NOTE(review): name suggests it handles an endpoint whose connection failed -- confirm
166 static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ); // takes the SI socket context directly; NOTE(review): presumably connects to ep -- confirm
167 static int rt_link2_ep( void* vctx, endpoint_t* ep ); // context is passed untyped; NOTE(review): presumably a uta_ctx_t* -- confirm
168 static rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ); // NOTE(review): presumably finds the route table entry for sid/mtype; meaning of try_alt to be confirmed
169 static inline int xlate_si_state( int state, int def_state ); // NOTE(review): presumably maps an SI state to another state value, with def_state as the fallback -- confirm
171 // --- these have changes for si
172 static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ); // nn_sock and uepp are pointer parms; NOTE(review): presumably outputs (socket and endpoint found) -- confirm
173 //static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ); // retired form: si_ctx was passed as the last parm
174 static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp ); // current form: full context is the first parm
177 // --- msg ---------------------------------------
178 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ); // NOTE(review): presumably allocates a zero copy message; meaning of trlo to be confirmed
179 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ); // NOTE(review): presumably allocates just the mbuf with the given initial state -- confirm
180 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) ; // NOTE(review): presumably points msg at its transport buffer of alen bytes -- confirm
181 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ); // NOTE(review): presumably returns a copy of old_msg -- confirm
182 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ); // NOTE(review): presumably receives into (or replaces) old_msg -- confirm
183 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ); // returns an untyped pointer; NOTE(review): what it points at must be confirmed
184 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ); // NOTE(review): presumably reallocates with tr_len bytes of trace data -- confirm
185 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ); // send addressed by endpoint; returns an mbuf pointer
187 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ); // send addressed by socket, with a caller supplied retry count; returns an mbuf pointer
189 // ---- fd to endpoint translation ------------------------------
190 static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ); // NOTE(review): presumably removes the fd's mapping and returns the endpoint it referenced -- confirm
191 static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ); // NOTE(review): presumably returns the endpoint mapped to fd -- confirm
192 static void fd2ep_init( uta_ctx_t* ctx ); // NOTE(review): presumably creates the fd to endpoint map in ctx -- confirm
193 static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ); // NOTE(review): presumably records that fd belongs to ep -- confirm