fix(internal): Correct potential nil pointer err
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rmr_nng.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_nng.c
23         Abstract:       This is the compile point for the nng version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include <nng/nng.h>
57 #include <nng/protocol/pubsub0/pub.h>
58 #include <nng/protocol/pubsub0/sub.h>
59 #include <nng/protocol/pipeline0/push.h>
60 #include <nng/protocol/pipeline0/pull.h>
61
62
63 #include "rmr.h"                                // things the users see
64 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
65 #include "rmr_nng_private.h"    // things that we need too
66 #include "rmr_symtab.h"
67
68 #include "ring_static.c"                        // message ring support
69 #include "rt_generic_static.c"          // route table things not transport specific
70 #include "rtable_nng_static.c"          // route table things -- transport specific
71 #include "rtc_static.c"                         // route table collector
72 #include "tools_static.c"
73 #include "sr_nng_static.c"                      // send/receive static functions
74 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
75 #include "mt_call_static.c"
76 #include "mt_call_nng_static.c"
77
78
79 //------------------------------------------------------------------------------
80
81
82 /*
83         Clean up a context.
84 */
85 static void free_ctx( uta_ctx_t* ctx ) {
86         if( ctx ) {
87                 if( ctx->rtg_addr ) {
88                         free( ctx->rtg_addr );
89                 }
90         }
91 }
92
93 // --------------- public functions --------------------------------------------------------------------------
94
95 /*
96         Returns the size of the payload (bytes) that the msg buffer references.
97         Len in a message is the number of bytes which were received, or should
98         be transmitted, however, it is possible that the mbuf was allocated
99         with a larger payload space than the payload length indicates; this
100         function returns the absolute maximum space that the user has available
101         in the payload. On error (bad msg buffer) -1 is returned and errno should
102         indicate the rason.
103 */
104 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
105         if( msg == NULL || msg->header == NULL ) {
106                 errno = EINVAL;
107                 return -1;
108         }
109
110         errno = 0;
111         return msg->alloc_len - RMR_HDR_LEN( msg->header );                             // allocated transport size less the header and other data bits
112 }
113
114 /*
115         Allocates a send message as a zerocopy message allowing the underlying message protocol
116         to send the buffer without copy.
117 */
118 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
119         uta_ctx_t*      ctx;
120         rmr_mbuf_t*     m;
121
122         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
123                 return NULL;
124         }
125
126         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
127         return  m;
128 }
129
130
131 /*
132         Allocates a send message as a zerocopy message allowing the underlying message protocol
133         to send the buffer without copy. In addition, a trace data field of tr_size will be
134         added and the supplied data coppied to the buffer before returning the message to
135         the caller.
136 */
137 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
138         uta_ctx_t*      ctx;
139         rmr_mbuf_t*     m;
140         int state;
141
142         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
143                 return NULL;
144         }
145
146         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
147         if( m != NULL ) {
148                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
149                 if( state != tr_size ) {
150                         m->state = RMR_ERR_INITFAILED;
151                 }
152         }
153
154         return  m;
155 }
156
157 /*
158         This provides an external path to the realloc static function as it's called by an
159         outward facing mbuf api function. Used to reallocate a message with a different
160         trace data size.
161 */
162 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
163         return realloc_msg( msg, new_tr_size );
164 }
165
166
167 /*
168         Return the message to the available pool, or free it outright.
169 */
170 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
171         if( mbuf == NULL ) {
172                 return;
173         }
174
175         if( mbuf->header ) {
176                 if( mbuf->flags & MFL_ZEROCOPY ) {
177                         //nng_free( (void *) mbuf->header, mbuf->alloc_len );
178                         if( mbuf->tp_buf ) {
179                                 nng_msg_free(  mbuf->tp_buf );
180                         }
181                 }
182         }
183
184         free( mbuf );
185 }
186
187 /*
188         This is a wrapper to the real timeout send. We must wrap it now to ensure that
189         the call flag and call-id are reset
190 */
191 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
192         char* d1;                                                                                                                       // point at the call-id in the header
193
194         if( msg != NULL ) {
195                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
196
197                 d1 = DATA1_ADDR( msg->header );
198                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
199         }       
200
201         return mtosend_msg( vctx, msg, max_to );
202 }
203
204 /*
205         Send with default max timeout as is set in the context.
206         See rmr_mtosend_msg() for more details on the parameters.
207         See rmr_stimeout() for info on setting the default timeout.
208 */
209 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
210         char* d1;                                                                                                               // point at the call-id in the header
211
212         if( msg != NULL ) {
213                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
214
215                 d1 = DATA1_ADDR( msg->header );
216                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
217         }       
218
219         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
220 }
221
222 /*
223         Return to sender allows a message to be sent back to the endpoint where it originated.
224         The source information in the message is used to select the socket on which to write
225         the message rather than using the message type and round-robin selection. This
226         should return a message buffer with the state of the send operation set. On success
227         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
228         error it can be passed back to this function to retry the send if desired. On error,
229         errno will liklely have the failure reason set by the nng send processing.
230         The following are possible values for the state in the message buffer:
231
232         Message states returned:
233                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
234                 RMR_ERR_NOHDR  - message did not have a header
235                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
236                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
237                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
238
239         A nil message as the return value is rare, and generally indicates some kind of horrible
240         failure. The value of errno might give a clue as to what is wrong.
241
242         CAUTION:
243                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
244                 The caller must check for this and handle.
245 */
246 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
247         nng_socket      nn_sock;                        // endpoint socket for send
248         uta_ctx_t*      ctx;
249         int                     state;
250         char*           hold_src;                       // we need the original source if send fails
251         int                     sock_ok;                        // true if we found a valid endpoint socket
252
253         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
254                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
255                 if( msg != NULL ) {
256                         msg->state = RMR_ERR_BADARG;
257                 }
258                 return msg;
259         }
260
261         errno = 0;                                                                                                              // at this point any bad state is in msg returned
262         if( msg->header == NULL ) {
263                 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
264                 msg->state = RMR_ERR_NOHDR;
265                 return msg;
266         }
267
268         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
269         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                        // socket of specific endpoint
270         if( ! sock_ok ) {
271                 msg->state = RMR_ERR_NOENDPT;
272                 return msg;                                                     // preallocated msg can be reused since not given back to nn
273         }
274
275         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
276         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
277         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );        // must overlay the source to be ours
278         msg = send_msg( ctx, msg, nn_sock, -1 );
279         if( msg ) {
280                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );    // always return original source so rts can be called again
281                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
282         }
283
284         free( hold_src );
285         return msg;
286 }
287
288 /*
289         If multi-threading call is turned on, this invokes that mechanism with the special call
290         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
291         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
292         the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
293         a flexible timeout.
294
295         On timeout this function will return a nil pointer. If the original message could not
296         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
297
298         Original behavour:
299         Call sends the message based on message routing using the message type, and waits for a
300         response message to arrive with the same transaction id that was in the outgoing message.
301         If, while wiating for the expected response,  messages are received which do not have the
302         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
303         order that they were received.
304
305         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
306         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
307         may be resent (likely the context pointer was nil).  If the message is sent, but no
308         response is received, a nil message is returned with errno set to indicate the likley
309         issue:
310                 ETIMEDOUT -- too many messages were queued before reciving the expected response
311                 ENOBUFS -- the queued message ring is full, messages were dropped
312                 EINVAL  -- A parameter was not valid
313                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
314                                         user should call this function with the message again.
315
316 */
317 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
318         uta_ctx_t*              ctx;
319         unsigned char   expected_id[RMR_MAX_XID+1];             // the transaction id in the message; we wait for response with same ID
320
321         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
322                 if( msg != NULL ) {
323                         msg->state = RMR_ERR_BADARG;
324                 }
325                 return msg;
326         }
327
328         if( ctx->flags & CFL_MTC_ENABLED ) {                            // if multi threaded call is on, use that
329                 return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
330         }
331
332         memcpy( expected_id, msg->xaction, RMR_MAX_XID );
333         expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
334         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
335         errno = 0;
336         msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
337
338         msg = rmr_send_msg( ctx, msg );
339         if( msg ) {                                                                             // msg should be nil, if not there was a problem; return buffer to user
340                 if( msg->state != RMR_ERR_RETRY ) {
341                         msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
342                 }
343                 return msg;
344         }
345
346         return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 );                 // wait for msg allowing 20 to queue ahead
347 }
348
349 /*
350         The outward facing receive function. When invoked it will pop the oldest message
351         from the receive ring, if any are queued, and return it. If the ring is empty
352         then the receive function is invoked to wait for the next message to arrive (blocking).
353
354         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
355         nil, a new one will be allocated. However, the caller should NOT expect to get the same
356         struct back (if a queued message is returned the message struct will be different).
357 */
358 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
359         uta_ctx_t*      ctx;
360         rmr_mbuf_t*     qm;                             // message that was queued on the ring
361
362         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
363                 if( old_msg != NULL ) {
364                         old_msg->state = RMR_ERR_BADARG;
365                 }
366                 errno = EINVAL;
367                 return old_msg;
368         }
369         errno = 0;
370
371         if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
372                 return rmr_mt_rcv( ctx, old_msg, -1 );
373         }
374
375         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
376         if( qm != NULL ) {
377                 if( old_msg ) {
378                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
379                 }
380
381                 return qm;
382         }
383
384         return rcv_msg( ctx, old_msg );                                                         // nothing queued, wait for one
385 }
386
387 /*
388         This implements a receive with a timeout via epoll. Mostly this is for
389         wrappers as native C applications can use epoll directly and will not have
390         to depend on this.
391 */
392 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
393         struct epoll_stuff* eps;        // convience pointer
394         uta_ctx_t*      ctx;
395         rmr_mbuf_t*     qm;                             // message that was queued on the ring
396         int nready;
397         rmr_mbuf_t* msg;
398
399         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
400                 if( old_msg != NULL ) {
401                         old_msg->state = RMR_ERR_BADARG;
402                 }
403                 errno = EINVAL;
404                 return old_msg;
405         }
406
407         if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
408                 return rmr_mt_rcv( ctx, old_msg, ms_to );
409         }
410
411         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
412         if( qm != NULL ) {
413                 if( old_msg ) {
414                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
415                 }
416
417                 return qm;
418         }
419
420         if( (eps = ctx->eps)  == NULL ) {                                       // set up epoll on first call
421                 eps = malloc( sizeof *eps );
422
423                 if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
424                 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
425                         free( eps );
426                         return NULL;
427                 }
428
429                 eps->nng_fd = rmr_get_rcvfd( ctx );
430                 eps->epe.events = EPOLLIN;
431                 eps->epe.data.fd = eps->nng_fd;
432
433                 if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
434                 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
435                         free( eps );
436                         return NULL;
437                 }
438
439                 ctx->eps = eps;
440         }
441
442         if( old_msg ) {
443                 msg = old_msg;
444         } else {
445                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
446         }
447
448         if( ms_to < 0 ) {
449                 ms_to = 0;
450         }
451
452         nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
453         if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
454                 msg->state = RMR_ERR_TIMEOUT;
455         } else {
456                 return rcv_msg( ctx, msg );                                                             // receive it and return it
457         }
458
459         return msg;                             // return empty message with state set
460 }
461
462 /*
463         This blocks until the message with the 'expect' ID is received. Messages which are received
464         before the expected message are queued onto the message ring.  The function will return
465         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
466         expected message is received. If the queued message ring fills a nil pointer is returned
467         and errno is set to ENOBUFS.
468
469         Generally this will be invoked only by the call() function as it waits for a response, but
470         it is exposed to the user application as three is no reason not to.
471 */
472 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
473         uta_ctx_t*      ctx;
474         int     queued = 0;                             // number we pushed into the ring
475         int     exp_len = 0;                    // length of expected ID
476
477         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
478                 if( msg != NULL ) {
479                         msg->state = RMR_ERR_BADARG;
480                 }
481                 errno = EINVAL;
482                 return msg;
483         }
484
485         errno = 0;
486
487         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
488                 return rmr_rcv_msg( ctx, msg );
489         }
490
491         exp_len = strlen( expect );
492         if( exp_len > RMR_MAX_XID ) {
493                 exp_len = RMR_MAX_XID;
494         }
495         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
496
497         while( queued < allow2queue ) {
498                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
499                 if( msg->state == RMR_OK ) {
500                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
501                                 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
502                                 return msg;
503                         }
504
505                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
506                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
507                                 errno = ENOBUFS;
508                                 return NULL;
509                         }
510
511                         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
512                         queued++;
513                         msg = NULL;
514                 }
515         }
516
517         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
518         errno = ETIMEDOUT;
519         return NULL;
520 }
521
522 //  CAUTION:  these are not supported as they must be set differently (between create and open) in NNG.
523 //                              until those details are worked out, these generate a warning.
524 /*
525         Set send timeout. The value time is assumed to be microseconds.  The timeout is the
526         rough maximum amount of time that RMr will block on a send attempt when the underlying
527         mechnism indicates eagain or etimeedout.  All other error conditions are reported
528         without this delay. Setting a timeout of 0 causes no retries to be attempted in
529         RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
530         but without issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
531         after every 10K send attempts until the time value is reached. Retries are abandoned
532         if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
533
534         The default, if this function is not used, is 1; meaning that RMr will retry, but will
535         not enter a sleep.  In all cases the caller should check the status in the message returned
536         after a send call.
537
538         Returns -1 if the context was invalid; RMR_OK otherwise.
539 */
540 extern int rmr_set_stimeout( void* vctx, int time ) {
541         uta_ctx_t*      ctx;
542
543         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
544                 return -1;
545         }
546
547         if( time < 0 ) {
548                 time = 0;
549         }
550
551         ctx->send_retries = time;
552         return RMR_OK;
553 }
554
555 /*
556         Set receive timeout -- not supported in nng implementation
557 */
558 extern int rmr_set_rtimeout( void* vctx, int time ) {
559         fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
560         return 0;
561 }
562
563
564 /*
565         This is the actual init workhorse. The user visible function meerly ensures that the
566         calling programme does NOT set any internal flags that are supported, and then
567         invokes this.  Internal functions (the route table collector) which need additional
568         open ports without starting additional route table collectors, will invoke this
569         directly with the proper flag.
570 */
571 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
572         static  int announced = 0;
573         uta_ctx_t*      ctx = NULL;
574         char    bind_info[NNG_MAXADDRLEN];      // bind info
575         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
576         char*   port;
577         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
578         char*   proto_port;
579         char    wbuf[1024];                                     // work buffer
580         char*   tok;                                            // pointer at token in a buffer
581         int             state;
582
583         if( ! announced ) {
584                 fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
585                         RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
586                 announced = 1;
587         }
588
589         errno = 0;
590         if( uproto_port == NULL ) {
591                 proto_port = strdup( DEF_COMM_PORT );
592         } else {
593                 proto_port = strdup( uproto_port );             // so we can modify it
594         }
595
596         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
597                 errno = ENOMEM;
598                 return NULL;
599         }
600         memset( ctx, 0, sizeof( uta_ctx_t ) );
601
602         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
603         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
604
605         if( flags & RMRFL_MTCALL ) {                                    // mt call support is on, need bigger ring
606                 ctx->mring = uta_mk_ring( 2048 );                       // message ring filled by rcv thread
607                 init_mtcall( ctx );                                                     // set up call chutes
608         } else {
609                 ctx->mring = uta_mk_ring( 128 );                        // ring filled only on blocking call
610         }
611
612         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
613         if( max_msg_size > 0 ) {
614                 ctx->max_plen = max_msg_size;
615         }
616
617         // we're using a listener to get rtg updates, so we do NOT need this.
618         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
619
620         if( nng_pull0_open( &ctx->nn_sock )  !=  0 ) {          // and assign the mode
621                 fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
622                 free_ctx( ctx );
623                 return NULL;
624         }
625
626         if( (port = strchr( proto_port, ':' )) != NULL ) {
627                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
628                         port++;
629                 } else {
630                         *(port++) = 0;                  // term proto string and point at port string
631                         proto = proto_port;             // user supplied proto so point at it rather than default
632                 }
633         } else {
634                 port = proto_port;                      // assume something like "1234" was passed
635         }
636
637         if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
638                 fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
639                 return NULL;
640         }
641         if( (tok = strchr( wbuf, '.' )) != NULL ) {
642                 *tok = 0;                                                                       // we don't keep domain portion
643         }
644         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
645         if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) {                       // our registered name is host:port
646                 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
647                 return NULL;
648         }
649
650         ctx->ip_list = mk_ip_list( port );              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
651
652
653
654         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
655                 interface = "0.0.0.0";
656         }
657         // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started
658         //       rather than using this generic listen() call.
659         snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
660         if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
661                 fprintf( stderr, "[CRIT] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
662                 nng_close( ctx->nn_sock );
663                 free_ctx( ctx );
664                 return NULL;
665         }
666
667         if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
668                 if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
669                         fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
670                 }
671         }
672
673         if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) {      // mt call support is on, must start the listener thread if not running
674                 ctx->flags |= CFL_MTC_ENABLED;
675                 if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // kick the receiver
676                         fprintf( stderr, "[WARN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
677                 }
678                 
679         }
680
681         free( proto_port );
682         return (void *) ctx;
683 }
684
685 /*
686         Initialise the message routing environment. Flags are one of the UTAFL_
687         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
688         (tcp) to be used, then :port is all that is needed.
689
690         At the moment it seems that TCP really is the only viable protocol, but
691         we'll allow flexibility.
692
693         The return value is a void pointer which must be passed to most uta functions. On
694         error, a nil pointer is returned and errno should be set.
695
696         Flags:
697                 No user flags supported (needed) at the moment, but this provides for extension
698                 without drastically changing anything. The user should invoke with RMRFL_NONE to
699                 avoid any misbehavour as there are internal flags which are suported
700 */
701 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
702         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
703 }
704
705 /*
706         This sets the default trace length which will be added to any message buffers
707         allocated.  It can be set at any time, and if rmr_set_trace() is given a
708         trace len that is different than the default allcoated in a message, the message
709         will be resized.
710
711         Returns 0 on failure and 1 on success. If failure, then errno will be set.
712 */
713 extern int rmr_init_trace( void* vctx, int tr_len ) {
714         uta_ctx_t* ctx;
715
716         errno = 0;
717         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
718                 errno = EINVAL;
719                 return 0;
720         }
721
722         ctx->trace_data_len = tr_len;
723         return 1;
724 }
725
726 /*
727         Return true if routing table is initialised etc. and app can send/receive.
728 */
729 extern int rmr_ready( void* vctx ) {
730         uta_ctx_t *ctx;
731
732         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
733                 return FALSE;
734         }
735
736         if( ctx->rtable != NULL ) {
737                 return TRUE;
738         }
739
740         return FALSE;
741 }
742
743 /*
744         Returns a file descriptor which can be used with epoll() to signal a receive
745         pending. The file descriptor should NOT be read from directly, nor closed, as NNG
746         does not support this.
747 */
748 extern int rmr_get_rcvfd( void* vctx ) {
749         uta_ctx_t* ctx;
750         int fd;
751         int state;
752
753         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
754                 return -1;
755         }
756
757         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
758                 fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
759                 return -1;
760         }
761
762         return fd;
763 }
764
765
766 /*
767         Clean up things.
768
769         There isn't an nng_flush() per se, but we can pause, generate
770         a context switch, which should allow the last sent buffer to
771         flow. There isn't exactly an nng_term/close either, so there
772         isn't much we can do.
773 */
774 extern void rmr_close( void* vctx ) {
775         uta_ctx_t *ctx;
776
777         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
778                 return;
779         }
780
781         ctx->shutdown = 1;
782         nng_close( ctx->nn_sock );
783 }
784
785
786 // ----- multi-threaded call/receive support -------------------------------------------------
787
788 /*
789         Blocks on the receive ring chute semaphore and then reads from the ring
790         when it is tickled.  If max_wait is -1 then the function blocks until
791         a message is ready on the ring. Else max_wait is assumed to be the number
792         of millaseconds to wait before returning a timeout message.
793 */
794 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
795         uta_ctx_t*      ctx;
796         uta_mhdr_t*     hdr;                    // header in the transport buffer
797         chute_t*        chute;
798         struct timespec ts;                     // time info if we have a timeout
799         long    new_ms;                         // adjusted mu-sec
800         long    seconds = 0;            // max wait seconds
801         long    nano_sec;                       // max wait xlated to nano seconds
802         int             state;
803         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
804         
805         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
806                 errno = EINVAL;
807                 if( mbuf ) {
808                         mbuf->state = RMR_ERR_BADARG;
809                 }
810                 return mbuf;
811         }
812
813         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
814                 errno = EINVAL;
815                 if( mbuf != NULL ) {
816                         mbuf->state = RMR_ERR_NOTSUPP;
817                 }
818                 return mbuf;
819         }
820
821         ombuf = mbuf;
822         if( ombuf ) {
823                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
824                 ombuf->len = 0;
825         }
826
827         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
828         
829         if( max_wait > 0 ) {
830                 clock_gettime( CLOCK_REALTIME, &ts );   
831
832                 if( max_wait > 999 ) {
833                         seconds = (max_wait - 999)/1000;
834                         max_wait -= seconds * 1000;
835                         ts.tv_sec += seconds;
836                 }
837                 if( max_wait > 0 ) {
838                         nano_sec = max_wait * 1000000;
839                         ts.tv_nsec += nano_sec;
840                         if( ts.tv_nsec > 999999999 ) {
841                                 ts.tv_nsec -= 999999999;
842                                 ts.tv_sec++;
843                         }
844                 }
845
846                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
847         }
848
849         errno = 0;
850         while( chute->mbuf == NULL && ! errno ) {
851                 if( seconds ) {
852                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
853                 } else {
854                         state = sem_wait( &chute->barrier );
855                 }
856
857                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
858                         errno = 0;
859                 }
860         }
861
862         if( state < 0 ) {
863                 mbuf = ombuf;                           // return caller's buffer if they passed one in
864         } else {
865                 if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
866                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
867                         if( mbuf ) {
868                                 mbuf->state = RMR_OK;
869
870                                 if( ombuf ) {
871                                         rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
872                                 }
873                         } else {
874                                 mbuf = ombuf;                           // no buffer, return user's if there
875                         }
876                 }
877         }
878
879         return mbuf;
880 }
881
882 /*
883         Accept a message buffer and caller ID, send the message and then wait
884         for the receiver to tickle the semaphore letting us know that a message
885         has been received. The call_id is a value between 2 and 255, inclusive; if
886         it's not in this range an error will be returned. Max wait is the amount
887         of time in millaseconds that the call should block for. If 0 is given
888         then no timeout is set.
889
890         If the mt_call feature has not been initialised, then the attempt to use this
891         funciton will fail with RMR_ERR_NOTSUPP
892
893         If no matching message is received before the max_wait period expires, a
894         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
895         occurs after the message has been sent, then a nil pointer is returned
896         with errno set to some other value.
897 */
898 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
899         rmr_mbuf_t* ombuf;                      // original mbuf passed in
900         uta_ctx_t*      ctx;
901         uta_mhdr_t*     hdr;                    // header in the transport buffer
902         chute_t*        chute;
903         unsigned char*  d1;                     // d1 data in header
904         struct timespec ts;                     // time info if we have a timeout
905         long    new_ms;                         // adjusted mu-sec
906         long    seconds = 0;            // max wait seconds
907         long    nano_sec;                       // max wait xlated to nano seconds
908         int             state;
909         
910         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
911                 errno = EINVAL;
912                 if( mbuf ) {
913                         mbuf->state = RMR_ERR_BADARG;
914                 }
915                 return mbuf;
916         }
917
918         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
919                 mbuf->state = RMR_ERR_NOTSUPP;
920                 return mbuf;
921         }
922
923         if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
924                 mbuf->state = RMR_ERR_BADARG;
925                 return mbuf;
926         }
927
928         ombuf = mbuf;                                                                                                   // save to return timeout status with
929
930         chute = &ctx->chutes[call_id];
931         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
932                 rmr_free_msg( chute->mbuf );
933                 chute->mbuf = NULL;
934         }
935         
936         hdr = (uta_mhdr_t *) mbuf->header;
937         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
938         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
939         d1 = DATA1_ADDR( hdr );
940         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
941         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
942
943         if( max_wait > 0 ) {
944                 clock_gettime( CLOCK_REALTIME, &ts );   
945
946                 if( max_wait > 999 ) {
947                         seconds = (max_wait - 999)/1000;
948                         max_wait -= seconds * 1000;
949                         ts.tv_sec += seconds;
950                 }
951                 if( max_wait > 0 ) {
952                         nano_sec = max_wait * 1000000;
953                         ts.tv_nsec += nano_sec;
954                         if( ts.tv_nsec > 999999999 ) {
955                                 ts.tv_nsec -= 999999999;
956                                 ts.tv_sec++;
957                         }
958                 }
959
960                 seconds = 1;                                                                            // use as flag later to invoked timed wait
961         }
962
963         mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
964         if( mbuf ) {
965                 if( mbuf->state != RMR_OK ) {
966                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
967                 }
968         }
969
970         errno = 0;
971         while( chute->mbuf == NULL && ! errno ) {
972                 if( seconds ) {
973                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
974                 } else {
975                         state = sem_wait( &chute->barrier );
976                 }
977
978                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
979                         errno = 0;
980                 }
981
982                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
983                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
984                                 rmr_free_msg( chute->mbuf );
985                                 chute->mbuf = NULL;
986                                 errno = 0;
987                         }
988                 }
989         }
990
991         if( state < 0 ) {
992                 return NULL;                                    // leave errno as set by sem wait call
993         }
994
995         mbuf = chute->mbuf;
996         mbuf->state = RMR_OK;
997         chute->mbuf = NULL;
998
999         return mbuf;
1000 }