1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rmr_agnostic.h
23 Abstract: Header file for things that are agnostic to the underlying transport
25 Author: E. Scott Daniels
26 Date: 28 February 2018
29 #ifndef _rmr_agnostic_h
30 #define _rmr_agnostic_h
32 #include <semaphore.h> // needed to support some structs
34 typedef struct endpoint endpoint_t; // place holder for structs defined in nano/nng private.h
35 typedef struct uta_ctx uta_ctx_t;
37 // allow testing to override without changing this
45 #define QUOTE(a) #a // allow a constant to be quoted
46 #define QUOTE_DEF(a) QUOTE(a) // allow a #define value to be quoted (e.g. QUOTE(MAJOR_VERSION) )
49 #define RT_SIZE 10009 // primary entries in route table (prime) meids hash through this so larger than expected # meids
50 // space deginations in the hash table
51 #define RT_MT_SPACE 0 // (integer) message type as the key
52 #define RT_NAME_SPACE 1 // enpoint name/address is the key
53 #define RT_ME_SPACE 2 // message id is the key
55 #define RMR_MSG_VER 3 // message version this code was designed to handle
57 // environment variable names we'll suss out
58 #define ENV_BIND_IF "RMR_BIND_IF" // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
59 #define ENV_RTG_PORT "RMR_RTG_SVC" // the port we'll listen on for rtg connections
60 #define ENV_SEED_RT "RMR_SEED_RT" // where we expect to find the name of the seed route table
61 #define ENV_SEED_MEMAP "RMR_SEED_MEMAP" // where we expect to find the name of the seed route table
62 #define ENV_RTG_RAW "RMR_RTG_ISRAW" // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
63 #define ENV_VERBOSE_FILE "RMR_VCTL_FILE" // file where vlevel may be managed for some (non-time critical) functions
64 #define ENV_NAME_ONLY "RMR_SRC_NAMEONLY" // src in message is name only
65 #define ENV_WARNINGS "RMR_WARNINGS" // if == 1 then we write some, non-performance impacting, warnings
66 #define ENV_SRC_ID "RMR_SRC_ID" // forces this string (adding :port, max 63 ch) into the source field; host name used if not set
67 #define ENV_LOG_HR "RMR_HR_LOG" // set to 0 to turn off human readable logging and write using some formatting
68 #define ENV_LOG_VLEVEL "RMR_LOG_VLEVEL" // set the verbosity level (0 == 0ff; 1 == crit .... 5 == debug )
70 #define NO_FLAGS 0 // no flags to pass to a function
72 #define FL_NOTHREAD 0x01 // do not start an additional thread (must be 'user land' to support rtg
73 #define UFL_MASK 0xff // mask applied to some flag parms passed by the user to remove any internal flags
74 // internal flags, must be > than UFLAG_MASK
77 #define CFL_MTC_ENABLED 0x01 // multi-threaded call is enabled
80 #define CTXFL_WARN 0x01 // ok to warn on stderr for some things that shouldn't happen
83 #define MFL_ZEROCOPY 0x01 // the message is an allocated zero copy message and can be sent.
84 #define MFL_NOALLOC 0x02 // send should NOT allocate a new buffer before returning
85 #define MFL_ADDSRC 0x04 // source must be added on send
86 #define MFL_RAW 0x08 // message is 'raw' and not from an RMr based sender (no header)
88 #define MAX_EP_GROUP 32 // max number of endpoints in a group
89 #define MAX_RTG_MSG_SZ 2048 // max expected message size from route generator
90 #define MAX_CALL_ID 255 // largest call ID that is supported
92 //#define DEF_RTG_MSGID "" // default to pick up all messages from rtg
93 #define DEF_RTG_PORT "tcp:4561" // default port that we accept rtg connections on
94 #define DEF_COMM_PORT "tcp:4560" // default port we use for normal communications
95 #define DEF_TR_LEN (-1) // use default trace data len from context
97 #define UNSET_SUBID (-1) // initial value on msg allocation indicating not set
98 #define UNSET_MSGTYPE (-1)
100 // index values into the send counters for an enpoint
101 #define EPSC_GOOD 0 // successful send
102 #define EPSC_FAIL 1 // hard failurs
103 #define EPSC_TRANS 2 // transient/soft faiures
104 #define EPSC_SIZE 3 // number of counters
106 // -- header length/offset macros must ensure network conversion ----
107 #define RMR_HDR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
108 #define RMR_TR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len1))
109 #define RMR_D1_LEN(h) (ntohl(((uta_mhdr_t *)h)->len2))
110 #define RMR_D2_LEN(h) (ntohl(((uta_mhdr_t *)h)->len3))
112 // CAUTION: if using an offset with a header pointer, the pointer MUST be cast to void* before adding the offset!
113 #define TRACE_OFFSET(h) ((ntohl(((uta_mhdr_t *)h)->len0)))
114 #define DATA1_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
115 #define DATA2_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
116 #define PAYLOAD_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
118 #define TRACE_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0))
119 #define DATA1_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
120 #define DATA2_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
121 #define PAYLOAD_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
123 #define SET_HDR_LEN(h) (((uta_mhdr_t *)h)->len0=htonl((int32_t)sizeof(uta_mhdr_t))) // convert to network byte order on insert
124 #define SET_HDR_TR_LEN(h,l) (((uta_mhdr_t *)h)->len1=htonl((int32_t)l))
125 #define SET_HDR_D1_LEN(h,l) (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
126 #define SET_HDR_D2_LEN(h,l) (((uta_mhdr_t *)h)->len3=htonl((int32_t)l))
128 #define HDR_VERSION(h) htonl((((uta_mhdr_t *)h)->rmr_ver))
130 // index of things in the d1 data space
131 #define D1_CALLID_IDX 0 // the call-id to match on return
133 #define NO_CALL_ID 0 // no call id associated with the message (normal queue)
135 #define V1_PAYLOAD_OFFSET(h) (sizeof(uta_v1mhdr_t))
138 #define HFL_HAS_TRACE 0x01 // Trace data is populated
139 #define HFL_SUBID 0x02 // subscription ID is populated
140 #define HFL_CALL_MSG 0x04 // msg sent via blocking call
143 Message header; interpreted by the other side, but never seen by
144 the user application.
146 DANGER: Add new fields AT THE END of the struct. Adding them any where else
147 will break any code that is currently running.
149 The transport layer buffer allocated will be divided this way:
150 | RMr header | Trace data | data1 | data2 | User paylaod |
152 Len 0 is the length of the RMr header
153 Len 1 is the length of the trace data
154 Len 2 and 3 are lengths of data1 and data2 and are unused at the moment
156 To point at the payload, we take the address of the header and add all 4 lengths.
159 int32_t mtype; // message type ("long" network integer)
160 int32_t plen; // payload length (sender data length in payload)
161 int32_t rmr_ver; // our internal message version number
162 unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch
163 unsigned char sid[RMR_MAX_SID]; // sender ID for return to sender needs
164 unsigned char src[RMR_MAX_SRC]; // name:port of the sender (source)
165 unsigned char meid[RMR_MAX_MEID]; // managed element id.
166 struct timespec ts; // timestamp ???
169 int32_t flags; // HFL_* constants
170 int32_t len0; // length of the RMr header data
171 int32_t len1; // length of the tracing data
172 int32_t len2; // length of data 1 (d1)
173 int32_t len3; // length of data 2 (d2)
174 int32_t sub_id; // subscription id (-1 invalid)
177 unsigned char srcip[RMR_MAX_SRC]; // ip address and port of the source
181 typedef struct { // old (inflexible) v1 header
182 int32_t mtype; // message type ("long" network integer)
183 int32_t plen; // payload length
184 int32_t rmr_ver; // our internal message version number
185 unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch
186 unsigned char sid[RMR_MAX_SID]; // misc sender info/data
187 unsigned char src[16]; // name of the sender (source) (old size was 16)
188 unsigned char meid[RMR_MAX_MEID]; // managed element id.
189 struct timespec ts; // timestamp ???
196 uint16_t ep_idx; // next endpoint to send to
197 int nused; // number of endpoints in the list
198 int nendpts; // number allocated
199 endpoint_t **epts; // the list of endpoints that we RR over
203 Routing table entry. This is a list of endpoints that can be sent
204 messages of the given mtype. If there is more than one, we will
205 round robin messags across the list.
208 uint64_t key; // key used to reinsert this entry into a new symtab
209 int refs; // number of symtabs which reference the entry
210 int mtype; // the message type for this list
211 int nrrgroups; // number of rr groups to send to (if 0, the meid in a message determines endpoint)
212 rrgroup_t** rrgroups; // one or more set of endpoints to round robin messages to
219 void* hash; // hash table.
220 int updates; // counter of update records received
221 int mupdates; // counter of meid update records received
225 A wormhole is a direct connection between two endpoints that the user app can
226 send to without message type based routing.
229 int nalloc; // number of ep pointers allocated
230 endpoint_t** eps; // end points directly referenced
235 This manages an array of pointers to IP addresses that are associated with one of our interfaces.
236 For now, we don't need to map the addr to a specific interface, just know that it is one of ours.
239 char** addrs; // all ip addresses we found
240 int naddrs; // num actually used
244 // --------------- ring things -------------------------------------------------
245 #define RING_NONE 0 // no options
246 #define RING_RLOCK 0x01 // create/destroy the read lock on the ring
247 #define RING_WLOCK 0x02 // create/destroy the write lockk on the ring
249 typedef struct ring {
250 uint16_t head; // index of the head of the ring (insert point)
251 uint16_t tail; // index of the tail (extract point)
252 uint16_t nelements; // number of elements in the ring
253 void** data; // the ring data (pointers to blobs of stuff)
254 int pfd; // event fd for the ring for epoll
255 pthread_mutex_t* rgate; // read lock if used
256 pthread_mutex_t* wgate; // write lock if used
260 // --------- multi-threaded call things -----------------------------------------
262 A chute provides a return path for a received message that a thread has blocked
263 on. The receive thread will set the mbuf pointer and tickler the barrier to
264 signal to the call thread that data is ready.
266 typedef struct chute {
267 rmr_mbuf_t* mbuf; // pointer to message buffer received
268 sem_t barrier; // semaphore that the thread is waiting on
269 unsigned char expect[RMR_MAX_XID]; // the expected transaction ID
273 // -------------- common static prototypes --------------------------------------
275 //---- tools ----------------------------------
276 static int has_myip( char const* buf, if_addrs_t* list, char sep, int max );
277 static int uta_tokenise( char* buf, char** tokens, int max, char sep );
278 static int uta_rmip_tokenise( char* buf, if_addrs_t* iplist, char** toks, int max, char sep );
279 static char* uta_h2ip( char const* hname );
280 static int uta_lookup_rtg( uta_ctx_t* ctx );
281 static int uta_has_str( char const* buf, char const* str, char sep, int max );
282 static char* get_default_ip( if_addrs_t* iplist );
284 // --- message ring --------------------------
285 static void* uta_mk_ring( int size );
286 static int uta_ring_config( void* vr, int options );
287 static void uta_ring_free( void* vr );
288 static inline void* uta_ring_extract( void* vr );
289 static inline int uta_ring_insert( void* vr, void* new_data );
291 // --- message and context management --------
292 static int ie_test( void* r, int i_factor, long inserts );
295 // ----- route table generic static things ---------
296 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
297 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
298 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
299 static char* uta_fib( char* fname );
300 static route_table_t* uta_rt_init( );
301 static route_table_t* uta_rt_clone( route_table_t* srt );
302 static route_table_t* uta_rt_clone_all( route_table_t* srt );
303 static void uta_rt_drop( route_table_t* rt );
304 static endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group );
305 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
306 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
307 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
308 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
309 static rmr_mbuf_t* realloc_msg( rmr_mbuf_t* msg, int size );
310 static void* rtc( void* vctx );
311 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );