18acdda51b9d96379d539b77c261b9deba050983
[ric-plt/lib/rmr.git] / src / rmr / nanomsg / src / sr_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_static.c
23         Abstract:       These are static send/receive related functions.
24
25                                 (broken out of rmr.c)
26         Author:         E. Scott Daniels
27         Date:           13 February 2019
28 */
29
30 #ifndef _sr_static_c
31 #define _sr_static_c
32
33 /*
34 #include <ctype.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <netdb.h>
38 #include <errno.h>
39 #include <string.h>
40 #include <errno.h>
41 #include <pthread.h>
42 #include <unistd.h>
43 #include <stdint.h>
44 #include <time.h>
45 #include <arpa/inet.h>
46
47 #include <nanomsg/nn.h>
48 #include <nanomsg/tcp.h>
49 #include <nanomsg/pair.h>
50 #include <nanomsg/pipeline.h>
51 #include <nanomsg/pubsub.h>
52
53 #include "rmr.h"                                // things the users see
54 #include "rmr_private.h"                // things that we need too
55 #include "rmr_symtab.h"
56
57 #include "ring_static.c"                // message ring support
58 #include "rt_generic_static.c"  // generic route table (not nng/nano specific)
59 #include "rtable_static.c"              // route table things   (nano specific)
60 #include "tools_static.c"
61 */
62
63
64 /*
65         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
66         a new message struct as well. Size is the size of the zc buffer to allocate (not
67         including our header). If size is 0, then the buffer allocated is the size previously
68         allocated (if msg is !nil) or the default size given at initialisation).
69
70
71         The trlo parm is the trace length override which will be used if not 0. If 0, then the
72         length in the context is used (default).
73 */
74 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
75         int     mlen;
76         uta_mhdr_t*     hdr;
77         int tr_len;                             // length to allocate for trace info
78
79         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
80
81         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
82         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
83
84         if( msg == NULL ) {
85                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
86                 if( msg == NULL ) {
87                         fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for message\n" );
88                         exit( 1 );
89                 }
90         } else {
91                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
92         }
93
94         memset( msg, 0, sizeof( *msg ) );
95
96         if( (msg->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                           // this will be released on send, so DO NOT free
97                 fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", errno );
98                 exit( 1 );
99         }
100
101         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                 // must ensure that header portion of tpbuf is 0
102         msg->tp_buf = msg->header;
103         hdr = (uta_mhdr_t *) msg->header;
104         hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
105         hdr->sub_id = htonl( UNSET_SUBID );
106         SET_HDR_LEN( hdr );
107         SET_HDR_TR_LEN( hdr, tr_len );                                                  // set the actual length used
108         //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // moot until we actually need these data areas
109         //SET_HDR_D2_LEN( hdr, ctx->d1_len );
110
111         msg->len = 0;                                                                                   // length of data in the payload
112         msg->alloc_len = mlen;                                                                  // length of allocated payload
113         msg->sub_id = UNSET_SUBID;
114         msg->mtype = UNSET_MSGTYPE;
115         msg->payload = PAYLOAD_ADDR( hdr );                                             // point at the payload in transport
116         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
117         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
118         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
119         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
120
121         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen = %d size=%d mpl=%d flags=%02x %p m=%p @%p\n", mlen, size, ctx->max_plen, msg->flags, &msg->flags, msg, msg->header );
122
123         return msg;
124 }
125
126 /*
127         This will clone a message into a new zero copy buffer and return the cloned message.
128 */
129 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
130         rmr_mbuf_t* nm;                 // new message buffer
131         int     mlen;
132
133         if( old_msg == NULL ) {
134                 return NULL;
135         }
136
137         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
138         if( nm == NULL ) {
139                 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for message buffer\n" );
140                 exit( 1 );
141         }
142         memset( nm, 0, sizeof( *nm ) );
143
144         mlen = old_msg->alloc_len;                                                      // length allocated before
145         if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                            // this will be released on send, so DO NOT free
146                 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for zero copy buffer: %d\n", errno );
147                 exit( 1 );
148         }
149
150         memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) );     // copy complete header, trace and other data
151
152         nm->mtype = old_msg->mtype;
153         nm->sub_id = old_msg->sub_id;
154         nm->len = old_msg->len;                                                                 // length of data in the payload
155         nm->alloc_len = mlen;                                                                   // length of allocated payload
156         nm->payload = PAYLOAD_ADDR( nm->header );                               // reference the payload
157         nm->xaction = ((uta_mhdr_t *)nm->header)->xid;                  // point at transaction id in header area
158         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
159         nm->flags |= MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
160         memcpy( nm->payload, old_msg->payload, old_msg->len );
161
162         return nm;
163 }
164
165 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
166         rmr_mbuf_t* nm;                 // new message buffer
167         size_t  mlen;
168         int state;
169         uta_mhdr_t* hdr;
170         uta_v1mhdr_t* v1hdr;
171         int     tr_old_len;                     // tr size in new buffer
172
173
174         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
175         if( nm == NULL ) {
176                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
177                 exit( 1 );
178         }
179         memset( nm, 0, sizeof( *nm ) );
180
181         hdr = old_msg->header;
182         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
183
184         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
185         if( DEBUG ) fprintf( stderr, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
186         if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                            // this will be released on send, so DO NOT free
187                 fprintf( stderr, "[CRIT] rmr_realloc: cannot get memory for zero copy buffer: %d\n", errno );
188                 exit( 1 );
189         }
190
191         nm->tp_buf = nm->header;                                                                // in nano both are the same
192         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
193         switch( ntohl( v1hdr->rmr_ver ) ) {
194                 case 1:
195                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
196                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
197                         break;
198
199                 default:                                                                                        // current message always caught  here
200                         hdr = nm->header;
201                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
202                         if( RMR_D1_LEN( hdr )  ) {
203                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header ), RMR_D1_LEN( hdr ) );          // copy data1 and data2 if necessary
204
205                         }
206                         if( RMR_D2_LEN( hdr )  ) {
207                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header ), RMR_D2_LEN( hdr ) );          // copy data1 and data2 if necessary
208                         }
209
210                         SET_HDR_TR_LEN( hdr, tr_len );                                                                          // len MUST be set before pointing payload
211                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // reference user payload
212                         break;
213         }
214
215         // --- these are all version agnostic -----------------------------------
216         nm->mtype = old_msg->mtype;
217         nm->sub_id = old_msg->sub_id;
218         nm->len = old_msg->len;                                                                 // length of data in the payload
219         nm->alloc_len = mlen;                                                                   // length of allocated payload
220
221         nm->xaction = hdr->xid;                                                                 // reference xaction
222         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
223         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
224         memcpy( nm->payload, old_msg->payload, old_msg->len );
225
226         return nm;
227 }
228
229 /*
230         This is the receive work horse used by the outer layer receive functions.
231         It waits for a message to be received on our listen socket. If old msg
232         is passed in, the we assume we can use it instead of allocating a new
233         one, else a new block of memory is allocated.
234
235         This allocates a zero copy message so that if the user wishes to call
236         uta_rts_msg() the send is zero copy.
237 */
238 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
239         int nn_sock;                            // endpoint socket for send
240         int state;
241         rmr_mbuf_t*     msg = NULL;             // msg received
242         uta_mhdr_t* hdr;
243
244         if( old_msg ) {
245                 msg = old_msg;
246         } else {
247                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
248         }
249
250         msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // total space (header + payload len) allocated
251         if( msg->state > (int) sizeof( uta_mhdr_t ) ) {                                         // we need more than just a header here
252                 hdr = (uta_mhdr_t *) msg->header;
253                 msg->len = ntohl( hdr->plen );                                          // length of data in the payload (likely < payload size)
254                 if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) {
255                         msg->state = RMR_ERR_TRUNC;
256                         msg->len = msg->state - RMR_HDR_LEN( hdr );
257                 }
258                 msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
259                 msg->sub_id = ntohl( hdr->sub_id );                                                             // capture and convert from network order to local order
260                 msg->state = RMR_OK;
261                 msg->flags |= MFL_ADDSRC;                                                                               // turn on so if user app tries to send this buffer we reset src
262                 msg->payload = PAYLOAD_ADDR( msg->header );
263                 msg->xaction = &hdr->xid[0];                                                    // provide user with ref to fixed space xaction id
264                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
265                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
266         } else {
267                 msg->len = 0;
268                 msg->state = RMR_ERR_EMPTY;
269         }
270
271         return msg;
272 }
273
274
275 /*
276         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
277         message buffer cannot be used to send, and the length information may or may
278         not be correct (it is set to the length received which might be more than the
279         bytes actually in the payload).
280 */
281 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
282         int nn_sock;                            // endpoint socket for send
283         int state;
284         rmr_mbuf_t*     msg = NULL;             // msg received
285
286         if( old_msg ) {
287                 msg = old_msg;
288         } else {
289                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
290         }
291
292         msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // read and state will be length
293         if( msg->state >= 0 ) {
294                 msg->xaction = NULL;
295                 msg->mtype = UNSET_MSGTYPE;
296                 msg->sub_id = UNSET_SUBID;
297                 msg->len = msg->state;                                                                          // no header; len is the entire thing received
298                 msg->state = RMR_OK;
299                 msg->flags = MFL_RAW;                                                                           // prevent any sending of this headerless buffer
300                 msg->payload = msg->header;
301                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
302         } else {
303                 msg->len = 0;
304                 msg->state = RMR_ERR_EMPTY;
305                 msg->payload = NULL;
306                 msg->xaction = NULL;
307                 msg->mtype = UNSET_MSGTYPE;
308                 msg->sub_id = UNSET_SUBID;
309         }
310
311         return msg;
312 }
313
314 /*
315         This does the hard work of actually sending the message to the given socket. On success,
316         a new message struct is returned. On error, the original msg is returned with the state
317         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
318         buffer will not be allocated and returned (mostly for call() interal processing since
319         the return message from call() is a received buffer, not a new one).
320
321         Called by rmr_send_msg() and rmr_rts_msg().
322 */
323 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
324         int state;
325         uta_mhdr_t*     hdr;
326         int     tr_len;                                 // length from the message being sent (must snarf before send to use after send)
327
328         // future: ensure that application did not overrun the XID buffer; last byte must be 0
329
330         //fprintf( stderr, ">>>>>> sending to %d %d\n", nn_sock, msg->mtype );
331         hdr = (uta_mhdr_t *) msg->header;
332         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub-id in network byte order for transport
333         hdr->sub_id = htonl( msg->sub_id );
334         hdr->plen = htonl( msg->len );
335
336         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
337                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
338         }
339
340         tr_len = RMR_TR_LEN( hdr );
341         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
342                 if( (state = nn_send( nn_sock, &msg->header, NN_MSG, NN_DONTWAIT )) < 0 ) {
343                         msg->state = state;
344                 } else {
345                         msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
346                 }
347         } else {
348                 if( (state = nn_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, NN_DONTWAIT )) < 0 ) {
349                         msg->state = state;
350                 }
351         }
352
353         // future:  if nano sends bytes, but less than mlen, then what to do?
354         if( msg->state >= 0 ) {                                                                         // successful send
355                 if( !(msg->flags & MFL_NOALLOC) ) {                                             // if noalloc is set, then caller doesn't want a new buffer
356                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );      // preallocate a zero-copy buffer and return msg (with same trace len as sent buffer)
357                 } else {
358                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
359                         return NULL;
360                 }
361         } else {                                                                                        // send failed -- return original message
362                 if( errno == EAGAIN ) {
363                         msg->state = RMR_ERR_RETRY;                                     // some wrappers can't see errno, make this obvious
364                 } else {
365                         msg->state = RMR_ERR_SENDFAILED;                                        // errno will have nano reason
366                 }
367                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %s\n", strerror( errno ) );
368         }
369
370         return msg;
371 }
372
373
374 /*
375         A generic wrapper to the real send to keep wormhole stuff agnostic.
376         We assume the wormhole function vetted the buffer so we don't have to.
377 */
378 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
379         return send_msg( ctx, msg, ep->nn_sock );
380 }
381
382 #endif