e50b5d026695991e0a082b27153880f39bf6e5e5
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_si.c
23         Abstract:       This is the compile point for the si version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
58
59
60 #include "rmr.h"                                // things the users see
61 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
62 #include "rmr_si_private.h"     // things that we need too
63 #include "rmr_symtab.h"
64
65 #include "ring_static.c"                        // message ring support
66 #include "rt_generic_static.c"          // route table things not transport specific
67 #include "rtable_si_static.c"           // route table things -- transport specific
68 #include "rtc_si_static.c"                      // specific RMR only route table collector (SI only for now)
69 #include "tools_static.c"
70 #include "sr_si_static.c"                       // send/receive static functions
71 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
72 #include "mt_call_static.c"
73 #include "mt_call_si_static.c"
74
75
76 //------------------------------------------------------------------------------
77
78
79 /*
80         Clean up a context.
81 */
82 static void free_ctx( uta_ctx_t* ctx ) {
83         if( ctx ) {
84                 if( ctx->rtg_addr ) {
85                         free( ctx->rtg_addr );
86                 }
87         }
88 }
89
90 // --------------- public functions --------------------------------------------------------------------------
91
92 /*
93         Returns the size of the payload (bytes) that the msg buffer references.
94         Len in a message is the number of bytes which were received, or should
95         be transmitted, however, it is possible that the mbuf was allocated
96         with a larger payload space than the payload length indicates; this
97         function returns the absolute maximum space that the user has available
98         in the payload. On error (bad msg buffer) -1 is returned and errno should
99         indicate the rason.
100
101         The allocated len stored in the msg is:
102                 transport header length +
103                 message header + 
104                 user requested payload 
105
106         The msg header is a combination of the fixed RMR header and the variable
107         trace data and d2 fields which may vary for each message.
108 */
109 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
110         if( msg == NULL || msg->header == NULL ) {
111                 errno = EINVAL;
112                 return -1;
113         }
114
115         errno = 0;
116         return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN;        // allocated transport size less the header and other data bits
117 }
118
119 /*
120         Allocates a send message as a zerocopy message allowing the underlying message protocol
121         to send the buffer without copy.
122 */
123 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
124         uta_ctx_t*      ctx;
125         rmr_mbuf_t*     m;
126
127         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
128                 return NULL;
129         }
130
131         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
132         return  m;
133 }
134
135
136 /*
137         Allocates a send message as a zerocopy message allowing the underlying message protocol
138         to send the buffer without copy. In addition, a trace data field of tr_size will be
139         added and the supplied data coppied to the buffer before returning the message to
140         the caller.
141 */
142 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
143         uta_ctx_t*      ctx;
144         rmr_mbuf_t*     m;
145         int state;
146
147         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
148                 return NULL;
149         }
150
151         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
152         if( m != NULL ) {
153                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
154                 if( state != tr_size ) {
155                         m->state = RMR_ERR_INITFAILED;
156                 }
157         }
158
159         return  m;
160 }
161
162 /*
163         This provides an external path to the realloc static function as it's called by an
164         outward facing mbuf api function. Used to reallocate a message with a different
165         trace data size.
166 */
167 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
168         return realloc_msg( msg, new_tr_size );
169 }
170
171
172 /*
173         Return the message to the available pool, or free it outright.
174 */
175 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
176         //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
177         //return;
178
179         if( mbuf == NULL ) {
180                 return;
181         }
182
183         if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {                    // just queue, free if ring is full
184                 if( mbuf->tp_buf ) {
185                         free( mbuf->tp_buf );
186                 }
187                 free( mbuf );
188         }
189 }
190
191 /*
192         This is a wrapper to the real timeout send. We must wrap it now to ensure that
193         the call flag and call-id are reset
194 */
195 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
196         char* d1;                                                                                                                       // point at the call-id in the header
197
198         if( msg != NULL ) {
199                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
200
201                 d1 = DATA1_ADDR( msg->header );
202                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
203         }       
204
205         return mtosend_msg( vctx, msg, max_to );
206 }
207
208 /*
209         Send with default max timeout as is set in the context.
210         See rmr_mtosend_msg() for more details on the parameters.
211         See rmr_stimeout() for info on setting the default timeout.
212 */
213 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
214         char* d1;                                                                                                               // point at the call-id in the header
215
216         if( msg != NULL ) {
217                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
218
219                 d1 = DATA1_ADDR( msg->header );
220                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
221         }       
222
223         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
224 }
225
226 /*
227         Return to sender allows a message to be sent back to the endpoint where it originated.
228
229         In the SI world the file descriptor that was the source of the message is captured in
230         the mbuffer and thus can be used to quickly find the target for an RTS call. 
231
232         The source information in the message is used to select the socket on which to write
233         the message rather than using the message type and round-robin selection. This
234         should return a message buffer with the state of the send operation set. On success
235         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
236         error it can be passed back to this function to retry the send if desired. On error,
237         errno will liklely have the failure reason set by the nng send processing.
238         The following are possible values for the state in the message buffer:
239
240         Message states returned:
241                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
242                 RMR_ERR_NOHDR  - message did not have a header
243                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
244                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
245                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
246
247         A nil message as the return value is rare, and generally indicates some kind of horrible
248         failure. The value of errno might give a clue as to what is wrong.
249
250         CAUTION:
251                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
252                 The caller must check for this and handle it properly.
253 */
254 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
255         int                     nn_sock;                        // endpoint socket for send
256         uta_ctx_t*      ctx;
257         int                     state;
258         char*           hold_src;                       // we need the original source if send fails
259         char*           hold_ip;                        // also must hold original ip
260         int                     sock_ok = 0;            // true if we found a valid endpoint socket
261         endpoint_t*     ep = NULL;                      // end point to track counts
262
263         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
264                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
265                 if( msg != NULL ) {
266                         msg->state = RMR_ERR_BADARG;
267                         msg->tp_state = errno;
268                 }
269                 return msg;
270         }
271
272         errno = 0;                                                                                                              // at this point any bad state is in msg returned
273         if( msg->header == NULL ) {
274                 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
275                 msg->state = RMR_ERR_NOHDR;
276                 msg->tp_state = errno;
277                 return msg;
278         }
279
280         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
281
282 /*
283         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep, ctx->si_ctx );                      // src is always used first for rts
284         if( ! sock_ok ) {
285 */
286         if( (nn_sock = msg->rts_fd) < 0 ) {
287                 if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
288                         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
289                 }
290                 if( ! sock_ok ) {
291                         msg->state = RMR_ERR_NOENDPT;
292                         return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
293                 }
294         }
295
296
297         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
298         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
299         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
300         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
301         msg = send_msg( ctx, msg, nn_sock, -1 );
302         if( msg ) {
303                 if( ep != NULL ) {
304                         switch( msg->state ) {
305                                 case RMR_OK:
306                                         ep->scounts[EPSC_GOOD]++;
307                                         break;
308                         
309                                 case RMR_ERR_RETRY:
310                                         ep->scounts[EPSC_TRANS]++;
311                                         break;
312
313                                 default:
314                                         // FIX ME uta_fd_failed( nn_sock );                     // we don't have an ep so this requires a look up/search to mark it failed
315                                         ep->scounts[EPSC_FAIL]++;
316                                         break;
317                         }
318                 }
319                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
320                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
321                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
322         }
323
324         free( hold_src );
325         free( hold_ip );
326         return msg;
327 }
328
329 /*
330         If multi-threading call is turned on, this invokes that mechanism with the special call
331         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
332         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
333         the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
334         a flexible timeout.
335
336         On timeout this function will return a nil pointer. If the original message could not
337         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
338
339         Original behavour:
340         Call sends the message based on message routing using the message type, and waits for a
341         response message to arrive with the same transaction id that was in the outgoing message.
342         If, while wiating for the expected response,  messages are received which do not have the
343         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
344         order that they were received.
345
346         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
347         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
348         may be resent (likely the context pointer was nil).  If the message is sent, but no
349         response is received, a nil message is returned with errno set to indicate the likley
350         issue:
351                 ETIMEDOUT -- too many messages were queued before reciving the expected response
352                 ENOBUFS -- the queued message ring is full, messages were dropped
353                 EINVAL  -- A parameter was not valid
354                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
355                                         user should call this function with the message again.
356
357 */
358 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
359         uta_ctx_t*              ctx;
360
361         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
362                 if( msg != NULL ) {
363                         msg->state = RMR_ERR_BADARG;
364                 }
365                 return msg;
366         }
367
368         return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
369 }
370
371 /*
372         The outward facing receive function. When invoked it will pop the oldest message
373         from the receive ring, if any are queued, and return it. If the ring is empty
374         then the receive function is invoked to wait for the next message to arrive (blocking).
375
376         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
377         nil, a new one will be allocated. However, the caller should NOT expect to get the same
378         struct back (if a queued message is returned the message struct will be different).
379 */
380 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
381         uta_ctx_t*      ctx;
382         rmr_mbuf_t*     qm;                             // message that was queued on the ring
383
384         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
385                 errno = EINVAL;
386                 if( old_msg != NULL ) {
387                         old_msg->state = RMR_ERR_BADARG;
388                         old_msg->tp_state = errno;
389                 }
390                 return old_msg;
391         }
392         errno = 0;
393
394         return rmr_mt_rcv( ctx, old_msg, -1 );
395 }
396
397 /*
398         This allows a timeout based receive for applications unable to implement epoll_wait()
399         (e.g. wrappers).
400 */
401 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
402         uta_ctx_t*      ctx;
403
404         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
405                 errno = EINVAL;
406                 if( old_msg != NULL ) {
407                         old_msg->state = RMR_ERR_BADARG;
408                         old_msg->tp_state = errno;
409                 }
410                 return old_msg;
411         }
412
413         return rmr_mt_rcv( ctx, old_msg, ms_to );
414 }
415
416 /*
417         This blocks until the message with the 'expect' ID is received. Messages which are received
418         before the expected message are queued onto the message ring.  The function will return
419         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
420         expected message is received. If the queued message ring fills a nil pointer is returned
421         and errno is set to ENOBUFS.
422
423         Generally this will be invoked only by the call() function as it waits for a response, but
424         it is exposed to the user application as three is no reason not to.
425 */
426 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
427         uta_ctx_t*      ctx;
428         int     queued = 0;                             // number we pushed into the ring
429         int     exp_len = 0;                    // length of expected ID
430
431         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
432                 errno = EINVAL;
433                 if( msg != NULL ) {
434                         msg->state = RMR_ERR_BADARG;
435                         msg->tp_state = errno;
436                 }
437                 return msg;
438         }
439
440         errno = 0;
441
442         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
443                 return rmr_rcv_msg( ctx, msg );
444         }
445
446         exp_len = strlen( expect );
447         if( exp_len > RMR_MAX_XID ) {
448                 exp_len = RMR_MAX_XID;
449         }
450         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
451
452         while( queued < allow2queue ) {
453                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
454                 if( msg->state == RMR_OK ) {
455                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
456                                 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
457                                 return msg;
458                         }
459
460                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
461                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
462                                 errno = ENOBUFS;
463                                 return NULL;
464                         }
465
466                         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
467                         queued++;
468                         msg = NULL;
469                 }
470         }
471
472         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
473         errno = ETIMEDOUT;
474         return NULL;
475 }
476
477 /*
478         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
479         _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
480         mechnism indicates eagain or etimeedout.  All other error conditions are reported
481         without this delay. Setting a timeout of 0 causes no retries to be attempted in
482         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
483         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
484         after every 1K send attempts until the "time" value is reached. Retries are abandoned
485         if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
486
487         The default, if this function is not used, is 1; meaning that RMr will retry, but will
488         not enter a sleep.  In all cases the caller should check the status in the message returned
489         after a send call.
490
491         Returns -1 if the context was invalid; RMR_OK otherwise.
492 */
493 extern int rmr_set_stimeout( void* vctx, int time ) {
494         uta_ctx_t*      ctx;
495
496         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
497                 return -1;
498         }
499
500         if( time < 0 ) {
501                 time = 0;
502         }
503
504         ctx->send_retries = time;
505         return RMR_OK;
506 }
507
508 /*
509         Set receive timeout -- not supported in nng implementation
510
511         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
512 */
513 extern int rmr_set_rtimeout( void* vctx, int time ) {
514         fprintf( stderr, "[WRN] Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
515         return 0;
516 }
517
518
519 /*
520         This is the actual init workhorse. The user visible function meerly ensures that the
521         calling programme does NOT set any internal flags that are supported, and then
522         invokes this.  Internal functions (the route table collector) which need additional
523         open ports without starting additional route table collectors, will invoke this
524         directly with the proper flag.
525
526         CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
527                                 that we know about. The _user_ should ensure that the supplied length also
528                                 includes the trace data length maximum as they are in control of that.
529 */
530 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
531         static  int announced = 0;
532         uta_ctx_t*      ctx = NULL;
533         char    bind_info[256];                         // bind info
534         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
535         char*   port;
536         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
537         char*   proto_port;
538         char    wbuf[1024];                                     // work buffer
539         char*   tok;                                            // pointer at token in a buffer
540         char*   tok2;
541         int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
542         int             state;
543         int             i;
544
545         if( ! announced ) {
546                 fprintf( stderr, "[INFO] ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
547                         RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
548                 announced = 1;
549         }
550
551         errno = 0;
552         if( uproto_port == NULL ) {
553                 proto_port = strdup( DEF_COMM_PORT );
554         } else {
555                 proto_port = strdup( uproto_port );             // so we can modify it
556         }
557
558         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
559                 errno = ENOMEM;
560                 return NULL;
561         }
562         memset( ctx, 0, sizeof( uta_ctx_t ) );
563
564         if( DEBUG ) fprintf( stderr, "[DBUG] rmr_init: allocating 266 rivers\n" );
565         ctx->nrivers = 256;                                                             // number of input flows we'll manage
566         ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
567         memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
568         for( i = 0; i < ctx->nrivers; i++ ) {
569                 ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
570         }
571
572         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
573         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
574         ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size;                                       // larger than their request doesn't hurt
575         ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + 64;          // add in our header size and a bit of fudge
576
577         ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
578         ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
579
580         if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
581                 uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
582                 uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
583         } else {
584                 fprintf( stderr, "[INFO] receive ring locking disabled by user application\n" );
585         }
586         init_mtcall( ctx );                                                             // set up call chutes
587
588
589         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
590         if( max_msg_size > 0 ) {
591                 ctx->max_plen = max_msg_size;
592         }
593
594         // we're using a listener to get rtg updates, so we do NOT need this.
595         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
596
597         ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
598         if( ctx->si_ctx == NULL ) {
599                 fprintf( stderr, "[CRI] unable to initialise SI95 interface\n" );
600                 free_ctx( ctx );
601                 return NULL;
602         }
603
604         if( (port = strchr( proto_port, ':' )) != NULL ) {
605                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
606                         port++;
607                 } else {
608                         *(port++) = 0;                  // term proto string and point at port string
609                         proto = proto_port;             // user supplied proto so point at it rather than default
610                 }
611         } else {
612                 port = proto_port;                      // assume something like "1234" was passed
613         }
614
615         if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) {                                // must check port here -- if < 1 then we just start static file 'listener'
616                 if( atoi( tok ) < 1 ) {
617                         static_rtc = 1;
618                 }
619         }
620
621         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
622                 tok = strdup( tok );                                    // something we can destroy
623                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
624                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
625                 } else {
626                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
627                 }
628                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
629                         *tok2 = 0;                                                      // make it so
630                 }
631
632                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
633                 free( tok );
634         } else {
635                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
636                         fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
637                         return NULL;
638                 }
639                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
640                         *tok = 0;                                                                       // we don't keep domain portion
641                 }
642         }
643
644         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
645         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
646                 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
647                 return NULL;
648         }
649
650         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
651                 if( atoi( tok ) > 0 ) {
652                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
653                 }
654         }
655
656         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
657         if( flags & RMRFL_NAME_ONLY ) {
658                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
659         } else {
660                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
661                 if( ctx->my_ip == NULL ) {
662                         fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
663                         strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
664                 }
665         }
666         if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
667
668         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
669                 if( *tok == '1' ) {
670                         ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
671                 }
672         }
673
674
675         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
676                 interface = "0.0.0.0";
677         }
678         
679         snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
680         if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
681                 fprintf( stderr, "[CRI] rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
682                 free_ctx( ctx );
683                 return NULL;
684         }
685
686         if( !(flags & FL_NOTHREAD) ) {                                                                                          // skip if internal function that doesnt need a RTC
687                 if( static_rtc ) {
688                         if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
689                                 fprintf( stderr, "[WRN] rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
690                         }
691                 } else {
692                         if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
693                                 fprintf( stderr, "[WRN] rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
694                         }
695                 }
696         }
697
698         ctx->flags |= CFL_MTC_ENABLED;                                                                                          // for SI threaded receiver is the only way
699         if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
700                 fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
701         }
702
703         free( proto_port );
704         return (void *) ctx;
705 }
706
707 /*
708         Initialise the message routing environment. Flags are one of the UTAFL_
709         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
710         (tcp) to be used, then :port is all that is needed.
711
712         At the moment it seems that TCP really is the only viable protocol, but
713         we'll allow flexibility.
714
715         The return value is a void pointer which must be passed to most uta functions. On
716         error, a nil pointer is returned and errno should be set.
717
718         Flags:
719                 No user flags supported (needed) at the moment, but this provides for extension
720                 without drastically changing anything. The user should invoke with RMRFL_NONE to
721                 avoid any misbehavour as there are internal flags which are suported
722 */
723 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
724         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
725 }
726
727 /*
728         This sets the default trace length which will be added to any message buffers
729         allocated.  It can be set at any time, and if rmr_set_trace() is given a
730         trace len that is different than the default allcoated in a message, the message
731         will be resized.
732
733         Returns 0 on failure and 1 on success. If failure, then errno will be set.
734 */
735 extern int rmr_init_trace( void* vctx, int tr_len ) {
736         uta_ctx_t* ctx;
737
738         errno = 0;
739         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
740                 errno = EINVAL;
741                 return 0;
742         }
743
744         ctx->trace_data_len = tr_len;
745         return 1;
746 }
747
748 /*
749         Return true if routing table is initialised etc. and app can send/receive.
750 */
751 extern int rmr_ready( void* vctx ) {
752         uta_ctx_t *ctx;
753
754         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
755                 return FALSE;
756         }
757
758         if( ctx->rtable != NULL ) {
759                 return TRUE;
760         }
761
762         return FALSE;
763 }
764
765 /*
766         This returns the message queue ring's filedescriptor which can be used for
767         calls to epoll.  The user shouild NOT read, write, or close the fd.
768
769         Returns the file descriptor or -1 on error.
770 */
771 extern int rmr_get_rcvfd( void* vctx ) {
772         uta_ctx_t* ctx;
773         int state;
774
775         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
776                 return -1;
777         }
778
779 /*
780         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
781                 fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
782                 return -1;
783         }
784 */
785
786         return uta_ring_getpfd( ctx->mring );
787 }
788
789
790 /*
791         Clean up things.
792
793         There isn't an si_flush() per se, but we can pause, generate
794         a context switch, which should allow the last sent buffer to
795         flow. There isn't exactly an nng_term/close either, so there
796         isn't much we can do.
797 */
798 extern void rmr_close( void* vctx ) {
799         uta_ctx_t *ctx;
800
801         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
802                 return;
803         }
804
805         ctx->shutdown = 1;
806
807         SItp_stats( ctx->si_ctx );                      // dump some interesting stats
808
809         // FIX ME -- how to we turn off si; close all sessions etc?
810         //SIclose( ctx->nn_sock );
811
812 }
813
814
815 // ----- multi-threaded call/receive support -------------------------------------------------
816
817 /*
818         Blocks on the receive ring chute semaphore and then reads from the ring
819         when it is tickled.  If max_wait is -1 then the function blocks until
820         a message is ready on the ring. Else max_wait is assumed to be the number
821         of millaseconds to wait before returning a timeout message.
822 */
823 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
824         uta_ctx_t*      ctx;
825         uta_mhdr_t*     hdr;                    // header in the transport buffer
826         chute_t*        chute;
827         struct timespec ts;                     // time info if we have a timeout
828         long    new_ms;                         // adjusted mu-sec
829         long    seconds = 0;            // max wait seconds
830         long    nano_sec;                       // max wait xlated to nano seconds
831         int             state;
832         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
833         
834         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
835                 errno = EINVAL;
836                 if( mbuf ) {
837                         mbuf->state = RMR_ERR_BADARG;
838                         mbuf->tp_state = errno;
839                 }
840                 return mbuf;
841         }
842
843         ombuf = mbuf;           // if we timeout we must return original msg with status, so save it
844
845         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
846
847         if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
848                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
849                         if( ombuf ) {
850                                 rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
851                         }       
852                 } else {
853                         mbuf = ombuf;                                           // return original if it was given with timeout status
854                         if( ombuf != NULL ) {
855                                 mbuf->state = RMR_ERR_TIMEOUT;                  // preset if for failure
856                                 mbuf->len = 0;
857                         }
858                 }
859
860                 return mbuf;
861         }
862
863         if( ombuf ) {
864                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
865                 ombuf->len = 0;
866         }
867         if( max_wait > 0 ) {
868                 clock_gettime( CLOCK_REALTIME, &ts );   // sem timeout based on clock, not a delta
869
870                 if( max_wait > 999 ) {
871                         seconds = max_wait / 1000;
872                         max_wait -= seconds * 1000;
873                         ts.tv_sec += seconds;
874                 }
875                 if( max_wait > 0 ) {
876                         nano_sec = max_wait * 1000000;
877                         ts.tv_nsec += nano_sec;
878                         if( ts.tv_nsec > 999999999 ) {
879                                 ts.tv_nsec -= 999999999;
880                                 ts.tv_sec++;
881                         }
882                 }
883
884                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
885         }
886
887         errno = EINTR;
888         state = -1;
889         while( state < 0 && errno == EINTR ) {
890                 if( seconds ) {
891                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
892                 } else {
893                         state = sem_wait( &chute->barrier );
894                 }
895         }
896
897         if( state < 0 ) {
898                 mbuf = ombuf;                           // return caller's buffer if they passed one in
899         } else {
900                 errno = 0;                                              // interrupted call state could be left; clear
901                 if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
902                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
903                         mbuf->state = RMR_OK;
904
905                         if( ombuf ) {
906                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
907                         }
908                 } else {
909                         errno = ETIMEDOUT;
910                         mbuf = ombuf;                           // no buffer, return user's if there
911                 }
912         }
913
914         if( mbuf ) {
915                 mbuf->tp_state = errno;
916         }
917         return mbuf;
918 }
919
920 /*
921         Accept a message buffer and caller ID, send the message and then wait
922         for the receiver to tickle the semaphore letting us know that a message
923         has been received. The call_id is a value between 2 and 255, inclusive; if
924         it's not in this range an error will be returned. Max wait is the amount
925         of time in millaseconds that the call should block for. If 0 is given
926         then no timeout is set.
927
928         If the mt_call feature has not been initialised, then the attempt to use this
929         funciton will fail with RMR_ERR_NOTSUPP
930
931         If no matching message is received before the max_wait period expires, a
932         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
933         occurs after the message has been sent, then a nil pointer is returned
934         with errno set to some other value.
935 */
936 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
937         rmr_mbuf_t* ombuf;                      // original mbuf passed in
938         uta_ctx_t*      ctx;
939         uta_mhdr_t*     hdr;                    // header in the transport buffer
940         chute_t*        chute;
941         unsigned char*  d1;                     // d1 data in header
942         struct timespec ts;                     // time info if we have a timeout
943         long    new_ms;                         // adjusted mu-sec
944         long    seconds = 0;            // max wait seconds
945         long    nano_sec;                       // max wait xlated to nano seconds
946         int             state;
947         
948         errno = EINVAL;
949         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
950                 if( mbuf ) {
951                         mbuf->tp_state = errno;
952                         mbuf->state = RMR_ERR_BADARG;
953                 }
954                 return mbuf;
955         }
956
957         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
958                 mbuf->state = RMR_ERR_NOTSUPP;
959                 mbuf->tp_state = errno;
960                 return mbuf;
961         }
962
963         if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
964                 mbuf->state = RMR_ERR_BADARG;
965                 mbuf->tp_state = errno;
966                 return mbuf;
967         }
968
969         ombuf = mbuf;                                                                                                   // save to return timeout status with
970
971         chute = &ctx->chutes[call_id];
972         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
973                 rmr_free_msg( chute->mbuf );
974                 chute->mbuf = NULL;
975         }
976         
977         hdr = (uta_mhdr_t *) mbuf->header;
978         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
979         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
980         d1 = DATA1_ADDR( hdr );
981         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
982         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
983
984         if( max_wait >= 0 ) {
985                 clock_gettime( CLOCK_REALTIME, &ts );   
986
987                 if( max_wait > 999 ) {
988                         seconds = max_wait / 1000;
989                         max_wait -= seconds * 1000;
990                         ts.tv_sec += seconds;
991                 }
992                 if( max_wait > 0 ) {
993                         nano_sec = max_wait * 1000000;
994                         ts.tv_nsec += nano_sec;
995                         if( ts.tv_nsec > 999999999 ) {
996                                 ts.tv_nsec -= 999999999;
997                                 ts.tv_sec++;
998                         }
999                 }
1000
1001                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1002         }
1003
1004         mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
1005         if( mbuf ) {
1006                 if( mbuf->state != RMR_OK ) {
1007                         mbuf->tp_state = errno;
1008                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1009                 }
1010         }
1011
1012         state = 0;
1013         errno = 0;
1014         while( chute->mbuf == NULL && ! errno ) {
1015                 if( seconds ) {
1016                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1017                 } else {
1018                         state = sem_wait( &chute->barrier );
1019                 }
1020
1021                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1022                         errno = 0;
1023                 }
1024
1025                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1026                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1027                                 rmr_free_msg( chute->mbuf );
1028                                 chute->mbuf = NULL;
1029                                 errno = 0;
1030                         }
1031                 }
1032         }
1033
1034         if( state < 0 ) {
1035                 return NULL;                                    // leave errno as set by sem wait call
1036         }
1037
1038         mbuf = chute->mbuf;
1039         mbuf->state = RMR_OK;
1040         chute->mbuf = NULL;
1041
1042         return mbuf;
1043 }
1044
1045 /*
1046         Given an existing message buffer, reallocate the payload portion to
1047         be at least new_len bytes.  The message header will remain such that
1048         the caller may use the rmr_rts_msg() function to return a payload
1049         to the sender. 
1050
1051         The mbuf passed in may or may not be reallocated and the caller must
1052         use the returned pointer and should NOT assume that it can use the 
1053         pointer passed in with the exceptions based on the clone flag.
1054
1055         If the clone flag is set, then a duplicated message, with larger payload
1056         size, is allocated and returned.  The old_msg pointer in this situation is
1057         still valid and must be explicitly freed by the application. If the clone 
1058         message is not set (0), then any memory management of the old message is
1059         handled by the function.
1060
1061         If the copy flag is set, the contents of the old message's payload is 
1062         copied to the reallocated payload.  If the flag is not set, then the 
1063         contents of the payload is undetermined.
1064 */
1065 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1066         if( old_msg == NULL ) {
1067                 return NULL;
1068         }
1069
1070         return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
1071 }
1072
1073 /*
1074         Enable low latency things in the transport (when supported).
1075 */
1076 extern void rmr_set_low_latency( void* vctx ) {
1077         uta_ctx_t*      ctx;
1078
1079         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1080                 if( ctx->si_ctx != NULL ) {
1081                         SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1082                 }
1083         }
1084 }
1085
1086 /*
1087         Turn on fast acks.
1088 */
1089 extern void rmr_set_fack( void* vctx ) {
1090         uta_ctx_t*      ctx;
1091
1092         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1093                 if( ctx->si_ctx != NULL ) {
1094                         SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );
1095                 }
1096         }
1097 }
1098