1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rmr_agnostic.h
23 Abstract: Header file for things that are agnostic to the underlying transport
25 Author: E. Scott Daniels
26 Date: 28 February 2018
29 #ifndef _rmr_agnostic_h
30 #define _rmr_agnostic_h
32 #include <semaphore.h> // needed to support some structs
35 typedef struct endpoint endpoint_t; // place holder for structs defined in nano/nng private.h
36 typedef struct uta_ctx uta_ctx_t;
38 // allow testing to override without changing this
46 #define QUOTE(a) #a // allow a constant to be quoted
47 #define QUOTE_DEF(a) QUOTE(a) // allow a #define value to be quoted (e.g. QUOTE(MAJOR_VERSION) )
50 #define RT_SIZE 10009 // primary entries in route table (prime) meids hash through this so larger than expected # meids
51 // space deginations in the hash table
52 #define RT_MT_SPACE 0 // (integer) message type as the key
53 #define RT_NAME_SPACE 1 // enpoint name/address is the key
54 #define RT_ME_SPACE 2 // message id is the key
56 #define RMR_MSG_VER 3 // message version this code was designed to handle
58 // environment variable names we'll suss out
59 #define ENV_BIND_IF "RMR_BIND_IF" // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
60 #define ENV_RTG_PORT "RMR_RTG_SVC" // the port we'll listen on for rtg connections (deprecated; see RTG_SVC and CTL_PORT)
61 #define ENV_RTG_ADDR "RMR_RTG_SVC" // the address we will connect to for route manager updates
62 #define ENV_SEED_RT "RMR_SEED_RT" // where we expect to find the name of the seed route table
63 #define ENV_SEED_MEMAP "RMR_SEED_MEMAP" // where we expect to find the name of the seed route table
64 #define ENV_RTG_RAW "RMR_RTG_ISRAW" // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
65 #define ENV_VERBOSE_FILE "RMR_VCTL_FILE" // file where vlevel may be managed for some (non-time critical) functions
66 #define ENV_NAME_ONLY "RMR_SRC_NAMEONLY" // src in message is name only
67 #define ENV_WARNINGS "RMR_WARNINGS" // if == 1 then we write some, non-performance impacting, warnings
68 #define ENV_SRC_ID "RMR_SRC_ID" // forces this string (adding :port, max 63 ch) into the source field; host name used if not set
69 #define ENV_LOG_HR "RMR_HR_LOG" // set to 0 to turn off human readable logging and write using some formatting
70 #define ENV_LOG_VLEVEL "RMR_LOG_VLEVEL" // set the verbosity level (0 == 0ff; 1 == crit .... 5 == debug )
71 #define ENV_CTL_PORT "RMR_CTL_PORT" // route collector will listen here for control messages (4561 default)
72 #define ENV_RTREQ_FREA "RMR_RTREQ_FREQ" // frequency we will request route table updates when we want one (1-300 inclusive)
74 #define NO_FLAGS 0 // no flags to pass to a function
76 #define FL_NOTHREAD 0x01 // do not start an additional thread (must be 'user land' to support rtg
77 #define UFL_MASK 0xff // mask applied to some flag parms passed by the user to remove any internal flags
78 // internal flags, must be > than UFLAG_MASK
81 #define CFL_MTC_ENABLED 0x01 // multi-threaded call is enabled
82 #define CFL_NO_RTACK 0x02 // no route table ack needed when end received
85 #define CTXFL_WARN 0x01 // ok to warn on stderr for some things that shouldn't happen
88 #define MFL_ZEROCOPY 0x01 // the message is an allocated zero copy message and can be sent.
89 #define MFL_NOALLOC 0x02 // send should NOT allocate a new buffer before returning
90 #define MFL_ADDSRC 0x04 // source must be added on send
91 #define MFL_RAW 0x08 // message is 'raw' and not from an RMr based sender (no header)
92 #define MFL_HUGE 0x10 // buffer was larger than applications indicated usual max; don't cache
94 #define MAX_EP_GROUP 32 // max number of endpoints in a group
95 #define MAX_RTG_MSG_SZ 2048 // max expected message size from route generator
96 #define MAX_CALL_ID 255 // largest call ID that is supported
98 //#define DEF_RTG_MSGID "" // default to pick up all messages from rtg
99 #define DEF_CTL_PORT "4561" // default control port that rtc listens on
100 #define DEF_RTG_PORT "tcp:4561" // default port that we accept rtg connections on (deprecated)
101 #define DEF_COMM_PORT "tcp:4560" // default port we use for normal communications
102 #define DEF_RTG_WK_ADDR "routemgr:4561" // well known address for the route manager
103 #define DEF_TR_LEN (-1) // use default trace data len from context
104 #define DEF_RTREQ_FREQ 5 // delay between route table requests
106 #define UNSET_SUBID (-1) // initial value on msg allocation indicating not set
107 #define UNSET_MSGTYPE (-1)
109 // index values into the send counters for an enpoint
110 #define EPSC_GOOD 0 // successful send
111 #define EPSC_FAIL 1 // hard failurs
112 #define EPSC_TRANS 2 // transient/soft faiures
113 #define EPSC_SIZE 3 // number of counters
115 // -- header length/offset macros must ensure network conversion ----
116 #define RMR_HDR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
117 #define RMR_TR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len1))
118 #define RMR_D1_LEN(h) (ntohl(((uta_mhdr_t *)h)->len2))
119 #define RMR_D2_LEN(h) (ntohl(((uta_mhdr_t *)h)->len3))
121 // CAUTION: if using an offset with a header pointer, the pointer MUST be cast to void* before adding the offset!
122 #define TRACE_OFFSET(h) ((ntohl(((uta_mhdr_t *)h)->len0)))
123 #define DATA1_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
124 #define DATA2_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
125 #define PAYLOAD_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
127 #define TRACE_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0))
128 #define DATA1_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
129 #define DATA2_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
130 #define PAYLOAD_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
132 #define SET_HDR_LEN(h) (((uta_mhdr_t *)h)->len0=htonl((int32_t)sizeof(uta_mhdr_t))) // convert to network byte order on insert
133 #define SET_HDR_TR_LEN(h,l) (((uta_mhdr_t *)h)->len1=htonl((int32_t)l))
134 #define SET_HDR_D1_LEN(h,l) (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
135 #define SET_HDR_D2_LEN(h,l) (((uta_mhdr_t *)h)->len3=htonl((int32_t)l))
137 #define HDR_VERSION(h) htonl((((uta_mhdr_t *)h)->rmr_ver))
139 // index of things in the d1 data space
140 #define D1_CALLID_IDX 0 // the call-id to match on return
142 #define NO_CALL_ID 0 // no call id associated with the message (normal queue)
144 #define V1_PAYLOAD_OFFSET(h) (sizeof(uta_v1mhdr_t))
147 #define HFL_HAS_TRACE 0x01 // Trace data is populated
148 #define HFL_SUBID 0x02 // subscription ID is populated
149 #define HFL_CALL_MSG 0x04 // msg sent via blocking call
152 Message header; interpreted by the other side, but never seen by
153 the user application.
155 DANGER: Add new fields AT THE END of the struct. Adding them any where else
156 will break any code that is currently running.
158 The transport layer buffer allocated will be divided this way:
159 | RMr header | Trace data | data1 | data2 | User paylaod |
161 Len 0 is the length of the RMr header
162 Len 1 is the length of the trace data
163 Len 2 and 3 are lengths of data1 and data2 and are unused at the moment
165 To point at the payload, we take the address of the header and add all 4 lengths.
168 int32_t mtype; // message type ("long" network integer)
169 int32_t plen; // payload length (sender data length in payload)
170 int32_t rmr_ver; // our internal message version number
171 unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch
172 unsigned char sid[RMR_MAX_SID]; // sender ID for return to sender needs
173 unsigned char src[RMR_MAX_SRC]; // name:port of the sender (source)
174 unsigned char meid[RMR_MAX_MEID]; // managed element id.
175 struct timespec ts; // timestamp ???
178 int32_t flags; // HFL_* constants
179 int32_t len0; // length of the RMr header data
180 int32_t len1; // length of the tracing data
181 int32_t len2; // length of data 1 (d1)
182 int32_t len3; // length of data 2 (d2)
183 int32_t sub_id; // subscription id (-1 invalid)
186 unsigned char srcip[RMR_MAX_SRC]; // ip address and port of the source
190 typedef struct { // old (inflexible) v1 header
191 int32_t mtype; // message type ("long" network integer)
192 int32_t plen; // payload length
193 int32_t rmr_ver; // our internal message version number
194 unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch
195 unsigned char sid[RMR_MAX_SID]; // misc sender info/data
196 unsigned char src[16]; // name of the sender (source) (old size was 16)
197 unsigned char meid[RMR_MAX_MEID]; // managed element id.
198 struct timespec ts; // timestamp ???
205 uint16_t ep_idx; // next endpoint to send to
206 int nused; // number of endpoints in the list
207 int nendpts; // number allocated
208 endpoint_t **epts; // the list of endpoints that we RR over
212 Routing table entry. This is a list of endpoints that can be sent
213 messages of the given mtype. If there is more than one, we will
214 round robin messags across the list.
217 uint64_t key; // key used to reinsert this entry into a new symtab
218 int refs; // number of symtabs which reference the entry
219 int mtype; // the message type for this list
220 int nrrgroups; // number of rr groups to send to (if 0, the meid in a message determines endpoint)
221 rrgroup_t** rrgroups; // one or more set of endpoints to round robin messages to
228 int error; // set if there was a problem building the table
229 void* hash; // hash table for msg type and meid.
230 void* ephash; // hash for endpoint references
231 int updates; // counter of update records received
232 int mupdates; // counter of meid update records received
233 int ref_count; // num threads currently using
234 pthread_mutex_t* gate; // lock allowing update to ref counter
238 A wormhole is a direct connection between two endpoints that the user app can
239 send to without message type based routing.
242 int nalloc; // number of ep pointers allocated
243 endpoint_t** eps; // end points directly referenced
248 This manages an array of pointers to IP addresses that are associated with one of our interfaces.
249 For now, we don't need to map the addr to a specific interface, just know that it is one of ours.
252 char** addrs; // all ip addresses we found
253 int naddrs; // num actually used
257 // --------------- ring things -------------------------------------------------
258 #define RING_NONE 0 // no options
259 #define RING_RLOCK 0x01 // create/destroy the read lock on the ring
260 #define RING_WLOCK 0x02 // create/destroy the write lockk on the ring
261 #define RING_FRLOCK 0x04 // read locking with no wait if locked option
264 #define RING_FL_FLOCK 0x01 // fast read lock (don't wait if locked when reading)
266 typedef struct ring {
267 uint16_t head; // index of the head of the ring (insert point)
268 uint16_t tail; // index of the tail (extract point)
269 uint16_t nelements; // number of elements in the ring
270 void** data; // the ring data (pointers to blobs of stuff)
271 int pfd; // event fd for the ring for epoll
272 int flags; // RING_FL_* constants
273 pthread_mutex_t* rgate; // read lock if used
274 pthread_mutex_t* wgate; // write lock if used
278 // --------- multi-threaded call things -----------------------------------------
280 A chute provides a return path for a received message that a thread has blocked
281 on. The receive thread will set the mbuf pointer and tickler the barrier to
282 signal to the call thread that data is ready.
284 typedef struct chute {
285 rmr_mbuf_t* mbuf; // pointer to message buffer received
286 sem_t barrier; // semaphore that the thread is waiting on
287 unsigned char expect[RMR_MAX_XID]; // the expected transaction ID
291 // -------------- common static prototypes --------------------------------------
293 //---- tools ----------------------------------
294 static int has_myip( char const* buf, if_addrs_t* list, char sep, int max );
295 static int uta_tokenise( char* buf, char** tokens, int max, char sep );
296 static int uta_rmip_tokenise( char* buf, if_addrs_t* iplist, char** toks, int max, char sep );
297 static char* uta_h2ip( char const* hname );
299 // deprecated funciton -- step 1 of removal
300 static int uta_lookup_rtg( uta_ctx_t* ctx );
302 static int uta_has_str( char const* buf, char const* str, char sep, int max );
303 static char* get_default_ip( if_addrs_t* iplist );
305 // --- message ring --------------------------
306 static void* uta_mk_ring( int size );
307 static int uta_ring_config( void* vr, int options );
308 static void uta_ring_free( void* vr );
309 static inline void* uta_ring_extract( void* vr );
310 static inline int uta_ring_insert( void* vr, void* new_data );
312 // --- message and context management --------
313 static int ie_test( void* r, int i_factor, long inserts );
316 // ----- route table generic static things ---------
317 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
318 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
319 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
320 static endpoint_t* get_meid_owner( route_table_t *rt, char const* meid );
321 static char* uta_fib( char const* fname );
322 static route_table_t* uta_rt_init( uta_ctx_t* ctx );
323 static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all );
324 static void uta_rt_drop( route_table_t* rt );
325 static inline route_table_t* get_rt( uta_ctx_t* ctx );
326 static endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group );
327 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
328 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
329 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
330 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all );
331 static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf );
332 static rmr_mbuf_t* realloc_msg( rmr_mbuf_t* msg, int size );
333 static void release_rt( uta_ctx_t* ctx, route_table_t* rt );
334 static void* rtc( void* vctx );
335 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );
337 // --------- route manager communications -----------------
338 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* mbuf, char* table_id, int state, char* reason );
339 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx );
341 // -------- internal functions that can be referenced by common functions -------
342 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep );