9000c291b3419a9f38eea527b0d2f211e725b996
[ric-plt/lib/rmr.git] / src / rmr / common / include / rmr_agnostic.h
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_agnostic.h
23         Abstract:       Header file for things that are agnostic to the underlying transport
24                                 mechanism.
25         Author:         E. Scott Daniels
26         Date:           28 February 2018
27 */
28
29 #ifndef _rmr_agnostic_h
30 #define _rmr_agnostic_h
31
32 typedef struct endpoint endpoint_t;             // place holder for structs defined in nano/nng private.h
33 typedef struct uta_ctx  uta_ctx_t;
34
35 // allow testing to override without changing this
36 #ifndef DEBUG
37 #define DEBUG 0
38 #endif
39
40 #define FALSE 0
41 #define TRUE  1
42
43 #define QUOTE(a) #a                             // allow a constant to be quoted
44 #define QUOTE_DEF(a) QUOTE(a)   // allow a #define value to be quoted (e.g. QUOTE(MAJOR_VERSION) )
45
46
47 #define RMR_MSG_VER     3                       // message version this code was designed to handle
48
49                                                                                         // environment variable names we'll suss out
50 #define ENV_BIND_IF "RMR_BIND_IF"                       // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
51 #define ENV_RTG_PORT "RMR_RTG_SVC"                      // the port we'll listen on for rtg connections
52 #define ENV_SEED_RT     "RMR_SEED_RT"                   // where we expect to find the name of the seed route table
53 #define ENV_RTG_RAW "RMR_RTG_ISRAW"                     // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
54 #define ENV_VERBOSE_FILE "RMR_VCTL_FILE"        // file where vlevel may be managed for some (non-time critical) functions
55 #define ENV_NAME_ONLY "RMR_SRC_NAMEONLY"        // src in message is name only
56
57 #define NO_FLAGS        0                               // no flags to pass to a function
58
59 #define FL_NOTHREAD     0x01                    // do not start an additional thread (must be 'user land' to support rtg
60 #define UFL_MASK                0xff            // mask applied to some flag parms passed by the user to remove any internal flags
61                                                                         // internal flags, must be > than UFLAG_MASK
62 //#define IFL_....
63
64 #define CFL_MTC_ENABLED 0x01            // multi-threaded call is enabled
65
66                                                                         // msg buffer flags
67 #define MFL_ZEROCOPY    0x01            // the message is an allocated zero copy message and can be sent.
68 #define MFL_NOALLOC             0x02            // send should NOT allocate a new buffer before returning
69 #define MFL_ADDSRC              0x04            // source must be added on send
70 #define MFL_RAW                 0x08            // message is 'raw' and not from an RMr based sender (no header)
71
72 #define MAX_EP_GROUP    32                      // max number of endpoints in a group
73 #define MAX_RTG_MSG_SZ  2048            // max expected message size from route generator
74 #define MAX_CALL_ID             255                     // largest call ID that is supported
75
76 //#define DEF_RTG_MSGID ""                              // default to pick up all messages from rtg
77 #define DEF_RTG_PORT    "tcp:4561"              // default port that we accept rtg connections on
78 #define DEF_COMM_PORT   "tcp:4560"              // default port we use for normal communications
79 #define DEF_TR_LEN              (-1)                    // use default trace data len from context
80
81 #define UNSET_SUBID             (-1)                    // initial value on msg allocation indicating not set
82 #define UNSET_MSGTYPE   (-1)
83
84 // -- header length/offset macros must ensure network conversion ----
85 #define RMR_HDR_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
86 #define RMR_TR_LEN(h)           (ntohl(((uta_mhdr_t *)h)->len1))
87 #define RMR_D1_LEN(h)           (ntohl(((uta_mhdr_t *)h)->len2))
88 #define RMR_D2_LEN(h)           (ntohl(((uta_mhdr_t *)h)->len3))
89
90 // CAUTION:  if using an offset with a header pointer, the pointer MUST be cast to void* before adding the offset!
91 #define TRACE_OFFSET(h)         ((ntohl(((uta_mhdr_t *)h)->len0)))
92 #define DATA1_OFFSET(h)         (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
93 #define DATA2_OFFSET(h)         (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
94 #define PAYLOAD_OFFSET(h)       (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
95
96 #define TRACE_ADDR(h)           (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0))
97 #define DATA1_ADDR(h)           (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
98 #define DATA2_ADDR(h)           (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
99 #define PAYLOAD_ADDR(h)         (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
100
101 #define SET_HDR_LEN(h)          (((uta_mhdr_t *)h)->len0=htonl((int32_t)sizeof(uta_mhdr_t)))            // convert to network byte order on insert
102 #define SET_HDR_TR_LEN(h,l)     (((uta_mhdr_t *)h)->len1=htonl((int32_t)l))
103 #define SET_HDR_D1_LEN(h,l)     (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
104 #define SET_HDR_D2_LEN(h,l)     (((uta_mhdr_t *)h)->len3=htonl((int32_t)l))
105
106 #define HDR_VERSION(h)  htonl((((uta_mhdr_t *)h)->rmr_ver))
107
108                                                         // index of things in the d1 data space
109 #define D1_CALLID_IDX   0       // the call-id to match on return
110
111 #define NO_CALL_ID              0       // no call id associated with the message (normal queue)
112
113 #define V1_PAYLOAD_OFFSET(h)    (sizeof(uta_v1mhdr_t))
114
115                                                                                 // v2 header flags
116 #define HFL_HAS_TRACE   0x01                    // Trace data is populated
117 #define HFL_SUBID               0x02                    // subscription ID is populated
118 #define HFL_CALL_MSG    0x04                    // msg sent via blocking call
119
120 /*
121         Message header; interpreted by the other side, but never seen by
122         the user application.
123
124         DANGER: Add new fields AT THE END of the struct. Adding them any where else
125                         will break any code that is currently running.
126
127         The transport layer buffer allocated will be divided this way:
128                 | RMr header | Trace data | data1 | data2 | User paylaod |
129
130                 Len 0 is the length of the RMr header
131                 Len 1 is the length of the trace data
132                 Len 2 and 3 are lengths of data1 and data2 and are unused at the moment
133
134         To point at the payload, we take the address of the header and add all 4 lengths.
135 */
136 typedef struct {
137         int32_t mtype;                                          // message type  ("long" network integer)
138         int32_t plen;                                           // payload length
139         int32_t rmr_ver;                                        // our internal message version number
140         unsigned char xid[RMR_MAX_XID];         // space for user transaction id or somesuch
141         unsigned char sid[RMR_MAX_SID];         // sender ID for return to sender needs
142         unsigned char src[RMR_MAX_SRC];         // name:port of the sender (source)
143         unsigned char meid[RMR_MAX_MEID];       // managed element id.
144         struct timespec ts;                                     // timestamp ???
145
146                                                                                 // V2 extension
147         int32_t flags;                                          // HFL_* constants
148         int32_t len0;                                           // length of the RMr header data
149         int32_t len1;                                           // length of the tracing data
150         int32_t len2;                                           // length of data 1 (d1)
151         int32_t len3;                                           // length of data 2 (d2)
152         int32_t sub_id;                                         // subscription id (-1 invalid)
153
154                                                                                 // v3 extension
155         unsigned char srcip[RMR_MAX_SRC];       // ip address and port of the source
156 } uta_mhdr_t;
157
158
159 typedef struct {                                                // old (inflexible) v1 header
160         int32_t mtype;                                          // message type  ("long" network integer)
161         int32_t plen;                                           // payload length
162         int32_t rmr_ver;                                        // our internal message version number
163         unsigned char xid[RMR_MAX_XID];         // space for user transaction id or somesuch
164         unsigned char sid[RMR_MAX_SID];         // misc sender info/data
165         unsigned char src[16];                          // name of the sender (source) (old size was 16)
166         unsigned char meid[RMR_MAX_MEID];       // managed element id.
167         struct timespec ts;                                     // timestamp ???
168 } uta_v1mhdr_t;
169
170 /*
171         Round robin group.
172 */
173 typedef struct {
174         uint16_t        ep_idx;         // next endpoint to send to
175         int nused;                              // number of endpoints in the list
176         int nendpts;                    // number allocated
177         endpoint_t **epts;              // the list of endpoints that we RR over
178 } rrgroup_t;
179
180 /*
181         Routing table entry. This is a list of endpoints that can be sent
182         messages of the given mtype.  If there is more than one, we will
183         round robin messags across the list.
184 */
185 typedef struct {
186         uint64_t key;                   // key used to reinsert this entry into a new symtab
187         int     refs;                           // number of symtabs which reference the entry
188         int mtype;                              // the message type for this list
189         int     nrrgroups;                      // number of rr groups to send to
190         rrgroup_t**     rrgroups;       // one or more set of endpoints to round robin messages to
191 } rtable_ent_t;
192
193 /*
194         The route table.
195 */
196 typedef struct {
197         void*   hash;                   // hash table.
198         int             updates;                // counter of update records received
199 } route_table_t;
200
201 /*
202         A wormhole is a direct connection between two endpoints that the user app can
203         send to without message type based routing.
204 */
205 typedef struct {
206         int     nalloc;                         // number of ep pointers allocated
207         endpoint_t** eps;               // end points directly referenced
208 } wh_mgt_t;
209
210
211 /*
212         This manages an array of pointers to IP addresses that are associated with one of our interfaces.
213         For now, we don't need to map the addr to a specific interface, just know that it is one of ours.
214 */
215 typedef struct {
216         char**  addrs;                  // all ip addresses we found
217         int             naddrs;                 // num actually used
218 } if_addrs_t;
219
220
221 // --------------- ring things  -------------------------------------------------
222 typedef struct ring {
223         uint16_t head;                          // index of the head of the ring (insert point)
224         uint16_t tail;                          // index of the tail (extract point)
225         uint16_t nelements;                     // number of elements in the ring
226         void**  data;                           // the ring data (pointers to blobs of stuff)
227 } ring_t;
228
229
230 // --------- multi-threaded call things -----------------------------------------
231 /*
232         A chute provides a return path for a received message that a thread has blocked
233         on.  The receive thread will set the mbuf pointer and tickler the barrier to
234         signal to the call thread that data is ready.
235 */
236 typedef struct chute {
237         rmr_mbuf_t*     mbuf;                                           // pointer to message buffer received
238         sem_t   barrier;                                                // semaphore that the thread is waiting on
239         unsigned char   expect[RMR_MAX_XID];    // the expected transaction ID
240 } chute_t;
241
242
243 // -------------- common static prototypes --------------------------------------
244
245 //---- tools ----------------------------------
246 static int has_myip( char const* buf, if_addrs_t* list, char sep, int max );
247 static int uta_tokenise( char* buf, char** tokens, int max, char sep );
248 static int uta_rmip_tokenise( char* buf, if_addrs_t* iplist, char** toks, int max, char sep );
249 static char* uta_h2ip( char const* hname );
250 static int uta_lookup_rtg( uta_ctx_t* ctx );
251 static int uta_has_str( char const* buf, char const* str, char sep, int max );
252 static char* get_default_ip( if_addrs_t* iplist );
253
254 // --- message ring --------------------------
255 static void* uta_mk_ring( int size );
256 static void uta_ring_free( void* vr );
257 static inline void* uta_ring_extract( void* vr );
258 static inline int uta_ring_insert( void* vr, void* new_data );
259
260 // --- message and context management --------
261 static int ie_test( void* r, int i_factor, long inserts );
262
263
264 // ----- route table generic static things ---------
265 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
266 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
267 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
268 static char* uta_fib( char* fname );
269 static route_table_t* uta_rt_init( );
270 static route_table_t* uta_rt_clone( route_table_t* srt );
271 static route_table_t* uta_rt_clone_all( route_table_t* srt );
272 static void uta_rt_drop( route_table_t* rt );
273 static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
274 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
275 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
276 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
277 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
278 static rmr_mbuf_t* realloc_msg( rmr_mbuf_t* msg, int size );
279 static void* rtc( void* vctx );
280 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );
281
282 #endif