Add ability to save route table updates to disk
[ric-plt/lib/rmr.git] / src / rmr / common / include / rmr_agnostic.h
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_agnostic.h
23         Abstract:       Header file for things that are agnostic to the underlying transport
24                                 mechanism.
25         Author:         E. Scott Daniels
26         Date:           28 February 2018
27 */
28
29 #ifndef _rmr_agnostic_h
30 #define _rmr_agnostic_h
31
32 #include <semaphore.h>                                  // needed to support some structs
33 #include <pthread.h>
34
35 typedef struct endpoint endpoint_t;             // place holder for structs defined in nano/nng private.h
36 typedef struct uta_ctx  uta_ctx_t;
37
38 // allow testing to override without changing this
39 #ifndef DEBUG
40 #define DEBUG 0
41 #endif
42
43 #define FALSE 0
44 #define TRUE  1
45
46 #define QUOTE(a) #a                             // allow a constant to be quoted
47 #define QUOTE_DEF(a) QUOTE(a)   // allow a #define value to be quoted (e.g. QUOTE(MAJOR_VERSION) )
48
49
50 #define RT_SIZE                 10009   // primary entries in route table (prime) meids hash through this so larger than expected # meids
51                                                                 // space deginations in the hash table
52 #define RT_MT_SPACE     0               // (integer) message type as the key
53 #define RT_NAME_SPACE   1               // enpoint name/address is the key
54 #define RT_ME_SPACE     2               // message id is the key
55
56 #define RMR_MSG_VER     3                       // message version this code was designed to handle
57
58                                                                                         // environment variable names we'll suss out
59 #define ENV_BIND_IF             "RMR_BIND_IF"           // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
60 #define ENV_RTG_PORT    "RMR_RTG_SVC"           // the port we'll listen on for rtg connections (deprecated; see RTG_SVC and CTL_PORT)
61 #define ENV_RTG_ADDR    "RMR_RTG_SVC"           // the address we will connect to for route manager updates
62 #define ENV_SEED_RT             "RMR_SEED_RT"           // where we expect to find the name of the seed route table
63 #define ENV_STASH_RT    "RMR_STASH_RT"          // location for the last Route Table received from the generator we snarfed and saved
64 #define ENV_SEED_MEMAP  "RMR_SEED_MEMAP"        // where we expect to find the name of the seed route table
65 #define ENV_RTG_RAW             "RMR_RTG_ISRAW"         // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
66 #define ENV_VERBOSE_FILE "RMR_VCTL_FILE"        // file where vlevel may be managed for some (non-time critical) functions
67 #define ENV_NAME_ONLY   "RMR_SRC_NAMEONLY"      // src in message is name only
68 #define ENV_WARNINGS    "RMR_WARNINGS"          // if == 1 then we write some, non-performance impacting, warnings
69 #define ENV_SRC_ID              "RMR_SRC_ID"            // forces this string (adding :port, max 63 ch) into the source field; host name used if not set
70 #define ENV_LOG_HR              "RMR_HR_LOG"            // set to 0 to turn off human readable logging and write using some formatting
71 #define ENV_LOG_VLEVEL  "RMR_LOG_VLEVEL"        // set the verbosity level (0 == 0ff; 1 == crit .... 5 == debug )
72 #define ENV_CTL_PORT    "RMR_CTL_PORT"          // route collector will listen here for control messages (4561 default)
73 #define ENV_RTREQ_FREA  "RMR_RTREQ_FREQ"        // frequency we will request route table updates when we want one (1-300 inclusive)
74
75
76 #define ENV_AM_NAME             "ALARM_MGR_SERVICE_NAME"        // alarm manager env vars that we need
77 #define ENV_AM_PORT             "ALARM_MGR_SERVICE_PORT"
78
79 #define NO_FLAGS        0                               // no flags to pass to a function
80
81 #define FL_NOTHREAD     0x01                    // do not start an additional thread (must be 'user land' to support rtg
82 #define UFL_MASK                0xff            // mask applied to some flag parms passed by the user to remove any internal flags
83                                                                         // internal flags, must be > than UFLAG_MASK
84 //#define IFL_....
85
86 #define CFL_MTC_ENABLED 0x01            // multi-threaded call is enabled
87 #define CFL_NO_RTACK    0x02            // no route table ack needed when end received
88
89                                                                         // context flags
90 #define CTXFL_WARN              0x01            // ok to warn on stderr for some things that shouldn't happen
91
92                                                                         // msg buffer flags
93 #define MFL_ZEROCOPY    0x01            // the message is an allocated zero copy message and can be sent.
94 #define MFL_NOALLOC             0x02            // send should NOT allocate a new buffer before returning
95 #define MFL_ADDSRC              0x04            // source must be added on send
96 #define MFL_RAW                 0x08            // message is 'raw' and not from an RMr based sender (no header)
97 #define MFL_HUGE                0x10            // buffer was larger than applications indicated usual max; don't cache
98
99 #define MAX_EP_GROUP    32                      // max number of endpoints in a group
100 #define MAX_RTG_MSG_SZ  2048            // max expected message size from route generator
101 #define MAX_CALL_ID             255                     // largest call ID that is supported
102
103 //#define DEF_RTG_MSGID ""                              // default to pick up all messages from rtg
104 #define DEF_CTL_PORT    "4561"                  // default control port that rtc listens on
105 #define DEF_RTG_PORT    "tcp:4561"              // default port that we accept rtg connections on (deprecated)
106 #define DEF_COMM_PORT   "tcp:4560"              // default port we use for normal communications
107 #define DEF_RTG_WK_ADDR "routemgr:4561" // well known address for the route manager
108 #define DEF_TR_LEN              (-1)                    // use default trace data len from context
109 #define DEF_RTREQ_FREQ  5                               // delay between route table requests
110
111 #define UNSET_SUBID             (-1)                    // initial value on msg allocation indicating not set
112 #define UNSET_MSGTYPE   (-1)
113
114                                                                                 // index values into the send counters for an enpoint
115 #define EPSC_GOOD               0                               // successful send
116 #define EPSC_FAIL               1                               // hard failurs
117 #define EPSC_TRANS              2                               // transient/soft faiures
118 #define EPSC_SIZE               3                               // number of counters
119
120 // -- header length/offset macros must ensure network conversion ----
121 #define RMR_HDR_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
122 #define RMR_TR_LEN(h)           (ntohl(((uta_mhdr_t *)h)->len1))
123 #define RMR_D1_LEN(h)           (ntohl(((uta_mhdr_t *)h)->len2))
124 #define RMR_D2_LEN(h)           (ntohl(((uta_mhdr_t *)h)->len3))
125
126 // CAUTION:  if using an offset with a header pointer, the pointer MUST be cast to void* before adding the offset!
127 #define TRACE_OFFSET(h)         ((ntohl(((uta_mhdr_t *)h)->len0)))
128 #define DATA1_OFFSET(h)         (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
129 #define DATA2_OFFSET(h)         (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
130 #define PAYLOAD_OFFSET(h)       (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
131
132 #define TRACE_ADDR(h)           (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0))
133 #define DATA1_ADDR(h)           (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
134 #define DATA2_ADDR(h)           (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
135 #define PAYLOAD_ADDR(h)         (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
136
137 #define SET_HDR_LEN(h)          (((uta_mhdr_t *)h)->len0=htonl((int32_t)sizeof(uta_mhdr_t)))            // convert to network byte order on insert
138 #define SET_HDR_TR_LEN(h,l)     (((uta_mhdr_t *)h)->len1=htonl((int32_t)l))
139 #define SET_HDR_D1_LEN(h,l)     (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
140 #define SET_HDR_D2_LEN(h,l)     (((uta_mhdr_t *)h)->len3=htonl((int32_t)l))
141
142 #define HDR_VERSION(h)  htonl((((uta_mhdr_t *)h)->rmr_ver))
143
144                                                         // index of things in the d1 data space
145 #define D1_CALLID_IDX   0       // the call-id to match on return
146
147 #define NO_CALL_ID              0       // no call id associated with the message (normal queue)
148
149 #define V1_PAYLOAD_OFFSET(h)    (sizeof(uta_v1mhdr_t))
150
151                                                                                 // v2 header flags
152 #define HFL_HAS_TRACE   0x01                    // Trace data is populated
153 #define HFL_SUBID               0x02                    // subscription ID is populated
154 #define HFL_CALL_MSG    0x04                    // msg sent via blocking call
155
156 /*
157         Alarm action constants describe the type (e.g. dropping messages) and whether or not
158         this is a "raise" or "clear" action. Raise/clear is determined by the least significant
159         bit; 1 == raise.
160 */
161 #define ALARM_RAISE     0x01
162 #define ALARM_CLEAR     0x00
163 #define ALARM_KIND(a) (a&ALARM_MASK)
164 #define ALARM_DROPS     0x02
165 #define ALARM_MASK 0xfffe
166
167 /*
168         Message header; interpreted by the other side, but never seen by
169         the user application.
170
171         DANGER: Add new fields AT THE END of the struct. Adding them any where else
172                         will break any code that is currently running.
173
174         The transport layer buffer allocated will be divided this way:
175                 | RMr header | Trace data | data1 | data2 | User paylaod |
176
177                 Len 0 is the length of the RMr header
178                 Len 1 is the length of the trace data
179                 Len 2 and 3 are lengths of data1 and data2 and are unused at the moment
180
181         To point at the payload, we take the address of the header and add all 4 lengths.
182 */
183 typedef struct {
184         int32_t mtype;                                          // message type  ("long" network integer)
185         int32_t plen;                                           // payload length (sender data length in payload)
186         int32_t rmr_ver;                                        // our internal message version number
187         unsigned char xid[RMR_MAX_XID];         // space for user transaction id or somesuch
188         unsigned char sid[RMR_MAX_SID];         // sender ID for return to sender needs
189         unsigned char src[RMR_MAX_SRC];         // name:port of the sender (source)
190         unsigned char meid[RMR_MAX_MEID];       // managed element id.
191         struct timespec ts;                                     // timestamp ???
192
193                                                                                 // V2 extension
194         int32_t flags;                                          // HFL_* constants
195         int32_t len0;                                           // length of the RMr header data
196         int32_t len1;                                           // length of the tracing data
197         int32_t len2;                                           // length of data 1 (d1)
198         int32_t len3;                                           // length of data 2 (d2)
199         int32_t sub_id;                                         // subscription id (-1 invalid)
200
201                                                                                 // v3 extension
202         unsigned char srcip[RMR_MAX_SRC];       // ip address and port of the source
203 } uta_mhdr_t;
204
205
206 typedef struct {                                                // old (inflexible) v1 header
207         int32_t mtype;                                          // message type  ("long" network integer)
208         int32_t plen;                                           // payload length
209         int32_t rmr_ver;                                        // our internal message version number
210         unsigned char xid[RMR_MAX_XID];         // space for user transaction id or somesuch
211         unsigned char sid[RMR_MAX_SID];         // misc sender info/data
212         unsigned char src[16];                          // name of the sender (source) (old size was 16)
213         unsigned char meid[RMR_MAX_MEID];       // managed element id.
214         struct timespec ts;                                     // timestamp ???
215 } uta_v1mhdr_t;
216
217 /*
218         Round robin group.
219 */
220 typedef struct {
221         uint16_t        ep_idx;         // next endpoint to send to
222         int nused;                              // number of endpoints in the list
223         int nendpts;                    // number allocated
224         endpoint_t **epts;              // the list of endpoints that we RR over
225 } rrgroup_t;
226
227 /*
228         Routing table entry. This is a list of endpoints that can be sent
229         messages of the given mtype.  If there is more than one, we will
230         round robin messags across the list.
231 */
232 typedef struct {
233         uint64_t key;                   // key used to reinsert this entry into a new symtab
234         int     refs;                           // number of symtabs which reference the entry
235         int mtype;                              // the message type for this list
236         int     nrrgroups;                      // number of rr groups to send to (if 0, the meid in a message determines endpoint)
237         rrgroup_t**     rrgroups;       // one or more set of endpoints to round robin messages to
238 } rtable_ent_t;
239
240 /*
241         The route table.
242 */
243 typedef struct {
244         int             error;                  // set if there was a problem building the table
245         void*   hash;                   // hash table for msg type and meid.
246         void*   ephash;                 // hash for endpoint references
247         int             updates;                // counter of update records received
248         int             mupdates;               // counter of meid update records received
249         int             ref_count;              // num threads currently using
250         pthread_mutex_t*        gate;   // lock allowing update to ref counter
251 } route_table_t;
252
253 /*
254         A wormhole is a direct connection between two endpoints that the user app can
255         send to without message type based routing.
256 */
257 typedef struct {
258         int     nalloc;                         // number of ep pointers allocated
259         endpoint_t** eps;               // end points directly referenced
260 } wh_mgt_t;
261
262
263 /*
264         This manages an array of pointers to IP addresses that are associated with one of our interfaces.
265         For now, we don't need to map the addr to a specific interface, just know that it is one of ours.
266 */
267 typedef struct {
268         char**  addrs;                  // all ip addresses we found
269         int             naddrs;                 // num actually used
270 } if_addrs_t;
271
272
273 // --------------- ring things  -------------------------------------------------
274 #define RING_NONE       0                       // no options
275 #define RING_RLOCK      0x01            // create/destroy the read lock on the ring
276 #define RING_WLOCK      0x02            // create/destroy the write lockk on the ring
277 #define RING_FRLOCK     0x04            // read locking with no wait if locked option
278
279                                                                 // flag values
280 #define RING_FL_FLOCK   0x01    // fast read lock (don't wait if locked when reading)
281
282 typedef struct ring {
283         uint16_t head;                          // index of the head of the ring (insert point)
284         uint16_t tail;                          // index of the tail (extract point)
285         uint16_t nelements;                     // number of elements in the ring
286         void**  data;                           // the ring data (pointers to blobs of stuff)
287         int             pfd;                            // event fd for the ring for epoll
288         int             flags;                          // RING_FL_* constants
289         pthread_mutex_t*        rgate;  // read lock if used
290         pthread_mutex_t*        wgate;  // write lock if used
291 } ring_t;
292
293
294 // --------- multi-threaded call things -----------------------------------------
295 /*
296         A chute provides a return path for a received message that a thread has blocked
297         on.  The receive thread will set the mbuf pointer and tickler the barrier to
298         signal to the call thread that data is ready.
299 */
300 typedef struct chute {
301         rmr_mbuf_t*     mbuf;                                           // pointer to message buffer received
302         sem_t   barrier;                                                // semaphore that the thread is waiting on
303         unsigned char   expect[RMR_MAX_XID];    // the expected transaction ID
304 } chute_t;
305
306
307 // -------------- common static prototypes --------------------------------------
308
309 //---- tools ----------------------------------
310 static int has_myip( char const* buf, if_addrs_t* list, char sep, int max );
311 static int uta_tokenise( char* buf, char** tokens, int max, char sep );
312 static int uta_rmip_tokenise( char* buf, if_addrs_t* iplist, char** toks, int max, char sep );
313 static char* uta_h2ip( char const* hname );
314 #ifdef RTG_PUB
315 // deprecated funciton -- step 1 of removal
316 static int uta_lookup_rtg( uta_ctx_t* ctx );
317 #endif
318 static int uta_has_str( char const* buf, char const* str, char sep, int max );
319 static char* get_default_ip( if_addrs_t* iplist );
320
321 // --- message ring --------------------------
322 static void* uta_mk_ring( int size );
323 static int uta_ring_config( void* vr, int options );
324 static void uta_ring_free( void* vr );
325 static inline void* uta_ring_extract( void* vr );
326 static inline int uta_ring_insert( void* vr, void* new_data );
327
328 // --- message and context management --------
329 static int ie_test( void* r, int i_factor, long inserts );
330
331
332 // --- internal alarm generation  ---------------------
333 static void uta_alarm( void* vctx, int kind, int prob_id, char* info );
334 static void uta_alarm_send( void* vctx, rmr_mbuf_t* msg );
335
336 // ----- route table generic static things ---------
337 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
338 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
339 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
340 static endpoint_t*  get_meid_owner( route_table_t *rt, char const* meid );
341 static char* uta_fib( char const* fname );
342 static route_table_t* uta_rt_init( uta_ctx_t* ctx  );
343 static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all );
344 static void uta_rt_drop( route_table_t* rt );
345 static inline route_table_t* get_rt( uta_ctx_t* ctx );
346 static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
347 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
348 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
349 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
350 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all );
351 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf );
352 static rmr_mbuf_t* realloc_msg( rmr_mbuf_t* msg, int size );
353 static void release_rt( uta_ctx_t* ctx, route_table_t* rt );
354 static void* rtc( void* vctx );
355 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );
356
357 // --------- route manager communications -----------------
358 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* mbuf, char* table_id, int state, char* reason );
359 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx );
360
361 // -------- internal functions that can be referenced by common functions -------
362 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep );
363
364
365 #endif