enhance(API): Add source IP support to msg header
[ric-plt/lib/rmr.git] / src / rmr / nanomsg / src / rmr.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr.c
23         Abstract:       The bulk of the ric message routing library which is built upon
24                                 the older nanomsg messaging transport mehhanism.
25
26                                 To "hide" internal functions the choice was made to implement them
27                                 all as static functions. This means that we include nearly
28                                 all of our modules here as 90% of the library is not visible to
29                                 the outside world.
30
31         Author:         E. Scott Daniels
32         Date:           28 November 2018
33 */
34
35 #include <ctype.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <netdb.h>
39 #include <errno.h>
40 #include <string.h>
41 #include <errno.h>
42 #include <pthread.h>
43 #include <unistd.h>
44 #include <stdint.h>
45 #include <time.h>
46 #include <arpa/inet.h>
47 #include <semaphore.h>
48
49 #include <nanomsg/nn.h>
50 #include <nanomsg/tcp.h>
51 #include <nanomsg/pair.h>
52 #include <nanomsg/pipeline.h>
53 #include <nanomsg/pubsub.h>
54
55 #include "rmr.h"                                // things the users see
56 #include "rmr_agnostic.h"               // headers agnostic to the underlying transport mechanism
57 #include "rmr_private.h"                // things that we need too
58 #include "rmr_symtab.h"
59
60 #include "ring_static.c"                // message ring support
61 #include "rt_generic_static.c"  // generic route table (not nng/nano specific)
62 #include "rtable_static.c"              // route table things   (nano specific)
63 #include "rtc_static.c"                 // common rt collector
64 #include "tools_static.c"
65 #include "sr_static.c"                  // send/receive static functions
66 #include "wormholes.c"                  // external wormhole api, and it's static functions (must be LAST)
67
68 // ------------------------------------------------------------------------------------------------------
69
70 /*
71         Clean up a context.
72 */
73 static void free_ctx( uta_ctx_t* ctx ) {
74         if( ctx ) {
75                 if( ctx->rtg_addr ) {
76                         free( ctx->rtg_addr );
77                 }
78         }
79 }
80
81 // --------------- public functions --------------------------------------------------------------------------
82
83 /*
84         Set the receive timeout to time (ms). A value of 0 is the same as a non-blocking
85         receive and -1 is block for ever.
86         Returns the nn value (0 on success <0 on error).
87 */
88 extern int rmr_set_rtimeout( void* vctx, int time ) {
89         uta_ctx_t* ctx;
90
91         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
92                 errno = EINVAL;
93                 return -1;
94         }
95
96         if( ctx->last_rto == time ) {
97                 return 0;
98         }
99
100         ctx->last_rto = time;
101
102         return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
103 }
104
105 /*
106         Deprecated -- use rmr_set_rtimeout()
107 */
108 extern int rmr_rcv_to( void* vctx, int time ) {
109         return rmr_rcv_to( vctx, time );
110 }
111
112 /*
113         Set the send timeout to time. If time >1000 we assume the time is milliseconds,
114         else we assume seconds. Setting -1 is always block.
115         Returns the nn value (0 on success <0 on error).
116 */
117 extern int rmr_set_stimeout( void* vctx, int time ) {
118         uta_ctx_t* ctx;
119
120         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
121                 errno = EINVAL;
122                 return -1;
123         }
124
125         if( time > 0 ) {
126                 if( time < 1000 ) {
127                         time = time * 1000;                     // assume seconds, nn wants ms
128                 }
129         }
130
131         return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) );
132 }
133
134 /*
135         Deprecated -- use rmr_set_stimeout()
136 */
137 extern int rmr_send_to( void* vctx, int time ) {
138         return rmr_send_to( vctx, time );
139 }
140
141 /*
142         Returns the size of the payload (bytes) that the msg buffer references.
143         Len in a message is the number of bytes which were received, or should
144         be transmitted, however, it is possible that the mbuf was allocated
145         with a larger payload space than the payload length indicates; this
146         function returns the absolute maximum space that the user has available
147         in the payload. On error (bad msg buffer) -1 is returned and errno should
148         indicate the rason.
149 */
150 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
151         if( msg == NULL || msg->header == NULL ) {
152                 errno = EINVAL;
153                 return -1;
154         }
155
156         errno = 0;
157         return msg->alloc_len - RMR_HDR_LEN( msg->header );                     // transport buffer less header and other data bits
158 }
159
160 /*
161         Allocates a send message as a zerocopy message allowing the underlying message protocol
162         to send the buffer without copy.
163 */
164 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
165         uta_ctx_t*      ctx;
166         rmr_mbuf_t*     m;
167
168         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
169                 return NULL;
170         }
171
172         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );
173         return  m;
174 }
175
176 /*
177         Allocates a send message as a zerocopy message allowing the underlying message protocol
178         to send the buffer without copy. In addition, a trace data field of tr_size will be
179         added and the supplied data coppied to the buffer before returning the message to
180         the caller.
181 */
182 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
183         uta_ctx_t*      ctx;
184         rmr_mbuf_t*     m;
185         int state;
186
187         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
188                 return NULL;
189         }
190
191         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
192         if( m != NULL ) {
193                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
194                 if( state != tr_size ) {
195                         m->state = RMR_ERR_INITFAILED;
196                 }
197         }
198
199         return  m;
200 }
201
202 /*
203         Need an external path to the realloc static function as it's called by an
204         outward facing mbuf api function.
205 */
206 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
207         return realloc_msg( msg, new_tr_size );
208 }
209
210 /*
211         Return the message to the available pool, or free it outright.
212 */
213 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
214         if( mbuf == NULL ) {
215                 return;
216         }
217
218         if( mbuf->header ) {
219                 if( mbuf->flags & MFL_ZEROCOPY ) {
220                         nn_freemsg( mbuf->header );                             // must let nano free it
221                 } else {
222                         free( mbuf->header );
223                 }
224         }
225
226         free( mbuf );
227 }
228
229 /*
230         Accept a message and send it to an endpoint based on message type.
231         Allocates a new message buffer for the next send. If a message type has
232         more than one group of endpoints defined, then the message will be sent
233         in round robin fashion to one endpoint in each group.
234
235         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
236                 it will return with an error and errno set to eagain. If the send is
237                 a limited fanout, then the returned status is the status of the last
238                 send attempt.
239 */
240 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
241         int nn_sock;                            // endpoint socket for send
242         uta_ctx_t*      ctx;
243         int     group;                                  // selected group to get socket for
244         int send_again;                         // true if the message must be sent again
245         rmr_mbuf_t*     clone_m;                // cloned message for an nth send
246         uint64_t key;                           // lookup key is now subid and mtype
247         int max_rt = 1000;
248         int     altk_ok = 0;                    // ok to retry with alt key when true
249
250         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
251                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
252                 if( msg != NULL ) {
253                         msg->state = RMR_ERR_BADARG;
254                         errno = EINVAL;                                                                                 // must ensure it's not eagain
255                 }
256                 return msg;
257         }
258
259         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
260         if( msg->header == NULL ) {
261                 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
262                 msg->state = RMR_ERR_NOHDR;
263                 errno = EBADMSG;                                                                                // must ensure it's not eagain
264                 return msg;
265         }
266
267         send_again = 1;                                                                                 // force loop entry
268         group = 0;                                                                                              // always start with group 0
269
270         key = build_rt_key( msg->sub_id, msg->mtype );                  // what we need to find the route table entry
271         if( msg->sub_id != UNSET_SUBID ) {                                              // if sub id set, allow retry with just mtype if no endpoint when sub-id used
272                 altk_ok = 1;
273         }
274
275         while( send_again ) {
276                 max_rt = 1000;
277                 nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again );                // round robin select endpoint; again set if mult groups
278                 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d ak_ok=%d\n",
279                                 msg->mtype, send_again, group, nn_sock, msg->len, altk_ok );
280
281                 if( nn_sock < 0 ) {
282                         if( altk_ok ) {                                                                                 // ok to retry with alternate key
283                                 key = build_rt_key( UNSET_SUBID, msg->mtype );          // build key with just mtype and retry
284                                 send_again = 1;
285                                 altk_ok = 0;
286                                 continue;
287                         }
288
289                         msg->state = RMR_ERR_NOENDPT;
290                         errno = ENXIO;                                                                                  // must ensure it's not eagain
291                         return msg;                                                                                             // caller can resend (maybe) or free
292                 }
293                 group++;
294
295                 if( send_again ) {
296                         clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
297                         if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d sub_id=%d len=%d\n", msg->mtype, msg->sub_id, msg->len );
298                         msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
299                         msg = send_msg( ctx, msg, nn_sock );                                    // do the hard work, msg should be nil on success
300                         while( max_rt > 0 &&  msg && msg->state == RMR_ERR_RETRY ) {
301                                 msg = send_msg( ctx, msg, nn_sock );
302                                 max_rt--;
303                         }
304
305                         msg = clone_m;                                                                                  // clone will be the next to send
306                 } else {
307                         msg = send_msg( ctx, msg, nn_sock );                                    // send the last, and allocate a new buffer; drops the clone if it was
308                         while( max_rt > 0 &&  msg && msg->state == RMR_ERR_RETRY ) {
309                                 msg = send_msg( ctx, msg, nn_sock );
310                                 max_rt--;
311                         }
312                 }
313         }
314
315         return msg;                                                                     // last message caries the status of last/only send attempt
316 }
317
318 /*
319         Return to sender allows a message to be sent back to the endpoint where it originated.
320         The source information in the message is used to select the socket on which to write
321         the message rather than using the message type and round-robin selection. This
322         should return a message buffer with the state of the send operation set. On success
323         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
324         error it can be passed back to this function to retry the send if desired. On error,
325         errno will liklely have the failure reason set by the nanomsg send processing.
326         The following are possible values for the state in the message buffer:
327
328         Message states returned:
329                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
330                 RMR_ERR_NOHDR  - message did not have a header
331                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
332                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
333                 RMR_ERR_RETRY   - operation failed, but caller should retry
334
335         A nil message as the return value is rare, and generally indicates some kind of horrible
336         failure. The value of errno might give a clue as to what is wrong.
337
338         CAUTION:
339                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
340                 The caller must check for this and handle.
341 */
342 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
343         int nn_sock = -1;                       // endpoint socket for send
344         uta_ctx_t*      ctx;
345         int state;
346         uta_mhdr_t*     hdr;
347         char*   hold_src;                       // we need the original source if send fails
348         char*   hold_ip;
349
350         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
351                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
352                 if( msg != NULL ) {
353                         msg->state = RMR_ERR_BADARG;
354                 }
355                 return msg;
356         }
357
358         errno = 0;                                                                                                              // at this point any bad state is in msg returned
359         if( msg->header == NULL ) {
360                 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
361                 msg->state = RMR_ERR_NOHDR;
362                 return msg;
363         }
364
365         if( HDR_VERSION( msg->header ) > 2 ) {                                                  // new version uses sender's ip address for rts
366                 nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip );                // socket of specific endpoint
367         }
368         if(  nn_sock < 0 ) {
369                 nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src );                  // socket of specific endpoint
370                 if(  nn_sock < 0 ) {
371                         msg->state = RMR_ERR_NOENDPT;
372                         return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
373                 }
374         }
375
376         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                                 // the dest where we're returning the message to
377         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );
378         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                // must overlay the source to be ours
379         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
380
381         msg = send_msg( ctx, msg, nn_sock );
382         if( msg ) {
383                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );            // always return original source so rts can be called again
384                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
385                 msg->flags |= MFL_ADDSRC;                                                                                                       // if msg given to send() it must add source
386         }
387
388         free( hold_src );
389         free( hold_ip );
390         return msg;
391 }
392
393 /*
394         Call sends the message based on message routing using the message type, and waits for a
395         response message to arrive with the same transaction id that was in the outgoing message.
396         If, while wiating for the expected response,  messages are received which do not have the
397         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
398         order that they were received.
399
400         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
401         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
402         may be resent (likely the context pointer was nil).  If the message is sent, but no
403         response is received, a nil message is returned with errno set to indicate the likley
404         issue:
405                 ETIMEDOUT -- too many messages were queued before reciving the expected response
406                 ENOBUFS -- the queued message ring is full, messages were dropped
407                 EINVAL  -- A parameter was not valid
408                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
409                                         user should call this function with the message again.
410
411
412         QUESTION:  should user specify the number of messages to allow to queue?
413 */
414 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
415         uta_ctx_t*              ctx;
416         unsigned char   expected_id[RMR_MAX_XID+1];             // the transaction id in the message; we wait for response with same ID
417
418         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
419                 if( msg != NULL ) {
420                         msg->state = RMR_ERR_BADARG;
421                 }
422                 return msg;
423         }
424
425         memcpy( expected_id, msg->xaction, RMR_MAX_XID );
426         expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
427         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
428         errno = 0;
429         msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
430
431         msg = rmr_send_msg( ctx, msg );
432         if( msg ) {                                                                             // msg should be nil, if not there was a problem; return buffer to user
433                 if( msg->state != RMR_ERR_RETRY ) {
434                         msg->state = RMR_ERR_CALLFAILED;                // don't stomp if send_msg set retry
435                 }
436                 return msg;
437         }
438
439         return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 );                 // wait for msg allowing 20 to queue ahead
440 }
441
442 /*
443         The outward facing receive function. When invoked it will pop the oldest message
444         from the receive ring, if any are queued, and return it. If the ring is empty
445         then the receive function is invoked to wait for the next message to arrive (blocking).
446
447         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
448         nil, a new one will be allocated. However, the caller should NOT expect to get the same
449         struct back (if a queued message is returned the message struct will be different).
450 */
451 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
452         uta_ctx_t*      ctx;
453         rmr_mbuf_t*     qm;                             // message that was queued on the ring
454
455         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
456                 if( old_msg != NULL ) {
457                         old_msg->state = RMR_ERR_BADARG;
458                 }
459                 errno = EINVAL;
460                 return old_msg;
461         }
462         errno = 0;
463
464         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
465         if( qm != NULL ) {
466                 if( old_msg ) {
467                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
468                 }
469
470                 return qm;
471         }
472
473         return rcv_msg( ctx, old_msg );                                                         // nothing queued, wait for one
474 }
475
476 /*
477         Receive with a timeout.  This is a convenience function when sitting on top of
478         nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg().
479 */
480 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
481         uta_ctx_t*      ctx;
482
483         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
484                 if( ctx->last_rto != ms_to ) {                                                  // avoid call overhead
485                         rmr_set_rtimeout( vctx, ms_to );
486                 }
487         }
488
489         return rmr_rcv_msg( vctx, old_msg );
490 }
491
492
493 /*
494         This blocks until the message with the 'expect' ID is received. Messages which are received
495         before the expected message are queued onto the message ring.  The function will return
496         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
497         expected message is received. If the queued message ring fills a nil pointer is returned
498         and errno is set to ENOBUFS.
499
500         Generally this will be invoked only by the call() function as it waits for a response, but
501         it is exposed to the user application as three is no reason not to.
502 */
503 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
504         uta_ctx_t*      ctx;
505         int     queued = 0;                             // number we pushed into the ring
506         int     exp_len = 0;                    // length of expected ID
507
508         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
509                 if( msg != NULL ) {
510                         msg->state = RMR_ERR_BADARG;
511                 }
512                 errno = EINVAL;
513                 return msg;
514         }
515
516         errno = 0;
517
518         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
519                 return rmr_rcv_msg( ctx, msg );
520         }
521
522         exp_len = strlen( expect );
523         if( exp_len > RMR_MAX_XID ) {
524                 exp_len = RMR_MAX_XID;
525         }
526         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
527
528         while( queued < allow2queue ) {
529                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
530                 if( msg->state == RMR_OK ) {
531                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
532                                 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
533                                 return msg;
534                         }
535
536                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
537                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
538                                 errno = ENOBUFS;
539                                 return NULL;
540                         }
541
542                         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
543                         queued++;
544                         msg = NULL;
545                 }
546         }
547
548         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
549         errno = ETIMEDOUT;
550         return NULL;
551 }
552
553
554 /*
555         Initialise the message routing environment. Flags are one of the UTAFL_
556         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
557         (tcp) to be used, then :port is all that is needed.
558
559         At the moment it seems that TCP really is the only viable protocol, but
560         we'll allow flexibility.
561
562         The return value is a void pointer which must be passed to most uta functions. On
563         error, a nil pointer is returned and errno should be set.
564 */
565 static void* init( char* uproto_port, int max_msg_size, int flags ) {
566         uta_ctx_t*      ctx = NULL;
567         char    bind_info[NN_SOCKADDR_MAX];     // bind info
568         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
569         char*   port;
570         char*   proto_port;
571         char    wbuf[1024];                                     // work buffer
572         char*   tok;                                            // pointer at token in a buffer
573         int             state;
574         char*   interface = NULL;                       // interface to bind to pulled from RMR_BIND_IF if set
575
576         fprintf( stderr, "[INFO] ric message routing library on nanomsg (%s %s.%s.%s built: %s)\n",
577                         QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
578
579         errno = 0;
580         if( uproto_port == NULL ) {
581                 proto_port = strdup( "tcp:4567" );
582         } else {
583                 proto_port = strdup( uproto_port );             // so we can modify it
584         }
585
586         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
587                 errno = ENOMEM;
588                 return NULL;
589         }
590         memset( ctx, 0, sizeof( uta_ctx_t ) );
591
592
593         ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
594         ctx->last_rto = -2;                                                             // last receive timeout that was set; invalid value to force first to set
595
596         ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t );               // default max buffer size
597         if( max_msg_size > 0 ) {
598                 if( max_msg_size <= ctx->max_plen ) {                                           // user defined len can be smaller
599                         ctx->max_plen = max_msg_size;
600                 } else {
601                         fprintf( stderr, "[WARN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
602                 }
603         }
604
605         ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t );
606
607         uta_lookup_rtg( ctx );                                                  // attempt to fill in rtg info; rtc will handle missing values/errors
608
609         ctx->nn_sock = nn_socket( AF_SP, NN_PULL );             // our 'listen' socket should allow multiple senders to connect
610         if( ctx->nn_sock < 0 ) {
611                 fprintf( stderr, "[CRIT] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno );
612                 free_ctx( ctx );
613                 return NULL;
614         }
615
616         if( (port = strchr( proto_port, ':' )) != NULL ) {
617                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
618                         port++;
619                 } else {
620                         *(port++) = 0;                  // term proto string and point at port string
621                         proto = proto_port;             // user supplied proto so point at it rather than default
622                 }
623         } else {
624                 port = proto_port;                      // assume something like "1234" was passed
625         }
626
627         if( (gethostname( wbuf, sizeof( wbuf ) )) < 0 ) {
628                 fprintf( stderr, "[CRIT] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
629                 return NULL;
630         }
631         if( (tok = strchr( wbuf, '.' )) != NULL ) {
632                 *tok = 0;                                                                       // we don't keep domain portion
633         }
634         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
635         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
636                 fprintf( stderr, "[CRIT] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
637                 return NULL;
638         }
639
640         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
641                 interface = "0.0.0.0";
642         }
643         snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
644         if( nn_bind( ctx->nn_sock, bind_info ) < 0) {                   // bind and automatically accept client sessions
645                 fprintf( stderr, "[CRIT] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) );
646                 nn_close( ctx->nn_sock );
647                 free_ctx( ctx );
648                 return NULL;
649         }
650
651         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
652                 if( atoi( tok ) > 0 ) {
653                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
654                 }
655         }
656
657         if( flags & RMRFL_NAME_ONLY ) {
658                 ctx->my_ip = strdup( ctx->my_name );                            // user application or env var has specified that IP address is NOT sent out, use name
659                 if( DEBUG ) fprintf( stderr, "[DBUG] name only mode is set; not sending IP address as source\n" );
660         } else {
661                 ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
662                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
663                 if( ctx->my_ip == NULL ) {
664                         strcpy( ctx->my_ip, ctx->my_name );                     // revert to name if we cant suss out ip address
665                         fprintf( stderr, "[WARN] rmr_init: default ip address could not be sussed out, using name as source\n" );
666                 } else {
667                         if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
668                 }
669         }
670
671         if( ! (flags & FL_NOTHREAD) ) {                 // skip if internal context that does not need rout table thread
672                 if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {                // kick the rt collector thread
673                         fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
674                 }
675         }
676
677         free( proto_port );
678         return (void *) ctx;
679 }
680
681 /*
682         This sets the default trace length which will be added to any message buffers
683         allocated.  It can be set at any time, and if rmr_set_trace() is given a
684         trace len that is different than the default allcoated in a message, the message
685         will be resized.
686
687         Returns 0 on failure and 1 on success. If failure, then errno will be set.
688 */
689 extern int rmr_init_trace( void* vctx, int tr_len ) {
690         uta_ctx_t* ctx;
691
692         errno = 0;
693         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
694                 errno = EINVAL;
695                 return 0;
696         }
697
698         ctx->trace_data_len = tr_len;
699         return 1;
700 }
701
702 /*
703         Publicly facing initialisation function. Wrapper for the init() funcion above
704         as it needs to ensure internal flags are masked off before calling the
705         real workhorse.
706 */
707 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
708         return init( uproto_port, max_msg_size, flags & UFL_MASK  );
709 }
710
711 /*
712         Return true if routing table is initialised etc. and app can send/receive.
713 */
714 extern int rmr_ready( void* vctx ) {
715         uta_ctx_t *ctx;
716
717         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
718                 return FALSE;
719         }
720
721         if( ctx->rtable != NULL ) {
722                 return TRUE;
723         }
724
725         return FALSE;
726 }
727
728 /*
729         Provides a non-fatal (compile) interface for the nng only function.
730         Not supported on top of nano, so this always returns -1.
731 */
732 extern int rmr_get_rcvfd( void* vctx ) {
733         errno = ENOTSUP;
734         return -1;
735 }
736
737 /*
738         Compatability (mostly) with NNG.
739 */
740 extern void rmr_close( void* vctx ) {
741         uta_ctx_t *ctx;
742
743         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
744                 return;
745         }
746
747         nn_close( ctx->nn_sock );
748 }