feat(routing): Support session based routing
[ric-plt/lib/rmr.git] / src / common / include / rmr_agnostic.h
index fc45429..06da060 100644 (file)
@@ -1,14 +1,14 @@
 // : vi ts=4 sw=4 noet :
 /*
 ==================================================================================
 // : vi ts=4 sw=4 noet :
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia 
+       Copyright (c) 2019 Nokia
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
-       http://www.apache.org/licenses/LICENSE-2.0
+          http://www.apache.org/licenses/LICENSE-2.0
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
@@ -47,7 +47,7 @@ typedef struct uta_ctx  uta_ctx_t;
 #define RMR_MSG_VER    2                       // message version this code was designed to handle
 
                                                                        // environment variable names we'll suss out
 #define RMR_MSG_VER    2                       // message version this code was designed to handle
 
                                                                        // environment variable names we'll suss out
-#define ENV_BIND_IF "RMR_BIND_IF"      // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing) 
+#define ENV_BIND_IF "RMR_BIND_IF"      // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
 #define ENV_RTG_PORT "RMR_RTG_SVC"     // the port we'll listen on for rtg connections
 #define ENV_SEED_RT    "RMR_SEED_RT"   // where we expect to find the name of the seed route table
 #define ENV_RTG_RAW "RMR_RTG_ISRAW"    // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
 #define ENV_RTG_PORT "RMR_RTG_SVC"     // the port we'll listen on for rtg connections
 #define ENV_SEED_RT    "RMR_SEED_RT"   // where we expect to find the name of the seed route table
 #define ENV_RTG_RAW "RMR_RTG_ISRAW"    // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
@@ -72,17 +72,27 @@ typedef struct uta_ctx  uta_ctx_t;
 //#define DEF_RTG_MSGID        ""                              // default to pick up all messages from rtg
 #define DEF_RTG_PORT   "tcp:4561"              // default port that we accept rtg connections on
 #define DEF_COMM_PORT  "tcp:4560"              // default port we use for normal communications
 //#define DEF_RTG_MSGID        ""                              // default to pick up all messages from rtg
 #define DEF_RTG_PORT   "tcp:4561"              // default port that we accept rtg connections on
 #define DEF_COMM_PORT  "tcp:4560"              // default port we use for normal communications
+#define DEF_TR_LEN             (-1)                    // use default trace data len from context
 
 
-// -- header length/offset macros which ensure network conversion ----
-#define RMR_HDR_LEN(h)         (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))   // convert from net byte order
-#define PAYLOAD_OFFSET(h)      (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
-#define TRACE_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0))
-#define DATA1_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
-#define DATA2_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl((uta_mhdr_t *)h)->len2)
+#define UNSET_SUBID            (-1)                    // initial value on msg allocation indicating not set
+#define UNSET_MSGTYPE  (-1)
+
+// -- header length/offset macros must ensure network conversion ----
+#define RMR_HDR_LEN(h)         (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
 #define RMR_TR_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len1))
 #define RMR_D1_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len2))
 #define RMR_D2_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len3))
 
 #define RMR_TR_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len1))
 #define RMR_D1_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len2))
 #define RMR_D2_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len3))
 
+#define TRACE_OFFSET(h)                ((ntohl(((uta_mhdr_t *)h)->len0)))
+#define DATA1_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
+#define DATA2_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
+#define PAYLOAD_OFFSET(h)      (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
+
+#define TRACE_ADDR(h)          (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0))
+#define DATA1_ADDR(h)          (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
+#define DATA2_ADDR(h)          (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
+#define PAYLOAD_ADDR(h)                (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
+
 #define SET_HDR_LEN(h)         (((uta_mhdr_t *)h)->len0=htonl((int32_t)sizeof(uta_mhdr_t)))            // convert to network byte order on insert
 #define SET_HDR_TR_LEN(h,l)    (((uta_mhdr_t *)h)->len1=htonl((int32_t)l))
 #define SET_HDR_D1_LEN(h,l)    (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
 #define SET_HDR_LEN(h)         (((uta_mhdr_t *)h)->len0=htonl((int32_t)sizeof(uta_mhdr_t)))            // convert to network byte order on insert
 #define SET_HDR_TR_LEN(h,l)    (((uta_mhdr_t *)h)->len1=htonl((int32_t)l))
 #define SET_HDR_D1_LEN(h,l)    (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
@@ -122,12 +132,12 @@ typedef struct {
        struct timespec ts;                                     // timestamp ???
 
                                                                                // V2 extension
        struct timespec ts;                                     // timestamp ???
 
                                                                                // V2 extension
-       int32_t flags;                                          // HFL_* constants      
+       int32_t flags;                                          // HFL_* constants
        int32_t len0;                                           // length of the RMr header data
        int32_t len1;                                           // length of the tracing data
        int32_t len2;                                           // length of data 1 (d1)
        int32_t len3;                                           // length of data 2 (d2)
        int32_t len0;                                           // length of the RMr header data
        int32_t len1;                                           // length of the tracing data
        int32_t len2;                                           // length of data 1 (d1)
        int32_t len3;                                           // length of data 2 (d2)
-
+       int32_t sub_id;                                         // subscription id (-1 invalid)
 } uta_mhdr_t;
 
 
 } uta_mhdr_t;
 
 
@@ -171,7 +181,7 @@ typedef struct {
 } route_table_t;
 
 /*
 } route_table_t;
 
 /*
-       A wormhole is a direct connection between two endpoints that the user app can 
+       A wormhole is a direct connection between two endpoints that the user app can
        send to without message type based routing.
 */
 typedef struct {
        send to without message type based routing.
 */
 typedef struct {
@@ -189,6 +199,7 @@ typedef struct {
        int             naddrs;                 // num actually used
 } if_addrs_t;
 
        int             naddrs;                 // num actually used
 } if_addrs_t;
 
+
 // --------------- ring things  -------------------------------------------------
 typedef struct ring {
        uint16_t head;                          // index of the head of the ring (insert point)
 // --------------- ring things  -------------------------------------------------
 typedef struct ring {
        uint16_t head;                          // index of the head of the ring (insert point)
@@ -219,6 +230,7 @@ static int ie_test( void* r, int i_factor, long inserts );
 
 
 // ----- route table generic static things ---------
 
 
 // ----- route table generic static things ---------
+static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
 static char* uta_fib( char* fname );
 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
 static char* uta_fib( char* fname );
@@ -226,10 +238,11 @@ static route_table_t* uta_rt_init( );
 static route_table_t* uta_rt_clone( route_table_t* srt );
 static void uta_rt_drop( route_table_t* rt );
 static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
 static route_table_t* uta_rt_clone( route_table_t* srt );
 static void uta_rt_drop( route_table_t* rt );
 static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
-static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups );
+static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
+static rmr_mbuf_t* realloc_msg( rmr_mbuf_t* msg, int size );
 static void* rtc( void* vctx );
 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );
 
 static void* rtc( void* vctx );
 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );