Allow user programme to set RMR verbosity level
[ric-plt/lib/rmr.git] / src / rmr / nng / src / sr_nng_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_nng_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs nanomsg).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
33
34 #include <nng/nng.h>
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
39
40 /*
41         Translates the nng state passed in to one of ours that is suitable to put
42         into the message, and sets errno to something that might be useful.
43         If we don't have a specific RMr state, then we return the default (e.g.
44         receive failed).
45
46         The addition of the connection shut error code to the switch requires
47         that the NNG version at commit e618abf8f3db2a94269a (or after) be
48         used for compiling RMR. 
49 */
50 static inline int xlate_nng_state( int state, int def_state ) {
51
52         switch( state ) {
53                 case 0:
54                         errno = 0;
55                         state = RMR_OK;
56                         break;
57
58                 case NNG_EAGAIN:                                // soft errors get retry as the RMr error
59                         state = RMR_ERR_RETRY;
60                         errno = EAGAIN;
61                         break;
62
63                 case NNG_ETIMEDOUT:
64                         state = RMR_ERR_RETRY;
65                         errno = EAGAIN;
66                         break;
67
68                 case NNG_ENOTSUP:
69                         errno  = ENOTSUP;
70                         state = def_state;
71                         break;
72
73                 case NNG_EINVAL:
74                         errno  = EINVAL;
75                         state = def_state;
76                         break;
77
78                 case NNG_ENOMEM:
79                         errno  = ENOMEM;
80                         state = def_state;
81                         break;
82
83                 case NNG_ESTATE:
84                         errno  = EBADFD;                                // file des not in a good state for the operation
85                         state = def_state;
86                         break;
87
88                 case NNG_ECONNSHUT:                                     // new error with nng commit e618abf8f3db2a94269a79c8901a51148d48fcc2 (Sept 2019)
89                 case NNG_ECLOSED:
90                         errno  = EBADFD;                                // file des not in a good state for the operation
91                         state = def_state;
92                         break;
93
94                 default:
95                         errno = EBADE;
96                         state = def_state;
97                         break;
98         }
99
100         return state;
101 }
102
103 /*
104         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
105         a new message struct as well. Size is the size of the zc buffer to allocate (not
106         including our header). If size is 0, then the buffer allocated is the size previously
107         allocated (if msg is !nil) or the default size given at initialisation).
108
109         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
110         the context value is used.
111
112         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
113                 are zero copy, however ONLY the message is zero copy. We now allocate and use
114                 nng messages.
115 */
116 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
117         size_t          mlen;                   // size of the transport buffer that we'll allocate
118         uta_mhdr_t*     hdr;                    // convenience pointer
119         int                     tr_len;                 // trace data len (default or override)
120
121         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
122
123         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
124         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
125
126         if( msg == NULL ) {
127                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
128                 if( msg == NULL ) {
129                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
130                         exit( 1 );
131                 }
132         } else {
133                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
134         }
135
136         memset( msg, 0, sizeof( *msg ) );
137
138         if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
139                 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
140                 abort( );                                                                                       // toss out a core file for this
141         }
142
143         msg->header = nng_msg_body( msg->tp_buf );
144         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
145         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
146                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
147                 hdr->sub_id = htonl( UNSET_SUBID );
148                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
149                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
150                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
151                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
152         }
153         msg->len = 0;                                                                                   // length of data in the payload
154         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
155         msg->sub_id = UNSET_SUBID;
156         msg->mtype = UNSET_MSGTYPE;
157         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
158         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
159         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
160         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
161         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
162         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
163
164         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
165
166         return msg;
167 }
168
169 /*
170         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
171         NNG receive must allocate that on its own.
172 */
173 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
174         size_t  mlen;
175         uta_mhdr_t* hdr;                        // convenience pointer
176         rmr_mbuf_t* msg;
177
178         msg = (rmr_mbuf_t *) malloc( sizeof *msg );
179         if( msg == NULL ) {
180                 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
181                 exit( 1 );
182         }
183
184         memset( msg, 0, sizeof( *msg ) );
185
186         msg->sub_id = UNSET_SUBID;
187         msg->mtype = UNSET_MSGTYPE;
188         msg->tp_buf = NULL;
189         msg->header = NULL;
190         msg->len = -1;                                                                                  // no payload; invalid len
191         msg->alloc_len = -1;
192         msg->payload = NULL;
193         msg->xaction = NULL;
194         msg->state = RMR_ERR_UNSET;
195         msg->flags = 0;
196
197         return msg;
198 }
199
200 /*
201         This accepts a message with the assumption that only the tp_buf pointer is valid. It
202         sets all of the various header/payload/xaction pointers in the mbuf to the proper
203         spot in the transport layer buffer.  The len in the header is assumed to be the
204         allocated len (a receive buffer that nng created);
205
206         The alen parm is the assumed allocated length; assumed because it's a value likely
207         to have come from nng receive and the actual alloc len might be larger, but we
208         can only assume this is the total usable space.
209
210         This function returns the message with an error state set if it detects that the
211         received message might have been truncated.  Check is done here as the calculation
212         is somewhat based on header version.
213 */
214 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
215         uta_mhdr_t* hdr = NULL;                 // current header
216         uta_v1mhdr_t* v1hdr;                    // version 1 header
217         int ver;
218         int     hlen;                                           // header len to use for a truncation check
219
220         msg->header = nng_msg_body( msg->tp_buf );                              // header is the start of the transport buffer
221         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
222
223         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
224                 ver = 1;
225                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
226         } else {
227                 ver = ntohl( v1hdr->rmr_ver );
228         }
229
230         switch( ver ) {
231                 case 1:
232                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
233                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
234                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
235
236                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
237                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
238                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
239                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
240                         msg->state = RMR_OK;
241                         hlen = sizeof( uta_v1mhdr_t );
242                         break;
243
244                 default:                                                                                                        // current version always lands here
245                         hdr = (uta_mhdr_t *) msg->header;
246                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
247                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
248
249                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
250                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
251                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
252                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
253                         msg->sub_id = ntohl( hdr->sub_id );
254                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
255                         break;
256         }
257
258         if( msg->len > (msg->alloc_len - hlen ) ) {
259                 msg->state = RMR_ERR_TRUNC;
260                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
261         } else {
262                 msg->state = RMR_OK;
263         }
264 }
265
266 /*
267         This will clone a message into a new zero copy buffer and return the cloned message.
268 */
269 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
270         rmr_mbuf_t* nm;                 // new message buffer
271         size_t  mlen;
272         int state;
273         uta_mhdr_t* hdr;
274         uta_v1mhdr_t* v1hdr;
275
276         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
277         if( nm == NULL ) {
278                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
279                 exit( 1 );
280         }
281         memset( nm, 0, sizeof( *nm ) );
282
283         mlen = old_msg->alloc_len;                                                                              // length allocated before
284         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
285                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
286                 exit( 1 );
287         }
288
289         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
290         v1hdr = (uta_v1mhdr_t *) old_msg->header;               // v1 will work to dig header out of any version
291         switch( ntohl( v1hdr->rmr_ver ) ) {
292                 case 1:
293                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
294                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
295                         break;
296
297                 default:                                                                                        // current message always caught  here
298                         hdr = nm->header;
299                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data
300                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
301                         break;
302         }
303
304         // --- these are all version agnostic -----------------------------------
305         nm->mtype = old_msg->mtype;
306         nm->sub_id = old_msg->sub_id;
307         nm->len = old_msg->len;                                                                 // length of data in the payload
308         nm->alloc_len = mlen;                                                                   // length of allocated payload
309         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "clone values: mty=%d sid=%d len=%d alloc=%d\n", nm->mtype, nm->sub_id, nm->len, nm->alloc_len );
310
311         nm->xaction = hdr->xid;                                                                 // reference xaction
312         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
313         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
314         memcpy( nm->payload, old_msg->payload, old_msg->len );
315
316         return nm;
317 }
318
319 /*
320         This will clone a message with a change to the trace area in the header such that
321         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
322         The orignal message will be left unchanged, and a pointer to the new message is returned.
323         It is not possible to realloc buffers and change the data sizes.
324 */
325 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
326         rmr_mbuf_t* nm;                 // new message buffer
327         size_t  mlen;
328         int state;
329         uta_mhdr_t* hdr;
330         uta_v1mhdr_t* v1hdr;
331         int     tr_old_len;                     // tr size in new buffer
332
333         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
334         if( nm == NULL ) {
335                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
336                 exit( 1 );
337         }
338         memset( nm, 0, sizeof( *nm ) );
339
340         hdr = old_msg->header;
341         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
342
343         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
344         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
345         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
346                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
347                 exit( 1 );
348         }
349
350         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
351         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
352         switch( ntohl( v1hdr->rmr_ver ) ) {
353                 case 1:
354                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
355                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
356                         break;
357
358                 default:                                                                                        // current message version always caught  here
359                         hdr = nm->header;
360                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
361                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
362
363                         if( RMR_D1_LEN( hdr )  ) {
364                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
365                         }
366                         if( RMR_D2_LEN( hdr )  ) {
367                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
368                         }
369
370                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
371                         break;
372         }
373
374         // --- these are all version agnostic -----------------------------------
375         nm->mtype = old_msg->mtype;
376         nm->sub_id = old_msg->sub_id;
377         nm->len = old_msg->len;                                                                 // length of data in the payload
378         nm->alloc_len = mlen;                                                                   // length of allocated payload
379
380         nm->xaction = hdr->xid;                                                                 // reference xaction
381         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
382         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
383         memcpy( nm->payload, old_msg->payload, old_msg->len );
384
385         return nm;
386 }
387
388 /*
389         Realloc the message such that the payload is at least payload_len bytes.  If the current
390         payload size is large enough, no action is taken. If copy is false, the actual payload
391         bytes are NOT copied.  This allows a caller to realloc for a response message (to retain
392         the source information which would be lost on a simple alloc) which has no need for the
393         original message.
394
395         The old message buffer will reference the new underlying transport, and the original payload
396         will be lost unless clone is set to true. If clone is true, the old message buffer will continue
397         to reference the original payload, and a new message buffer will be allocated (even if the
398         payload size in the old message was larger than requested).
399
400         The return value is a pointer to the message with at least payload_len bytes allocated. It 
401         will be the same as the old_message if clone is false.
402
403         CAUTION:
404         If the message is not a message which was received, the mtype, sub-id, length values in the
405         RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
406         mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
407         resetting information in the mbuffer, this funciton will reset the mbuf values from the current
408         settings and NOT from the copied RMR header in transport buffer.
409 */
410 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
411         rmr_mbuf_t* nm = NULL;  // new message buffer when cloning
412         size_t  mlen;
413         int state;
414         uta_mhdr_t* omhdr;              // old message header
415         uta_v1mhdr_t* v1hdr;
416         int     tr_old_len;                     // tr size in new buffer
417         int old_psize = 0;              // current size of message for payload
418         int     hdr_len = 0;            // length of RMR header in old msg
419         void*   old_tp_buf;             // pointer to the old tp buffer
420         int     free_tp = 1;            // free the transport buffer (old) when done (when not cloning)
421         int             old_mt;                 // msg type and sub-id from the message passed in
422         int             old_sid;
423         int             old_len;
424
425         if( old_msg == NULL || payload_len <= 0 ) {
426                 errno = EINVAL;
427                 return NULL;
428         }
429
430         old_mt = old_msg->mtype;
431         old_sid = old_msg->sub_id;
432         old_len = old_msg->len;
433         old_psize = old_msg->alloc_len - RMR_HDR_LEN( old_msg->header );                                // allocated transport size less the header and other data bits
434         if( !clone  && payload_len <= old_psize ) {                                                             // old message is large enough, nothing to do
435                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
436                 return old_msg;
437         }
438
439         hdr_len = RMR_HDR_LEN( old_msg->header );
440         old_tp_buf = old_msg->tp_buf;
441
442         if( clone ) {
443                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
444                 free_tp = 0;
445
446                 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
447                 if( nm == NULL ) {
448                         rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
449                         return NULL;
450                 }
451                 memset( nm, 0, sizeof( *nm ) );
452         } else {
453                 nm = old_msg;
454         }
455
456         omhdr = old_msg->header;
457         mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
458
459         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );    
460         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
461                 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
462                 return NULL;
463         }
464
465         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
466         SET_HDR_LEN( nm->header );
467
468         if( copy ) {                                                                                                                            // if we need to copy the old payload too
469                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
470                 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
471         } else {                                                                                                                                        // just need to copy header
472                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
473                 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
474         }
475
476         ref_tpbuf( nm, mlen );                  // set payload and other pointers in the message to the new tp buffer
477
478         if( !copy ) {
479                 nm->mtype = -1;                                         // didn't copy payload, so mtype and sub-id are invalid
480                 nm->sub_id = -1;
481                 nm->len = 0;                                            // and len is 0
482         } else {
483                 nm->len = old_len;                                      // we must force these to avoid losing info if msg wasn't a received message
484                 nm->mtype = old_mt;
485                 nm->sub_id = old_sid;
486         }
487
488         if( free_tp ) {
489                 free( old_tp_buf );                             // we did not clone, so free b/c no references
490         }
491
492         return nm;
493 }
494
495 /*
496         This is the receive work horse used by the outer layer receive functions.
497         It waits for a message to be received on our listen socket. If old msg
498         is passed in, the we assume we can use it instead of allocating a new
499         one, else a new block of memory is allocated.
500
501         This allocates a zero copy message so that if the user wishes to call
502         rmr_rts_msg() the send is zero copy.
503
504         The nng timeout on send is at the ms level which is a tad too long for
505         our needs.  So, if NNG returns eagain or timedout (we don't set one)
506         we will loop up to 5 times with a 10 microsecond delay between each
507         attempt.  If at the end of this set of retries NNG is still saying
508         eagain/timeout we'll return to the caller with that set in errno.
509         Right now this is only for zero-copy buffers (they should all be zc
510         buffers now).
511
512
513         In the NNG msg world it must allocate the receive buffer rather
514         than accepting one that we allocated from their space and could
515         reuse.  They have their reasons I guess.  Thus, we will free
516         the old transport buffer if user passes the message in; at least
517         our mbuf will be reused.
518
519         When msg->state is not ok, this function must set tp_state in the message as some API 
520         fucntions return the message directly and do not propigate errno into the message.
521 */
522 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
523         int state;
524         rmr_mbuf_t*     msg = NULL;             // msg received
525         uta_mhdr_t* hdr;
526         size_t  rsize;                          // nng needs to write back the size received... grrr
527
528         if( old_msg ) {
529                 msg = old_msg;
530                 if( msg->tp_buf != NULL ) {
531                         nng_msg_free( msg->tp_buf );
532                 }
533
534                 msg->tp_buf = NULL;
535         } else {
536                 msg = alloc_mbuf( ctx, RMR_OK );                                // msg without a transport buffer
537         }
538
539         msg->alloc_len = 0;
540         msg->len = 0;
541         msg->payload = NULL;
542         msg->xaction = NULL;
543         msg->tp_buf = NULL;
544
545         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
546         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
547                 msg->tp_state = errno;
548                 return msg;
549         }
550
551         msg->tp_state = 0;
552         if( msg->tp_buf == NULL ) {             // if state is good this _should_ not be nil, but parninoia says check anyway
553                 msg->state = RMR_ERR_EMPTY;
554                 msg->tp_state = 0;
555                 return msg;
556         }
557
558         rsize = nng_msg_len( msg->tp_buf );
559         if( rsize >= sizeof( uta_v1mhdr_t ) ) {                 // we need at least a full type 1 (smallest) header here
560                 ref_tpbuf( msg, rsize );                                        // point payload, header etc to the data and set trunc error if needed
561                 hdr = (uta_mhdr_t *) msg->header;
562                 msg->flags |= MFL_ADDSRC;                                       // turn on so if user app tries to send this buffer we reset src
563
564                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
565                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
566         } else {
567                 msg->state = RMR_ERR_EMPTY;
568                 msg->tp_state = 0;
569                 msg->len = 0;
570                 msg->alloc_len = rsize;
571                 msg->payload = NULL;
572                 msg->xaction = NULL;
573                 msg->flags |= MFL_ZEROCOPY;                                                                     // this is a zerocopy sendable message
574                 msg->mtype = UNSET_MSGTYPE;
575                 msg->sub_id = UNSET_SUBID;
576         }
577
578         return msg;
579 }
580
581 /*
582         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
583         message buffer cannot be used to send, and the length information may or may
584         not be correct (it is set to the length received which might be more than the
585         bytes actually in the payload).
586
587         Mostly this supports the route table collector, but could be extended with an
588         API external function.
589 */
590 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
591         int state;
592         rmr_mbuf_t*     msg = NULL;             // msg received
593         size_t  rsize;                          // nng needs to write back the size received... grrr
594
595         if( old_msg ) {
596                 msg = old_msg;
597         } else {
598                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
599         }
600
601         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
602         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
603                 return msg;
604         }
605         rsize = nng_msg_len( msg->tp_buf );
606
607         // do NOT use ref_tpbuf() here! Must fill these in manually.
608         msg->header = nng_msg_body( msg->tp_buf );
609         msg->len = rsize;                                                       // len is the number of bytes received
610         msg->alloc_len = rsize;
611         msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
612         msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
613         msg->state = RMR_OK;
614         msg->flags = MFL_RAW;
615         msg->payload = msg->header;                                     // payload is the whole thing; no header
616         msg->xaction = NULL;
617
618         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
619
620         return msg;
621 }
622
623 /*
624         This does the hard work of actually sending the message to the given socket. On success,
625         a new message struct is returned. On error, the original msg is returned with the state
626         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
627         buffer will not be allocated and returned (mostly for call() interal processing since
628         the return message from call() is a received buffer, not a new one).
629
630         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
631         validation has been done prior.
632
633         When msg->state is not ok, this function must set tp_state in the message as some API 
634         fucntions return the message directly and do not propigate errno into the message.
635 */
636 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
637         int state;
638         uta_mhdr_t*     hdr;
639         int nng_flags = NNG_FLAG_NONBLOCK;              // if we need to set any nng flags (zc buffer) add it to this
640         int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
641         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace sizes
642
643         // future: ensure that application did not overrun the XID buffer; last byte must be 0
644
645         hdr = (uta_mhdr_t *) msg->header;
646         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
647         hdr->sub_id = htonl( msg->sub_id );
648         hdr->plen = htonl( msg->len );
649         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
650
651         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
652                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
653                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
654         }
655
656         if( retries == 0 ) {
657                 spin_retries = 100;
658                 retries++;
659         }
660
661         errno = 0;
662         msg->state = RMR_OK;
663         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
664                 do {
665                         if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) {               // must check and retry some if transient failure
666                                 msg->state = state;
667                                 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
668                                         if( --spin_retries <= 0 ) {                     // don't give up the processor if we don't have to
669                                                 retries--;
670                                                 if( retries > 0 ) {                                     // only if we'll loop through again
671                                                         usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
672                                                 }
673                                                 spin_retries = 1000;
674                                         }
675                                 } else {
676                                         state = 0;                      // don't loop
677                                         //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
678                                 }
679                         } else {
680                                 state = 0;
681                                 msg->state = RMR_OK;
682                                 msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
683                                 msg->tp_buf = NULL;
684                                 hdr = NULL;
685                         }
686                 } while( state && retries > 0 );
687         } else {
688                 // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
689                 msg->state = RMR_ERR_SENDFAILED;
690                 errno = ENOTSUP;
691                 msg->tp_state = errno;
692                 return msg;
693                 /*
694                 NOT SUPPORTED
695                 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
696                         msg->state = state;
697                         //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
698                 }
699                 */
700         }
701
702         if( msg->state == RMR_OK ) {                                                                    // successful send
703                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
704                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
705                 } else {
706                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
707                         return NULL;
708                 }
709         } else {                                                                                        // send failed -- return original message
710                 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
711                         errno = EAGAIN;
712                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
713                 } else {
714                         msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED );         // xlate to our state and set errno
715                 }
716
717                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
718         }
719
720         return msg;
721 }
722
723 /*
724         send message with maximum timeout.
725         Accept a message and send it to an endpoint based on message type.
726         If NNG reports that the send attempt timed out, or should be retried,
727         RMr will retry for approximately max_to microseconds; rounded to the next
728         higher value of 10.
729
730         Allocates a new message buffer for the next send. If a message type has
731         more than one group of endpoints defined, then the message will be sent
732         in round robin fashion to one endpoint in each group.
733
734         An endpoint will be looked up in the route table using the message type and
735         the subscription id. If the subscription id is "UNSET_SUBID", then only the
736         message type is used.  If the initial lookup, with a subid, fails, then a
737         second lookup using just the mtype is tried.
738
739         When msg->state is not OK, this function must set tp_state in the message as 
740         some API fucntions return the message directly and do not propigate errno into 
741         the message.
742
743         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
744                 it will return with an error and errno set to eagain. If the send is
745                 a limited fanout, then the returned status is the status of the last
746                 send attempt.
747
748 */
749 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
750         endpoint_t*     ep;                                     // end point that we're attempting to send to
751         rtable_ent_t*   rte;                    // the route table entry which matches the message key
752         nng_socket      nn_sock;                        // endpoint socket for send
753         uta_ctx_t*      ctx;
754         int                     group;                          // selected group to get socket for
755         int                     send_again;                     // true if the message must be sent again
756         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
757         int                     sock_ok;                        // got a valid socket from round robin select
758         char*           d1;
759         int                     ok_sends = 0;           // track number of ok sends
760
761         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
762                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
763                 if( msg != NULL ) {
764                         msg->state = RMR_ERR_BADARG;
765                         errno = EINVAL;                                                                                 // must ensure it's not eagain
766                         msg->tp_state = errno;
767                 }
768                 return msg;
769         }
770
771         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
772         if( msg->header == NULL ) {
773                 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
774                 msg->state = RMR_ERR_NOHDR;
775                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
776                 msg->tp_state = errno;
777                 return msg;
778         }
779
780         if( max_to < 0 ) {
781                 max_to = ctx->send_retries;             // convert to retries
782         }
783
784         if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
785                 if( ctx->flags & CTXFL_WARN ) {
786                         rmr_vlog( RMR_VL_WARN, "no endpoint for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
787                 }
788                 msg->state = RMR_ERR_NOENDPT;
789                 errno = ENXIO;                                                                          // must ensure it's not eagain
790                 msg->tp_state = errno;
791                 return msg;                                                                                     // caller can resend (maybe) or free
792         }
793
794         send_again = 1;                                                                                 // force loop entry
795         group = 0;                                                                                              // always start with group 0
796         while( send_again ) {
797                 if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry
798                         sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep );              // select endpt from rr group and set again if more groups
799                 } else {
800                         sock_ok = epsock_meid( ctx->rtable, msg, &nn_sock, &ep );
801                         send_again = 0;
802                 }
803
804                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
805                                 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
806
807                 group++;
808
809                 if( sock_ok ) {                                                                                                 // with an rte we _should_ always have a socket, but don't bet on it
810                         if( send_again ) {
811                                 clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
812                                 if( clone_m == NULL ) {
813                                         msg->state = RMR_ERR_SENDFAILED;
814                                         errno = ENOMEM;
815                                         msg->tp_state = errno;
816                                         if( ctx->flags & CTXFL_WARN ) {
817                                                 rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
818                                         }
819                                         return msg;
820                                 }
821
822                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
823                                 msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
824                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
825         
826                                 if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
827                                         rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
828                                         msg = clone_m;
829                                 } else {
830                                         ok_sends++;
831                                         msg = clone_m;                                                                          // clone will be the next to send
832                                 }
833                         } else {
834                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
835                                 if( DEBUG ) {
836                                         if( msg == NULL ) {
837                                                 rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );         
838                                         }
839                                 }
840                         }
841
842                         if( ep != NULL && msg != NULL ) {
843                                 switch( msg->state ) {
844                                         case RMR_OK:
845                                                 ep->scounts[EPSC_GOOD]++;
846                                                 break;
847                                 
848                                         case RMR_ERR_RETRY:
849                                                 ep->scounts[EPSC_TRANS]++;
850                                                 break;
851
852                                         default:
853                                                 ep->scounts[EPSC_FAIL]++;
854                                                 break;
855                                 }
856                         }
857                 } else {
858                         if( ctx->flags & CTXFL_WARN ) {
859                                 rmr_vlog( RMR_VL_WARN, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
860                         }
861                         msg->state = RMR_ERR_NOENDPT;
862                         errno = ENXIO;
863                 }
864         }
865
866         if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
867                 msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
868                 if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
869                         msg->state = RMR_OK;
870                 }
871         
872                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
873         
874                 msg->tp_state = errno;
875         }
876
877         return msg;                                                                     // last message caries the status of last/only send attempt
878 }
879
880
881 /*
882         A generic wrapper to the real send to keep wormhole stuff agnostic.
883         We assume the wormhole function vetted the buffer so we don't have to.
884 */
885 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
886         return send_msg( ctx, msg, ep->nn_sock, -1 );
887 }
888
889 #endif