#define QUOTE_DEF(a) QUOTE(a) // allow a #define value to be quoted (e.g. QUOTE(MAJOR_VERSION) )
-#define RMR_MSG_VER 2 // message version this code was designed to handle
+#define RMR_MSG_VER 3 // message version this code was designed to handle
- // environment variable names we'll suss out
-#define ENV_BIND_IF "RMR_BIND_IF" // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
-#define ENV_RTG_PORT "RMR_RTG_SVC" // the port we'll listen on for rtg connections
-#define ENV_SEED_RT "RMR_SEED_RT" // where we expect to find the name of the seed route table
-#define ENV_RTG_RAW "RMR_RTG_ISRAW" // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
+ // environment variable names we'll suss out
+#define ENV_BIND_IF "RMR_BIND_IF" // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
+#define ENV_RTG_PORT "RMR_RTG_SVC" // the port we'll listen on for rtg connections
+#define ENV_SEED_RT "RMR_SEED_RT" // where we expect to find the name of the seed route table
+#define ENV_RTG_RAW "RMR_RTG_ISRAW" // if > 0 we expect route table gen messages as raw (not sent from an RMr application)
#define ENV_VERBOSE_FILE "RMR_VCTL_FILE" // file where vlevel may be managed for some (non-time critical) functions
+#define ENV_NAME_ONLY "RMR_SRC_NAMEONLY" // src in message is name only
#define NO_FLAGS 0 // no flags to pass to a function
// internal flags, must be > than UFLAG_MASK
//#define IFL_....
+#define CFL_MTC_ENABLED 0x01 // multi-threaded call is enabled
+
// msg buffer flags
#define MFL_ZEROCOPY 0x01 // the message is an allocated zero copy message and can be sent.
#define MFL_NOALLOC 0x02 // send should NOT allocate a new buffer before returning
#define MAX_EP_GROUP 32 // max number of endpoints in a group
#define MAX_RTG_MSG_SZ 2048 // max expected message size from route generator
+#define MAX_CALL_ID 255 // largest call ID that is supported
//#define DEF_RTG_MSGID "" // default to pick up all messages from rtg
#define DEF_RTG_PORT "tcp:4561" // default port that we accept rtg connections on
#define RMR_D1_LEN(h) (ntohl(((uta_mhdr_t *)h)->len2))
#define RMR_D2_LEN(h) (ntohl(((uta_mhdr_t *)h)->len3))
+// CAUTION: if using an offset with a header pointer, the pointer MUST be cast to void* before adding the offset!
#define TRACE_OFFSET(h) ((ntohl(((uta_mhdr_t *)h)->len0)))
#define DATA1_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
#define DATA2_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
#define SET_HDR_D1_LEN(h,l) (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
#define SET_HDR_D2_LEN(h,l) (((uta_mhdr_t *)h)->len3=htonl((int32_t)l))
+#define HDR_VERSION(h) htonl((((uta_mhdr_t *)h)->rmr_ver))
+
+ // index of things in the d1 data space
+#define D1_CALLID_IDX 0 // the call-id to match on return
+
+#define NO_CALL_ID 0 // no call id associated with the message (normal queue)
#define V1_PAYLOAD_OFFSET(h) (sizeof(uta_v1mhdr_t))
// v2 header flags
#define HFL_HAS_TRACE 0x01 // Trace data is populated
#define HFL_SUBID 0x02 // subscription ID is populated
+#define HFL_CALL_MSG 0x04 // msg sent via blocking call
/*
Message header; interpreted by the other side, but never seen by
int32_t len2; // length of data 1 (d1)
int32_t len3; // length of data 2 (d2)
int32_t sub_id; // subscription id (-1 invalid)
+
+ // v3 extension
+ unsigned char srcip[RMR_MAX_SRC]; // ip address and port of the source
} uta_mhdr_t;
int32_t plen; // payload length
int32_t rmr_ver; // our internal message version number
unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch
- unsigned char sid[RMR_MAX_SID]; // sender ID for return to sender needs
+ unsigned char sid[RMR_MAX_SID]; // misc sender info/data
unsigned char src[16]; // name of the sender (source) (old size was 16)
unsigned char meid[RMR_MAX_MEID]; // managed element id.
struct timespec ts; // timestamp ???
Round robin group.
*/
typedef struct {
- int ep_idx; // next endpoint to send to
+ uint16_t ep_idx; // next endpoint to send to
int nused; // number of endpoints in the list
int nendpts; // number allocated
endpoint_t **epts; // the list of endpoints that we RR over
} ring_t;
+// --------- multi-threaded call things -----------------------------------------
+/*
+ A chute provides a return path for a received message that a thread has blocked
+ on. The receive thread will set the mbuf pointer and tickler the barrier to
+ signal to the call thread that data is ready.
+*/
+typedef struct chute {
+ rmr_mbuf_t* mbuf; // pointer to message buffer received
+ sem_t barrier; // semaphore that the thread is waiting on
+ unsigned char expect[RMR_MAX_XID]; // the expected transaction ID
+} chute_t;
+
// -------------- common static prototypes --------------------------------------
//---- tools ----------------------------------
static int has_myip( char const* buf, if_addrs_t* list, char sep, int max );
static int uta_tokenise( char* buf, char** tokens, int max, char sep );
+static int uta_rmip_tokenise( char* buf, if_addrs_t* iplist, char** toks, int max, char sep );
static char* uta_h2ip( char const* hname );
static int uta_lookup_rtg( uta_ctx_t* ctx );
static int uta_has_str( char const* buf, char const* str, char sep, int max );
+static char* get_default_ip( if_addrs_t* iplist );
// --- message ring --------------------------
static void* uta_mk_ring( int size );