feat(routing): Support session based routing
[ric-plt/lib/rmr.git] / src / common / include / rmr_agnostic.h
index 687330b..06da060 100644 (file)
@@ -1,14 +1,14 @@
 // : vi ts=4 sw=4 noet :
 /*
 ==================================================================================
 // : vi ts=4 sw=4 noet :
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia 
+       Copyright (c) 2019 Nokia
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
-       http://www.apache.org/licenses/LICENSE-2.0
+          http://www.apache.org/licenses/LICENSE-2.0
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
@@ -44,10 +44,10 @@ typedef struct uta_ctx  uta_ctx_t;
 #define QUOTE_DEF(a) QUOTE(a)  // allow a #define value to be quoted (e.g. QUOTE(MAJOR_VERSION) )
 
 
 #define QUOTE_DEF(a) QUOTE(a)  // allow a #define value to be quoted (e.g. QUOTE(MAJOR_VERSION) )
 
 
-#define RMR_MSG_VER    1                       // potental to treat messages differently if from backlevel version
+#define RMR_MSG_VER    2                       // message version this code was designed to handle
 
                                                                        // environment variable names we'll suss out
 
                                                                        // environment variable names we'll suss out
-#define ENV_BIND_IF "RMR_BIND_IF"      // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing) 
+#define ENV_BIND_IF "RMR_BIND_IF"      // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
 #define ENV_RTG_PORT "RMR_RTG_SVC"     // the port we'll listen on for rtg connections
 #define ENV_SEED_RT    "RMR_SEED_RT"   // where we expect to find the name of the seed route table
 #define ENV_RTG_RAW "RMR_RTG_ISRAW"    // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
 #define ENV_RTG_PORT "RMR_RTG_SVC"     // the port we'll listen on for rtg connections
 #define ENV_SEED_RT    "RMR_SEED_RT"   // where we expect to find the name of the seed route table
 #define ENV_RTG_RAW "RMR_RTG_ISRAW"    // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
@@ -72,6 +72,38 @@ typedef struct uta_ctx  uta_ctx_t;
 //#define DEF_RTG_MSGID        ""                              // default to pick up all messages from rtg
 #define DEF_RTG_PORT   "tcp:4561"              // default port that we accept rtg connections on
 #define DEF_COMM_PORT  "tcp:4560"              // default port we use for normal communications
 //#define DEF_RTG_MSGID        ""                              // default to pick up all messages from rtg
 #define DEF_RTG_PORT   "tcp:4561"              // default port that we accept rtg connections on
 #define DEF_COMM_PORT  "tcp:4560"              // default port we use for normal communications
+#define DEF_TR_LEN             (-1)                    // use default trace data len from context
+
+#define UNSET_SUBID            (-1)                    // initial value on msg allocation indicating not set
+#define UNSET_MSGTYPE  (-1)
+
+// -- header length/offset macros must ensure network conversion ----
+#define RMR_HDR_LEN(h)         (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
+#define RMR_TR_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len1))
+#define RMR_D1_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len2))
+#define RMR_D2_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len3))
+
+#define TRACE_OFFSET(h)                ((ntohl(((uta_mhdr_t *)h)->len0)))
+#define DATA1_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
+#define DATA2_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
+#define PAYLOAD_OFFSET(h)      (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
+
+#define TRACE_ADDR(h)          (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0))
+#define DATA1_ADDR(h)          (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
+#define DATA2_ADDR(h)          (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
+#define PAYLOAD_ADDR(h)                (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
+
+#define SET_HDR_LEN(h)         (((uta_mhdr_t *)h)->len0=htonl((int32_t)sizeof(uta_mhdr_t)))            // convert to network byte order on insert
+#define SET_HDR_TR_LEN(h,l)    (((uta_mhdr_t *)h)->len1=htonl((int32_t)l))
+#define SET_HDR_D1_LEN(h,l)    (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
+#define SET_HDR_D2_LEN(h,l)    (((uta_mhdr_t *)h)->len3=htonl((int32_t)l))
+
+
+#define V1_PAYLOAD_OFFSET(h)   (sizeof(uta_v1mhdr_t))
+
+                                                                               // v2 header flags
+#define HFL_HAS_TRACE  0x01                    // Trace data is populated
+#define HFL_SUBID              0x02                    // subscription ID is populated
 
 /*
        Message header; interpreted by the other side, but never seen by
 
 /*
        Message header; interpreted by the other side, but never seen by
@@ -79,6 +111,15 @@ typedef struct uta_ctx  uta_ctx_t;
 
        DANGER: Add new fields AT THE END of the struct. Adding them any where else
                        will break any code that is currently running.
 
        DANGER: Add new fields AT THE END of the struct. Adding them any where else
                        will break any code that is currently running.
+
+       The transport layer buffer allocated will be divided this way:
+               | RMr header | Trace data | data1 | data2 | User paylaod |
+
+               Len 0 is the length of the RMr header
+               Len 1 is the length of the trace data
+               Len 2 and 3 are lengths of data1 and data2 and are unused at the moment
+
+       To point at the payload, we take the address of the header and add all 4 lengths.
 */
 typedef struct {
        int32_t mtype;                                          // message type  ("long" network integer)
 */
 typedef struct {
        int32_t mtype;                                          // message type  ("long" network integer)
@@ -86,11 +127,31 @@ typedef struct {
        int32_t rmr_ver;                                        // our internal message version number
        unsigned char xid[RMR_MAX_XID];         // space for user transaction id or somesuch
        unsigned char sid[RMR_MAX_SID];         // sender ID for return to sender needs
        int32_t rmr_ver;                                        // our internal message version number
        unsigned char xid[RMR_MAX_XID];         // space for user transaction id or somesuch
        unsigned char sid[RMR_MAX_SID];         // sender ID for return to sender needs
-       unsigned char src[RMR_MAX_SRC];         // name of the sender (source)
+       unsigned char src[RMR_MAX_SRC];         // name:port of the sender (source)
        unsigned char meid[RMR_MAX_MEID];       // managed element id.
        struct timespec ts;                                     // timestamp ???
        unsigned char meid[RMR_MAX_MEID];       // managed element id.
        struct timespec ts;                                     // timestamp ???
+
+                                                                               // V2 extension
+       int32_t flags;                                          // HFL_* constants
+       int32_t len0;                                           // length of the RMr header data
+       int32_t len1;                                           // length of the tracing data
+       int32_t len2;                                           // length of data 1 (d1)
+       int32_t len3;                                           // length of data 2 (d2)
+       int32_t sub_id;                                         // subscription id (-1 invalid)
 } uta_mhdr_t;
 
 } uta_mhdr_t;
 
+
+typedef struct {                                               // old (inflexible) v1 header
+       int32_t mtype;                                          // message type  ("long" network integer)
+       int32_t plen;                                           // payload length
+       int32_t rmr_ver;                                        // our internal message version number
+       unsigned char xid[RMR_MAX_XID];         // space for user transaction id or somesuch
+       unsigned char sid[RMR_MAX_SID];         // sender ID for return to sender needs
+       unsigned char src[16];                          // name of the sender (source) (old size was 16)
+       unsigned char meid[RMR_MAX_MEID];       // managed element id.
+       struct timespec ts;                                     // timestamp ???
+} uta_v1mhdr_t;
+
 /*
        Round robin group.
 */
 /*
        Round robin group.
 */
@@ -120,7 +181,7 @@ typedef struct {
 } route_table_t;
 
 /*
 } route_table_t;
 
 /*
-       A wormhole is a direct connection between two endpoints that the user app can 
+       A wormhole is a direct connection between two endpoints that the user app can
        send to without message type based routing.
 */
 typedef struct {
        send to without message type based routing.
 */
 typedef struct {
@@ -139,7 +200,6 @@ typedef struct {
 } if_addrs_t;
 
 
 } if_addrs_t;
 
 
-
 // --------------- ring things  -------------------------------------------------
 typedef struct ring {
        uint16_t head;                          // index of the head of the ring (insert point)
 // --------------- ring things  -------------------------------------------------
 typedef struct ring {
        uint16_t head;                          // index of the head of the ring (insert point)
@@ -170,6 +230,7 @@ static int ie_test( void* r, int i_factor, long inserts );
 
 
 // ----- route table generic static things ---------
 
 
 // ----- route table generic static things ---------
+static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
 static char* uta_fib( char* fname );
 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
 static char* uta_fib( char* fname );
@@ -177,10 +238,11 @@ static route_table_t* uta_rt_init( );
 static route_table_t* uta_rt_clone( route_table_t* srt );
 static void uta_rt_drop( route_table_t* rt );
 static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
 static route_table_t* uta_rt_clone( route_table_t* srt );
 static void uta_rt_drop( route_table_t* rt );
 static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
-static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups );
+static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
+static rmr_mbuf_t* realloc_msg( rmr_mbuf_t* msg, int size );
 static void* rtc( void* vctx );
 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );
 
 static void* rtc( void* vctx );
 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );