Remove bad includes from SI code
[ric-plt/lib/rmr.git] / src / rmr / si / src / sr_si_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_si_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs SI95).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
33
34 static void dump_40( char *p, char* label ) {
35         int i;
36
37         if( label )
38                 fprintf( stderr, ">>>>> %s p=%p\n", label, p );
39
40         for( i = 0; i < 40; i++ ) {
41                 fprintf( stderr, "%02x ", (unsigned char) *(p+i) );
42         }
43         fprintf( stderr, "\n" );
44 }
45
46 /*
47         Translates the nng state passed in to one of ours that is suitable to put
48         into the message, and sets errno to something that might be useful.
49         If we don't have a specific RMr state, then we return the default (e.g.
50         receive failed).
51
52         The addition of the connection shut error code to the switch requires
53         that the NNG version at commit e618abf8f3db2a94269a (or after) be
54         used for compiling RMR. 
55 */
56 static inline int xlate_si_state( int state, int def_state ) {
57
58         switch( state ) {
59                 case SI_ERR_NONE:
60                         errno = 0;
61                         state = RMR_OK;
62                         break;
63
64                 default:
65                         state = def_state;
66                         errno = EAGAIN;
67                         break;
68
69                 case SI_ERR_QUEUED:
70                 case SI_ERR_TPORT:
71                 case SI_ERR_UPORT:
72                 case SI_ERR_FORK:
73                 case SI_ERR_HANDLE:
74                 case SI_ERR_SESSID:
75                 case SI_ERR_TP:
76                 case SI_ERR_SHUTD:
77                 case SI_ERR_NOFDS:
78                 case SI_ERR_SIGUSR1:
79                 case SI_ERR_SIGUSR2:
80                 case SI_ERR_DISC:
81                 case SI_ERR_TIMEOUT:
82                 case SI_ERR_ORDREL:
83                 case SI_ERR_SIGALRM:
84                 case SI_ERR_NOMEM:
85                 case SI_ERR_ADDR:
86                         errno  = ENOTSUP;
87                         state = def_state;
88                         break;
89         }
90
91         return state;
92 }
93
94 /*
95         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
96         a new message struct as well. Size is the size of the zc buffer to allocate (not
97         including our header). If size is 0, then the buffer allocated is the size previously
98         allocated (if msg is !nil) or the default size given at initialisation).
99
100         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
101         the context value is used.
102
103         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
104                 are zero copy, however ONLY the message is zero copy. We now allocate and use
105                 nng messages.
106 */
107 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
108         size_t          mlen = -1;                      // size of the transport buffer that we'll allocate
109         uta_mhdr_t*     hdr;                    // convenience pointer
110         int                     tr_len;                 // trace data len (default or override)
111         int*            alen;                   // convenience pointer to set allocated len
112
113         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
114
115         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
116         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
117         mlen = sizeof( char ) * (mlen + TP_HDR_LEN);                                            // finally add the transport header len
118
119         if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
120                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
121                 if( msg == NULL ) {
122                         fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
123                         return NULL;                                                            // we used to exit -- that seems wrong
124                 }
125                 memset( msg, 0, sizeof( *msg ) );       // tp_buffer will be allocated below
126         } else {                                                                // user message or message from the ring
127                 if( mlen > msg->alloc_len ) {           // current allocation is too small
128                         msg->alloc_len = 0;                             // force tp_buffer realloc below
129                         if( msg->tp_buf ) {
130                                 free( msg->tp_buf );
131                         }
132                 } else {
133                         mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
134                 }
135         }
136
137         msg->rts_fd = -1;                                       // must force to be invalid; not a received message that can be returned
138
139         if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
140                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
141                 abort( );                                                                                       // toss out a core file for this
142         }
143
144 /*
145         memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only)   valgrind will complain about uninitalised use if we don't set
146         memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );                // NOT for production -- debugging eyecatcher
147 */
148         alen = (int *) msg->tp_buf;
149         *alen = mlen;                                           // FIX ME: need a stuct to go in these first bytes, not just dummy len
150
151         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
152         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
153         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
154                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
155                 hdr->sub_id = htonl( UNSET_SUBID );
156                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
157                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
158                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
159                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
160         }
161         msg->len = 0;                                                                                   // length of data in the payload
162         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
163         msg->sub_id = UNSET_SUBID;
164         msg->mtype = UNSET_MSGTYPE;
165         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
166         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
167         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
168         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
169         msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
170         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
171         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
172
173         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
174
175         return msg;
176 }
177
178 /*
179         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
180         transport receive should allocate that on its own.
181 */
182 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
183         size_t  mlen;
184         uta_mhdr_t* hdr;                        // convenience pointer
185         rmr_mbuf_t* msg;
186
187         if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
188                 if( msg->tp_buf ) {
189                         free( msg->tp_buf );            // caller doesn't want it -- future put this on an accumulation ring
190                 }
191         } else {
192                 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
193                         fprintf( stderr, "[CRI] rmr_alloc_mbuf: cannot get memory for message\n" );
194                         return NULL;                                                    // this used to exit, but that seems wrong
195                 }
196         }
197
198         memset( msg, 0, sizeof( *msg ) );
199
200         msg->sub_id = UNSET_SUBID;
201         msg->mtype = UNSET_MSGTYPE;
202         msg->tp_buf = NULL;
203         msg->header = NULL;
204         msg->len = -1;                                                                                  // no payload; invalid len
205         msg->alloc_len = 0;
206         msg->payload = NULL;
207         msg->xaction = NULL;
208         msg->state = state;
209         msg->flags = 0;
210         msg->ring = ctx->zcb_mring;                                                     // original msg_free() api doesn't get context so must dup on eaach :(
211
212         return msg;
213 }
214
215 /*
216         This accepts a message with the assumption that only the tp_buf pointer is valid. It
217         sets all of the various header/payload/xaction pointers in the mbuf to the proper
218         spot in the transport layer buffer.  The len in the header is assumed to be the
219         allocated len (a receive buffer that nng created);
220
221         The alen parm is the assumed allocated length; assumed because it's a value likely
222         to have come from si receive and the actual alloc len might be larger, but we
223         can only assume this is the total usable space. Because we are managing a transport
224         header in the first n bytes of the real msg, we must adjust this length down by the
225         size of the tp header (for testing 50 bytes, but this should change to a struct if
226         we adopt this interface).
227
228         This function returns the message with an error state set if it detects that the
229         received message might have been truncated.  Check is done here as the calculation
230         is somewhat based on header version.
231 */
232 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
233         uta_mhdr_t* hdr = NULL;                 // current header
234         uta_v1mhdr_t* v1hdr;                    // version 1 header
235         int ver;
236         int     hlen;                                           // header len to use for a truncation check
237
238         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
239
240         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
241
242         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
243                 ver = 1;
244                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
245         } else {
246                 ver = ntohl( v1hdr->rmr_ver );
247         }
248
249         switch( ver ) {
250                 case 1:
251                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
252                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
253                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
254
255                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
256                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
257                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
258                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
259                         msg->state = RMR_OK;
260                         hlen = sizeof( uta_v1mhdr_t );
261                         break;
262
263                 default:                                                                                                        // current version always lands here
264                         hdr = (uta_mhdr_t *) msg->header;
265                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
266                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
267
268                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
269                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
270                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
271                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
272                         msg->sub_id = ntohl( hdr->sub_id );
273                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
274                         break;
275         }
276
277         if( msg->len > (msg->alloc_len - hlen ) ) {
278                 msg->state = RMR_ERR_TRUNC;
279                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
280         } else {
281                 msg->state = RMR_OK;
282         }
283 }
284
285 /*
286         This will clone a message into a new zero copy buffer and return the cloned message.
287 */
288 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
289         rmr_mbuf_t* nm;                 // new message buffer
290         size_t  mlen;
291         int state;
292         uta_mhdr_t* hdr;
293         uta_v1mhdr_t* v1hdr;
294
295         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
296         if( nm == NULL ) {
297                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
298                 exit( 1 );
299         }
300         memset( nm, 0, sizeof( *nm ) );
301
302         mlen = old_msg->alloc_len;                                                                              // length allocated before
303         if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
304                 fprintf( stderr, "[CRI] rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
305                 abort();
306         }
307
308         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
309         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
310         switch( ntohl( v1hdr->rmr_ver ) ) {
311                 case 1:
312                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
313                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
314                         break;
315
316                 default:                                                                                        // current message always caught  here
317                         hdr = nm->header;
318                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
319                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
320                         break;
321         }
322
323         // --- these are all version agnostic -----------------------------------
324         nm->mtype = old_msg->mtype;
325         nm->sub_id = old_msg->sub_id;
326         nm->len = old_msg->len;                                                                 // length of data in the payload
327         nm->alloc_len = mlen;                                                                   // length of allocated payload
328
329         nm->xaction = hdr->xid;                                                                 // reference xaction
330         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
331         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
332         memcpy( nm->payload, old_msg->payload, old_msg->len );
333
334         return nm;
335 }
336
337 /*
338         This will clone a message with a change to the trace area in the header such that
339         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
340         The orignal message will be left unchanged, and a pointer to the new message is returned.
341         It is not possible to realloc buffers and change the data sizes.
342 */
343 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
344         rmr_mbuf_t* nm;                 // new message buffer
345         size_t  mlen;
346         int state;
347         uta_mhdr_t* hdr;
348         uta_v1mhdr_t* v1hdr;
349         int     tr_old_len;                     // tr size in new buffer
350         int*    alen;                   // convenience pointer to set toal xmit len FIX ME!
351         int             tpb_len;                // total transmit buffer len (user space, rmr header and tp header)
352
353         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
354         if( nm == NULL ) {
355                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
356                 exit( 1 );
357         }
358         memset( nm, 0, sizeof( *nm ) );
359
360         hdr = old_msg->header;
361         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
362
363         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
364         if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
365
366         tpb_len = mlen + TP_HDR_LEN;
367         if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
368                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
369                 exit( 1 );
370         }
371         memset( nm->tp_buf, 0, tpb_len );
372         memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );         // DEBUGGING
373         alen = (int *) nm->tp_buf;
374         *alen = tpb_len;                                                // FIX ME: need a stuct to go in these first bytes, not just dummy len
375
376         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
377
378         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
379         switch( ntohl( v1hdr->rmr_ver ) ) {
380                 case 1:
381                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
382                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
383                         break;
384
385                 default:                                                                                        // current message version always caught  here
386                         hdr = nm->header;
387                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
388                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
389
390                         if( RMR_D1_LEN( hdr )  ) {
391                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
392                         }
393                         if( RMR_D2_LEN( hdr )  ) {
394                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
395                         }
396
397                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
398                         break;
399         }
400
401         // --- these are all version agnostic -----------------------------------
402         nm->mtype = old_msg->mtype;
403         nm->sub_id = old_msg->sub_id;
404         nm->len = old_msg->len;                                                                 // length of data in the payload
405         nm->alloc_len = mlen;                                                                   // length of allocated payload
406
407         nm->xaction = hdr->xid;                                                                 // reference xaction
408         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
409         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
410         memcpy( nm->payload, old_msg->payload, old_msg->len );
411
412         return nm;
413 }
414
415 /*
416         Realloc the message such that the payload is at least payload_len bytes.  
417         The clone and copy options affect what portion of the original payload is copied to
418         the reallocated message, and whether or not the original payload is lost after the
419         reallocation process has finished.
420
421                 copy == true
422                 The entire payload from the original message will be coppied to the reallocated
423                 payload.
424
425                 copy == false
426                 Only the header (preserving return to sender information, message type, etc)
427                 is preserved after reallocation; the payload used lengrh is set to 0 and the
428                 payload is NOT initialised/cleared.
429
430                 clone == true
431                 The orignal message is preserved and a completely new message buffer and payload
432                 are allocated (even if the size given is the same). A pointer to the new message
433                 buffer is returned and it is the user application's responsibility to manage the
434                 old buffer (e.g. free when not needed).
435
436                 clone == false
437                 The old payload will be lost after reallocation. The message buffer pointer which
438                 is returned will likely reference the same structure (don't depend on that).
439                 
440
441         CAUTION:
442         If the message is not a message which was received, the mtype, sub-id, length values in the
443         RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
444         mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
445         resetting information in the mbuffer, this funciton will reset the mbuf values from the current
446         settings and NOT from the copied RMR header in transport buffer.
447 */
448 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
449         rmr_mbuf_t* nm = NULL;  // new message buffer when cloning
450         size_t  mlen;
451         uta_mhdr_t* omhdr;              // old message header
452         int             tr_old_len;             // tr size in new buffer
453         int             old_psize = 0;  // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
454         int             hdr_len = 0;    // length of RMR and transport headers in old msg
455         void*   old_tp_buf;             // pointer to the old tp buffer
456         int             free_tp = 1;    // free the transport buffer (old) when done (when not cloning)
457         int             old_mt;                 // msg type and sub-id from the message passed in
458         int             old_sid;
459         int             old_len;
460         int             old_rfd;                // rts file descriptor from old message
461
462         if( old_msg == NULL || payload_len <= 0 ) {
463                 errno = EINVAL;
464                 return NULL;
465         }
466
467         old_mt = old_msg->mtype;                        // preserve mbuf info
468         old_sid = old_msg->sub_id;
469         old_len = old_msg->len;
470         old_rfd = old_msg->rts_fd;
471
472         old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN);         // user payload size in orig message
473
474         if( !clone  && payload_len <= old_psize ) {                                                                             // not cloning and old is large enough; nothing to do
475                 if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
476                 return old_msg;
477         }
478
479         hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN;                          // with SI we manage the transport header; must include in len
480         old_tp_buf = old_msg->tp_buf;
481
482         if( clone ) {
483                 if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: cloning message\n" );
484                 free_tp = 0;
485
486                 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
487                 if( nm == NULL ) {
488                         fprintf( stderr, "[CRI] rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
489                         return NULL;
490                 }
491                 memset( nm, 0, sizeof( *nm ) );
492                 nm->rts_fd = old_rfd;                           // this is managed only in the mbuf; dup now
493         } else {
494                 nm = old_msg;
495         }
496
497         omhdr = old_msg->header;
498         mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
499
500         if( DEBUG ) fprintf( stderr, "[DBUG] reallocate for payload increase. new message size: %d\n", (int) mlen );    
501         if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
502                 fprintf( stderr, "[CRI] rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
503                 return NULL;
504         }
505
506         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;                        // point at the new header and copy from old
507         SET_HDR_LEN( nm->header );
508
509         if( copy ) {                                                                                                                            // if we need to copy the old payload too
510                 if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
511                 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
512         } else {                                                                                                                                        // just need to copy header
513                 if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
514                 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
515         }
516
517         ref_tpbuf( nm, mlen );                  // set payload and other pointers in the message to the new tp buffer
518
519         if( !copy ) {
520                 nm->mtype = -1;                                         // didn't copy payload, so mtype, sub-id, and rts fd are invalid
521                 nm->sub_id = -1;
522                 nm->len = 0;                                            // and len is 0
523         } else {
524                 nm->len = old_len;                                      // we must force these to avoid losing info if msg wasn't a received message
525                 nm->mtype = old_mt;
526                 nm->sub_id = old_sid;
527         }
528
529         if( free_tp ) {
530                 free( old_tp_buf );                             // we did not clone, so free b/c no references
531         }
532
533         return nm;
534 }
535
536 /*
537         For SI95 based transport all receives are driven through the threaded
538         ring and thus this function should NOT be called. If it is we will panic
539         and abort straight away.
540 */
541 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
542
543 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
544 exit( 1 );
545
546         return NULL;
547 }
548
549 /*
550         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
551         message buffer cannot be used to send, and the length information may or may
552         not be correct (it is set to the length received which might be more than the
553         bytes actually in the payload).
554
555         Mostly this supports the route table collector, but could be extended with an
556         API external function.
557 */
558 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
559         return NULL;
560 /*
561 FIXME: do we need this in the SI world?  The only user was the route table collector
562         int state;
563         rmr_mbuf_t*     msg = NULL;             // msg received
564         size_t  rsize;                          // nng needs to write back the size received... grrr
565
566         if( old_msg ) {
567                 msg = old_msg;
568         } else {
569                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
570         }
571
572         //msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                        // blocks hard until received
573         if( (msg->state = xlate_si_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
574                 return msg;
575         }
576         rsize = nng_msg_len( msg->tp_buf );
577
578         // do NOT use ref_tpbuf() here! Must fill these in manually.
579         msg->header = nng_msg_body( msg->tp_buf );
580         msg->len = rsize;                                                       // len is the number of bytes received
581         msg->alloc_len = rsize;
582         msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
583         msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
584         msg->state = RMR_OK;
585         msg->flags = MFL_RAW;
586         msg->payload = msg->header;                                     // payload is the whole thing; no header
587         msg->xaction = NULL;
588
589         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
590
591         return msg;
592 */
593 }
594
595 /*
596         This does the hard work of actually sending the message to the given socket. On success,
597         a new message struct is returned. On error, the original msg is returned with the state
598         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
599         buffer will not be allocated and returned (mostly for call() interal processing since
600         the return message from call() is a received buffer, not a new one).
601
602         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
603         validation has been done prior.
604
605         When msg->state is not ok, this function must set tp_state in the message as some API 
606         fucntions return the message directly and do not propigate errno into the message.
607 */
608 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
609         int state;
610         uta_mhdr_t*     hdr;
611         int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
612         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace sizes
613         int tot_len;                                                    // total send length (hdr + user data + tp header)
614
615         // future: ensure that application did not overrun the XID buffer; last byte must be 0
616
617         hdr = (uta_mhdr_t *) msg->header;
618         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
619         hdr->sub_id = htonl( msg->sub_id );
620         hdr->plen = htonl( msg->len );
621         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
622
623         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
624                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
625                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
626         }
627
628         if( retries == 0 ) {
629                 spin_retries = 100;
630                 retries++;
631         }
632
633         errno = 0;
634         msg->state = RMR_OK;
635         do {
636                 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
637                 *((int*) msg->tp_buf) = tot_len;
638
639                 if( DEBUG > 1 ) fprintf( stderr, "[DEBUG] send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
640                 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
641
642                 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
643                         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] send_msg:  error!! sent state=%d\n", state );
644                         msg->state = state;
645                         if( retries > 0 && state == SI_ERR_BLOCKED ) {
646                                 if( --spin_retries <= 0 ) {                             // don't give up the processor if we don't have to
647                                         retries--;
648                                         if( retries > 0 ) {                                     // only if we'll loop through again
649                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
650                                         }
651                                         spin_retries = 1000;
652                                 }
653                         } else {
654                                 state = 0;                      // don't loop
655                         }
656                 } else {
657                         if( DEBUG > 2 ) fprintf( stderr, "[DBUG] sent OK state=%d\n", state );
658                         state = 0;
659                         msg->state = RMR_OK;
660                         hdr = NULL;
661                 }
662         } while( state && retries > 0 );
663
664         if( msg->state == RMR_OK ) {                                                                    // successful send
665                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
666                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
667                 } else {
668                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
669                         return NULL;
670                 }
671         } else {                                                                                        // send failed -- return original message
672                 if( msg->state == 98 ) {                // FIX ME: this is just broken, but needs SI changes to work correctly for us
673                         errno = EAGAIN;
674                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
675                 } else {
676                         msg->state = RMR_ERR_SENDFAILED;
677                 }
678
679                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
680         }
681
682         return msg;
683 }
684
685 /*
686         send message with maximum timeout.
687         Accept a message and send it to an endpoint based on message type.
688         If NNG reports that the send attempt timed out, or should be retried,
689         RMr will retry for approximately max_to microseconds; rounded to the next
690         higher value of 10.
691
692         Allocates a new message buffer for the next send. If a message type has
693         more than one group of endpoints defined, then the message will be sent
694         in round robin fashion to one endpoint in each group.
695
696         An endpoint will be looked up in the route table using the message type and
697         the subscription id. If the subscription id is "UNSET_SUBID", then only the
698         message type is used.  If the initial lookup, with a subid, fails, then a
699         second lookup using just the mtype is tried.
700
701         When msg->state is not OK, this function must set tp_state in the message as 
702         some API fucntions return the message directly and do not propigate errno into 
703         the message.
704
705         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
706                 it will return with an error and errno set to eagain. If the send is
707                 a limited fanout, then the returned status is the status of the last
708                 send attempt.
709
710 */
711 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
712         endpoint_t*     ep;                                     // end point that we're attempting to send to
713         rtable_ent_t*   rte;                    // the route table entry which matches the message key
714         int     nn_sock;                                        // endpoint socket (fd in si case) for send
715         uta_ctx_t*      ctx;
716         int                     group;                          // selected group to get socket for
717         int                     send_again;                     // true if the message must be sent again
718         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
719         int                     sock_ok;                        // got a valid socket from round robin select
720         char*           d1;
721         int                     ok_sends = 0;           // track number of ok sends
722
723         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
724                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
725                 if( msg != NULL ) {
726                         msg->state = RMR_ERR_BADARG;
727                         errno = EINVAL;                                                                                 // must ensure it's not eagain
728                         msg->tp_state = errno;
729                 }
730                 return msg;
731         }
732
733         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
734         if( msg->header == NULL ) {
735                 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
736                 msg->state = RMR_ERR_NOHDR;
737                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
738                 msg->tp_state = errno;
739                 return msg;
740         }
741
742         if( max_to < 0 ) {
743                 max_to = ctx->send_retries;             // convert to retries
744         }
745
746         if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
747                 if( ctx->flags & CTXFL_WARN ) {
748                         fprintf( stderr, "[WARN] no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
749                 }
750                 msg->state = RMR_ERR_NOENDPT;
751                 errno = ENXIO;                                                                          // must ensure it's not eagain
752                 msg->tp_state = errno;
753                 return msg;                                                                                     // caller can resend (maybe) or free
754         }
755
756         send_again = 1;                                                                                 // force loop entry
757         group = 0;                                                                                              // always start with group 0
758         while( send_again ) {
759                 sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx );         // select endpt from rr group and set again if more groups
760
761                 if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
762                                 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
763
764                 group++;
765
766                 if( sock_ok ) {                                                                                                 // with an rte we _should_ always have a socket, but don't bet on it
767                         if( send_again ) {
768                                 clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
769                                 if( clone_m == NULL ) {
770                                         msg->state = RMR_ERR_SENDFAILED;
771                                         errno = ENOMEM;
772                                         msg->tp_state = errno;
773                                         if( ctx->flags & CTXFL_WARN ) {
774                                                 fprintf( stderr, "[WARN] unable to clone message for multiple rr-group send\n" );
775                                         }
776                                         return msg;
777                                 }
778
779                                 if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
780                                 msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
781                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
782         
783                                 if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
784                                         rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
785                                         msg = clone_m;
786                                 } else {
787                                         ok_sends++;
788                                         msg = clone_m;                                                                          // clone will be the next to send
789                                 }
790                         } else {
791                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
792                                 if( DEBUG ) {
793                                         if( msg == NULL ) {
794                                                 fprintf( stderr, "[DBUG] mtosend_msg:  send returned nil message!\n" );         
795                                         }
796                                 }
797                         }
798
799                         if( ep != NULL && msg != NULL ) {
800                                 switch( msg->state ) {
801                                         case RMR_OK:
802                                                 ep->scounts[EPSC_GOOD]++;
803                                                 break;
804                                 
805                                         case RMR_ERR_RETRY:
806                                                 ep->scounts[EPSC_TRANS]++;
807                                                 break;
808
809                                         default:
810                                                 ep->scounts[EPSC_FAIL]++;
811                                                 uta_ep_failed( ep );                                                            // sending to ep failed; set up to reconnect
812                                                 break;
813                                 }
814                         }
815                 } else {
816 /*
817                         if( ctx->flags & CTXFL_WARN ) {
818                                 fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
819                         }
820 */
821                         msg->state = RMR_ERR_NOENDPT;
822                         errno = ENXIO;
823                 }
824         }
825
826         if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
827                 msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
828                 if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
829                         msg->state = RMR_OK;
830                 }
831         
832                 if( DEBUG ) fprintf( stderr, "[DBUG] final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
833         
834                 msg->tp_state = errno;
835         }
836
837         return msg;                                                                     // last message caries the status of last/only send attempt
838 }
839
840
841 /*
842         A generic wrapper to the real send to keep wormhole stuff agnostic.
843         We assume the wormhole function vetted the buffer so we don't have to.
844 */
845 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
846         return send_msg( ctx, msg, ep->nn_sock, -1 );
847 }
848
849 #endif