1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rmr_agnostic.h
23 Abstract: Header file for things that are agnostic to the underlying transport
25 Author: E. Scott Daniels
26 Date: 28 February 2018
29 #ifndef _rmr_agnostic_h
30 #define _rmr_agnostic_h
32 typedef struct endpoint endpoint_t; // place holder for structs defined in nano/nng private.h
33 typedef struct uta_ctx uta_ctx_t;
35 // allow testing to override without changing this
43 #define QUOTE(a) #a // allow a constant to be quoted
44 #define QUOTE_DEF(a) QUOTE(a) // allow a #define value to be quoted (e.g. QUOTE(MAJOR_VERSION) )
47 #define RMR_MSG_VER 2 // message version this code was designed to handle
49 // environment variable names we'll suss out
50 #define ENV_BIND_IF "RMR_BIND_IF" // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
51 #define ENV_RTG_PORT "RMR_RTG_SVC" // the port we'll listen on for rtg connections
52 #define ENV_SEED_RT "RMR_SEED_RT" // where we expect to find the name of the seed route table
53 #define ENV_RTG_RAW "RMR_RTG_ISRAW" // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
54 #define ENV_VERBOSE_FILE "RMR_VCTL_FILE" // file where vlevel may be managed for some (non-time critical) functions
56 #define NO_FLAGS 0 // no flags to pass to a function
58 #define FL_NOTHREAD 0x01 // do not start an additional thread (must be 'user land' to support rtg
59 #define UFL_MASK 0xff // mask applied to some flag parms passed by the user to remove any internal flags
60 // internal flags, must be > than UFLAG_MASK
63 #define CFL_MTC_ENABLED 0x01 // multi-threaded call is enabled
66 #define MFL_ZEROCOPY 0x01 // the message is an allocated zero copy message and can be sent.
67 #define MFL_NOALLOC 0x02 // send should NOT allocate a new buffer before returning
68 #define MFL_ADDSRC 0x04 // source must be added on send
69 #define MFL_RAW 0x08 // message is 'raw' and not from an RMr based sender (no header)
71 #define MAX_EP_GROUP 32 // max number of endpoints in a group
72 #define MAX_RTG_MSG_SZ 2048 // max expected message size from route generator
73 #define MAX_CALL_ID 255 // largest call ID that is supported
75 //#define DEF_RTG_MSGID "" // default to pick up all messages from rtg
76 #define DEF_RTG_PORT "tcp:4561" // default port that we accept rtg connections on
77 #define DEF_COMM_PORT "tcp:4560" // default port we use for normal communications
78 #define DEF_TR_LEN (-1) // use default trace data len from context
80 #define UNSET_SUBID (-1) // initial value on msg allocation indicating not set
81 #define UNSET_MSGTYPE (-1)
83 // -- header length/offset macros must ensure network conversion ----
84 #define RMR_HDR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
85 #define RMR_TR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len1))
86 #define RMR_D1_LEN(h) (ntohl(((uta_mhdr_t *)h)->len2))
87 #define RMR_D2_LEN(h) (ntohl(((uta_mhdr_t *)h)->len3))
89 // CAUTION: if using an offset with a header pointer, the pointer MUST be cast to void* before adding the offset!
90 #define TRACE_OFFSET(h) ((ntohl(((uta_mhdr_t *)h)->len0)))
91 #define DATA1_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
92 #define DATA2_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
93 #define PAYLOAD_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
95 #define TRACE_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0))
96 #define DATA1_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
97 #define DATA2_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
98 #define PAYLOAD_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
100 #define SET_HDR_LEN(h) (((uta_mhdr_t *)h)->len0=htonl((int32_t)sizeof(uta_mhdr_t))) // convert to network byte order on insert
101 #define SET_HDR_TR_LEN(h,l) (((uta_mhdr_t *)h)->len1=htonl((int32_t)l))
102 #define SET_HDR_D1_LEN(h,l) (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
103 #define SET_HDR_D2_LEN(h,l) (((uta_mhdr_t *)h)->len3=htonl((int32_t)l))
105 // index of things in the d1 data space
106 #define D1_CALLID_IDX 0 // the call-id to match on return
108 #define NO_CALL_ID 0 // no call id associated with the message (normal queue)
110 #define V1_PAYLOAD_OFFSET(h) (sizeof(uta_v1mhdr_t))
113 #define HFL_HAS_TRACE 0x01 // Trace data is populated
114 #define HFL_SUBID 0x02 // subscription ID is populated
115 #define HFL_CALL_MSG 0x04 // msg sent via blocking call
118 Message header; interpreted by the other side, but never seen by
119 the user application.
121 DANGER: Add new fields AT THE END of the struct. Adding them any where else
122 will break any code that is currently running.
124 The transport layer buffer allocated will be divided this way:
125 | RMr header | Trace data | data1 | data2 | User paylaod |
127 Len 0 is the length of the RMr header
128 Len 1 is the length of the trace data
129 Len 2 and 3 are lengths of data1 and data2 and are unused at the moment
131 To point at the payload, we take the address of the header and add all 4 lengths.
134 int32_t mtype; // message type ("long" network integer)
135 int32_t plen; // payload length
136 int32_t rmr_ver; // our internal message version number
137 unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch
138 unsigned char sid[RMR_MAX_SID]; // sender ID for return to sender needs
139 unsigned char src[RMR_MAX_SRC]; // name:port of the sender (source)
140 unsigned char meid[RMR_MAX_MEID]; // managed element id.
141 struct timespec ts; // timestamp ???
144 int32_t flags; // HFL_* constants
145 int32_t len0; // length of the RMr header data
146 int32_t len1; // length of the tracing data
147 int32_t len2; // length of data 1 (d1)
148 int32_t len3; // length of data 2 (d2)
149 int32_t sub_id; // subscription id (-1 invalid)
153 typedef struct { // old (inflexible) v1 header
154 int32_t mtype; // message type ("long" network integer)
155 int32_t plen; // payload length
156 int32_t rmr_ver; // our internal message version number
157 unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch
158 unsigned char sid[RMR_MAX_SID]; // sender ID for return to sender needs
159 unsigned char src[16]; // name of the sender (source) (old size was 16)
160 unsigned char meid[RMR_MAX_MEID]; // managed element id.
161 struct timespec ts; // timestamp ???
168 int ep_idx; // next endpoint to send to
169 int nused; // number of endpoints in the list
170 int nendpts; // number allocated
171 endpoint_t **epts; // the list of endpoints that we RR over
175 Routing table entry. This is a list of endpoints that can be sent
176 messages of the given mtype. If there is more than one, we will
177 round robin messags across the list.
180 uint64_t key; // key used to reinsert this entry into a new symtab
181 int refs; // number of symtabs which reference the entry
182 int mtype; // the message type for this list
183 int nrrgroups; // number of rr groups to send to
184 rrgroup_t** rrgroups; // one or more set of endpoints to round robin messages to
191 void* hash; // hash table.
192 int updates; // counter of update records received
196 A wormhole is a direct connection between two endpoints that the user app can
197 send to without message type based routing.
200 int nalloc; // number of ep pointers allocated
201 endpoint_t** eps; // end points directly referenced
206 This manages an array of pointers to IP addresses that are associated with one of our interfaces.
207 For now, we don't need to map the addr to a specific interface, just know that it is one of ours.
210 char** addrs; // all ip addresses we found
211 int naddrs; // num actually used
215 // --------------- ring things -------------------------------------------------
216 typedef struct ring {
217 uint16_t head; // index of the head of the ring (insert point)
218 uint16_t tail; // index of the tail (extract point)
219 uint16_t nelements; // number of elements in the ring
220 void** data; // the ring data (pointers to blobs of stuff)
224 // --------- multi-threaded call things -----------------------------------------
226 A chute provides a return path for a received message that a thread has blocked
227 on. The receive thread will set the mbuf pointer and tickler the barrier to
228 signal to the call thread that data is ready.
230 typedef struct chute {
231 rmr_mbuf_t* mbuf; // pointer to message buffer received
232 sem_t barrier; // semaphore that the thread is waiting on
233 unsigned char expect[RMR_MAX_XID]; // the expected transaction ID
237 // -------------- common static prototypes --------------------------------------
239 //---- tools ----------------------------------
240 static int has_myip( char const* buf, if_addrs_t* list, char sep, int max );
241 static int uta_tokenise( char* buf, char** tokens, int max, char sep );
242 static char* uta_h2ip( char const* hname );
243 static int uta_lookup_rtg( uta_ctx_t* ctx );
244 static int uta_has_str( char const* buf, char const* str, char sep, int max );
246 // --- message ring --------------------------
247 static void* uta_mk_ring( int size );
248 static void uta_ring_free( void* vr );
249 static inline void* uta_ring_extract( void* vr );
250 static inline int uta_ring_insert( void* vr, void* new_data );
252 // --- message and context management --------
253 static int ie_test( void* r, int i_factor, long inserts );
256 // ----- route table generic static things ---------
257 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
258 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
259 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
260 static char* uta_fib( char* fname );
261 static route_table_t* uta_rt_init( );
262 static route_table_t* uta_rt_clone( route_table_t* srt );
263 static route_table_t* uta_rt_clone_all( route_table_t* srt );
264 static void uta_rt_drop( route_table_t* rt );
265 static endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group );
266 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
267 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
268 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
269 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
270 static rmr_mbuf_t* realloc_msg( rmr_mbuf_t* msg, int size );
271 static void* rtc( void* vctx );
272 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );