70542f89710801ddd047a0356123449bd303dafc
[ric-plt/lib/rmr.git] / src / nng / src / rmr_nng.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_nng.c
23         Abstract:       This is the compile point for the nng version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53
54 #include <nng/nng.h>
55 #include <nng/protocol/pubsub0/pub.h>
56 #include <nng/protocol/pubsub0/sub.h>
57 #include <nng/protocol/pipeline0/push.h>
58 #include <nng/protocol/pipeline0/pull.h>
59
60
61 #include "rmr.h"                                // things the users see
62 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
63 #include "rmr_nng_private.h"    // things that we need too
64 #include "rmr_symtab.h"
65
66 #include "ring_static.c"                        // message ring support
67 #include "rt_generic_static.c"          // route table things not transport specific
68 #include "rtable_nng_static.c"          // route table things -- transport specific
69 #include "rtc_static.c"                         // route table collector
70 #include "tools_static.c"
71 #include "sr_nng_static.c"                      // send/receive static functions
72 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
73
74
75 //------------------------------------------------------------------------------
76
77
78 /*
79         Clean up a context.
80 */
81 static void free_ctx( uta_ctx_t* ctx ) {
82         if( ctx ) {
83                 if( ctx->rtg_addr ) {
84                         free( ctx->rtg_addr );
85                 }
86         }
87 }
88
89 // --------------- public functions --------------------------------------------------------------------------
90
91 /*
92         Returns the size of the payload (bytes) that the msg buffer references.
93         Len in a message is the number of bytes which were received, or should
94         be transmitted, however, it is possible that the mbuf was allocated
95         with a larger payload space than the payload length indicates; this
96         function returns the absolute maximum space that the user has available
97         in the payload. On error (bad msg buffer) -1 is returned and errno should
98         indicate the rason.
99 */
100 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
101         if( msg == NULL || msg->header == NULL ) {
102                 errno = EINVAL;
103                 return -1;
104         }
105
106         errno = 0;
107         return msg->alloc_len - RMR_HDR_LEN( msg->header );                             // allocated transport size less the header and other data bits
108 }
109
110 /*
111         Allocates a send message as a zerocopy message allowing the underlying message protocol
112         to send the buffer without copy.
113 */
114 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
115         uta_ctx_t*      ctx;
116         rmr_mbuf_t*     m;
117
118         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
119                 return NULL;
120         }
121
122         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
123         return  m;
124 }
125
126
127 /*
128         Allocates a send message as a zerocopy message allowing the underlying message protocol
129         to send the buffer without copy. In addition, a trace data field of tr_size will be
130         added and the supplied data coppied to the buffer before returning the message to
131         the caller.
132 */
133 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
134         uta_ctx_t*      ctx;
135         rmr_mbuf_t*     m;
136         int state;
137
138         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
139                 return NULL;
140         }
141
142         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
143         if( m != NULL ) {
144                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
145                 if( state != tr_size ) {
146                         m->state = RMR_ERR_INITFAILED;
147                 }
148         }
149
150         return  m;
151 }
152
153 /*
154         This provides an external path to the realloc static function as it's called by an
155         outward facing mbuf api function. Used to reallocate a message with a different
156         trace data size.
157 */
158 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
159         return realloc_msg( msg, new_tr_size );
160 }
161
162
163 /*
164         Return the message to the available pool, or free it outright.
165 */
166 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
167         if( mbuf == NULL ) {
168                 return;
169         }
170
171         if( mbuf->header ) {
172                 if( mbuf->flags & MFL_ZEROCOPY ) {
173                         //nng_free( (void *) mbuf->header, mbuf->alloc_len );
174                         if( mbuf->tp_buf ) {
175                                 nng_msg_free(  mbuf->tp_buf );
176                         }
177                 }
178         }
179
180         free( mbuf );
181 }
182
183 /*
184         send message with maximum timeout.
185         Accept a message and send it to an endpoint based on message type.
186         If NNG reports that the send attempt timed out, or should be retried,
187         RMr will retry for approximately max_to microseconds; rounded to the next
188         higher value of 10.
189
190         Allocates a new message buffer for the next send. If a message type has
191         more than one group of endpoints defined, then the message will be sent
192         in round robin fashion to one endpoint in each group.
193
194         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
195                 it will return with an error and errno set to eagain. If the send is
196                 a limited fanout, then the returned status is the status of the last
197                 send attempt.
198
199 */
200 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
201         nng_socket nn_sock;                     // endpoint socket for send
202         uta_ctx_t*      ctx;
203         int     group;                                  // selected group to get socket for
204         int send_again;                         // true if the message must be sent again
205         rmr_mbuf_t*     clone_m;                // cloned message for an nth send
206         int sock_ok;                            // got a valid socket from round robin select
207
208         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
209                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
210                 if( msg != NULL ) {
211                         msg->state = RMR_ERR_BADARG;
212                         errno = EINVAL;                                                                                 // must ensure it's not eagain
213                 }
214                 return msg;
215         }
216
217         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
218         if( msg->header == NULL ) {
219                 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
220                 msg->state = RMR_ERR_NOHDR;
221                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
222                 return msg;
223         }
224
225         if( max_to < 0 ) {
226                 max_to = ctx->send_retries;             // convert to retries
227         }
228
229         send_again = 1;                                                                                 // force loop entry
230         group = 0;                                                                                              // always start with group 0
231
232         while( send_again ) {
233                 sock_ok = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again, &nn_sock );               // round robin sel epoint; again set if mult groups
234                 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n",
235                                 msg->mtype, send_again, group, msg->len, sock_ok );
236                 group++;
237
238                 if( ! sock_ok ) {
239                         msg->state = RMR_ERR_NOENDPT;
240                         errno = ENXIO;                                                                                  // must ensure it's not eagain
241                         return msg;                                                                                             // caller can resend (maybe) or free
242                 }
243
244                 if( send_again ) {
245                         clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
246                         if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
247                         msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
248                         msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
249                         /*
250                         if( msg ) {
251                                 // error do we need to count successes/errors, how to report some success, esp if last fails?
252                         }
253                         */
254
255                         msg = clone_m;                                                                                  // clone will be the next to send
256                 } else {
257                         msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
258                 }
259         }
260
261         return msg;                                                                     // last message caries the status of last/only send attempt
262 }
263
264 /*
265         Send with default max timeout as is set in the context.
266         See rmr_mtosend_msg() for more details on the parameters.
267         See rmr_stimeout() for info on setting the default timeout.
268 */
269 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
270         return rmr_mtosend_msg( vctx, msg,  -1 );                       // retries <  uses default from ctx
271 }
272
273 /*
274         Return to sender allows a message to be sent back to the endpoint where it originated.
275         The source information in the message is used to select the socket on which to write
276         the message rather than using the message type and round-robin selection. This
277         should return a message buffer with the state of the send operation set. On success
278         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
279         error it can be passed back to this function to retry the send if desired. On error,
280         errno will liklely have the failure reason set by the nng send processing.
281         The following are possible values for the state in the message buffer:
282
283         Message states returned:
284                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
285                 RMR_ERR_NOHDR  - message did not have a header
286                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
287                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
288                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
289
290         A nil message as the return value is rare, and generally indicates some kind of horrible
291         failure. The value of errno might give a clue as to what is wrong.
292
293         CAUTION:
294                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
295                 The caller must check for this and handle.
296 */
297 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
298         nng_socket nn_sock;                     // endpoint socket for send
299         uta_ctx_t*      ctx;
300         int state;
301         uta_mhdr_t*     hdr;
302         char*   hold_src;                       // we need the original source if send fails
303         int             sock_ok;                        // true if we found a valid endpoint socket
304
305         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
306                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
307                 if( msg != NULL ) {
308                         msg->state = RMR_ERR_BADARG;
309                 }
310                 return msg;
311         }
312
313         errno = 0;                                                                                                              // at this point any bad state is in msg returned
314         if( msg->header == NULL ) {
315                 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
316                 msg->state = RMR_ERR_NOHDR;
317                 return msg;
318         }
319
320         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                        // socket of specific endpoint
321         if( ! sock_ok ) {
322                 msg->state = RMR_ERR_NOENDPT;
323                 return msg;                                                     // preallocated msg can be reused since not given back to nn
324         }
325
326         msg->state = RMR_OK;                                    // ensure it is clear before send
327         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                                 // the dest where we're returning the message to
328         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                        // must overlay the source to be ours
329         msg = send_msg( ctx, msg, nn_sock, -1 );
330         if( msg ) {
331                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );                    // always return original source so rts can be called again
332                 msg->flags |= MFL_ADDSRC;                                                                                                       // if msg given to send() it must add source
333         }
334
335         free( hold_src );
336         return msg;
337 }
338
339 /*
340         Call sends the message based on message routing using the message type, and waits for a
341         response message to arrive with the same transaction id that was in the outgoing message.
342         If, while wiating for the expected response,  messages are received which do not have the
343         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
344         order that they were received.
345
346         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
347         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
348         may be resent (likely the context pointer was nil).  If the message is sent, but no
349         response is received, a nil message is returned with errno set to indicate the likley
350         issue:
351                 ETIMEDOUT -- too many messages were queued before reciving the expected response
352                 ENOBUFS -- the queued message ring is full, messages were dropped
353                 EINVAL  -- A parameter was not valid
354                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
355                                         user should call this function with the message again.
356
357
358         QUESTION:  should user specify the number of messages to allow to queue?
359 */
360 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
361         uta_ctx_t*              ctx;
362         unsigned char   expected_id[RMR_MAX_XID+1];             // the transaction id in the message; we wait for response with same ID
363
364         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
365                 if( msg != NULL ) {
366                         msg->state = RMR_ERR_BADARG;
367                 }
368                 return msg;
369         }
370
371         memcpy( expected_id, msg->xaction, RMR_MAX_XID );
372         expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
373         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
374         errno = 0;
375         msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
376
377         msg = rmr_send_msg( ctx, msg );
378         if( msg ) {                                                                             // msg should be nil, if not there was a problem; return buffer to user
379                 if( msg->state != RMR_ERR_RETRY ) {
380                         msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
381                 }
382                 return msg;
383         }
384
385         return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 );                 // wait for msg allowing 20 to queue ahead
386 }
387
388 /*
389         The outward facing receive function. When invoked it will pop the oldest message
390         from the receive ring, if any are queued, and return it. If the ring is empty
391         then the receive function is invoked to wait for the next message to arrive (blocking).
392
393         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
394         nil, a new one will be allocated. However, the caller should NOT expect to get the same
395         struct back (if a queued message is returned the message struct will be different).
396 */
397 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
398         uta_ctx_t*      ctx;
399         rmr_mbuf_t*     qm;                             // message that was queued on the ring
400
401         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
402                 if( old_msg != NULL ) {
403                         old_msg->state = RMR_ERR_BADARG;
404                 }
405                 errno = EINVAL;
406                 return old_msg;
407         }
408         errno = 0;
409
410         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
411         if( qm != NULL ) {
412                 if( old_msg ) {
413                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
414                 }
415
416                 return qm;
417         }
418
419         return rcv_msg( ctx, old_msg );                                                         // nothing queued, wait for one
420 }
421
422 /*
423         This implements a receive with a timeout via epoll. Mostly this is for
424         wrappers as native C applications can use epoll directly and will not have
425         to depend on this.
426 */
427 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
428         struct epoll_stuff* eps;        // convience pointer
429         uta_ctx_t*      ctx;
430         rmr_mbuf_t*     qm;                             // message that was queued on the ring
431         int nready;
432         rmr_mbuf_t* msg;
433
434         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
435                 if( old_msg != NULL ) {
436                         old_msg->state = RMR_ERR_BADARG;
437                 }
438                 errno = EINVAL;
439                 return old_msg;
440         }
441
442         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
443         if( qm != NULL ) {
444                 if( old_msg ) {
445                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
446                 }
447
448                 return qm;
449         }
450
451         if( (eps = ctx->eps)  == NULL ) {                                       // set up epoll on first call
452                 eps = malloc( sizeof *eps );
453
454                 if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
455                 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
456                         free( eps );
457                         return NULL;
458                 }
459
460                 eps->nng_fd = rmr_get_rcvfd( ctx );
461                 eps->epe.events = EPOLLIN;
462                 eps->epe.data.fd = eps->nng_fd;
463
464                 if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
465                 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
466                         free( eps );
467                         return NULL;
468                 }
469
470                 ctx->eps = eps;
471         }
472
473         if( old_msg ) {
474                 msg = old_msg;
475         } else {
476                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
477         }
478
479         if( ms_to < 0 ) {
480                 ms_to = 0;
481         }
482
483         nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
484         if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
485                 msg->state = RMR_ERR_TIMEOUT;
486         } else {
487                 return rcv_msg( ctx, msg );                                                             // receive it and return it
488         }
489
490         return msg;                             // return empty message with state set
491 }
492
493 /*
494         This blocks until the message with the 'expect' ID is received. Messages which are received
495         before the expected message are queued onto the message ring.  The function will return
496         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
497         expected message is received. If the queued message ring fills a nil pointer is returned
498         and errno is set to ENOBUFS.
499
500         Generally this will be invoked only by the call() function as it waits for a response, but
501         it is exposed to the user application as three is no reason not to.
502 */
503 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
504         uta_ctx_t*      ctx;
505         int     queued = 0;                             // number we pushed into the ring
506         int     exp_len = 0;                    // length of expected ID
507
508         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
509                 if( msg != NULL ) {
510                         msg->state = RMR_ERR_BADARG;
511                 }
512                 errno = EINVAL;
513                 return msg;
514         }
515
516         errno = 0;
517
518         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
519                 return rmr_rcv_msg( ctx, msg );
520         }
521
522         exp_len = strlen( expect );
523         if( exp_len > RMR_MAX_XID ) {
524                 exp_len = RMR_MAX_XID;
525         }
526         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
527
528         while( queued < allow2queue ) {
529                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
530                 if( msg->state == RMR_OK ) {
531                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
532                                 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
533                                 return msg;
534                         }
535
536                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
537                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
538                                 errno = ENOBUFS;
539                                 return NULL;
540                         }
541
542                         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
543                         queued++;
544                         msg = NULL;
545                 }
546         }
547
548         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
549         errno = ETIMEDOUT;
550         return NULL;
551 }
552
553 //  CAUTION:  these are not supported as they must be set differently (between create and open) in NNG.
554 //                              until those details are worked out, these generate a warning.
555 /*
556         Set send timeout. The value time is assumed to be microseconds.  The timeout is the
557         rough maximum amount of time that RMr will block on a send attempt when the underlying
558         mechnism indicates eagain or etimeedout.  All other error conditions are reported
559         without this delay. Setting a timeout of 0 causes no retries to be attempted in
560         RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
561         but without issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
562         after every 10K send attempts until the time value is reached. Retries are abandoned
563         if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
564
565         The default, if this function is not used, is 1; meaning that RMr will retry, but will
566         not enter a sleep.  In all cases the caller should check the status in the message returned
567         after a send call.
568
569         Returns -1 if the context was invalid; RMR_OK otherwise.
570 */
571 extern int rmr_set_stimeout( void* vctx, int time ) {
572         uta_ctx_t*      ctx;
573
574         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
575                 return -1;
576         }
577
578         if( time < 0 ) {
579                 time = 0;
580         }
581
582         ctx->send_retries = time;
583         return RMR_OK;
584 }
585
586 /*
587         Set receive timeout -- not supported in nng implementation
588 */
589 extern int rmr_set_rtimeout( void* vctx, int time ) {
590         fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
591         return 0;
592 }
593
594
595 /*
596         This is the actual init workhorse. The user visible function meerly ensures that the
597         calling programme does NOT set any internal flags that are supported, and then
598         invokes this.  Internal functions (the route table collector) which need additional
599         open ports without starting additional route table collectors, will invoke this
600         directly with the proper flag.
601 */
602 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
603         static  int announced = 0;
604         uta_ctx_t*      ctx = NULL;
605         char    bind_info[NNG_MAXADDRLEN];      // bind info
606         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
607         char*   port;
608         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
609         char*   proto_port;
610         char    wbuf[1024];                                     // work buffer
611         char*   tok;                                            // pointer at token in a buffer
612         int             state;
613
614         if( ! announced ) {
615                 fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
616                         RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
617                 announced = 1;
618         }
619
620         errno = 0;
621         if( uproto_port == NULL ) {
622                 proto_port = strdup( DEF_COMM_PORT );
623         } else {
624                 proto_port = strdup( uproto_port );             // so we can modify it
625         }
626
627         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
628                 errno = ENOMEM;
629                 return NULL;
630         }
631         memset( ctx, 0, sizeof( uta_ctx_t ) );
632
633         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
634         ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
635
636         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
637         if( max_msg_size > 0 ) {
638                 ctx->max_plen = max_msg_size;
639         }
640
641         // we're using a listener to get rtg updates, so we do NOT need this.
642         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
643
644         if( nng_pull0_open( &ctx->nn_sock )  !=  0 ) {          // and assign the mode
645                 fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
646                 free_ctx( ctx );
647                 return NULL;
648         }
649
650         if( (port = strchr( proto_port, ':' )) != NULL ) {
651                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
652                         port++;
653                 } else {
654                         *(port++) = 0;                  // term proto string and point at port string
655                         proto = proto_port;             // user supplied proto so point at it rather than default
656                 }
657         } else {
658                 port = proto_port;                      // assume something like "1234" was passed
659         }
660
661         if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
662                 fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
663                 return NULL;
664         }
665         if( (tok = strchr( wbuf, '.' )) != NULL ) {
666                 *tok = 0;                                                                       // we don't keep domain portion
667         }
668         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
669         if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) {                       // our registered name is host:port
670                 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
671                 return NULL;
672         }
673
674         ctx->ip_list = mk_ip_list( port );              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
675
676
677
678         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
679                 interface = "0.0.0.0";
680         }
681         // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started
682         //       rather than using this generic listen() call.
683         snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
684         if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
685                 fprintf( stderr, "[CRIT] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
686                 nng_close( ctx->nn_sock );
687                 free_ctx( ctx );
688                 return NULL;
689         }
690
691         if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
692                 if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
693                         fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
694                 }
695         }
696
697         free( proto_port );
698         return (void *) ctx;
699 }
700
701 /*
702         Initialise the message routing environment. Flags are one of the UTAFL_
703         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
704         (tcp) to be used, then :port is all that is needed.
705
706         At the moment it seems that TCP really is the only viable protocol, but
707         we'll allow flexibility.
708
709         The return value is a void pointer which must be passed to most uta functions. On
710         error, a nil pointer is returned and errno should be set.
711
712         Flags:
713                 No user flags supported (needed) at the moment, but this provides for extension
714                 without drastically changing anything. The user should invoke with RMRFL_NONE to
715                 avoid any misbehavour as there are internal flags which are suported
716 */
717 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
718         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
719 }
720
721 /*
722         This sets the default trace length which will be added to any message buffers
723         allocated.  It can be set at any time, and if rmr_set_trace() is given a
724         trace len that is different than the default allcoated in a message, the message
725         will be resized.
726
727         Returns 0 on failure and 1 on success. If failure, then errno will be set.
728 */
729 extern int rmr_init_trace( void* vctx, int tr_len ) {
730         uta_ctx_t* ctx;
731
732         errno = 0;
733         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
734                 errno = EINVAL;
735                 return 0;
736         }
737
738         ctx->trace_data_len = tr_len;
739         return 1;
740 }
741
742 /*
743         Return true if routing table is initialised etc. and app can send/receive.
744 */
745 extern int rmr_ready( void* vctx ) {
746         uta_ctx_t *ctx;
747
748         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
749                 return FALSE;
750         }
751
752         if( ctx->rtable != NULL ) {
753                 return TRUE;
754         }
755
756         return FALSE;
757 }
758
759 /*
760         Returns a file descriptor which can be used with epoll() to signal a receive
761         pending. The file descriptor should NOT be read from directly, nor closed, as NNG
762         does not support this.
763 */
764 extern int rmr_get_rcvfd( void* vctx ) {
765         uta_ctx_t* ctx;
766         int fd;
767         int state;
768
769         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
770                 return -1;
771         }
772
773         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
774                 fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
775                 return -1;
776         }
777
778         return fd;
779 }
780
781
782 /*
783         Clean up things.
784
785         There isn't an nng_flush() per se, but we can pause, generate
786         a context switch, which should allow the last sent buffer to
787         flow. There isn't exactly an nng_term/close either, so there
788         isn't much we can do.
789 */
790 extern void rmr_close( void* vctx ) {
791         uta_ctx_t *ctx;
792
793         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
794                 return;
795         }
796
797         ctx->shutdown = 1;
798         nng_close( ctx->nn_sock );
799 }
800
801
802