8e326e309fb13d52d9fdb752f039335b7b6e146b
[ric-plt/lib/rmr.git] / src / nng / src / sr_nng_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia 
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_nng_static.c
23         Abstract:       These are static send/receive primatives which (sadly) 
24                                 differ based on the underlying protocol (nng vs nanomsg).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
33
34 #include <nng/nng.h>
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
39
40
41 /*
42         Translates the nng state passed in to one of ours that is suitable to put
43         into the message, and sets errno to something that might be useful.
44         If we don't have a specific RMr state, then we return the default (e.g.
45         receive failed).
46 */
47 static inline int xlate_nng_state( int state, int def_state ) {
48
49         switch( state ) {
50                 case 0:
51                         errno = 0;
52                         state = RMR_OK;
53                         break;
54
55                 case NNG_EAGAIN:                                // soft errors get retry as the RMr error
56                         state = RMR_ERR_RETRY;
57                         errno = EAGAIN;
58                         break;
59                         
60                 case NNG_ETIMEDOUT:
61                         state = RMR_ERR_RETRY;
62                         errno = EAGAIN;
63                         break;
64
65                 case NNG_ENOTSUP:
66                         errno  = ENOTSUP;
67                         state = def_state;
68                         break;
69
70                 case NNG_EINVAL:
71                         errno  = EINVAL;
72                         state = def_state;
73                         break;
74
75                 case NNG_ENOMEM:
76                         errno  = ENOMEM;
77                         state = def_state;
78                         break;
79
80                 case NNG_ESTATE:
81                         errno  = EBADFD;                                // file des not in a good state for the operation
82                         state = def_state;
83                         break;
84
85                 case NNG_ECLOSED:
86                         errno  = EBADFD;                                // file des not in a good state for the operation
87                         state = def_state;
88                         break;
89                 
90                 default:
91                         errno = EBADE;
92                         state = def_state;
93                         break;
94         }
95
96         return state;
97 }
98
99 /*
100         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
101         a new message struct as well. Size is the size of the zc buffer to allocate (not
102         including our header). If size is 0, then the buffer allocated is the size previously
103         allocated (if msg is !nil) or the default size given at initialisation).
104
105         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
106                 are zero copy, however ONLY the message is zero copy. We now allocate and use
107                 nng messages.
108 */
109 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
110         size_t          mlen;
111         uta_mhdr_t*     hdr;                    // convenience pointer
112
113         mlen = sizeof( uta_mhdr_t );                                            // figure size should we not have a msg buffer
114         mlen += (size > 0 ? size  : ctx->max_plen);                     // add user requested size or size set during init
115
116         if( msg == NULL ) {
117                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
118                 if( msg == NULL ) {
119                         fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
120                         exit( 1 );
121                 }
122         } else {
123                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
124         }
125
126         memset( msg, 0, sizeof( *msg ) );
127
128         if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
129                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
130                 abort( );                                                                                       // toss out a core file for this
131         }
132
133         msg->header = nng_msg_body( msg->tp_buf );
134         hdr = (uta_mhdr_t *) msg->header;
135         hdr->rmr_ver = RMR_MSG_VER;                                                             // version info should we need to recognised old style messages someday
136         msg->len = 0;                                                                                   // length of data in the payload
137         msg->alloc_len = mlen;                                                                  // length of allocated payload
138         msg->payload = msg->header + sizeof( uta_mhdr_t );              // point past header to payload (single buffer allocation above)
139         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
140         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
141         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
142         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
143
144         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
145
146         return msg;
147 }
148
149 /*
150         Allocates only the mbuf and does NOT allocate an underlying transport buffer since 
151         NNG receive must allocate that on its own.
152 */
153 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
154         size_t  mlen;
155         uta_mhdr_t* hdr;                        // convenience pointer
156         rmr_mbuf_t* msg;
157
158         msg = (rmr_mbuf_t *) malloc( sizeof *msg );
159         if( msg == NULL ) {
160                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
161                 exit( 1 );
162         }
163
164         memset( msg, 0, sizeof( *msg ) );
165
166         msg->tp_buf = NULL;
167         msg->header = NULL;
168         msg->len = -1;                                                                                  // no payload; invalid len
169         msg->alloc_len = -1;
170         msg->payload = NULL;
171         msg->xaction = NULL;
172         msg->state = RMR_ERR_UNSET;
173         msg->flags = 0;
174
175         return msg;
176 }
177
178 /*
179         This accepts a message with the assumption that only the tp_buf pointer is valid. It
180         sets all of the various header/payload/xaction pointers in the mbuf to the proper
181         spot in the transport layer buffer.  The len in the header is assumed to be the 
182         allocated len (a receive buffer that nng created);
183
184         The alen parm is the assumed allocated length; assumed because it's a value likely
185         to have come from nng receive and the actual alloc len might be larger, but we
186         can only assume this is the total usable space.
187 */
188 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
189         uta_mhdr_t* hdr;
190
191         msg->header = nng_msg_body( msg->tp_buf );                              // header is the start of the transport buffer
192
193         hdr = (uta_mhdr_t *) msg->header;
194         hdr->rmr_ver = RMR_MSG_VER;                                                             // version info should we need to recognised old style messages someday
195         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
196         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header)
197         msg->payload = msg->header + sizeof( uta_mhdr_t );              // point past header to payload (single buffer allocation above)
198         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
199         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
200         msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
201         msg->state = RMR_OK;
202 }
203
204 /*
205         This will clone a message into a new zero copy buffer and return the cloned message.
206 */
207 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
208         rmr_mbuf_t* nm;                 // new message buffer
209         size_t  mlen;
210         int state;
211
212         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
213         if( nm == NULL ) {
214                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
215                 exit( 1 );
216         }
217         memset( nm, 0, sizeof( *nm ) );
218
219         mlen = old_msg->alloc_len;                                                                              // length allocated before
220         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
221                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
222                 exit( 1 );
223         }
224
225         nm->header = nng_msg_body( nm->tp_buf );
226         nm->mtype = old_msg->mtype;
227         nm->len = old_msg->len;                                                                 // length of data in the payload
228         nm->alloc_len = mlen;                                                                   // length of allocated payload
229         nm->payload = nm->header + sizeof( uta_mhdr_t );                // point past header to payload (single buffer allocation above)
230         nm->xaction = ((uta_mhdr_t *)nm->header)->xid;                  // point at transaction id in header area
231         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
232         nm->flags |= MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
233
234         memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID );
235         memcpy( nm->payload, old_msg->payload, old_msg->len );
236
237         return nm;
238 }
239
240 /*
241         This is the receive work horse used by the outer layer receive functions.
242         It waits for a message to be received on our listen socket. If old msg
243         is passed in, the we assume we can use it instead of allocating a new
244         one, else a new block of memory is allocated.
245
246         This allocates a zero copy message so that if the user wishes to call
247         rmr_rts_msg() the send is zero copy.
248
249         The nng timeout on send is at the ms level which is a tad too long for
250         our needs.  So, if NNG returns eagain or timedout (we don't set one)
251         we will loop up to 5 times with a 10 microsecond delay between each
252         attempt.  If at the end of this set of retries NNG is still saying
253         eagain/timeout we'll return to the caller with that set in errno.
254         Right now this is only for zero-copy buffers (they should all be zc
255         buffers now).
256
257
258         In the NNG msg world it must allocate the receive buffer rather
259         than accepting one that we allocated from their space and could 
260         reuse.  They have their reasons I guess.  Thus, we will free
261         the old transport buffer if user passes the message in; at least
262         our mbuf will be reused. 
263 */
264 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
265         int state;
266         rmr_mbuf_t*     msg = NULL;             // msg received
267         uta_mhdr_t* hdr;
268         size_t  rsize;                          // nng needs to write back the size received... grrr
269
270         if( old_msg ) {
271                 msg = old_msg;
272                 if( msg->tp_buf != NULL ) {
273                         nng_msg_free( msg->tp_buf );
274                 }
275
276                 msg->tp_buf = NULL;
277         } else {
278                 //msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                    // will abort on failure, no need to check
279                 msg = alloc_mbuf( ctx, RMR_OK );                                // msg without a transport buffer
280         }
281
282         msg->len = 0;
283         msg->payload = NULL;
284         msg->xaction = NULL;
285
286         //rsize = msg->alloc_len;                                                                                                               // set to max, and we'll get len back here too
287         //msg->state = nng_recv( ctx->nn_sock, msg->header, &rsize, NO_FLAGS );         // total space (header + payload len) allocated
288         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
289         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
290                 return msg;
291         }
292
293         if( msg->tp_buf == NULL ) {             // if state is good this _should_ not be nil, but parninoia says check anyway
294                 msg->state = RMR_ERR_EMPTY;
295                 return msg;
296         }
297
298         rsize = nng_msg_len( msg->tp_buf );
299         if( rsize >= sizeof( uta_mhdr_t ) ) {                                                           // we need at least a full header here
300
301                 ref_tpbuf( msg, rsize );                                                // point payload, header etc to the just received tp buffer
302                 hdr = (uta_mhdr_t *) msg->header;
303                 msg->flags |= MFL_ADDSRC;                               // turn on so if user app tries to send this buffer we reset src
304                 if( msg->len > (msg->alloc_len - sizeof( uta_mhdr_t )) ) {              // way more than we should have had room for; error
305                         msg->state = RMR_ERR_TRUNC;
306                 }
307
308                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
309                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
310         } else {
311                 msg->len = 0;
312                 msg->state = RMR_ERR_EMPTY;
313         }
314
315         return msg;
316 }
317
318 /*
319         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
320         message buffer cannot be used to send, and the length information may or may 
321         not be correct (it is set to the length received which might be more than the
322         bytes actually in the payload).
323
324         Mostly this supports the route table collector, but could be extended with an 
325         API external function.
326 */
327 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
328         int state;
329         rmr_mbuf_t*     msg = NULL;             // msg received
330         size_t  rsize;                          // nng needs to write back the size received... grrr
331
332         if( old_msg ) {
333                 msg = old_msg;
334         } else {
335                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
336         }
337
338         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
339         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
340                 return msg;
341         }
342         rsize = nng_msg_len( msg->tp_buf );
343
344         // do NOT use ref_tpbuf() here! Must fill these in manually.
345         msg->header = nng_msg_body( msg->tp_buf );
346         msg->len = rsize;                                                       // len is the number of bytes received
347         msg->alloc_len = rsize;
348         msg->mtype = -1;                                                        // raw message has no type
349         msg->state = RMR_OK;
350         msg->flags = MFL_RAW;
351         msg->payload = msg->header;                                     // payload is the whole thing; no header
352         msg->xaction = NULL;
353
354         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
355
356         return msg;
357 }
358
359 /*
360         This does the hard work of actually sending the message to the given socket. On success,
361         a new message struct is returned. On error, the original msg is returned with the state 
362         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new 
363         buffer will not be allocated and returned (mostly for call() interal processing since
364         the return message from call() is a received buffer, not a new one).
365
366         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
367         validation has been done prior.
368 */
369 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
370         int state;
371         uta_mhdr_t*     hdr;
372         int nng_flags = NNG_FLAG_NONBLOCK;              // if we need to set any nng flags (zc buffer) add it to this
373         int spin_retries = 1000;                                // if eagain/timeout we'll spin this many times before giving up the CPU
374
375         // future: ensure that application did not overrun the XID buffer; last byte must be 0
376
377         hdr = (uta_mhdr_t *) msg->header;
378         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
379         hdr->plen = htonl( msg->len );
380
381         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
382                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
383         }
384
385         errno = 0;
386         msg->state = RMR_OK;
387         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
388                 //nng_flags |= NNG_FLAG_ALLOC;                                                                  // indicate a zc buffer that nng is expected to free
389
390                 do {
391                         if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) {               // must check and retry some if transient failure
392                                 msg->state = state;
393                                 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
394                                         if( --spin_retries <= 0 ) {                     // don't give up the processor if we don't have to
395                                                 retries--;
396                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
397                                                 spin_retries = 1000;
398                                         }
399                                 } else {
400                                         state = 0;                      // don't loop
401                                         //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
402                                 }
403                         } else {
404                                 state = 0;
405                                 msg->state = RMR_OK;
406                                 msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
407                                 msg->tp_buf = NULL;
408                         }
409                 } while( state && retries > 0 );
410         } else {
411                 msg->state = RMR_ERR_SENDFAILED;
412                 errno = ENOTSUP;
413                 return msg;
414                 /*
415                 NOT SUPPORTED
416                 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
417                         msg->state = state;
418                         //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
419                 } 
420                 */
421         }
422
423         if( msg->state == RMR_OK ) {                                                            // successful send
424                 if( !(msg->flags & MFL_NOALLOC) ) {                             // allocate another sendable zc buffer unless told otherwise
425                         return alloc_zcmsg( ctx, msg, 0, RMR_OK );      // preallocate a zero-copy buffer and return msg
426                 } else {
427                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
428                         return NULL;
429                 }
430         } else {                                                                                        // send failed -- return original message
431                 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
432                         errno = EAGAIN;
433                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
434                 } else {
435                         msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED );         // xlate to our state and set errno
436                         //errno = -msg->state;
437                         //msg->state = RMR_ERR_SENDFAILED;                                      // errno will have nano reason
438                 }
439
440                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
441         }
442
443         return msg;
444 }
445
446 /*
447         A generic wrapper to the real send to keep wormhole stuff agnostic.
448         We assume the wormhole function vetted the buffer so we don't have to.
449 */
450 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
451         return send_msg( ctx, msg, ep->nn_sock, -1 );
452 }
453
454 #endif