#define _rmr_agnostic_h
#include <semaphore.h> // needed to support some structs
+#include <pthread.h>
typedef struct endpoint endpoint_t; // place holder for structs defined in nano/nng private.h
typedef struct uta_ctx uta_ctx_t;
#define ENV_LOG_HR "RMR_HR_LOG" // set to 0 to turn off human readable logging and write using some formatting
#define ENV_LOG_VLEVEL "RMR_LOG_VLEVEL" // set the verbosity level (0 == 0ff; 1 == crit .... 5 == debug )
#define ENV_CTL_PORT "RMR_CTL_PORT" // route collector will listen here for control messages (4561 default)
+#define ENV_RTREQ_FREA "RMR_RTREQ_FREQ" // frequency we will request route table updates when we want one (1-300 inclusive)
#define NO_FLAGS 0 // no flags to pass to a function
#define MFL_NOALLOC 0x02 // send should NOT allocate a new buffer before returning
#define MFL_ADDSRC 0x04 // source must be added on send
#define MFL_RAW 0x08 // message is 'raw' and not from an RMr based sender (no header)
+#define MFL_HUGE 0x10 // buffer was larger than applications indicated usual max; don't cache
#define MAX_EP_GROUP 32 // max number of endpoints in a group
#define MAX_RTG_MSG_SZ 2048 // max expected message size from route generator
#define DEF_COMM_PORT "tcp:4560" // default port we use for normal communications
#define DEF_RTG_WK_ADDR "routemgr:4561" // well known address for the route manager
#define DEF_TR_LEN (-1) // use default trace data len from context
+#define DEF_RTREQ_FREQ 5 // delay between route table requests
#define UNSET_SUBID (-1) // initial value on msg allocation indicating not set
#define UNSET_MSGTYPE (-1)
The route table.
*/
typedef struct {
- void* hash; // hash table.
+ int error; // set if there was a problem building the table
+ void* hash; // hash table for msg type and meid.
+ void* ephash; // hash for endpoint references
int updates; // counter of update records received
int mupdates; // counter of meid update records received
+ int ref_count; // num threads currently using
+ pthread_mutex_t* gate; // lock allowing update to ref counter
} route_table_t;
/*
#define RING_NONE 0 // no options
#define RING_RLOCK 0x01 // create/destroy the read lock on the ring
#define RING_WLOCK 0x02 // create/destroy the write lockk on the ring
+#define RING_FRLOCK 0x04 // read locking with no wait if locked option
+
+ // flag values
+#define RING_FL_FLOCK 0x01 // fast read lock (don't wait if locked when reading)
typedef struct ring {
uint16_t head; // index of the head of the ring (insert point)
uint16_t nelements; // number of elements in the ring
void** data; // the ring data (pointers to blobs of stuff)
int pfd; // event fd for the ring for epoll
+ int flags; // RING_FL_* constants
pthread_mutex_t* rgate; // read lock if used
pthread_mutex_t* wgate; // write lock if used
} ring_t;
static int uta_tokenise( char* buf, char** tokens, int max, char sep );
static int uta_rmip_tokenise( char* buf, if_addrs_t* iplist, char** toks, int max, char sep );
static char* uta_h2ip( char const* hname );
+#ifdef RTG_PUB
+// deprecated funciton -- step 1 of removal
static int uta_lookup_rtg( uta_ctx_t* ctx );
+#endif
static int uta_has_str( char const* buf, char const* str, char sep, int max );
static char* get_default_ip( if_addrs_t* iplist );
static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
-static endpoint_t* get_meid_owner( route_table_t *rt, char* meid );
-static char* uta_fib( char* fname );
-static route_table_t* uta_rt_init( );
-static route_table_t* uta_rt_clone( route_table_t* srt );
-static route_table_t* uta_rt_clone_all( route_table_t* srt );
+static endpoint_t* get_meid_owner( route_table_t *rt, char const* meid );
+static char* uta_fib( char const* fname );
+static route_table_t* uta_rt_init( uta_ctx_t* ctx );
+static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all );
static void uta_rt_drop( route_table_t* rt );
+static inline route_table_t* get_rt( uta_ctx_t* ctx );
static endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group );
static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
static void read_static_rt( uta_ctx_t* ctx, int vlevel );
-static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel );
+static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all );
+static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf );
static rmr_mbuf_t* realloc_msg( rmr_mbuf_t* msg, int size );
+static void release_rt( uta_ctx_t* ctx, route_table_t* rt );
static void* rtc( void* vctx );
static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );
// --------- route manager communications -----------------
-static void send_rt_ack( uta_ctx_t* ctx, char* table_id, int state, char* reason );
+static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* mbuf, char* table_id, int state, char* reason );
static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx );
+// -------- internal functions that can be referenced by common functions -------
+static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep );
+
+
#endif