Allow user programme to set RMR verbosity level
[ric-plt/lib/rmr.git] / src / rmr / common / include / rmr_agnostic.h
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_agnostic.h
23         Abstract:       Header file for things that are agnostic to the underlying transport
24                                 mechanism.
25         Author:         E. Scott Daniels
26         Date:           28 February 2018
27 */
28
29 #ifndef _rmr_agnostic_h
30 #define _rmr_agnostic_h
31
32 #include <semaphore.h>                                  // needed to support some structs
33
34 typedef struct endpoint endpoint_t;             // place holder for structs defined in nano/nng private.h
35 typedef struct uta_ctx  uta_ctx_t;
36
37 // allow testing to override without changing this
38 #ifndef DEBUG
39 #define DEBUG 0
40 #endif
41
42 #define FALSE 0
43 #define TRUE  1
44
45 #define QUOTE(a) #a                             // allow a constant to be quoted
46 #define QUOTE_DEF(a) QUOTE(a)   // allow a #define value to be quoted (e.g. QUOTE(MAJOR_VERSION) )
47
48
49 #define RT_SIZE                 10009   // primary entries in route table (prime) meids hash through this so larger than expected # meids
50                                                                 // space deginations in the hash table
51 #define RT_MT_SPACE     0               // (integer) message type as the key
52 #define RT_NAME_SPACE   1               // enpoint name/address is the key
53 #define RT_ME_SPACE     2               // message id is the key
54
55 #define RMR_MSG_VER     3                       // message version this code was designed to handle
56
57                                                                                         // environment variable names we'll suss out
58 #define ENV_BIND_IF             "RMR_BIND_IF"           // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
59 #define ENV_RTG_PORT    "RMR_RTG_SVC"           // the port we'll listen on for rtg connections
60 #define ENV_SEED_RT             "RMR_SEED_RT"           // where we expect to find the name of the seed route table
61 #define ENV_SEED_MEMAP  "RMR_SEED_MEMAP"        // where we expect to find the name of the seed route table
62 #define ENV_RTG_RAW             "RMR_RTG_ISRAW"         // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
63 #define ENV_VERBOSE_FILE "RMR_VCTL_FILE"        // file where vlevel may be managed for some (non-time critical) functions
64 #define ENV_NAME_ONLY   "RMR_SRC_NAMEONLY"      // src in message is name only
65 #define ENV_WARNINGS    "RMR_WARNINGS"          // if == 1 then we write some, non-performance impacting, warnings
66 #define ENV_SRC_ID              "RMR_SRC_ID"            // forces this string (adding :port, max 63 ch) into the source field; host name used if not set
67 #define ENV_LOG_HR              "RMR_HR_LOG"            // set to 0 to turn off human readable logging and write using some formatting
68 #define ENV_LOG_VLEVEL  "RMR_LOG_VLEVEL"        // set the verbosity level (0 == 0ff; 1 == crit .... 5 == debug )
69
70 #define NO_FLAGS        0                               // no flags to pass to a function
71
72 #define FL_NOTHREAD     0x01                    // do not start an additional thread (must be 'user land' to support rtg
73 #define UFL_MASK                0xff            // mask applied to some flag parms passed by the user to remove any internal flags
74                                                                         // internal flags, must be > than UFLAG_MASK
75 //#define IFL_....
76
77 #define CFL_MTC_ENABLED 0x01            // multi-threaded call is enabled
78
79                                                                         // context flags
80 #define CTXFL_WARN              0x01            // ok to warn on stderr for some things that shouldn't happen
81
82                                                                         // msg buffer flags
83 #define MFL_ZEROCOPY    0x01            // the message is an allocated zero copy message and can be sent.
84 #define MFL_NOALLOC             0x02            // send should NOT allocate a new buffer before returning
85 #define MFL_ADDSRC              0x04            // source must be added on send
86 #define MFL_RAW                 0x08            // message is 'raw' and not from an RMr based sender (no header)
87
88 #define MAX_EP_GROUP    32                      // max number of endpoints in a group
89 #define MAX_RTG_MSG_SZ  2048            // max expected message size from route generator
90 #define MAX_CALL_ID             255                     // largest call ID that is supported
91
92 //#define DEF_RTG_MSGID ""                              // default to pick up all messages from rtg
93 #define DEF_RTG_PORT    "tcp:4561"              // default port that we accept rtg connections on
94 #define DEF_COMM_PORT   "tcp:4560"              // default port we use for normal communications
95 #define DEF_TR_LEN              (-1)                    // use default trace data len from context
96
97 #define UNSET_SUBID             (-1)                    // initial value on msg allocation indicating not set
98 #define UNSET_MSGTYPE   (-1)
99
100                                                                                 // index values into the send counters for an enpoint
101 #define EPSC_GOOD               0                               // successful send
102 #define EPSC_FAIL               1                               // hard failurs
103 #define EPSC_TRANS              2                               // transient/soft faiures
104 #define EPSC_SIZE               3                               // number of counters
105
106 // -- header length/offset macros must ensure network conversion ----
107 #define RMR_HDR_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
108 #define RMR_TR_LEN(h)           (ntohl(((uta_mhdr_t *)h)->len1))
109 #define RMR_D1_LEN(h)           (ntohl(((uta_mhdr_t *)h)->len2))
110 #define RMR_D2_LEN(h)           (ntohl(((uta_mhdr_t *)h)->len3))
111
112 // CAUTION:  if using an offset with a header pointer, the pointer MUST be cast to void* before adding the offset!
113 #define TRACE_OFFSET(h)         ((ntohl(((uta_mhdr_t *)h)->len0)))
114 #define DATA1_OFFSET(h)         (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
115 #define DATA2_OFFSET(h)         (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
116 #define PAYLOAD_OFFSET(h)       (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
117
118 #define TRACE_ADDR(h)           (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0))
119 #define DATA1_ADDR(h)           (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
120 #define DATA2_ADDR(h)           (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
121 #define PAYLOAD_ADDR(h)         (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
122
123 #define SET_HDR_LEN(h)          (((uta_mhdr_t *)h)->len0=htonl((int32_t)sizeof(uta_mhdr_t)))            // convert to network byte order on insert
124 #define SET_HDR_TR_LEN(h,l)     (((uta_mhdr_t *)h)->len1=htonl((int32_t)l))
125 #define SET_HDR_D1_LEN(h,l)     (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
126 #define SET_HDR_D2_LEN(h,l)     (((uta_mhdr_t *)h)->len3=htonl((int32_t)l))
127
128 #define HDR_VERSION(h)  htonl((((uta_mhdr_t *)h)->rmr_ver))
129
130                                                         // index of things in the d1 data space
131 #define D1_CALLID_IDX   0       // the call-id to match on return
132
133 #define NO_CALL_ID              0       // no call id associated with the message (normal queue)
134
135 #define V1_PAYLOAD_OFFSET(h)    (sizeof(uta_v1mhdr_t))
136
137                                                                                 // v2 header flags
138 #define HFL_HAS_TRACE   0x01                    // Trace data is populated
139 #define HFL_SUBID               0x02                    // subscription ID is populated
140 #define HFL_CALL_MSG    0x04                    // msg sent via blocking call
141
142 /*
143         Message header; interpreted by the other side, but never seen by
144         the user application.
145
146         DANGER: Add new fields AT THE END of the struct. Adding them any where else
147                         will break any code that is currently running.
148
149         The transport layer buffer allocated will be divided this way:
150                 | RMr header | Trace data | data1 | data2 | User paylaod |
151
152                 Len 0 is the length of the RMr header
153                 Len 1 is the length of the trace data
154                 Len 2 and 3 are lengths of data1 and data2 and are unused at the moment
155
156         To point at the payload, we take the address of the header and add all 4 lengths.
157 */
158 typedef struct {
159         int32_t mtype;                                          // message type  ("long" network integer)
160         int32_t plen;                                           // payload length (sender data length in payload)
161         int32_t rmr_ver;                                        // our internal message version number
162         unsigned char xid[RMR_MAX_XID];         // space for user transaction id or somesuch
163         unsigned char sid[RMR_MAX_SID];         // sender ID for return to sender needs
164         unsigned char src[RMR_MAX_SRC];         // name:port of the sender (source)
165         unsigned char meid[RMR_MAX_MEID];       // managed element id.
166         struct timespec ts;                                     // timestamp ???
167
168                                                                                 // V2 extension
169         int32_t flags;                                          // HFL_* constants
170         int32_t len0;                                           // length of the RMr header data
171         int32_t len1;                                           // length of the tracing data
172         int32_t len2;                                           // length of data 1 (d1)
173         int32_t len3;                                           // length of data 2 (d2)
174         int32_t sub_id;                                         // subscription id (-1 invalid)
175
176                                                                                 // v3 extension
177         unsigned char srcip[RMR_MAX_SRC];       // ip address and port of the source
178 } uta_mhdr_t;
179
180
181 typedef struct {                                                // old (inflexible) v1 header
182         int32_t mtype;                                          // message type  ("long" network integer)
183         int32_t plen;                                           // payload length
184         int32_t rmr_ver;                                        // our internal message version number
185         unsigned char xid[RMR_MAX_XID];         // space for user transaction id or somesuch
186         unsigned char sid[RMR_MAX_SID];         // misc sender info/data
187         unsigned char src[16];                          // name of the sender (source) (old size was 16)
188         unsigned char meid[RMR_MAX_MEID];       // managed element id.
189         struct timespec ts;                                     // timestamp ???
190 } uta_v1mhdr_t;
191
192 /*
193         Round robin group.
194 */
195 typedef struct {
196         uint16_t        ep_idx;         // next endpoint to send to
197         int nused;                              // number of endpoints in the list
198         int nendpts;                    // number allocated
199         endpoint_t **epts;              // the list of endpoints that we RR over
200 } rrgroup_t;
201
202 /*
203         Routing table entry. This is a list of endpoints that can be sent
204         messages of the given mtype.  If there is more than one, we will
205         round robin messags across the list.
206 */
207 typedef struct {
208         uint64_t key;                   // key used to reinsert this entry into a new symtab
209         int     refs;                           // number of symtabs which reference the entry
210         int mtype;                              // the message type for this list
211         int     nrrgroups;                      // number of rr groups to send to (if 0, the meid in a message determines endpoint)
212         rrgroup_t**     rrgroups;       // one or more set of endpoints to round robin messages to
213 } rtable_ent_t;
214
215 /*
216         The route table.
217 */
218 typedef struct {
219         void*   hash;                   // hash table.
220         int             updates;                // counter of update records received
221         int             mupdates;               // counter of meid update records received
222 } route_table_t;
223
224 /*
225         A wormhole is a direct connection between two endpoints that the user app can
226         send to without message type based routing.
227 */
228 typedef struct {
229         int     nalloc;                         // number of ep pointers allocated
230         endpoint_t** eps;               // end points directly referenced
231 } wh_mgt_t;
232
233
234 /*
235         This manages an array of pointers to IP addresses that are associated with one of our interfaces.
236         For now, we don't need to map the addr to a specific interface, just know that it is one of ours.
237 */
238 typedef struct {
239         char**  addrs;                  // all ip addresses we found
240         int             naddrs;                 // num actually used
241 } if_addrs_t;
242
243
244 // --------------- ring things  -------------------------------------------------
245 #define RING_NONE       0                       // no options
246 #define RING_RLOCK      0x01            // create/destroy the read lock on the ring
247 #define RING_WLOCK      0x02            // create/destroy the write lockk on the ring
248
249 typedef struct ring {
250         uint16_t head;                          // index of the head of the ring (insert point)
251         uint16_t tail;                          // index of the tail (extract point)
252         uint16_t nelements;                     // number of elements in the ring
253         void**  data;                           // the ring data (pointers to blobs of stuff)
254         int             pfd;                            // event fd for the ring for epoll
255         pthread_mutex_t*        rgate;  // read lock if used
256         pthread_mutex_t*        wgate;  // write lock if used
257 } ring_t;
258
259
260 // --------- multi-threaded call things -----------------------------------------
261 /*
262         A chute provides a return path for a received message that a thread has blocked
263         on.  The receive thread will set the mbuf pointer and tickler the barrier to
264         signal to the call thread that data is ready.
265 */
266 typedef struct chute {
267         rmr_mbuf_t*     mbuf;                                           // pointer to message buffer received
268         sem_t   barrier;                                                // semaphore that the thread is waiting on
269         unsigned char   expect[RMR_MAX_XID];    // the expected transaction ID
270 } chute_t;
271
272
273 // -------------- common static prototypes --------------------------------------
274
275 //---- tools ----------------------------------
276 static int has_myip( char const* buf, if_addrs_t* list, char sep, int max );
277 static int uta_tokenise( char* buf, char** tokens, int max, char sep );
278 static int uta_rmip_tokenise( char* buf, if_addrs_t* iplist, char** toks, int max, char sep );
279 static char* uta_h2ip( char const* hname );
280 static int uta_lookup_rtg( uta_ctx_t* ctx );
281 static int uta_has_str( char const* buf, char const* str, char sep, int max );
282 static char* get_default_ip( if_addrs_t* iplist );
283
284 // --- message ring --------------------------
285 static void* uta_mk_ring( int size );
286 static int uta_ring_config( void* vr, int options );
287 static void uta_ring_free( void* vr );
288 static inline void* uta_ring_extract( void* vr );
289 static inline int uta_ring_insert( void* vr, void* new_data );
290
291 // --- message and context management --------
292 static int ie_test( void* r, int i_factor, long inserts );
293
294
295 // ----- route table generic static things ---------
296 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
297 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
298 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
299 static char* uta_fib( char* fname );
300 static route_table_t* uta_rt_init( );
301 static route_table_t* uta_rt_clone( route_table_t* srt );
302 static route_table_t* uta_rt_clone_all( route_table_t* srt );
303 static void uta_rt_drop( route_table_t* rt );
304 static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
305 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
306 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
307 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
308 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
309 static rmr_mbuf_t* realloc_msg( rmr_mbuf_t* msg, int size );
310 static void* rtc( void* vctx );
311 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );
312
313 #endif