ba7c852f99461466602e003e830dd74baebca7e2
[ric-plt/lib/rmr.git] / src / nng / src / rmr_nng.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_nng.c
23         Abstract:       This is the compile point for the nng version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53
54 #include <nng/nng.h>
55 #include <nng/protocol/pubsub0/pub.h>
56 #include <nng/protocol/pubsub0/sub.h>
57 #include <nng/protocol/pipeline0/push.h>
58 #include <nng/protocol/pipeline0/pull.h>
59
60
61 #include "rmr.h"                                // things the users see
62 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
63 #include "rmr_nng_private.h"    // things that we need too
64 #include "rmr_symtab.h"
65
66 #include "ring_static.c"                        // message ring support
67 #include "rt_generic_static.c"          // route table things not transport specific
68 #include "rtable_nng_static.c"          // route table things -- transport specific
69 #include "rtc_static.c"                         // route table collector
70 #include "tools_static.c"
71 #include "sr_nng_static.c"                      // send/receive static functions
72 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
73
74
75 //------------------------------------------------------------------------------
76
77
78 /*
79         Clean up a context.
80 */
81 static void free_ctx( uta_ctx_t* ctx ) {
82         if( ctx ) {
83                 if( ctx->rtg_addr ) {
84                         free( ctx->rtg_addr );
85                 }
86         }
87 }
88
89 // --------------- public functions --------------------------------------------------------------------------
90
91 /*
92         Returns the size of the payload (bytes) that the msg buffer references.
93         Len in a message is the number of bytes which were received, or should
94         be transmitted, however, it is possible that the mbuf was allocated
95         with a larger payload space than the payload length indicates; this
96         function returns the absolute maximum space that the user has available
97         in the payload. On error (bad msg buffer) -1 is returned and errno should
98         indicate the rason.
99 */
100 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
101         if( msg == NULL || msg->header == NULL ) {
102                 errno = EINVAL;
103                 return -1;
104         }
105
106         errno = 0;
107         return msg->alloc_len - RMR_HDR_LEN( msg->header );                             // allocated transport size less the header and other data bits
108 }
109
110 /*
111         Allocates a send message as a zerocopy message allowing the underlying message protocol
112         to send the buffer without copy.
113 */
114 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
115         uta_ctx_t*      ctx;
116         rmr_mbuf_t*     m;
117
118         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
119                 return NULL;
120         }
121
122         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
123         return  m;
124 }
125
126
127 /*
128         Allocates a send message as a zerocopy message allowing the underlying message protocol
129         to send the buffer without copy. In addition, a trace data field of tr_size will be
130         added and the supplied data coppied to the buffer before returning the message to
131         the caller.
132 */
133 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
134         uta_ctx_t*      ctx;
135         rmr_mbuf_t*     m;
136         int state;
137
138         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
139                 return NULL;
140         }
141
142         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
143         if( m != NULL ) {
144                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
145                 if( state != tr_size ) {
146                         m->state = RMR_ERR_INITFAILED;
147                 }
148         }
149
150         return  m;
151 }
152
153 /*
154         This provides an external path to the realloc static function as it's called by an
155         outward facing mbuf api function. Used to reallocate a message with a different
156         trace data size.
157 */
158 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
159         return realloc_msg( msg, new_tr_size );
160 }
161
162
163 /*
164         Return the message to the available pool, or free it outright.
165 */
166 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
167         if( mbuf == NULL ) {
168                 return;
169         }
170
171         if( mbuf->header ) {
172                 if( mbuf->flags & MFL_ZEROCOPY ) {
173                         //nng_free( (void *) mbuf->header, mbuf->alloc_len );
174                         if( mbuf->tp_buf ) {
175                                 nng_msg_free(  mbuf->tp_buf );
176                         }
177                 }
178         }
179
180         free( mbuf );
181 }
182
183 /*
184         send message with maximum timeout.
185         Accept a message and send it to an endpoint based on message type.
186         If NNG reports that the send attempt timed out, or should be retried,
187         RMr will retry for approximately max_to microseconds; rounded to the next
188         higher value of 10.
189
190         Allocates a new message buffer for the next send. If a message type has
191         more than one group of endpoints defined, then the message will be sent
192         in round robin fashion to one endpoint in each group.
193
194         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
195                 it will return with an error and errno set to eagain. If the send is
196                 a limited fanout, then the returned status is the status of the last
197                 send attempt.
198
199 */
200 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
201         nng_socket nn_sock;                     // endpoint socket for send
202         uta_ctx_t*      ctx;
203         int     group;                                  // selected group to get socket for
204         int send_again;                         // true if the message must be sent again
205         rmr_mbuf_t*     clone_m;                // cloned message for an nth send
206         int sock_ok;                            // got a valid socket from round robin select
207         uint64_t key;                           // mtype or sub-id/mtype sym table key
208
209         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
210                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
211                 if( msg != NULL ) {
212                         msg->state = RMR_ERR_BADARG;
213                         errno = EINVAL;                                                                                 // must ensure it's not eagain
214                 }
215                 return msg;
216         }
217
218         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
219         if( msg->header == NULL ) {
220                 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
221                 msg->state = RMR_ERR_NOHDR;
222                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
223                 return msg;
224         }
225
226         if( max_to < 0 ) {
227                 max_to = ctx->send_retries;             // convert to retries
228         }
229
230         send_again = 1;                                                                                 // force loop entry
231         group = 0;                                                                                              // always start with group 0
232
233         key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
234         while( send_again ) {
235                 sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
236                 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n",
237                                 msg->mtype, send_again, group, msg->len, sock_ok );
238                 group++;
239
240                 if( ! sock_ok ) {
241                         msg->state = RMR_ERR_NOENDPT;
242                         errno = ENXIO;                                                                                  // must ensure it's not eagain
243                         return msg;                                                                                             // caller can resend (maybe) or free
244                 }
245
246                 if( send_again ) {
247                         clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
248                         if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
249                         msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
250                         msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
251                         /*
252                         if( msg ) {
253                                 // error do we need to count successes/errors, how to report some success, esp if last fails?
254                         }
255                         */
256
257                         msg = clone_m;                                                                                  // clone will be the next to send
258                 } else {
259                         msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
260                 }
261         }
262
263         return msg;                                                                     // last message caries the status of last/only send attempt
264 }
265
266 /*
267         Send with default max timeout as is set in the context.
268         See rmr_mtosend_msg() for more details on the parameters.
269         See rmr_stimeout() for info on setting the default timeout.
270 */
271 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
272         return rmr_mtosend_msg( vctx, msg,  -1 );                       // retries <  uses default from ctx
273 }
274
275 /*
276         Return to sender allows a message to be sent back to the endpoint where it originated.
277         The source information in the message is used to select the socket on which to write
278         the message rather than using the message type and round-robin selection. This
279         should return a message buffer with the state of the send operation set. On success
280         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
281         error it can be passed back to this function to retry the send if desired. On error,
282         errno will liklely have the failure reason set by the nng send processing.
283         The following are possible values for the state in the message buffer:
284
285         Message states returned:
286                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
287                 RMR_ERR_NOHDR  - message did not have a header
288                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
289                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
290                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
291
292         A nil message as the return value is rare, and generally indicates some kind of horrible
293         failure. The value of errno might give a clue as to what is wrong.
294
295         CAUTION:
296                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
297                 The caller must check for this and handle.
298 */
299 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
300         nng_socket nn_sock;                     // endpoint socket for send
301         uta_ctx_t*      ctx;
302         int state;
303         uta_mhdr_t*     hdr;
304         char*   hold_src;                       // we need the original source if send fails
305         int             sock_ok;                        // true if we found a valid endpoint socket
306
307         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
308                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
309                 if( msg != NULL ) {
310                         msg->state = RMR_ERR_BADARG;
311                 }
312                 return msg;
313         }
314
315         errno = 0;                                                                                                              // at this point any bad state is in msg returned
316         if( msg->header == NULL ) {
317                 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
318                 msg->state = RMR_ERR_NOHDR;
319                 return msg;
320         }
321
322         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                        // socket of specific endpoint
323         if( ! sock_ok ) {
324                 msg->state = RMR_ERR_NOENDPT;
325                 return msg;                                                     // preallocated msg can be reused since not given back to nn
326         }
327
328         msg->state = RMR_OK;                                    // ensure it is clear before send
329         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                                 // the dest where we're returning the message to
330         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                        // must overlay the source to be ours
331         msg = send_msg( ctx, msg, nn_sock, -1 );
332         if( msg ) {
333                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );                    // always return original source so rts can be called again
334                 msg->flags |= MFL_ADDSRC;                                                                                                       // if msg given to send() it must add source
335         }
336
337         free( hold_src );
338         return msg;
339 }
340
341 /*
342         Call sends the message based on message routing using the message type, and waits for a
343         response message to arrive with the same transaction id that was in the outgoing message.
344         If, while wiating for the expected response,  messages are received which do not have the
345         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
346         order that they were received.
347
348         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
349         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
350         may be resent (likely the context pointer was nil).  If the message is sent, but no
351         response is received, a nil message is returned with errno set to indicate the likley
352         issue:
353                 ETIMEDOUT -- too many messages were queued before reciving the expected response
354                 ENOBUFS -- the queued message ring is full, messages were dropped
355                 EINVAL  -- A parameter was not valid
356                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
357                                         user should call this function with the message again.
358
359
360         QUESTION:  should user specify the number of messages to allow to queue?
361 */
362 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
363         uta_ctx_t*              ctx;
364         unsigned char   expected_id[RMR_MAX_XID+1];             // the transaction id in the message; we wait for response with same ID
365
366         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
367                 if( msg != NULL ) {
368                         msg->state = RMR_ERR_BADARG;
369                 }
370                 return msg;
371         }
372
373         memcpy( expected_id, msg->xaction, RMR_MAX_XID );
374         expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
375         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
376         errno = 0;
377         msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
378
379         msg = rmr_send_msg( ctx, msg );
380         if( msg ) {                                                                             // msg should be nil, if not there was a problem; return buffer to user
381                 if( msg->state != RMR_ERR_RETRY ) {
382                         msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
383                 }
384                 return msg;
385         }
386
387         return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 );                 // wait for msg allowing 20 to queue ahead
388 }
389
390 /*
391         The outward facing receive function. When invoked it will pop the oldest message
392         from the receive ring, if any are queued, and return it. If the ring is empty
393         then the receive function is invoked to wait for the next message to arrive (blocking).
394
395         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
396         nil, a new one will be allocated. However, the caller should NOT expect to get the same
397         struct back (if a queued message is returned the message struct will be different).
398 */
399 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
400         uta_ctx_t*      ctx;
401         rmr_mbuf_t*     qm;                             // message that was queued on the ring
402
403         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
404                 if( old_msg != NULL ) {
405                         old_msg->state = RMR_ERR_BADARG;
406                 }
407                 errno = EINVAL;
408                 return old_msg;
409         }
410         errno = 0;
411
412         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
413         if( qm != NULL ) {
414                 if( old_msg ) {
415                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
416                 }
417
418                 return qm;
419         }
420
421         return rcv_msg( ctx, old_msg );                                                         // nothing queued, wait for one
422 }
423
424 /*
425         This implements a receive with a timeout via epoll. Mostly this is for
426         wrappers as native C applications can use epoll directly and will not have
427         to depend on this.
428 */
429 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
430         struct epoll_stuff* eps;        // convience pointer
431         uta_ctx_t*      ctx;
432         rmr_mbuf_t*     qm;                             // message that was queued on the ring
433         int nready;
434         rmr_mbuf_t* msg;
435
436         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
437                 if( old_msg != NULL ) {
438                         old_msg->state = RMR_ERR_BADARG;
439                 }
440                 errno = EINVAL;
441                 return old_msg;
442         }
443
444         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
445         if( qm != NULL ) {
446                 if( old_msg ) {
447                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
448                 }
449
450                 return qm;
451         }
452
453         if( (eps = ctx->eps)  == NULL ) {                                       // set up epoll on first call
454                 eps = malloc( sizeof *eps );
455
456                 if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
457                 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
458                         free( eps );
459                         return NULL;
460                 }
461
462                 eps->nng_fd = rmr_get_rcvfd( ctx );
463                 eps->epe.events = EPOLLIN;
464                 eps->epe.data.fd = eps->nng_fd;
465
466                 if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
467                 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
468                         free( eps );
469                         return NULL;
470                 }
471
472                 ctx->eps = eps;
473         }
474
475         if( old_msg ) {
476                 msg = old_msg;
477         } else {
478                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
479         }
480
481         if( ms_to < 0 ) {
482                 ms_to = 0;
483         }
484
485         nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
486         if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
487                 msg->state = RMR_ERR_TIMEOUT;
488         } else {
489                 return rcv_msg( ctx, msg );                                                             // receive it and return it
490         }
491
492         return msg;                             // return empty message with state set
493 }
494
495 /*
496         This blocks until the message with the 'expect' ID is received. Messages which are received
497         before the expected message are queued onto the message ring.  The function will return
498         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
499         expected message is received. If the queued message ring fills a nil pointer is returned
500         and errno is set to ENOBUFS.
501
502         Generally this will be invoked only by the call() function as it waits for a response, but
503         it is exposed to the user application as three is no reason not to.
504 */
505 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
506         uta_ctx_t*      ctx;
507         int     queued = 0;                             // number we pushed into the ring
508         int     exp_len = 0;                    // length of expected ID
509
510         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
511                 if( msg != NULL ) {
512                         msg->state = RMR_ERR_BADARG;
513                 }
514                 errno = EINVAL;
515                 return msg;
516         }
517
518         errno = 0;
519
520         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
521                 return rmr_rcv_msg( ctx, msg );
522         }
523
524         exp_len = strlen( expect );
525         if( exp_len > RMR_MAX_XID ) {
526                 exp_len = RMR_MAX_XID;
527         }
528         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
529
530         while( queued < allow2queue ) {
531                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
532                 if( msg->state == RMR_OK ) {
533                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
534                                 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
535                                 return msg;
536                         }
537
538                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
539                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
540                                 errno = ENOBUFS;
541                                 return NULL;
542                         }
543
544                         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
545                         queued++;
546                         msg = NULL;
547                 }
548         }
549
550         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
551         errno = ETIMEDOUT;
552         return NULL;
553 }
554
555 //  CAUTION:  these are not supported as they must be set differently (between create and open) in NNG.
556 //                              until those details are worked out, these generate a warning.
557 /*
558         Set send timeout. The value time is assumed to be microseconds.  The timeout is the
559         rough maximum amount of time that RMr will block on a send attempt when the underlying
560         mechnism indicates eagain or etimeedout.  All other error conditions are reported
561         without this delay. Setting a timeout of 0 causes no retries to be attempted in
562         RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
563         but without issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
564         after every 10K send attempts until the time value is reached. Retries are abandoned
565         if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
566
567         The default, if this function is not used, is 1; meaning that RMr will retry, but will
568         not enter a sleep.  In all cases the caller should check the status in the message returned
569         after a send call.
570
571         Returns -1 if the context was invalid; RMR_OK otherwise.
572 */
573 extern int rmr_set_stimeout( void* vctx, int time ) {
574         uta_ctx_t*      ctx;
575
576         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
577                 return -1;
578         }
579
580         if( time < 0 ) {
581                 time = 0;
582         }
583
584         ctx->send_retries = time;
585         return RMR_OK;
586 }
587
588 /*
589         Set receive timeout -- not supported in nng implementation
590 */
591 extern int rmr_set_rtimeout( void* vctx, int time ) {
592         fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
593         return 0;
594 }
595
596
597 /*
598         This is the actual init workhorse. The user visible function meerly ensures that the
599         calling programme does NOT set any internal flags that are supported, and then
600         invokes this.  Internal functions (the route table collector) which need additional
601         open ports without starting additional route table collectors, will invoke this
602         directly with the proper flag.
603 */
604 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
605         static  int announced = 0;
606         uta_ctx_t*      ctx = NULL;
607         char    bind_info[NNG_MAXADDRLEN];      // bind info
608         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
609         char*   port;
610         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
611         char*   proto_port;
612         char    wbuf[1024];                                     // work buffer
613         char*   tok;                                            // pointer at token in a buffer
614         int             state;
615
616         if( ! announced ) {
617                 fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
618                         RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
619                 announced = 1;
620         }
621
622         errno = 0;
623         if( uproto_port == NULL ) {
624                 proto_port = strdup( DEF_COMM_PORT );
625         } else {
626                 proto_port = strdup( uproto_port );             // so we can modify it
627         }
628
629         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
630                 errno = ENOMEM;
631                 return NULL;
632         }
633         memset( ctx, 0, sizeof( uta_ctx_t ) );
634
635         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
636         ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
637
638         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
639         if( max_msg_size > 0 ) {
640                 ctx->max_plen = max_msg_size;
641         }
642
643         // we're using a listener to get rtg updates, so we do NOT need this.
644         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
645
646         if( nng_pull0_open( &ctx->nn_sock )  !=  0 ) {          // and assign the mode
647                 fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
648                 free_ctx( ctx );
649                 return NULL;
650         }
651
652         if( (port = strchr( proto_port, ':' )) != NULL ) {
653                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
654                         port++;
655                 } else {
656                         *(port++) = 0;                  // term proto string and point at port string
657                         proto = proto_port;             // user supplied proto so point at it rather than default
658                 }
659         } else {
660                 port = proto_port;                      // assume something like "1234" was passed
661         }
662
663         if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
664                 fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
665                 return NULL;
666         }
667         if( (tok = strchr( wbuf, '.' )) != NULL ) {
668                 *tok = 0;                                                                       // we don't keep domain portion
669         }
670         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
671         if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) {                       // our registered name is host:port
672                 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
673                 return NULL;
674         }
675
676         ctx->ip_list = mk_ip_list( port );              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
677
678
679
680         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
681                 interface = "0.0.0.0";
682         }
683         // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started
684         //       rather than using this generic listen() call.
685         snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
686         if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
687                 fprintf( stderr, "[CRIT] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
688                 nng_close( ctx->nn_sock );
689                 free_ctx( ctx );
690                 return NULL;
691         }
692
693         if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
694                 if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
695                         fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
696                 }
697         }
698
699         free( proto_port );
700         return (void *) ctx;
701 }
702
703 /*
704         Initialise the message routing environment. Flags are one of the UTAFL_
705         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
706         (tcp) to be used, then :port is all that is needed.
707
708         At the moment it seems that TCP really is the only viable protocol, but
709         we'll allow flexibility.
710
711         The return value is a void pointer which must be passed to most uta functions. On
712         error, a nil pointer is returned and errno should be set.
713
714         Flags:
715                 No user flags supported (needed) at the moment, but this provides for extension
716                 without drastically changing anything. The user should invoke with RMRFL_NONE to
717                 avoid any misbehavour as there are internal flags which are suported
718 */
719 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
720         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
721 }
722
723 /*
724         This sets the default trace length which will be added to any message buffers
725         allocated.  It can be set at any time, and if rmr_set_trace() is given a
726         trace len that is different than the default allcoated in a message, the message
727         will be resized.
728
729         Returns 0 on failure and 1 on success. If failure, then errno will be set.
730 */
731 extern int rmr_init_trace( void* vctx, int tr_len ) {
732         uta_ctx_t* ctx;
733
734         errno = 0;
735         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
736                 errno = EINVAL;
737                 return 0;
738         }
739
740         ctx->trace_data_len = tr_len;
741         return 1;
742 }
743
744 /*
745         Return true if routing table is initialised etc. and app can send/receive.
746 */
747 extern int rmr_ready( void* vctx ) {
748         uta_ctx_t *ctx;
749
750         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
751                 return FALSE;
752         }
753
754         if( ctx->rtable != NULL ) {
755                 return TRUE;
756         }
757
758         return FALSE;
759 }
760
761 /*
762         Returns a file descriptor which can be used with epoll() to signal a receive
763         pending. The file descriptor should NOT be read from directly, nor closed, as NNG
764         does not support this.
765 */
766 extern int rmr_get_rcvfd( void* vctx ) {
767         uta_ctx_t* ctx;
768         int fd;
769         int state;
770
771         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
772                 return -1;
773         }
774
775         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
776                 fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
777                 return -1;
778         }
779
780         return fd;
781 }
782
783
784 /*
785         Clean up things.
786
787         There isn't an nng_flush() per se, but we can pause, generate
788         a context switch, which should allow the last sent buffer to
789         flow. There isn't exactly an nng_term/close either, so there
790         isn't much we can do.
791 */
792 extern void rmr_close( void* vctx ) {
793         uta_ctx_t *ctx;
794
795         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
796                 return;
797         }
798
799         ctx->shutdown = 1;
800         nng_close( ctx->nn_sock );
801 }
802
803
804