#ifndef _rmr_agnostic_h
#define _rmr_agnostic_h
+#include <semaphore.h> // needed to support some structs
+
typedef struct endpoint endpoint_t; // place holder for structs defined in nano/nng private.h
typedef struct uta_ctx uta_ctx_t;
#define QUOTE_DEF(a) QUOTE(a) // allow a #define value to be quoted (e.g. QUOTE(MAJOR_VERSION) )
+#define RT_SIZE 10009 // primary entries in route table (prime) meids hash through this so larger than expected # meids
+ // space deginations in the hash table
+#define RT_MT_SPACE 0 // (integer) message type as the key
+#define RT_NAME_SPACE 1 // enpoint name/address is the key
+#define RT_ME_SPACE 2 // message id is the key
+
#define RMR_MSG_VER 3 // message version this code was designed to handle
// environment variable names we'll suss out
-#define ENV_BIND_IF "RMR_BIND_IF" // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
-#define ENV_RTG_PORT "RMR_RTG_SVC" // the port we'll listen on for rtg connections
-#define ENV_SEED_RT "RMR_SEED_RT" // where we expect to find the name of the seed route table
-#define ENV_RTG_RAW "RMR_RTG_ISRAW" // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
+#define ENV_BIND_IF "RMR_BIND_IF" // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
+#define ENV_RTG_PORT "RMR_RTG_SVC" // the port we'll listen on for rtg connections (deprecated; see RTG_SVC and CTL_PORT)
+#define ENV_RTG_ADDR "RMR_RTG_SVC" // the address we will connect to for route manager updates
+#define ENV_SEED_RT "RMR_SEED_RT" // where we expect to find the name of the seed route table
+#define ENV_SEED_MEMAP "RMR_SEED_MEMAP" // where we expect to find the name of the seed route table
+#define ENV_RTG_RAW "RMR_RTG_ISRAW" // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
#define ENV_VERBOSE_FILE "RMR_VCTL_FILE" // file where vlevel may be managed for some (non-time critical) functions
-#define ENV_NAME_ONLY "RMR_SRC_NAMEONLY" // src in message is name only
+#define ENV_NAME_ONLY "RMR_SRC_NAMEONLY" // src in message is name only
+#define ENV_WARNINGS "RMR_WARNINGS" // if == 1 then we write some, non-performance impacting, warnings
+#define ENV_SRC_ID "RMR_SRC_ID" // forces this string (adding :port, max 63 ch) into the source field; host name used if not set
+#define ENV_LOG_HR "RMR_HR_LOG" // set to 0 to turn off human readable logging and write using some formatting
+#define ENV_LOG_VLEVEL "RMR_LOG_VLEVEL" // set the verbosity level (0 == 0ff; 1 == crit .... 5 == debug )
+#define ENV_CTL_PORT "RMR_CTL_PORT" // route collector will listen here for control messages (4561 default)
#define NO_FLAGS 0 // no flags to pass to a function
//#define IFL_....
#define CFL_MTC_ENABLED 0x01 // multi-threaded call is enabled
+#define CFL_NO_RTACK 0x02 // no route table ack needed when end received
+
+ // context flags
+#define CTXFL_WARN 0x01 // ok to warn on stderr for some things that shouldn't happen
// msg buffer flags
#define MFL_ZEROCOPY 0x01 // the message is an allocated zero copy message and can be sent.
#define MAX_CALL_ID 255 // largest call ID that is supported
//#define DEF_RTG_MSGID "" // default to pick up all messages from rtg
-#define DEF_RTG_PORT "tcp:4561" // default port that we accept rtg connections on
+#define DEF_CTL_PORT "4561" // default control port that rtc listens on
+#define DEF_RTG_PORT "tcp:4561" // default port that we accept rtg connections on (deprecated)
#define DEF_COMM_PORT "tcp:4560" // default port we use for normal communications
+#define DEF_RTG_WK_ADDR "routemgr:4561" // well known address for the route manager
#define DEF_TR_LEN (-1) // use default trace data len from context
#define UNSET_SUBID (-1) // initial value on msg allocation indicating not set
#define UNSET_MSGTYPE (-1)
+ // index values into the send counters for an enpoint
+#define EPSC_GOOD 0 // successful send
+#define EPSC_FAIL 1 // hard failurs
+#define EPSC_TRANS 2 // transient/soft faiures
+#define EPSC_SIZE 3 // number of counters
+
// -- header length/offset macros must ensure network conversion ----
#define RMR_HDR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
#define RMR_TR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len1))
*/
typedef struct {
int32_t mtype; // message type ("long" network integer)
- int32_t plen; // payload length
+ int32_t plen; // payload length (sender data length in payload)
int32_t rmr_ver; // our internal message version number
unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch
unsigned char sid[RMR_MAX_SID]; // sender ID for return to sender needs
uint64_t key; // key used to reinsert this entry into a new symtab
int refs; // number of symtabs which reference the entry
int mtype; // the message type for this list
- int nrrgroups; // number of rr groups to send to
+ int nrrgroups; // number of rr groups to send to (if 0, the meid in a message determines endpoint)
rrgroup_t** rrgroups; // one or more set of endpoints to round robin messages to
} rtable_ent_t;
typedef struct {
void* hash; // hash table.
int updates; // counter of update records received
+ int mupdates; // counter of meid update records received
} route_table_t;
/*
// --------------- ring things -------------------------------------------------
+#define RING_NONE 0 // no options
+#define RING_RLOCK 0x01 // create/destroy the read lock on the ring
+#define RING_WLOCK 0x02 // create/destroy the write lockk on the ring
+
typedef struct ring {
uint16_t head; // index of the head of the ring (insert point)
uint16_t tail; // index of the tail (extract point)
uint16_t nelements; // number of elements in the ring
void** data; // the ring data (pointers to blobs of stuff)
+ int pfd; // event fd for the ring for epoll
+ pthread_mutex_t* rgate; // read lock if used
+ pthread_mutex_t* wgate; // write lock if used
} ring_t;
//---- tools ----------------------------------
static int has_myip( char const* buf, if_addrs_t* list, char sep, int max );
static int uta_tokenise( char* buf, char** tokens, int max, char sep );
+static int uta_rmip_tokenise( char* buf, if_addrs_t* iplist, char** toks, int max, char sep );
static char* uta_h2ip( char const* hname );
static int uta_lookup_rtg( uta_ctx_t* ctx );
static int uta_has_str( char const* buf, char const* str, char sep, int max );
// --- message ring --------------------------
static void* uta_mk_ring( int size );
+static int uta_ring_config( void* vr, int options );
static void uta_ring_free( void* vr );
static inline void* uta_ring_extract( void* vr );
static inline int uta_ring_insert( void* vr, void* new_data );
static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
+static endpoint_t* get_meid_owner( route_table_t *rt, char* meid );
static char* uta_fib( char* fname );
static route_table_t* uta_rt_init( );
static route_table_t* uta_rt_clone( route_table_t* srt );
static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
static void read_static_rt( uta_ctx_t* ctx, int vlevel );
-static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
+static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel );
static rmr_mbuf_t* realloc_msg( rmr_mbuf_t* msg, int size );
static void* rtc( void* vctx );
static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );
+// --------- route manager communications -----------------
+static void send_rt_ack( uta_ctx_t* ctx, char* table_id, int state, char* reason );
+static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx );
+
#endif