feat(msgs): Add header v2 support
[ric-plt/lib/rmr.git] / src / nanomsg / src / sr_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia 
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_static.c
23         Abstract:       These are static send/receive related functions.
24
25                                 (broken out of rmr.c)
26         Author:         E. Scott Daniels
27         Date:           13 February 2019
28 */
29
30 #ifndef _sr_static_c
31 #define _sr_static_c
32
33 /*
34 #include <ctype.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <netdb.h>
38 #include <errno.h>
39 #include <string.h>
40 #include <errno.h>
41 #include <pthread.h>
42 #include <unistd.h>
43 #include <stdint.h>
44 #include <time.h>
45 #include <arpa/inet.h>
46
47 #include <nanomsg/nn.h>
48 #include <nanomsg/tcp.h>
49 #include <nanomsg/pair.h>
50 #include <nanomsg/pipeline.h>
51 #include <nanomsg/pubsub.h>
52
53 #include "rmr.h"                                // things the users see
54 #include "rmr_private.h"                // things that we need too
55 #include "rmr_symtab.h"
56
57 #include "ring_static.c"                // message ring support
58 #include "rt_generic_static.c"  // generic route table (not nng/nano specific)
59 #include "rtable_static.c"              // route table things   (nano specific)
60 #include "tools_static.c"
61 */
62
63
64 /*
65         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
66         a new message struct as well. Size is the size of the zc buffer to allocate (not
67         including our header). If size is 0, then the buffer allocated is the size previously
68         allocated (if msg is !nil) or the default size given at initialisation).
69 */
70 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
71         int     mlen;
72         uta_mhdr_t*     hdr;
73
74         mlen = sizeof( uta_mhdr_t );                                            // figure size should we not have a msg buffer
75         mlen += (size > 0 ? size  : ctx->max_plen);                     // add user requested size or size set during init
76
77         if( msg == NULL ) {
78                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
79                 if( msg == NULL ) {
80                         fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for message\n" );
81                         exit( 1 );
82                 }
83         } else {
84                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
85         }
86
87         memset( msg, 0, sizeof( *msg ) );
88
89         if( (msg->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                           // this will be released on send, so DO NOT free
90                 fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", errno );
91                 exit( 1 );
92         }
93
94         hdr = (uta_mhdr_t *) msg->header;
95         hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
96         SET_HDR_LEN( hdr );
97         SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
98         //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // moot until we actually need these data areas
99         //SET_HDR_D2_LEN( hdr, ctx->d1_len );
100
101         msg->len = 0;                                                                                   // length of data in the payload
102         msg->alloc_len = mlen;                                                                  // length of allocated payload
103         msg->payload = msg->header + PAYLOAD_OFFSET( hdr );             // point at the payload in transport
104         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                                                // point at transaction id in header area
105         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
106         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
107         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
108
109         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen = %d size=%d mpl=%d flags=%02x %p m=%p @%p\n", mlen, size, ctx->max_plen, msg->flags, &msg->flags, msg, msg->header );
110
111         return msg;
112 }
113
114 /*
115         This will clone a message into a new zero copy buffer and return the cloned message.
116 */
117 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
118         rmr_mbuf_t* nm;                 // new message buffer
119         int     mlen;
120
121         if( old_msg == NULL ) {
122                 return NULL;
123         }
124
125         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
126         if( nm == NULL ) {
127                 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for message buffer\n" );
128                 exit( 1 );
129         }
130         memset( nm, 0, sizeof( *nm ) );
131
132         mlen = old_msg->alloc_len;                                                      // length allocated before
133         if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                            // this will be released on send, so DO NOT free
134                 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for zero copy buffer: %d\n", errno );
135                 exit( 1 );
136         }
137
138         memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) );     // copy complete header, trace and other data
139
140         nm->mtype = old_msg->mtype;
141         nm->len = old_msg->len;                                                                 // length of data in the payload
142         nm->alloc_len = mlen;                                                                   // length of allocated payload
143         nm->payload = nm->header + PAYLOAD_OFFSET( nm->header );
144         nm->xaction = ((uta_mhdr_t *)nm->header)->xid;                  // point at transaction id in header area
145         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
146         nm->flags |= MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
147         memcpy( nm->payload, old_msg->payload, old_msg->len );
148
149         return nm;
150 }
151
152 /*
153         This is the receive work horse used by the outer layer receive functions.
154         It waits for a message to be received on our listen socket. If old msg
155         is passed in, the we assume we can use it instead of allocating a new
156         one, else a new block of memory is allocated.
157
158         This allocates a zero copy message so that if the user wishes to call
159         uta_rts_msg() the send is zero copy.
160 */
161 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
162         int nn_sock;                            // endpoint socket for send
163         int state;
164         rmr_mbuf_t*     msg = NULL;             // msg received
165         uta_mhdr_t* hdr;
166
167         if( old_msg ) {
168                 msg = old_msg;
169         } else {
170                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
171         }
172
173         msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // total space (header + payload len) allocated
174         if( msg->state > (int) sizeof( uta_mhdr_t ) ) {                                         // we need more than just a header here
175                 hdr = (uta_mhdr_t *) msg->header;
176                 msg->len = ntohl( hdr->plen );                                          // length of data in the payload (likely < payload size)
177                 if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) {
178                         msg->state = RMR_ERR_TRUNC;
179                         msg->len = msg->state - RMR_HDR_LEN( hdr );
180                 }
181                 msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
182                 msg->state = RMR_OK;
183                 msg->flags |= MFL_ADDSRC;                                                                               // turn on so if user app tries to send this buffer we reset src
184                 msg->payload = msg->header + PAYLOAD_OFFSET( msg->header );
185                 msg->xaction = &hdr->xid[0];                                                    // provide user with ref to fixed space xaction id
186                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
187                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
188         } else {
189                 msg->len = 0;
190                 msg->state = RMR_ERR_EMPTY;
191         }
192
193         return msg;
194 }
195
196
197 /*
198         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
199         message buffer cannot be used to send, and the length information may or may 
200         not be correct (it is set to the length received which might be more than the
201         bytes actually in the payload).
202 */
203 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
204         int nn_sock;                            // endpoint socket for send
205         int state;
206         rmr_mbuf_t*     msg = NULL;             // msg received
207
208         if( old_msg ) {
209                 msg = old_msg;
210         } else {
211                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
212         }
213
214         msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // read and state will be length
215         if( msg->state >= 0 ) {
216                 msg->xaction = NULL;
217                 msg->mtype = -1;
218                 msg->len = msg->state;                                                                          // no header; len is the entire thing received
219                 msg->state = RMR_OK;
220                 msg->flags = MFL_RAW;                                                                           // prevent any sending of this headerless buffer
221                 msg->payload = msg->header;
222                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
223         } else {
224                 msg->len = 0;
225                 msg->state = RMR_ERR_EMPTY;
226                 msg->payload = NULL;
227                 msg->xaction = NULL;
228                 msg->mtype = -1;
229         }
230
231         return msg;
232 }
233
234 /*
235         This does the hard work of actually sending the message to the given socket. On success,
236         a new message struct is returned. On error, the original msg is returned with the state 
237         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new 
238         buffer will not be allocated and returned (mostly for call() interal processing since
239         the return message from call() is a received buffer, not a new one).
240
241         Called by rmr_send_msg() and rmr_rts_msg().
242 */
243 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
244         int state;
245         uta_mhdr_t*     hdr;
246
247         // future: ensure that application did not overrun the XID buffer; last byte must be 0
248
249         hdr = (uta_mhdr_t *) msg->header;
250         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
251         hdr->plen = htonl( msg->len );
252
253         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
254                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
255         }
256
257         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
258                 if( (state = nn_send( nn_sock, &msg->header, NN_MSG, NN_DONTWAIT )) < 0 ) {
259                         msg->state = state;
260                 } else {
261                         msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
262                 }
263         } else {
264                 if( (state = nn_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, NN_DONTWAIT )) < 0 ) {
265                         msg->state = state;
266                 } 
267         }
268
269         // future:  if nano sends bytes, but less than mlen, then what to do?
270         if( msg->state >= 0 ) {                                                         // successful send
271                 if( !(msg->flags & MFL_NOALLOC) ) {                             // if noalloc is set, then caller doesn't want a new buffer
272                         return alloc_zcmsg( ctx, msg, 0, RMR_OK );      // preallocate a zero-copy buffer and return msg
273                 } else {
274                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
275                         return NULL;
276                 }
277         } else {                                                                                        // send failed -- return original message
278                 if( errno == EAGAIN ) {
279                         msg->state = RMR_ERR_RETRY;                                     // some wrappers can't see errno, make this obvious
280                 } else {
281                         msg->state = RMR_ERR_SENDFAILED;                                        // errno will have nano reason
282                 }
283                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %s\n", strerror( errno ) );
284         }
285
286         return msg;
287 }
288
289
290 /*
291         A generic wrapper to the real send to keep wormhole stuff agnostic.
292         We assume the wormhole function vetted the buffer so we don't have to.
293 */
294 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
295         return send_msg( ctx, msg, ep->nn_sock );
296 }
297
298 #endif