Fix sonar flagged bugs
[ric-plt/lib/rmr.git] / src / rmr / nng / src / sr_nng_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_nng_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs nanomsg).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
33
34 #include <nng/nng.h>
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
39
40 /*
41         Translates the nng state passed in to one of ours that is suitable to put
42         into the message, and sets errno to something that might be useful.
43         If we don't have a specific RMr state, then we return the default (e.g.
44         receive failed).
45
46         The addition of the connection shut error code to the switch requires
47         that the NNG version at commit e618abf8f3db2a94269a (or after) be
48         used for compiling RMR. 
49 */
50 static inline int xlate_nng_state( int state, int def_state ) {
51
52         switch( state ) {
53                 case 0:
54                         errno = 0;
55                         state = RMR_OK;
56                         break;
57
58                 case NNG_EAGAIN:                                // soft errors get retry as the RMr error
59                         state = RMR_ERR_RETRY;
60                         errno = EAGAIN;
61                         break;
62
63                 case NNG_ETIMEDOUT:
64                         state = RMR_ERR_RETRY;
65                         errno = EAGAIN;
66                         break;
67
68                 case NNG_ENOTSUP:
69                         errno  = ENOTSUP;
70                         state = def_state;
71                         break;
72
73                 case NNG_EINVAL:
74                         errno  = EINVAL;
75                         state = def_state;
76                         break;
77
78                 case NNG_ENOMEM:
79                         errno  = ENOMEM;
80                         state = def_state;
81                         break;
82
83                 case NNG_ESTATE:
84                         errno  = EBADFD;                                // file des not in a good state for the operation
85                         state = def_state;
86                         break;
87
88                 case NNG_ECONNSHUT:                                     // new error with nng commit e618abf8f3db2a94269a79c8901a51148d48fcc2 (Sept 2019)
89                 case NNG_ECLOSED:
90                         errno  = EBADFD;                                // file des not in a good state for the operation
91                         state = def_state;
92                         break;
93
94                 default:
95                         errno = EBADE;
96                         state = def_state;
97                         break;
98         }
99
100         return state;
101 }
102
103 /*
104         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
105         a new message struct as well. Size is the size of the zc buffer to allocate (not
106         including our header). If size is 0, then the buffer allocated is the size previously
107         allocated (if msg is !nil) or the default size given at initialisation).
108
109         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
110         the context value is used.
111
112         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
113                 are zero copy, however ONLY the message is zero copy. We now allocate and use
114                 nng messages.
115 */
116 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
117         size_t          mlen;                   // size of the transport buffer that we'll allocate
118         uta_mhdr_t*     hdr;                    // convenience pointer
119         int                     tr_len;                 // trace data len (default or override)
120
121         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
122
123         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
124         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
125
126         if( msg == NULL ) {
127                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
128                 if( msg == NULL ) {
129                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
130                         exit( 1 );
131                 }
132         } else {
133                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
134         }
135
136         memset( msg, 0, sizeof( *msg ) );
137
138         if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
139                 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
140                 abort( );                                                                                       // toss out a core file for this
141         }
142
143         msg->header = nng_msg_body( msg->tp_buf );
144         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
145         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
146                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
147                 hdr->sub_id = htonl( UNSET_SUBID );
148                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
149                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
150                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
151                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
152         }
153         msg->len = 0;                                                                                   // length of data in the payload
154         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
155         msg->sub_id = UNSET_SUBID;
156         msg->mtype = UNSET_MSGTYPE;
157         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
158         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
159         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
160         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
161         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
162         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
163
164         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
165
166         return msg;
167 }
168
169 /*
170         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
171         NNG receive must allocate that on its own.
172 */
173 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
174         size_t  mlen;
175         uta_mhdr_t* hdr;                        // convenience pointer
176         rmr_mbuf_t* msg;
177
178         msg = (rmr_mbuf_t *) malloc( sizeof *msg );
179         if( msg == NULL ) {
180                 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
181                 exit( 1 );
182         }
183
184         memset( msg, 0, sizeof( *msg ) );
185
186         msg->sub_id = UNSET_SUBID;
187         msg->mtype = UNSET_MSGTYPE;
188         msg->tp_buf = NULL;
189         msg->header = NULL;
190         msg->len = -1;                                                                                  // no payload; invalid len
191         msg->alloc_len = -1;
192         msg->payload = NULL;
193         msg->xaction = NULL;
194         msg->state = RMR_ERR_UNSET;
195         msg->flags = 0;
196
197         return msg;
198 }
199
200 /*
201         This accepts a message with the assumption that only the tp_buf pointer is valid. It
202         sets all of the various header/payload/xaction pointers in the mbuf to the proper
203         spot in the transport layer buffer.  The len in the header is assumed to be the
204         allocated len (a receive buffer that nng created);
205
206         The alen parm is the assumed allocated length; assumed because it's a value likely
207         to have come from nng receive and the actual alloc len might be larger, but we
208         can only assume this is the total usable space.
209
210         This function returns the message with an error state set if it detects that the
211         received message might have been truncated.  Check is done here as the calculation
212         is somewhat based on header version.
213 */
214 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
215         uta_mhdr_t* hdr = NULL;                 // current header
216         uta_v1mhdr_t* v1hdr;                    // version 1 header
217         int ver;
218         int     hlen;                                           // header len to use for a truncation check
219
220         msg->header = nng_msg_body( msg->tp_buf );                              // header is the start of the transport buffer
221         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
222
223         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
224                 ver = 1;
225                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
226         } else {
227                 ver = ntohl( v1hdr->rmr_ver );
228         }
229
230         switch( ver ) {
231                 case 1:
232                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
233                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
234                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
235
236                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
237                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
238                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
239                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
240                         msg->state = RMR_OK;
241                         hlen = sizeof( uta_v1mhdr_t );
242                         break;
243
244                 default:                                                                                                        // current version always lands here
245                         hdr = (uta_mhdr_t *) msg->header;
246                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
247                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
248
249                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
250                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
251                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
252                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
253                         msg->sub_id = ntohl( hdr->sub_id );
254                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
255                         break;
256         }
257
258         if( msg->len > (msg->alloc_len - hlen ) ) {
259                 msg->state = RMR_ERR_TRUNC;
260                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
261         } else {
262                 msg->state = RMR_OK;
263         }
264 }
265
266 /*
267         This will clone a message into a new zero copy buffer and return the cloned message.
268 */
269 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
270         rmr_mbuf_t* nm;                 // new message buffer
271         size_t  mlen;
272         int state;
273         uta_mhdr_t* hdr;
274         uta_v1mhdr_t* v1hdr;
275
276         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
277         if( nm == NULL ) {
278                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
279                 exit( 1 );
280         }
281         memset( nm, 0, sizeof( *nm ) );
282
283         mlen = old_msg->alloc_len;                                                                              // length allocated before
284         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
285                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
286                 exit( 1 );
287         }
288
289         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
290         v1hdr = (uta_v1mhdr_t *) old_msg->header;               // v1 will work to dig header out of any version
291         switch( ntohl( v1hdr->rmr_ver ) ) {
292                 case 1:
293                         hdr = nm->header;
294                         memcpy( hdr, old_msg->header, sizeof( *v1hdr ) );               // copy complete header
295                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
296                         break;
297
298                 default:                                                                                        // current message always caught  here
299                         hdr = nm->header;
300                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data
301                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
302                         break;
303         }
304
305         // --- these are all version agnostic -----------------------------------
306         nm->mtype = old_msg->mtype;
307         nm->sub_id = old_msg->sub_id;
308         nm->len = old_msg->len;                                                                 // length of data in the payload
309         nm->alloc_len = mlen;                                                                   // length of allocated payload
310         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "clone values: mty=%d sid=%d len=%d alloc=%d\n", nm->mtype, nm->sub_id, nm->len, nm->alloc_len );
311
312         nm->xaction = &hdr->xid[0];                                                     // point at transaction id in header area
313         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
314         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
315         memcpy( nm->payload, old_msg->payload, old_msg->len );
316
317         return nm;
318 }
319
320 /*
321         This will clone a message with a change to the trace area in the header such that
322         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
323         The orignal message will be left unchanged, and a pointer to the new message is returned.
324         It is not possible to realloc buffers and change the data sizes.
325 */
326 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
327         rmr_mbuf_t* nm;                 // new message buffer
328         size_t  mlen;
329         int state;
330         uta_mhdr_t* hdr;
331         uta_v1mhdr_t* v1hdr;
332         int     tr_old_len;                     // tr size in new buffer
333
334         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
335         if( nm == NULL ) {
336                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
337                 exit( 1 );
338         }
339         memset( nm, 0, sizeof( *nm ) );
340
341         hdr = old_msg->header;
342         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
343
344         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
345         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
346         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
347                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
348                 exit( 1 );
349         }
350
351         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
352         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
353         switch( ntohl( v1hdr->rmr_ver ) ) {
354                 case 1:
355                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
356                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
357                         break;
358
359                 default:                                                                                        // current message version always caught  here
360                         hdr = nm->header;
361                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
362                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
363
364                         if( RMR_D1_LEN( hdr )  ) {
365                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
366                         }
367                         if( RMR_D2_LEN( hdr )  ) {
368                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
369                         }
370
371                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
372                         break;
373         }
374
375         // --- these are all version agnostic -----------------------------------
376         nm->mtype = old_msg->mtype;
377         nm->sub_id = old_msg->sub_id;
378         nm->len = old_msg->len;                                                                 // length of data in the payload
379         nm->alloc_len = mlen;                                                                   // length of allocated payload
380
381         nm->xaction = &hdr->xid[0];                                                     // point at transaction id in header area
382         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
383         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
384         memcpy( nm->payload, old_msg->payload, old_msg->len );
385
386         return nm;
387 }
388
389 /*
390         Realloc the message such that the payload is at least payload_len bytes.  If the current
391         payload size is large enough, no action is taken. If copy is false, the actual payload
392         bytes are NOT copied.  This allows a caller to realloc for a response message (to retain
393         the source information which would be lost on a simple alloc) which has no need for the
394         original message.
395
396         The old message buffer will reference the new underlying transport, and the original payload
397         will be lost unless clone is set to true. If clone is true, the old message buffer will continue
398         to reference the original payload, and a new message buffer will be allocated (even if the
399         payload size in the old message was larger than requested).
400
401         The return value is a pointer to the message with at least payload_len bytes allocated. It 
402         will be the same as the old_message if clone is false.
403
404         CAUTION:
405         If the message is not a message which was received, the mtype, sub-id, length values in the
406         RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
407         mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
408         resetting information in the mbuffer, this funciton will reset the mbuf values from the current
409         settings and NOT from the copied RMR header in transport buffer.
410 */
411 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
412         rmr_mbuf_t* nm = NULL;  // new message buffer when cloning
413         size_t  mlen;
414         int state;
415         uta_mhdr_t* omhdr;              // old message header
416         uta_v1mhdr_t* v1hdr;
417         int     tr_old_len;                     // tr size in new buffer
418         int old_psize = 0;              // current size of message for payload
419         int     hdr_len = 0;            // length of RMR header in old msg
420         void*   old_tp_buf;             // pointer to the old tp buffer
421         int     free_tp = 1;            // free the transport buffer (old) when done (when not cloning)
422         int             old_mt;                 // msg type and sub-id from the message passed in
423         int             old_sid;
424         int             old_len;
425
426         if( old_msg == NULL || payload_len <= 0 ) {
427                 errno = EINVAL;
428                 return NULL;
429         }
430
431         old_mt = old_msg->mtype;
432         old_sid = old_msg->sub_id;
433         old_len = old_msg->len;
434         old_psize = old_msg->alloc_len - RMR_HDR_LEN( old_msg->header );                                // allocated transport size less the header and other data bits
435         if( !clone  && payload_len <= old_psize ) {                                                             // old message is large enough, nothing to do
436                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
437                 return old_msg;
438         }
439
440         hdr_len = RMR_HDR_LEN( old_msg->header );
441         old_tp_buf = old_msg->tp_buf;
442
443         if( clone ) {
444                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
445                 free_tp = 0;
446
447                 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
448                 if( nm == NULL ) {
449                         rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
450                         return NULL;
451                 }
452                 memset( nm, 0, sizeof( *nm ) );
453         } else {
454                 nm = old_msg;
455         }
456
457         omhdr = old_msg->header;
458         mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
459
460         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );    
461         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
462                 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
463                 return NULL;
464         }
465
466         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
467         SET_HDR_LEN( nm->header );
468
469         if( copy ) {                                                                                                                            // if we need to copy the old payload too
470                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
471                 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
472         } else {                                                                                                                                        // just need to copy header
473                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
474                 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
475         }
476
477         ref_tpbuf( nm, mlen );                  // set payload and other pointers in the message to the new tp buffer
478
479         if( !copy ) {
480                 nm->mtype = -1;                                         // didn't copy payload, so mtype and sub-id are invalid
481                 nm->sub_id = -1;
482                 nm->len = 0;                                            // and len is 0
483         } else {
484                 nm->len = old_len;                                      // we must force these to avoid losing info if msg wasn't a received message
485                 nm->mtype = old_mt;
486                 nm->sub_id = old_sid;
487         }
488
489         if( free_tp ) {
490                 free( old_tp_buf );                             // we did not clone, so free b/c no references
491         }
492
493         return nm;
494 }
495
496 /*
497         This is the receive work horse used by the outer layer receive functions.
498         It waits for a message to be received on our listen socket. If old msg
499         is passed in, the we assume we can use it instead of allocating a new
500         one, else a new block of memory is allocated.
501
502         This allocates a zero copy message so that if the user wishes to call
503         rmr_rts_msg() the send is zero copy.
504
505         The nng timeout on send is at the ms level which is a tad too long for
506         our needs.  So, if NNG returns eagain or timedout (we don't set one)
507         we will loop up to 5 times with a 10 microsecond delay between each
508         attempt.  If at the end of this set of retries NNG is still saying
509         eagain/timeout we'll return to the caller with that set in errno.
510         Right now this is only for zero-copy buffers (they should all be zc
511         buffers now).
512
513
514         In the NNG msg world it must allocate the receive buffer rather
515         than accepting one that we allocated from their space and could
516         reuse.  They have their reasons I guess.  Thus, we will free
517         the old transport buffer if user passes the message in; at least
518         our mbuf will be reused.
519
520         When msg->state is not ok, this function must set tp_state in the message as some API 
521         fucntions return the message directly and do not propigate errno into the message.
522 */
523 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
524         int state;
525         rmr_mbuf_t*     msg = NULL;             // msg received
526         uta_mhdr_t* hdr;
527         size_t  rsize;                          // nng needs to write back the size received... grrr
528
529         if( old_msg ) {
530                 msg = old_msg;
531                 if( msg->tp_buf != NULL ) {
532                         nng_msg_free( msg->tp_buf );
533                 }
534
535                 msg->tp_buf = NULL;
536         } else {
537                 msg = alloc_mbuf( ctx, RMR_OK );                                // msg without a transport buffer
538         }
539
540         msg->alloc_len = 0;
541         msg->len = 0;
542         msg->payload = NULL;
543         msg->xaction = NULL;
544         msg->tp_buf = NULL;
545
546         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
547         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
548                 msg->tp_state = errno;
549                 return msg;
550         }
551
552         msg->tp_state = 0;
553         if( msg->tp_buf == NULL ) {             // if state is good this _should_ not be nil, but parninoia says check anyway
554                 msg->state = RMR_ERR_EMPTY;
555                 msg->tp_state = 0;
556                 return msg;
557         }
558
559         rsize = nng_msg_len( msg->tp_buf );
560         if( rsize >= sizeof( uta_v1mhdr_t ) ) {                 // we need at least a full type 1 (smallest) header here
561                 ref_tpbuf( msg, rsize );                                        // point payload, header etc to the data and set trunc error if needed
562                 hdr = (uta_mhdr_t *) msg->header;
563                 msg->flags |= MFL_ADDSRC;                                       // turn on so if user app tries to send this buffer we reset src
564
565                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
566                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
567         } else {
568                 msg->state = RMR_ERR_EMPTY;
569                 msg->tp_state = 0;
570                 msg->len = 0;
571                 msg->alloc_len = rsize;
572                 msg->payload = NULL;
573                 msg->xaction = NULL;
574                 msg->flags |= MFL_ZEROCOPY;                                                                     // this is a zerocopy sendable message
575                 msg->mtype = UNSET_MSGTYPE;
576                 msg->sub_id = UNSET_SUBID;
577         }
578
579         return msg;
580 }
581
582 /*
583         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
584         message buffer cannot be used to send, and the length information may or may
585         not be correct (it is set to the length received which might be more than the
586         bytes actually in the payload).
587
588         Mostly this supports the route table collector, but could be extended with an
589         API external function.
590 */
591 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
592         int state;
593         rmr_mbuf_t*     msg = NULL;             // msg received
594         size_t  rsize;                          // nng needs to write back the size received... grrr
595
596         if( old_msg ) {
597                 msg = old_msg;
598         } else {
599                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
600         }
601
602         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
603         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
604                 return msg;
605         }
606         rsize = nng_msg_len( msg->tp_buf );
607
608         // do NOT use ref_tpbuf() here! Must fill these in manually.
609         msg->header = nng_msg_body( msg->tp_buf );
610         msg->len = rsize;                                                       // len is the number of bytes received
611         msg->alloc_len = rsize;
612         msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
613         msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
614         msg->state = RMR_OK;
615         msg->flags = MFL_RAW;
616         msg->payload = msg->header;                                     // payload is the whole thing; no header
617         msg->xaction = NULL;
618
619         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
620
621         return msg;
622 }
623
624 /*
625         This does the hard work of actually sending the message to the given socket. On success,
626         a new message struct is returned. On error, the original msg is returned with the state
627         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
628         buffer will not be allocated and returned (mostly for call() interal processing since
629         the return message from call() is a received buffer, not a new one).
630
631         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
632         validation has been done prior.
633
634         When msg->state is not ok, this function must set tp_state in the message as some API 
635         fucntions return the message directly and do not propigate errno into the message.
636 */
637 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
638         int state;
639         uta_mhdr_t*     hdr;
640         int nng_flags = NNG_FLAG_NONBLOCK;              // if we need to set any nng flags (zc buffer) add it to this
641         int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
642         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace sizes
643
644         // future: ensure that application did not overrun the XID buffer; last byte must be 0
645
646         hdr = (uta_mhdr_t *) msg->header;
647         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
648         hdr->sub_id = htonl( msg->sub_id );
649         hdr->plen = htonl( msg->len );
650         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
651
652         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
653                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
654                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
655         }
656
657         if( retries == 0 ) {
658                 spin_retries = 100;
659                 retries++;
660         }
661
662         errno = 0;
663         msg->state = RMR_OK;
664         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
665                 do {
666                         if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) {               // must check and retry some if transient failure
667                                 msg->state = state;
668                                 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
669                                         if( --spin_retries <= 0 ) {                     // don't give up the processor if we don't have to
670                                                 retries--;
671                                                 if( retries > 0 ) {                                     // only if we'll loop through again
672                                                         usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
673                                                 }
674                                                 spin_retries = 1000;
675                                         }
676                                 } else {
677                                         state = 0;                      // don't loop
678                                         //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
679                                 }
680                         } else {
681                                 state = 0;
682                                 msg->state = RMR_OK;
683                                 msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
684                                 msg->tp_buf = NULL;
685                                 hdr = NULL;
686                         }
687                 } while( state && retries > 0 );
688         } else {
689                 // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
690                 msg->state = RMR_ERR_SENDFAILED;
691                 errno = ENOTSUP;
692                 msg->tp_state = errno;
693                 return msg;
694                 /*
695                 NOT SUPPORTED
696                 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
697                         msg->state = state;
698                         //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
699                 }
700                 */
701         }
702
703         if( msg->state == RMR_OK ) {                                                                    // successful send
704                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
705                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
706                 } else {
707                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
708                         return NULL;
709                 }
710         } else {                                                                                        // send failed -- return original message
711                 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
712                         errno = EAGAIN;
713                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
714                 } else {
715                         msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED );         // xlate to our state and set errno
716                 }
717
718                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
719         }
720
721         return msg;
722 }
723
724 /*
725         send message with maximum timeout.
726         Accept a message and send it to an endpoint based on message type.
727         If NNG reports that the send attempt timed out, or should be retried,
728         RMr will retry for approximately max_to microseconds; rounded to the next
729         higher value of 10.
730
731         Allocates a new message buffer for the next send. If a message type has
732         more than one group of endpoints defined, then the message will be sent
733         in round robin fashion to one endpoint in each group.
734
735         An endpoint will be looked up in the route table using the message type and
736         the subscription id. If the subscription id is "UNSET_SUBID", then only the
737         message type is used.  If the initial lookup, with a subid, fails, then a
738         second lookup using just the mtype is tried.
739
740         When msg->state is not OK, this function must set tp_state in the message as 
741         some API fucntions return the message directly and do not propigate errno into 
742         the message.
743
744         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
745                 it will return with an error and errno set to eagain. If the send is
746                 a limited fanout, then the returned status is the status of the last
747                 send attempt.
748
749 */
750 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
751         endpoint_t*     ep;                                     // end point that we're attempting to send to
752         rtable_ent_t*   rte;                    // the route table entry which matches the message key
753         nng_socket      nn_sock;                        // endpoint socket for send
754         uta_ctx_t*      ctx;
755         int                     group;                          // selected group to get socket for
756         int                     send_again;                     // true if the message must be sent again
757         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
758         int                     sock_ok;                        // got a valid socket from round robin select
759         char*           d1;
760         int                     ok_sends = 0;           // track number of ok sends
761
762         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
763                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
764                 if( msg != NULL ) {
765                         msg->state = RMR_ERR_BADARG;
766                         errno = EINVAL;                                                                                 // must ensure it's not eagain
767                         msg->tp_state = errno;
768                 }
769                 return msg;
770         }
771
772         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
773         if( msg->header == NULL ) {
774                 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
775                 msg->state = RMR_ERR_NOHDR;
776                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
777                 msg->tp_state = errno;
778                 return msg;
779         }
780
781         if( max_to < 0 ) {
782                 max_to = ctx->send_retries;             // convert to retries
783         }
784
785         if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
786                 if( ctx->flags & CTXFL_WARN ) {
787                         rmr_vlog( RMR_VL_WARN, "no endpoint for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
788                 }
789                 msg->state = RMR_ERR_NOENDPT;
790                 errno = ENXIO;                                                                          // must ensure it's not eagain
791                 msg->tp_state = errno;
792                 return msg;                                                                                     // caller can resend (maybe) or free
793         }
794
795         send_again = 1;                                                                                 // force loop entry
796         group = 0;                                                                                              // always start with group 0
797         while( send_again ) {
798                 if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry
799                         sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep );              // select endpt from rr group and set again if more groups
800                 } else {
801                         sock_ok = epsock_meid( ctx->rtable, msg, &nn_sock, &ep );
802                         send_again = 0;
803                 }
804
805                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
806                                 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
807
808                 group++;
809
810                 if( sock_ok ) {                                                                                                 // with an rte we _should_ always have a socket, but don't bet on it
811                         if( send_again ) {
812                                 clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
813                                 if( clone_m == NULL ) {
814                                         msg->state = RMR_ERR_SENDFAILED;
815                                         errno = ENOMEM;
816                                         msg->tp_state = errno;
817                                         if( ctx->flags & CTXFL_WARN ) {
818                                                 rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
819                                         }
820                                         return msg;
821                                 }
822
823                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
824                                 msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
825                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
826         
827                                 if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
828                                         rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
829                                         msg = clone_m;
830                                 } else {
831                                         ok_sends++;
832                                         msg = clone_m;                                                                          // clone will be the next to send
833                                 }
834                         } else {
835                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
836                                 if( DEBUG ) {
837                                         if( msg == NULL ) {
838                                                 rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );         
839                                         }
840                                 }
841                         }
842
843                         if( ep != NULL && msg != NULL ) {
844                                 switch( msg->state ) {
845                                         case RMR_OK:
846                                                 ep->scounts[EPSC_GOOD]++;
847                                                 break;
848                                 
849                                         case RMR_ERR_RETRY:
850                                                 ep->scounts[EPSC_TRANS]++;
851                                                 break;
852
853                                         default:
854                                                 ep->scounts[EPSC_FAIL]++;
855                                                 break;
856                                 }
857                         }
858                 } else {
859                         if( ctx->flags & CTXFL_WARN ) {
860                                 rmr_vlog( RMR_VL_WARN, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
861                         }
862                         msg->state = RMR_ERR_NOENDPT;
863                         errno = ENXIO;
864                 }
865         }
866
867         if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
868                 msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
869                 if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
870                         msg->state = RMR_OK;
871                 }
872         
873                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
874         
875                 msg->tp_state = errno;
876         }
877
878         return msg;                                                                     // last message caries the status of last/only send attempt
879 }
880
881
882 /*
883         A generic wrapper to the real send to keep wormhole stuff agnostic.
884         We assume the wormhole function vetted the buffer so we don't have to.
885 */
886 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
887         return send_msg( ctx, msg, ep->nn_sock, -1 );
888 }
889
890 #endif