enhance(API): Add source IP support to msg header
[ric-plt/lib/rmr.git] / src / rmr / nanomsg / src / sr_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_static.c
23         Abstract:       These are static send/receive related functions.
24
25                                 (broken out of rmr.c)
26         Author:         E. Scott Daniels
27         Date:           13 February 2019
28 */
29
30 #ifndef _sr_static_c
31 #define _sr_static_c
32
33 /*
34 #include <ctype.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <netdb.h>
38 #include <errno.h>
39 #include <string.h>
40 #include <errno.h>
41 #include <pthread.h>
42 #include <unistd.h>
43 #include <stdint.h>
44 #include <time.h>
45 #include <arpa/inet.h>
46
47 #include <nanomsg/nn.h>
48 #include <nanomsg/tcp.h>
49 #include <nanomsg/pair.h>
50 #include <nanomsg/pipeline.h>
51 #include <nanomsg/pubsub.h>
52
53 #include "rmr.h"                                // things the users see
54 #include "rmr_private.h"                // things that we need too
55 #include "rmr_symtab.h"
56
57 #include "ring_static.c"                // message ring support
58 #include "rt_generic_static.c"  // generic route table (not nng/nano specific)
59 #include "rtable_static.c"              // route table things   (nano specific)
60 #include "tools_static.c"
61 */
62
63
64 /*
65         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
66         a new message struct as well. Size is the size of the zc buffer to allocate (not
67         including our header). If size is 0, then the buffer allocated is the size previously
68         allocated (if msg is !nil) or the default size given at initialisation).
69
70
71         The trlo parm is the trace length override which will be used if not 0. If 0, then the
72         length in the context is used (default).
73 */
74 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
75         int     mlen;
76         uta_mhdr_t*     hdr;
77         int tr_len;                             // length to allocate for trace info
78
79         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
80
81         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
82         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
83
84         if( msg == NULL ) {
85                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
86                 if( msg == NULL ) {
87                         fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for message\n" );
88                         exit( 1 );
89                 }
90         } else {
91                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
92         }
93
94         memset( msg, 0, sizeof( *msg ) );
95
96         if( (msg->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                           // this will be released on send, so DO NOT free
97                 fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", errno );
98                 exit( 1 );
99         }
100
101         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                 // must ensure that header portion of tpbuf is 0
102         msg->tp_buf = msg->header;
103         hdr = (uta_mhdr_t *) msg->header;
104         hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
105         hdr->sub_id = htonl( UNSET_SUBID );
106         SET_HDR_LEN( hdr );
107         SET_HDR_TR_LEN( hdr, tr_len );                                                  // set the actual length used
108         //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // moot until we actually need these data areas
109         //SET_HDR_D2_LEN( hdr, ctx->d1_len );
110
111         msg->len = 0;                                                                                   // length of data in the payload
112         msg->alloc_len = mlen;                                                                  // length of allocated payload
113         msg->sub_id = UNSET_SUBID;
114         msg->mtype = UNSET_MSGTYPE;
115         msg->payload = PAYLOAD_ADDR( hdr );                                             // point at the payload in transport
116         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
117         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
118         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
119         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
120         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
121
122         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen = %d size=%d mpl=%d flags=%02x %p m=%p @%p\n", mlen, size, ctx->max_plen, msg->flags, &msg->flags, msg, msg->header );
123
124         return msg;
125 }
126
127 /*
128         This will clone a message into a new zero copy buffer and return the cloned message.
129 */
130 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
131         rmr_mbuf_t* nm;                 // new message buffer
132         int     mlen;
133
134         if( old_msg == NULL ) {
135                 return NULL;
136         }
137
138         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
139         if( nm == NULL ) {
140                 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for message buffer\n" );
141                 exit( 1 );
142         }
143         memset( nm, 0, sizeof( *nm ) );
144
145         mlen = old_msg->alloc_len;                                                      // length allocated before
146         if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                            // this will be released on send, so DO NOT free
147                 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for zero copy buffer: %d\n", errno );
148                 exit( 1 );
149         }
150
151         memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) );     // copy complete header, trace and other data
152
153         nm->mtype = old_msg->mtype;
154         nm->sub_id = old_msg->sub_id;
155         nm->len = old_msg->len;                                                                 // length of data in the payload
156         nm->alloc_len = mlen;                                                                   // length of allocated payload
157         nm->payload = PAYLOAD_ADDR( nm->header );                               // reference the payload
158         nm->xaction = ((uta_mhdr_t *)nm->header)->xid;                  // point at transaction id in header area
159         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
160         nm->flags |= MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
161         memcpy( nm->payload, old_msg->payload, old_msg->len );
162
163         return nm;
164 }
165
166 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
167         rmr_mbuf_t* nm;                 // new message buffer
168         size_t  mlen;
169         int state;
170         uta_mhdr_t* hdr;
171         uta_v1mhdr_t* v1hdr;
172         int     tr_old_len;                     // tr size in new buffer
173
174
175         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
176         if( nm == NULL ) {
177                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
178                 exit( 1 );
179         }
180         memset( nm, 0, sizeof( *nm ) );
181
182         hdr = old_msg->header;
183         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
184
185         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
186         if( DEBUG ) fprintf( stderr, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
187         if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                            // this will be released on send, so DO NOT free
188                 fprintf( stderr, "[CRIT] rmr_realloc: cannot get memory for zero copy buffer: %d\n", errno );
189                 exit( 1 );
190         }
191
192         nm->tp_buf = nm->header;                                                                // in nano both are the same
193         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
194         switch( ntohl( v1hdr->rmr_ver ) ) {
195                 case 1:
196                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
197                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
198                         break;
199
200                 default:                                                                                        // current message always caught  here
201                         hdr = nm->header;
202                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
203                         if( RMR_D1_LEN( hdr )  ) {
204                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header ), RMR_D1_LEN( hdr ) );          // copy data1 and data2 if necessary
205
206                         }
207                         if( RMR_D2_LEN( hdr )  ) {
208                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header ), RMR_D2_LEN( hdr ) );          // copy data1 and data2 if necessary
209                         }
210
211                         SET_HDR_TR_LEN( hdr, tr_len );                                                                          // len MUST be set before pointing payload
212                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // reference user payload
213                         break;
214         }
215
216         // --- these are all version agnostic -----------------------------------
217         nm->mtype = old_msg->mtype;
218         nm->sub_id = old_msg->sub_id;
219         nm->len = old_msg->len;                                                                 // length of data in the payload
220         nm->alloc_len = mlen;                                                                   // length of allocated payload
221
222         nm->xaction = hdr->xid;                                                                 // reference xaction
223         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
224         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
225         memcpy( nm->payload, old_msg->payload, old_msg->len );
226
227         return nm;
228 }
229
230 /*
231         This is the receive work horse used by the outer layer receive functions.
232         It waits for a message to be received on our listen socket. If old msg
233         is passed in, the we assume we can use it instead of allocating a new
234         one, else a new block of memory is allocated.
235
236         This allocates a zero copy message so that if the user wishes to call
237         uta_rts_msg() the send is zero copy.
238 */
239 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
240         int nn_sock;                            // endpoint socket for send
241         int state;
242         rmr_mbuf_t*     msg = NULL;             // msg received
243         uta_mhdr_t* hdr;
244
245         if( old_msg ) {
246                 msg = old_msg;
247         } else {
248                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
249         }
250
251         msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // total space (header + payload len) allocated
252         if( msg->state > (int) sizeof( uta_mhdr_t ) ) {                                         // we need more than just a header here
253                 hdr = (uta_mhdr_t *) msg->header;
254                 msg->len = ntohl( hdr->plen );                                          // length of data in the payload (likely < payload size)
255                 if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) {
256                         msg->state = RMR_ERR_TRUNC;
257                         msg->len = msg->state - RMR_HDR_LEN( hdr );
258                 }
259                 msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
260                 msg->sub_id = ntohl( hdr->sub_id );                                                             // capture and convert from network order to local order
261                 msg->state = RMR_OK;
262                 msg->flags |= MFL_ADDSRC;                                                                               // turn on so if user app tries to send this buffer we reset src
263                 msg->payload = PAYLOAD_ADDR( msg->header );
264                 msg->xaction = &hdr->xid[0];                                                    // provide user with ref to fixed space xaction id
265                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
266                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
267         } else {
268                 msg->len = 0;
269                 msg->state = RMR_ERR_EMPTY;
270         }
271
272         return msg;
273 }
274
275
276 /*
277         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
278         message buffer cannot be used to send, and the length information may or may
279         not be correct (it is set to the length received which might be more than the
280         bytes actually in the payload).
281 */
282 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
283         int nn_sock;                            // endpoint socket for send
284         int state;
285         rmr_mbuf_t*     msg = NULL;             // msg received
286
287         if( old_msg ) {
288                 msg = old_msg;
289         } else {
290                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
291         }
292
293         msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // read and state will be length
294         if( msg->state >= 0 ) {
295                 msg->xaction = NULL;
296                 msg->mtype = UNSET_MSGTYPE;
297                 msg->sub_id = UNSET_SUBID;
298                 msg->len = msg->state;                                                                          // no header; len is the entire thing received
299                 msg->state = RMR_OK;
300                 msg->flags = MFL_RAW;                                                                           // prevent any sending of this headerless buffer
301                 msg->payload = msg->header;
302                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
303         } else {
304                 msg->len = 0;
305                 msg->state = RMR_ERR_EMPTY;
306                 msg->payload = NULL;
307                 msg->xaction = NULL;
308                 msg->mtype = UNSET_MSGTYPE;
309                 msg->sub_id = UNSET_SUBID;
310         }
311
312         return msg;
313 }
314
315 /*
316         This does the hard work of actually sending the message to the given socket. On success,
317         a new message struct is returned. On error, the original msg is returned with the state
318         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
319         buffer will not be allocated and returned (mostly for call() interal processing since
320         the return message from call() is a received buffer, not a new one).
321
322         Called by rmr_send_msg() and rmr_rts_msg().
323 */
324 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
325         int state;
326         uta_mhdr_t*     hdr;
327         int     tr_len;                                 // length from the message being sent (must snarf before send to use after send)
328
329         // future: ensure that application did not overrun the XID buffer; last byte must be 0
330
331         //fprintf( stderr, ">>>>>> sending to %d %d\n", nn_sock, msg->mtype );
332         hdr = (uta_mhdr_t *) msg->header;
333         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub-id in network byte order for transport
334         hdr->sub_id = htonl( msg->sub_id );
335         hdr->plen = htonl( msg->len );
336
337         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
338                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
339                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
340         }
341
342         tr_len = RMR_TR_LEN( hdr );
343         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
344                 if( (state = nn_send( nn_sock, &msg->header, NN_MSG, NN_DONTWAIT )) < 0 ) {
345                         msg->state = state;
346                 } else {
347                         msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
348                 }
349         } else {
350                 if( (state = nn_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, NN_DONTWAIT )) < 0 ) {
351                         msg->state = state;
352                 }
353         }
354
355         // future:  if nano sends bytes, but less than mlen, then what to do?
356         if( msg->state >= 0 ) {                                                                         // successful send
357                 if( !(msg->flags & MFL_NOALLOC) ) {                                             // if noalloc is set, then caller doesn't want a new buffer
358                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );      // preallocate a zero-copy buffer and return msg (with same trace len as sent buffer)
359                 } else {
360                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
361                         return NULL;
362                 }
363         } else {                                                                                        // send failed -- return original message
364                 if( errno == EAGAIN ) {
365                         msg->state = RMR_ERR_RETRY;                                     // some wrappers can't see errno, make this obvious
366                 } else {
367                         msg->state = RMR_ERR_SENDFAILED;                                        // errno will have nano reason
368                 }
369                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %s\n", strerror( errno ) );
370         }
371
372         return msg;
373 }
374
375
376 /*
377         A generic wrapper to the real send to keep wormhole stuff agnostic.
378         We assume the wormhole function vetted the buffer so we don't have to.
379 */
380 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
381         return send_msg( ctx, msg, ep->nn_sock );
382 }
383
384 #endif