Allow RTS calls prior to initial route table load
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rmr_nng.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_nng.c
23         Abstract:       This is the compile point for the nng version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include <nng/nng.h>
57 #include <nng/protocol/pubsub0/pub.h>
58 #include <nng/protocol/pubsub0/sub.h>
59 #include <nng/protocol/pipeline0/push.h>
60 #include <nng/protocol/pipeline0/pull.h>
61
62
63 #include "rmr.h"                                // things the users see
64 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
65 #include "rmr_nng_private.h"    // things that we need too
66 #include "rmr_symtab.h"
67 #include "rmr_logging.h"
68
69 #include "ring_static.c"                        // message ring support
70 #include "rt_generic_static.c"          // route table things not transport specific
71 #include "rtable_nng_static.c"          // route table things -- transport specific
72 #include "rtc_static.c"                         // route table collector
73 #include "tools_static.c"
74 #include "sr_nng_static.c"                      // send/receive static functions
75 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
76 #include "mt_call_static.c"
77 #include "mt_call_nng_static.c"
78
79
80 //------------------------------------------------------------------------------
81
82
83 /*
84         Clean up a context.
85 */
86 static void free_ctx( uta_ctx_t* ctx ) {
87         if( ctx ) {
88                 if( ctx->rtg_addr ) {
89                         free( ctx->rtg_addr );
90                 }
91         }
92 }
93
94 // --------------- public functions --------------------------------------------------------------------------
95
96 /*
97         Returns the size of the payload (bytes) that the msg buffer references.
98         Len in a message is the number of bytes which were received, or should
99         be transmitted, however, it is possible that the mbuf was allocated
100         with a larger payload space than the payload length indicates; this
101         function returns the absolute maximum space that the user has available
102         in the payload. On error (bad msg buffer) -1 is returned and errno should
103         indicate the rason.
104 */
105 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
106         if( msg == NULL || msg->header == NULL ) {
107                 errno = EINVAL;
108                 return -1;
109         }
110
111         errno = 0;
112         return msg->alloc_len - RMR_HDR_LEN( msg->header );                             // allocated transport size less the header and other data bits
113 }
114
115 /*
116         Allocates a send message as a zerocopy message allowing the underlying message protocol
117         to send the buffer without copy.
118 */
119 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
120         uta_ctx_t*      ctx;
121         rmr_mbuf_t*     m;
122
123         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
124                 return NULL;
125         }
126
127         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
128         return  m;
129 }
130
131
132 /*
133         Allocates a send message as a zerocopy message allowing the underlying message protocol
134         to send the buffer without copy. In addition, a trace data field of tr_size will be
135         added and the supplied data coppied to the buffer before returning the message to
136         the caller.
137 */
138 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
139         uta_ctx_t*      ctx;
140         rmr_mbuf_t*     m;
141         int state;
142
143         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
144                 return NULL;
145         }
146
147         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
148         if( m != NULL ) {
149                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
150                 if( state != tr_size ) {
151                         m->state = RMR_ERR_INITFAILED;
152                 }
153         }
154
155         return  m;
156 }
157
158 /*
159         This provides an external path to the realloc static function as it's called by an
160         outward facing mbuf api function. Used to reallocate a message with a different
161         trace data size.
162 */
163 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
164         return realloc_msg( msg, new_tr_size );
165 }
166
167
168 /*
169         Return the message to the available pool, or free it outright.
170 */
171 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
172         if( mbuf == NULL ) {
173                 return;
174         }
175
176         if( mbuf->header ) {
177                 if( mbuf->flags & MFL_ZEROCOPY ) {
178                         //nng_free( (void *) mbuf->header, mbuf->alloc_len );
179                         if( mbuf->tp_buf ) {
180                                 nng_msg_free(  mbuf->tp_buf );
181                         }
182                 }
183         }
184
185         free( mbuf );
186 }
187
188 /*
189         This is a wrapper to the real timeout send. We must wrap it now to ensure that
190         the call flag and call-id are reset
191 */
192 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
193         char* d1;                                                                                                                       // point at the call-id in the header
194
195         if( msg != NULL ) {
196                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
197
198                 d1 = DATA1_ADDR( msg->header );
199                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
200         }       
201
202         return mtosend_msg( vctx, msg, max_to );
203 }
204
205 /*
206         Send with default max timeout as is set in the context.
207         See rmr_mtosend_msg() for more details on the parameters.
208         See rmr_stimeout() for info on setting the default timeout.
209 */
210 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
211         char* d1;                                                                                                               // point at the call-id in the header
212
213         if( msg != NULL ) {
214                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
215
216                 d1 = DATA1_ADDR( msg->header );
217                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
218         }       
219
220         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
221 }
222
223 /*
224         Return to sender allows a message to be sent back to the endpoint where it originated.
225         The source information in the message is used to select the socket on which to write
226         the message rather than using the message type and round-robin selection. This
227         should return a message buffer with the state of the send operation set. On success
228         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
229         error it can be passed back to this function to retry the send if desired. On error,
230         errno will liklely have the failure reason set by the nng send processing.
231         The following are possible values for the state in the message buffer:
232
233         Message states returned:
234                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
235                 RMR_ERR_NOHDR  - message did not have a header
236                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
237                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
238                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
239
240         A nil message as the return value is rare, and generally indicates some kind of horrible
241         failure. The value of errno might give a clue as to what is wrong.
242
243         CAUTION:
244                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
245                 The caller must check for this and handle.
246 */
247 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
248         nng_socket      nn_sock;                        // endpoint socket for send
249         uta_ctx_t*      ctx;
250         int                     state;
251         char*           hold_src;                       // we need the original source if send fails
252         char*           hold_ip;                        // also must hold original ip
253         int                     sock_ok = 0;            // true if we found a valid endpoint socket
254         endpoint_t*     ep;                                     // end point to track counts
255
256         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
257                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
258                 if( msg != NULL ) {
259                         msg->state = RMR_ERR_BADARG;
260                         msg->tp_state = errno;
261                 }
262                 return msg;
263         }
264
265         errno = 0;                                                                                                              // at this point any bad state is in msg returned
266         if( msg->header == NULL ) {
267                 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
268                 msg->state = RMR_ERR_NOHDR;
269                 msg->tp_state = errno;
270                 return msg;
271         }
272
273         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
274
275         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );                   // src is always used first for rts
276         if( ! sock_ok ) {
277                 if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
278                         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
279                 }
280                 if( ! sock_ok ) {
281                         msg->state = RMR_ERR_NOENDPT;
282                         return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
283                 }
284         }
285
286         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
287         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
288         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
289         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
290         msg = send_msg( ctx, msg, nn_sock, -1 );
291         if( msg ) {
292                 if( ep != NULL ) {
293                         switch( msg->state ) {
294                                 case RMR_OK:
295                                         ep->scounts[EPSC_GOOD]++;
296                                         break;
297                         
298                                 case RMR_ERR_RETRY:
299                                         ep->scounts[EPSC_TRANS]++;
300                                         break;
301
302                                 default:
303                                         ep->scounts[EPSC_FAIL]++;
304                                         break;
305                         }
306                 }
307                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
308                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
309                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
310         }
311
312         free( hold_src );
313         free( hold_ip );
314         return msg;
315 }
316
317 /*
318         If multi-threading call is turned on, this invokes that mechanism with the special call
319         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
320         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
321         the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
322         a flexible timeout.
323
324         On timeout this function will return a nil pointer. If the original message could not
325         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
326
327         Original behavour:
328         Call sends the message based on message routing using the message type, and waits for a
329         response message to arrive with the same transaction id that was in the outgoing message.
330         If, while wiating for the expected response,  messages are received which do not have the
331         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
332         order that they were received.
333
334         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
335         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
336         may be resent (likely the context pointer was nil).  If the message is sent, but no
337         response is received, a nil message is returned with errno set to indicate the likley
338         issue:
339                 ETIMEDOUT -- too many messages were queued before reciving the expected response
340                 ENOBUFS -- the queued message ring is full, messages were dropped
341                 EINVAL  -- A parameter was not valid
342                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
343                                         user should call this function with the message again.
344
345 */
346 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
347         uta_ctx_t*              ctx;
348         unsigned char   expected_id[RMR_MAX_XID+1];             // the transaction id in the message; we wait for response with same ID
349
350         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
351                 if( msg != NULL ) {
352                         msg->state = RMR_ERR_BADARG;
353                 }
354                 return msg;
355         }
356
357         if( ctx->flags & CFL_MTC_ENABLED ) {                            // if multi threaded call is on, use that
358                 return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
359         }
360
361         memcpy( expected_id, msg->xaction, RMR_MAX_XID );
362         expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
363         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rmr_call is making call, waiting for (%s)\n", expected_id );
364         errno = 0;
365         msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
366
367         msg = rmr_send_msg( ctx, msg );
368         if( msg ) {                                                                             // msg should be nil, if not there was a problem; return buffer to user
369                 if( msg->state != RMR_ERR_RETRY ) {
370                         msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
371                 }
372                 msg->tp_state = errno;
373                 return msg;
374         }
375
376         return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 );                 // wait for msg allowing 20 to queue ahead
377 }
378
379 /*
380         The outward facing receive function. When invoked it will pop the oldest message
381         from the receive ring, if any are queued, and return it. If the ring is empty
382         then the receive function is invoked to wait for the next message to arrive (blocking).
383
384         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
385         nil, a new one will be allocated. However, the caller should NOT expect to get the same
386         struct back (if a queued message is returned the message struct will be different).
387 */
388 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
389         uta_ctx_t*      ctx;
390         rmr_mbuf_t*     qm;                             // message that was queued on the ring
391
392         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
393                 errno = EINVAL;
394                 if( old_msg != NULL ) {
395                         old_msg->state = RMR_ERR_BADARG;
396                         old_msg->tp_state = errno;
397                 }
398                 return old_msg;
399         }
400         errno = 0;
401
402         if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
403                 return rmr_mt_rcv( ctx, old_msg, -1 );
404         }
405
406         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
407         if( qm != NULL ) {
408                 if( old_msg ) {
409                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
410                 }
411
412                 return qm;
413         }
414
415         return rcv_msg( ctx, old_msg );                                                         // nothing queued, wait for one
416 }
417
418 /*
419         This implements a receive with a timeout via epoll. Mostly this is for
420         wrappers as native C applications can use epoll directly and will not have
421         to depend on this.
422 */
423 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
424         struct epoll_stuff* eps;        // convience pointer
425         uta_ctx_t*      ctx;
426         rmr_mbuf_t*     qm;                             // message that was queued on the ring
427         int nready;
428         rmr_mbuf_t* msg;
429
430         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
431                 errno = EINVAL;
432                 if( old_msg != NULL ) {
433                         old_msg->state = RMR_ERR_BADARG;
434                         old_msg->tp_state = errno;
435                 }
436                 return old_msg;
437         }
438
439         if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
440                 return rmr_mt_rcv( ctx, old_msg, ms_to );
441         }
442
443         qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
444         if( qm != NULL ) {
445                 if( old_msg ) {
446                         rmr_free_msg( old_msg );                                                        // future:  push onto a free list???
447                 }
448
449                 return qm;
450         }
451
452         if( (eps = ctx->eps)  == NULL ) {                                       // set up epoll on first call
453                 eps = malloc( sizeof *eps );
454
455                 if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
456                 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
457                         free( eps );
458                         ctx->eps = NULL;
459                         if( old_msg != NULL ) {
460                                 old_msg->state = RMR_ERR_INITFAILED;
461                                 old_msg->tp_state = errno;
462                         }
463                         return old_msg;
464                 }
465
466                 eps->nng_fd = rmr_get_rcvfd( ctx );
467                 eps->epe.events = EPOLLIN;
468                 eps->epe.data.fd = eps->nng_fd;
469
470                 if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
471                 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
472                         free( eps );
473                         ctx->eps = NULL;
474                         if( old_msg != NULL ) {
475                                 old_msg->state = RMR_ERR_INITFAILED;
476                                 old_msg->tp_state = errno;
477                         }
478                         return old_msg;
479                 }
480
481                 ctx->eps = eps;
482         }
483
484         if( old_msg ) {
485                 msg = old_msg;
486         } else {
487                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
488         }
489
490         if( ms_to < 0 ) {
491                 ms_to = 0;
492         }
493
494         nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
495         if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
496                 msg->state = RMR_ERR_TIMEOUT;
497                 msg->tp_state = errno;
498         } else {
499                 return rcv_msg( ctx, msg );                                                             // receive it and return it
500         }
501
502         return msg;                             // return empty message with state set
503 }
504
505 /*
506         This blocks until the message with the 'expect' ID is received. Messages which are received
507         before the expected message are queued onto the message ring.  The function will return
508         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
509         expected message is received. If the queued message ring fills a nil pointer is returned
510         and errno is set to ENOBUFS.
511
512         Generally this will be invoked only by the call() function as it waits for a response, but
513         it is exposed to the user application as three is no reason not to.
514 */
515 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
516         uta_ctx_t*      ctx;
517         int     queued = 0;                             // number we pushed into the ring
518         int     exp_len = 0;                    // length of expected ID
519
520         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
521                 errno = EINVAL;
522                 if( msg != NULL ) {
523                         msg->state = RMR_ERR_BADARG;
524                         msg->tp_state = errno;
525                 }
526                 return msg;
527         }
528
529         errno = 0;
530
531         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
532                 return rmr_rcv_msg( ctx, msg );
533         }
534
535         exp_len = strlen( expect );
536         if( exp_len > RMR_MAX_XID ) {
537                 exp_len = RMR_MAX_XID;
538         }
539         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific waiting for id=%s\n",  expect );
540
541         while( queued < allow2queue ) {
542                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
543                 if( msg->state == RMR_OK ) {
544                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
545                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
546                                 return msg;
547                         }
548
549                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
550                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific ring is full\n" );
551                                 errno = ENOBUFS;
552                                 return NULL;
553                         }
554
555                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific queued message type=%d\n", msg->mtype );
556                         queued++;
557                         msg = NULL;
558                 }
559         }
560
561         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific timeout waiting for %s\n", expect );
562         errno = ETIMEDOUT;
563         return NULL;
564 }
565
566 /*
567         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
568         _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
569         mechnism indicates eagain or etimeedout.  All other error conditions are reported
570         without this delay. Setting a timeout of 0 causes no retries to be attempted in
571         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
572         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
573         after every 1K send attempts until the "time" value is reached. Retries are abandoned
574         if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
575
576         The default, if this function is not used, is 1; meaning that RMr will retry, but will
577         not enter a sleep.  In all cases the caller should check the status in the message returned
578         after a send call.
579
580         Returns -1 if the context was invalid; RMR_OK otherwise.
581 */
582 extern int rmr_set_stimeout( void* vctx, int time ) {
583         uta_ctx_t*      ctx;
584
585         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
586                 return -1;
587         }
588
589         if( time < 0 ) {
590                 time = 0;
591         }
592
593         ctx->send_retries = time;
594         return RMR_OK;
595 }
596
597 /*
598         Set receive timeout -- not supported in nng implementation
599
600         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
601 */
602 extern int rmr_set_rtimeout( void* vctx, int time ) {
603         rmr_vlog( RMR_VL_WARN, "Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
604         return 0;
605 }
606
607
608 /*
609         This is the actual init workhorse. The user visible function meerly ensures that the
610         calling programme does NOT set any internal flags that are supported, and then
611         invokes this.  Internal functions (the route table collector) which need additional
612         open ports without starting additional route table collectors, will invoke this
613         directly with the proper flag.
614 */
615 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
616         static  int announced = 0;
617         uta_ctx_t*      ctx = NULL;
618         char    bind_info[NNG_MAXADDRLEN];      // bind info
619         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
620         char*   port;
621         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
622         char*   proto_port;
623         char    wbuf[1024];                                     // work buffer
624         char*   tok;                                            // pointer at token in a buffer
625         char*   tok2;
626         int             state;
627         int             old_vlevel = 0;
628
629         old_vlevel = rmr_vlog_init();           // initialise and get the current level
630         rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
631
632         if( ! announced ) {
633                 rmr_vlog( RMR_VL_INFO, "ric message routing library on NNG/d mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
634                         RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
635                 announced = 1;
636         }
637         rmr_set_vlevel( old_vlevel );           // return logging to the desired state
638
639         errno = 0;
640         if( uproto_port == NULL ) {
641                 proto_port = strdup( DEF_COMM_PORT );
642         } else {
643                 proto_port = strdup( uproto_port );             // so we can modify it
644         }
645
646         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
647                 errno = ENOMEM;
648                 return NULL;
649         }
650         memset( ctx, 0, sizeof( uta_ctx_t ) );
651
652         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
653         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
654
655         if( flags & RMRFL_MTCALL ) {                                    // mt call support is on, need bigger ring
656                 ctx->mring = uta_mk_ring( 2048 );                       // message ring filled by rcv thread
657                 init_mtcall( ctx );                                                     // set up call chutes
658         } else {
659                 ctx->mring = uta_mk_ring( 128 );                        // ring filled only on blocking call
660         }
661
662         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
663         if( max_msg_size > 0 ) {
664                 ctx->max_plen = max_msg_size;
665         }
666
667         // we're using a listener to get rtg updates, so we do NOT need this.
668         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
669
670         if( nng_pull0_open( &ctx->nn_sock )  !=  0 ) {          // and assign the mode
671                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
672                 free_ctx( ctx );
673                 return NULL;
674         }
675
676         if( (port = strchr( proto_port, ':' )) != NULL ) {
677                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
678                         port++;
679                 } else {
680                         *(port++) = 0;                  // term proto string and point at port string
681                         proto = proto_port;             // user supplied proto so point at it rather than default
682                 }
683         } else {
684                 port = proto_port;                      // assume something like "1234" was passed
685         }
686
687         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
688                 tok = strdup( tok );                                    // something we can destroy
689                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
690                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
691                 } else {
692                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
693                 }
694                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
695                         *tok2 = 0;                                                      // make it so
696                 }
697
698                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
699                 free( tok );
700         } else {
701                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
702                         rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
703                         free( proto_port );
704                         return NULL;
705                 }
706                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
707                         *tok = 0;                                                                       // we don't keep domain portion
708                 }
709         }
710
711         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
712         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
713                 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
714                 return NULL;
715         }
716
717         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
718                 if( atoi( tok ) > 0 ) {
719                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
720                 }
721         }
722
723         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
724         if( flags & RMRFL_NAME_ONLY ) {
725                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
726         } else {
727                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
728                 if( ctx->my_ip == NULL ) {
729                         rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
730                         ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
731                 }
732         }
733         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "default ip address: %s\n", ctx->my_ip );
734
735         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
736                 if( *tok == '1' ) {
737                         ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
738                 }
739         }
740
741
742         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
743                 interface = "0.0.0.0";
744         }
745         // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started
746         //       rather than using this generic listen() call.
747         snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
748         if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
749                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
750                 nng_close( ctx->nn_sock );
751                 free( proto_port );
752                 free_ctx( ctx );
753                 return NULL;
754         }
755
756         ctx->rtable = rt_clone_space( NULL, NULL, 0 );                  // allows wormhole and rts calls to work before rt is received
757         if( flags & FL_NOTHREAD ) {                                                             // if no rtc thread, we still need an empty route table for wormholes
758                 ctx->rmr_ready = 1;                                                                     // for a nothread instance, rmr is ready to go here
759         } else {
760                 ctx->rmr_ready = o;                                                                     // ensure not ready until static/dynamic table loaded
761
762                 if( (tok = getenv( ENV_RTG_RAW )) != NULL  && *tok == '0' ) {                   // use RMR for Rmgr comm only when specifically off
763                         if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rmr based rt collector thread
764                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
765                         }
766                 } else {
767                         if( pthread_create( &ctx->rtc_th,  NULL, raw_rtc, (void *) ctx ) ) {    // kick the raw msg rt collector thread
768                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
769                         }
770                 }
771         }
772
773         if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) {      // mt call support is on, must start the listener thread if not running
774                 ctx->flags |= CFL_MTC_ENABLED;
775                 if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // kick the receiver
776                         rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
777                 }
778                 
779         }
780
781         free( proto_port );
782         return (void *) ctx;
783 }
784
785 /*
786         Initialise the message routing environment. Flags are one of the UTAFL_
787         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
788         (tcp) to be used, then :port is all that is needed.
789
790         At the moment it seems that TCP really is the only viable protocol, but
791         we'll allow flexibility.
792
793         The return value is a void pointer which must be passed to most uta functions. On
794         error, a nil pointer is returned and errno should be set.
795
796         Flags:
797                 No user flags supported (needed) at the moment, but this provides for extension
798                 without drastically changing anything. The user should invoke with RMRFL_NONE to
799                 avoid any misbehavour as there are internal flags which are suported
800 */
801 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
802         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
803 }
804
805 /*
806         This sets the default trace length which will be added to any message buffers
807         allocated.  It can be set at any time, and if rmr_set_trace() is given a
808         trace len that is different than the default allcoated in a message, the message
809         will be resized.
810
811         Returns 0 on failure and 1 on success. If failure, then errno will be set.
812 */
813 extern int rmr_init_trace( void* vctx, int tr_len ) {
814         uta_ctx_t* ctx;
815
816         errno = 0;
817         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
818                 errno = EINVAL;
819                 return 0;
820         }
821
822         ctx->trace_data_len = tr_len;
823         return 1;
824 }
825
826 /*
827         Return true if routing table is initialised etc. and app can send/receive.
828 */
829 extern int rmr_ready( void* vctx ) {
830         uta_ctx_t *ctx;
831
832         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
833                 return FALSE;
834         }
835
836         return ctx->rmr_ready;
837 }
838
839 /*
840         Returns a file descriptor which can be used with epoll() to signal a receive
841         pending. The file descriptor should NOT be read from directly, nor closed, as NNG
842         does not support this.
843 */
844 extern int rmr_get_rcvfd( void* vctx ) {
845         uta_ctx_t* ctx;
846         int fd;
847         int state;
848
849         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
850                 return -1;
851         }
852
853         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
854                 rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
855                 return -1;
856         }
857
858         return fd;
859 }
860
861
862 /*
863         Clean up things.
864
865         There isn't an nng_flush() per se, but we can pause, generate
866         a context switch, which should allow the last sent buffer to
867         flow. There isn't exactly an nng_term/close either, so there
868         isn't much we can do.
869 */
870 extern void rmr_close( void* vctx ) {
871         uta_ctx_t *ctx;
872
873         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
874                 return;
875         }
876
877         ctx->shutdown = 1;
878         nng_close( ctx->nn_sock );
879 }
880
881
882 // ----- multi-threaded call/receive support -------------------------------------------------
883
884 /*
885         Blocks on the receive ring chute semaphore and then reads from the ring
886         when it is tickled.  If max_wait is -1 then the function blocks until
887         a message is ready on the ring. Else max_wait is assumed to be the number
888         of millaseconds to wait before returning a timeout message.
889 */
890 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
891         uta_ctx_t*      ctx;
892         uta_mhdr_t*     hdr;                    // header in the transport buffer
893         chute_t*        chute;
894         struct timespec ts;                     // time info if we have a timeout
895         long    new_ms;                         // adjusted mu-sec
896         long    seconds = 0;            // max wait seconds
897         long    nano_sec;                       // max wait xlated to nano seconds
898         int             state;
899         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
900         
901         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
902                 errno = EINVAL;
903                 if( mbuf ) {
904                         mbuf->state = RMR_ERR_BADARG;
905                         mbuf->tp_state = errno;
906                 }
907                 return mbuf;
908         }
909
910         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
911                 errno = EINVAL;
912                 if( mbuf != NULL ) {
913                         mbuf->state = RMR_ERR_NOTSUPP;
914                         mbuf->tp_state = errno;
915                 }
916                 return mbuf;
917         }
918
919         ombuf = mbuf;
920         if( ombuf ) {
921                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
922                 ombuf->len = 0;
923         }
924
925         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
926
927         if( max_wait >= 0 ) {
928                 clock_gettime( CLOCK_REALTIME, &ts );   
929
930                 if( max_wait > 999 ) {
931                         seconds = max_wait / 1000;
932                         max_wait -= seconds * 1000;
933                         ts.tv_sec += seconds;
934                 }
935                 if( max_wait > 0 ) {
936                         nano_sec = max_wait * 1000000;
937                         ts.tv_nsec += nano_sec;
938                         if( ts.tv_nsec > 999999999 ) {
939                                 ts.tv_nsec -= 999999999;
940                                 ts.tv_sec++;
941                         }
942                 }
943
944                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
945         }
946
947         errno = EINTR;
948         state = -1;
949         while( state < 0 && errno == EINTR ) {
950                 if( seconds ) {
951                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
952                 } else {
953                         state = sem_wait( &chute->barrier );
954                 }
955         }
956
957         if( state < 0 ) {
958                 mbuf = ombuf;                           // return caller's buffer if they passed one in
959         } else {
960                 errno = 0;                                              // interrupted call state could be left; clear
961                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_rcv extracting from normal ring\n" );
962                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
963                         mbuf->state = RMR_OK;
964
965                         if( ombuf ) {
966                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
967                         }
968                 } else {
969                         errno = ETIMEDOUT;
970                         mbuf = ombuf;                           // no buffer, return user's if there
971                 }
972         }
973
974         if( mbuf ) {
975                 mbuf->tp_state = errno;
976         }
977         return mbuf;
978 }
979
980
981 /*
982         This does the real work behind both of the outward facing call functions. See 
983         the rmr_mt_call() description for details modulo the comments blow.
984
985         If ep is given, then we skip the normal route table endpoint selection. This is
986         likely a wormhole call.
987 */
988 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
989         rmr_mbuf_t* ombuf;                      // original mbuf passed in
990         uta_ctx_t*      ctx;
991         uta_mhdr_t*     hdr;                    // header in the transport buffer
992         chute_t*        chute;
993         unsigned char*  d1;                     // d1 data in header
994         struct timespec ts;                     // time info if we have a timeout
995         long    new_ms;                         // adjusted mu-sec
996         long    seconds = 0;            // max wait seconds
997         long    nano_sec;                       // max wait xlated to nano seconds
998         int             state;
999         
1000         errno = EINVAL;
1001         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
1002                 if( mbuf ) {
1003                         mbuf->tp_state = errno;
1004                         mbuf->state = RMR_ERR_BADARG;
1005                 }
1006                 return mbuf;
1007         }
1008
1009         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
1010                 mbuf->state = RMR_ERR_NOTSUPP;
1011                 mbuf->tp_state = errno;
1012                 return mbuf;
1013         }
1014
1015         if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
1016                 mbuf->state = RMR_ERR_BADARG;
1017                 mbuf->tp_state = errno;
1018                 return mbuf;
1019         }
1020
1021         ombuf = mbuf;                                                                                                   // save to return timeout status with
1022
1023         chute = &ctx->chutes[call_id];
1024         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
1025                 rmr_free_msg( chute->mbuf );
1026                 chute->mbuf = NULL;
1027         }
1028         
1029         hdr = (uta_mhdr_t *) mbuf->header;
1030         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
1031         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
1032         d1 = DATA1_ADDR( hdr );
1033         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
1034         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
1035
1036         if( max_wait >= 0 ) {
1037                 clock_gettime( CLOCK_REALTIME, &ts );   
1038
1039                 if( max_wait > 999 ) {
1040                         seconds = max_wait / 1000;
1041                         max_wait -= seconds * 1000;
1042                         ts.tv_sec += seconds;
1043                 }
1044                 if( max_wait > 0 ) {
1045                         nano_sec = max_wait * 1000000;
1046                         ts.tv_nsec += nano_sec;
1047                         if( ts.tv_nsec > 999999999 ) {
1048                                 ts.tv_nsec -= 999999999;
1049                                 ts.tv_sec++;
1050                         }
1051                 }
1052
1053                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1054         }
1055
1056         if( ep == NULL ) {
1057                 mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
1058         } else {
1059                 mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
1060         }
1061         if( mbuf ) {
1062                 if( mbuf->state != RMR_OK ) {
1063                         mbuf->tp_state = errno;
1064                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1065                 }
1066         }
1067
1068         state = -1;                                                                                             
1069         errno = 0;
1070         while( chute->mbuf == NULL && ! errno ) {
1071                 if( seconds ) {
1072                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1073                 } else {
1074                         state = sem_wait( &chute->barrier );
1075                 }
1076
1077                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1078                         errno = 0;
1079                 }
1080
1081                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1082                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1083                                 rmr_free_msg( chute->mbuf );
1084                                 chute->mbuf = NULL;
1085                                 errno = 0;
1086                         }
1087                 }
1088         }
1089
1090         if( state < 0 ) {
1091                 return NULL;                                    // leave errno as set by sem wait call
1092         }
1093
1094         if( (mbuf = chute->mbuf) != NULL ) {
1095                 mbuf->state = RMR_OK;
1096         }
1097         chute->mbuf = NULL;
1098
1099         return mbuf;
1100 }
1101
1102 /*
1103         Accept a message buffer and caller ID, send the message and then wait
1104         for the receiver to tickle the semaphore letting us know that a message
1105         has been received. The call_id is a value between 2 and 255, inclusive; if
1106         it's not in this range an error will be returned. Max wait is the amount
1107         of time in millaseconds that the call should block for. If 0 is given
1108         then no timeout is set.
1109
1110         If the mt_call feature has not been initialised, then the attempt to use this
1111         funciton will fail with RMR_ERR_NOTSUPP
1112
1113         If no matching message is received before the max_wait period expires, a
1114         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
1115         occurs after the message has been sent, then a nil pointer is returned
1116         with errno set to some other value.
1117
1118         This is now just a wrapper to the real work horse so that we can provide
1119         this and wormhole call functions without duplicating code.
1120
1121 */
1122 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
1123         return mt_call( vctx, mbuf, call_id, max_wait, NULL );
1124 }
1125
1126 /*
1127         Given an existing message buffer, reallocate the payload portion to
1128         be at least new_len bytes.  The message header will remain such that
1129         the caller may use the rmr_rts_msg() function to return a payload
1130         to the sender. 
1131
1132         The mbuf passed in may or may not be reallocated and the caller must
1133         use the returned pointer and should NOT assume that it can use the 
1134         pointer passed in with the exceptions based on the clone flag.
1135
1136         If the clone flag is set, then a duplicated message, with larger payload
1137         size, is allocated and returned.  The old_msg pointer in this situation is
1138         still valid and must be explicitly freed by the application. If the clone 
1139         message is not set (0), then any memory management of the old message is
1140         handled by the function.
1141
1142         If the copy flag is set, the contents of the old message's payload is 
1143         copied to the reallocated payload.  If the flag is not set, then the 
1144         contents of the payload is undetermined.
1145 */
1146 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1147         if( old_msg == NULL ) {
1148                 return NULL;
1149         }
1150
1151         return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
1152 }
1153
1154 /*
1155         The following functions are "dummies" as NNG has no concept of supporting
1156         them, but are needed to resolve calls at link time.
1157 */
1158
1159 extern void rmr_set_fack( void* p ) {
1160         return;
1161 }