feat(API): Add subscription id and source retrieval
[ric-plt/lib/rmr.git] / src / nanomsg / src / rmr.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia 
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr.c
23         Abstract:       The bulk of the ric message routing library which is built upon
24                                 the older nanomsg messaging transport mehhanism.
25
26                                 To "hide" internal functions the choice was made to implement them
27                                 all as static functions. This means that we include nearly 
28                                 all of our modules here as 90% of the library is not visible to
29                                 the outside world. 
30
31         Author:         E. Scott Daniels
32         Date:           28 November 2018
33 */
34
35 #include <ctype.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <netdb.h>
39 #include <errno.h>
40 #include <string.h>
41 #include <errno.h>
42 #include <pthread.h>
43 #include <unistd.h>
44 #include <stdint.h>
45 #include <time.h>
46 #include <arpa/inet.h>
47
48 #include <nanomsg/nn.h>
49 #include <nanomsg/tcp.h>
50 #include <nanomsg/pair.h>
51 #include <nanomsg/pipeline.h>
52 #include <nanomsg/pubsub.h>
53
54 #include "rmr.h"                                // things the users see
55 #include "rmr_agnostic.h"               // headers agnostic to the underlying transport mechanism
56 #include "rmr_private.h"                // things that we need too
57 #include "rmr_symtab.h"
58
59 #include "ring_static.c"                // message ring support
60 #include "rt_generic_static.c"  // generic route table (not nng/nano specific)
61 #include "rtable_static.c"              // route table things   (nano specific)
62 #include "rtc_static.c"                 // common rt collector
63 #include "tools_static.c"
64 #include "sr_static.c"                  // send/receive static functions
65 #include "wormholes.c"                  // external wormhole api, and it's static functions (must be LAST)
66
67 // ------------------------------------------------------------------------------------------------------
68
69 /*
70         Clean up a context.
71 */
72 static void free_ctx( uta_ctx_t* ctx ) {
73         if( ctx ) {
74                 if( ctx->rtg_addr ) {
75                         free( ctx->rtg_addr );
76                 }
77         }
78 }
79
80 // --------------- public functions --------------------------------------------------------------------------
81
82 /*
83         Set the receive timeout to time. If time >1000 we assume the time is milliseconds,
84         else we assume seconds. Setting -1 is always block.
85         Returns the nn value (0 on success <0 on error).
86 */
87 extern int rmr_set_rtimeout( void* vctx, int time ) {
88         uta_ctx_t* ctx;
89
90         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
91                 errno = EINVAL;
92                 return -1;
93         }
94
95         if( time > 0 ) {
96                 if( time < 1000 ) {     
97                         time = time * 1000;                     // assume seconds, nn wants ms
98                 }
99         } 
100
101         return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
102 }
103
104 /*
105         Deprecated -- use rmr_set_rtimeout()
106 */
107 extern int rmr_rcv_to( void* vctx, int time ) {
108         return rmr_rcv_to( vctx, time );
109 }
110
111
112 /*
113         Set the send timeout to time. If time >1000 we assume the time is milliseconds,
114         else we assume seconds. Setting -1 is always block.
115         Returns the nn value (0 on success <0 on error).
116 */
117 extern int rmr_set_stimeout( void* vctx, int time ) {
118         uta_ctx_t* ctx;
119
120         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
121                 errno = EINVAL;
122                 return -1;
123         }
124
125         if( time > 0 ) {
126                 if( time < 1000 ) {     
127                         time = time * 1000;                     // assume seconds, nn wants ms
128                 }
129         } 
130
131         return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) );
132 }
133
134 /*
135         Deprecated -- use rmr_set_stimeout()
136 */
137 extern int rmr_send_to( void* vctx, int time ) {
138         return rmr_send_to( vctx, time );
139 }
140
141 /*
142         Returns the size of the payload (bytes) that the msg buffer references.
143         Len in a message is the number of bytes which were received, or should
144         be transmitted, however, it is possible that the mbuf was allocated
145         with a larger payload space than the payload length indicates; this 
146         function returns the absolute maximum space that the user has available
147         in the payload. On error (bad msg buffer) -1 is returned and errno should
148         indicate the rason.
149 */
150 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
151         if( msg == NULL || msg->header == NULL ) {
152                 errno = EINVAL;
153                 return -1;
154         }
155
156         errno = 0;
157         return msg->alloc_len - RMR_HDR_LEN( msg->header );                     // transport buffer less header and other data bits
158 }
159
160 /*
161         Allocates a send message as a zerocopy message allowing the underlying message protocol
162         to send the buffer without copy.
163 */
164 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
165         uta_ctx_t*      ctx;
166         rmr_mbuf_t*     m;
167
168         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
169                 return NULL;
170         }
171
172         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );
173         return  m;
174 }
175
176 /*
177         Allocates a send message as a zerocopy message allowing the underlying message protocol
178         to send the buffer without copy. In addition, a trace data field of tr_size will be
179         added and the supplied data coppied to the buffer before returning the message to
180         the caller.
181 */
182 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
183         uta_ctx_t*      ctx;
184         rmr_mbuf_t*     m;
185         int state;
186
187         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
188                 return NULL;
189         }
190
191         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
192         if( m != NULL ) {
193                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
194                 if( state != tr_size ) {
195                         m->state = RMR_ERR_INITFAILED;
196                 }
197         }
198
199         return  m;
200 }
201
202 /*
203         Need an external path to the realloc static function as it's called by an
204         outward facing mbuf api function.
205 */
206 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
207         return realloc_msg( msg, new_tr_size );
208 }
209
210 /*
211         Return the message to the available pool, or free it outright.
212 */
213 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
214         if( mbuf == NULL ) {
215                 return;
216         }
217
218         if( mbuf->header ) {
219                 if( mbuf->flags & MFL_ZEROCOPY ) {
220                         nn_freemsg( mbuf->header );                             // must let nano free it
221                 } else {
222                         free( mbuf->header );
223                 }
224         }
225         
226         free( mbuf );
227 }
228
229 /*
230         Accept a message and send it to an endpoint based on message type.      
231         Allocates a new message buffer for the next send. If a message type has
232         more than one group of endpoints defined, then the message will be sent
233         in round robin fashion to one endpoint in each group. 
234
235         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
236                 it will return with an error and errno set to eagain. If the send is
237                 a limited fanout, then the returned status is the status of the last
238                 send attempt.
239 */
240 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
241         int nn_sock;                            // endpoint socket for send
242         uta_ctx_t*      ctx;
243         int     group;                                  // selected group to get socket for
244         int send_again;                         // true if the message must be sent again
245         rmr_mbuf_t*     clone_m;                // cloned message for an nth send
246
247         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
248                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
249                 if( msg != NULL ) {
250                         msg->state = RMR_ERR_BADARG;
251                         errno = EINVAL;                                                                                 // must ensure it's not eagain
252                 }                       
253                 return msg;
254         }
255
256         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
257         if( msg->header == NULL ) {
258                 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
259                 msg->state = RMR_ERR_NOHDR;
260                 errno = EBADMSG;                                                                                // must ensure it's not eagain
261                 return msg;
262         }
263
264         send_again = 1;                                                                                 // force loop entry
265         group = 0;                                                                                              // always start with group 0
266
267         while( send_again ) {
268                 nn_sock = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again );         // round robin select endpoint; again set if mult groups
269                 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n", 
270                                 msg->mtype, send_again, group, nn_sock, msg->len );
271                 group++;
272
273                 if( nn_sock < 0 ) {
274                         msg->state = RMR_ERR_NOENDPT;
275                         errno = ENXIO;                                                                                  // must ensure it's not eagain
276                         return msg;                                                                                             // caller can resend (maybe) or free
277                 }
278
279                 if( send_again ) {
280                         clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
281                         if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
282                         msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
283                         msg = send_msg( ctx, msg, nn_sock );                                    // do the hard work, msg should be nil on success
284                         /*
285                         if( msg ) {
286                                 // error do we need to count successes/errors, how to report some success, esp if last fails?
287                         } 
288                         */
289
290                         msg = clone_m;                                                                                  // clone will be the next to send
291                 } else {
292                         msg = send_msg( ctx, msg, nn_sock );                                    // send the last, and allocate a new buffer; drops the clone if it was
293                 }
294         }
295
296         return msg;                                                                     // last message caries the status of last/only send attempt
297 }
298
299 /*
300         Return to sender allows a message to be sent back to the endpoint where it originated. 
301         The source information in the message is used to select the socket on which to write
302         the message rather than using the message type and round-robin selection. This 
303         should return a message buffer with the state of the send operation set. On success
304         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
305         error it can be passed back to this function to retry the send if desired. On error,
306         errno will liklely have the failure reason set by the nanomsg send processing.
307         The following are possible values for the state in the message buffer:
308
309         Message states returned:
310                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
311                 RMR_ERR_NOHDR  - message did not have a header
312                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
313                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
314                 RMR_ERR_RETRY   - operation failed, but caller should retry
315
316         A nil message as the return value is rare, and generally indicates some kind of horrible
317         failure. The value of errno might give a clue as to what is wrong.
318
319         CAUTION:
320                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
321                 The caller must check for this and handle.
322 */
323 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
324         int nn_sock;                            // endpoint socket for send
325         uta_ctx_t*      ctx;
326         int state;
327         uta_mhdr_t*     hdr;
328         char*   hold_src;                       // we need the original source if send fails
329
330         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
331                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
332                 if( msg != NULL ) {
333                         msg->state = RMR_ERR_BADARG;
334                 }                       
335                 return msg;
336         }
337
338         errno = 0;                                                                                                              // at this point any bad state is in msg returned
339         if( msg->header == NULL ) {
340                 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
341                 msg->state = RMR_ERR_NOHDR;
342                 return msg;
343         }
344
345         nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src );                  // socket of specific endpoint
346         if( nn_sock < 0 ) {
347                 msg->state = RMR_ERR_NOENDPT;
348                 return msg;                                                     // preallocated msg can be reused since not given back to nn
349         }
350
351         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                                 // the dest where we're returning the message to
352         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                // must overlay the source to be ours
353         msg = send_msg( ctx, msg, nn_sock );
354         if( msg ) {
355                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );            // always return original source so rts can be called again
356                 msg->flags |= MFL_ADDSRC;                                                                                                       // if msg given to send() it must add source
357         }
358
359         free( hold_src );
360         return msg;
361 }
362
363 /*
364         Call sends the message based on message routing using the message type, and waits for a
365         response message to arrive with the same transaction id that was in the outgoing message.
366         If, while wiating for the expected response,  messages are received which do not have the
367         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
368         order that they were received.
369
370         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
371         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
372         may be resent (likely the context pointer was nil).  If the message is sent, but no 
373         response is received, a nil message is returned with errno set to indicate the likley
374         issue:
375                 ETIMEDOUT -- too many messages were queued before reciving the expected response
376                 ENOBUFS -- the queued message ring is full, messages were dropped
377                 EINVAL  -- A parameter was not valid
378                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
379                                         user should call this function with the message again.
380
381
382         QUESTION:  should user specify the number of messages to allow to queue?
383 */
384 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
385         uta_ctx_t*              ctx;
386         unsigned char   expected_id[RMR_MAX_XID+1];             // the transaction id in the message; we wait for response with same ID
387
388         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
389                 if( msg != NULL ) {
390                         msg->state = RMR_ERR_BADARG;
391                 }                       
392                 return msg;
393         }
394
395         memcpy( expected_id, msg->xaction, RMR_MAX_XID );
396         expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
397         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
398         errno = 0;
399         msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
400
401         msg = rmr_send_msg( ctx, msg );
402         if( msg ) {                                                                             // msg should be nil, if not there was a problem; return buffer to user
403                 if( msg->state != RMR_ERR_RETRY ) {
404                         msg->state = RMR_ERR_CALLFAILED;                // don't stomp if send_msg set retry
405                 }
406                 return msg;
407         }
408
409         return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 );                 // wait for msg allowing 20 to queue ahead
410 }
411
412 /*
413         The outward facing receive function. When invoked it will pop the oldest message
414         from the receive ring, if any are queued, and return it. If the ring is empty
415         then the receive function is invoked to wait for the next message to arrive (blocking).
416
417         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
418         nil, a new one will be allocated. However, the caller should NOT expect to get the same
419         struct back (if a queued message is returned the message struct will be different).
420 */
421 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
422         uta_ctx_t*      ctx;
423         rmr_mbuf_t*     qm;                             // message that was queued on the ring
424
425         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
426                 if( old_msg != NULL ) {
427                         old_msg->state = RMR_ERR_BADARG;
428                 }                       
429                 errno = EINVAL;
430                 return old_msg;
431         }
432         errno = 0;
433
434         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
435         if( qm != NULL ) {
436                 if( old_msg ) {
437                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
438                 }
439
440                 return qm;
441         }
442
443         return rcv_msg( ctx, old_msg );                                                         // nothing queued, wait for one
444 }
445
446 /*
447         Receive with a timeout.  This is a convenience function when sitting on top of 
448         nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg(). 
449 */
450 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
451         rmr_set_rtimeout( vctx, ms_to );
452         return rmr_rcv_msg( vctx, old_msg );
453 }
454
455
456 /*
457         This blocks until the message with the 'expect' ID is received. Messages which are received
458         before the expected message are queued onto the message ring.  The function will return 
459         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
460         expected message is received. If the queued message ring fills a nil pointer is returned
461         and errno is set to ENOBUFS.
462
463         Generally this will be invoked only by the call() function as it waits for a response, but 
464         it is exposed to the user application as three is no reason not to.
465 */
466 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
467         uta_ctx_t*      ctx;
468         int     queued = 0;                             // number we pushed into the ring
469         int     exp_len = 0;                    // length of expected ID
470         
471         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
472                 if( msg != NULL ) {
473                         msg->state = RMR_ERR_BADARG;
474                 }                       
475                 errno = EINVAL;
476                 return msg;
477         }
478
479         errno = 0;
480
481         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
482                 return rmr_rcv_msg( ctx, msg );
483         }
484
485         exp_len = strlen( expect );
486         if( exp_len > RMR_MAX_XID ) {
487                 exp_len = RMR_MAX_XID;
488         }
489         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
490
491         while( queued < allow2queue ) {
492                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
493                 if( msg->state == RMR_OK ) {
494                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
495                                 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
496                                 return msg;
497                         }
498
499                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
500                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
501                                 errno = ENOBUFS;
502                                 return NULL;
503                         }
504
505                         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
506                         queued++;
507                         msg = NULL;
508                 }
509         }
510
511         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
512         errno = ETIMEDOUT;
513         return NULL;
514 }
515
516
517 /*
518         Initialise the message routing environment. Flags are one of the UTAFL_
519         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
520         (tcp) to be used, then :port is all that is needed.
521
522         At the moment it seems that TCP really is the only viable protocol, but
523         we'll allow flexibility.
524
525         The return value is a void pointer which must be passed to most uta functions. On
526         error, a nil pointer is returned and errno should be set.
527 */
528 static void* init( char* uproto_port, int max_msg_size, int flags ) {
529         uta_ctx_t*      ctx = NULL;
530         char    bind_info[NN_SOCKADDR_MAX];     // bind info
531         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
532         char*   port;
533         char*   proto_port;
534         char    wbuf[1024];                                     // work buffer 
535         char*   tok;                                            // pointer at token in a buffer
536         int             state;
537         char*   interface = NULL;                       // interface to bind to pulled from RMR_BIND_IF if set
538
539         fprintf( stderr, "[INFO] ric message routing library on nanomsg (%s %s.%s.%s built: %s)\n", 
540                         QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
541
542         errno = 0;
543         if( uproto_port == NULL ) {
544                 proto_port = strdup( "tcp:4567" );
545         } else {
546                 proto_port = strdup( uproto_port );             // so we can modify it
547         }
548
549         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
550                 errno = ENOMEM;
551                 return NULL;
552         }
553         memset( ctx, 0, sizeof( uta_ctx_t ) );
554
555
556         ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
557
558         ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t );               // default max buffer size
559         if( max_msg_size > 0 ) {
560                 if( max_msg_size <= ctx->max_plen ) {                                           // user defined len can be smaller
561                         ctx->max_plen = max_msg_size;
562                 } else {
563                         fprintf( stderr, "[WARN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
564                 }
565         }
566
567         ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t );
568
569         uta_lookup_rtg( ctx );                                                  // attempt to fill in rtg info; rtc will handle missing values/errors
570
571     ctx->nn_sock = nn_socket( AF_SP, NN_PULL );         // our 'listen' socket should allow multiple senders to connect
572         if( ctx->nn_sock < 0 ) {
573                 fprintf( stderr, "[CRIT] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno );
574                 free_ctx( ctx );
575                 return NULL;
576         }
577
578         if( (port = strchr( proto_port, ':' )) != NULL ) {
579                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
580                         port++;
581                 } else {
582                         *(port++) = 0;                  // term proto string and point at port string
583                         proto = proto_port;             // user supplied proto so point at it rather than default
584                 }
585         } else {
586                 port = proto_port;                      // assume something like "1234" was passed
587         }
588
589         if( (gethostname( wbuf, sizeof( wbuf ) )) < 0 ) {
590                 fprintf( stderr, "[CRIT] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
591                 return NULL;
592         }
593         if( (tok = strchr( wbuf, '.' )) != NULL ) {
594                 *tok = 0;                                                                       // we don't keep domain portion
595         }
596         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
597         if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) {                       // our registered name is host:port
598                 fprintf( stderr, "[CRIT] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
599                 return NULL;
600         }
601
602         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
603                 interface = "0.0.0.0";
604         }
605         snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
606     if( nn_bind( ctx->nn_sock, bind_info ) < 0) {                       // bind and automatically accept client sessions
607                 fprintf( stderr, "[CRIT] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) );
608                 nn_close( ctx->nn_sock );
609                 free_ctx( ctx );
610                 return NULL;
611         }
612
613         if( ! (flags & FL_NOTHREAD) ) {                 // skip if internal context that does not need rout table thread
614                 if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {                // kick the rt collector thread
615                         fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
616                 }
617         }
618
619         free( proto_port );
620         return (void *) ctx;
621 }
622
623
624 /*
625         Publicly facing initialisation function. Wrapper for the init() funcion above
626         as it needs to ensure internal flags are masked off before calling the 
627         real workhorse.
628 */
629 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
630         return init( uproto_port, max_msg_size, flags & UFL_MASK  );
631 }
632
633 /*
634         Return true if routing table is initialised etc. and app can send/receive.
635 */
636 extern int rmr_ready( void* vctx ) {
637         uta_ctx_t *ctx;
638
639         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
640                 return FALSE;
641         }
642
643         if( ctx->rtable != NULL ) {
644                 return TRUE;
645         }
646
647         return FALSE;
648 }
649
650 /*
651         Provides a non-fatal (compile) interface for the nng only function. 
652         Not supported on top of nano, so this always returns -1.
653 */
654 extern int rmr_get_rcvfd( void* vctx ) {
655         errno = ENOTSUP;
656         return -1;
657 }
658
659 /*
660         Compatability (mostly) with NNG.
661 */
662 extern void rmr_close( void* vctx ) {
663         uta_ctx_t *ctx;
664
665         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
666                 return;
667         }
668         
669         nn_close( ctx->nn_sock );
670 }