enhance(header): Add message type
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rmr_nng.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_nng.c
23         Abstract:       This is the compile point for the nng version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53
54 #include <nng/nng.h>
55 #include <nng/protocol/pubsub0/pub.h>
56 #include <nng/protocol/pubsub0/sub.h>
57 #include <nng/protocol/pipeline0/push.h>
58 #include <nng/protocol/pipeline0/pull.h>
59
60
61 #include "rmr.h"                                // things the users see
62 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
63 #include "rmr_nng_private.h"    // things that we need too
64 #include "rmr_symtab.h"
65
66 #include "ring_static.c"                        // message ring support
67 #include "rt_generic_static.c"          // route table things not transport specific
68 #include "rtable_nng_static.c"          // route table things -- transport specific
69 #include "rtc_static.c"                         // route table collector
70 #include "tools_static.c"
71 #include "sr_nng_static.c"                      // send/receive static functions
72 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
73
74
75 //------------------------------------------------------------------------------
76
77
78 /*
79         Clean up a context.
80 */
81 static void free_ctx( uta_ctx_t* ctx ) {
82         if( ctx ) {
83                 if( ctx->rtg_addr ) {
84                         free( ctx->rtg_addr );
85                 }
86         }
87 }
88
89 // --------------- public functions --------------------------------------------------------------------------
90
91 /*
92         Returns the size of the payload (bytes) that the msg buffer references.
93         Len in a message is the number of bytes which were received, or should
94         be transmitted, however, it is possible that the mbuf was allocated
95         with a larger payload space than the payload length indicates; this
96         function returns the absolute maximum space that the user has available
97         in the payload. On error (bad msg buffer) -1 is returned and errno should
98         indicate the rason.
99 */
100 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
101         if( msg == NULL || msg->header == NULL ) {
102                 errno = EINVAL;
103                 return -1;
104         }
105
106         errno = 0;
107         return msg->alloc_len - RMR_HDR_LEN( msg->header );                             // allocated transport size less the header and other data bits
108 }
109
110 /*
111         Allocates a send message as a zerocopy message allowing the underlying message protocol
112         to send the buffer without copy.
113 */
114 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
115         uta_ctx_t*      ctx;
116         rmr_mbuf_t*     m;
117
118         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
119                 return NULL;
120         }
121
122         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
123         return  m;
124 }
125
126
127 /*
128         Allocates a send message as a zerocopy message allowing the underlying message protocol
129         to send the buffer without copy. In addition, a trace data field of tr_size will be
130         added and the supplied data coppied to the buffer before returning the message to
131         the caller.
132 */
133 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
134         uta_ctx_t*      ctx;
135         rmr_mbuf_t*     m;
136         int state;
137
138         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
139                 return NULL;
140         }
141
142         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
143         if( m != NULL ) {
144                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
145                 if( state != tr_size ) {
146                         m->state = RMR_ERR_INITFAILED;
147                 }
148         }
149
150         return  m;
151 }
152
153 /*
154         This provides an external path to the realloc static function as it's called by an
155         outward facing mbuf api function. Used to reallocate a message with a different
156         trace data size.
157 */
158 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
159         return realloc_msg( msg, new_tr_size );
160 }
161
162
163 /*
164         Return the message to the available pool, or free it outright.
165 */
166 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
167         if( mbuf == NULL ) {
168                 return;
169         }
170
171         if( mbuf->header ) {
172                 if( mbuf->flags & MFL_ZEROCOPY ) {
173                         //nng_free( (void *) mbuf->header, mbuf->alloc_len );
174                         if( mbuf->tp_buf ) {
175                                 nng_msg_free(  mbuf->tp_buf );
176                         }
177                 }
178         }
179
180         free( mbuf );
181 }
182
183 /*
184         send message with maximum timeout.
185         Accept a message and send it to an endpoint based on message type.
186         If NNG reports that the send attempt timed out, or should be retried,
187         RMr will retry for approximately max_to microseconds; rounded to the next
188         higher value of 10.
189
190         Allocates a new message buffer for the next send. If a message type has
191         more than one group of endpoints defined, then the message will be sent
192         in round robin fashion to one endpoint in each group.
193
194         An endpoint will be looked up in the route table using the message type and
195         the subscription id. If the subscription id is "UNSET_SUBID", then only the
196         message type is used.  If the initial lookup, with a subid, fails, then a
197         second lookup using just the mtype is tried.
198
199         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
200                 it will return with an error and errno set to eagain. If the send is
201                 a limited fanout, then the returned status is the status of the last
202                 send attempt.
203
204 */
205 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
206         nng_socket nn_sock;                     // endpoint socket for send
207         uta_ctx_t*      ctx;
208         int     group;                                  // selected group to get socket for
209         int send_again;                         // true if the message must be sent again
210         rmr_mbuf_t*     clone_m;                // cloned message for an nth send
211         int sock_ok;                            // got a valid socket from round robin select
212         uint64_t key;                           // mtype or sub-id/mtype sym table key
213         int     altk_ok = 0;                    // set true if we can lookup on alternate key if mt/sid lookup fails
214
215         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
216                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
217                 if( msg != NULL ) {
218                         msg->state = RMR_ERR_BADARG;
219                         errno = EINVAL;                                                                                 // must ensure it's not eagain
220                 }
221                 return msg;
222         }
223
224         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
225         if( msg->header == NULL ) {
226                 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
227                 msg->state = RMR_ERR_NOHDR;
228                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
229                 return msg;
230         }
231
232         if( max_to < 0 ) {
233                 max_to = ctx->send_retries;             // convert to retries
234         }
235
236         send_again = 1;                                                                                 // force loop entry
237         group = 0;                                                                                              // always start with group 0
238
239         key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
240         if( msg->sub_id != UNSET_SUBID ) {
241                 altk_ok = 1;                                                                            // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
242         }
243         while( send_again ) {
244                 sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
245                 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
246                                 msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
247
248                 if( ! sock_ok ) {
249                         if( altk_ok ) {                                                                                 // we can try with the alternate (no sub-id) key
250                                 altk_ok = 0;
251                                 key = build_rt_key( UNSET_SUBID, msg->mtype );          // build with just the mtype and try again
252                                 send_again = 1;                                                                         // ensure we don't exit the while
253                                 continue;
254                         }
255
256                         msg->state = RMR_ERR_NOENDPT;
257                         errno = ENXIO;                                                                                  // must ensure it's not eagain
258                         return msg;                                                                                             // caller can resend (maybe) or free
259                 }
260
261                 group++;
262
263                 if( send_again ) {
264                         clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
265                         if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
266                         msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
267                         msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
268                         /*
269                         if( msg ) {
270                                 // error do we need to count successes/errors, how to report some success, esp if last fails?
271                         }
272                         */
273
274                         msg = clone_m;                                                                                  // clone will be the next to send
275                 } else {
276                         msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
277                 }
278         }
279
280         return msg;                                                                     // last message caries the status of last/only send attempt
281 }
282
283 /*
284         Send with default max timeout as is set in the context.
285         See rmr_mtosend_msg() for more details on the parameters.
286         See rmr_stimeout() for info on setting the default timeout.
287 */
288 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
289         return rmr_mtosend_msg( vctx, msg,  -1 );                       // retries <  uses default from ctx
290 }
291
292 /*
293         Return to sender allows a message to be sent back to the endpoint where it originated.
294         The source information in the message is used to select the socket on which to write
295         the message rather than using the message type and round-robin selection. This
296         should return a message buffer with the state of the send operation set. On success
297         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
298         error it can be passed back to this function to retry the send if desired. On error,
299         errno will liklely have the failure reason set by the nng send processing.
300         The following are possible values for the state in the message buffer:
301
302         Message states returned:
303                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
304                 RMR_ERR_NOHDR  - message did not have a header
305                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
306                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
307                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
308
309         A nil message as the return value is rare, and generally indicates some kind of horrible
310         failure. The value of errno might give a clue as to what is wrong.
311
312         CAUTION:
313                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
314                 The caller must check for this and handle.
315 */
316 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
317         nng_socket nn_sock;                     // endpoint socket for send
318         uta_ctx_t*      ctx;
319         int state;
320         uta_mhdr_t*     hdr;
321         char*   hold_src;                       // we need the original source if send fails
322         int             sock_ok;                        // true if we found a valid endpoint socket
323
324         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
325                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
326                 if( msg != NULL ) {
327                         msg->state = RMR_ERR_BADARG;
328                 }
329                 return msg;
330         }
331
332         errno = 0;                                                                                                              // at this point any bad state is in msg returned
333         if( msg->header == NULL ) {
334                 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
335                 msg->state = RMR_ERR_NOHDR;
336                 return msg;
337         }
338
339         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                        // socket of specific endpoint
340         if( ! sock_ok ) {
341                 msg->state = RMR_ERR_NOENDPT;
342                 return msg;                                                     // preallocated msg can be reused since not given back to nn
343         }
344
345         msg->state = RMR_OK;                                    // ensure it is clear before send
346         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                                 // the dest where we're returning the message to
347         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                        // must overlay the source to be ours
348         msg = send_msg( ctx, msg, nn_sock, -1 );
349         if( msg ) {
350                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );                    // always return original source so rts can be called again
351                 msg->flags |= MFL_ADDSRC;                                                                                                       // if msg given to send() it must add source
352         }
353
354         free( hold_src );
355         return msg;
356 }
357
358 /*
359         Call sends the message based on message routing using the message type, and waits for a
360         response message to arrive with the same transaction id that was in the outgoing message.
361         If, while wiating for the expected response,  messages are received which do not have the
362         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
363         order that they were received.
364
365         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
366         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
367         may be resent (likely the context pointer was nil).  If the message is sent, but no
368         response is received, a nil message is returned with errno set to indicate the likley
369         issue:
370                 ETIMEDOUT -- too many messages were queued before reciving the expected response
371                 ENOBUFS -- the queued message ring is full, messages were dropped
372                 EINVAL  -- A parameter was not valid
373                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
374                                         user should call this function with the message again.
375
376
377         QUESTION:  should user specify the number of messages to allow to queue?
378 */
379 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
380         uta_ctx_t*              ctx;
381         unsigned char   expected_id[RMR_MAX_XID+1];             // the transaction id in the message; we wait for response with same ID
382
383         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
384                 if( msg != NULL ) {
385                         msg->state = RMR_ERR_BADARG;
386                 }
387                 return msg;
388         }
389
390         memcpy( expected_id, msg->xaction, RMR_MAX_XID );
391         expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
392         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
393         errno = 0;
394         msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
395
396         msg = rmr_send_msg( ctx, msg );
397         if( msg ) {                                                                             // msg should be nil, if not there was a problem; return buffer to user
398                 if( msg->state != RMR_ERR_RETRY ) {
399                         msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
400                 }
401                 return msg;
402         }
403
404         return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 );                 // wait for msg allowing 20 to queue ahead
405 }
406
407 /*
408         The outward facing receive function. When invoked it will pop the oldest message
409         from the receive ring, if any are queued, and return it. If the ring is empty
410         then the receive function is invoked to wait for the next message to arrive (blocking).
411
412         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
413         nil, a new one will be allocated. However, the caller should NOT expect to get the same
414         struct back (if a queued message is returned the message struct will be different).
415 */
416 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
417         uta_ctx_t*      ctx;
418         rmr_mbuf_t*     qm;                             // message that was queued on the ring
419
420         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
421                 if( old_msg != NULL ) {
422                         old_msg->state = RMR_ERR_BADARG;
423                 }
424                 errno = EINVAL;
425                 return old_msg;
426         }
427         errno = 0;
428
429         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
430         if( qm != NULL ) {
431                 if( old_msg ) {
432                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
433                 }
434
435                 return qm;
436         }
437
438         return rcv_msg( ctx, old_msg );                                                         // nothing queued, wait for one
439 }
440
441 /*
442         This implements a receive with a timeout via epoll. Mostly this is for
443         wrappers as native C applications can use epoll directly and will not have
444         to depend on this.
445 */
446 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
447         struct epoll_stuff* eps;        // convience pointer
448         uta_ctx_t*      ctx;
449         rmr_mbuf_t*     qm;                             // message that was queued on the ring
450         int nready;
451         rmr_mbuf_t* msg;
452
453         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
454                 if( old_msg != NULL ) {
455                         old_msg->state = RMR_ERR_BADARG;
456                 }
457                 errno = EINVAL;
458                 return old_msg;
459         }
460
461         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
462         if( qm != NULL ) {
463                 if( old_msg ) {
464                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
465                 }
466
467                 return qm;
468         }
469
470         if( (eps = ctx->eps)  == NULL ) {                                       // set up epoll on first call
471                 eps = malloc( sizeof *eps );
472
473                 if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
474                 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
475                         free( eps );
476                         return NULL;
477                 }
478
479                 eps->nng_fd = rmr_get_rcvfd( ctx );
480                 eps->epe.events = EPOLLIN;
481                 eps->epe.data.fd = eps->nng_fd;
482
483                 if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
484                 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
485                         free( eps );
486                         return NULL;
487                 }
488
489                 ctx->eps = eps;
490         }
491
492         if( old_msg ) {
493                 msg = old_msg;
494         } else {
495                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
496         }
497
498         if( ms_to < 0 ) {
499                 ms_to = 0;
500         }
501
502         nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
503         if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
504                 msg->state = RMR_ERR_TIMEOUT;
505         } else {
506                 return rcv_msg( ctx, msg );                                                             // receive it and return it
507         }
508
509         return msg;                             // return empty message with state set
510 }
511
512 /*
513         This blocks until the message with the 'expect' ID is received. Messages which are received
514         before the expected message are queued onto the message ring.  The function will return
515         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
516         expected message is received. If the queued message ring fills a nil pointer is returned
517         and errno is set to ENOBUFS.
518
519         Generally this will be invoked only by the call() function as it waits for a response, but
520         it is exposed to the user application as three is no reason not to.
521 */
522 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
523         uta_ctx_t*      ctx;
524         int     queued = 0;                             // number we pushed into the ring
525         int     exp_len = 0;                    // length of expected ID
526
527         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
528                 if( msg != NULL ) {
529                         msg->state = RMR_ERR_BADARG;
530                 }
531                 errno = EINVAL;
532                 return msg;
533         }
534
535         errno = 0;
536
537         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
538                 return rmr_rcv_msg( ctx, msg );
539         }
540
541         exp_len = strlen( expect );
542         if( exp_len > RMR_MAX_XID ) {
543                 exp_len = RMR_MAX_XID;
544         }
545         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
546
547         while( queued < allow2queue ) {
548                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
549                 if( msg->state == RMR_OK ) {
550                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
551                                 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
552                                 return msg;
553                         }
554
555                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
556                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
557                                 errno = ENOBUFS;
558                                 return NULL;
559                         }
560
561                         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
562                         queued++;
563                         msg = NULL;
564                 }
565         }
566
567         if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
568         errno = ETIMEDOUT;
569         return NULL;
570 }
571
572 //  CAUTION:  these are not supported as they must be set differently (between create and open) in NNG.
573 //                              until those details are worked out, these generate a warning.
574 /*
575         Set send timeout. The value time is assumed to be microseconds.  The timeout is the
576         rough maximum amount of time that RMr will block on a send attempt when the underlying
577         mechnism indicates eagain or etimeedout.  All other error conditions are reported
578         without this delay. Setting a timeout of 0 causes no retries to be attempted in
579         RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
580         but without issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
581         after every 10K send attempts until the time value is reached. Retries are abandoned
582         if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
583
584         The default, if this function is not used, is 1; meaning that RMr will retry, but will
585         not enter a sleep.  In all cases the caller should check the status in the message returned
586         after a send call.
587
588         Returns -1 if the context was invalid; RMR_OK otherwise.
589 */
590 extern int rmr_set_stimeout( void* vctx, int time ) {
591         uta_ctx_t*      ctx;
592
593         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
594                 return -1;
595         }
596
597         if( time < 0 ) {
598                 time = 0;
599         }
600
601         ctx->send_retries = time;
602         return RMR_OK;
603 }
604
605 /*
606         Set receive timeout -- not supported in nng implementation
607 */
608 extern int rmr_set_rtimeout( void* vctx, int time ) {
609         fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
610         return 0;
611 }
612
613
614 /*
615         This is the actual init workhorse. The user visible function meerly ensures that the
616         calling programme does NOT set any internal flags that are supported, and then
617         invokes this.  Internal functions (the route table collector) which need additional
618         open ports without starting additional route table collectors, will invoke this
619         directly with the proper flag.
620 */
621 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
622         static  int announced = 0;
623         uta_ctx_t*      ctx = NULL;
624         char    bind_info[NNG_MAXADDRLEN];      // bind info
625         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
626         char*   port;
627         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
628         char*   proto_port;
629         char    wbuf[1024];                                     // work buffer
630         char*   tok;                                            // pointer at token in a buffer
631         int             state;
632
633         if( ! announced ) {
634                 fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
635                         RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
636                 announced = 1;
637         }
638
639         errno = 0;
640         if( uproto_port == NULL ) {
641                 proto_port = strdup( DEF_COMM_PORT );
642         } else {
643                 proto_port = strdup( uproto_port );             // so we can modify it
644         }
645
646         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
647                 errno = ENOMEM;
648                 return NULL;
649         }
650         memset( ctx, 0, sizeof( uta_ctx_t ) );
651
652         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
653         ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
654
655         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
656         if( max_msg_size > 0 ) {
657                 ctx->max_plen = max_msg_size;
658         }
659
660         // we're using a listener to get rtg updates, so we do NOT need this.
661         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
662
663         if( nng_pull0_open( &ctx->nn_sock )  !=  0 ) {          // and assign the mode
664                 fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
665                 free_ctx( ctx );
666                 return NULL;
667         }
668
669         if( (port = strchr( proto_port, ':' )) != NULL ) {
670                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
671                         port++;
672                 } else {
673                         *(port++) = 0;                  // term proto string and point at port string
674                         proto = proto_port;             // user supplied proto so point at it rather than default
675                 }
676         } else {
677                 port = proto_port;                      // assume something like "1234" was passed
678         }
679
680         if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
681                 fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
682                 return NULL;
683         }
684         if( (tok = strchr( wbuf, '.' )) != NULL ) {
685                 *tok = 0;                                                                       // we don't keep domain portion
686         }
687         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
688         if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) {                       // our registered name is host:port
689                 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
690                 return NULL;
691         }
692
693         ctx->ip_list = mk_ip_list( port );              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
694
695
696
697         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
698                 interface = "0.0.0.0";
699         }
700         // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started
701         //       rather than using this generic listen() call.
702         snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
703         if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
704                 fprintf( stderr, "[CRIT] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
705                 nng_close( ctx->nn_sock );
706                 free_ctx( ctx );
707                 return NULL;
708         }
709
710         if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
711                 if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
712                         fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
713                 }
714         }
715
716         free( proto_port );
717         return (void *) ctx;
718 }
719
720 /*
721         Initialise the message routing environment. Flags are one of the UTAFL_
722         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
723         (tcp) to be used, then :port is all that is needed.
724
725         At the moment it seems that TCP really is the only viable protocol, but
726         we'll allow flexibility.
727
728         The return value is a void pointer which must be passed to most uta functions. On
729         error, a nil pointer is returned and errno should be set.
730
731         Flags:
732                 No user flags supported (needed) at the moment, but this provides for extension
733                 without drastically changing anything. The user should invoke with RMRFL_NONE to
734                 avoid any misbehavour as there are internal flags which are suported
735 */
736 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
737         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
738 }
739
740 /*
741         This sets the default trace length which will be added to any message buffers
742         allocated.  It can be set at any time, and if rmr_set_trace() is given a
743         trace len that is different than the default allcoated in a message, the message
744         will be resized.
745
746         Returns 0 on failure and 1 on success. If failure, then errno will be set.
747 */
748 extern int rmr_init_trace( void* vctx, int tr_len ) {
749         uta_ctx_t* ctx;
750
751         errno = 0;
752         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
753                 errno = EINVAL;
754                 return 0;
755         }
756
757         ctx->trace_data_len = tr_len;
758         return 1;
759 }
760
761 /*
762         Return true if routing table is initialised etc. and app can send/receive.
763 */
764 extern int rmr_ready( void* vctx ) {
765         uta_ctx_t *ctx;
766
767         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
768                 return FALSE;
769         }
770
771         if( ctx->rtable != NULL ) {
772                 return TRUE;
773         }
774
775         return FALSE;
776 }
777
778 /*
779         Returns a file descriptor which can be used with epoll() to signal a receive
780         pending. The file descriptor should NOT be read from directly, nor closed, as NNG
781         does not support this.
782 */
783 extern int rmr_get_rcvfd( void* vctx ) {
784         uta_ctx_t* ctx;
785         int fd;
786         int state;
787
788         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
789                 return -1;
790         }
791
792         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
793                 fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
794                 return -1;
795         }
796
797         return fd;
798 }
799
800
801 /*
802         Clean up things.
803
804         There isn't an nng_flush() per se, but we can pause, generate
805         a context switch, which should allow the last sent buffer to
806         flow. There isn't exactly an nng_term/close either, so there
807         isn't much we can do.
808 */
809 extern void rmr_close( void* vctx ) {
810         uta_ctx_t *ctx;
811
812         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
813                 return;
814         }
815
816         ctx->shutdown = 1;
817         nng_close( ctx->nn_sock );
818 }
819
820
821