/*
memset( msg->tp_buf, 0, mlen ); // NOT for production (debug only) valgrind will complain about uninitalised use if we don't set
- memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 ); // NOT for production -- debugging eyecatcher
+ memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", TPHDR_LEN ); // NOT for production -- debugging eyecatcher
*/
alen = (int *) msg->tp_buf;
*alen = mlen; // FIX ME: need a stuct to go in these first bytes, not just dummy len
//SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future
}
msg->len = 0; // length of data in the payload
+ msg->cookie = 0x4942;
msg->alloc_len = mlen; // length of allocated transport buffer (caller size + rmr header)
msg->sub_id = UNSET_SUBID;
msg->mtype = UNSET_MSGTYPE;
memset( msg, 0, sizeof( *msg ) );
+ msg->cookie = 0x4942;
msg->sub_id = UNSET_SUBID;
msg->mtype = UNSET_MSGTYPE;
msg->tp_buf = NULL;
return NULL;
}
-/*
- Receives a 'raw' message from a non-RMr sender (no header expected). The returned
- message buffer cannot be used to send, and the length information may or may
- not be correct (it is set to the length received which might be more than the
- bytes actually in the payload).
-
- Mostly this supports the route table collector, but could be extended with an
- API external function.
-*/
-static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
- return NULL;
-/*
-FIXME: do we need this in the SI world? The only user was the route table collector
- int state;
- rmr_mbuf_t* msg = NULL; // msg received
- size_t rsize; // nng needs to write back the size received... grrr
-
- if( old_msg ) {
- msg = old_msg;
- } else {
- msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
- }
-
- //msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
- if( (msg->state = xlate_si_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
- return msg;
- }
- rsize = nng_msg_len( msg->tp_buf );
-
- // do NOT use ref_tpbuf() here! Must fill these in manually.
- msg->header = nng_msg_body( msg->tp_buf );
- msg->len = rsize; // len is the number of bytes received
- msg->alloc_len = rsize;
- msg->mtype = UNSET_MSGTYPE; // raw message has no type
- msg->sub_id = UNSET_SUBID; // nor a subscription id
- msg->state = RMR_OK;
- msg->flags = MFL_RAW;
- msg->payload = msg->header; // payload is the whole thing; no header
- msg->xaction = NULL;
-
- if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
-
- return msg;
-*/
-}
-
/*
This does the hard work of actually sending the message to the given socket. On success,
a new message struct is returned. On error, the original msg is returned with the state
msg->state = RMR_OK;
do {
tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN; // we only send what was used + header lengths
+ if( tot_len > msg->alloc_len ) {
+ tot_len = msg->alloc_len; // likely bad length from user :(
+ }
*((int*) msg->tp_buf) = tot_len;
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
send_again = 1; // force loop entry
group = 0; // always start with group 0
while( send_again ) {
- sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx ); // select endpt from rr group and set again if more groups
+ sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );