720f6ade6ce722b89eb50978c06fda174e59dba4
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rmr_nng.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_nng.c
23         Abstract:       This is the compile point for the nng version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include <nng/nng.h>
57 #include <nng/protocol/pubsub0/pub.h>
58 #include <nng/protocol/pubsub0/sub.h>
59 #include <nng/protocol/pipeline0/push.h>
60 #include <nng/protocol/pipeline0/pull.h>
61
62
63 #include "rmr.h"                                // things the users see
64 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
65 #include "rmr_nng_private.h"    // things that we need too
66 #include "rmr_symtab.h"
67
68 #include "ring_static.c"                        // message ring support
69 #include "rt_generic_static.c"          // route table things not transport specific
70 #include "rtable_nng_static.c"          // route table things -- transport specific
71 #include "rtc_static.c"                         // route table collector
72 #include "tools_static.c"
73 #include "sr_nng_static.c"                      // send/receive static functions
74 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
75 #include "mt_call_static.c"
76 #include "mt_call_nng_static.c"
77
78
79 //------------------------------------------------------------------------------
80
81
82 /*
83         Clean up a context.
84 */
85 static void free_ctx( uta_ctx_t* ctx ) {
86         if( ctx ) {
87                 if( ctx->rtg_addr ) {
88                         free( ctx->rtg_addr );
89                 }
90         }
91 }
92
93 // --------------- public functions --------------------------------------------------------------------------
94
95 /*
96         Returns the size of the payload (bytes) that the msg buffer references.
97         Len in a message is the number of bytes which were received, or should
98         be transmitted, however, it is possible that the mbuf was allocated
99         with a larger payload space than the payload length indicates; this
100         function returns the absolute maximum space that the user has available
101         in the payload. On error (bad msg buffer) -1 is returned and errno should
102         indicate the rason.
103 */
104 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
105         if( msg == NULL || msg->header == NULL ) {
106                 errno = EINVAL;
107                 return -1;
108         }
109
110         errno = 0;
111         return msg->alloc_len - RMR_HDR_LEN( msg->header );                             // allocated transport size less the header and other data bits
112 }
113
114 /*
115         Allocates a send message as a zerocopy message allowing the underlying message protocol
116         to send the buffer without copy.
117 */
118 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
119         uta_ctx_t*      ctx;
120         rmr_mbuf_t*     m;
121
122         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
123                 return NULL;
124         }
125
126         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
127         return  m;
128 }
129
130
131 /*
132         Allocates a send message as a zerocopy message allowing the underlying message protocol
133         to send the buffer without copy. In addition, a trace data field of tr_size will be
134         added and the supplied data coppied to the buffer before returning the message to
135         the caller.
136 */
137 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
138         uta_ctx_t*      ctx;
139         rmr_mbuf_t*     m;
140         int state;
141
142         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
143                 return NULL;
144         }
145
146         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
147         if( m != NULL ) {
148                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
149                 if( state != tr_size ) {
150                         m->state = RMR_ERR_INITFAILED;
151                 }
152         }
153
154         return  m;
155 }
156
157 /*
158         This provides an external path to the realloc static function as it's called by an
159         outward facing mbuf api function. Used to reallocate a message with a different
160         trace data size.
161 */
162 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
163         return realloc_msg( msg, new_tr_size );
164 }
165
166
167 /*
168         Return the message to the available pool, or free it outright.
169 */
170 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
171         if( mbuf == NULL ) {
172                 return;
173         }
174
175         if( mbuf->header ) {
176                 if( mbuf->flags & MFL_ZEROCOPY ) {
177                         //nng_free( (void *) mbuf->header, mbuf->alloc_len );
178                         if( mbuf->tp_buf ) {
179                                 nng_msg_free(  mbuf->tp_buf );
180                         }
181                 }
182         }
183
184         free( mbuf );
185 }
186
187 /*
188         This is a wrapper to the real timeout send. We must wrap it now to ensure that
189         the call flag and call-id are reset
190 */
191 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
192         char* d1;                                                                                                                       // point at the call-id in the header
193
194         if( msg != NULL ) {
195                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
196
197                 d1 = DATA1_ADDR( msg->header );
198                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
199         }       
200
201         return mtosend_msg( vctx, msg, max_to );
202 }
203
204 /*
205         Send with default max timeout as is set in the context.
206         See rmr_mtosend_msg() for more details on the parameters.
207         See rmr_stimeout() for info on setting the default timeout.
208 */
209 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
210         char* d1;                                                                                                               // point at the call-id in the header
211
212         if( msg != NULL ) {
213                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
214
215                 d1 = DATA1_ADDR( msg->header );
216                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
217         }       
218
219         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
220 }
221
222 /*
223         Return to sender allows a message to be sent back to the endpoint where it originated.
224         The source information in the message is used to select the socket on which to write
225         the message rather than using the message type and round-robin selection. This
226         should return a message buffer with the state of the send operation set. On success
227         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
228         error it can be passed back to this function to retry the send if desired. On error,
229         errno will liklely have the failure reason set by the nng send processing.
230         The following are possible values for the state in the message buffer:
231
232         Message states returned:
233                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
234                 RMR_ERR_NOHDR  - message did not have a header
235                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
236                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
237                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
238
239         A nil message as the return value is rare, and generally indicates some kind of horrible
240         failure. The value of errno might give a clue as to what is wrong.
241
242         CAUTION:
243                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
244                 The caller must check for this and handle.
245 */
246 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
247         nng_socket      nn_sock;                        // endpoint socket for send
248         uta_ctx_t*      ctx;
249         int                     state;
250         char*           hold_src;                       // we need the original source if send fails
251         char*           hold_ip;                        // also must hold original ip
252         int                     sock_ok = 0;            // true if we found a valid endpoint socket
253         endpoint_t*     ep;                                     // end point to track counts
254
255         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
256                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
257                 if( msg != NULL ) {
258                         msg->state = RMR_ERR_BADARG;
259                         msg->tp_state = errno;
260                 }
261                 return msg;
262         }
263
264         errno = 0;                                                                                                              // at this point any bad state is in msg returned
265         if( msg->header == NULL ) {
266                 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
267                 msg->state = RMR_ERR_NOHDR;
268                 msg->tp_state = errno;
269                 return msg;
270         }
271
272         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
273
274         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );                   // src is always used first for rts
275         if( ! sock_ok ) {
276                 if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
277                         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
278                 }
279                 if( ! sock_ok ) {
280                         msg->state = RMR_ERR_NOENDPT;
281                         return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
282                 }
283         }
284
285         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
286         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
287         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
288         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
289         msg = send_msg( ctx, msg, nn_sock, -1 );
290         if( msg ) {
291                 if( ep != NULL ) {
292                         switch( msg->state ) {
293                                 case RMR_OK:
294                                         ep->scounts[EPSC_GOOD]++;
295                                         break;
296                         
297                                 case RMR_ERR_RETRY:
298                                         ep->scounts[EPSC_TRANS]++;
299                                         break;
300
301                                 default:
302                                         ep->scounts[EPSC_FAIL]++;
303                                         break;
304                         }
305                 }
306                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
307                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
308                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
309         }
310
311         free( hold_src );
312         free( hold_ip );
313         return msg;
314 }
315
316 /*
317         If multi-threading call is turned on, this invokes that mechanism with the special call
318         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
319         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
320         the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
321         a flexible timeout.
322
323         On timeout this function will return a nil pointer. If the original message could not
324         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
325
326         Original behavour:
327         Call sends the message based on message routing using the message type, and waits for a
328         response message to arrive with the same transaction id that was in the outgoing message.
329         If, while wiating for the expected response,  messages are received which do not have the
330         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
331         order that they were received.
332
333         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
334         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
335         may be resent (likely the context pointer was nil).  If the message is sent, but no
336         response is received, a nil message is returned with errno set to indicate the likley
337         issue:
338                 ETIMEDOUT -- too many messages were queued before reciving the expected response
339                 ENOBUFS -- the queued message ring is full, messages were dropped
340                 EINVAL  -- A parameter was not valid
341                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
342                                         user should call this function with the message again.
343
344 */
345 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
346         uta_ctx_t*              ctx;
347         unsigned char   expected_id[RMR_MAX_XID+1];             // the transaction id in the message; we wait for response with same ID
348
349         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
350                 if( msg != NULL ) {
351                         msg->state = RMR_ERR_BADARG;
352                 }
353                 return msg;
354         }
355
356         if( ctx->flags & CFL_MTC_ENABLED ) {                            // if multi threaded call is on, use that
357                 return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
358         }
359
360         memcpy( expected_id, msg->xaction, RMR_MAX_XID );
361         expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
362         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
363         errno = 0;
364         msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
365
366         msg = rmr_send_msg( ctx, msg );
367         if( msg ) {                                                                             // msg should be nil, if not there was a problem; return buffer to user
368                 if( msg->state != RMR_ERR_RETRY ) {
369                         msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
370                 }
371                 msg->tp_state = errno;
372                 return msg;
373         }
374
375         return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 );                 // wait for msg allowing 20 to queue ahead
376 }
377
378 /*
379         The outward facing receive function. When invoked it will pop the oldest message
380         from the receive ring, if any are queued, and return it. If the ring is empty
381         then the receive function is invoked to wait for the next message to arrive (blocking).
382
383         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
384         nil, a new one will be allocated. However, the caller should NOT expect to get the same
385         struct back (if a queued message is returned the message struct will be different).
386 */
387 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
388         uta_ctx_t*      ctx;
389         rmr_mbuf_t*     qm;                             // message that was queued on the ring
390
391         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
392                 errno = EINVAL;
393                 if( old_msg != NULL ) {
394                         old_msg->state = RMR_ERR_BADARG;
395                         old_msg->tp_state = errno;
396                 }
397                 return old_msg;
398         }
399         errno = 0;
400
401         if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
402                 return rmr_mt_rcv( ctx, old_msg, -1 );
403         }
404
405         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
406         if( qm != NULL ) {
407                 if( old_msg ) {
408                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
409                 }
410
411                 return qm;
412         }
413
414         return rcv_msg( ctx, old_msg );                                                         // nothing queued, wait for one
415 }
416
417 /*
418         This implements a receive with a timeout via epoll. Mostly this is for
419         wrappers as native C applications can use epoll directly and will not have
420         to depend on this.
421 */
422 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
423         struct epoll_stuff* eps;        // convience pointer
424         uta_ctx_t*      ctx;
425         rmr_mbuf_t*     qm;                             // message that was queued on the ring
426         int nready;
427         rmr_mbuf_t* msg;
428
429         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
430                 errno = EINVAL;
431                 if( old_msg != NULL ) {
432                         old_msg->state = RMR_ERR_BADARG;
433                         old_msg->tp_state = errno;
434                 }
435                 return old_msg;
436         }
437
438         if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
439                 return rmr_mt_rcv( ctx, old_msg, ms_to );
440         }
441
442         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
443         if( qm != NULL ) {
444                 if( old_msg ) {
445                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
446                 }
447
448                 return qm;
449         }
450
451         if( (eps = ctx->eps)  == NULL ) {                                       // set up epoll on first call
452                 eps = malloc( sizeof *eps );
453
454                 if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
455                 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
456                         free( eps );
457                         ctx->eps = NULL;
458                         if( old_msg != NULL ) {
459                                 old_msg->state = RMR_ERR_INITFAILED;
460                                 old_msg->tp_state = errno;
461                         }
462                         return old_msg;
463                 }
464
465                 eps->nng_fd = rmr_get_rcvfd( ctx );
466                 eps->epe.events = EPOLLIN;
467                 eps->epe.data.fd = eps->nng_fd;
468
469                 if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
470                 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
471                         free( eps );
472                         ctx->eps = NULL;
473                         if( old_msg != NULL ) {
474                                 old_msg->state = RMR_ERR_INITFAILED;
475                                 old_msg->tp_state = errno;
476                         }
477                         return old_msg;
478                 }
479
480                 ctx->eps = eps;
481         }
482
483         if( old_msg ) {
484                 msg = old_msg;
485         } else {
486                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
487         }
488
489         if( ms_to < 0 ) {
490                 ms_to = 0;
491         }
492
493         nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
494         if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
495                 msg->state = RMR_ERR_TIMEOUT;
496                 msg->tp_state = errno;
497         } else {
498                 return rcv_msg( ctx, msg );                                                             // receive it and return it
499         }
500
501         return msg;                             // return empty message with state set
502 }
503
504 /*
505         This blocks until the message with the 'expect' ID is received. Messages which are received
506         before the expected message are queued onto the message ring.  The function will return
507         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
508         expected message is received. If the queued message ring fills a nil pointer is returned
509         and errno is set to ENOBUFS.
510
511         Generally this will be invoked only by the call() function as it waits for a response, but
512         it is exposed to the user application as three is no reason not to.
513 */
514 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
515         uta_ctx_t*      ctx;
516         int     queued = 0;                             // number we pushed into the ring
517         int     exp_len = 0;                    // length of expected ID
518
519         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
520                 errno = EINVAL;
521                 if( msg != NULL ) {
522                         msg->state = RMR_ERR_BADARG;
523                         msg->tp_state = errno;
524                 }
525                 return msg;
526         }
527
528         errno = 0;
529
530         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
531                 return rmr_rcv_msg( ctx, msg );
532         }
533
534         exp_len = strlen( expect );
535         if( exp_len > RMR_MAX_XID ) {
536                 exp_len = RMR_MAX_XID;
537         }
538         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
539
540         while( queued < allow2queue ) {
541                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
542                 if( msg->state == RMR_OK ) {
543                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
544                                 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
545                                 return msg;
546                         }
547
548                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
549                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
550                                 errno = ENOBUFS;
551                                 return NULL;
552                         }
553
554                         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
555                         queued++;
556                         msg = NULL;
557                 }
558         }
559
560         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
561         errno = ETIMEDOUT;
562         return NULL;
563 }
564
565 /*
566         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
567         _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
568         mechnism indicates eagain or etimeedout.  All other error conditions are reported
569         without this delay. Setting a timeout of 0 causes no retries to be attempted in
570         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
571         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
572         after every 1K send attempts until the "time" value is reached. Retries are abandoned
573         if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
574
575         The default, if this function is not used, is 1; meaning that RMr will retry, but will
576         not enter a sleep.  In all cases the caller should check the status in the message returned
577         after a send call.
578
579         Returns -1 if the context was invalid; RMR_OK otherwise.
580 */
581 extern int rmr_set_stimeout( void* vctx, int time ) {
582         uta_ctx_t*      ctx;
583
584         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
585                 return -1;
586         }
587
588         if( time < 0 ) {
589                 time = 0;
590         }
591
592         ctx->send_retries = time;
593         return RMR_OK;
594 }
595
596 /*
597         Set receive timeout -- not supported in nng implementation
598
599         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
600 */
601 extern int rmr_set_rtimeout( void* vctx, int time ) {
602         fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
603         return 0;
604 }
605
606
607 /*
608         This is the actual init workhorse. The user visible function meerly ensures that the
609         calling programme does NOT set any internal flags that are supported, and then
610         invokes this.  Internal functions (the route table collector) which need additional
611         open ports without starting additional route table collectors, will invoke this
612         directly with the proper flag.
613 */
614 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
615         static  int announced = 0;
616         uta_ctx_t*      ctx = NULL;
617         char    bind_info[NNG_MAXADDRLEN];      // bind info
618         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
619         char*   port;
620         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
621         char*   proto_port;
622         char    wbuf[1024];                                     // work buffer
623         char*   tok;                                            // pointer at token in a buffer
624         char*   tok2;
625         int             state;
626
627         if( ! announced ) {
628                 fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
629                         RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
630                 announced = 1;
631         }
632
633         errno = 0;
634         if( uproto_port == NULL ) {
635                 proto_port = strdup( DEF_COMM_PORT );
636         } else {
637                 proto_port = strdup( uproto_port );             // so we can modify it
638         }
639
640         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
641                 errno = ENOMEM;
642                 return NULL;
643         }
644         memset( ctx, 0, sizeof( uta_ctx_t ) );
645
646         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
647         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
648
649         if( flags & RMRFL_MTCALL ) {                                    // mt call support is on, need bigger ring
650                 ctx->mring = uta_mk_ring( 2048 );                       // message ring filled by rcv thread
651                 init_mtcall( ctx );                                                     // set up call chutes
652         } else {
653                 ctx->mring = uta_mk_ring( 128 );                        // ring filled only on blocking call
654         }
655
656         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
657         if( max_msg_size > 0 ) {
658                 ctx->max_plen = max_msg_size;
659         }
660
661         // we're using a listener to get rtg updates, so we do NOT need this.
662         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
663
664         if( nng_pull0_open( &ctx->nn_sock )  !=  0 ) {          // and assign the mode
665                 fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
666                 free_ctx( ctx );
667                 return NULL;
668         }
669
670         if( (port = strchr( proto_port, ':' )) != NULL ) {
671                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
672                         port++;
673                 } else {
674                         *(port++) = 0;                  // term proto string and point at port string
675                         proto = proto_port;             // user supplied proto so point at it rather than default
676                 }
677         } else {
678                 port = proto_port;                      // assume something like "1234" was passed
679         }
680
681         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
682                 tok = strdup( tok );                                    // something we can destroy
683                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
684                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
685                 } else {
686                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
687                 }
688                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
689                         *tok2 = 0;                                                      // make it so
690                 }
691
692                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
693                 free( tok );
694         } else {
695                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
696                         fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
697                         return NULL;
698                 }
699                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
700                         *tok = 0;                                                                       // we don't keep domain portion
701                 }
702         }
703
704         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
705         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
706                 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
707                 return NULL;
708         }
709
710         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
711                 if( atoi( tok ) > 0 ) {
712                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
713                 }
714         }
715
716         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
717         if( flags & RMRFL_NAME_ONLY ) {
718                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
719         } else {
720                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
721                 if( ctx->my_ip == NULL ) {
722                         fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
723                         strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
724                 }
725         }
726         if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
727
728         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
729                 if( *tok == '1' ) {
730                         ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
731                 }
732         }
733
734
735         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
736                 interface = "0.0.0.0";
737         }
738         // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started
739         //       rather than using this generic listen() call.
740         snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
741         if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
742                 fprintf( stderr, "[CRI] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
743                 nng_close( ctx->nn_sock );
744                 free_ctx( ctx );
745                 return NULL;
746         }
747
748         if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
749                 if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
750                         fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
751                 }
752         }
753
754         if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) {      // mt call support is on, must start the listener thread if not running
755                 ctx->flags |= CFL_MTC_ENABLED;
756                 if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // kick the receiver
757                         fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
758                 }
759                 
760         }
761
762         free( proto_port );
763         return (void *) ctx;
764 }
765
766 /*
767         Initialise the message routing environment. Flags are one of the UTAFL_
768         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
769         (tcp) to be used, then :port is all that is needed.
770
771         At the moment it seems that TCP really is the only viable protocol, but
772         we'll allow flexibility.
773
774         The return value is a void pointer which must be passed to most uta functions. On
775         error, a nil pointer is returned and errno should be set.
776
777         Flags:
778                 No user flags supported (needed) at the moment, but this provides for extension
779                 without drastically changing anything. The user should invoke with RMRFL_NONE to
780                 avoid any misbehavour as there are internal flags which are suported
781 */
782 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
783         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
784 }
785
786 /*
787         This sets the default trace length which will be added to any message buffers
788         allocated.  It can be set at any time, and if rmr_set_trace() is given a
789         trace len that is different than the default allcoated in a message, the message
790         will be resized.
791
792         Returns 0 on failure and 1 on success. If failure, then errno will be set.
793 */
794 extern int rmr_init_trace( void* vctx, int tr_len ) {
795         uta_ctx_t* ctx;
796
797         errno = 0;
798         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
799                 errno = EINVAL;
800                 return 0;
801         }
802
803         ctx->trace_data_len = tr_len;
804         return 1;
805 }
806
807 /*
808         Return true if routing table is initialised etc. and app can send/receive.
809 */
810 extern int rmr_ready( void* vctx ) {
811         uta_ctx_t *ctx;
812
813         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
814                 return FALSE;
815         }
816
817         if( ctx->rtable != NULL ) {
818                 return TRUE;
819         }
820
821         return FALSE;
822 }
823
824 /*
825         Returns a file descriptor which can be used with epoll() to signal a receive
826         pending. The file descriptor should NOT be read from directly, nor closed, as NNG
827         does not support this.
828 */
829 extern int rmr_get_rcvfd( void* vctx ) {
830         uta_ctx_t* ctx;
831         int fd;
832         int state;
833
834         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
835                 return -1;
836         }
837
838         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
839                 fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
840                 return -1;
841         }
842
843         return fd;
844 }
845
846
847 /*
848         Clean up things.
849
850         There isn't an nng_flush() per se, but we can pause, generate
851         a context switch, which should allow the last sent buffer to
852         flow. There isn't exactly an nng_term/close either, so there
853         isn't much we can do.
854 */
855 extern void rmr_close( void* vctx ) {
856         uta_ctx_t *ctx;
857
858         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
859                 return;
860         }
861
862         ctx->shutdown = 1;
863         nng_close( ctx->nn_sock );
864 }
865
866
867 // ----- multi-threaded call/receive support -------------------------------------------------
868
869 /*
870         Blocks on the receive ring chute semaphore and then reads from the ring
871         when it is tickled.  If max_wait is -1 then the function blocks until
872         a message is ready on the ring. Else max_wait is assumed to be the number
873         of millaseconds to wait before returning a timeout message.
874 */
875 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
876         uta_ctx_t*      ctx;
877         uta_mhdr_t*     hdr;                    // header in the transport buffer
878         chute_t*        chute;
879         struct timespec ts;                     // time info if we have a timeout
880         long    new_ms;                         // adjusted mu-sec
881         long    seconds = 0;            // max wait seconds
882         long    nano_sec;                       // max wait xlated to nano seconds
883         int             state;
884         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
885         
886         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
887                 errno = EINVAL;
888                 if( mbuf ) {
889                         mbuf->state = RMR_ERR_BADARG;
890                         mbuf->tp_state = errno;
891                 }
892                 return mbuf;
893         }
894
895         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
896                 errno = EINVAL;
897                 if( mbuf != NULL ) {
898                         mbuf->state = RMR_ERR_NOTSUPP;
899                         mbuf->tp_state = errno;
900                 }
901                 return mbuf;
902         }
903
904         ombuf = mbuf;
905         if( ombuf ) {
906                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
907                 ombuf->len = 0;
908         }
909
910         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
911
912         if( max_wait >= 0 ) {
913                 clock_gettime( CLOCK_REALTIME, &ts );   
914
915                 if( max_wait > 999 ) {
916                         seconds = max_wait / 1000;
917                         max_wait -= seconds * 1000;
918                         ts.tv_sec += seconds;
919                 }
920                 if( max_wait > 0 ) {
921                         nano_sec = max_wait * 1000000;
922                         ts.tv_nsec += nano_sec;
923                         if( ts.tv_nsec > 999999999 ) {
924                                 ts.tv_nsec -= 999999999;
925                                 ts.tv_sec++;
926                         }
927                 }
928
929                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
930         }
931
932         errno = EINTR;
933         state = -1;
934         while( state < 0 && errno == EINTR ) {
935                 if( seconds ) {
936                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
937                 } else {
938                         state = sem_wait( &chute->barrier );
939                 }
940         }
941
942         if( state < 0 ) {
943                 mbuf = ombuf;                           // return caller's buffer if they passed one in
944         } else {
945                 errno = 0;                                              // interrupted call state could be left; clear
946                 if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
947                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
948                         mbuf->state = RMR_OK;
949
950                         if( ombuf ) {
951                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
952                         }
953                 } else {
954                         errno = ETIMEDOUT;
955                         mbuf = ombuf;                           // no buffer, return user's if there
956                 }
957         }
958
959         if( mbuf ) {
960                 mbuf->tp_state = errno;
961         }
962         return mbuf;
963 }
964
965 /*
966         Accept a message buffer and caller ID, send the message and then wait
967         for the receiver to tickle the semaphore letting us know that a message
968         has been received. The call_id is a value between 2 and 255, inclusive; if
969         it's not in this range an error will be returned. Max wait is the amount
970         of time in millaseconds that the call should block for. If 0 is given
971         then no timeout is set.
972
973         If the mt_call feature has not been initialised, then the attempt to use this
974         funciton will fail with RMR_ERR_NOTSUPP
975
976         If no matching message is received before the max_wait period expires, a
977         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
978         occurs after the message has been sent, then a nil pointer is returned
979         with errno set to some other value.
980 */
981 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
982         rmr_mbuf_t* ombuf;                      // original mbuf passed in
983         uta_ctx_t*      ctx;
984         uta_mhdr_t*     hdr;                    // header in the transport buffer
985         chute_t*        chute;
986         unsigned char*  d1;                     // d1 data in header
987         struct timespec ts;                     // time info if we have a timeout
988         long    new_ms;                         // adjusted mu-sec
989         long    seconds = 0;            // max wait seconds
990         long    nano_sec;                       // max wait xlated to nano seconds
991         int             state;
992         
993         errno = EINVAL;
994         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
995                 if( mbuf ) {
996                         mbuf->tp_state = errno;
997                         mbuf->state = RMR_ERR_BADARG;
998                 }
999                 return mbuf;
1000         }
1001
1002         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
1003                 mbuf->state = RMR_ERR_NOTSUPP;
1004                 mbuf->tp_state = errno;
1005                 return mbuf;
1006         }
1007
1008         if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
1009                 mbuf->state = RMR_ERR_BADARG;
1010                 mbuf->tp_state = errno;
1011                 return mbuf;
1012         }
1013
1014         ombuf = mbuf;                                                                                                   // save to return timeout status with
1015
1016         chute = &ctx->chutes[call_id];
1017         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
1018                 rmr_free_msg( chute->mbuf );
1019                 chute->mbuf = NULL;
1020         }
1021         
1022         hdr = (uta_mhdr_t *) mbuf->header;
1023         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
1024         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
1025         d1 = DATA1_ADDR( hdr );
1026         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
1027         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
1028
1029         if( max_wait >= 0 ) {
1030                 clock_gettime( CLOCK_REALTIME, &ts );   
1031
1032                 if( max_wait > 999 ) {
1033                         seconds = max_wait / 1000;
1034                         max_wait -= seconds * 1000;
1035                         ts.tv_sec += seconds;
1036                 }
1037                 if( max_wait > 0 ) {
1038                         nano_sec = max_wait * 1000000;
1039                         ts.tv_nsec += nano_sec;
1040                         if( ts.tv_nsec > 999999999 ) {
1041                                 ts.tv_nsec -= 999999999;
1042                                 ts.tv_sec++;
1043                         }
1044                 }
1045
1046                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1047         }
1048
1049         mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
1050         if( mbuf ) {
1051                 if( mbuf->state != RMR_OK ) {
1052                         mbuf->tp_state = errno;
1053                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1054                 }
1055         }
1056
1057         state = 0;
1058         errno = 0;
1059         while( chute->mbuf == NULL && ! errno ) {
1060                 if( seconds ) {
1061                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1062                 } else {
1063                         state = sem_wait( &chute->barrier );
1064                 }
1065
1066                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1067                         errno = 0;
1068                 }
1069
1070                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1071                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1072                                 rmr_free_msg( chute->mbuf );
1073                                 chute->mbuf = NULL;
1074                                 errno = 0;
1075                         }
1076                 }
1077         }
1078
1079         if( state < 0 ) {
1080                 return NULL;                                    // leave errno as set by sem wait call
1081         }
1082
1083         mbuf = chute->mbuf;
1084         mbuf->state = RMR_OK;
1085         chute->mbuf = NULL;
1086
1087         return mbuf;
1088 }
1089
1090 /*
1091         Given an existing message buffer, reallocate the payload portion to
1092         be at least new_len bytes.  The message header will remain such that
1093         the caller may use the rmr_rts_msg() function to return a payload
1094         to the sender. 
1095
1096         The mbuf passed in may or may not be reallocated and the caller must
1097         use the returned pointer and should NOT assume that it can use the 
1098         pointer passed in with the exceptions based on the clone flag.
1099
1100         If the clone flag is set, then a duplicated message, with larger payload
1101         size, is allocated and returned.  The old_msg pointer in this situation is
1102         still valid and must be explicitly freed by the application. If the clone 
1103         message is not set (0), then any memory management of the old message is
1104         handled by the function.
1105
1106         If the copy flag is set, the contents of the old message's payload is 
1107         copied to the reallocated payload.  If the flag is not set, then the 
1108         contents of the payload is undetermined.
1109 */
1110 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1111         if( old_msg == NULL ) {
1112                 return NULL;
1113         }
1114
1115         return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
1116 }
1117
1118 /*
1119         The following functions are "dummies" as NNG has no concept of supporting
1120         them, but are needed to resolve calls at link time.
1121 */
1122
1123 extern void rmr_set_fack( void* p ) {
1124         return;
1125 }