Fix missing realloc function in SI
[ric-plt/lib/rmr.git] / src / rmr / si / src / sr_si_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_si_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs nanomsg).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
33
34 #include <nng/nng.h>
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
39
40
41 static void dump_40( char *p, char* label ) {
42         int i;
43
44         if( label )
45                 fprintf( stderr, ">>>>> %s p=%p\n", label, p );
46
47         for( i = 0; i < 40; i++ ) {
48                 fprintf( stderr, "%02x ", (unsigned char) *(p+i) );
49         }
50         fprintf( stderr, "\n" );
51 }
52
53 /*
54         Translates the nng state passed in to one of ours that is suitable to put
55         into the message, and sets errno to something that might be useful.
56         If we don't have a specific RMr state, then we return the default (e.g.
57         receive failed).
58
59         The addition of the connection shut error code to the switch requires
60         that the NNG version at commit e618abf8f3db2a94269a (or after) be
61         used for compiling RMR. 
62 */
63 static inline int xlate_si_state( int state, int def_state ) {
64
65         switch( state ) {
66                 case SI_ERR_NONE:
67                         errno = 0;
68                         state = RMR_OK;
69                         break;
70
71                 default:
72                         state = def_state;
73                         errno = EAGAIN;
74                         break;
75
76                 case SI_ERR_QUEUED:
77                 case SI_ERR_TPORT:
78                 case SI_ERR_UPORT:
79                 case SI_ERR_FORK:
80                 case SI_ERR_HANDLE:
81                 case SI_ERR_SESSID:
82                 case SI_ERR_TP:
83                 case SI_ERR_SHUTD:
84                 case SI_ERR_NOFDS:
85                 case SI_ERR_SIGUSR1:
86                 case SI_ERR_SIGUSR2:
87                 case SI_ERR_DISC:
88                 case SI_ERR_TIMEOUT:
89                 case SI_ERR_ORDREL:
90                 case SI_ERR_SIGALRM:
91                 case SI_ERR_NOMEM:
92                 case SI_ERR_ADDR:
93                         errno  = ENOTSUP;
94                         state = def_state;
95                         break;
96         }
97
98         return state;
99 }
100
101 /*
102         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
103         a new message struct as well. Size is the size of the zc buffer to allocate (not
104         including our header). If size is 0, then the buffer allocated is the size previously
105         allocated (if msg is !nil) or the default size given at initialisation).
106
107         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
108         the context value is used.
109
110         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
111                 are zero copy, however ONLY the message is zero copy. We now allocate and use
112                 nng messages.
113 */
114 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
115         size_t          mlen = -1;                      // size of the transport buffer that we'll allocate
116         uta_mhdr_t*     hdr;                    // convenience pointer
117         int                     tr_len;                 // trace data len (default or override)
118         int*            alen;                   // convenience pointer to set allocated len
119
120         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
121
122         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
123         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
124         mlen = sizeof( char ) * (mlen + TP_HDR_LEN);                                            // finally add the transport header len
125
126         if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
127                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
128                 if( msg == NULL ) {
129                         fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
130                         return NULL;                                                            // we used to exit -- that seems wrong
131                 }
132                 memset( msg, 0, sizeof( *msg ) );       // tp_buffer will be allocated below
133         } else {                                                                // user message or message from the ring
134                 if( mlen > msg->alloc_len ) {           // current allocation is too small
135                         msg->alloc_len = 0;                             // force tp_buffer realloc below
136                         if( msg->tp_buf ) {
137                                 free( msg->tp_buf );
138                         }
139                 } else {
140                         mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
141                 }
142         }
143
144         msg->rts_fd = -1;                                       // must force to be invalid; not a received message that can be returned
145
146         if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
147                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
148                 abort( );                                                                                       // toss out a core file for this
149         }
150
151 /*
152         memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only)   valgrind will complain about uninitalised use if we don't set
153         memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );                // NOT for production -- debugging eyecatcher
154 */
155         alen = (int *) msg->tp_buf;
156         *alen = mlen;                                           // FIX ME: need a stuct to go in these first bytes, not just dummy len
157
158         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
159         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
160         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
161                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
162                 hdr->sub_id = htonl( UNSET_SUBID );
163                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
164                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
165                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
166                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
167         }
168         msg->len = 0;                                                                                   // length of data in the payload
169         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
170         msg->sub_id = UNSET_SUBID;
171         msg->mtype = UNSET_MSGTYPE;
172         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
173         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
174         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
175         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
176         msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
177         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
178         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
179
180         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
181
182         return msg;
183 }
184
185 /*
186         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
187         transport receive should allocate that on its own.
188 */
189 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
190         size_t  mlen;
191         uta_mhdr_t* hdr;                        // convenience pointer
192         rmr_mbuf_t* msg;
193
194         if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
195                 if( msg->tp_buf ) {
196                         free( msg->tp_buf );            // caller doesn't want it -- future put this on an accumulation ring
197                 }
198         } else {
199                 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
200                         fprintf( stderr, "[CRI] rmr_alloc_mbuf: cannot get memory for message\n" );
201                         return NULL;                                                    // this used to exit, but that seems wrong
202                 }
203         }
204
205         memset( msg, 0, sizeof( *msg ) );
206
207         msg->sub_id = UNSET_SUBID;
208         msg->mtype = UNSET_MSGTYPE;
209         msg->tp_buf = NULL;
210         msg->header = NULL;
211         msg->len = -1;                                                                                  // no payload; invalid len
212         msg->alloc_len = 0;
213         msg->payload = NULL;
214         msg->xaction = NULL;
215         msg->state = state;
216         msg->flags = 0;
217         msg->ring = ctx->zcb_mring;                                                     // original msg_free() api doesn't get context so must dup on eaach :(
218
219         return msg;
220 }
221
222 /*
223         This accepts a message with the assumption that only the tp_buf pointer is valid. It
224         sets all of the various header/payload/xaction pointers in the mbuf to the proper
225         spot in the transport layer buffer.  The len in the header is assumed to be the
226         allocated len (a receive buffer that nng created);
227
228         The alen parm is the assumed allocated length; assumed because it's a value likely
229         to have come from si receive and the actual alloc len might be larger, but we
230         can only assume this is the total usable space. Because we are managing a transport
231         header in the first n bytes of the real msg, we must adjust this length down by the
232         size of the tp header (for testing 50 bytes, but this should change to a struct if
233         we adopt this interface).
234
235         This function returns the message with an error state set if it detects that the
236         received message might have been truncated.  Check is done here as the calculation
237         is somewhat based on header version.
238 */
239 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
240         uta_mhdr_t* hdr = NULL;                 // current header
241         uta_v1mhdr_t* v1hdr;                    // version 1 header
242         int ver;
243         int     hlen;                                           // header len to use for a truncation check
244
245         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
246
247         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
248
249         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
250                 ver = 1;
251                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
252         } else {
253                 ver = ntohl( v1hdr->rmr_ver );
254         }
255
256         switch( ver ) {
257                 case 1:
258                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
259                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
260                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
261
262                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
263                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
264                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
265                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
266                         msg->state = RMR_OK;
267                         hlen = sizeof( uta_v1mhdr_t );
268                         break;
269
270                 default:                                                                                                        // current version always lands here
271                         hdr = (uta_mhdr_t *) msg->header;
272                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
273                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
274
275                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
276                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
277                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
278                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
279                         msg->sub_id = ntohl( hdr->sub_id );
280                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
281                         break;
282         }
283
284         if( msg->len > (msg->alloc_len - hlen ) ) {
285                 msg->state = RMR_ERR_TRUNC;
286                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
287         } else {
288                 msg->state = RMR_OK;
289         }
290 }
291
292 /*
293         This will clone a message into a new zero copy buffer and return the cloned message.
294 */
295 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
296         rmr_mbuf_t* nm;                 // new message buffer
297         size_t  mlen;
298         int state;
299         uta_mhdr_t* hdr;
300         uta_v1mhdr_t* v1hdr;
301
302         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
303         if( nm == NULL ) {
304                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
305                 exit( 1 );
306         }
307         memset( nm, 0, sizeof( *nm ) );
308
309         mlen = old_msg->alloc_len;                                                                              // length allocated before
310         if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
311                 fprintf( stderr, "[CRI] rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
312                 abort();
313         }
314
315         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
316         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
317         switch( ntohl( v1hdr->rmr_ver ) ) {
318                 case 1:
319                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
320                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
321                         break;
322
323                 default:                                                                                        // current message always caught  here
324                         hdr = nm->header;
325                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
326                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
327                         break;
328         }
329
330         // --- these are all version agnostic -----------------------------------
331         nm->mtype = old_msg->mtype;
332         nm->sub_id = old_msg->sub_id;
333         nm->len = old_msg->len;                                                                 // length of data in the payload
334         nm->alloc_len = mlen;                                                                   // length of allocated payload
335
336         nm->xaction = hdr->xid;                                                                 // reference xaction
337         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
338         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
339         memcpy( nm->payload, old_msg->payload, old_msg->len );
340
341         return nm;
342 }
343
344 /*
345         This will clone a message with a change to the trace area in the header such that
346         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
347         The orignal message will be left unchanged, and a pointer to the new message is returned.
348         It is not possible to realloc buffers and change the data sizes.
349 */
350 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
351         rmr_mbuf_t* nm;                 // new message buffer
352         size_t  mlen;
353         int state;
354         uta_mhdr_t* hdr;
355         uta_v1mhdr_t* v1hdr;
356         int     tr_old_len;                     // tr size in new buffer
357         int*    alen;                   // convenience pointer to set toal xmit len FIX ME!
358         int             tpb_len;                // total transmit buffer len (user space, rmr header and tp header)
359
360         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
361         if( nm == NULL ) {
362                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
363                 exit( 1 );
364         }
365         memset( nm, 0, sizeof( *nm ) );
366
367         hdr = old_msg->header;
368         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
369
370         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
371         if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
372
373         tpb_len = mlen + TP_HDR_LEN;
374         if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
375                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
376                 exit( 1 );
377         }
378         memset( nm->tp_buf, 0, tpb_len );
379         memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );         // DEBUGGING
380         alen = (int *) nm->tp_buf;
381         *alen = tpb_len;                                                // FIX ME: need a stuct to go in these first bytes, not just dummy len
382
383         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
384
385         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
386         switch( ntohl( v1hdr->rmr_ver ) ) {
387                 case 1:
388                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
389                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
390                         break;
391
392                 default:                                                                                        // current message version always caught  here
393                         hdr = nm->header;
394                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
395                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
396
397                         if( RMR_D1_LEN( hdr )  ) {
398                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
399                         }
400                         if( RMR_D2_LEN( hdr )  ) {
401                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
402                         }
403
404                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
405                         break;
406         }
407
408         // --- these are all version agnostic -----------------------------------
409         nm->mtype = old_msg->mtype;
410         nm->sub_id = old_msg->sub_id;
411         nm->len = old_msg->len;                                                                 // length of data in the payload
412         nm->alloc_len = mlen;                                                                   // length of allocated payload
413
414         nm->xaction = hdr->xid;                                                                 // reference xaction
415         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
416         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
417         memcpy( nm->payload, old_msg->payload, old_msg->len );
418
419         return nm;
420 }
421
422 /*
423         Realloc the message such that the payload is at least payload_len bytes.  
424         The clone and copy options affect what portion of the original payload is copied to
425         the reallocated message, and whether or not the original payload is lost after the
426         reallocation process has finished.
427
428                 copy == true
429                 The entire payload from the original message will be coppied to the reallocated
430                 payload.
431
432                 copy == false
433                 Only the header (preserving return to sender information, message type, etc)
434                 is preserved after reallocation; the payload used lengrh is set to 0 and the
435                 payload is NOT initialised/cleared.
436
437                 clone == true
438                 The orignal message is preserved and a completely new message buffer and payload
439                 are allocated (even if the size given is the same). A pointer to the new message
440                 buffer is returned and it is the user application's responsibility to manage the
441                 old buffer (e.g. free when not needed).
442
443                 clone == false
444                 The old payload will be lost after reallocation. The message buffer pointer which
445                 is returned will likely reference the same structure (don't depend on that).
446                 
447
448         CAUTION:
449         If the message is not a message which was received, the mtype, sub-id, length values in the
450         RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
451         mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
452         resetting information in the mbuffer, this funciton will reset the mbuf values from the current
453         settings and NOT from the copied RMR header in transport buffer.
454 */
455 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
456         rmr_mbuf_t* nm = NULL;  // new message buffer when cloning
457         size_t  mlen;
458         uta_mhdr_t* omhdr;              // old message header
459         int             tr_old_len;             // tr size in new buffer
460         int             old_psize = 0;  // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
461         int             hdr_len = 0;    // length of RMR and transport headers in old msg
462         void*   old_tp_buf;             // pointer to the old tp buffer
463         int             free_tp = 1;    // free the transport buffer (old) when done (when not cloning)
464         int             old_mt;                 // msg type and sub-id from the message passed in
465         int             old_sid;
466         int             old_len;
467         int             old_rfd;                // rts file descriptor from old message
468
469         if( old_msg == NULL || payload_len <= 0 ) {
470                 errno = EINVAL;
471                 return NULL;
472         }
473
474         old_mt = old_msg->mtype;                        // preserve mbuf info
475         old_sid = old_msg->sub_id;
476         old_len = old_msg->len;
477         old_rfd = old_msg->rts_fd;
478
479         old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN);         // user payload size in orig message
480
481         if( !clone  && payload_len <= old_psize ) {                                                                             // not cloning and old is large enough; nothing to do
482                 if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
483                 return old_msg;
484         }
485
486         hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN;                          // with SI we manage the transport header; must include in len
487         old_tp_buf = old_msg->tp_buf;
488
489         if( clone ) {
490                 if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: cloning message\n" );
491                 free_tp = 0;
492
493                 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
494                 if( nm == NULL ) {
495                         fprintf( stderr, "[CRI] rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
496                         return NULL;
497                 }
498                 memset( nm, 0, sizeof( *nm ) );
499                 nm->rts_fd = old_rfd;                           // this is managed only in the mbuf; dup now
500         } else {
501                 nm = old_msg;
502         }
503
504         omhdr = old_msg->header;
505         mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
506
507         if( DEBUG ) fprintf( stderr, "[DBUG] reallocate for payload increase. new message size: %d\n", (int) mlen );    
508         if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
509                 fprintf( stderr, "[CRI] rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
510                 return NULL;
511         }
512
513         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;                        // point at the new header and copy from old
514         SET_HDR_LEN( nm->header );
515
516         if( copy ) {                                                                                                                            // if we need to copy the old payload too
517                 if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
518                 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
519         } else {                                                                                                                                        // just need to copy header
520                 if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
521                 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
522         }
523
524         ref_tpbuf( nm, mlen );                  // set payload and other pointers in the message to the new tp buffer
525
526         if( !copy ) {
527                 nm->mtype = -1;                                         // didn't copy payload, so mtype, sub-id, and rts fd are invalid
528                 nm->sub_id = -1;
529                 nm->len = 0;                                            // and len is 0
530         } else {
531                 nm->len = old_len;                                      // we must force these to avoid losing info if msg wasn't a received message
532                 nm->mtype = old_mt;
533                 nm->sub_id = old_sid;
534         }
535
536         if( free_tp ) {
537                 free( old_tp_buf );                             // we did not clone, so free b/c no references
538         }
539
540         return nm;
541 }
542
543 /*
544         For SI95 based transport all receives are driven through the threaded
545         ring and thus this function should NOT be called. If it is we will panic
546         and abort straight away.
547 */
548 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
549
550 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
551 exit( 1 );
552
553         return NULL;
554 }
555
556 /*
557         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
558         message buffer cannot be used to send, and the length information may or may
559         not be correct (it is set to the length received which might be more than the
560         bytes actually in the payload).
561
562         Mostly this supports the route table collector, but could be extended with an
563         API external function.
564 */
565 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
566         return NULL;
567 /*
568 FIXME: do we need this in the SI world?  The only user was the route table collector
569         int state;
570         rmr_mbuf_t*     msg = NULL;             // msg received
571         size_t  rsize;                          // nng needs to write back the size received... grrr
572
573         if( old_msg ) {
574                 msg = old_msg;
575         } else {
576                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
577         }
578
579         //msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                        // blocks hard until received
580         if( (msg->state = xlate_si_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
581                 return msg;
582         }
583         rsize = nng_msg_len( msg->tp_buf );
584
585         // do NOT use ref_tpbuf() here! Must fill these in manually.
586         msg->header = nng_msg_body( msg->tp_buf );
587         msg->len = rsize;                                                       // len is the number of bytes received
588         msg->alloc_len = rsize;
589         msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
590         msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
591         msg->state = RMR_OK;
592         msg->flags = MFL_RAW;
593         msg->payload = msg->header;                                     // payload is the whole thing; no header
594         msg->xaction = NULL;
595
596         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
597
598         return msg;
599 */
600 }
601
602 /*
603         This does the hard work of actually sending the message to the given socket. On success,
604         a new message struct is returned. On error, the original msg is returned with the state
605         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
606         buffer will not be allocated and returned (mostly for call() interal processing since
607         the return message from call() is a received buffer, not a new one).
608
609         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
610         validation has been done prior.
611
612         When msg->state is not ok, this function must set tp_state in the message as some API 
613         fucntions return the message directly and do not propigate errno into the message.
614 */
615 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
616         int state;
617         uta_mhdr_t*     hdr;
618         int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
619         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace sizes
620         int tot_len;                                                    // total send length (hdr + user data + tp header)
621
622         // future: ensure that application did not overrun the XID buffer; last byte must be 0
623
624         hdr = (uta_mhdr_t *) msg->header;
625         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
626         hdr->sub_id = htonl( msg->sub_id );
627         hdr->plen = htonl( msg->len );
628         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
629
630         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
631                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
632                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
633         }
634
635         if( retries == 0 ) {
636                 spin_retries = 100;
637                 retries++;
638         }
639
640         errno = 0;
641         msg->state = RMR_OK;
642         do {
643                 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
644                 *((int*) msg->tp_buf) = tot_len;
645
646                 if( DEBUG > 1 ) fprintf( stderr, "[DEBUG] send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
647                 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
648
649                 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
650                         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] send_msg:  error!! sent state=%d\n", state );
651                         msg->state = state;
652                         if( retries > 0 && state == SI_ERR_BLOCKED ) {
653                                 if( --spin_retries <= 0 ) {                             // don't give up the processor if we don't have to
654                                         retries--;
655                                         if( retries > 0 ) {                                     // only if we'll loop through again
656                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
657                                         }
658                                         spin_retries = 1000;
659                                 }
660                         } else {
661                                 state = 0;                      // don't loop
662                         }
663                 } else {
664                         if( DEBUG > 2 ) fprintf( stderr, "[DBUG] sent OK state=%d\n", state );
665                         state = 0;
666                         msg->state = RMR_OK;
667                         hdr = NULL;
668                 }
669         } while( state && retries > 0 );
670
671         if( msg->state == RMR_OK ) {                                                                    // successful send
672                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
673                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
674                 } else {
675                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
676                         return NULL;
677                 }
678         } else {                                                                                        // send failed -- return original message
679                 if( msg->state == 98 ) {                // FIX ME: this is just broken, but needs SI changes to work correctly for us
680                         errno = EAGAIN;
681                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
682                 } else {
683                         msg->state = RMR_ERR_SENDFAILED;
684                 }
685
686                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
687         }
688
689         return msg;
690 }
691
692 /*
693         send message with maximum timeout.
694         Accept a message and send it to an endpoint based on message type.
695         If NNG reports that the send attempt timed out, or should be retried,
696         RMr will retry for approximately max_to microseconds; rounded to the next
697         higher value of 10.
698
699         Allocates a new message buffer for the next send. If a message type has
700         more than one group of endpoints defined, then the message will be sent
701         in round robin fashion to one endpoint in each group.
702
703         An endpoint will be looked up in the route table using the message type and
704         the subscription id. If the subscription id is "UNSET_SUBID", then only the
705         message type is used.  If the initial lookup, with a subid, fails, then a
706         second lookup using just the mtype is tried.
707
708         When msg->state is not OK, this function must set tp_state in the message as 
709         some API fucntions return the message directly and do not propigate errno into 
710         the message.
711
712         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
713                 it will return with an error and errno set to eagain. If the send is
714                 a limited fanout, then the returned status is the status of the last
715                 send attempt.
716
717 */
718 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
719         endpoint_t*     ep;                                     // end point that we're attempting to send to
720         rtable_ent_t*   rte;                    // the route table entry which matches the message key
721         int     nn_sock;                                        // endpoint socket (fd in si case) for send
722         uta_ctx_t*      ctx;
723         int                     group;                          // selected group to get socket for
724         int                     send_again;                     // true if the message must be sent again
725         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
726         int                     sock_ok;                        // got a valid socket from round robin select
727         char*           d1;
728         int                     ok_sends = 0;           // track number of ok sends
729
730         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
731                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
732                 if( msg != NULL ) {
733                         msg->state = RMR_ERR_BADARG;
734                         errno = EINVAL;                                                                                 // must ensure it's not eagain
735                         msg->tp_state = errno;
736                 }
737                 return msg;
738         }
739
740         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
741         if( msg->header == NULL ) {
742                 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
743                 msg->state = RMR_ERR_NOHDR;
744                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
745                 msg->tp_state = errno;
746                 return msg;
747         }
748
749         if( max_to < 0 ) {
750                 max_to = ctx->send_retries;             // convert to retries
751         }
752
753         if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
754                 if( ctx->flags & CTXFL_WARN ) {
755                         fprintf( stderr, "[WARN] no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
756                 }
757                 msg->state = RMR_ERR_NOENDPT;
758                 errno = ENXIO;                                                                          // must ensure it's not eagain
759                 msg->tp_state = errno;
760                 return msg;                                                                                     // caller can resend (maybe) or free
761         }
762
763         send_again = 1;                                                                                 // force loop entry
764         group = 0;                                                                                              // always start with group 0
765         while( send_again ) {
766                 sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx );         // select endpt from rr group and set again if more groups
767
768                 if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
769                                 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
770
771                 group++;
772
773                 if( sock_ok ) {                                                                                                 // with an rte we _should_ always have a socket, but don't bet on it
774                         if( send_again ) {
775                                 clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
776                                 if( clone_m == NULL ) {
777                                         msg->state = RMR_ERR_SENDFAILED;
778                                         errno = ENOMEM;
779                                         msg->tp_state = errno;
780                                         if( ctx->flags & CTXFL_WARN ) {
781                                                 fprintf( stderr, "[WARN] unable to clone message for multiple rr-group send\n" );
782                                         }
783                                         return msg;
784                                 }
785
786                                 if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
787                                 msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
788                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
789         
790                                 if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
791                                         rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
792                                         msg = clone_m;
793                                 } else {
794                                         ok_sends++;
795                                         msg = clone_m;                                                                          // clone will be the next to send
796                                 }
797                         } else {
798                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
799                                 if( DEBUG ) {
800                                         if( msg == NULL ) {
801                                                 fprintf( stderr, "[DBUG] mtosend_msg:  send returned nil message!\n" );         
802                                         }
803                                 }
804                         }
805
806                         if( ep != NULL && msg != NULL ) {
807                                 switch( msg->state ) {
808                                         case RMR_OK:
809                                                 ep->scounts[EPSC_GOOD]++;
810                                                 break;
811                                 
812                                         case RMR_ERR_RETRY:
813                                                 ep->scounts[EPSC_TRANS]++;
814                                                 break;
815
816                                         default:
817                                                 ep->scounts[EPSC_FAIL]++;
818                                                 uta_ep_failed( ep );                                                            // sending to ep failed; set up to reconnect
819                                                 break;
820                                 }
821                         }
822                 } else {
823 /*
824                         if( ctx->flags & CTXFL_WARN ) {
825                                 fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
826                         }
827 */
828                         msg->state = RMR_ERR_NOENDPT;
829                         errno = ENXIO;
830                 }
831         }
832
833         if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
834                 msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
835                 if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
836                         msg->state = RMR_OK;
837                 }
838         
839                 if( DEBUG ) fprintf( stderr, "[DBUG] final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
840         
841                 msg->tp_state = errno;
842         }
843
844         return msg;                                                                     // last message caries the status of last/only send attempt
845 }
846
847
848 /*
849         A generic wrapper to the real send to keep wormhole stuff agnostic.
850         We assume the wormhole function vetted the buffer so we don't have to.
851 */
852 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
853         return send_msg( ctx, msg, ep->nn_sock, -1 );
854 }
855
856 #endif