The change to fix a bug2
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rmr_nng.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_nng.c
23         Abstract:       This is the compile point for the nng version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include <nng/nng.h>
57 #include <nng/protocol/pubsub0/pub.h>
58 #include <nng/protocol/pubsub0/sub.h>
59 #include <nng/protocol/pipeline0/push.h>
60 #include <nng/protocol/pipeline0/pull.h>
61
62
63 #include "rmr.h"                                // things the users see
64 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
65 #include "rmr_nng_private.h"    // things that we need too
66 #include "rmr_symtab.h"
67
68 #include "ring_static.c"                        // message ring support
69 #include "rt_generic_static.c"          // route table things not transport specific
70 #include "rtable_nng_static.c"          // route table things -- transport specific
71 #include "rtc_static.c"                         // route table collector
72 #include "tools_static.c"
73 #include "sr_nng_static.c"                      // send/receive static functions
74 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
75 #include "mt_call_static.c"
76 #include "mt_call_nng_static.c"
77
78
79 //------------------------------------------------------------------------------
80
81
82 /*
83         Clean up a context.
84 */
85 static void free_ctx( uta_ctx_t* ctx ) {
86         if( ctx ) {
87                 if( ctx->rtg_addr ) {
88                         free( ctx->rtg_addr );
89                 }
90         }
91 }
92
93 // --------------- public functions --------------------------------------------------------------------------
94
95 /*
96         Returns the size of the payload (bytes) that the msg buffer references.
97         Len in a message is the number of bytes which were received, or should
98         be transmitted, however, it is possible that the mbuf was allocated
99         with a larger payload space than the payload length indicates; this
100         function returns the absolute maximum space that the user has available
101         in the payload. On error (bad msg buffer) -1 is returned and errno should
102         indicate the rason.
103 */
104 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
105         if( msg == NULL || msg->header == NULL ) {
106                 errno = EINVAL;
107                 return -1;
108         }
109
110         errno = 0;
111         return msg->alloc_len - RMR_HDR_LEN( msg->header );                             // allocated transport size less the header and other data bits
112 }
113
114 /*
115         Allocates a send message as a zerocopy message allowing the underlying message protocol
116         to send the buffer without copy.
117 */
118 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
119         uta_ctx_t*      ctx;
120         rmr_mbuf_t*     m;
121
122         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
123                 return NULL;
124         }
125
126         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
127         return  m;
128 }
129
130
131 /*
132         Allocates a send message as a zerocopy message allowing the underlying message protocol
133         to send the buffer without copy. In addition, a trace data field of tr_size will be
134         added and the supplied data coppied to the buffer before returning the message to
135         the caller.
136 */
137 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
138         uta_ctx_t*      ctx;
139         rmr_mbuf_t*     m;
140         int state;
141
142         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
143                 return NULL;
144         }
145
146         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
147         if( m != NULL ) {
148                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
149                 if( state != tr_size ) {
150                         m->state = RMR_ERR_INITFAILED;
151                 }
152         }
153
154         return  m;
155 }
156
157 /*
158         This provides an external path to the realloc static function as it's called by an
159         outward facing mbuf api function. Used to reallocate a message with a different
160         trace data size.
161 */
162 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
163         return realloc_msg( msg, new_tr_size );
164 }
165
166
167 /*
168         Return the message to the available pool, or free it outright.
169 */
170 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
171         if( mbuf == NULL ) {
172                 return;
173         }
174
175         if( mbuf->header ) {
176                 if( mbuf->flags & MFL_ZEROCOPY ) {
177                         //nng_free( (void *) mbuf->header, mbuf->alloc_len );
178                         if( mbuf->tp_buf ) {
179                                 nng_msg_free(  mbuf->tp_buf );
180                         }
181                 }
182         }
183
184         free( mbuf );
185 }
186
187 /*
188         This is a wrapper to the real timeout send. We must wrap it now to ensure that
189         the call flag and call-id are reset
190 */
191 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
192         char* d1;                                                                                                                       // point at the call-id in the header
193
194         if( msg != NULL ) {
195                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
196
197                 d1 = DATA1_ADDR( msg->header );
198                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
199         }       
200
201         return mtosend_msg( vctx, msg, max_to );
202 }
203
204 /*
205         Send with default max timeout as is set in the context.
206         See rmr_mtosend_msg() for more details on the parameters.
207         See rmr_stimeout() for info on setting the default timeout.
208 */
209 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
210         char* d1;                                                                                                               // point at the call-id in the header
211
212         if( msg != NULL ) {
213                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
214
215                 d1 = DATA1_ADDR( msg->header );
216                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
217         }       
218
219         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
220 }
221
222 /*
223         Return to sender allows a message to be sent back to the endpoint where it originated.
224         The source information in the message is used to select the socket on which to write
225         the message rather than using the message type and round-robin selection. This
226         should return a message buffer with the state of the send operation set. On success
227         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
228         error it can be passed back to this function to retry the send if desired. On error,
229         errno will liklely have the failure reason set by the nng send processing.
230         The following are possible values for the state in the message buffer:
231
232         Message states returned:
233                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
234                 RMR_ERR_NOHDR  - message did not have a header
235                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
236                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
237                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
238
239         A nil message as the return value is rare, and generally indicates some kind of horrible
240         failure. The value of errno might give a clue as to what is wrong.
241
242         CAUTION:
243                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
244                 The caller must check for this and handle.
245 */
246 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
247         nng_socket      nn_sock;                        // endpoint socket for send
248         uta_ctx_t*      ctx;
249         int                     state;
250         char*           hold_src;                       // we need the original source if send fails
251         char*           hold_ip;                        // also must hold original ip
252         int                     sock_ok = 0;            // true if we found a valid endpoint socket
253
254         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
255                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
256                 if( msg != NULL ) {
257                         msg->state = RMR_ERR_BADARG;
258                         msg->tp_state = errno;
259                 }
260                 return msg;
261         }
262
263         errno = 0;                                                                                                              // at this point any bad state is in msg returned
264         if( msg->header == NULL ) {
265                 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
266                 msg->state = RMR_ERR_NOHDR;
267                 msg->tp_state = errno;
268                 return msg;
269         }
270
271         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
272
273         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                        // src is always used first for rts
274         if( ! sock_ok ) {
275                 if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
276                         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock );
277                 }
278                 if( ! sock_ok ) {
279                         msg->state = RMR_ERR_NOENDPT;
280                         return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
281                 }
282         }
283
284         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
285         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
286         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
287         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
288         msg = send_msg( ctx, msg, nn_sock, -1 );
289         if( msg ) {
290                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
291                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
292                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
293         }
294
295         free( hold_src );
296         free( hold_ip );
297         return msg;
298 }
299
300 /*
301         If multi-threading call is turned on, this invokes that mechanism with the special call
302         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
303         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
304         the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
305         a flexible timeout.
306
307         On timeout this function will return a nil pointer. If the original message could not
308         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
309
310         Original behavour:
311         Call sends the message based on message routing using the message type, and waits for a
312         response message to arrive with the same transaction id that was in the outgoing message.
313         If, while wiating for the expected response,  messages are received which do not have the
314         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
315         order that they were received.
316
317         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
318         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
319         may be resent (likely the context pointer was nil).  If the message is sent, but no
320         response is received, a nil message is returned with errno set to indicate the likley
321         issue:
322                 ETIMEDOUT -- too many messages were queued before reciving the expected response
323                 ENOBUFS -- the queued message ring is full, messages were dropped
324                 EINVAL  -- A parameter was not valid
325                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
326                                         user should call this function with the message again.
327
328 */
329 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
330         uta_ctx_t*              ctx;
331         unsigned char   expected_id[RMR_MAX_XID+1];             // the transaction id in the message; we wait for response with same ID
332
333         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
334                 if( msg != NULL ) {
335                         msg->state = RMR_ERR_BADARG;
336                 }
337                 return msg;
338         }
339
340         if( ctx->flags & CFL_MTC_ENABLED ) {                            // if multi threaded call is on, use that
341                 return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
342         }
343
344         memcpy( expected_id, msg->xaction, RMR_MAX_XID );
345         expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
346         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
347         errno = 0;
348         msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
349
350         msg = rmr_send_msg( ctx, msg );
351         if( msg ) {                                                                             // msg should be nil, if not there was a problem; return buffer to user
352                 if( msg->state != RMR_ERR_RETRY ) {
353                         msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
354                 }
355                 msg->tp_state = errno;
356                 return msg;
357         }
358
359         return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 );                 // wait for msg allowing 20 to queue ahead
360 }
361
362 /*
363         The outward facing receive function. When invoked it will pop the oldest message
364         from the receive ring, if any are queued, and return it. If the ring is empty
365         then the receive function is invoked to wait for the next message to arrive (blocking).
366
367         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
368         nil, a new one will be allocated. However, the caller should NOT expect to get the same
369         struct back (if a queued message is returned the message struct will be different).
370 */
371 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
372         uta_ctx_t*      ctx;
373         rmr_mbuf_t*     qm;                             // message that was queued on the ring
374
375         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
376                 errno = EINVAL;
377                 if( old_msg != NULL ) {
378                         old_msg->state = RMR_ERR_BADARG;
379                         old_msg->tp_state = errno;
380                 }
381                 return old_msg;
382         }
383         errno = 0;
384
385         if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
386                 return rmr_mt_rcv( ctx, old_msg, -1 );
387         }
388
389         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
390         if( qm != NULL ) {
391                 if( old_msg ) {
392                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
393                 }
394
395                 return qm;
396         }
397
398         return rcv_msg( ctx, old_msg );                                                         // nothing queued, wait for one
399 }
400
401 /*
402         This implements a receive with a timeout via epoll. Mostly this is for
403         wrappers as native C applications can use epoll directly and will not have
404         to depend on this.
405 */
406 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
407         struct epoll_stuff* eps;        // convience pointer
408         uta_ctx_t*      ctx;
409         rmr_mbuf_t*     qm;                             // message that was queued on the ring
410         int nready;
411         rmr_mbuf_t* msg;
412
413         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
414                 errno = EINVAL;
415                 if( old_msg != NULL ) {
416                         old_msg->state = RMR_ERR_BADARG;
417                         old_msg->tp_state = errno;
418                 }
419                 return old_msg;
420         }
421
422         if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
423                 return rmr_mt_rcv( ctx, old_msg, ms_to );
424         }
425
426         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
427         if( qm != NULL ) {
428                 if( old_msg ) {
429                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
430                 }
431
432                 return qm;
433         }
434
435         if( (eps = ctx->eps)  == NULL ) {                                       // set up epoll on first call
436                 eps = malloc( sizeof *eps );
437
438                 if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
439                 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
440                         free( eps );
441                         return NULL;
442                 }
443
444                 eps->nng_fd = rmr_get_rcvfd( ctx );
445                 eps->epe.events = EPOLLIN;
446                 eps->epe.data.fd = eps->nng_fd;
447
448                 if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
449                 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
450                         free( eps );
451                         return NULL;
452                 }
453
454                 ctx->eps = eps;
455         }
456
457         if( old_msg ) {
458                 msg = old_msg;
459         } else {
460                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
461         }
462
463         if( ms_to < 0 ) {
464                 ms_to = 0;
465         }
466
467         nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
468         if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
469                 msg->state = RMR_ERR_TIMEOUT;
470                 msg->tp_state = errno;
471         } else {
472                 return rcv_msg( ctx, msg );                                                             // receive it and return it
473         }
474
475         return msg;                             // return empty message with state set
476 }
477
478 /*
479         This blocks until the message with the 'expect' ID is received. Messages which are received
480         before the expected message are queued onto the message ring.  The function will return
481         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
482         expected message is received. If the queued message ring fills a nil pointer is returned
483         and errno is set to ENOBUFS.
484
485         Generally this will be invoked only by the call() function as it waits for a response, but
486         it is exposed to the user application as three is no reason not to.
487 */
488 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
489         uta_ctx_t*      ctx;
490         int     queued = 0;                             // number we pushed into the ring
491         int     exp_len = 0;                    // length of expected ID
492
493         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
494                 errno = EINVAL;
495                 if( msg != NULL ) {
496                         msg->state = RMR_ERR_BADARG;
497                         msg->tp_state = errno;
498                 }
499                 return msg;
500         }
501
502         errno = 0;
503
504         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
505                 return rmr_rcv_msg( ctx, msg );
506         }
507
508         exp_len = strlen( expect );
509         if( exp_len > RMR_MAX_XID ) {
510                 exp_len = RMR_MAX_XID;
511         }
512         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
513
514         while( queued < allow2queue ) {
515                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
516                 if( msg->state == RMR_OK ) {
517                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
518                                 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
519                                 return msg;
520                         }
521
522                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
523                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
524                                 errno = ENOBUFS;
525                                 return NULL;
526                         }
527
528                         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
529                         queued++;
530                         msg = NULL;
531                 }
532         }
533
534         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
535         errno = ETIMEDOUT;
536         return NULL;
537 }
538
539 /*
540         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
541         _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
542         mechnism indicates eagain or etimeedout.  All other error conditions are reported
543         without this delay. Setting a timeout of 0 causes no retries to be attempted in
544         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
545         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
546         after every 1K send attempts until the "time" value is reached. Retries are abandoned
547         if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
548
549         The default, if this function is not used, is 1; meaning that RMr will retry, but will
550         not enter a sleep.  In all cases the caller should check the status in the message returned
551         after a send call.
552
553         Returns -1 if the context was invalid; RMR_OK otherwise.
554 */
555 extern int rmr_set_stimeout( void* vctx, int time ) {
556         uta_ctx_t*      ctx;
557
558         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
559                 return -1;
560         }
561
562         if( time < 0 ) {
563                 time = 0;
564         }
565
566         ctx->send_retries = time;
567         return RMR_OK;
568 }
569
570 /*
571         Set receive timeout -- not supported in nng implementation
572
573         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
574 */
575 extern int rmr_set_rtimeout( void* vctx, int time ) {
576         fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
577         return 0;
578 }
579
580
581 /*
582         This is the actual init workhorse. The user visible function meerly ensures that the
583         calling programme does NOT set any internal flags that are supported, and then
584         invokes this.  Internal functions (the route table collector) which need additional
585         open ports without starting additional route table collectors, will invoke this
586         directly with the proper flag.
587 */
588 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
589         static  int announced = 0;
590         uta_ctx_t*      ctx = NULL;
591         char    bind_info[NNG_MAXADDRLEN];      // bind info
592         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
593         char*   port;
594         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
595         char*   proto_port;
596         char    wbuf[1024];                                     // work buffer
597         char*   tok;                                            // pointer at token in a buffer
598         char*   tok2;
599         int             state;
600
601         if( ! announced ) {
602                 fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
603                         RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
604                 announced = 1;
605         }
606
607         errno = 0;
608         if( uproto_port == NULL ) {
609                 proto_port = strdup( DEF_COMM_PORT );
610         } else {
611                 proto_port = strdup( uproto_port );             // so we can modify it
612         }
613
614         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
615                 errno = ENOMEM;
616                 return NULL;
617         }
618         memset( ctx, 0, sizeof( uta_ctx_t ) );
619
620         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
621         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
622
623         if( flags & RMRFL_MTCALL ) {                                    // mt call support is on, need bigger ring
624                 ctx->mring = uta_mk_ring( 2048 );                       // message ring filled by rcv thread
625                 init_mtcall( ctx );                                                     // set up call chutes
626         } else {
627                 ctx->mring = uta_mk_ring( 128 );                        // ring filled only on blocking call
628         }
629
630         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
631         if( max_msg_size > 0 ) {
632                 ctx->max_plen = max_msg_size;
633         }
634
635         // we're using a listener to get rtg updates, so we do NOT need this.
636         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
637
638         if( nng_pull0_open( &ctx->nn_sock )  !=  0 ) {          // and assign the mode
639                 fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
640                 free_ctx( ctx );
641                 return NULL;
642         }
643
644         if( (port = strchr( proto_port, ':' )) != NULL ) {
645                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
646                         port++;
647                 } else {
648                         *(port++) = 0;                  // term proto string and point at port string
649                         proto = proto_port;             // user supplied proto so point at it rather than default
650                 }
651         } else {
652                 port = proto_port;                      // assume something like "1234" was passed
653         }
654
655         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
656                 tok = strdup( tok );                                    // something we can destroy
657                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
658                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
659                 } else {
660                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
661                 }
662                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
663                         *tok2 = 0;                                                      // make it so
664                 }
665
666                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
667                 free( tok );
668         } else {
669                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
670                         fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
671                         return NULL;
672                 }
673                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
674                         *tok = 0;                                                                       // we don't keep domain portion
675                 }
676         }
677
678         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
679         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
680                 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
681                 return NULL;
682         }
683
684         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
685                 if( atoi( tok ) > 0 ) {
686                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
687                 }
688         }
689
690         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
691         if( flags & RMRFL_NAME_ONLY ) {
692                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
693         } else {
694                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
695                 if( ctx->my_ip == NULL ) {
696                         fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
697                         strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
698                 }
699         }
700         if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
701
702         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
703                 if( *tok == '1' ) {
704                         ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
705                 }
706         }
707
708
709         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
710                 interface = "0.0.0.0";
711         }
712         // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started
713         //       rather than using this generic listen() call.
714         snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
715         if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
716                 fprintf( stderr, "[CRI] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
717                 nng_close( ctx->nn_sock );
718                 free_ctx( ctx );
719                 return NULL;
720         }
721
722         if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
723                 if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
724                         fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
725                 }
726         }
727
728         if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) {      // mt call support is on, must start the listener thread if not running
729                 ctx->flags |= CFL_MTC_ENABLED;
730                 if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // kick the receiver
731                         fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
732                 }
733                 
734         }
735
736         free( proto_port );
737         return (void *) ctx;
738 }
739
740 /*
741         Initialise the message routing environment. Flags are one of the UTAFL_
742         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
743         (tcp) to be used, then :port is all that is needed.
744
745         At the moment it seems that TCP really is the only viable protocol, but
746         we'll allow flexibility.
747
748         The return value is a void pointer which must be passed to most uta functions. On
749         error, a nil pointer is returned and errno should be set.
750
751         Flags:
752                 No user flags supported (needed) at the moment, but this provides for extension
753                 without drastically changing anything. The user should invoke with RMRFL_NONE to
754                 avoid any misbehavour as there are internal flags which are suported
755 */
756 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
757         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
758 }
759
760 /*
761         This sets the default trace length which will be added to any message buffers
762         allocated.  It can be set at any time, and if rmr_set_trace() is given a
763         trace len that is different than the default allcoated in a message, the message
764         will be resized.
765
766         Returns 0 on failure and 1 on success. If failure, then errno will be set.
767 */
768 extern int rmr_init_trace( void* vctx, int tr_len ) {
769         uta_ctx_t* ctx;
770
771         errno = 0;
772         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
773                 errno = EINVAL;
774                 return 0;
775         }
776
777         ctx->trace_data_len = tr_len;
778         return 1;
779 }
780
781 /*
782         Return true if routing table is initialised etc. and app can send/receive.
783 */
784 extern int rmr_ready( void* vctx ) {
785         uta_ctx_t *ctx;
786
787         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
788                 return FALSE;
789         }
790
791         if( ctx->rtable != NULL ) {
792                 return TRUE;
793         }
794
795         return FALSE;
796 }
797
798 /*
799         Returns a file descriptor which can be used with epoll() to signal a receive
800         pending. The file descriptor should NOT be read from directly, nor closed, as NNG
801         does not support this.
802 */
803 extern int rmr_get_rcvfd( void* vctx ) {
804         uta_ctx_t* ctx;
805         int fd;
806         int state;
807
808         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
809                 return -1;
810         }
811
812         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
813                 fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
814                 return -1;
815         }
816
817         return fd;
818 }
819
820
821 /*
822         Clean up things.
823
824         There isn't an nng_flush() per se, but we can pause, generate
825         a context switch, which should allow the last sent buffer to
826         flow. There isn't exactly an nng_term/close either, so there
827         isn't much we can do.
828 */
829 extern void rmr_close( void* vctx ) {
830         uta_ctx_t *ctx;
831
832         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
833                 return;
834         }
835
836         ctx->shutdown = 1;
837         nng_close( ctx->nn_sock );
838 }
839
840
841 // ----- multi-threaded call/receive support -------------------------------------------------
842
843 /*
844         Blocks on the receive ring chute semaphore and then reads from the ring
845         when it is tickled.  If max_wait is -1 then the function blocks until
846         a message is ready on the ring. Else max_wait is assumed to be the number
847         of millaseconds to wait before returning a timeout message.
848 */
849 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
850         uta_ctx_t*      ctx;
851         uta_mhdr_t*     hdr;                    // header in the transport buffer
852         chute_t*        chute;
853         struct timespec ts;                     // time info if we have a timeout
854         long    new_ms;                         // adjusted mu-sec
855         long    seconds = 0;            // max wait seconds
856         long    nano_sec;                       // max wait xlated to nano seconds
857         int             state;
858         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
859         
860         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
861                 errno = EINVAL;
862                 if( mbuf ) {
863                         mbuf->state = RMR_ERR_BADARG;
864                         mbuf->tp_state = errno;
865                 }
866                 return mbuf;
867         }
868
869         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
870                 errno = EINVAL;
871                 if( mbuf != NULL ) {
872                         mbuf->state = RMR_ERR_NOTSUPP;
873                         mbuf->tp_state = errno;
874                 }
875                 return mbuf;
876         }
877
878         ombuf = mbuf;
879         if( ombuf ) {
880                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
881                 ombuf->len = 0;
882         }
883
884         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
885
886         if( max_wait > 0 ) {
887                 clock_gettime( CLOCK_REALTIME, &ts );   
888
889                 if( max_wait > 999 ) {
890                         seconds = (max_wait - 999)/1000;
891                         max_wait -= seconds * 1000;
892                         ts.tv_sec += seconds;
893                 }
894                 if( max_wait > 0 ) {
895                         nano_sec = max_wait * 1000000;
896                         ts.tv_nsec += nano_sec;
897                         if( ts.tv_nsec > 999999999 ) {
898                                 ts.tv_nsec -= 999999999;
899                                 ts.tv_sec++;
900                         }
901                 }
902
903                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
904         }
905
906         errno = EINTR;
907         state = -1;
908         while( state < 0 && errno == EINTR ) {
909                 if( seconds ) {
910                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
911                 } else {
912                         state = sem_wait( &chute->barrier );
913                 }
914         }
915
916         if( state < 0 ) {
917                 mbuf = ombuf;                           // return caller's buffer if they passed one in
918         } else {
919                 errno = 0;                                              // interrupted call state could be left; clear
920                 if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
921                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
922                         mbuf->state = RMR_OK;
923
924                         if( ombuf ) {
925                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
926                         }
927                 } else {
928                         errno = ETIMEDOUT;
929                         mbuf = ombuf;                           // no buffer, return user's if there
930                 }
931         }
932
933         if( mbuf ) {
934                 mbuf->tp_state = errno;
935         }
936         return mbuf;
937 }
938
939 /*
940         Accept a message buffer and caller ID, send the message and then wait
941         for the receiver to tickle the semaphore letting us know that a message
942         has been received. The call_id is a value between 2 and 255, inclusive; if
943         it's not in this range an error will be returned. Max wait is the amount
944         of time in millaseconds that the call should block for. If 0 is given
945         then no timeout is set.
946
947         If the mt_call feature has not been initialised, then the attempt to use this
948         funciton will fail with RMR_ERR_NOTSUPP
949
950         If no matching message is received before the max_wait period expires, a
951         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
952         occurs after the message has been sent, then a nil pointer is returned
953         with errno set to some other value.
954 */
955 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
956         rmr_mbuf_t* ombuf;                      // original mbuf passed in
957         uta_ctx_t*      ctx;
958         uta_mhdr_t*     hdr;                    // header in the transport buffer
959         chute_t*        chute;
960         unsigned char*  d1;                     // d1 data in header
961         struct timespec ts;                     // time info if we have a timeout
962         long    new_ms;                         // adjusted mu-sec
963         long    seconds = 0;            // max wait seconds
964         long    nano_sec;                       // max wait xlated to nano seconds
965         int             state;
966         
967         errno = EINVAL;
968         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
969                 if( mbuf ) {
970                         mbuf->tp_state = errno;
971                         mbuf->state = RMR_ERR_BADARG;
972                 }
973                 return mbuf;
974         }
975
976         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
977                 mbuf->state = RMR_ERR_NOTSUPP;
978                 mbuf->tp_state = errno;
979                 return mbuf;
980         }
981
982         if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
983                 mbuf->state = RMR_ERR_BADARG;
984                 mbuf->tp_state = errno;
985                 return mbuf;
986         }
987
988         ombuf = mbuf;                                                                                                   // save to return timeout status with
989
990         chute = &ctx->chutes[call_id];
991         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
992                 rmr_free_msg( chute->mbuf );
993                 chute->mbuf = NULL;
994         }
995         
996         hdr = (uta_mhdr_t *) mbuf->header;
997         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
998         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
999         d1 = DATA1_ADDR( hdr );
1000         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
1001         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
1002
1003         if( max_wait > 0 ) {
1004                 clock_gettime( CLOCK_REALTIME, &ts );   
1005
1006                 if( max_wait > 999 ) {
1007                         seconds = (max_wait - 999)/1000;
1008                         max_wait -= seconds * 1000;
1009                         ts.tv_sec += seconds;
1010                 }
1011                 if( max_wait > 0 ) {
1012                         nano_sec = max_wait * 1000000;
1013                         ts.tv_nsec += nano_sec;
1014                         if( ts.tv_nsec > 999999999 ) {
1015                                 ts.tv_nsec -= 999999999;
1016                                 ts.tv_sec++;
1017                         }
1018                 }
1019
1020                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1021         }
1022
1023         mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
1024         if( mbuf ) {
1025                 if( mbuf->state != RMR_OK ) {
1026                         mbuf->tp_state = errno;
1027                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1028                 }
1029         }
1030
1031         state = 0;
1032         errno = 0;
1033         while( chute->mbuf == NULL && ! errno ) {
1034                 if( seconds ) {
1035                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1036                 } else {
1037                         state = sem_wait( &chute->barrier );
1038                 }
1039
1040                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1041                         errno = 0;
1042                 }
1043
1044                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1045                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1046                                 rmr_free_msg( chute->mbuf );
1047                                 chute->mbuf = NULL;
1048                                 errno = 0;
1049                         }
1050                 }
1051         }
1052
1053         if( state < 0 ) {
1054                 return NULL;                                    // leave errno as set by sem wait call
1055         }
1056
1057         mbuf = chute->mbuf;
1058         mbuf->state = RMR_OK;
1059         chute->mbuf = NULL;
1060
1061         return mbuf;
1062 }