enhance(API): Add source IP support to msg header
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rmr_nng.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_nng.c
23         Abstract:       This is the compile point for the nng version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include <nng/nng.h>
57 #include <nng/protocol/pubsub0/pub.h>
58 #include <nng/protocol/pubsub0/sub.h>
59 #include <nng/protocol/pipeline0/push.h>
60 #include <nng/protocol/pipeline0/pull.h>
61
62
63 #include "rmr.h"                                // things the users see
64 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
65 #include "rmr_nng_private.h"    // things that we need too
66 #include "rmr_symtab.h"
67
68 #include "ring_static.c"                        // message ring support
69 #include "rt_generic_static.c"          // route table things not transport specific
70 #include "rtable_nng_static.c"          // route table things -- transport specific
71 #include "rtc_static.c"                         // route table collector
72 #include "tools_static.c"
73 #include "sr_nng_static.c"                      // send/receive static functions
74 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
75 #include "mt_call_static.c"
76 #include "mt_call_nng_static.c"
77
78
79 //------------------------------------------------------------------------------
80
81
82 /*
83         Clean up a context.
84 */
85 static void free_ctx( uta_ctx_t* ctx ) {
86         if( ctx ) {
87                 if( ctx->rtg_addr ) {
88                         free( ctx->rtg_addr );
89                 }
90         }
91 }
92
93 // --------------- public functions --------------------------------------------------------------------------
94
95 /*
96         Returns the size of the payload (bytes) that the msg buffer references.
97         Len in a message is the number of bytes which were received, or should
98         be transmitted, however, it is possible that the mbuf was allocated
99         with a larger payload space than the payload length indicates; this
100         function returns the absolute maximum space that the user has available
101         in the payload. On error (bad msg buffer) -1 is returned and errno should
102         indicate the rason.
103 */
104 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
105         if( msg == NULL || msg->header == NULL ) {
106                 errno = EINVAL;
107                 return -1;
108         }
109
110         errno = 0;
111         return msg->alloc_len - RMR_HDR_LEN( msg->header );                             // allocated transport size less the header and other data bits
112 }
113
114 /*
115         Allocates a send message as a zerocopy message allowing the underlying message protocol
116         to send the buffer without copy.
117 */
118 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
119         uta_ctx_t*      ctx;
120         rmr_mbuf_t*     m;
121
122         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
123                 return NULL;
124         }
125
126         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
127         return  m;
128 }
129
130
131 /*
132         Allocates a send message as a zerocopy message allowing the underlying message protocol
133         to send the buffer without copy. In addition, a trace data field of tr_size will be
134         added and the supplied data coppied to the buffer before returning the message to
135         the caller.
136 */
137 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
138         uta_ctx_t*      ctx;
139         rmr_mbuf_t*     m;
140         int state;
141
142         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
143                 return NULL;
144         }
145
146         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
147         if( m != NULL ) {
148                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
149                 if( state != tr_size ) {
150                         m->state = RMR_ERR_INITFAILED;
151                 }
152         }
153
154         return  m;
155 }
156
157 /*
158         This provides an external path to the realloc static function as it's called by an
159         outward facing mbuf api function. Used to reallocate a message with a different
160         trace data size.
161 */
162 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
163         return realloc_msg( msg, new_tr_size );
164 }
165
166
167 /*
168         Return the message to the available pool, or free it outright.
169 */
170 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
171         if( mbuf == NULL ) {
172                 return;
173         }
174
175         if( mbuf->header ) {
176                 if( mbuf->flags & MFL_ZEROCOPY ) {
177                         //nng_free( (void *) mbuf->header, mbuf->alloc_len );
178                         if( mbuf->tp_buf ) {
179                                 nng_msg_free(  mbuf->tp_buf );
180                         }
181                 }
182         }
183
184         free( mbuf );
185 }
186
187 /*
188         This is a wrapper to the real timeout send. We must wrap it now to ensure that
189         the call flag and call-id are reset
190 */
191 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
192         char* d1;                                                                                                                       // point at the call-id in the header
193
194         if( msg != NULL ) {
195                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
196
197                 d1 = DATA1_ADDR( msg->header );
198                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
199         }       
200
201         return mtosend_msg( vctx, msg, max_to );
202 }
203
204 /*
205         Send with default max timeout as is set in the context.
206         See rmr_mtosend_msg() for more details on the parameters.
207         See rmr_stimeout() for info on setting the default timeout.
208 */
209 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
210         char* d1;                                                                                                               // point at the call-id in the header
211
212         if( msg != NULL ) {
213                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
214
215                 d1 = DATA1_ADDR( msg->header );
216                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
217         }       
218
219         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
220 }
221
222 /*
223         Return to sender allows a message to be sent back to the endpoint where it originated.
224         The source information in the message is used to select the socket on which to write
225         the message rather than using the message type and round-robin selection. This
226         should return a message buffer with the state of the send operation set. On success
227         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
228         error it can be passed back to this function to retry the send if desired. On error,
229         errno will liklely have the failure reason set by the nng send processing.
230         The following are possible values for the state in the message buffer:
231
232         Message states returned:
233                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
234                 RMR_ERR_NOHDR  - message did not have a header
235                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
236                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
237                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
238
239         A nil message as the return value is rare, and generally indicates some kind of horrible
240         failure. The value of errno might give a clue as to what is wrong.
241
242         CAUTION:
243                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
244                 The caller must check for this and handle.
245 */
246 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
247         nng_socket      nn_sock;                        // endpoint socket for send
248         uta_ctx_t*      ctx;
249         int                     state;
250         char*           hold_src;                       // we need the original source if send fails
251         int                     sock_ok = 0;            // true if we found a valid endpoint socket
252
253         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
254                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
255                 if( msg != NULL ) {
256                         msg->state = RMR_ERR_BADARG;
257                 }
258                 return msg;
259         }
260
261         errno = 0;                                                                                                              // at this point any bad state is in msg returned
262         if( msg->header == NULL ) {
263                 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
264                 msg->state = RMR_ERR_NOHDR;
265                 return msg;
266         }
267
268         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
269         if( HDR_VERSION( msg->header ) > 2 ) {                                                  // new version uses sender's ip address for rts
270                 sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock );                      // default to IP based rts
271         }
272         if( ! sock_ok ) {
273                 sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                // IP  not in rt, try name
274                 if( ! sock_ok ) {
275                         msg->state = RMR_ERR_NOENDPT;
276                         return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
277                 }
278         }
279
280         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
281         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
282         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
283         msg = send_msg( ctx, msg, nn_sock, -1 );
284         if( msg ) {
285                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
286                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
287         }
288
289         free( hold_src );
290         return msg;
291 }
292
293 /*
294         If multi-threading call is turned on, this invokes that mechanism with the special call
295         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
296         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
297         the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
298         a flexible timeout.
299
300         On timeout this function will return a nil pointer. If the original message could not
301         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
302
303         Original behavour:
304         Call sends the message based on message routing using the message type, and waits for a
305         response message to arrive with the same transaction id that was in the outgoing message.
306         If, while wiating for the expected response,  messages are received which do not have the
307         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
308         order that they were received.
309
310         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
311         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
312         may be resent (likely the context pointer was nil).  If the message is sent, but no
313         response is received, a nil message is returned with errno set to indicate the likley
314         issue:
315                 ETIMEDOUT -- too many messages were queued before reciving the expected response
316                 ENOBUFS -- the queued message ring is full, messages were dropped
317                 EINVAL  -- A parameter was not valid
318                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
319                                         user should call this function with the message again.
320
321 */
322 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
323         uta_ctx_t*              ctx;
324         unsigned char   expected_id[RMR_MAX_XID+1];             // the transaction id in the message; we wait for response with same ID
325
326         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
327                 if( msg != NULL ) {
328                         msg->state = RMR_ERR_BADARG;
329                 }
330                 return msg;
331         }
332
333         if( ctx->flags & CFL_MTC_ENABLED ) {                            // if multi threaded call is on, use that
334                 return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
335         }
336
337         memcpy( expected_id, msg->xaction, RMR_MAX_XID );
338         expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
339         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
340         errno = 0;
341         msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
342
343         msg = rmr_send_msg( ctx, msg );
344         if( msg ) {                                                                             // msg should be nil, if not there was a problem; return buffer to user
345                 if( msg->state != RMR_ERR_RETRY ) {
346                         msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
347                 }
348                 return msg;
349         }
350
351         return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 );                 // wait for msg allowing 20 to queue ahead
352 }
353
354 /*
355         The outward facing receive function. When invoked it will pop the oldest message
356         from the receive ring, if any are queued, and return it. If the ring is empty
357         then the receive function is invoked to wait for the next message to arrive (blocking).
358
359         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
360         nil, a new one will be allocated. However, the caller should NOT expect to get the same
361         struct back (if a queued message is returned the message struct will be different).
362 */
363 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
364         uta_ctx_t*      ctx;
365         rmr_mbuf_t*     qm;                             // message that was queued on the ring
366
367         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
368                 if( old_msg != NULL ) {
369                         old_msg->state = RMR_ERR_BADARG;
370                 }
371                 errno = EINVAL;
372                 return old_msg;
373         }
374         errno = 0;
375
376         if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
377                 return rmr_mt_rcv( ctx, old_msg, -1 );
378         }
379
380         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
381         if( qm != NULL ) {
382                 if( old_msg ) {
383                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
384                 }
385
386                 return qm;
387         }
388
389         return rcv_msg( ctx, old_msg );                                                         // nothing queued, wait for one
390 }
391
392 /*
393         This implements a receive with a timeout via epoll. Mostly this is for
394         wrappers as native C applications can use epoll directly and will not have
395         to depend on this.
396 */
397 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
398         struct epoll_stuff* eps;        // convience pointer
399         uta_ctx_t*      ctx;
400         rmr_mbuf_t*     qm;                             // message that was queued on the ring
401         int nready;
402         rmr_mbuf_t* msg;
403
404         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
405                 if( old_msg != NULL ) {
406                         old_msg->state = RMR_ERR_BADARG;
407                 }
408                 errno = EINVAL;
409                 return old_msg;
410         }
411
412         if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
413                 return rmr_mt_rcv( ctx, old_msg, ms_to );
414         }
415
416         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
417         if( qm != NULL ) {
418                 if( old_msg ) {
419                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
420                 }
421
422                 return qm;
423         }
424
425         if( (eps = ctx->eps)  == NULL ) {                                       // set up epoll on first call
426                 eps = malloc( sizeof *eps );
427
428                 if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
429                 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
430                         free( eps );
431                         return NULL;
432                 }
433
434                 eps->nng_fd = rmr_get_rcvfd( ctx );
435                 eps->epe.events = EPOLLIN;
436                 eps->epe.data.fd = eps->nng_fd;
437
438                 if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
439                 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
440                         free( eps );
441                         return NULL;
442                 }
443
444                 ctx->eps = eps;
445         }
446
447         if( old_msg ) {
448                 msg = old_msg;
449         } else {
450                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
451         }
452
453         if( ms_to < 0 ) {
454                 ms_to = 0;
455         }
456
457         nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
458         if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
459                 msg->state = RMR_ERR_TIMEOUT;
460         } else {
461                 return rcv_msg( ctx, msg );                                                             // receive it and return it
462         }
463
464         return msg;                             // return empty message with state set
465 }
466
467 /*
468         This blocks until the message with the 'expect' ID is received. Messages which are received
469         before the expected message are queued onto the message ring.  The function will return
470         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
471         expected message is received. If the queued message ring fills a nil pointer is returned
472         and errno is set to ENOBUFS.
473
474         Generally this will be invoked only by the call() function as it waits for a response, but
475         it is exposed to the user application as three is no reason not to.
476 */
477 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
478         uta_ctx_t*      ctx;
479         int     queued = 0;                             // number we pushed into the ring
480         int     exp_len = 0;                    // length of expected ID
481
482         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
483                 if( msg != NULL ) {
484                         msg->state = RMR_ERR_BADARG;
485                 }
486                 errno = EINVAL;
487                 return msg;
488         }
489
490         errno = 0;
491
492         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
493                 return rmr_rcv_msg( ctx, msg );
494         }
495
496         exp_len = strlen( expect );
497         if( exp_len > RMR_MAX_XID ) {
498                 exp_len = RMR_MAX_XID;
499         }
500         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
501
502         while( queued < allow2queue ) {
503                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
504                 if( msg->state == RMR_OK ) {
505                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
506                                 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
507                                 return msg;
508                         }
509
510                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
511                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
512                                 errno = ENOBUFS;
513                                 return NULL;
514                         }
515
516                         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
517                         queued++;
518                         msg = NULL;
519                 }
520         }
521
522         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
523         errno = ETIMEDOUT;
524         return NULL;
525 }
526
527 //  CAUTION:  these are not supported as they must be set differently (between create and open) in NNG.
528 //                              until those details are worked out, these generate a warning.
529 /*
530         Set send timeout. The value time is assumed to be microseconds.  The timeout is the
531         rough maximum amount of time that RMr will block on a send attempt when the underlying
532         mechnism indicates eagain or etimeedout.  All other error conditions are reported
533         without this delay. Setting a timeout of 0 causes no retries to be attempted in
534         RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
535         but without issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
536         after every 10K send attempts until the time value is reached. Retries are abandoned
537         if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
538
539         The default, if this function is not used, is 1; meaning that RMr will retry, but will
540         not enter a sleep.  In all cases the caller should check the status in the message returned
541         after a send call.
542
543         Returns -1 if the context was invalid; RMR_OK otherwise.
544 */
545 extern int rmr_set_stimeout( void* vctx, int time ) {
546         uta_ctx_t*      ctx;
547
548         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
549                 return -1;
550         }
551
552         if( time < 0 ) {
553                 time = 0;
554         }
555
556         ctx->send_retries = time;
557         return RMR_OK;
558 }
559
560 /*
561         Set receive timeout -- not supported in nng implementation
562 */
563 extern int rmr_set_rtimeout( void* vctx, int time ) {
564         fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
565         return 0;
566 }
567
568
569 /*
570         This is the actual init workhorse. The user visible function meerly ensures that the
571         calling programme does NOT set any internal flags that are supported, and then
572         invokes this.  Internal functions (the route table collector) which need additional
573         open ports without starting additional route table collectors, will invoke this
574         directly with the proper flag.
575 */
576 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
577         static  int announced = 0;
578         uta_ctx_t*      ctx = NULL;
579         char    bind_info[NNG_MAXADDRLEN];      // bind info
580         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
581         char*   port;
582         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
583         char*   proto_port;
584         char    wbuf[1024];                                     // work buffer
585         char*   tok;                                            // pointer at token in a buffer
586         int             state;
587
588         if( ! announced ) {
589                 fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
590                         RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
591                 announced = 1;
592         }
593
594         errno = 0;
595         if( uproto_port == NULL ) {
596                 proto_port = strdup( DEF_COMM_PORT );
597         } else {
598                 proto_port = strdup( uproto_port );             // so we can modify it
599         }
600
601         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
602                 errno = ENOMEM;
603                 return NULL;
604         }
605         memset( ctx, 0, sizeof( uta_ctx_t ) );
606
607         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
608         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
609
610         if( flags & RMRFL_MTCALL ) {                                    // mt call support is on, need bigger ring
611                 ctx->mring = uta_mk_ring( 2048 );                       // message ring filled by rcv thread
612                 init_mtcall( ctx );                                                     // set up call chutes
613         } else {
614                 ctx->mring = uta_mk_ring( 128 );                        // ring filled only on blocking call
615         }
616
617         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
618         if( max_msg_size > 0 ) {
619                 ctx->max_plen = max_msg_size;
620         }
621
622         // we're using a listener to get rtg updates, so we do NOT need this.
623         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
624
625         if( nng_pull0_open( &ctx->nn_sock )  !=  0 ) {          // and assign the mode
626                 fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
627                 free_ctx( ctx );
628                 return NULL;
629         }
630
631         if( (port = strchr( proto_port, ':' )) != NULL ) {
632                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
633                         port++;
634                 } else {
635                         *(port++) = 0;                  // term proto string and point at port string
636                         proto = proto_port;             // user supplied proto so point at it rather than default
637                 }
638         } else {
639                 port = proto_port;                      // assume something like "1234" was passed
640         }
641
642         if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
643                 fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
644                 return NULL;
645         }
646         if( (tok = strchr( wbuf, '.' )) != NULL ) {
647                 *tok = 0;                                                                       // we don't keep domain portion
648         }
649         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
650         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
651                 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
652                 return NULL;
653         }
654
655         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
656                 if( atoi( tok ) > 0 ) {
657                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
658                 }
659         }
660
661         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
662         if( flags & RMRFL_NAME_ONLY ) {
663                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
664         } else {
665                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
666                 if( ctx->my_ip == NULL ) {
667                         fprintf( stderr, "[WARN] rmr_init: default ip address could not be sussed out, using name\n" );
668                         strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
669                 }
670         }
671         if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
672
673
674
675         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
676                 interface = "0.0.0.0";
677         }
678         // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started
679         //       rather than using this generic listen() call.
680         snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
681         if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
682                 fprintf( stderr, "[CRIT] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
683                 nng_close( ctx->nn_sock );
684                 free_ctx( ctx );
685                 return NULL;
686         }
687
688         if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
689                 if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
690                         fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
691                 }
692         }
693
694         if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) {      // mt call support is on, must start the listener thread if not running
695                 ctx->flags |= CFL_MTC_ENABLED;
696                 if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // kick the receiver
697                         fprintf( stderr, "[WARN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
698                 }
699                 
700         }
701
702         free( proto_port );
703         return (void *) ctx;
704 }
705
706 /*
707         Initialise the message routing environment. Flags are one of the UTAFL_
708         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
709         (tcp) to be used, then :port is all that is needed.
710
711         At the moment it seems that TCP really is the only viable protocol, but
712         we'll allow flexibility.
713
714         The return value is a void pointer which must be passed to most uta functions. On
715         error, a nil pointer is returned and errno should be set.
716
717         Flags:
718                 No user flags supported (needed) at the moment, but this provides for extension
719                 without drastically changing anything. The user should invoke with RMRFL_NONE to
720                 avoid any misbehavour as there are internal flags which are suported
721 */
722 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
723         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
724 }
725
726 /*
727         This sets the default trace length which will be added to any message buffers
728         allocated.  It can be set at any time, and if rmr_set_trace() is given a
729         trace len that is different than the default allcoated in a message, the message
730         will be resized.
731
732         Returns 0 on failure and 1 on success. If failure, then errno will be set.
733 */
734 extern int rmr_init_trace( void* vctx, int tr_len ) {
735         uta_ctx_t* ctx;
736
737         errno = 0;
738         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
739                 errno = EINVAL;
740                 return 0;
741         }
742
743         ctx->trace_data_len = tr_len;
744         return 1;
745 }
746
747 /*
748         Return true if routing table is initialised etc. and app can send/receive.
749 */
750 extern int rmr_ready( void* vctx ) {
751         uta_ctx_t *ctx;
752
753         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
754                 return FALSE;
755         }
756
757         if( ctx->rtable != NULL ) {
758                 return TRUE;
759         }
760
761         return FALSE;
762 }
763
764 /*
765         Returns a file descriptor which can be used with epoll() to signal a receive
766         pending. The file descriptor should NOT be read from directly, nor closed, as NNG
767         does not support this.
768 */
769 extern int rmr_get_rcvfd( void* vctx ) {
770         uta_ctx_t* ctx;
771         int fd;
772         int state;
773
774         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
775                 return -1;
776         }
777
778         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
779                 fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
780                 return -1;
781         }
782
783         return fd;
784 }
785
786
787 /*
788         Clean up things.
789
790         There isn't an nng_flush() per se, but we can pause, generate
791         a context switch, which should allow the last sent buffer to
792         flow. There isn't exactly an nng_term/close either, so there
793         isn't much we can do.
794 */
795 extern void rmr_close( void* vctx ) {
796         uta_ctx_t *ctx;
797
798         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
799                 return;
800         }
801
802         ctx->shutdown = 1;
803         nng_close( ctx->nn_sock );
804 }
805
806
807 // ----- multi-threaded call/receive support -------------------------------------------------
808
809 /*
810         Blocks on the receive ring chute semaphore and then reads from the ring
811         when it is tickled.  If max_wait is -1 then the function blocks until
812         a message is ready on the ring. Else max_wait is assumed to be the number
813         of millaseconds to wait before returning a timeout message.
814 */
815 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
816         uta_ctx_t*      ctx;
817         uta_mhdr_t*     hdr;                    // header in the transport buffer
818         chute_t*        chute;
819         struct timespec ts;                     // time info if we have a timeout
820         long    new_ms;                         // adjusted mu-sec
821         long    seconds = 0;            // max wait seconds
822         long    nano_sec;                       // max wait xlated to nano seconds
823         int             state;
824         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
825         
826         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
827                 errno = EINVAL;
828                 if( mbuf ) {
829                         mbuf->state = RMR_ERR_BADARG;
830                 }
831                 return mbuf;
832         }
833
834         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
835                 errno = EINVAL;
836                 if( mbuf != NULL ) {
837                         mbuf->state = RMR_ERR_NOTSUPP;
838                 }
839                 return mbuf;
840         }
841
842         ombuf = mbuf;
843         if( ombuf ) {
844                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
845                 ombuf->len = 0;
846         }
847
848         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
849         
850         if( max_wait > 0 ) {
851                 clock_gettime( CLOCK_REALTIME, &ts );   
852
853                 if( max_wait > 999 ) {
854                         seconds = (max_wait - 999)/1000;
855                         max_wait -= seconds * 1000;
856                         ts.tv_sec += seconds;
857                 }
858                 if( max_wait > 0 ) {
859                         nano_sec = max_wait * 1000000;
860                         ts.tv_nsec += nano_sec;
861                         if( ts.tv_nsec > 999999999 ) {
862                                 ts.tv_nsec -= 999999999;
863                                 ts.tv_sec++;
864                         }
865                 }
866
867                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
868         }
869
870         errno = 0;
871         while( chute->mbuf == NULL && ! errno ) {
872                 if( seconds ) {
873                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
874                 } else {
875                         state = sem_wait( &chute->barrier );
876                 }
877
878                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
879                         errno = 0;
880                 }
881         }
882
883         if( state < 0 ) {
884                 mbuf = ombuf;                           // return caller's buffer if they passed one in
885         } else {
886                 if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
887                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
888                         if( mbuf ) {
889                                 mbuf->state = RMR_OK;
890
891                                 if( ombuf ) {
892                                         rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
893                                 }
894                         } else {
895                                 mbuf = ombuf;                           // no buffer, return user's if there
896                         }
897                 }
898         }
899
900         return mbuf;
901 }
902
903 /*
904         Accept a message buffer and caller ID, send the message and then wait
905         for the receiver to tickle the semaphore letting us know that a message
906         has been received. The call_id is a value between 2 and 255, inclusive; if
907         it's not in this range an error will be returned. Max wait is the amount
908         of time in millaseconds that the call should block for. If 0 is given
909         then no timeout is set.
910
911         If the mt_call feature has not been initialised, then the attempt to use this
912         funciton will fail with RMR_ERR_NOTSUPP
913
914         If no matching message is received before the max_wait period expires, a
915         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
916         occurs after the message has been sent, then a nil pointer is returned
917         with errno set to some other value.
918 */
919 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
920         rmr_mbuf_t* ombuf;                      // original mbuf passed in
921         uta_ctx_t*      ctx;
922         uta_mhdr_t*     hdr;                    // header in the transport buffer
923         chute_t*        chute;
924         unsigned char*  d1;                     // d1 data in header
925         struct timespec ts;                     // time info if we have a timeout
926         long    new_ms;                         // adjusted mu-sec
927         long    seconds = 0;            // max wait seconds
928         long    nano_sec;                       // max wait xlated to nano seconds
929         int             state;
930         
931         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
932                 errno = EINVAL;
933                 if( mbuf ) {
934                         mbuf->state = RMR_ERR_BADARG;
935                 }
936                 return mbuf;
937         }
938
939         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
940                 mbuf->state = RMR_ERR_NOTSUPP;
941                 return mbuf;
942         }
943
944         if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
945                 mbuf->state = RMR_ERR_BADARG;
946                 return mbuf;
947         }
948
949         ombuf = mbuf;                                                                                                   // save to return timeout status with
950
951         chute = &ctx->chutes[call_id];
952         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
953                 rmr_free_msg( chute->mbuf );
954                 chute->mbuf = NULL;
955         }
956         
957         hdr = (uta_mhdr_t *) mbuf->header;
958         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
959         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
960         d1 = DATA1_ADDR( hdr );
961         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
962         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
963
964         if( max_wait > 0 ) {
965                 clock_gettime( CLOCK_REALTIME, &ts );   
966
967                 if( max_wait > 999 ) {
968                         seconds = (max_wait - 999)/1000;
969                         max_wait -= seconds * 1000;
970                         ts.tv_sec += seconds;
971                 }
972                 if( max_wait > 0 ) {
973                         nano_sec = max_wait * 1000000;
974                         ts.tv_nsec += nano_sec;
975                         if( ts.tv_nsec > 999999999 ) {
976                                 ts.tv_nsec -= 999999999;
977                                 ts.tv_sec++;
978                         }
979                 }
980
981                 seconds = 1;                                                                            // use as flag later to invoked timed wait
982         }
983
984         mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
985         if( mbuf ) {
986                 if( mbuf->state != RMR_OK ) {
987                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
988                 }
989         }
990
991         errno = 0;
992         while( chute->mbuf == NULL && ! errno ) {
993                 if( seconds ) {
994                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
995                 } else {
996                         state = sem_wait( &chute->barrier );
997                 }
998
999                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1000                         errno = 0;
1001                 }
1002
1003                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1004                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1005                                 rmr_free_msg( chute->mbuf );
1006                                 chute->mbuf = NULL;
1007                                 errno = 0;
1008                         }
1009                 }
1010         }
1011
1012         if( state < 0 ) {
1013                 return NULL;                                    // leave errno as set by sem wait call
1014         }
1015
1016         mbuf = chute->mbuf;
1017         mbuf->state = RMR_OK;
1018         chute->mbuf = NULL;
1019
1020         return mbuf;
1021 }