fix(send): Add second key lookup if sub-id is set
[ric-plt/lib/rmr.git] / src / nanomsg / src / rmr.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr.c
23         Abstract:       The bulk of the ric message routing library which is built upon
24                                 the older nanomsg messaging transport mehhanism.
25
26                                 To "hide" internal functions the choice was made to implement them
27                                 all as static functions. This means that we include nearly
28                                 all of our modules here as 90% of the library is not visible to
29                                 the outside world.
30
31         Author:         E. Scott Daniels
32         Date:           28 November 2018
33 */
34
35 #include <ctype.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <netdb.h>
39 #include <errno.h>
40 #include <string.h>
41 #include <errno.h>
42 #include <pthread.h>
43 #include <unistd.h>
44 #include <stdint.h>
45 #include <time.h>
46 #include <arpa/inet.h>
47
48 #include <nanomsg/nn.h>
49 #include <nanomsg/tcp.h>
50 #include <nanomsg/pair.h>
51 #include <nanomsg/pipeline.h>
52 #include <nanomsg/pubsub.h>
53
54 #include "rmr.h"                                // things the users see
55 #include "rmr_agnostic.h"               // headers agnostic to the underlying transport mechanism
56 #include "rmr_private.h"                // things that we need too
57 #include "rmr_symtab.h"
58
59 #include "ring_static.c"                // message ring support
60 #include "rt_generic_static.c"  // generic route table (not nng/nano specific)
61 #include "rtable_static.c"              // route table things   (nano specific)
62 #include "rtc_static.c"                 // common rt collector
63 #include "tools_static.c"
64 #include "sr_static.c"                  // send/receive static functions
65 #include "wormholes.c"                  // external wormhole api, and it's static functions (must be LAST)
66
67 // ------------------------------------------------------------------------------------------------------
68
69 /*
70         Clean up a context.
71 */
72 static void free_ctx( uta_ctx_t* ctx ) {
73         if( ctx ) {
74                 if( ctx->rtg_addr ) {
75                         free( ctx->rtg_addr );
76                 }
77         }
78 }
79
80 // --------------- public functions --------------------------------------------------------------------------
81
82 /*
83         Set the receive timeout to time (ms). A value of 0 is the same as a non-blocking
84         receive and -1 is block for ever.
85         Returns the nn value (0 on success <0 on error).
86 */
87 extern int rmr_set_rtimeout( void* vctx, int time ) {
88         uta_ctx_t* ctx;
89
90         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
91                 errno = EINVAL;
92                 return -1;
93         }
94
95         if( ctx->last_rto == time ) {
96                 return 0;
97         }
98
99         ctx->last_rto = time;
100
101         return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
102 }
103
104 /*
105         Deprecated -- use rmr_set_rtimeout()
106 */
107 extern int rmr_rcv_to( void* vctx, int time ) {
108         return rmr_rcv_to( vctx, time );
109 }
110
111 /*
112         Set the send timeout to time. If time >1000 we assume the time is milliseconds,
113         else we assume seconds. Setting -1 is always block.
114         Returns the nn value (0 on success <0 on error).
115 */
116 extern int rmr_set_stimeout( void* vctx, int time ) {
117         uta_ctx_t* ctx;
118
119         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
120                 errno = EINVAL;
121                 return -1;
122         }
123
124         if( time > 0 ) {
125                 if( time < 1000 ) {
126                         time = time * 1000;                     // assume seconds, nn wants ms
127                 }
128         }
129
130         return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) );
131 }
132
133 /*
134         Deprecated -- use rmr_set_stimeout()
135 */
136 extern int rmr_send_to( void* vctx, int time ) {
137         return rmr_send_to( vctx, time );
138 }
139
140 /*
141         Returns the size of the payload (bytes) that the msg buffer references.
142         Len in a message is the number of bytes which were received, or should
143         be transmitted, however, it is possible that the mbuf was allocated
144         with a larger payload space than the payload length indicates; this
145         function returns the absolute maximum space that the user has available
146         in the payload. On error (bad msg buffer) -1 is returned and errno should
147         indicate the rason.
148 */
149 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
150         if( msg == NULL || msg->header == NULL ) {
151                 errno = EINVAL;
152                 return -1;
153         }
154
155         errno = 0;
156         return msg->alloc_len - RMR_HDR_LEN( msg->header );                     // transport buffer less header and other data bits
157 }
158
159 /*
160         Allocates a send message as a zerocopy message allowing the underlying message protocol
161         to send the buffer without copy.
162 */
163 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
164         uta_ctx_t*      ctx;
165         rmr_mbuf_t*     m;
166
167         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
168                 return NULL;
169         }
170
171         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );
172         return  m;
173 }
174
175 /*
176         Allocates a send message as a zerocopy message allowing the underlying message protocol
177         to send the buffer without copy. In addition, a trace data field of tr_size will be
178         added and the supplied data coppied to the buffer before returning the message to
179         the caller.
180 */
181 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
182         uta_ctx_t*      ctx;
183         rmr_mbuf_t*     m;
184         int state;
185
186         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
187                 return NULL;
188         }
189
190         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
191         if( m != NULL ) {
192                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
193                 if( state != tr_size ) {
194                         m->state = RMR_ERR_INITFAILED;
195                 }
196         }
197
198         return  m;
199 }
200
201 /*
202         Need an external path to the realloc static function as it's called by an
203         outward facing mbuf api function.
204 */
205 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
206         return realloc_msg( msg, new_tr_size );
207 }
208
209 /*
210         Return the message to the available pool, or free it outright.
211 */
212 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
213         if( mbuf == NULL ) {
214                 return;
215         }
216
217         if( mbuf->header ) {
218                 if( mbuf->flags & MFL_ZEROCOPY ) {
219                         nn_freemsg( mbuf->header );                             // must let nano free it
220                 } else {
221                         free( mbuf->header );
222                 }
223         }
224
225         free( mbuf );
226 }
227
228 /*
229         Accept a message and send it to an endpoint based on message type.
230         Allocates a new message buffer for the next send. If a message type has
231         more than one group of endpoints defined, then the message will be sent
232         in round robin fashion to one endpoint in each group.
233
234         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
235                 it will return with an error and errno set to eagain. If the send is
236                 a limited fanout, then the returned status is the status of the last
237                 send attempt.
238 */
239 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
240         int nn_sock;                            // endpoint socket for send
241         uta_ctx_t*      ctx;
242         int     group;                                  // selected group to get socket for
243         int send_again;                         // true if the message must be sent again
244         rmr_mbuf_t*     clone_m;                // cloned message for an nth send
245         uint64_t key;                           // lookup key is now subid and mtype
246         int max_rt = 1000;
247         int     altk_ok = 0;                    // ok to retry with alt key when true
248
249         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
250                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
251                 if( msg != NULL ) {
252                         msg->state = RMR_ERR_BADARG;
253                         errno = EINVAL;                                                                                 // must ensure it's not eagain
254                 }
255                 return msg;
256         }
257
258         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
259         if( msg->header == NULL ) {
260                 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
261                 msg->state = RMR_ERR_NOHDR;
262                 errno = EBADMSG;                                                                                // must ensure it's not eagain
263                 return msg;
264         }
265
266         send_again = 1;                                                                                 // force loop entry
267         group = 0;                                                                                              // always start with group 0
268
269         key = build_rt_key( msg->sub_id, msg->mtype );                  // what we need to find the route table entry
270         if( msg->sub_id != UNSET_SUBID ) {                                              // if sub id set, allow retry with just mtype if no endpoint when sub-id used
271                 altk_ok = 1;
272         }
273
274         while( send_again ) {
275                 max_rt = 1000;
276                 nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again );                // round robin select endpoint; again set if mult groups
277                 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d ak_ok=%d\n",
278                                 msg->mtype, send_again, group, nn_sock, msg->len, altk_ok );
279
280                 if( nn_sock < 0 ) {
281                         if( altk_ok ) {                                                                                 // ok to retry with alternate key
282                                 key = build_rt_key( UNSET_SUBID, msg->mtype );          // build key with just mtype and retry
283                                 send_again = 1;
284                                 altk_ok = 0;            
285                                 continue;
286                         }
287
288                         msg->state = RMR_ERR_NOENDPT;
289                         errno = ENXIO;                                                                                  // must ensure it's not eagain
290                         return msg;                                                                                             // caller can resend (maybe) or free
291                 }
292                 group++;
293
294                 if( send_again ) {
295                         clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
296                         if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d sub_id=%d len=%d\n", msg->mtype, msg->sub_id, msg->len );
297                         msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
298                         msg = send_msg( ctx, msg, nn_sock );                                    // do the hard work, msg should be nil on success
299                         while( max_rt > 0 &&  msg && msg->state == RMR_ERR_RETRY ) {
300                                 msg = send_msg( ctx, msg, nn_sock );
301                                 max_rt--;
302                         }
303
304                         msg = clone_m;                                                                                  // clone will be the next to send
305                 } else {
306                         msg = send_msg( ctx, msg, nn_sock );                                    // send the last, and allocate a new buffer; drops the clone if it was
307                         while( max_rt > 0 &&  msg && msg->state == RMR_ERR_RETRY ) {
308                                 msg = send_msg( ctx, msg, nn_sock );
309                                 max_rt--;
310                         }
311                 }
312         }
313
314         return msg;                                                                     // last message caries the status of last/only send attempt
315 }
316
317 /*
318         Return to sender allows a message to be sent back to the endpoint where it originated.
319         The source information in the message is used to select the socket on which to write
320         the message rather than using the message type and round-robin selection. This
321         should return a message buffer with the state of the send operation set. On success
322         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
323         error it can be passed back to this function to retry the send if desired. On error,
324         errno will liklely have the failure reason set by the nanomsg send processing.
325         The following are possible values for the state in the message buffer:
326
327         Message states returned:
328                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
329                 RMR_ERR_NOHDR  - message did not have a header
330                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
331                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
332                 RMR_ERR_RETRY   - operation failed, but caller should retry
333
334         A nil message as the return value is rare, and generally indicates some kind of horrible
335         failure. The value of errno might give a clue as to what is wrong.
336
337         CAUTION:
338                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
339                 The caller must check for this and handle.
340 */
341 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
342         int nn_sock;                            // endpoint socket for send
343         uta_ctx_t*      ctx;
344         int state;
345         uta_mhdr_t*     hdr;
346         char*   hold_src;                       // we need the original source if send fails
347
348         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
349                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
350                 if( msg != NULL ) {
351                         msg->state = RMR_ERR_BADARG;
352                 }
353                 return msg;
354         }
355
356         errno = 0;                                                                                                              // at this point any bad state is in msg returned
357         if( msg->header == NULL ) {
358                 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
359                 msg->state = RMR_ERR_NOHDR;
360                 return msg;
361         }
362
363         nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src );                  // socket of specific endpoint
364         if( nn_sock < 0 ) {
365                 msg->state = RMR_ERR_NOENDPT;
366                 return msg;                                                     // preallocated msg can be reused since not given back to nn
367         }
368
369         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                                 // the dest where we're returning the message to
370         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                // must overlay the source to be ours
371         msg = send_msg( ctx, msg, nn_sock );
372         if( msg ) {
373                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );            // always return original source so rts can be called again
374                 msg->flags |= MFL_ADDSRC;                                                                                                       // if msg given to send() it must add source
375         }
376
377         free( hold_src );
378         return msg;
379 }
380
381 /*
382         Call sends the message based on message routing using the message type, and waits for a
383         response message to arrive with the same transaction id that was in the outgoing message.
384         If, while wiating for the expected response,  messages are received which do not have the
385         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
386         order that they were received.
387
388         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
389         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
390         may be resent (likely the context pointer was nil).  If the message is sent, but no
391         response is received, a nil message is returned with errno set to indicate the likley
392         issue:
393                 ETIMEDOUT -- too many messages were queued before reciving the expected response
394                 ENOBUFS -- the queued message ring is full, messages were dropped
395                 EINVAL  -- A parameter was not valid
396                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
397                                         user should call this function with the message again.
398
399
400         QUESTION:  should user specify the number of messages to allow to queue?
401 */
402 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
403         uta_ctx_t*              ctx;
404         unsigned char   expected_id[RMR_MAX_XID+1];             // the transaction id in the message; we wait for response with same ID
405
406         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
407                 if( msg != NULL ) {
408                         msg->state = RMR_ERR_BADARG;
409                 }
410                 return msg;
411         }
412
413         memcpy( expected_id, msg->xaction, RMR_MAX_XID );
414         expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
415         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
416         errno = 0;
417         msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
418
419         msg = rmr_send_msg( ctx, msg );
420         if( msg ) {                                                                             // msg should be nil, if not there was a problem; return buffer to user
421                 if( msg->state != RMR_ERR_RETRY ) {
422                         msg->state = RMR_ERR_CALLFAILED;                // don't stomp if send_msg set retry
423                 }
424                 return msg;
425         }
426
427         return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 );                 // wait for msg allowing 20 to queue ahead
428 }
429
430 /*
431         The outward facing receive function. When invoked it will pop the oldest message
432         from the receive ring, if any are queued, and return it. If the ring is empty
433         then the receive function is invoked to wait for the next message to arrive (blocking).
434
435         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
436         nil, a new one will be allocated. However, the caller should NOT expect to get the same
437         struct back (if a queued message is returned the message struct will be different).
438 */
439 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
440         uta_ctx_t*      ctx;
441         rmr_mbuf_t*     qm;                             // message that was queued on the ring
442
443         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
444                 if( old_msg != NULL ) {
445                         old_msg->state = RMR_ERR_BADARG;
446                 }
447                 errno = EINVAL;
448                 return old_msg;
449         }
450         errno = 0;
451
452         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
453         if( qm != NULL ) {
454                 if( old_msg ) {
455                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
456                 }
457
458                 return qm;
459         }
460
461         return rcv_msg( ctx, old_msg );                                                         // nothing queued, wait for one
462 }
463
464 /*
465         Receive with a timeout.  This is a convenience function when sitting on top of
466         nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg().
467 */
468 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
469         uta_ctx_t*      ctx;
470
471         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
472                 if( ctx->last_rto != ms_to ) {                                                  // avoid call overhead
473                         rmr_set_rtimeout( vctx, ms_to );
474                 }
475         }
476
477         return rmr_rcv_msg( vctx, old_msg );
478 }
479
480
481 /*
482         This blocks until the message with the 'expect' ID is received. Messages which are received
483         before the expected message are queued onto the message ring.  The function will return
484         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
485         expected message is received. If the queued message ring fills a nil pointer is returned
486         and errno is set to ENOBUFS.
487
488         Generally this will be invoked only by the call() function as it waits for a response, but
489         it is exposed to the user application as three is no reason not to.
490 */
491 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
492         uta_ctx_t*      ctx;
493         int     queued = 0;                             // number we pushed into the ring
494         int     exp_len = 0;                    // length of expected ID
495
496         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
497                 if( msg != NULL ) {
498                         msg->state = RMR_ERR_BADARG;
499                 }
500                 errno = EINVAL;
501                 return msg;
502         }
503
504         errno = 0;
505
506         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
507                 return rmr_rcv_msg( ctx, msg );
508         }
509
510         exp_len = strlen( expect );
511         if( exp_len > RMR_MAX_XID ) {
512                 exp_len = RMR_MAX_XID;
513         }
514         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
515
516         while( queued < allow2queue ) {
517                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
518                 if( msg->state == RMR_OK ) {
519                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
520                                 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
521                                 return msg;
522                         }
523
524                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
525                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
526                                 errno = ENOBUFS;
527                                 return NULL;
528                         }
529
530                         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
531                         queued++;
532                         msg = NULL;
533                 }
534         }
535
536         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
537         errno = ETIMEDOUT;
538         return NULL;
539 }
540
541
542 /*
543         Initialise the message routing environment. Flags are one of the UTAFL_
544         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
545         (tcp) to be used, then :port is all that is needed.
546
547         At the moment it seems that TCP really is the only viable protocol, but
548         we'll allow flexibility.
549
550         The return value is a void pointer which must be passed to most uta functions. On
551         error, a nil pointer is returned and errno should be set.
552 */
553 static void* init( char* uproto_port, int max_msg_size, int flags ) {
554         uta_ctx_t*      ctx = NULL;
555         char    bind_info[NN_SOCKADDR_MAX];     // bind info
556         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
557         char*   port;
558         char*   proto_port;
559         char    wbuf[1024];                                     // work buffer
560         char*   tok;                                            // pointer at token in a buffer
561         int             state;
562         char*   interface = NULL;                       // interface to bind to pulled from RMR_BIND_IF if set
563
564         fprintf( stderr, "[INFO] ric message routing library on nanomsg (%s %s.%s.%s built: %s)\n",
565                         QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
566
567         errno = 0;
568         if( uproto_port == NULL ) {
569                 proto_port = strdup( "tcp:4567" );
570         } else {
571                 proto_port = strdup( uproto_port );             // so we can modify it
572         }
573
574         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
575                 errno = ENOMEM;
576                 return NULL;
577         }
578         memset( ctx, 0, sizeof( uta_ctx_t ) );
579
580
581         ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
582         ctx->last_rto = -2;                                                             // last receive timeout that was set; invalid value to force first to set
583
584         ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t );               // default max buffer size
585         if( max_msg_size > 0 ) {
586                 if( max_msg_size <= ctx->max_plen ) {                                           // user defined len can be smaller
587                         ctx->max_plen = max_msg_size;
588                 } else {
589                         fprintf( stderr, "[WARN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
590                 }
591         }
592
593         ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t );
594
595         uta_lookup_rtg( ctx );                                                  // attempt to fill in rtg info; rtc will handle missing values/errors
596
597         ctx->nn_sock = nn_socket( AF_SP, NN_PULL );             // our 'listen' socket should allow multiple senders to connect
598         if( ctx->nn_sock < 0 ) {
599                 fprintf( stderr, "[CRIT] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno );
600                 free_ctx( ctx );
601                 return NULL;
602         }
603
604         if( (port = strchr( proto_port, ':' )) != NULL ) {
605                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
606                         port++;
607                 } else {
608                         *(port++) = 0;                  // term proto string and point at port string
609                         proto = proto_port;             // user supplied proto so point at it rather than default
610                 }
611         } else {
612                 port = proto_port;                      // assume something like "1234" was passed
613         }
614
615         if( (gethostname( wbuf, sizeof( wbuf ) )) < 0 ) {
616                 fprintf( stderr, "[CRIT] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
617                 return NULL;
618         }
619         if( (tok = strchr( wbuf, '.' )) != NULL ) {
620                 *tok = 0;                                                                       // we don't keep domain portion
621         }
622         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
623         if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) {                       // our registered name is host:port
624                 fprintf( stderr, "[CRIT] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
625                 return NULL;
626         }
627
628         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
629                 interface = "0.0.0.0";
630         }
631         snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
632         if( nn_bind( ctx->nn_sock, bind_info ) < 0) {                   // bind and automatically accept client sessions
633                 fprintf( stderr, "[CRIT] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) );
634                 nn_close( ctx->nn_sock );
635                 free_ctx( ctx );
636                 return NULL;
637         }
638
639         if( ! (flags & FL_NOTHREAD) ) {                 // skip if internal context that does not need rout table thread
640                 if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {                // kick the rt collector thread
641                         fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
642                 }
643         }
644
645         free( proto_port );
646         return (void *) ctx;
647 }
648
649 /*
650         This sets the default trace length which will be added to any message buffers
651         allocated.  It can be set at any time, and if rmr_set_trace() is given a
652         trace len that is different than the default allcoated in a message, the message
653         will be resized.
654
655         Returns 0 on failure and 1 on success. If failure, then errno will be set.
656 */
657 extern int rmr_init_trace( void* vctx, int tr_len ) {
658         uta_ctx_t* ctx;
659
660         errno = 0;
661         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
662                 errno = EINVAL;
663                 return 0;
664         }
665
666         ctx->trace_data_len = tr_len;
667         return 1;
668 }
669
670 /*
671         Publicly facing initialisation function. Wrapper for the init() funcion above
672         as it needs to ensure internal flags are masked off before calling the
673         real workhorse.
674 */
675 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
676         return init( uproto_port, max_msg_size, flags & UFL_MASK  );
677 }
678
679 /*
680         Return true if routing table is initialised etc. and app can send/receive.
681 */
682 extern int rmr_ready( void* vctx ) {
683         uta_ctx_t *ctx;
684
685         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
686                 return FALSE;
687         }
688
689         if( ctx->rtable != NULL ) {
690                 return TRUE;
691         }
692
693         return FALSE;
694 }
695
696 /*
697         Provides a non-fatal (compile) interface for the nng only function.
698         Not supported on top of nano, so this always returns -1.
699 */
700 extern int rmr_get_rcvfd( void* vctx ) {
701         errno = ENOTSUP;
702         return -1;
703 }
704
705 /*
706         Compatability (mostly) with NNG.
707 */
708 extern void rmr_close( void* vctx ) {
709         uta_ctx_t *ctx;
710
711         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
712                 return;
713         }
714
715         nn_close( ctx->nn_sock );
716 }