45a0e2349e96098533dfe20ba322f0267188e1e4
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rmr_nng.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_nng.c
23         Abstract:       This is the compile point for the nng version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include <nng/nng.h>
57 #include <nng/protocol/pubsub0/pub.h>
58 #include <nng/protocol/pubsub0/sub.h>
59 #include <nng/protocol/pipeline0/push.h>
60 #include <nng/protocol/pipeline0/pull.h>
61
62
63 #include "rmr.h"                                // things the users see
64 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
65 #include "rmr_nng_private.h"    // things that we need too
66 #include "rmr_symtab.h"
67
68 #include "ring_static.c"                        // message ring support
69 #include "rt_generic_static.c"          // route table things not transport specific
70 #include "rtable_nng_static.c"          // route table things -- transport specific
71 #include "rtc_static.c"                         // route table collector
72 #include "tools_static.c"
73 #include "sr_nng_static.c"                      // send/receive static functions
74 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
75 #include "mt_call_static.c"
76 #include "mt_call_nng_static.c"
77
78
79 //------------------------------------------------------------------------------
80
81
82 /*
83         Clean up a context.
84 */
85 static void free_ctx( uta_ctx_t* ctx ) {
86         if( ctx ) {
87                 if( ctx->rtg_addr ) {
88                         free( ctx->rtg_addr );
89                 }
90         }
91 }
92
93 // --------------- public functions --------------------------------------------------------------------------
94
95 /*
96         Returns the size of the payload (bytes) that the msg buffer references.
97         Len in a message is the number of bytes which were received, or should
98         be transmitted, however, it is possible that the mbuf was allocated
99         with a larger payload space than the payload length indicates; this
100         function returns the absolute maximum space that the user has available
101         in the payload. On error (bad msg buffer) -1 is returned and errno should
102         indicate the rason.
103 */
104 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
105         if( msg == NULL || msg->header == NULL ) {
106                 errno = EINVAL;
107                 return -1;
108         }
109
110         errno = 0;
111         return msg->alloc_len - RMR_HDR_LEN( msg->header );                             // allocated transport size less the header and other data bits
112 }
113
114 /*
115         Allocates a send message as a zerocopy message allowing the underlying message protocol
116         to send the buffer without copy.
117 */
118 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
119         uta_ctx_t*      ctx;
120         rmr_mbuf_t*     m;
121
122         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
123                 return NULL;
124         }
125
126         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
127         return  m;
128 }
129
130
131 /*
132         Allocates a send message as a zerocopy message allowing the underlying message protocol
133         to send the buffer without copy. In addition, a trace data field of tr_size will be
134         added and the supplied data coppied to the buffer before returning the message to
135         the caller.
136 */
137 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
138         uta_ctx_t*      ctx;
139         rmr_mbuf_t*     m;
140         int state;
141
142         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
143                 return NULL;
144         }
145
146         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
147         if( m != NULL ) {
148                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
149                 if( state != tr_size ) {
150                         m->state = RMR_ERR_INITFAILED;
151                 }
152         }
153
154         return  m;
155 }
156
157 /*
158         This provides an external path to the realloc static function as it's called by an
159         outward facing mbuf api function. Used to reallocate a message with a different
160         trace data size.
161 */
162 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
163         return realloc_msg( msg, new_tr_size );
164 }
165
166
167 /*
168         Return the message to the available pool, or free it outright.
169 */
170 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
171         if( mbuf == NULL ) {
172                 return;
173         }
174
175         if( mbuf->header ) {
176                 if( mbuf->flags & MFL_ZEROCOPY ) {
177                         //nng_free( (void *) mbuf->header, mbuf->alloc_len );
178                         if( mbuf->tp_buf ) {
179                                 nng_msg_free(  mbuf->tp_buf );
180                         }
181                 }
182         }
183
184         free( mbuf );
185 }
186
187 /*
188         This is a wrapper to the real timeout send. We must wrap it now to ensure that
189         the call flag and call-id are reset
190 */
191 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
192         char* d1;                                                                                                                       // point at the call-id in the header
193
194         if( msg != NULL ) {
195                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
196
197                 d1 = DATA1_ADDR( msg->header );
198                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
199         }       
200
201         return mtosend_msg( vctx, msg, max_to );
202 }
203
204 /*
205         Send with default max timeout as is set in the context.
206         See rmr_mtosend_msg() for more details on the parameters.
207         See rmr_stimeout() for info on setting the default timeout.
208 */
209 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
210         char* d1;                                                                                                               // point at the call-id in the header
211
212         if( msg != NULL ) {
213                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
214
215                 d1 = DATA1_ADDR( msg->header );
216                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
217         }       
218
219         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
220 }
221
222 /*
223         Return to sender allows a message to be sent back to the endpoint where it originated.
224         The source information in the message is used to select the socket on which to write
225         the message rather than using the message type and round-robin selection. This
226         should return a message buffer with the state of the send operation set. On success
227         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
228         error it can be passed back to this function to retry the send if desired. On error,
229         errno will liklely have the failure reason set by the nng send processing.
230         The following are possible values for the state in the message buffer:
231
232         Message states returned:
233                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
234                 RMR_ERR_NOHDR  - message did not have a header
235                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
236                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
237                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
238
239         A nil message as the return value is rare, and generally indicates some kind of horrible
240         failure. The value of errno might give a clue as to what is wrong.
241
242         CAUTION:
243                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
244                 The caller must check for this and handle.
245 */
246 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
247         nng_socket      nn_sock;                        // endpoint socket for send
248         uta_ctx_t*      ctx;
249         int                     state;
250         char*           hold_src;                       // we need the original source if send fails
251         char*           hold_ip;                        // also must hold original ip
252         int                     sock_ok = 0;            // true if we found a valid endpoint socket
253         endpoint_t*     ep;                                     // end point to track counts
254
255         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
256                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
257                 if( msg != NULL ) {
258                         msg->state = RMR_ERR_BADARG;
259                         msg->tp_state = errno;
260                 }
261                 return msg;
262         }
263
264         errno = 0;                                                                                                              // at this point any bad state is in msg returned
265         if( msg->header == NULL ) {
266                 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
267                 msg->state = RMR_ERR_NOHDR;
268                 msg->tp_state = errno;
269                 return msg;
270         }
271
272         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
273
274         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );                   // src is always used first for rts
275         if( ! sock_ok ) {
276                 if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
277                         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
278                 }
279                 if( ! sock_ok ) {
280                         msg->state = RMR_ERR_NOENDPT;
281                         return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
282                 }
283         }
284
285         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
286         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
287         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
288         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
289         msg = send_msg( ctx, msg, nn_sock, -1 );
290         if( msg ) {
291                 if( ep != NULL ) {
292                         switch( msg->state ) {
293                                 case RMR_OK:
294                                         ep->scounts[EPSC_GOOD]++;
295                                         break;
296                         
297                                 case RMR_ERR_RETRY:
298                                         ep->scounts[EPSC_TRANS]++;
299                                         break;
300
301                                 default:
302                                         ep->scounts[EPSC_FAIL]++;
303                                         break;
304                         }
305                 }
306                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
307                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
308                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
309         }
310
311         free( hold_src );
312         free( hold_ip );
313         return msg;
314 }
315
316 /*
317         If multi-threading call is turned on, this invokes that mechanism with the special call
318         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
319         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
320         the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
321         a flexible timeout.
322
323         On timeout this function will return a nil pointer. If the original message could not
324         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
325
326         Original behavour:
327         Call sends the message based on message routing using the message type, and waits for a
328         response message to arrive with the same transaction id that was in the outgoing message.
329         If, while wiating for the expected response,  messages are received which do not have the
330         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
331         order that they were received.
332
333         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
334         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
335         may be resent (likely the context pointer was nil).  If the message is sent, but no
336         response is received, a nil message is returned with errno set to indicate the likley
337         issue:
338                 ETIMEDOUT -- too many messages were queued before reciving the expected response
339                 ENOBUFS -- the queued message ring is full, messages were dropped
340                 EINVAL  -- A parameter was not valid
341                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
342                                         user should call this function with the message again.
343
344 */
345 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
346         uta_ctx_t*              ctx;
347         unsigned char   expected_id[RMR_MAX_XID+1];             // the transaction id in the message; we wait for response with same ID
348
349         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
350                 if( msg != NULL ) {
351                         msg->state = RMR_ERR_BADARG;
352                 }
353                 return msg;
354         }
355
356         if( ctx->flags & CFL_MTC_ENABLED ) {                            // if multi threaded call is on, use that
357                 return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
358         }
359
360         memcpy( expected_id, msg->xaction, RMR_MAX_XID );
361         expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
362         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
363         errno = 0;
364         msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
365
366         msg = rmr_send_msg( ctx, msg );
367         if( msg ) {                                                                             // msg should be nil, if not there was a problem; return buffer to user
368                 if( msg->state != RMR_ERR_RETRY ) {
369                         msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
370                 }
371                 msg->tp_state = errno;
372                 return msg;
373         }
374
375         return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 );                 // wait for msg allowing 20 to queue ahead
376 }
377
378 /*
379         The outward facing receive function. When invoked it will pop the oldest message
380         from the receive ring, if any are queued, and return it. If the ring is empty
381         then the receive function is invoked to wait for the next message to arrive (blocking).
382
383         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
384         nil, a new one will be allocated. However, the caller should NOT expect to get the same
385         struct back (if a queued message is returned the message struct will be different).
386 */
387 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
388         uta_ctx_t*      ctx;
389         rmr_mbuf_t*     qm;                             // message that was queued on the ring
390
391         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
392                 errno = EINVAL;
393                 if( old_msg != NULL ) {
394                         old_msg->state = RMR_ERR_BADARG;
395                         old_msg->tp_state = errno;
396                 }
397                 return old_msg;
398         }
399         errno = 0;
400
401         if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
402                 return rmr_mt_rcv( ctx, old_msg, -1 );
403         }
404
405         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
406         if( qm != NULL ) {
407                 if( old_msg ) {
408                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
409                 }
410
411                 return qm;
412         }
413
414         return rcv_msg( ctx, old_msg );                                                         // nothing queued, wait for one
415 }
416
417 /*
418         This implements a receive with a timeout via epoll. Mostly this is for
419         wrappers as native C applications can use epoll directly and will not have
420         to depend on this.
421 */
422 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
423         struct epoll_stuff* eps;        // convience pointer
424         uta_ctx_t*      ctx;
425         rmr_mbuf_t*     qm;                             // message that was queued on the ring
426         int nready;
427         rmr_mbuf_t* msg;
428
429         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
430                 errno = EINVAL;
431                 if( old_msg != NULL ) {
432                         old_msg->state = RMR_ERR_BADARG;
433                         old_msg->tp_state = errno;
434                 }
435                 return old_msg;
436         }
437
438         if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
439                 return rmr_mt_rcv( ctx, old_msg, ms_to );
440         }
441
442         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
443         if( qm != NULL ) {
444                 if( old_msg ) {
445                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
446                 }
447
448                 return qm;
449         }
450
451         if( (eps = ctx->eps)  == NULL ) {                                       // set up epoll on first call
452                 eps = malloc( sizeof *eps );
453
454                 if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
455                 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
456                         free( eps );
457                         return NULL;
458                 }
459
460                 eps->nng_fd = rmr_get_rcvfd( ctx );
461                 eps->epe.events = EPOLLIN;
462                 eps->epe.data.fd = eps->nng_fd;
463
464                 if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
465                 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
466                         free( eps );
467                         return NULL;
468                 }
469
470                 ctx->eps = eps;
471         }
472
473         if( old_msg ) {
474                 msg = old_msg;
475         } else {
476                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
477         }
478
479         if( ms_to < 0 ) {
480                 ms_to = 0;
481         }
482
483         nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
484         if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
485                 msg->state = RMR_ERR_TIMEOUT;
486                 msg->tp_state = errno;
487         } else {
488                 return rcv_msg( ctx, msg );                                                             // receive it and return it
489         }
490
491         return msg;                             // return empty message with state set
492 }
493
494 /*
495         This blocks until the message with the 'expect' ID is received. Messages which are received
496         before the expected message are queued onto the message ring.  The function will return
497         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
498         expected message is received. If the queued message ring fills a nil pointer is returned
499         and errno is set to ENOBUFS.
500
501         Generally this will be invoked only by the call() function as it waits for a response, but
502         it is exposed to the user application as three is no reason not to.
503 */
504 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
505         uta_ctx_t*      ctx;
506         int     queued = 0;                             // number we pushed into the ring
507         int     exp_len = 0;                    // length of expected ID
508
509         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
510                 errno = EINVAL;
511                 if( msg != NULL ) {
512                         msg->state = RMR_ERR_BADARG;
513                         msg->tp_state = errno;
514                 }
515                 return msg;
516         }
517
518         errno = 0;
519
520         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
521                 return rmr_rcv_msg( ctx, msg );
522         }
523
524         exp_len = strlen( expect );
525         if( exp_len > RMR_MAX_XID ) {
526                 exp_len = RMR_MAX_XID;
527         }
528         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
529
530         while( queued < allow2queue ) {
531                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
532                 if( msg->state == RMR_OK ) {
533                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
534                                 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
535                                 return msg;
536                         }
537
538                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
539                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
540                                 errno = ENOBUFS;
541                                 return NULL;
542                         }
543
544                         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
545                         queued++;
546                         msg = NULL;
547                 }
548         }
549
550         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
551         errno = ETIMEDOUT;
552         return NULL;
553 }
554
555 /*
556         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
557         _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
558         mechnism indicates eagain or etimeedout.  All other error conditions are reported
559         without this delay. Setting a timeout of 0 causes no retries to be attempted in
560         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
561         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
562         after every 1K send attempts until the "time" value is reached. Retries are abandoned
563         if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
564
565         The default, if this function is not used, is 1; meaning that RMr will retry, but will
566         not enter a sleep.  In all cases the caller should check the status in the message returned
567         after a send call.
568
569         Returns -1 if the context was invalid; RMR_OK otherwise.
570 */
571 extern int rmr_set_stimeout( void* vctx, int time ) {
572         uta_ctx_t*      ctx;
573
574         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
575                 return -1;
576         }
577
578         if( time < 0 ) {
579                 time = 0;
580         }
581
582         ctx->send_retries = time;
583         return RMR_OK;
584 }
585
586 /*
587         Set receive timeout -- not supported in nng implementation
588
589         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
590 */
591 extern int rmr_set_rtimeout( void* vctx, int time ) {
592         fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
593         return 0;
594 }
595
596
597 /*
598         This is the actual init workhorse. The user visible function meerly ensures that the
599         calling programme does NOT set any internal flags that are supported, and then
600         invokes this.  Internal functions (the route table collector) which need additional
601         open ports without starting additional route table collectors, will invoke this
602         directly with the proper flag.
603 */
604 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
605         static  int announced = 0;
606         uta_ctx_t*      ctx = NULL;
607         char    bind_info[NNG_MAXADDRLEN];      // bind info
608         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
609         char*   port;
610         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
611         char*   proto_port;
612         char    wbuf[1024];                                     // work buffer
613         char*   tok;                                            // pointer at token in a buffer
614         char*   tok2;
615         int             state;
616
617         if( ! announced ) {
618                 fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
619                         RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
620                 announced = 1;
621         }
622
623         errno = 0;
624         if( uproto_port == NULL ) {
625                 proto_port = strdup( DEF_COMM_PORT );
626         } else {
627                 proto_port = strdup( uproto_port );             // so we can modify it
628         }
629
630         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
631                 errno = ENOMEM;
632                 return NULL;
633         }
634         memset( ctx, 0, sizeof( uta_ctx_t ) );
635
636         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
637         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
638
639         if( flags & RMRFL_MTCALL ) {                                    // mt call support is on, need bigger ring
640                 ctx->mring = uta_mk_ring( 2048 );                       // message ring filled by rcv thread
641                 init_mtcall( ctx );                                                     // set up call chutes
642         } else {
643                 ctx->mring = uta_mk_ring( 128 );                        // ring filled only on blocking call
644         }
645
646         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
647         if( max_msg_size > 0 ) {
648                 ctx->max_plen = max_msg_size;
649         }
650
651         // we're using a listener to get rtg updates, so we do NOT need this.
652         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
653
654         if( nng_pull0_open( &ctx->nn_sock )  !=  0 ) {          // and assign the mode
655                 fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
656                 free_ctx( ctx );
657                 return NULL;
658         }
659
660         if( (port = strchr( proto_port, ':' )) != NULL ) {
661                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
662                         port++;
663                 } else {
664                         *(port++) = 0;                  // term proto string and point at port string
665                         proto = proto_port;             // user supplied proto so point at it rather than default
666                 }
667         } else {
668                 port = proto_port;                      // assume something like "1234" was passed
669         }
670
671         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
672                 tok = strdup( tok );                                    // something we can destroy
673                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
674                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
675                 } else {
676                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
677                 }
678                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
679                         *tok2 = 0;                                                      // make it so
680                 }
681
682                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
683                 free( tok );
684         } else {
685                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
686                         fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
687                         return NULL;
688                 }
689                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
690                         *tok = 0;                                                                       // we don't keep domain portion
691                 }
692         }
693
694         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
695         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
696                 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
697                 return NULL;
698         }
699
700         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
701                 if( atoi( tok ) > 0 ) {
702                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
703                 }
704         }
705
706         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
707         if( flags & RMRFL_NAME_ONLY ) {
708                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
709         } else {
710                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
711                 if( ctx->my_ip == NULL ) {
712                         fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
713                         strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
714                 }
715         }
716         if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
717
718         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
719                 if( *tok == '1' ) {
720                         ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
721                 }
722         }
723
724
725         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
726                 interface = "0.0.0.0";
727         }
728         // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started
729         //       rather than using this generic listen() call.
730         snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
731         if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
732                 fprintf( stderr, "[CRI] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
733                 nng_close( ctx->nn_sock );
734                 free_ctx( ctx );
735                 return NULL;
736         }
737
738         if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
739                 if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
740                         fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
741                 }
742         }
743
744         if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) {      // mt call support is on, must start the listener thread if not running
745                 ctx->flags |= CFL_MTC_ENABLED;
746                 if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // kick the receiver
747                         fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
748                 }
749                 
750         }
751
752         free( proto_port );
753         return (void *) ctx;
754 }
755
756 /*
757         Initialise the message routing environment. Flags are one of the UTAFL_
758         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
759         (tcp) to be used, then :port is all that is needed.
760
761         At the moment it seems that TCP really is the only viable protocol, but
762         we'll allow flexibility.
763
764         The return value is a void pointer which must be passed to most uta functions. On
765         error, a nil pointer is returned and errno should be set.
766
767         Flags:
768                 No user flags supported (needed) at the moment, but this provides for extension
769                 without drastically changing anything. The user should invoke with RMRFL_NONE to
770                 avoid any misbehavour as there are internal flags which are suported
771 */
772 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
773         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
774 }
775
776 /*
777         This sets the default trace length which will be added to any message buffers
778         allocated.  It can be set at any time, and if rmr_set_trace() is given a
779         trace len that is different than the default allcoated in a message, the message
780         will be resized.
781
782         Returns 0 on failure and 1 on success. If failure, then errno will be set.
783 */
784 extern int rmr_init_trace( void* vctx, int tr_len ) {
785         uta_ctx_t* ctx;
786
787         errno = 0;
788         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
789                 errno = EINVAL;
790                 return 0;
791         }
792
793         ctx->trace_data_len = tr_len;
794         return 1;
795 }
796
797 /*
798         Return true if routing table is initialised etc. and app can send/receive.
799 */
800 extern int rmr_ready( void* vctx ) {
801         uta_ctx_t *ctx;
802
803         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
804                 return FALSE;
805         }
806
807         if( ctx->rtable != NULL ) {
808                 return TRUE;
809         }
810
811         return FALSE;
812 }
813
814 /*
815         Returns a file descriptor which can be used with epoll() to signal a receive
816         pending. The file descriptor should NOT be read from directly, nor closed, as NNG
817         does not support this.
818 */
819 extern int rmr_get_rcvfd( void* vctx ) {
820         uta_ctx_t* ctx;
821         int fd;
822         int state;
823
824         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
825                 return -1;
826         }
827
828         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
829                 fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
830                 return -1;
831         }
832
833         return fd;
834 }
835
836
837 /*
838         Clean up things.
839
840         There isn't an nng_flush() per se, but we can pause, generate
841         a context switch, which should allow the last sent buffer to
842         flow. There isn't exactly an nng_term/close either, so there
843         isn't much we can do.
844 */
845 extern void rmr_close( void* vctx ) {
846         uta_ctx_t *ctx;
847
848         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
849                 return;
850         }
851
852         ctx->shutdown = 1;
853         nng_close( ctx->nn_sock );
854 }
855
856
857 // ----- multi-threaded call/receive support -------------------------------------------------
858
859 /*
860         Blocks on the receive ring chute semaphore and then reads from the ring
861         when it is tickled.  If max_wait is -1 then the function blocks until
862         a message is ready on the ring. Else max_wait is assumed to be the number
863         of millaseconds to wait before returning a timeout message.
864 */
865 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
866         uta_ctx_t*      ctx;
867         uta_mhdr_t*     hdr;                    // header in the transport buffer
868         chute_t*        chute;
869         struct timespec ts;                     // time info if we have a timeout
870         long    new_ms;                         // adjusted mu-sec
871         long    seconds = 0;            // max wait seconds
872         long    nano_sec;                       // max wait xlated to nano seconds
873         int             state;
874         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
875         
876         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
877                 errno = EINVAL;
878                 if( mbuf ) {
879                         mbuf->state = RMR_ERR_BADARG;
880                         mbuf->tp_state = errno;
881                 }
882                 return mbuf;
883         }
884
885         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
886                 errno = EINVAL;
887                 if( mbuf != NULL ) {
888                         mbuf->state = RMR_ERR_NOTSUPP;
889                         mbuf->tp_state = errno;
890                 }
891                 return mbuf;
892         }
893
894         ombuf = mbuf;
895         if( ombuf ) {
896                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
897                 ombuf->len = 0;
898         }
899
900         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
901
902         if( max_wait >= 0 ) {
903                 clock_gettime( CLOCK_REALTIME, &ts );   
904
905                 if( max_wait > 999 ) {
906                         seconds = max_wait / 1000;
907                         max_wait -= seconds * 1000;
908                         ts.tv_sec += seconds;
909                 }
910                 if( max_wait > 0 ) {
911                         nano_sec = max_wait * 1000000;
912                         ts.tv_nsec += nano_sec;
913                         if( ts.tv_nsec > 999999999 ) {
914                                 ts.tv_nsec -= 999999999;
915                                 ts.tv_sec++;
916                         }
917                 }
918
919                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
920         }
921
922         errno = EINTR;
923         state = -1;
924         while( state < 0 && errno == EINTR ) {
925                 if( seconds ) {
926                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
927                 } else {
928                         state = sem_wait( &chute->barrier );
929                 }
930         }
931
932         if( state < 0 ) {
933                 mbuf = ombuf;                           // return caller's buffer if they passed one in
934         } else {
935                 errno = 0;                                              // interrupted call state could be left; clear
936                 if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
937                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
938                         mbuf->state = RMR_OK;
939
940                         if( ombuf ) {
941                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
942                         }
943                 } else {
944                         errno = ETIMEDOUT;
945                         mbuf = ombuf;                           // no buffer, return user's if there
946                 }
947         }
948
949         if( mbuf ) {
950                 mbuf->tp_state = errno;
951         }
952         return mbuf;
953 }
954
955 /*
956         Accept a message buffer and caller ID, send the message and then wait
957         for the receiver to tickle the semaphore letting us know that a message
958         has been received. The call_id is a value between 2 and 255, inclusive; if
959         it's not in this range an error will be returned. Max wait is the amount
960         of time in millaseconds that the call should block for. If 0 is given
961         then no timeout is set.
962
963         If the mt_call feature has not been initialised, then the attempt to use this
964         funciton will fail with RMR_ERR_NOTSUPP
965
966         If no matching message is received before the max_wait period expires, a
967         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
968         occurs after the message has been sent, then a nil pointer is returned
969         with errno set to some other value.
970 */
971 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
972         rmr_mbuf_t* ombuf;                      // original mbuf passed in
973         uta_ctx_t*      ctx;
974         uta_mhdr_t*     hdr;                    // header in the transport buffer
975         chute_t*        chute;
976         unsigned char*  d1;                     // d1 data in header
977         struct timespec ts;                     // time info if we have a timeout
978         long    new_ms;                         // adjusted mu-sec
979         long    seconds = 0;            // max wait seconds
980         long    nano_sec;                       // max wait xlated to nano seconds
981         int             state;
982         
983         errno = EINVAL;
984         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
985                 if( mbuf ) {
986                         mbuf->tp_state = errno;
987                         mbuf->state = RMR_ERR_BADARG;
988                 }
989                 return mbuf;
990         }
991
992         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
993                 mbuf->state = RMR_ERR_NOTSUPP;
994                 mbuf->tp_state = errno;
995                 return mbuf;
996         }
997
998         if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
999                 mbuf->state = RMR_ERR_BADARG;
1000                 mbuf->tp_state = errno;
1001                 return mbuf;
1002         }
1003
1004         ombuf = mbuf;                                                                                                   // save to return timeout status with
1005
1006         chute = &ctx->chutes[call_id];
1007         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
1008                 rmr_free_msg( chute->mbuf );
1009                 chute->mbuf = NULL;
1010         }
1011         
1012         hdr = (uta_mhdr_t *) mbuf->header;
1013         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
1014         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
1015         d1 = DATA1_ADDR( hdr );
1016         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
1017         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
1018
1019         if( max_wait >= 0 ) {
1020                 clock_gettime( CLOCK_REALTIME, &ts );   
1021
1022                 if( max_wait > 999 ) {
1023                         seconds = max_wait / 1000;
1024                         max_wait -= seconds * 1000;
1025                         ts.tv_sec += seconds;
1026                 }
1027                 if( max_wait > 0 ) {
1028                         nano_sec = max_wait * 1000000;
1029                         ts.tv_nsec += nano_sec;
1030                         if( ts.tv_nsec > 999999999 ) {
1031                                 ts.tv_nsec -= 999999999;
1032                                 ts.tv_sec++;
1033                         }
1034                 }
1035
1036                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1037         }
1038
1039         mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
1040         if( mbuf ) {
1041                 if( mbuf->state != RMR_OK ) {
1042                         mbuf->tp_state = errno;
1043                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1044                 }
1045         }
1046
1047         state = 0;
1048         errno = 0;
1049         while( chute->mbuf == NULL && ! errno ) {
1050                 if( seconds ) {
1051                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1052                 } else {
1053                         state = sem_wait( &chute->barrier );
1054                 }
1055
1056                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1057                         errno = 0;
1058                 }
1059
1060                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1061                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1062                                 rmr_free_msg( chute->mbuf );
1063                                 chute->mbuf = NULL;
1064                                 errno = 0;
1065                         }
1066                 }
1067         }
1068
1069         if( state < 0 ) {
1070                 return NULL;                                    // leave errno as set by sem wait call
1071         }
1072
1073         mbuf = chute->mbuf;
1074         mbuf->state = RMR_OK;
1075         chute->mbuf = NULL;
1076
1077         return mbuf;
1078 }