9a9a0430c7061d34b63f4625e7e9c8b796812b2f
[ric-plt/lib/rmr.git] / src / nng / src / rmr_nng.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia 
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_nng.c
23         Abstract:       This is the compile point for the nng version of the rmr 
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library, 
28                                 RMr is built with a single compile so as to "hide" the 
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.  
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53
54 #include <nng/nng.h>
55 #include <nng/protocol/pubsub0/pub.h>
56 #include <nng/protocol/pubsub0/sub.h>
57 #include <nng/protocol/pipeline0/push.h>
58 #include <nng/protocol/pipeline0/pull.h>
59
60
61 #include "rmr.h"                                // things the users see
62 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
63 #include "rmr_nng_private.h"    // things that we need too
64 #include "rmr_symtab.h"
65
66 #include "ring_static.c"                        // message ring support
67 #include "rt_generic_static.c"          // route table things not transport specific
68 #include "rtable_nng_static.c"          // route table things -- transport specific
69 #include "rtc_static.c"                         // route table collector
70 #include "tools_static.c"
71 #include "sr_nng_static.c"                      // send/receive static functions
72 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
73
74
75 //------------------------------------------------------------------------------
76
77
78 /*
79         Clean up a context.
80 */
81 static void free_ctx( uta_ctx_t* ctx ) {
82         if( ctx ) {
83                 if( ctx->rtg_addr ) {
84                         free( ctx->rtg_addr );
85                 }
86         }
87 }
88
89 // --------------- public functions --------------------------------------------------------------------------
90
91 /*
92         Returns the size of the payload (bytes) that the msg buffer references.
93         Len in a message is the number of bytes which were received, or should
94         be transmitted, however, it is possible that the mbuf was allocated
95         with a larger payload space than the payload length indicates; this 
96         function returns the absolute maximum space that the user has available
97         in the payload. On error (bad msg buffer) -1 is returned and errno should
98         indicate the rason.
99 */
100 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
101         if( msg == NULL || msg->header == NULL ) {
102                 errno = EINVAL;
103                 return -1;
104         }
105
106         errno = 0;
107         return msg->alloc_len - sizeof( uta_mhdr_t );                                           // figure size should we not have a msg buffer
108 }
109
110 /*
111         Allocates a send message as a zerocopy message allowing the underlying message protocol
112         to send the buffer without copy.
113 */
114 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
115         uta_ctx_t*      ctx;
116         rmr_mbuf_t*     m;
117
118         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
119                 return NULL;
120         }
121
122         m = alloc_zcmsg( ctx, NULL, size, 0 );
123         return  m;
124 }
125
126 /*
127         Return the message to the available pool, or free it outright.
128 */
129 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
130         if( mbuf == NULL ) {
131                 return;
132         }
133
134         if( mbuf->header ) {
135                 if( mbuf->flags & MFL_ZEROCOPY ) {
136                         //nng_free( (void *) mbuf->header, mbuf->alloc_len );
137                         if( mbuf->tp_buf ) {
138                                 nng_msg_free(  mbuf->tp_buf );
139                         }
140                 }
141         }
142         
143         free( mbuf );
144 }
145
146 /*
147         send message with maximum timeout.
148         Accept a message and send it to an endpoint based on message type.      
149         If NNG reports that the send attempt timed out, or should be retried,
150         RMr will retry for approximately max_to microseconds; rounded to the next
151         higher value of 10.
152
153         Allocates a new message buffer for the next send. If a message type has
154         more than one group of endpoints defined, then the message will be sent
155         in round robin fashion to one endpoint in each group. 
156
157         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
158                 it will return with an error and errno set to eagain. If the send is
159                 a limited fanout, then the returned status is the status of the last
160                 send attempt.
161         
162 */
163 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
164         nng_socket nn_sock;                     // endpoint socket for send
165         uta_ctx_t*      ctx;
166         int     group;                                  // selected group to get socket for
167         int send_again;                         // true if the message must be sent again
168         rmr_mbuf_t*     clone_m;                // cloned message for an nth send
169         int sock_ok;                            // got a valid socket from round robin select
170
171         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
172                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
173                 if( msg != NULL ) {
174                         msg->state = RMR_ERR_BADARG;
175                         errno = EINVAL;                                                                                 // must ensure it's not eagain
176                 }                       
177                 return msg;
178         }
179
180         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
181         if( msg->header == NULL ) {
182                 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
183                 msg->state = RMR_ERR_NOHDR;
184                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
185                 return msg;
186         }
187
188         if( max_to < 0 ) {
189                 max_to = ctx->send_retries;             // convert to retries
190         }
191
192         send_again = 1;                                                                                 // force loop entry
193         group = 0;                                                                                              // always start with group 0
194
195         while( send_again ) {
196                 sock_ok = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again, &nn_sock );               // round robin sel epoint; again set if mult groups
197                 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n", 
198                                 msg->mtype, send_again, group, msg->len, sock_ok );
199                 group++;
200
201                 if( ! sock_ok ) {
202                         msg->state = RMR_ERR_NOENDPT;
203                         errno = ENXIO;                                                                                  // must ensure it's not eagain
204                         return msg;                                                                                             // caller can resend (maybe) or free
205                 }
206
207                 if( send_again ) {
208                         clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
209                         if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
210                         msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
211                         msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
212                         /*
213                         if( msg ) {
214                                 // error do we need to count successes/errors, how to report some success, esp if last fails?
215                         } 
216                         */
217
218                         msg = clone_m;                                                                                  // clone will be the next to send
219                 } else {
220                         msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
221                 }
222         }
223
224         return msg;                                                                     // last message caries the status of last/only send attempt
225 }
226
227 /*
228         Send with default max timeout as is set in the context. 
229         See rmr_mtosend_msg() for more details on the parameters.
230         See rmr_stimeout() for info on setting the default timeout.
231 */
232 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
233         return rmr_mtosend_msg( vctx, msg,  -1 );                       // retries <  uses default from ctx
234 }
235
236 /*
237         Return to sender allows a message to be sent back to the endpoint where it originated. 
238         The source information in the message is used to select the socket on which to write
239         the message rather than using the message type and round-robin selection. This 
240         should return a message buffer with the state of the send operation set. On success
241         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
242         error it can be passed back to this function to retry the send if desired. On error,
243         errno will liklely have the failure reason set by the nng send processing.
244         The following are possible values for the state in the message buffer:
245
246         Message states returned:
247                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
248                 RMR_ERR_NOHDR  - message did not have a header
249                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
250                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
251                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
252
253         A nil message as the return value is rare, and generally indicates some kind of horrible
254         failure. The value of errno might give a clue as to what is wrong.
255
256         CAUTION:
257                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
258                 The caller must check for this and handle.
259 */
260 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
261         nng_socket nn_sock;                     // endpoint socket for send
262         uta_ctx_t*      ctx;
263         int state;
264         uta_mhdr_t*     hdr;
265         char*   hold_src;                       // we need the original source if send fails
266         int             sock_ok;                        // true if we found a valid endpoint socket
267
268         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
269                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
270                 if( msg != NULL ) {
271                         msg->state = RMR_ERR_BADARG;
272                 }                       
273                 return msg;
274         }
275
276         errno = 0;                                                                                                              // at this point any bad state is in msg returned
277         if( msg->header == NULL ) {
278                 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
279                 msg->state = RMR_ERR_NOHDR;
280                 return msg;
281         }
282
283         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                        // socket of specific endpoint
284         if( ! sock_ok ) {
285                 msg->state = RMR_ERR_NOENDPT;
286                 return msg;                                                     // preallocated msg can be reused since not given back to nn
287         }
288
289         msg->state = RMR_OK;                                    // ensure it is clear before send
290         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                                 // the dest where we're returning the message to
291         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                        // must overlay the source to be ours
292         msg = send_msg( ctx, msg, nn_sock, -1 );
293         if( msg ) {
294                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );                    // always return original source so rts can be called again
295                 msg->flags |= MFL_ADDSRC;                                                                                                       // if msg given to send() it must add source
296         }
297
298         free( hold_src );
299         return msg;
300 }
301
302 /*
303         Call sends the message based on message routing using the message type, and waits for a
304         response message to arrive with the same transaction id that was in the outgoing message.
305         If, while wiating for the expected response,  messages are received which do not have the
306         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
307         order that they were received.
308
309         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
310         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
311         may be resent (likely the context pointer was nil).  If the message is sent, but no 
312         response is received, a nil message is returned with errno set to indicate the likley
313         issue:
314                 ETIMEDOUT -- too many messages were queued before reciving the expected response
315                 ENOBUFS -- the queued message ring is full, messages were dropped
316                 EINVAL  -- A parameter was not valid
317                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
318                                         user should call this function with the message again.
319
320
321         QUESTION:  should user specify the number of messages to allow to queue?
322 */
323 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
324         uta_ctx_t*              ctx;
325         unsigned char   expected_id[RMR_MAX_XID+1];             // the transaction id in the message; we wait for response with same ID
326
327         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
328                 if( msg != NULL ) {
329                         msg->state = RMR_ERR_BADARG;
330                 }                       
331                 return msg;
332         }
333
334         memcpy( expected_id, msg->xaction, RMR_MAX_XID );
335         expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
336         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
337         errno = 0;
338         msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
339
340         msg = rmr_send_msg( ctx, msg );
341         if( msg ) {                                                                             // msg should be nil, if not there was a problem; return buffer to user
342                 if( msg->state != RMR_ERR_RETRY ) {
343                         msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
344                 }
345                 return msg;
346         }
347
348         return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 );                 // wait for msg allowing 20 to queue ahead
349 }
350
351 /*
352         The outward facing receive function. When invoked it will pop the oldest message
353         from the receive ring, if any are queued, and return it. If the ring is empty
354         then the receive function is invoked to wait for the next message to arrive (blocking).
355
356         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
357         nil, a new one will be allocated. However, the caller should NOT expect to get the same
358         struct back (if a queued message is returned the message struct will be different).
359 */
360 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
361         uta_ctx_t*      ctx;
362         rmr_mbuf_t*     qm;                             // message that was queued on the ring
363
364         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
365                 if( old_msg != NULL ) {
366                         old_msg->state = RMR_ERR_BADARG;
367                 }                       
368                 errno = EINVAL;
369                 return old_msg;
370         }
371         errno = 0;
372
373         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
374         if( qm != NULL ) {
375                 if( old_msg ) {
376                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
377                 }
378
379                 return qm;
380         }
381
382         return rcv_msg( ctx, old_msg );                                                         // nothing queued, wait for one
383 }
384
385 /*
386         This implements a receive with a timeout via epoll. Mostly this is for 
387         wrappers as native C applications can use epoll directly and will not have 
388         to depend on this.
389 */
390 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
391         struct epoll_stuff* eps;        // convience pointer
392         uta_ctx_t*      ctx;
393         rmr_mbuf_t*     qm;                             // message that was queued on the ring
394         int nready;
395         rmr_mbuf_t* msg;
396
397         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
398                 if( old_msg != NULL ) {
399                         old_msg->state = RMR_ERR_BADARG;
400                 }                       
401                 errno = EINVAL;
402                 return old_msg;
403         }
404
405         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
406         if( qm != NULL ) {
407                 if( old_msg ) {
408                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
409                 }
410
411                 return qm;
412         }
413
414         if( (eps = ctx->eps)  == NULL ) {                                       // set up epoll on first call
415                 eps = malloc( sizeof *eps );            
416
417         if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
418                 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
419                         free( eps );
420                         return NULL;
421         }
422
423                 eps->nng_fd = rmr_get_rcvfd( ctx );
424                 eps->epe.events = EPOLLIN;
425         eps->epe.data.fd = eps->nng_fd;
426
427         if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
428                 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
429                         free( eps );
430                         return NULL;
431         }
432
433                 ctx->eps = eps;
434         }
435
436         if( old_msg ) {
437                 msg = old_msg;
438         } else {
439                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
440         }
441
442         if( ms_to < 0 ) {
443                 ms_to = 0;
444         }
445
446         nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
447         if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
448                 msg->state = RMR_ERR_TIMEOUT;
449         } else {
450                 return rcv_msg( ctx, msg );                                                             // receive it and return it
451         }
452
453         return msg;                             // return empty message with state set
454 }
455
456 /*
457         This blocks until the message with the 'expect' ID is received. Messages which are received
458         before the expected message are queued onto the message ring.  The function will return 
459         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
460         expected message is received. If the queued message ring fills a nil pointer is returned
461         and errno is set to ENOBUFS.
462
463         Generally this will be invoked only by the call() function as it waits for a response, but 
464         it is exposed to the user application as three is no reason not to.
465 */
466 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
467         uta_ctx_t*      ctx;
468         int     queued = 0;                             // number we pushed into the ring
469         int     exp_len = 0;                    // length of expected ID
470         
471         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
472                 if( msg != NULL ) {
473                         msg->state = RMR_ERR_BADARG;
474                 }                       
475                 errno = EINVAL;
476                 return msg;
477         }
478
479         errno = 0;
480
481         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
482                 return rmr_rcv_msg( ctx, msg );
483         }
484
485         exp_len = strlen( expect );
486         if( exp_len > RMR_MAX_XID ) {
487                 exp_len = RMR_MAX_XID;
488         }
489         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
490
491         while( queued < allow2queue ) {
492                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
493                 if( msg->state == RMR_OK ) {
494                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
495                                 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
496                                 return msg;
497                         }
498
499                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
500                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
501                                 errno = ENOBUFS;
502                                 return NULL;
503                         }
504
505                         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
506                         queued++;
507                         msg = NULL;
508                 }
509         }
510
511         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
512         errno = ETIMEDOUT;
513         return NULL;
514 }
515
516 //  CAUTION:  these are not supported as they must be set differently (between create and open) in NNG.
517 //                              until those details are worked out, these generate a warning.
518 /*
519         Set send timeout. The value time is assumed to be microseconds.  The timeout is the 
520         rough maximum amount of time that RMr will block on a send attempt when the underlying
521         mechnism indicates eagain or etimeedout.  All other error conditions are reported
522         without this delay. Setting a timeout of 0 causes no retries to be attempted in
523         RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
524         but without issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us) 
525         after every 10K send attempts until the time value is reached. Retries are abandoned
526         if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
527
528         The default, if this function is not used, is 1; meaning that RMr will retry, but will
529         not enter a sleep.  In all cases the caller should check the status in the message returned
530         after a send call.
531
532         Returns -1 if the context was invalid; RMR_OK otherwise.
533 */
534 extern int rmr_set_stimeout( void* vctx, int time ) {
535         uta_ctx_t*      ctx;
536
537         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
538                 return -1;
539         }
540
541         if( time < 0 ) {
542                 time = 0;
543         }
544
545         ctx->send_retries = time;
546         return RMR_OK;
547 }
548
549 /*
550         Set receive timeout -- not supported in nng implementation
551 */
552 extern int rmr_set_rtimeout( void* vctx, int time ) {
553         fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
554         return 0;
555 }
556
557
558 /*
559         This is the actual init workhorse. The user visible function meerly ensures that the
560         calling programme does NOT set any internal flags that are supported, and then 
561         invokes this.  Internal functions (the route table collector) which need additional
562         open ports without starting additional route table collectors, will invoke this 
563         directly with the proper flag.
564 */
565 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
566         static  int announced = 0;
567         uta_ctx_t*      ctx = NULL;
568         char    bind_info[NNG_MAXADDRLEN];      // bind info
569         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
570         char*   port;
571         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
572         char*   proto_port;
573         char    wbuf[1024];                                     // work buffer 
574         char*   tok;                                            // pointer at token in a buffer
575         int             state;
576
577         if( ! announced ) {
578                 fprintf( stderr, "[INFO] ric message routing library on NNG (%s %s.%s.%s built: %s)\n", 
579                         QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
580                 announced = 1;
581         }
582
583         errno = 0;
584         if( uproto_port == NULL ) {
585                 proto_port = strdup( DEF_COMM_PORT );
586         } else {
587                 proto_port = strdup( uproto_port );             // so we can modify it
588         }
589
590         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
591                 errno = ENOMEM;
592                 return NULL;
593         }
594         memset( ctx, 0, sizeof( uta_ctx_t ) );
595
596         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
597         ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
598
599         ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t );               // default max buffer size
600         if( max_msg_size > 0 ) {
601                 if( max_msg_size <= ctx->max_plen ) {                                           // user defined len can be smaller
602                         ctx->max_plen = max_msg_size;
603                 } else {
604                         fprintf( stderr, "[WRN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
605                 }
606         }
607
608         ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t );
609
610         // we're using a listener to get rtg updates, so we do NOT need this.
611         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
612
613         if( nng_pull0_open( &ctx->nn_sock )  !=  0 ) {          // and assign the mode
614                 fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
615                 free_ctx( ctx );
616                 return NULL;
617         }
618
619         if( (port = strchr( proto_port, ':' )) != NULL ) {
620                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
621                         port++;
622                 } else {
623                         *(port++) = 0;                  // term proto string and point at port string
624                         proto = proto_port;             // user supplied proto so point at it rather than default
625                 }
626         } else {
627                 port = proto_port;                      // assume something like "1234" was passed
628         }
629
630         if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
631                 fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
632                 return NULL;
633         }
634         if( (tok = strchr( wbuf, '.' )) != NULL ) {
635                 *tok = 0;                                                                       // we don't keep domain portion
636         }
637         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
638         if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) {                       // our registered name is host:port
639                 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
640                 return NULL;
641         }
642
643         ctx->ip_list = mk_ip_list( port );              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
644
645
646
647         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
648                 interface = "0.0.0.0";
649         }
650         // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started
651         //       rather than using this generic listen() call.
652         snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
653         if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
654                 fprintf( stderr, "[CRIT] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
655                 nng_close( ctx->nn_sock );
656                 free_ctx( ctx );
657                 return NULL;
658         }
659
660         if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
661                 if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
662                         fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
663                 }
664         }
665
666         free( proto_port );
667         return (void *) ctx;
668 }
669
670 /*
671         Initialise the message routing environment. Flags are one of the UTAFL_
672         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
673         (tcp) to be used, then :port is all that is needed.
674
675         At the moment it seems that TCP really is the only viable protocol, but
676         we'll allow flexibility.
677
678         The return value is a void pointer which must be passed to most uta functions. On
679         error, a nil pointer is returned and errno should be set.
680
681         Flags:
682                 No user flags supported (needed) at the moment, but this provides for extension
683                 without drastically changing anything. The user should invoke with RMRFL_NONE to 
684                 avoid any misbehavour as there are internal flags which are suported
685 */
686 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
687         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
688 }
689
690 /*
691         Return true if routing table is initialised etc. and app can send/receive.
692 */
693 extern int rmr_ready( void* vctx ) {
694         uta_ctx_t *ctx;
695
696         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
697                 return FALSE;
698         }
699
700         if( ctx->rtable != NULL ) {
701                 return TRUE;
702         }
703
704         return FALSE;
705 }
706
707 /*
708     Returns a file descriptor which can be used with epoll() to signal a receive
709     pending. The file descriptor should NOT be read from directly, nor closed, as NNG
710     does not support this.
711 */
712 extern int rmr_get_rcvfd( void* vctx ) {
713         uta_ctx_t* ctx;
714         int fd;
715         int state;
716
717         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
718                 return -1;
719         }
720
721         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
722                 fprintf( stderr, ">>> cannot get recv fd: %s\n", nng_strerror( state ) );
723                 return -1;
724         }
725
726         return fd;
727 }
728
729
730 /*
731         Clean up things.
732
733         There isn't an nng_flush() per se, but we can pause, generate
734         a context switch, which should allow the last sent buffer to 
735         flow. There isn't exactly an nng_term/close either, so there
736         isn't much we can do.
737 */
738 extern void rmr_close( void* vctx ) {
739         uta_ctx_t *ctx;
740
741         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
742                 return;
743         }
744
745         ctx->shutdown = 1;
746         nng_close( ctx->nn_sock );
747 }
748
749
750