Address potential error state after good send
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rmr_nng.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_nng.c
23         Abstract:       This is the compile point for the nng version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include <nng/nng.h>
57 #include <nng/protocol/pubsub0/pub.h>
58 #include <nng/protocol/pubsub0/sub.h>
59 #include <nng/protocol/pipeline0/push.h>
60 #include <nng/protocol/pipeline0/pull.h>
61
62
63 #include "rmr.h"                                // things the users see
64 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
65 #include "rmr_nng_private.h"    // things that we need too
66 #include "rmr_symtab.h"
67
68 #include "ring_static.c"                        // message ring support
69 #include "rt_generic_static.c"          // route table things not transport specific
70 #include "rtable_nng_static.c"          // route table things -- transport specific
71 #include "rtc_static.c"                         // route table collector
72 #include "tools_static.c"
73 #include "sr_nng_static.c"                      // send/receive static functions
74 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
75 #include "mt_call_static.c"
76 #include "mt_call_nng_static.c"
77
78
79 //------------------------------------------------------------------------------
80
81
82 /*
83         Clean up a context.
84 */
85 static void free_ctx( uta_ctx_t* ctx ) {
86         if( ctx ) {
87                 if( ctx->rtg_addr ) {
88                         free( ctx->rtg_addr );
89                 }
90         }
91 }
92
93 // --------------- public functions --------------------------------------------------------------------------
94
95 /*
96         Returns the size of the payload (bytes) that the msg buffer references.
97         Len in a message is the number of bytes which were received, or should
98         be transmitted, however, it is possible that the mbuf was allocated
99         with a larger payload space than the payload length indicates; this
100         function returns the absolute maximum space that the user has available
101         in the payload. On error (bad msg buffer) -1 is returned and errno should
102         indicate the rason.
103 */
104 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
105         if( msg == NULL || msg->header == NULL ) {
106                 errno = EINVAL;
107                 return -1;
108         }
109
110         errno = 0;
111         return msg->alloc_len - RMR_HDR_LEN( msg->header );                             // allocated transport size less the header and other data bits
112 }
113
114 /*
115         Allocates a send message as a zerocopy message allowing the underlying message protocol
116         to send the buffer without copy.
117 */
118 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
119         uta_ctx_t*      ctx;
120         rmr_mbuf_t*     m;
121
122         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
123                 return NULL;
124         }
125
126         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
127         return  m;
128 }
129
130
131 /*
132         Allocates a send message as a zerocopy message allowing the underlying message protocol
133         to send the buffer without copy. In addition, a trace data field of tr_size will be
134         added and the supplied data coppied to the buffer before returning the message to
135         the caller.
136 */
137 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
138         uta_ctx_t*      ctx;
139         rmr_mbuf_t*     m;
140         int state;
141
142         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
143                 return NULL;
144         }
145
146         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
147         if( m != NULL ) {
148                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
149                 if( state != tr_size ) {
150                         m->state = RMR_ERR_INITFAILED;
151                 }
152         }
153
154         return  m;
155 }
156
157 /*
158         This provides an external path to the realloc static function as it's called by an
159         outward facing mbuf api function. Used to reallocate a message with a different
160         trace data size.
161 */
162 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
163         return realloc_msg( msg, new_tr_size );
164 }
165
166
167 /*
168         Return the message to the available pool, or free it outright.
169 */
170 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
171         if( mbuf == NULL ) {
172                 return;
173         }
174
175         if( mbuf->header ) {
176                 if( mbuf->flags & MFL_ZEROCOPY ) {
177                         //nng_free( (void *) mbuf->header, mbuf->alloc_len );
178                         if( mbuf->tp_buf ) {
179                                 nng_msg_free(  mbuf->tp_buf );
180                         }
181                 }
182         }
183
184         free( mbuf );
185 }
186
187 /*
188         This is a wrapper to the real timeout send. We must wrap it now to ensure that
189         the call flag and call-id are reset
190 */
191 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
192         char* d1;                                                                                                                       // point at the call-id in the header
193
194         if( msg != NULL ) {
195                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
196
197                 d1 = DATA1_ADDR( msg->header );
198                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
199         }       
200
201         return mtosend_msg( vctx, msg, max_to );
202 }
203
204 /*
205         Send with default max timeout as is set in the context.
206         See rmr_mtosend_msg() for more details on the parameters.
207         See rmr_stimeout() for info on setting the default timeout.
208 */
209 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
210         char* d1;                                                                                                               // point at the call-id in the header
211
212         if( msg != NULL ) {
213                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
214
215                 d1 = DATA1_ADDR( msg->header );
216                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
217         }       
218
219         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
220 }
221
222 /*
223         Return to sender allows a message to be sent back to the endpoint where it originated.
224         The source information in the message is used to select the socket on which to write
225         the message rather than using the message type and round-robin selection. This
226         should return a message buffer with the state of the send operation set. On success
227         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
228         error it can be passed back to this function to retry the send if desired. On error,
229         errno will liklely have the failure reason set by the nng send processing.
230         The following are possible values for the state in the message buffer:
231
232         Message states returned:
233                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
234                 RMR_ERR_NOHDR  - message did not have a header
235                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
236                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
237                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
238
239         A nil message as the return value is rare, and generally indicates some kind of horrible
240         failure. The value of errno might give a clue as to what is wrong.
241
242         CAUTION:
243                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
244                 The caller must check for this and handle.
245 */
246 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
247         nng_socket      nn_sock;                        // endpoint socket for send
248         uta_ctx_t*      ctx;
249         int                     state;
250         char*           hold_src;                       // we need the original source if send fails
251         int                     sock_ok = 0;            // true if we found a valid endpoint socket
252
253         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
254                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
255                 if( msg != NULL ) {
256                         msg->state = RMR_ERR_BADARG;
257                         msg->tp_state = errno;
258                 }
259                 return msg;
260         }
261
262         errno = 0;                                                                                                              // at this point any bad state is in msg returned
263         if( msg->header == NULL ) {
264                 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
265                 msg->state = RMR_ERR_NOHDR;
266                 msg->tp_state = errno;
267                 return msg;
268         }
269
270         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
271         if( HDR_VERSION( msg->header ) > 2 ) {                                                  // new version uses sender's ip address for rts
272                 sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock );                      // default to IP based rts
273         }
274         if( ! sock_ok ) {
275                 sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                // IP  not in rt, try name
276                 if( ! sock_ok ) {
277                         msg->state = RMR_ERR_NOENDPT;
278                         return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
279                 }
280         }
281
282         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
283         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
284         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
285         msg = send_msg( ctx, msg, nn_sock, -1 );
286         if( msg ) {
287                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
288                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
289         }
290
291         free( hold_src );
292         return msg;
293 }
294
295 /*
296         If multi-threading call is turned on, this invokes that mechanism with the special call
297         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
298         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
299         the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
300         a flexible timeout.
301
302         On timeout this function will return a nil pointer. If the original message could not
303         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
304
305         Original behavour:
306         Call sends the message based on message routing using the message type, and waits for a
307         response message to arrive with the same transaction id that was in the outgoing message.
308         If, while wiating for the expected response,  messages are received which do not have the
309         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
310         order that they were received.
311
312         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
313         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
314         may be resent (likely the context pointer was nil).  If the message is sent, but no
315         response is received, a nil message is returned with errno set to indicate the likley
316         issue:
317                 ETIMEDOUT -- too many messages were queued before reciving the expected response
318                 ENOBUFS -- the queued message ring is full, messages were dropped
319                 EINVAL  -- A parameter was not valid
320                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
321                                         user should call this function with the message again.
322
323 */
324 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
325         uta_ctx_t*              ctx;
326         unsigned char   expected_id[RMR_MAX_XID+1];             // the transaction id in the message; we wait for response with same ID
327
328         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
329                 if( msg != NULL ) {
330                         msg->state = RMR_ERR_BADARG;
331                 }
332                 return msg;
333         }
334
335         if( ctx->flags & CFL_MTC_ENABLED ) {                            // if multi threaded call is on, use that
336                 return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
337         }
338
339         memcpy( expected_id, msg->xaction, RMR_MAX_XID );
340         expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
341         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
342         errno = 0;
343         msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
344
345         msg = rmr_send_msg( ctx, msg );
346         if( msg ) {                                                                             // msg should be nil, if not there was a problem; return buffer to user
347                 if( msg->state != RMR_ERR_RETRY ) {
348                         msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
349                 }
350                 msg->tp_state = errno;
351                 return msg;
352         }
353
354         return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 );                 // wait for msg allowing 20 to queue ahead
355 }
356
357 /*
358         The outward facing receive function. When invoked it will pop the oldest message
359         from the receive ring, if any are queued, and return it. If the ring is empty
360         then the receive function is invoked to wait for the next message to arrive (blocking).
361
362         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
363         nil, a new one will be allocated. However, the caller should NOT expect to get the same
364         struct back (if a queued message is returned the message struct will be different).
365 */
366 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
367         uta_ctx_t*      ctx;
368         rmr_mbuf_t*     qm;                             // message that was queued on the ring
369
370         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
371                 errno = EINVAL;
372                 if( old_msg != NULL ) {
373                         old_msg->state = RMR_ERR_BADARG;
374                         old_msg->tp_state = errno;
375                 }
376                 return old_msg;
377         }
378         errno = 0;
379
380         if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
381                 return rmr_mt_rcv( ctx, old_msg, -1 );
382         }
383
384         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
385         if( qm != NULL ) {
386                 if( old_msg ) {
387                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
388                 }
389
390                 return qm;
391         }
392
393         return rcv_msg( ctx, old_msg );                                                         // nothing queued, wait for one
394 }
395
396 /*
397         This implements a receive with a timeout via epoll. Mostly this is for
398         wrappers as native C applications can use epoll directly and will not have
399         to depend on this.
400 */
401 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
402         struct epoll_stuff* eps;        // convience pointer
403         uta_ctx_t*      ctx;
404         rmr_mbuf_t*     qm;                             // message that was queued on the ring
405         int nready;
406         rmr_mbuf_t* msg;
407
408         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
409                 errno = EINVAL;
410                 if( old_msg != NULL ) {
411                         old_msg->state = RMR_ERR_BADARG;
412                         old_msg->tp_state = errno;
413                 }
414                 return old_msg;
415         }
416
417         if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
418                 return rmr_mt_rcv( ctx, old_msg, ms_to );
419         }
420
421         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
422         if( qm != NULL ) {
423                 if( old_msg ) {
424                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
425                 }
426
427                 return qm;
428         }
429
430         if( (eps = ctx->eps)  == NULL ) {                                       // set up epoll on first call
431                 eps = malloc( sizeof *eps );
432
433                 if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
434                 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
435                         free( eps );
436                         return NULL;
437                 }
438
439                 eps->nng_fd = rmr_get_rcvfd( ctx );
440                 eps->epe.events = EPOLLIN;
441                 eps->epe.data.fd = eps->nng_fd;
442
443                 if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
444                 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
445                         free( eps );
446                         return NULL;
447                 }
448
449                 ctx->eps = eps;
450         }
451
452         if( old_msg ) {
453                 msg = old_msg;
454         } else {
455                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
456         }
457
458         if( ms_to < 0 ) {
459                 ms_to = 0;
460         }
461
462         nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
463         if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
464                 msg->state = RMR_ERR_TIMEOUT;
465                 msg->tp_state = errno;
466         } else {
467                 return rcv_msg( ctx, msg );                                                             // receive it and return it
468         }
469
470         return msg;                             // return empty message with state set
471 }
472
473 /*
474         This blocks until the message with the 'expect' ID is received. Messages which are received
475         before the expected message are queued onto the message ring.  The function will return
476         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
477         expected message is received. If the queued message ring fills a nil pointer is returned
478         and errno is set to ENOBUFS.
479
480         Generally this will be invoked only by the call() function as it waits for a response, but
481         it is exposed to the user application as three is no reason not to.
482 */
483 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
484         uta_ctx_t*      ctx;
485         int     queued = 0;                             // number we pushed into the ring
486         int     exp_len = 0;                    // length of expected ID
487
488         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
489                 errno = EINVAL;
490                 if( msg != NULL ) {
491                         msg->state = RMR_ERR_BADARG;
492                         msg->tp_state = errno;
493                 }
494                 return msg;
495         }
496
497         errno = 0;
498
499         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
500                 return rmr_rcv_msg( ctx, msg );
501         }
502
503         exp_len = strlen( expect );
504         if( exp_len > RMR_MAX_XID ) {
505                 exp_len = RMR_MAX_XID;
506         }
507         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
508
509         while( queued < allow2queue ) {
510                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
511                 if( msg->state == RMR_OK ) {
512                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
513                                 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
514                                 return msg;
515                         }
516
517                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
518                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
519                                 errno = ENOBUFS;
520                                 return NULL;
521                         }
522
523                         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
524                         queued++;
525                         msg = NULL;
526                 }
527         }
528
529         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
530         errno = ETIMEDOUT;
531         return NULL;
532 }
533
534 /*
535         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
536         _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
537         mechnism indicates eagain or etimeedout.  All other error conditions are reported
538         without this delay. Setting a timeout of 0 causes no retries to be attempted in
539         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
540         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
541         after every 1K send attempts until the "time" value is reached. Retries are abandoned
542         if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
543
544         The default, if this function is not used, is 1; meaning that RMr will retry, but will
545         not enter a sleep.  In all cases the caller should check the status in the message returned
546         after a send call.
547
548         Returns -1 if the context was invalid; RMR_OK otherwise.
549 */
550 extern int rmr_set_stimeout( void* vctx, int time ) {
551         uta_ctx_t*      ctx;
552
553         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
554                 return -1;
555         }
556
557         if( time < 0 ) {
558                 time = 0;
559         }
560
561         ctx->send_retries = time;
562         return RMR_OK;
563 }
564
565 /*
566         Set receive timeout -- not supported in nng implementation
567
568         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
569 */
570 extern int rmr_set_rtimeout( void* vctx, int time ) {
571         fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
572         return 0;
573 }
574
575
576 /*
577         This is the actual init workhorse. The user visible function meerly ensures that the
578         calling programme does NOT set any internal flags that are supported, and then
579         invokes this.  Internal functions (the route table collector) which need additional
580         open ports without starting additional route table collectors, will invoke this
581         directly with the proper flag.
582 */
583 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
584         static  int announced = 0;
585         uta_ctx_t*      ctx = NULL;
586         char    bind_info[NNG_MAXADDRLEN];      // bind info
587         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
588         char*   port;
589         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
590         char*   proto_port;
591         char    wbuf[1024];                                     // work buffer
592         char*   tok;                                            // pointer at token in a buffer
593         int             state;
594
595         if( ! announced ) {
596                 fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
597                         RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
598                 announced = 1;
599         }
600
601         errno = 0;
602         if( uproto_port == NULL ) {
603                 proto_port = strdup( DEF_COMM_PORT );
604         } else {
605                 proto_port = strdup( uproto_port );             // so we can modify it
606         }
607
608         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
609                 errno = ENOMEM;
610                 return NULL;
611         }
612         memset( ctx, 0, sizeof( uta_ctx_t ) );
613
614         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
615         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
616
617         if( flags & RMRFL_MTCALL ) {                                    // mt call support is on, need bigger ring
618                 ctx->mring = uta_mk_ring( 2048 );                       // message ring filled by rcv thread
619                 init_mtcall( ctx );                                                     // set up call chutes
620         } else {
621                 ctx->mring = uta_mk_ring( 128 );                        // ring filled only on blocking call
622         }
623
624         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
625         if( max_msg_size > 0 ) {
626                 ctx->max_plen = max_msg_size;
627         }
628
629         // we're using a listener to get rtg updates, so we do NOT need this.
630         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
631
632         if( nng_pull0_open( &ctx->nn_sock )  !=  0 ) {          // and assign the mode
633                 fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
634                 free_ctx( ctx );
635                 return NULL;
636         }
637
638         if( (port = strchr( proto_port, ':' )) != NULL ) {
639                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
640                         port++;
641                 } else {
642                         *(port++) = 0;                  // term proto string and point at port string
643                         proto = proto_port;             // user supplied proto so point at it rather than default
644                 }
645         } else {
646                 port = proto_port;                      // assume something like "1234" was passed
647         }
648
649         if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
650                 fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
651                 return NULL;
652         }
653         if( (tok = strchr( wbuf, '.' )) != NULL ) {
654                 *tok = 0;                                                                       // we don't keep domain portion
655         }
656         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
657         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
658                 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
659                 return NULL;
660         }
661
662         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
663                 if( atoi( tok ) > 0 ) {
664                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
665                 }
666         }
667
668         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
669         if( flags & RMRFL_NAME_ONLY ) {
670                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
671         } else {
672                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
673                 if( ctx->my_ip == NULL ) {
674                         fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
675                         strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
676                 }
677         }
678         if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
679
680         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
681                 if( *tok == '1' ) {
682                         ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
683                 }
684         }
685
686
687         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
688                 interface = "0.0.0.0";
689         }
690         // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started
691         //       rather than using this generic listen() call.
692         snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
693         if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
694                 fprintf( stderr, "[CRI] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
695                 nng_close( ctx->nn_sock );
696                 free_ctx( ctx );
697                 return NULL;
698         }
699
700         if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
701                 if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
702                         fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
703                 }
704         }
705
706         if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) {      // mt call support is on, must start the listener thread if not running
707                 ctx->flags |= CFL_MTC_ENABLED;
708                 if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // kick the receiver
709                         fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
710                 }
711                 
712         }
713
714         free( proto_port );
715         return (void *) ctx;
716 }
717
718 /*
719         Initialise the message routing environment. Flags are one of the UTAFL_
720         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
721         (tcp) to be used, then :port is all that is needed.
722
723         At the moment it seems that TCP really is the only viable protocol, but
724         we'll allow flexibility.
725
726         The return value is a void pointer which must be passed to most uta functions. On
727         error, a nil pointer is returned and errno should be set.
728
729         Flags:
730                 No user flags supported (needed) at the moment, but this provides for extension
731                 without drastically changing anything. The user should invoke with RMRFL_NONE to
732                 avoid any misbehavour as there are internal flags which are suported
733 */
734 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
735         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
736 }
737
738 /*
739         This sets the default trace length which will be added to any message buffers
740         allocated.  It can be set at any time, and if rmr_set_trace() is given a
741         trace len that is different than the default allcoated in a message, the message
742         will be resized.
743
744         Returns 0 on failure and 1 on success. If failure, then errno will be set.
745 */
746 extern int rmr_init_trace( void* vctx, int tr_len ) {
747         uta_ctx_t* ctx;
748
749         errno = 0;
750         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
751                 errno = EINVAL;
752                 return 0;
753         }
754
755         ctx->trace_data_len = tr_len;
756         return 1;
757 }
758
759 /*
760         Return true if routing table is initialised etc. and app can send/receive.
761 */
762 extern int rmr_ready( void* vctx ) {
763         uta_ctx_t *ctx;
764
765         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
766                 return FALSE;
767         }
768
769         if( ctx->rtable != NULL ) {
770                 return TRUE;
771         }
772
773         return FALSE;
774 }
775
776 /*
777         Returns a file descriptor which can be used with epoll() to signal a receive
778         pending. The file descriptor should NOT be read from directly, nor closed, as NNG
779         does not support this.
780 */
781 extern int rmr_get_rcvfd( void* vctx ) {
782         uta_ctx_t* ctx;
783         int fd;
784         int state;
785
786         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
787                 return -1;
788         }
789
790         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
791                 fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
792                 return -1;
793         }
794
795         return fd;
796 }
797
798
799 /*
800         Clean up things.
801
802         There isn't an nng_flush() per se, but we can pause, generate
803         a context switch, which should allow the last sent buffer to
804         flow. There isn't exactly an nng_term/close either, so there
805         isn't much we can do.
806 */
807 extern void rmr_close( void* vctx ) {
808         uta_ctx_t *ctx;
809
810         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
811                 return;
812         }
813
814         ctx->shutdown = 1;
815         nng_close( ctx->nn_sock );
816 }
817
818
819 // ----- multi-threaded call/receive support -------------------------------------------------
820
821 /*
822         Blocks on the receive ring chute semaphore and then reads from the ring
823         when it is tickled.  If max_wait is -1 then the function blocks until
824         a message is ready on the ring. Else max_wait is assumed to be the number
825         of millaseconds to wait before returning a timeout message.
826 */
827 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
828         uta_ctx_t*      ctx;
829         uta_mhdr_t*     hdr;                    // header in the transport buffer
830         chute_t*        chute;
831         struct timespec ts;                     // time info if we have a timeout
832         long    new_ms;                         // adjusted mu-sec
833         long    seconds = 0;            // max wait seconds
834         long    nano_sec;                       // max wait xlated to nano seconds
835         int             state;
836         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
837         
838         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
839                 errno = EINVAL;
840                 if( mbuf ) {
841                         mbuf->state = RMR_ERR_BADARG;
842                         mbuf->tp_state = errno;
843                 }
844                 return mbuf;
845         }
846
847         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
848                 errno = EINVAL;
849                 if( mbuf != NULL ) {
850                         mbuf->state = RMR_ERR_NOTSUPP;
851                         mbuf->tp_state = errno;
852                 }
853                 return mbuf;
854         }
855
856         ombuf = mbuf;
857         if( ombuf ) {
858                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
859                 ombuf->len = 0;
860         }
861
862         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
863         
864         if( max_wait > 0 ) {
865                 clock_gettime( CLOCK_REALTIME, &ts );   
866
867                 if( max_wait > 999 ) {
868                         seconds = (max_wait - 999)/1000;
869                         max_wait -= seconds * 1000;
870                         ts.tv_sec += seconds;
871                 }
872                 if( max_wait > 0 ) {
873                         nano_sec = max_wait * 1000000;
874                         ts.tv_nsec += nano_sec;
875                         if( ts.tv_nsec > 999999999 ) {
876                                 ts.tv_nsec -= 999999999;
877                                 ts.tv_sec++;
878                         }
879                 }
880
881                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
882         }
883
884         errno = 0;
885         state = 0;
886         while( chute->mbuf == NULL && ! errno ) {
887                 if( seconds ) {
888                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
889                 } else {
890                         state = sem_wait( &chute->barrier );
891                 }
892
893                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
894                         errno = 0;
895                 }
896         }
897
898         if( state < 0 ) {
899                 mbuf = ombuf;                           // return caller's buffer if they passed one in
900         } else {
901                 if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
902                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
903                         if( mbuf ) {
904                                 mbuf->state = RMR_OK;
905
906                                 if( ombuf ) {
907                                         rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
908                                 }
909                         } else {
910                                 mbuf = ombuf;                           // no buffer, return user's if there
911                         }
912                 }
913         }
914
915         if( mbuf ) {
916                 mbuf->tp_state = errno;
917         }
918         return mbuf;
919 }
920
921 /*
922         Accept a message buffer and caller ID, send the message and then wait
923         for the receiver to tickle the semaphore letting us know that a message
924         has been received. The call_id is a value between 2 and 255, inclusive; if
925         it's not in this range an error will be returned. Max wait is the amount
926         of time in millaseconds that the call should block for. If 0 is given
927         then no timeout is set.
928
929         If the mt_call feature has not been initialised, then the attempt to use this
930         funciton will fail with RMR_ERR_NOTSUPP
931
932         If no matching message is received before the max_wait period expires, a
933         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
934         occurs after the message has been sent, then a nil pointer is returned
935         with errno set to some other value.
936 */
937 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
938         rmr_mbuf_t* ombuf;                      // original mbuf passed in
939         uta_ctx_t*      ctx;
940         uta_mhdr_t*     hdr;                    // header in the transport buffer
941         chute_t*        chute;
942         unsigned char*  d1;                     // d1 data in header
943         struct timespec ts;                     // time info if we have a timeout
944         long    new_ms;                         // adjusted mu-sec
945         long    seconds = 0;            // max wait seconds
946         long    nano_sec;                       // max wait xlated to nano seconds
947         int             state;
948         
949         errno = EINVAL;
950         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
951                 if( mbuf ) {
952                         mbuf->tp_state = errno;
953                         mbuf->state = RMR_ERR_BADARG;
954                 }
955                 return mbuf;
956         }
957
958         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
959                 mbuf->state = RMR_ERR_NOTSUPP;
960                 mbuf->tp_state = errno;
961                 return mbuf;
962         }
963
964         if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
965                 mbuf->state = RMR_ERR_BADARG;
966                 mbuf->tp_state = errno;
967                 return mbuf;
968         }
969
970         ombuf = mbuf;                                                                                                   // save to return timeout status with
971
972         chute = &ctx->chutes[call_id];
973         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
974                 rmr_free_msg( chute->mbuf );
975                 chute->mbuf = NULL;
976         }
977         
978         hdr = (uta_mhdr_t *) mbuf->header;
979         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
980         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
981         d1 = DATA1_ADDR( hdr );
982         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
983         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
984
985         if( max_wait > 0 ) {
986                 clock_gettime( CLOCK_REALTIME, &ts );   
987
988                 if( max_wait > 999 ) {
989                         seconds = (max_wait - 999)/1000;
990                         max_wait -= seconds * 1000;
991                         ts.tv_sec += seconds;
992                 }
993                 if( max_wait > 0 ) {
994                         nano_sec = max_wait * 1000000;
995                         ts.tv_nsec += nano_sec;
996                         if( ts.tv_nsec > 999999999 ) {
997                                 ts.tv_nsec -= 999999999;
998                                 ts.tv_sec++;
999                         }
1000                 }
1001
1002                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1003         }
1004
1005         mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
1006         if( mbuf ) {
1007                 if( mbuf->state != RMR_OK ) {
1008                         mbuf->tp_state = errno;
1009                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1010                 }
1011         }
1012
1013         state = 0;
1014         errno = 0;
1015         while( chute->mbuf == NULL && ! errno ) {
1016                 if( seconds ) {
1017                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1018                 } else {
1019                         state = sem_wait( &chute->barrier );
1020                 }
1021
1022                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1023                         errno = 0;
1024                 }
1025
1026                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1027                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1028                                 rmr_free_msg( chute->mbuf );
1029                                 chute->mbuf = NULL;
1030                                 errno = 0;
1031                         }
1032                 }
1033         }
1034
1035         if( state < 0 ) {
1036                 return NULL;                                    // leave errno as set by sem wait call
1037         }
1038
1039         mbuf = chute->mbuf;
1040         mbuf->state = RMR_OK;
1041         chute->mbuf = NULL;
1042
1043         return mbuf;
1044 }