77a0e64642d33b9f7bd2e3d3dc071c07b9d57e43
[ric-plt/lib/rmr.git] / src / nanomsg / src / sr_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_static.c
23         Abstract:       These are static send/receive related functions.
24
25                                 (broken out of rmr.c)
26         Author:         E. Scott Daniels
27         Date:           13 February 2019
28 */
29
30 #ifndef _sr_static_c
31 #define _sr_static_c
32
33 /*
34 #include <ctype.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <netdb.h>
38 #include <errno.h>
39 #include <string.h>
40 #include <errno.h>
41 #include <pthread.h>
42 #include <unistd.h>
43 #include <stdint.h>
44 #include <time.h>
45 #include <arpa/inet.h>
46
47 #include <nanomsg/nn.h>
48 #include <nanomsg/tcp.h>
49 #include <nanomsg/pair.h>
50 #include <nanomsg/pipeline.h>
51 #include <nanomsg/pubsub.h>
52
53 #include "rmr.h"                                // things the users see
54 #include "rmr_private.h"                // things that we need too
55 #include "rmr_symtab.h"
56
57 #include "ring_static.c"                // message ring support
58 #include "rt_generic_static.c"  // generic route table (not nng/nano specific)
59 #include "rtable_static.c"              // route table things   (nano specific)
60 #include "tools_static.c"
61 */
62
63
64 /*
65         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
66         a new message struct as well. Size is the size of the zc buffer to allocate (not
67         including our header). If size is 0, then the buffer allocated is the size previously
68         allocated (if msg is !nil) or the default size given at initialisation).
69
70
71         The trlo parm is the trace length override which will be used if not 0. If 0, then the
72         length in the context is used (default).
73 */
74 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
75         int     mlen;
76         uta_mhdr_t*     hdr;
77         int tr_len;                             // length to allocate for trace info
78
79         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
80
81         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
82         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
83
84         if( msg == NULL ) {
85                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
86                 if( msg == NULL ) {
87                         fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for message\n" );
88                         exit( 1 );
89                 }
90         } else {
91                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
92         }
93
94         memset( msg, 0, sizeof( *msg ) );
95
96         if( (msg->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                           // this will be released on send, so DO NOT free
97                 fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", errno );
98                 exit( 1 );
99         }
100
101         hdr = (uta_mhdr_t *) msg->header;
102         hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
103         hdr->sub_id = htonl( UNSET_SUBID );
104         SET_HDR_LEN( hdr );
105         SET_HDR_TR_LEN( hdr, tr_len );                                                  // set the actual length used
106         //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // moot until we actually need these data areas
107         //SET_HDR_D2_LEN( hdr, ctx->d1_len );
108
109         msg->len = 0;                                                                                   // length of data in the payload
110         msg->alloc_len = mlen;                                                                  // length of allocated payload
111         msg->payload = PAYLOAD_ADDR( hdr );                                             // point at the payload in transport
112         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
113         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
114         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
115         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
116
117         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen = %d size=%d mpl=%d flags=%02x %p m=%p @%p\n", mlen, size, ctx->max_plen, msg->flags, &msg->flags, msg, msg->header );
118
119         return msg;
120 }
121
122 /*
123         This will clone a message into a new zero copy buffer and return the cloned message.
124 */
125 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
126         rmr_mbuf_t* nm;                 // new message buffer
127         int     mlen;
128
129         if( old_msg == NULL ) {
130                 return NULL;
131         }
132
133         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
134         if( nm == NULL ) {
135                 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for message buffer\n" );
136                 exit( 1 );
137         }
138         memset( nm, 0, sizeof( *nm ) );
139
140         mlen = old_msg->alloc_len;                                                      // length allocated before
141         if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                            // this will be released on send, so DO NOT free
142                 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for zero copy buffer: %d\n", errno );
143                 exit( 1 );
144         }
145
146         memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) );     // copy complete header, trace and other data
147
148         nm->mtype = old_msg->mtype;
149         nm->sub_id = old_msg->sub_id;
150         nm->len = old_msg->len;                                                                 // length of data in the payload
151         nm->alloc_len = mlen;                                                                   // length of allocated payload
152         nm->payload = PAYLOAD_ADDR( nm->header );                               // reference the payload
153         nm->xaction = ((uta_mhdr_t *)nm->header)->xid;                  // point at transaction id in header area
154         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
155         nm->flags |= MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
156         memcpy( nm->payload, old_msg->payload, old_msg->len );
157
158         return nm;
159 }
160
161 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
162         rmr_mbuf_t* nm;                 // new message buffer
163         size_t  mlen;
164         int state;
165         uta_mhdr_t* hdr;
166         uta_v1mhdr_t* v1hdr;
167         int     tr_old_len;                     // tr size in new buffer
168
169
170         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
171         if( nm == NULL ) {
172                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
173                 exit( 1 );
174         }
175         memset( nm, 0, sizeof( *nm ) );
176
177         hdr = old_msg->header;
178         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
179
180         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
181         if( DEBUG ) fprintf( stderr, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
182         if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                            // this will be released on send, so DO NOT free
183                 fprintf( stderr, "[CRIT] rmr_realloc: cannot get memory for zero copy buffer: %d\n", errno );
184                 exit( 1 );
185         }
186
187         nm->tp_buf = nm->header;                                                                // in nano both are the same
188         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
189         switch( ntohl( v1hdr->rmr_ver ) ) {
190                 case 1:
191                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
192                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
193                         break;
194
195                 default:                                                                                        // current message always caught  here
196                         hdr = nm->header;
197                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
198                         if( RMR_D1_LEN( hdr )  ) {
199                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header ), RMR_D1_LEN( hdr ) );          // copy data1 and data2 if necessary
200
201                         }
202                         if( RMR_D2_LEN( hdr )  ) {
203                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header ), RMR_D2_LEN( hdr ) );          // copy data1 and data2 if necessary
204                         }
205
206                         SET_HDR_TR_LEN( hdr, tr_len );                                                                          // len MUST be set before pointing payload
207                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // reference user payload
208                         break;
209         }
210
211         // --- these are all version agnostic -----------------------------------
212         nm->mtype = old_msg->mtype;
213         nm->sub_id = old_msg->sub_id;
214         nm->len = old_msg->len;                                                                 // length of data in the payload
215         nm->alloc_len = mlen;                                                                   // length of allocated payload
216
217         nm->xaction = hdr->xid;                                                                 // reference xaction
218         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
219         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
220         memcpy( nm->payload, old_msg->payload, old_msg->len );
221
222         return nm;
223 }
224
225 /*
226         This is the receive work horse used by the outer layer receive functions.
227         It waits for a message to be received on our listen socket. If old msg
228         is passed in, the we assume we can use it instead of allocating a new
229         one, else a new block of memory is allocated.
230
231         This allocates a zero copy message so that if the user wishes to call
232         uta_rts_msg() the send is zero copy.
233 */
234 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
235         int nn_sock;                            // endpoint socket for send
236         int state;
237         rmr_mbuf_t*     msg = NULL;             // msg received
238         uta_mhdr_t* hdr;
239
240         if( old_msg ) {
241                 msg = old_msg;
242         } else {
243                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
244         }
245
246         msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // total space (header + payload len) allocated
247         if( msg->state > (int) sizeof( uta_mhdr_t ) ) {                                         // we need more than just a header here
248                 hdr = (uta_mhdr_t *) msg->header;
249                 msg->len = ntohl( hdr->plen );                                          // length of data in the payload (likely < payload size)
250                 if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) {
251                         msg->state = RMR_ERR_TRUNC;
252                         msg->len = msg->state - RMR_HDR_LEN( hdr );
253                 }
254                 msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
255                 msg->sub_id = ntohl( hdr->sub_id );                                                             // capture and convert from network order to local order
256                 msg->state = RMR_OK;
257                 msg->flags |= MFL_ADDSRC;                                                                               // turn on so if user app tries to send this buffer we reset src
258                 msg->payload = PAYLOAD_ADDR( msg->header );
259                 msg->xaction = &hdr->xid[0];                                                    // provide user with ref to fixed space xaction id
260                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
261                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
262         } else {
263                 msg->len = 0;
264                 msg->state = RMR_ERR_EMPTY;
265         }
266
267         return msg;
268 }
269
270
271 /*
272         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
273         message buffer cannot be used to send, and the length information may or may
274         not be correct (it is set to the length received which might be more than the
275         bytes actually in the payload).
276 */
277 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
278         int nn_sock;                            // endpoint socket for send
279         int state;
280         rmr_mbuf_t*     msg = NULL;             // msg received
281
282         if( old_msg ) {
283                 msg = old_msg;
284         } else {
285                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
286         }
287
288         msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // read and state will be length
289         if( msg->state >= 0 ) {
290                 msg->xaction = NULL;
291                 msg->mtype = UNSET_MSGTYPE;
292                 msg->sub_id = UNSET_SUBID;
293                 msg->len = msg->state;                                                                          // no header; len is the entire thing received
294                 msg->state = RMR_OK;
295                 msg->flags = MFL_RAW;                                                                           // prevent any sending of this headerless buffer
296                 msg->payload = msg->header;
297                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
298         } else {
299                 msg->len = 0;
300                 msg->state = RMR_ERR_EMPTY;
301                 msg->payload = NULL;
302                 msg->xaction = NULL;
303                 msg->mtype = UNSET_MSGTYPE;
304                 msg->sub_id = UNSET_SUBID;
305         }
306
307         return msg;
308 }
309
310 /*
311         This does the hard work of actually sending the message to the given socket. On success,
312         a new message struct is returned. On error, the original msg is returned with the state
313         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
314         buffer will not be allocated and returned (mostly for call() interal processing since
315         the return message from call() is a received buffer, not a new one).
316
317         Called by rmr_send_msg() and rmr_rts_msg().
318 */
319 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
320         int state;
321         uta_mhdr_t*     hdr;
322         int     tr_len;                                 // length from the message being sent (must snarf before send to use after send)
323
324         // future: ensure that application did not overrun the XID buffer; last byte must be 0
325
326         hdr = (uta_mhdr_t *) msg->header;
327         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub-id in network byte order for transport
328         hdr->sub_id = htonl( msg->sub_id );
329         hdr->plen = htonl( msg->len );
330
331         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
332                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
333         }
334
335         tr_len = RMR_TR_LEN( hdr );
336         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
337                 if( (state = nn_send( nn_sock, &msg->header, NN_MSG, NN_DONTWAIT )) < 0 ) {
338                         msg->state = state;
339                 } else {
340                         msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
341                 }
342         } else {
343                 if( (state = nn_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, NN_DONTWAIT )) < 0 ) {
344                         msg->state = state;
345                 }
346         }
347
348         // future:  if nano sends bytes, but less than mlen, then what to do?
349         if( msg->state >= 0 ) {                                                                         // successful send
350                 if( !(msg->flags & MFL_NOALLOC) ) {                                             // if noalloc is set, then caller doesn't want a new buffer
351                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );      // preallocate a zero-copy buffer and return msg (with same trace len as sent buffer)
352                 } else {
353                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
354                         return NULL;
355                 }
356         } else {                                                                                        // send failed -- return original message
357                 if( errno == EAGAIN ) {
358                         msg->state = RMR_ERR_RETRY;                                     // some wrappers can't see errno, make this obvious
359                 } else {
360                         msg->state = RMR_ERR_SENDFAILED;                                        // errno will have nano reason
361                 }
362                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %s\n", strerror( errno ) );
363         }
364
365         return msg;
366 }
367
368
369 /*
370         A generic wrapper to the real send to keep wormhole stuff agnostic.
371         We assume the wormhole function vetted the buffer so we don't have to.
372 */
373 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
374         return send_msg( ctx, msg, ep->nn_sock );
375 }
376
377 #endif