Initial commit of RMR Library
[ric-plt/lib/rmr.git] / src / nanomsg / src / rmr.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia 
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr.c
23         Abstract:       The bulk of the ric message routing library which is built upon
24                                 the older nanomsg messaging transport mehhanism.
25
26                                 To "hide" internal functions the choice was made to implement them
27                                 all as static functions. This means that we include nearly 
28                                 all of our modules here as 90% of the library is not visible to
29                                 the outside world. 
30
31         Author:         E. Scott Daniels
32         Date:           28 November 2018
33 */
34
35 #include <ctype.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <netdb.h>
39 #include <errno.h>
40 #include <string.h>
41 #include <errno.h>
42 #include <pthread.h>
43 #include <unistd.h>
44 #include <stdint.h>
45 #include <time.h>
46 #include <arpa/inet.h>
47
48 #include <nanomsg/nn.h>
49 #include <nanomsg/tcp.h>
50 #include <nanomsg/pair.h>
51 #include <nanomsg/pipeline.h>
52 #include <nanomsg/pubsub.h>
53
54 #include "rmr.h"                                // things the users see
55 #include "rmr_agnostic.h"               // headers agnostic to the underlying transport mechanism
56 #include "rmr_private.h"                // things that we need too
57 #include "rmr_symtab.h"
58
59 #include "ring_static.c"                // message ring support
60 #include "rt_generic_static.c"  // generic route table (not nng/nano specific)
61 #include "rtable_static.c"              // route table things   (nano specific)
62 #include "rtc_static.c"                 // common rt collector
63 #include "tools_static.c"
64 #include "sr_static.c"                  // send/receive static functions
65 #include "wormholes.c"                  // external wormhole api, and it's static functions (must be LAST)
66
67 // ------------------------------------------------------------------------------------------------------
68
69 /*
70         Clean up a context.
71 */
72 static void free_ctx( uta_ctx_t* ctx ) {
73         if( ctx ) {
74                 if( ctx->rtg_addr ) {
75                         free( ctx->rtg_addr );
76                 }
77         }
78 }
79
80 // --------------- public functions --------------------------------------------------------------------------
81
82 /*
83         Set the receive timeout to time. If time >1000 we assume the time is milliseconds,
84         else we assume seconds. Setting -1 is always block.
85         Returns the nn value (0 on success <0 on error).
86 */
87 extern int rmr_set_rtimeout( void* vctx, int time ) {
88         uta_ctx_t* ctx;
89
90         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
91                 errno = EINVAL;
92                 return -1;
93         }
94
95         if( time > 0 ) {
96                 if( time < 1000 ) {     
97                         time = time * 1000;                     // assume seconds, nn wants ms
98                 }
99         } 
100
101         return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
102 }
103
104 /*
105         Deprecated -- use rmr_set_rtimeout()
106 */
107 extern int rmr_rcv_to( void* vctx, int time ) {
108         return rmr_rcv_to( vctx, time );
109 }
110
111
112 /*
113         Set the send timeout to time. If time >1000 we assume the time is milliseconds,
114         else we assume seconds. Setting -1 is always block.
115         Returns the nn value (0 on success <0 on error).
116 */
117 extern int rmr_set_stimeout( void* vctx, int time ) {
118         uta_ctx_t* ctx;
119
120         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
121                 errno = EINVAL;
122                 return -1;
123         }
124
125         if( time > 0 ) {
126                 if( time < 1000 ) {     
127                         time = time * 1000;                     // assume seconds, nn wants ms
128                 }
129         } 
130
131         return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) );
132 }
133
134 /*
135         Deprecated -- use rmr_set_stimeout()
136 */
137 extern int rmr_send_to( void* vctx, int time ) {
138         return rmr_send_to( vctx, time );
139 }
140
141 /*
142         Returns the size of the payload (bytes) that the msg buffer references.
143         Len in a message is the number of bytes which were received, or should
144         be transmitted, however, it is possible that the mbuf was allocated
145         with a larger payload space than the payload length indicates; this 
146         function returns the absolute maximum space that the user has available
147         in the payload. On error (bad msg buffer) -1 is returned and errno should
148         indicate the rason.
149 */
150 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
151         if( msg == NULL || msg->header == NULL ) {
152                 errno = EINVAL;
153                 return -1;
154         }
155
156         errno = 0;
157         return msg->alloc_len - sizeof( uta_mhdr_t );                                           // figure size should we not have a msg buffer
158 }
159
160 /*
161         Allocates a send message as a zerocopy message allowing the underlying message protocol
162         to send the buffer without copy.
163 */
164 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
165         uta_ctx_t*      ctx;
166         rmr_mbuf_t*     m;
167
168         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
169                 return NULL;
170         }
171
172         m = alloc_zcmsg( ctx, NULL, size, 0 );
173         return  m;
174 }
175
176 /*
177         Return the message to the available pool, or free it outright.
178 */
179 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
180         if( mbuf == NULL ) {
181                 return;
182         }
183
184         if( mbuf->header ) {
185                 if( mbuf->flags & MFL_ZEROCOPY ) {
186                         nn_freemsg( mbuf->header );                             // must let nano free it
187                 } else {
188                         free( mbuf->header );
189                 }
190         }
191         
192         free( mbuf );
193 }
194
195 /*
196         Accept a message and send it to an endpoint based on message type.      
197         Allocates a new message buffer for the next send. If a message type has
198         more than one group of endpoints defined, then the message will be sent
199         in round robin fashion to one endpoint in each group. 
200
201         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
202                 it will return with an error and errno set to eagain. If the send is
203                 a limited fanout, then the returned status is the status of the last
204                 send attempt.
205 */
206 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
207         int nn_sock;                            // endpoint socket for send
208         uta_ctx_t*      ctx;
209         int     group;                                  // selected group to get socket for
210         int send_again;                         // true if the message must be sent again
211         rmr_mbuf_t*     clone_m;                // cloned message for an nth send
212
213         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
214                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
215                 if( msg != NULL ) {
216                         msg->state = RMR_ERR_BADARG;
217                         errno = EINVAL;                                                                                 // must ensure it's not eagain
218                 }                       
219                 return msg;
220         }
221
222         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
223         if( msg->header == NULL ) {
224                 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
225                 msg->state = RMR_ERR_NOHDR;
226                 errno = EBADMSG;                                                                                // must ensure it's not eagain
227                 return msg;
228         }
229
230         send_again = 1;                                                                                 // force loop entry
231         group = 0;                                                                                              // always start with group 0
232
233         while( send_again ) {
234                 nn_sock = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again );         // round robin select endpoint; again set if mult groups
235                 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n", 
236                                 msg->mtype, send_again, group, nn_sock, msg->len );
237                 group++;
238
239                 if( nn_sock < 0 ) {
240                         msg->state = RMR_ERR_NOENDPT;
241                         errno = ENXIO;                                                                                  // must ensure it's not eagain
242                         return msg;                                                                                             // caller can resend (maybe) or free
243                 }
244
245                 if( send_again ) {
246                         clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
247                         if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
248                         msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
249                         msg = send_msg( ctx, msg, nn_sock );                                    // do the hard work, msg should be nil on success
250                         /*
251                         if( msg ) {
252                                 // error do we need to count successes/errors, how to report some success, esp if last fails?
253                         } 
254                         */
255
256                         msg = clone_m;                                                                                  // clone will be the next to send
257                 } else {
258                         msg = send_msg( ctx, msg, nn_sock );                                    // send the last, and allocate a new buffer; drops the clone if it was
259                 }
260         }
261
262         return msg;                                                                     // last message caries the status of last/only send attempt
263 }
264
265 /*
266         Return to sender allows a message to be sent back to the endpoint where it originated. 
267         The source information in the message is used to select the socket on which to write
268         the message rather than using the message type and round-robin selection. This 
269         should return a message buffer with the state of the send operation set. On success
270         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
271         error it can be passed back to this function to retry the send if desired. On error,
272         errno will liklely have the failure reason set by the nanomsg send processing.
273         The following are possible values for the state in the message buffer:
274
275         Message states returned:
276                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
277                 RMR_ERR_NOHDR  - message did not have a header
278                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
279                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
280                 RMR_ERR_RETRY   - operation failed, but caller should retry
281
282         A nil message as the return value is rare, and generally indicates some kind of horrible
283         failure. The value of errno might give a clue as to what is wrong.
284
285         CAUTION:
286                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
287                 The caller must check for this and handle.
288 */
289 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
290         int nn_sock;                            // endpoint socket for send
291         uta_ctx_t*      ctx;
292         int state;
293         uta_mhdr_t*     hdr;
294         char*   hold_src;                       // we need the original source if send fails
295
296         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
297                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
298                 if( msg != NULL ) {
299                         msg->state = RMR_ERR_BADARG;
300                 }                       
301                 return msg;
302         }
303
304         errno = 0;                                                                                                              // at this point any bad state is in msg returned
305         if( msg->header == NULL ) {
306                 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
307                 msg->state = RMR_ERR_NOHDR;
308                 return msg;
309         }
310
311         nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src );                  // socket of specific endpoint
312         if( nn_sock < 0 ) {
313                 msg->state = RMR_ERR_NOENDPT;
314                 return msg;                                                     // preallocated msg can be reused since not given back to nn
315         }
316
317         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                                 // the dest where we're returning the message to
318         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                // must overlay the source to be ours
319         msg = send_msg( ctx, msg, nn_sock );
320         if( msg ) {
321                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );            // always return original source so rts can be called again
322                 msg->flags |= MFL_ADDSRC;                                                                                                       // if msg given to send() it must add source
323         }
324
325         free( hold_src );
326         return msg;
327 }
328
329 /*
330         Call sends the message based on message routing using the message type, and waits for a
331         response message to arrive with the same transaction id that was in the outgoing message.
332         If, while wiating for the expected response,  messages are received which do not have the
333         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
334         order that they were received.
335
336         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
337         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
338         may be resent (likely the context pointer was nil).  If the message is sent, but no 
339         response is received, a nil message is returned with errno set to indicate the likley
340         issue:
341                 ETIMEDOUT -- too many messages were queued before reciving the expected response
342                 ENOBUFS -- the queued message ring is full, messages were dropped
343                 EINVAL  -- A parameter was not valid
344                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
345                                         user should call this function with the message again.
346
347
348         QUESTION:  should user specify the number of messages to allow to queue?
349 */
350 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
351         uta_ctx_t*              ctx;
352         unsigned char   expected_id[RMR_MAX_XID+1];             // the transaction id in the message; we wait for response with same ID
353
354         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
355                 if( msg != NULL ) {
356                         msg->state = RMR_ERR_BADARG;
357                 }                       
358                 return msg;
359         }
360
361         memcpy( expected_id, msg->xaction, RMR_MAX_XID );
362         expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
363         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
364         errno = 0;
365         msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
366
367         msg = rmr_send_msg( ctx, msg );
368         if( msg ) {                                                                             // msg should be nil, if not there was a problem; return buffer to user
369                 if( msg->state != RMR_ERR_RETRY ) {
370                         msg->state = RMR_ERR_CALLFAILED;                // don't stomp if send_msg set retry
371                 }
372                 return msg;
373         }
374
375         return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 );                 // wait for msg allowing 20 to queue ahead
376 }
377
378 /*
379         The outward facing receive function. When invoked it will pop the oldest message
380         from the receive ring, if any are queued, and return it. If the ring is empty
381         then the receive function is invoked to wait for the next message to arrive (blocking).
382
383         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
384         nil, a new one will be allocated. However, the caller should NOT expect to get the same
385         struct back (if a queued message is returned the message struct will be different).
386 */
387 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
388         uta_ctx_t*      ctx;
389         rmr_mbuf_t*     qm;                             // message that was queued on the ring
390
391         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
392                 if( old_msg != NULL ) {
393                         old_msg->state = RMR_ERR_BADARG;
394                 }                       
395                 errno = EINVAL;
396                 return old_msg;
397         }
398         errno = 0;
399
400         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
401         if( qm != NULL ) {
402                 if( old_msg ) {
403                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
404                 }
405
406                 return qm;
407         }
408
409         return rcv_msg( ctx, old_msg );                                                         // nothing queued, wait for one
410 }
411
412 /*
413         Receive with a timeout.  This is a convenience function when sitting on top of 
414         nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg(). 
415 */
416 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
417         rmr_set_rtimeout( vctx, ms_to );
418         return rmr_rcv_msg( vctx, old_msg );
419 }
420
421
422 /*
423         This blocks until the message with the 'expect' ID is received. Messages which are received
424         before the expected message are queued onto the message ring.  The function will return 
425         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
426         expected message is received. If the queued message ring fills a nil pointer is returned
427         and errno is set to ENOBUFS.
428
429         Generally this will be invoked only by the call() function as it waits for a response, but 
430         it is exposed to the user application as three is no reason not to.
431 */
432 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
433         uta_ctx_t*      ctx;
434         int     queued = 0;                             // number we pushed into the ring
435         int     exp_len = 0;                    // length of expected ID
436         
437         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
438                 if( msg != NULL ) {
439                         msg->state = RMR_ERR_BADARG;
440                 }                       
441                 errno = EINVAL;
442                 return msg;
443         }
444
445         errno = 0;
446
447         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
448                 return rmr_rcv_msg( ctx, msg );
449         }
450
451         exp_len = strlen( expect );
452         if( exp_len > RMR_MAX_XID ) {
453                 exp_len = RMR_MAX_XID;
454         }
455         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
456
457         while( queued < allow2queue ) {
458                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
459                 if( msg->state == RMR_OK ) {
460                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
461                                 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
462                                 return msg;
463                         }
464
465                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
466                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
467                                 errno = ENOBUFS;
468                                 return NULL;
469                         }
470
471                         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
472                         queued++;
473                         msg = NULL;
474                 }
475         }
476
477         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
478         errno = ETIMEDOUT;
479         return NULL;
480 }
481
482
483 /*
484         Initialise the message routing environment. Flags are one of the UTAFL_
485         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
486         (tcp) to be used, then :port is all that is needed.
487
488         At the moment it seems that TCP really is the only viable protocol, but
489         we'll allow flexibility.
490
491         The return value is a void pointer which must be passed to most uta functions. On
492         error, a nil pointer is returned and errno should be set.
493 */
494 static void* init( char* uproto_port, int max_msg_size, int flags ) {
495         uta_ctx_t*      ctx = NULL;
496         char    bind_info[NN_SOCKADDR_MAX];     // bind info
497         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
498         char*   port;
499         char*   proto_port;
500         char    wbuf[1024];                                     // work buffer 
501         char*   tok;                                            // pointer at token in a buffer
502         int             state;
503         char*   interface = NULL;                       // interface to bind to pulled from RMR_BIND_IF if set
504
505         fprintf( stderr, "[INFO] ric message routing library on nanomsg (%s %s.%s.%s built: %s)\n", 
506                         QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
507
508         errno = 0;
509         if( uproto_port == NULL ) {
510                 proto_port = strdup( "tcp:4567" );
511         } else {
512                 proto_port = strdup( uproto_port );             // so we can modify it
513         }
514
515         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
516                 errno = ENOMEM;
517                 return NULL;
518         }
519         memset( ctx, 0, sizeof( uta_ctx_t ) );
520
521
522         ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
523
524         ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t );               // default max buffer size
525         if( max_msg_size > 0 ) {
526                 if( max_msg_size <= ctx->max_plen ) {                                           // user defined len can be smaller
527                         ctx->max_plen = max_msg_size;
528                 } else {
529                         fprintf( stderr, "[WARN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
530                 }
531         }
532
533         ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t );
534
535         uta_lookup_rtg( ctx );                                                  // attempt to fill in rtg info; rtc will handle missing values/errors
536
537     ctx->nn_sock = nn_socket( AF_SP, NN_PULL );         // our 'listen' socket should allow multiple senders to connect
538         if( ctx->nn_sock < 0 ) {
539                 fprintf( stderr, "[CRIT] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno );
540                 free_ctx( ctx );
541                 return NULL;
542         }
543
544         if( (port = strchr( proto_port, ':' )) != NULL ) {
545                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
546                         port++;
547                 } else {
548                         *(port++) = 0;                  // term proto string and point at port string
549                         proto = proto_port;             // user supplied proto so point at it rather than default
550                 }
551         } else {
552                 port = proto_port;                      // assume something like "1234" was passed
553         }
554
555         if( (gethostname( wbuf, sizeof( wbuf ) )) < 0 ) {
556                 fprintf( stderr, "[CRIT] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
557                 return NULL;
558         }
559         if( (tok = strchr( wbuf, '.' )) != NULL ) {
560                 *tok = 0;                                                                       // we don't keep domain portion
561         }
562         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
563         if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) {                       // our registered name is host:port
564                 fprintf( stderr, "[CRIT] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
565                 return NULL;
566         }
567
568         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
569                 interface = "0.0.0.0";
570         }
571         snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
572     if( nn_bind( ctx->nn_sock, bind_info ) < 0) {                       // bind and automatically accept client sessions
573                 fprintf( stderr, "[CRIT] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) );
574                 nn_close( ctx->nn_sock );
575                 free_ctx( ctx );
576                 return NULL;
577         }
578
579         if( ! (flags & FL_NOTHREAD) ) {                 // skip if internal context that does not need rout table thread
580                 if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {                // kick the rt collector thread
581                         fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
582                 }
583         }
584
585         free( proto_port );
586         return (void *) ctx;
587 }
588
589
590 /*
591         Publicly facing initialisation function. Wrapper for the init() funcion above
592         as it needs to ensure internal flags are masked off before calling the 
593         real workhorse.
594 */
595 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
596         return init( uproto_port, max_msg_size, flags & UFL_MASK  );
597 }
598
599 /*
600         Return true if routing table is initialised etc. and app can send/receive.
601 */
602 extern int rmr_ready( void* vctx ) {
603         uta_ctx_t *ctx;
604
605         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
606                 return FALSE;
607         }
608
609         if( ctx->rtable != NULL ) {
610                 return TRUE;
611         }
612
613         return FALSE;
614 }
615
616 /*
617         Provides a non-fatal (compile) interface for the nng only function. 
618         Not supported on top of nano, so this always returns -1.
619 */
620 extern int rmr_get_rcvfd( void* vctx ) {
621         errno = ENOTSUP;
622         return -1;
623 }
624
625 /*
626         Compatability (mostly) with NNG.
627 */
628 extern void rmr_close( void* vctx ) {
629         uta_ctx_t *ctx;
630
631         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
632                 return;
633         }
634         
635         nn_close( ctx->nn_sock );
636 }