// : vi ts=4 sw=4 noet :
/*
==================================================================================
- Copyright (c) 2019 Nokia
+ Copyright (c) 2019 Nokia
Copyright (c) 2018-2019 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
#define QUOTE_DEF(a) QUOTE(a) // allow a #define value to be quoted (e.g. QUOTE(MAJOR_VERSION) )
-#define RMR_MSG_VER 1 // potental to treat messages differently if from backlevel version
+#define RMR_MSG_VER 2 // message version this code was designed to handle
// environment variable names we'll suss out
-#define ENV_BIND_IF "RMR_BIND_IF" // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
+#define ENV_BIND_IF "RMR_BIND_IF" // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
#define ENV_RTG_PORT "RMR_RTG_SVC" // the port we'll listen on for rtg connections
#define ENV_SEED_RT "RMR_SEED_RT" // where we expect to find the name of the seed route table
#define ENV_RTG_RAW "RMR_RTG_ISRAW" // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
//#define DEF_RTG_MSGID "" // default to pick up all messages from rtg
#define DEF_RTG_PORT "tcp:4561" // default port that we accept rtg connections on
#define DEF_COMM_PORT "tcp:4560" // default port we use for normal communications
+#define DEF_TR_LEN (-1) // use default trace data len from context
+
+#define UNSET_SUBID (-1) // initial value on msg allocation indicating not set
+#define UNSET_MSGTYPE (-1)
+
+// -- header length/offset macros must ensure network conversion ----
+#define RMR_HDR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
+#define RMR_TR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len1))
+#define RMR_D1_LEN(h) (ntohl(((uta_mhdr_t *)h)->len2))
+#define RMR_D2_LEN(h) (ntohl(((uta_mhdr_t *)h)->len3))
+
+#define TRACE_OFFSET(h) ((ntohl(((uta_mhdr_t *)h)->len0)))
+#define DATA1_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
+#define DATA2_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
+#define PAYLOAD_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
+
+#define TRACE_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0))
+#define DATA1_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
+#define DATA2_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
+#define PAYLOAD_ADDR(h) (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
+
+#define SET_HDR_LEN(h) (((uta_mhdr_t *)h)->len0=htonl((int32_t)sizeof(uta_mhdr_t))) // convert to network byte order on insert
+#define SET_HDR_TR_LEN(h,l) (((uta_mhdr_t *)h)->len1=htonl((int32_t)l))
+#define SET_HDR_D1_LEN(h,l) (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
+#define SET_HDR_D2_LEN(h,l) (((uta_mhdr_t *)h)->len3=htonl((int32_t)l))
+
+
+#define V1_PAYLOAD_OFFSET(h) (sizeof(uta_v1mhdr_t))
+
+ // v2 header flags
+#define HFL_HAS_TRACE 0x01 // Trace data is populated
+#define HFL_SUBID 0x02 // subscription ID is populated
/*
Message header; interpreted by the other side, but never seen by
DANGER: Add new fields AT THE END of the struct. Adding them any where else
will break any code that is currently running.
+
+ The transport layer buffer allocated will be divided this way:
+ | RMr header | Trace data | data1 | data2 | User paylaod |
+
+ Len 0 is the length of the RMr header
+ Len 1 is the length of the trace data
+ Len 2 and 3 are lengths of data1 and data2 and are unused at the moment
+
+ To point at the payload, we take the address of the header and add all 4 lengths.
*/
typedef struct {
int32_t mtype; // message type ("long" network integer)
int32_t rmr_ver; // our internal message version number
unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch
unsigned char sid[RMR_MAX_SID]; // sender ID for return to sender needs
- unsigned char src[RMR_MAX_SRC]; // name of the sender (source)
+ unsigned char src[RMR_MAX_SRC]; // name:port of the sender (source)
unsigned char meid[RMR_MAX_MEID]; // managed element id.
struct timespec ts; // timestamp ???
+
+ // V2 extension
+ int32_t flags; // HFL_* constants
+ int32_t len0; // length of the RMr header data
+ int32_t len1; // length of the tracing data
+ int32_t len2; // length of data 1 (d1)
+ int32_t len3; // length of data 2 (d2)
+ int32_t sub_id; // subscription id (-1 invalid)
} uta_mhdr_t;
+
+typedef struct { // old (inflexible) v1 header
+ int32_t mtype; // message type ("long" network integer)
+ int32_t plen; // payload length
+ int32_t rmr_ver; // our internal message version number
+ unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch
+ unsigned char sid[RMR_MAX_SID]; // sender ID for return to sender needs
+ unsigned char src[16]; // name of the sender (source) (old size was 16)
+ unsigned char meid[RMR_MAX_MEID]; // managed element id.
+ struct timespec ts; // timestamp ???
+} uta_v1mhdr_t;
+
/*
Round robin group.
*/
} route_table_t;
/*
- A wormhole is a direct connection between two endpoints that the user app can
+ A wormhole is a direct connection between two endpoints that the user app can
send to without message type based routing.
*/
typedef struct {
} if_addrs_t;
-
// --------------- ring things -------------------------------------------------
typedef struct ring {
uint16_t head; // index of the head of the ring (insert point)
// ----- route table generic static things ---------
+static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
static char* uta_fib( char* fname );
static route_table_t* uta_rt_clone( route_table_t* srt );
static void uta_rt_drop( route_table_t* rt );
static endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group );
-static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups );
+static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
static void read_static_rt( uta_ctx_t* ctx, int vlevel );
static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
+static rmr_mbuf_t* realloc_msg( rmr_mbuf_t* msg, int size );
static void* rtc( void* vctx );
static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );