Initial commit of RMR Library
[ric-plt/lib/rmr.git] / src / nanomsg / src / sr_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia 
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_static.c
23         Abstract:       These are static send/receive related functions.
24
25                                 (broken out of rmr.c)
26         Author:         E. Scott Daniels
27         Date:           13 February 2019
28 */
29
30 #ifndef _sr_static_c
31 #define _sr_static_c
32
33 /*
34 #include <ctype.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <netdb.h>
38 #include <errno.h>
39 #include <string.h>
40 #include <errno.h>
41 #include <pthread.h>
42 #include <unistd.h>
43 #include <stdint.h>
44 #include <time.h>
45 #include <arpa/inet.h>
46
47 #include <nanomsg/nn.h>
48 #include <nanomsg/tcp.h>
49 #include <nanomsg/pair.h>
50 #include <nanomsg/pipeline.h>
51 #include <nanomsg/pubsub.h>
52
53 #include "rmr.h"                                // things the users see
54 #include "rmr_private.h"                // things that we need too
55 #include "rmr_symtab.h"
56
57 #include "ring_static.c"                // message ring support
58 #include "rt_generic_static.c"  // generic route table (not nng/nano specific)
59 #include "rtable_static.c"              // route table things   (nano specific)
60 #include "tools_static.c"
61 */
62
63
64 /*
65         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
66         a new message struct as well. Size is the size of the zc buffer to allocate (not
67         including our header). If size is 0, then the buffer allocated is the size previously
68         allocated (if msg is !nil) or the default size given at initialisation).
69 */
70 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
71         int     mlen;
72
73         mlen = sizeof( uta_mhdr_t );                                            // figure size should we not have a msg buffer
74         mlen += (size > 0 ? size  : ctx->max_plen);                     // add user requested size or size set during init
75
76         if( msg == NULL ) {
77                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
78                 if( msg == NULL ) {
79                         fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for message\n" );
80                         exit( 1 );
81                 }
82         } else {
83                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
84         }
85
86         memset( msg, 0, sizeof( *msg ) );
87
88         if( (msg->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                           // this will be released on send, so DO NOT free
89                 fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", errno );
90                 exit( 1 );
91         }
92
93         ((uta_mhdr_t *) msg->header)->rmr_ver = RMR_MSG_VER;    // version info should we need to recognised old style messages someday
94         msg->len = 0;                                                                                   // length of data in the payload
95         msg->alloc_len = mlen;                                                                  // length of allocated payload
96         msg->payload = msg->header + sizeof( uta_mhdr_t );              // point past header to payload (single buffer allocation above)
97         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                                                // point at transaction id in header area
98         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
99         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
100         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
101
102         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen = %d size=%d mpl=%d flags=%02x %p m=%p @%p\n", mlen, size, ctx->max_plen, msg->flags, &msg->flags, msg, msg->header );
103
104         return msg;
105 }
106
107 /*
108         This will clone a message into a new zero copy buffer and return the cloned message.
109 */
110 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
111         rmr_mbuf_t* nm;                 // new message buffer
112         int     mlen;
113
114         if( old_msg == NULL ) {
115                 return NULL;
116         }
117
118         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
119         if( nm == NULL ) {
120                 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for message buffer\n" );
121                 exit( 1 );
122         }
123         memset( nm, 0, sizeof( *nm ) );
124
125         mlen = old_msg->alloc_len;                                                      // length allocated before
126         if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                            // this will be released on send, so DO NOT free
127                 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for zero copy buffer: %d\n", errno );
128                 exit( 1 );
129         }
130
131         nm->mtype = old_msg->mtype;
132         nm->len = old_msg->len;                                                                 // length of data in the payload
133         nm->alloc_len = mlen;                                                                   // length of allocated payload
134         nm->payload = nm->header + sizeof( uta_mhdr_t );                // point past header to payload (single buffer allocation above)
135         nm->xaction = ((uta_mhdr_t *)nm->header)->xid;                  // point at transaction id in header area
136         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
137         nm->flags |= MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
138         memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID );
139         memcpy( nm->payload, old_msg->payload, old_msg->len );
140
141         return nm;
142 }
143
144 /*
145         This is the receive work horse used by the outer layer receive functions.
146         It waits for a message to be received on our listen socket. If old msg
147         is passed in, the we assume we can use it instead of allocating a new
148         one, else a new block of memory is allocated.
149
150         This allocates a zero copy message so that if the user wishes to call
151         uta_rts_msg() the send is zero copy.
152 */
153 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
154         int nn_sock;                            // endpoint socket for send
155         int state;
156         rmr_mbuf_t*     msg = NULL;             // msg received
157         uta_mhdr_t* hdr;
158
159         if( old_msg ) {
160                 msg = old_msg;
161         } else {
162                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
163         }
164
165         msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // total space (header + payload len) allocated
166         if( msg->state > (int) sizeof( uta_mhdr_t ) ) {                                         // we need more than just a header here
167                 hdr = (uta_mhdr_t *) msg->header;
168                 msg->len = ntohl( hdr->plen );                                          // length of data in the payload (likely < payload size)
169                 if( msg->len > msg->state - sizeof( uta_mhdr_t ) ) {
170                         fprintf( stderr, "[WARN] rmr_rcv indicated payload length < rcvd payload: expected %d got %ld\n", 
171                                 msg->len, msg->state - sizeof( uta_mhdr_t ) );
172                 }
173                 msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
174                 msg->state = RMR_OK;
175                 msg->flags |= MFL_ADDSRC;                                                                               // turn on so if user app tries to send this buffer we reset src
176                 msg->payload = msg->header + sizeof( uta_mhdr_t );
177                 msg->xaction = &hdr->xid[0];                                                    // provide user with ref to fixed space xaction id
178                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
179                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
180         } else {
181                 msg->len = 0;
182                 msg->state = RMR_ERR_EMPTY;
183         }
184
185         return msg;
186 }
187
188
189 /*
190         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
191         message buffer cannot be used to send, and the length information may or may 
192         not be correct (it is set to the length received which might be more than the
193         bytes actually in the payload).
194 */
195 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
196         int nn_sock;                            // endpoint socket for send
197         int state;
198         rmr_mbuf_t*     msg = NULL;             // msg received
199
200         if( old_msg ) {
201                 msg = old_msg;
202         } else {
203                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
204         }
205
206         msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // read and state will be length
207         if( msg->state >= 0 ) {
208                 msg->xaction = NULL;
209                 msg->mtype = -1;
210                 msg->len = msg->state;                                                                          // no header; len is the entire thing received
211                 msg->state = RMR_OK;
212                 msg->flags = MFL_RAW;                                                                           // prevent any sending of this headerless buffer
213                 msg->payload = msg->header;
214                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
215         } else {
216                 msg->len = 0;
217                 msg->state = RMR_ERR_EMPTY;
218                 msg->payload = NULL;
219                 msg->xaction = NULL;
220                 msg->mtype = -1;
221         }
222
223         return msg;
224 }
225
226 /*
227         This does the hard work of actually sending the message to the given socket. On success,
228         a new message struct is returned. On error, the original msg is returned with the state 
229         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new 
230         buffer will not be allocated and returned (mostly for call() interal processing since
231         the return message from call() is a received buffer, not a new one).
232
233         Called by rmr_send_msg() and rmr_rts_msg().
234 */
235 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
236         int state;
237         uta_mhdr_t*     hdr;
238
239         // future: ensure that application did not overrun the XID buffer; last byte must be 0
240
241         hdr = (uta_mhdr_t *) msg->header;
242         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
243         hdr->plen = htonl( msg->len );
244
245         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
246                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
247         }
248
249         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
250                 if( (state = nn_send( nn_sock, &msg->header, NN_MSG, NN_DONTWAIT )) < 0 ) {
251                         msg->state = state;
252                 } else {
253                         msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
254                 }
255         } else {
256                 if( (state = nn_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, NN_DONTWAIT )) < 0 ) {
257                         msg->state = state;
258                 } 
259         }
260
261         // future:  if nano sends bytes, but less than mlen, then what to do?
262         if( msg->state >= 0 ) {                                                         // successful send
263                 if( !(msg->flags & MFL_NOALLOC) ) {                             // if noalloc is set, then caller doesn't want a new buffer
264                         return alloc_zcmsg( ctx, msg, 0, RMR_OK );      // preallocate a zero-copy buffer and return msg
265                 } else {
266                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
267                         return NULL;
268                 }
269         } else {                                                                                        // send failed -- return original message
270                 if( errno == EAGAIN ) {
271                         msg->state = RMR_ERR_RETRY;                                     // some wrappers can't see errno, make this obvious
272                 } else {
273                         msg->state = RMR_ERR_SENDFAILED;                                        // errno will have nano reason
274                 }
275                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %s\n", strerror( errno ) );
276         }
277
278         return msg;
279 }
280
281
282 /*
283         A generic wrapper to the real send to keep wormhole stuff agnostic.
284         We assume the wormhole function vetted the buffer so we don't have to.
285 */
286 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
287         return send_msg( ctx, msg, ep->nn_sock );
288 }
289
290 #endif