0f59da2a1f8b0175c88a4179dfd23376cc73eab4
[ric-plt/lib/rmr.git] / src / nanomsg / src / sr_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia 
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_static.c
23         Abstract:       These are static send/receive related functions.
24
25                                 (broken out of rmr.c)
26         Author:         E. Scott Daniels
27         Date:           13 February 2019
28 */
29
30 #ifndef _sr_static_c
31 #define _sr_static_c
32
33 /*
34 #include <ctype.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <netdb.h>
38 #include <errno.h>
39 #include <string.h>
40 #include <errno.h>
41 #include <pthread.h>
42 #include <unistd.h>
43 #include <stdint.h>
44 #include <time.h>
45 #include <arpa/inet.h>
46
47 #include <nanomsg/nn.h>
48 #include <nanomsg/tcp.h>
49 #include <nanomsg/pair.h>
50 #include <nanomsg/pipeline.h>
51 #include <nanomsg/pubsub.h>
52
53 #include "rmr.h"                                // things the users see
54 #include "rmr_private.h"                // things that we need too
55 #include "rmr_symtab.h"
56
57 #include "ring_static.c"                // message ring support
58 #include "rt_generic_static.c"  // generic route table (not nng/nano specific)
59 #include "rtable_static.c"              // route table things   (nano specific)
60 #include "tools_static.c"
61 */
62
63
64 /*
65         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
66         a new message struct as well. Size is the size of the zc buffer to allocate (not
67         including our header). If size is 0, then the buffer allocated is the size previously
68         allocated (if msg is !nil) or the default size given at initialisation).
69
70
71         The trlo parm is the trace length override which will be used if not 0. If 0, then the
72         length in the context is used (default).
73 */
74 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
75         int     mlen;
76         uta_mhdr_t*     hdr;
77         int tr_len;                             // length to allocate for trace info
78
79         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
80
81         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
82         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
83
84         if( msg == NULL ) {
85                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
86                 if( msg == NULL ) {
87                         fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for message\n" );
88                         exit( 1 );
89                 }
90         } else {
91                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
92         }
93
94         memset( msg, 0, sizeof( *msg ) );
95
96         if( (msg->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                           // this will be released on send, so DO NOT free
97                 fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", errno );
98                 exit( 1 );
99         }
100
101         hdr = (uta_mhdr_t *) msg->header;
102         hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
103         SET_HDR_LEN( hdr );
104         SET_HDR_TR_LEN( hdr, tr_len );                                                  // set the actual length used
105         //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // moot until we actually need these data areas
106         //SET_HDR_D2_LEN( hdr, ctx->d1_len );
107
108         msg->len = 0;                                                                                   // length of data in the payload
109         msg->alloc_len = mlen;                                                                  // length of allocated payload
110         msg->payload = PAYLOAD_ADDR( hdr );                                             // point at the payload in transport
111         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
112         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
113         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
114         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
115
116         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen = %d size=%d mpl=%d flags=%02x %p m=%p @%p\n", mlen, size, ctx->max_plen, msg->flags, &msg->flags, msg, msg->header );
117
118         return msg;
119 }
120
121 /*
122         This will clone a message into a new zero copy buffer and return the cloned message.
123 */
124 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
125         rmr_mbuf_t* nm;                 // new message buffer
126         int     mlen;
127
128         if( old_msg == NULL ) {
129                 return NULL;
130         }
131
132         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
133         if( nm == NULL ) {
134                 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for message buffer\n" );
135                 exit( 1 );
136         }
137         memset( nm, 0, sizeof( *nm ) );
138
139         mlen = old_msg->alloc_len;                                                      // length allocated before
140         if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                            // this will be released on send, so DO NOT free
141                 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for zero copy buffer: %d\n", errno );
142                 exit( 1 );
143         }
144
145         memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) );     // copy complete header, trace and other data
146
147         nm->mtype = old_msg->mtype;
148         nm->len = old_msg->len;                                                                 // length of data in the payload
149         nm->alloc_len = mlen;                                                                   // length of allocated payload
150         nm->payload = PAYLOAD_ADDR( nm->header );                               // reference the payload
151         nm->xaction = ((uta_mhdr_t *)nm->header)->xid;                  // point at transaction id in header area
152         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
153         nm->flags |= MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
154         memcpy( nm->payload, old_msg->payload, old_msg->len );
155
156         return nm;
157 }
158
159 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
160         rmr_mbuf_t* nm;                 // new message buffer
161         size_t  mlen;
162         int state;
163         uta_mhdr_t* hdr;
164         uta_v1mhdr_t* v1hdr;
165         int     tr_old_len;                     // tr size in new buffer
166
167
168         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
169         if( nm == NULL ) {
170                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
171                 exit( 1 );
172         }
173         memset( nm, 0, sizeof( *nm ) );
174
175         hdr = old_msg->header;
176         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
177
178         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
179         if( DEBUG ) fprintf( stderr, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
180         if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                            // this will be released on send, so DO NOT free
181                 fprintf( stderr, "[CRIT] rmr_realloc: cannot get memory for zero copy buffer: %d\n", errno );
182                 exit( 1 );
183         }
184
185         nm->tp_buf = nm->header;                                                                // in nano both are the same
186         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
187         switch( ntohl( v1hdr->rmr_ver ) ) {
188                 case 1:
189                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
190                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
191                         break;
192
193                 default:                                                                                        // current message always caught  here
194                         hdr = nm->header;
195                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
196                         if( RMR_D1_LEN( hdr )  ) {
197                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header ), RMR_D1_LEN( hdr ) );          // copy data1 and data2 if necessary
198                         
199                         }
200                         if( RMR_D2_LEN( hdr )  ) {
201                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header ), RMR_D2_LEN( hdr ) );          // copy data1 and data2 if necessary
202                         }
203
204                         SET_HDR_TR_LEN( hdr, tr_len );                                                                          // len MUST be set before pointing payload
205                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // reference user payload
206                         break;
207         }
208                 
209         // --- these are all version agnostic -----------------------------------
210         nm->mtype = old_msg->mtype;
211         nm->len = old_msg->len;                                                                 // length of data in the payload
212         nm->alloc_len = mlen;                                                                   // length of allocated payload
213
214         nm->xaction = hdr->xid;                                                                 // reference xaction
215         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
216         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
217         memcpy( nm->payload, old_msg->payload, old_msg->len );
218
219         return nm;
220 }
221
222 /*
223         This is the receive work horse used by the outer layer receive functions.
224         It waits for a message to be received on our listen socket. If old msg
225         is passed in, the we assume we can use it instead of allocating a new
226         one, else a new block of memory is allocated.
227
228         This allocates a zero copy message so that if the user wishes to call
229         uta_rts_msg() the send is zero copy.
230 */
231 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
232         int nn_sock;                            // endpoint socket for send
233         int state;
234         rmr_mbuf_t*     msg = NULL;             // msg received
235         uta_mhdr_t* hdr;
236
237         if( old_msg ) {
238                 msg = old_msg;
239         } else {
240                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
241         }
242
243         msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // total space (header + payload len) allocated
244         if( msg->state > (int) sizeof( uta_mhdr_t ) ) {                                         // we need more than just a header here
245                 hdr = (uta_mhdr_t *) msg->header;
246                 msg->len = ntohl( hdr->plen );                                          // length of data in the payload (likely < payload size)
247                 if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) {
248                         msg->state = RMR_ERR_TRUNC;
249                         msg->len = msg->state - RMR_HDR_LEN( hdr );
250                 }
251                 msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
252                 msg->state = RMR_OK;
253                 msg->flags |= MFL_ADDSRC;                                                                               // turn on so if user app tries to send this buffer we reset src
254                 msg->payload = PAYLOAD_ADDR( msg->header );
255                 msg->xaction = &hdr->xid[0];                                                    // provide user with ref to fixed space xaction id
256                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
257                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
258         } else {
259                 msg->len = 0;
260                 msg->state = RMR_ERR_EMPTY;
261         }
262
263         return msg;
264 }
265
266
267 /*
268         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
269         message buffer cannot be used to send, and the length information may or may 
270         not be correct (it is set to the length received which might be more than the
271         bytes actually in the payload).
272 */
273 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
274         int nn_sock;                            // endpoint socket for send
275         int state;
276         rmr_mbuf_t*     msg = NULL;             // msg received
277
278         if( old_msg ) {
279                 msg = old_msg;
280         } else {
281                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
282         }
283
284         msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // read and state will be length
285         if( msg->state >= 0 ) {
286                 msg->xaction = NULL;
287                 msg->mtype = -1;
288                 msg->len = msg->state;                                                                          // no header; len is the entire thing received
289                 msg->state = RMR_OK;
290                 msg->flags = MFL_RAW;                                                                           // prevent any sending of this headerless buffer
291                 msg->payload = msg->header;
292                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
293         } else {
294                 msg->len = 0;
295                 msg->state = RMR_ERR_EMPTY;
296                 msg->payload = NULL;
297                 msg->xaction = NULL;
298                 msg->mtype = -1;
299         }
300
301         return msg;
302 }
303
304 /*
305         This does the hard work of actually sending the message to the given socket. On success,
306         a new message struct is returned. On error, the original msg is returned with the state 
307         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new 
308         buffer will not be allocated and returned (mostly for call() interal processing since
309         the return message from call() is a received buffer, not a new one).
310
311         Called by rmr_send_msg() and rmr_rts_msg().
312 */
313 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
314         int state;
315         uta_mhdr_t*     hdr;
316         int     tr_len;                                 // length from the message being sent (must snarf before send to use after send)
317
318         // future: ensure that application did not overrun the XID buffer; last byte must be 0
319
320         hdr = (uta_mhdr_t *) msg->header;
321         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
322         hdr->plen = htonl( msg->len );
323
324         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
325                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
326         }
327
328         tr_len = RMR_TR_LEN( hdr );
329         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
330                 if( (state = nn_send( nn_sock, &msg->header, NN_MSG, NN_DONTWAIT )) < 0 ) {
331                         msg->state = state;
332                 } else {
333                         msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
334                 }
335         } else {
336                 if( (state = nn_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, NN_DONTWAIT )) < 0 ) {
337                         msg->state = state;
338                 } 
339         }
340
341         // future:  if nano sends bytes, but less than mlen, then what to do?
342         if( msg->state >= 0 ) {                                                                         // successful send
343                 if( !(msg->flags & MFL_NOALLOC) ) {                                             // if noalloc is set, then caller doesn't want a new buffer
344                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );      // preallocate a zero-copy buffer and return msg (with same trace len as sent buffer)
345                 } else {
346                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
347                         return NULL;
348                 }
349         } else {                                                                                        // send failed -- return original message
350                 if( errno == EAGAIN ) {
351                         msg->state = RMR_ERR_RETRY;                                     // some wrappers can't see errno, make this obvious
352                 } else {
353                         msg->state = RMR_ERR_SENDFAILED;                                        // errno will have nano reason
354                 }
355                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %s\n", strerror( errno ) );
356         }
357
358         return msg;
359 }
360
361
362 /*
363         A generic wrapper to the real send to keep wormhole stuff agnostic.
364         We assume the wormhole function vetted the buffer so we don't have to.
365 */
366 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
367         return send_msg( ctx, msg, ep->nn_sock );
368 }
369
370 #endif