Fix possible allocation of 0 len buffer in rtc
[ric-plt/lib/rmr.git] / src / nng / src / sr_nng_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia 
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_nng_static.c
23         Abstract:       These are static send/receive primatives which (sadly) 
24                                 differ based on the underlying protocol (nng vs nanomsg).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
33
34 #include <nng/nng.h>
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
39
40
41 /*
42         Translates the nng state passed in to one of ours that is suitable to put
43         into the message, and sets errno to something that might be useful.
44         If we don't have a specific RMr state, then we return the default (e.g.
45         receive failed).
46 */
47 static inline int xlate_nng_state( int state, int def_state ) {
48
49         switch( state ) {
50                 case 0:
51                         errno = 0;
52                         state = RMR_OK;
53                         break;
54
55                 case NNG_EAGAIN:                                // soft errors get retry as the RMr error
56                         state = RMR_ERR_RETRY;
57                         errno = EAGAIN;
58                         break;
59                         
60                 case NNG_ETIMEDOUT:
61                         state = RMR_ERR_RETRY;
62                         errno = EAGAIN;
63                         break;
64
65                 case NNG_ENOTSUP:
66                         errno  = ENOTSUP;
67                         state = def_state;
68                         break;
69
70                 case NNG_EINVAL:
71                         errno  = EINVAL;
72                         state = def_state;
73                         break;
74
75                 case NNG_ENOMEM:
76                         errno  = ENOMEM;
77                         state = def_state;
78                         break;
79
80                 case NNG_ESTATE:
81                         errno  = EBADFD;                                // file des not in a good state for the operation
82                         state = def_state;
83                         break;
84
85                 case NNG_ECLOSED:
86                         errno  = EBADFD;                                // file des not in a good state for the operation
87                         state = def_state;
88                         break;
89                 
90                 default:
91                         errno = EBADE;
92                         state = def_state;
93                         break;
94         }
95
96         return state;
97 }
98
99 /*
100         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
101         a new message struct as well. Size is the size of the zc buffer to allocate (not
102         including our header). If size is 0, then the buffer allocated is the size previously
103         allocated (if msg is !nil) or the default size given at initialisation).
104
105         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
106                 are zero copy, however ONLY the message is zero copy. We now allocate and use
107                 nng messages.
108 */
109 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
110         size_t          mlen;
111         uta_mhdr_t*     hdr;                    // convenience pointer
112
113         mlen = sizeof( uta_mhdr_t );                                            // figure size should we not have a msg buffer
114         mlen += (size > 0 ? size  : ctx->max_plen);                     // add user requested size or size set during init
115
116         if( msg == NULL ) {
117                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
118                 if( msg == NULL ) {
119                         fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
120                         exit( 1 );
121                 }
122         } else {
123                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
124         }
125
126         memset( msg, 0, sizeof( *msg ) );
127
128         if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
129                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
130                 abort( );                                                                                       // toss out a core file for this
131         }
132
133         msg->header = nng_msg_body( msg->tp_buf );
134         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
135                 hdr->rmr_ver = RMR_MSG_VER;                                                             // version info should we need to recognised old style messages someday
136         }
137         msg->len = 0;                                                                                   // length of data in the payload
138         msg->alloc_len = mlen;                                                                  // length of allocated payload
139         msg->payload = msg->header + sizeof( uta_mhdr_t );              // point past header to payload (single buffer allocation above)
140         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
141         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
142         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
143         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
144
145         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
146
147         return msg;
148 }
149
150 /*
151         Allocates only the mbuf and does NOT allocate an underlying transport buffer since 
152         NNG receive must allocate that on its own.
153 */
154 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
155         size_t  mlen;
156         uta_mhdr_t* hdr;                        // convenience pointer
157         rmr_mbuf_t* msg;
158
159         msg = (rmr_mbuf_t *) malloc( sizeof *msg );
160         if( msg == NULL ) {
161                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
162                 exit( 1 );
163         }
164
165         memset( msg, 0, sizeof( *msg ) );
166
167         msg->tp_buf = NULL;
168         msg->header = NULL;
169         msg->len = -1;                                                                                  // no payload; invalid len
170         msg->alloc_len = -1;
171         msg->payload = NULL;
172         msg->xaction = NULL;
173         msg->state = RMR_ERR_UNSET;
174         msg->flags = 0;
175
176         return msg;
177 }
178
179 /*
180         This accepts a message with the assumption that only the tp_buf pointer is valid. It
181         sets all of the various header/payload/xaction pointers in the mbuf to the proper
182         spot in the transport layer buffer.  The len in the header is assumed to be the 
183         allocated len (a receive buffer that nng created);
184
185         The alen parm is the assumed allocated length; assumed because it's a value likely
186         to have come from nng receive and the actual alloc len might be larger, but we
187         can only assume this is the total usable space.
188 */
189 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
190         uta_mhdr_t* hdr;
191
192         msg->header = nng_msg_body( msg->tp_buf );                              // header is the start of the transport buffer
193
194         hdr = (uta_mhdr_t *) msg->header;
195         hdr->rmr_ver = RMR_MSG_VER;                                                             // version info should we need to recognised old style messages someday
196         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
197         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header)
198         msg->payload = msg->header + sizeof( uta_mhdr_t );              // point past header to payload (single buffer allocation above)
199         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
200         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
201         msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
202         msg->state = RMR_OK;
203 }
204
205 /*
206         This will clone a message into a new zero copy buffer and return the cloned message.
207 */
208 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
209         rmr_mbuf_t* nm;                 // new message buffer
210         size_t  mlen;
211         int state;
212
213         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
214         if( nm == NULL ) {
215                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
216                 exit( 1 );
217         }
218         memset( nm, 0, sizeof( *nm ) );
219
220         mlen = old_msg->alloc_len;                                                                              // length allocated before
221         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
222                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
223                 exit( 1 );
224         }
225
226         nm->header = nng_msg_body( nm->tp_buf );
227         nm->mtype = old_msg->mtype;
228         nm->len = old_msg->len;                                                                 // length of data in the payload
229         nm->alloc_len = mlen;                                                                   // length of allocated payload
230         nm->payload = nm->header + sizeof( uta_mhdr_t );                // point past header to payload (single buffer allocation above)
231         nm->xaction = ((uta_mhdr_t *)nm->header)->xid;                  // point at transaction id in header area
232         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
233         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
234
235         memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID );
236         memcpy( nm->payload, old_msg->payload, old_msg->len );
237
238         return nm;
239 }
240
241 /*
242         This is the receive work horse used by the outer layer receive functions.
243         It waits for a message to be received on our listen socket. If old msg
244         is passed in, the we assume we can use it instead of allocating a new
245         one, else a new block of memory is allocated.
246
247         This allocates a zero copy message so that if the user wishes to call
248         rmr_rts_msg() the send is zero copy.
249
250         The nng timeout on send is at the ms level which is a tad too long for
251         our needs.  So, if NNG returns eagain or timedout (we don't set one)
252         we will loop up to 5 times with a 10 microsecond delay between each
253         attempt.  If at the end of this set of retries NNG is still saying
254         eagain/timeout we'll return to the caller with that set in errno.
255         Right now this is only for zero-copy buffers (they should all be zc
256         buffers now).
257
258
259         In the NNG msg world it must allocate the receive buffer rather
260         than accepting one that we allocated from their space and could 
261         reuse.  They have their reasons I guess.  Thus, we will free
262         the old transport buffer if user passes the message in; at least
263         our mbuf will be reused. 
264 */
265 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
266         int state;
267         rmr_mbuf_t*     msg = NULL;             // msg received
268         uta_mhdr_t* hdr;
269         size_t  rsize;                          // nng needs to write back the size received... grrr
270
271         if( old_msg ) {
272                 msg = old_msg;
273                 if( msg->tp_buf != NULL ) {
274                         nng_msg_free( msg->tp_buf );
275                 }
276
277                 msg->tp_buf = NULL;
278         } else {
279                 //msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                    // will abort on failure, no need to check
280                 msg = alloc_mbuf( ctx, RMR_OK );                                // msg without a transport buffer
281         }
282
283         msg->len = 0;
284         msg->payload = NULL;
285         msg->xaction = NULL;
286
287         //rsize = msg->alloc_len;                                                                                                               // set to max, and we'll get len back here too
288         //msg->state = nng_recv( ctx->nn_sock, msg->header, &rsize, NO_FLAGS );         // total space (header + payload len) allocated
289         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
290         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
291                 return msg;
292         }
293
294         if( msg->tp_buf == NULL ) {             // if state is good this _should_ not be nil, but parninoia says check anyway
295                 msg->state = RMR_ERR_EMPTY;
296                 return msg;
297         }
298
299         rsize = nng_msg_len( msg->tp_buf );
300         if( rsize >= sizeof( uta_mhdr_t ) ) {                                                           // we need at least a full header here
301
302                 ref_tpbuf( msg, rsize );                                                // point payload, header etc to the just received tp buffer
303                 hdr = (uta_mhdr_t *) msg->header;
304                 msg->flags |= MFL_ADDSRC;                               // turn on so if user app tries to send this buffer we reset src
305                 if( msg->len > (msg->alloc_len - sizeof( uta_mhdr_t )) ) {              // way more than we should have had room for; error
306                         msg->state = RMR_ERR_TRUNC;
307                 }
308
309                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
310                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
311         } else {
312                 msg->len = 0;
313                 msg->state = RMR_ERR_EMPTY;
314         }
315
316         return msg;
317 }
318
319 /*
320         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
321         message buffer cannot be used to send, and the length information may or may 
322         not be correct (it is set to the length received which might be more than the
323         bytes actually in the payload).
324
325         Mostly this supports the route table collector, but could be extended with an 
326         API external function.
327 */
328 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
329         int state;
330         rmr_mbuf_t*     msg = NULL;             // msg received
331         size_t  rsize;                          // nng needs to write back the size received... grrr
332
333         if( old_msg ) {
334                 msg = old_msg;
335         } else {
336                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
337         }
338
339         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
340         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
341                 return msg;
342         }
343         rsize = nng_msg_len( msg->tp_buf );
344
345         // do NOT use ref_tpbuf() here! Must fill these in manually.
346         msg->header = nng_msg_body( msg->tp_buf );
347         msg->len = rsize;                                                       // len is the number of bytes received
348         msg->alloc_len = rsize;
349         msg->mtype = -1;                                                        // raw message has no type
350         msg->state = RMR_OK;
351         msg->flags = MFL_RAW;
352         msg->payload = msg->header;                                     // payload is the whole thing; no header
353         msg->xaction = NULL;
354
355         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
356
357         return msg;
358 }
359
360 /*
361         This does the hard work of actually sending the message to the given socket. On success,
362         a new message struct is returned. On error, the original msg is returned with the state 
363         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new 
364         buffer will not be allocated and returned (mostly for call() interal processing since
365         the return message from call() is a received buffer, not a new one).
366
367         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
368         validation has been done prior.
369 */
370 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
371         int state;
372         uta_mhdr_t*     hdr;
373         int nng_flags = NNG_FLAG_NONBLOCK;              // if we need to set any nng flags (zc buffer) add it to this
374         int spin_retries = 1000;                                // if eagain/timeout we'll spin this many times before giving up the CPU
375
376         // future: ensure that application did not overrun the XID buffer; last byte must be 0
377
378         hdr = (uta_mhdr_t *) msg->header;
379         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
380         hdr->plen = htonl( msg->len );
381
382         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
383                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
384         }
385
386         errno = 0;
387         msg->state = RMR_OK;
388         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
389                 //nng_flags |= NNG_FLAG_ALLOC;                                                                  // indicate a zc buffer that nng is expected to free
390
391                 do {
392                         if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) {               // must check and retry some if transient failure
393                                 msg->state = state;
394                                 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
395                                         if( --spin_retries <= 0 ) {                     // don't give up the processor if we don't have to
396                                                 retries--;
397                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
398                                                 spin_retries = 1000;
399                                         }
400                                 } else {
401                                         state = 0;                      // don't loop
402                                         //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
403                                 }
404                         } else {
405                                 state = 0;
406                                 msg->state = RMR_OK;
407                                 msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
408                                 msg->tp_buf = NULL;
409                         }
410                 } while( state && retries > 0 );
411         } else {
412                 msg->state = RMR_ERR_SENDFAILED;
413                 errno = ENOTSUP;
414                 return msg;
415                 /*
416                 NOT SUPPORTED
417                 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
418                         msg->state = state;
419                         //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
420                 } 
421                 */
422         }
423
424         if( msg->state == RMR_OK ) {                                                            // successful send
425                 if( !(msg->flags & MFL_NOALLOC) ) {                             // allocate another sendable zc buffer unless told otherwise
426                         return alloc_zcmsg( ctx, msg, 0, RMR_OK );      // preallocate a zero-copy buffer and return msg
427                 } else {
428                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
429                         return NULL;
430                 }
431         } else {                                                                                        // send failed -- return original message
432                 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
433                         errno = EAGAIN;
434                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
435                 } else {
436                         msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED );         // xlate to our state and set errno
437                         //errno = -msg->state;
438                         //msg->state = RMR_ERR_SENDFAILED;                                      // errno will have nano reason
439                 }
440
441                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
442         }
443
444         return msg;
445 }
446
447 /*
448         A generic wrapper to the real send to keep wormhole stuff agnostic.
449         We assume the wormhole function vetted the buffer so we don't have to.
450 */
451 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
452         return send_msg( ctx, msg, ep->nn_sock, -1 );
453 }
454
455 #endif