369c68ec43709417aefc17a13144070fda28c501
[ric-plt/lib/rmr.git] / src / nng / src / sr_nng_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia 
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_nng_static.c
23         Abstract:       These are static send/receive primatives which (sadly) 
24                                 differ based on the underlying protocol (nng vs nanomsg).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
33
34 #include <nng/nng.h>
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
39
40
41 /*
42         Translates the nng state passed in to one of ours that is suitable to put
43         into the message, and sets errno to something that might be useful.
44         If we don't have a specific RMr state, then we return the default (e.g.
45         receive failed).
46 */
47 static inline int xlate_nng_state( int state, int def_state ) {
48
49         switch( state ) {
50                 case 0:
51                         errno = 0;
52                         state = RMR_OK;
53                         break;
54
55                 case NNG_EAGAIN:                                // soft errors get retry as the RMr error
56                         state = RMR_ERR_RETRY;
57                         errno = EAGAIN;
58                         break;
59                         
60                 case NNG_ETIMEDOUT:
61                         state = RMR_ERR_RETRY;
62                         errno = EAGAIN;
63                         break;
64
65                 case NNG_ENOTSUP:
66                         errno  = ENOTSUP;
67                         state = def_state;
68                         break;
69
70                 case NNG_EINVAL:
71                         errno  = EINVAL;
72                         state = def_state;
73                         break;
74
75                 case NNG_ENOMEM:
76                         errno  = ENOMEM;
77                         state = def_state;
78                         break;
79
80                 case NNG_ESTATE:
81                         errno  = EBADFD;                                // file des not in a good state for the operation
82                         state = def_state;
83                         break;
84
85                 case NNG_ECLOSED:
86                         errno  = EBADFD;                                // file des not in a good state for the operation
87                         state = def_state;
88                         break;
89                 
90                 default:
91                         errno = EBADE;
92                         state = def_state;
93                         break;
94         }
95
96         return state;
97 }
98
99 /*
100         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
101         a new message struct as well. Size is the size of the zc buffer to allocate (not
102         including our header). If size is 0, then the buffer allocated is the size previously
103         allocated (if msg is !nil) or the default size given at initialisation).
104
105         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
106                 are zero copy, however ONLY the message is zero copy. We now allocate and use
107                 nng messages.
108 */
109 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
110         size_t          mlen;                   // size of the transport buffer that we'll allocate
111         uta_mhdr_t*     hdr;                    // convenience pointer
112
113         mlen = sizeof( uta_mhdr_t ) + ctx->trace_data_len + ctx->d1_len + ctx->d2_len;  // start with header and trace/data lengths
114         mlen += (size > 0 ? size  : ctx->max_plen);                     // add user requested size or size set during init
115
116         if( msg == NULL ) {
117                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
118                 if( msg == NULL ) {
119                         fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
120                         exit( 1 );
121                 }
122         } else {
123                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
124         }
125
126         memset( msg, 0, sizeof( *msg ) );
127
128         if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
129                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
130                 abort( );                                                                                       // toss out a core file for this
131         }
132
133         msg->header = nng_msg_body( msg->tp_buf );
134         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
135         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
136                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
137                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
138                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
139                 //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // no need until we start using them
140                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );
141         }
142         msg->len = 0;                                                                                   // length of data in the payload
143         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
144         msg->payload = msg->header + PAYLOAD_OFFSET( hdr );             // past header, trace and other data bits
145         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
146         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
147         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
148         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
149
150         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
151
152         return msg;
153 }
154
155 /*
156         Allocates only the mbuf and does NOT allocate an underlying transport buffer since 
157         NNG receive must allocate that on its own.
158 */
159 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
160         size_t  mlen;
161         uta_mhdr_t* hdr;                        // convenience pointer
162         rmr_mbuf_t* msg;
163
164         msg = (rmr_mbuf_t *) malloc( sizeof *msg );
165         if( msg == NULL ) {
166                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
167                 exit( 1 );
168         }
169
170         memset( msg, 0, sizeof( *msg ) );
171
172         msg->tp_buf = NULL;
173         msg->header = NULL;
174         msg->len = -1;                                                                                  // no payload; invalid len
175         msg->alloc_len = -1;
176         msg->payload = NULL;
177         msg->xaction = NULL;
178         msg->state = RMR_ERR_UNSET;
179         msg->flags = 0;
180
181         return msg;
182 }
183
184 /*
185         This accepts a message with the assumption that only the tp_buf pointer is valid. It
186         sets all of the various header/payload/xaction pointers in the mbuf to the proper
187         spot in the transport layer buffer.  The len in the header is assumed to be the 
188         allocated len (a receive buffer that nng created);
189
190         The alen parm is the assumed allocated length; assumed because it's a value likely
191         to have come from nng receive and the actual alloc len might be larger, but we
192         can only assume this is the total usable space.
193
194         This function returns the message with an error state set if it detects that the
195         received message might have been truncated.  Check is done here as the calculation
196         is somewhat based on header version.
197 */
198 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
199         uta_mhdr_t* hdr;                                // current header
200         uta_v1mhdr_t* v1hdr;                    // version 1 header
201         int ver;
202         int     hlen;                                           // header len to use for a truncation check
203
204         msg->header = nng_msg_body( msg->tp_buf );                              // header is the start of the transport buffer
205         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
206
207         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order 
208                 ver = 1;
209                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
210         } else {
211                 ver = ntohl( v1hdr->rmr_ver );
212         }
213
214         switch( ver ) {
215                 case 1:
216                         msg->len = ntohl( v1hdr->plen );                                                        // length sender says is in the payload (received length could be larger)
217                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
218                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
219
220                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
221                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
222                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
223                         msg->state = RMR_OK;
224                         hlen = sizeof( uta_v1mhdr_t );
225                         break;
226
227                 default:                                                                                                        // current version always lands here
228                         hdr = (uta_mhdr_t *) msg->header;
229                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
230                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
231
232                         msg->payload = msg->header + PAYLOAD_OFFSET( hdr );             // past header, trace and other data bits
233                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
234                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
235                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
236                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later         
237                         break;
238         }
239
240         if( msg->len > (msg->alloc_len - hlen ) ) {                                             // more than we should have had room for; error
241                 msg->state = RMR_ERR_TRUNC;
242                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
243         } else {
244                 msg->state = RMR_OK;
245         }
246 }
247
248 /*
249         This will clone a message into a new zero copy buffer and return the cloned message.
250 */
251 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
252         rmr_mbuf_t* nm;                 // new message buffer
253         size_t  mlen;
254         int state;
255         uta_mhdr_t* hdr;
256         uta_v1mhdr_t* v1hdr;
257
258         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
259         if( nm == NULL ) {
260                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
261                 exit( 1 );
262         }
263         memset( nm, 0, sizeof( *nm ) );
264
265         mlen = old_msg->alloc_len;                                                                              // length allocated before
266         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
267                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
268                 exit( 1 );
269         }
270
271         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
272         v1hdr = (uta_v1mhdr_t *) old_msg->header;               // v1 will work to dig header out of any version
273         switch( ntohl( v1hdr->rmr_ver ) ) {
274                 case 1:
275                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
276                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
277                         break;
278
279                 default:                                                                                        // current message always caught  here
280                         hdr = nm->header;
281                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) );         // copy complete header, trace and other data
282                         nm->payload = nm->header + PAYLOAD_OFFSET( hdr );               // point at the payload
283                         break;
284         }
285                 
286         // --- these are all version agnostic -----------------------------------
287         nm->mtype = old_msg->mtype;
288         nm->len = old_msg->len;                                                                 // length of data in the payload
289         nm->alloc_len = mlen;                                                                   // length of allocated payload
290
291         nm->xaction = hdr->xid;                                                                 // reference xaction
292         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
293         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
294         memcpy( nm->payload, old_msg->payload, old_msg->len );
295
296         return nm;
297 }
298
299 /*
300         This is the receive work horse used by the outer layer receive functions.
301         It waits for a message to be received on our listen socket. If old msg
302         is passed in, the we assume we can use it instead of allocating a new
303         one, else a new block of memory is allocated.
304
305         This allocates a zero copy message so that if the user wishes to call
306         rmr_rts_msg() the send is zero copy.
307
308         The nng timeout on send is at the ms level which is a tad too long for
309         our needs.  So, if NNG returns eagain or timedout (we don't set one)
310         we will loop up to 5 times with a 10 microsecond delay between each
311         attempt.  If at the end of this set of retries NNG is still saying
312         eagain/timeout we'll return to the caller with that set in errno.
313         Right now this is only for zero-copy buffers (they should all be zc
314         buffers now).
315
316
317         In the NNG msg world it must allocate the receive buffer rather
318         than accepting one that we allocated from their space and could 
319         reuse.  They have their reasons I guess.  Thus, we will free
320         the old transport buffer if user passes the message in; at least
321         our mbuf will be reused. 
322 */
323 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
324         int state;
325         rmr_mbuf_t*     msg = NULL;             // msg received
326         uta_mhdr_t* hdr;
327         size_t  rsize;                          // nng needs to write back the size received... grrr
328
329         if( old_msg ) {
330                 msg = old_msg;
331                 if( msg->tp_buf != NULL ) {
332                         nng_msg_free( msg->tp_buf );
333                 }
334
335                 msg->tp_buf = NULL;
336         } else {
337                 msg = alloc_mbuf( ctx, RMR_OK );                                // msg without a transport buffer
338         }
339
340         msg->alloc_len = 0;
341         msg->len = 0;
342         msg->payload = NULL;
343         msg->xaction = NULL;
344
345         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
346         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
347                 return msg;
348         }
349
350         if( msg->tp_buf == NULL ) {             // if state is good this _should_ not be nil, but parninoia says check anyway
351                 msg->state = RMR_ERR_EMPTY;
352                 return msg;
353         }
354
355         rsize = nng_msg_len( msg->tp_buf );
356         if( rsize >= sizeof( uta_v1mhdr_t ) ) {                 // we need at least a full type 1 (smallest) header here
357                 ref_tpbuf( msg, rsize );                                        // point payload, header etc to the data and set trunc error if needed
358                 hdr = (uta_mhdr_t *) msg->header;
359                 msg->flags |= MFL_ADDSRC;                                       // turn on so if user app tries to send this buffer we reset src
360
361
362                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
363                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
364         } else {
365                 msg->state = RMR_ERR_EMPTY;
366                 msg->len = 0;
367                 msg->alloc_len = rsize;
368                 msg->payload = NULL;
369                 msg->xaction = NULL;
370                 msg->flags |= MFL_ZEROCOPY;                                                                     // this is a zerocopy sendable message
371                 msg->mtype = -1;
372         }
373
374         return msg;
375 }
376
377 /*
378         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
379         message buffer cannot be used to send, and the length information may or may 
380         not be correct (it is set to the length received which might be more than the
381         bytes actually in the payload).
382
383         Mostly this supports the route table collector, but could be extended with an 
384         API external function.
385 */
386 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
387         int state;
388         rmr_mbuf_t*     msg = NULL;             // msg received
389         size_t  rsize;                          // nng needs to write back the size received... grrr
390
391         if( old_msg ) {
392                 msg = old_msg;
393         } else {
394                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
395         }
396
397         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
398         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
399                 return msg;
400         }
401         rsize = nng_msg_len( msg->tp_buf );
402
403         // do NOT use ref_tpbuf() here! Must fill these in manually.
404         msg->header = nng_msg_body( msg->tp_buf );
405         msg->len = rsize;                                                       // len is the number of bytes received
406         msg->alloc_len = rsize;
407         msg->mtype = -1;                                                        // raw message has no type
408         msg->state = RMR_OK;
409         msg->flags = MFL_RAW;
410         msg->payload = msg->header;                                     // payload is the whole thing; no header
411         msg->xaction = NULL;
412
413         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
414
415         return msg;
416 }
417
418 /*
419         This does the hard work of actually sending the message to the given socket. On success,
420         a new message struct is returned. On error, the original msg is returned with the state 
421         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new 
422         buffer will not be allocated and returned (mostly for call() interal processing since
423         the return message from call() is a received buffer, not a new one).
424
425         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
426         validation has been done prior.
427 */
428 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
429         int state;
430         uta_mhdr_t*     hdr;
431         int nng_flags = NNG_FLAG_NONBLOCK;              // if we need to set any nng flags (zc buffer) add it to this
432         int spin_retries = 1000;                                // if eagain/timeout we'll spin this many times before giving up the CPU
433
434         // future: ensure that application did not overrun the XID buffer; last byte must be 0
435
436         hdr = (uta_mhdr_t *) msg->header;
437         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
438         hdr->plen = htonl( msg->len );
439
440         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
441                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
442         }
443
444         errno = 0;
445         msg->state = RMR_OK;
446         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
447                 do {
448                         if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) {               // must check and retry some if transient failure
449                                 msg->state = state;
450                                 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
451                                         if( --spin_retries <= 0 ) {                     // don't give up the processor if we don't have to
452                                                 retries--;
453                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
454                                                 spin_retries = 1000;
455                                         }
456                                 } else {
457                                         state = 0;                      // don't loop
458                                         //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
459                                 }
460                         } else {
461                                 state = 0;
462                                 msg->state = RMR_OK;
463                                 msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
464                                 msg->tp_buf = NULL;
465                         }
466                 } while( state && retries > 0 );
467         } else {
468                 // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
469                 msg->state = RMR_ERR_SENDFAILED;
470                 errno = ENOTSUP;
471                 return msg;
472                 /*
473                 NOT SUPPORTED
474                 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
475                         msg->state = state;
476                         //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
477                 } 
478                 */
479         }
480
481         if( msg->state == RMR_OK ) {                                                            // successful send
482                 if( !(msg->flags & MFL_NOALLOC) ) {                             // allocate another sendable zc buffer unless told otherwise
483                         return alloc_zcmsg( ctx, msg, 0, RMR_OK );      // preallocate a zero-copy buffer and return msg
484                 } else {
485                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
486                         return NULL;
487                 }
488         } else {                                                                                        // send failed -- return original message
489                 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
490                         errno = EAGAIN;
491                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
492                 } else {
493                         msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED );         // xlate to our state and set errno
494                 }
495
496                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
497         }
498
499         return msg;
500 }
501
502 /*
503         A generic wrapper to the real send to keep wormhole stuff agnostic.
504         We assume the wormhole function vetted the buffer so we don't have to.
505 */
506 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
507         return send_msg( ctx, msg, ep->nn_sock, -1 );
508 }
509
510 #endif