3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: The bulk of the ric message routing library which is built upon
24 the older nanomsg messaging transport mechanism.
26 To "hide" internal functions the choice was made to implement them
27 all as static functions. This means that we include nearly
28 all of our modules here as 90% of the library is not visible to
31 Author: E. Scott Daniels
32 Date: 28 November 2018
46 #include <arpa/inet.h>
47 #include <semaphore.h>
49 #include <nanomsg/nn.h>
50 #include <nanomsg/tcp.h>
51 #include <nanomsg/pair.h>
52 #include <nanomsg/pipeline.h>
53 #include <nanomsg/pubsub.h>
55 #include "rmr.h" // things the users see
56 #include "rmr_agnostic.h" // headers agnostic to the underlying transport mechanism
57 #include "rmr_private.h" // things that we need too
58 #include "rmr_symtab.h"
60 #include "ring_static.c" // message ring support
61 #include "rt_generic_static.c" // generic route table (not nng/nano specific)
62 #include "rtable_static.c" // route table things (nano specific)
63 #include "rtc_static.c" // common rt collector
64 #include "tools_static.c"
65 #include "sr_static.c" // send/receive static functions
66 #include "wormholes.c" // external wormhole api, and it's static functions (must be LAST)
68 // ------------------------------------------------------------------------------------------------------
// Releases memory referenced by a context; the statement visible here frees
// the context's rtg_addr member.
// NOTE(review): this listing omits lines of the body (any guards and the closing
// brace are not shown) -- confirm against the complete file before relying on it.
73 static void free_ctx( uta_ctx_t* ctx ) {
76 free( ctx->rtg_addr );
81 // --------------- public functions --------------------------------------------------------------------------
84 Set the receive timeout to time (ms). A value of 0 is the same as a non-blocking
85 receive and -1 is block for ever.
86 Returns the nn value (0 on success <0 on error).
// Sets the receive timeout on the context's nanomsg socket.
//   vctx  context pointer (cast to uta_ctx_t*); a NULL context is tested for first
//   time  timeout value handed to nn_setsockopt() as NN_RCVTIMEO
// The last value applied is cached in ctx->last_rto and compared on entry.
// On the path shown the return value is whatever nn_setsockopt() returns.
// NOTE(review): the bodies of the two tests below are not visible in this listing.
88 extern int rmr_set_rtimeout( void* vctx, int time ) {
91 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
96 if( ctx->last_rto == time ) {		// same value as the one cached from the previous call
100 ctx->last_rto = time;		// remember what is about to be applied to the socket
102 return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
106 Deprecated -- use rmr_set_rtimeout()
/*
	Deprecated name for rmr_set_rtimeout(); retained so that existing callers
	continue to link.

	vctx	the context pointer returned by the init function
	time	receive timeout, passed through unchanged

	Returns whatever rmr_set_rtimeout() returns.

	The previous body invoked rmr_rcv_to() itself, which recursed without end;
	the call is now forwarded to the replacement function.
*/
extern int rmr_rcv_to( void* vctx, int time ) {
	return rmr_set_rtimeout( vctx, time );
}
113 Set the send timeout to time. If time >1000 we assume the time is milliseconds,
114 else we assume seconds. Setting -1 is always block.
115 Returns the nn value (0 on success <0 on error).
// Sets the send timeout on the context's nanomsg socket (NN_SNDTIMEO).
//   vctx  context pointer (cast to uta_ctx_t*); a NULL context is tested for first
//   time  timeout; the visible conversion scales it by 1000 because nn wants ms
// On the path shown the return value is whatever nn_setsockopt() returns.
// NOTE(review): the condition that guards the seconds-to-ms conversion is not
// visible in this listing.
117 extern int rmr_set_stimeout( void* vctx, int time ) {
120 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
127 time = time * 1000; // assume seconds, nn wants ms
131 return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) );
135 Deprecated -- use rmr_set_stimeout()
/*
	Deprecated name for rmr_set_stimeout(); retained so that existing callers
	continue to link.

	vctx	the context pointer returned by the init function
	time	send timeout, passed through unchanged

	Returns whatever rmr_set_stimeout() returns.

	The previous body invoked rmr_send_to() itself, which recursed without end;
	the call is now forwarded to the replacement function.
*/
extern int rmr_send_to( void* vctx, int time ) {
	return rmr_set_stimeout( vctx, time );
}
142 Returns the size of the payload (bytes) that the msg buffer references.
143 Len in a message is the number of bytes which were received, or should
144 be transmitted, however, it is possible that the mbuf was allocated
145 with a larger payload space than the payload length indicates; this
146 function returns the absolute maximum space that the user has available
147 in the payload. On error (bad msg buffer) -1 is returned and errno should
// Reports the payload space available in a message buffer.
//   msg  message buffer; a NULL buffer or one with a NULL header takes the
//        error branch tested first
// The value returned on the normal path is the allocated length less the
// header length computed by RMR_HDR_LEN() for this message's header.
150 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
151 if( msg == NULL || msg->header == NULL ) {
157 return msg->alloc_len - RMR_HDR_LEN( msg->header ); // transport buffer less header and other data bits
161 Allocates a send message as a zerocopy message allowing the underlying message protocol
162 to send the buffer without copy.
// Allocates a message buffer for sending by calling alloc_zcmsg().
//   vctx  context pointer (cast to uta_ctx_t*); a NULL context is tested for first
//   size  size value forwarded to alloc_zcmsg()
// The trace area is requested with the default length DEF_TR_LEN.
// NOTE(review): the return statements are not visible in this listing.
164 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
168 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
172 m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );
177 Allocates a send message as a zerocopy message allowing the underlying message protocol
178 to send the buffer without copy. In addition, a trace data field of tr_size will be
179 added and the supplied data copied to the buffer before returning the message to
// Allocates a message buffer with a caller-sized trace area and copies the
// caller's trace data into it.
//   vctx     context pointer (cast to uta_ctx_t*); a NULL context is tested for first
//   size     size value forwarded to alloc_zcmsg()
//   tr_size  trace length, used both for the allocation and for the copy
//   data     trace bytes handed to rmr_set_trace()
// When rmr_set_trace() reports a length other than tr_size the message state
// is set to RMR_ERR_INITFAILED.
// NOTE(review): any NULL check on m and the return statements are not visible here.
182 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
187 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
191 m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size
193 state = rmr_set_trace( m, data, tr_size ); // roll their data in
194 if( state != tr_size ) {		// copy did not report the full requested length
195 m->state = RMR_ERR_INITFAILED;
203 Need an external path to the realloc static function as it's called by an
204 outward facing mbuf api function.
206 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
207 return realloc_msg( msg, new_tr_size );
211 Return the message to the available pool, or free it outright.
// Releases a message buffer.
//   mbuf  the message to release
// The header memory is released in one of two ways depending on the
// MFL_ZEROCOPY flag: nn_freemsg() when the flag is set, free() otherwise.
// NOTE(review): the NULL guard, the else keyword and the release of the mbuf
// struct itself are not visible in this listing.
213 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
219 if( mbuf->flags & MFL_ZEROCOPY ) {
220 nn_freemsg( mbuf->header ); // must let nano free it
222 free( mbuf->header );		// header was not flagged zero-copy, so the plain free() is used
230 Accept a message and send it to an endpoint based on message type.
231 Allocates a new message buffer for the next send. If a message type has
232 more than one group of endpoints defined, then the message will be sent
233 in round robin fashion to one endpoint in each group.
235 CAUTION: this is a non-blocking send. If the message cannot be sent, then
236 it will return with an error and errno set to eagain. If the send is
237 a limited fanout, then the returned status is the status of the last
// Sends a message to endpoint(s) selected from the route table.
//   vctx  context pointer (cast to uta_ctx_t*)
//   msg   message to send; msg->sub_id and msg->mtype form the route table key
// Visible behaviour:
//   - NULL context or message: errno is set to EINVAL and, on the visible line,
//     msg->state is set to RMR_ERR_BADARG
//   - NULL header: msg->state = RMR_ERR_NOHDR, errno = EBADMSG
//   - no endpoint found: msg->state = RMR_ERR_NOENDPT, errno = ENXIO, msg returned
//   - otherwise the message is handed to send_msg(), with re-sends while the
//     state is RMR_ERR_RETRY and max_rt is positive
// NOTE(review): several control lines (else branches, loop bookkeeping, the
// declarations of ctx and max_rt) are missing from this listing; the comments
// below describe only what the visible statements do.
240 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
241 int nn_sock; // endpoint socket for send
243 int group; // selected group to get socket for
244 int send_again; // true if the message must be sent again
245 rmr_mbuf_t* clone_m; // cloned message for an nth send
246 uint64_t key; // lookup key is now subid and mtype
248 int altk_ok = 0; // ok to retry with alt key when true
// ---- argument validation ----
250 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
251 errno = EINVAL; // if msg is null, this is their clue
253 msg->state = RMR_ERR_BADARG;
254 errno = EINVAL; // must ensure it's not eagain
259 errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
260 if( msg->header == NULL ) {
261 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
262 msg->state = RMR_ERR_NOHDR;
263 errno = EBADMSG; // must ensure it's not eagain
// ---- route lookup set up ----
267 send_again = 1; // force loop entry
268 group = 0; // always start with group 0
270 key = build_rt_key( msg->sub_id, msg->mtype ); // what we need to find the route table entry
271 if( msg->sub_id != UNSET_SUBID ) { // if sub id set, allow retry with just mtype if no endpoint when sub-id used
// ---- one pass per group; uta_epsock_rr() updates send_again through the pointer ----
275 while( send_again ) {
277 nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again ); // round robin select endpoint; again set if mult groups
278 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d ak_ok=%d\n",
279 msg->mtype, send_again, group, nn_sock, msg->len, altk_ok );
// fallback: rebuild the key with UNSET_SUBID so the lookup uses the message type only
282 if( altk_ok ) { // ok to retry with alternate key
283 key = build_rt_key( UNSET_SUBID, msg->mtype ); // build key with just mtype and retry
289 msg->state = RMR_ERR_NOENDPT;
290 errno = ENXIO; // must ensure it's not eagain
291 return msg; // caller can resend (maybe) or free
// ---- clone-and-send path; the clone becomes the message for the next send ----
296 clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
297 if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d sub_id=%d len=%d\n", msg->mtype, msg->sub_id, msg->len );
298 msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer
299 msg = send_msg( ctx, msg, nn_sock ); // do the hard work, msg should be nil on success
300 while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) {
301 msg = send_msg( ctx, msg, nn_sock );
305 msg = clone_m; // clone will be the next to send
// ---- final (or only) send ----
307 msg = send_msg( ctx, msg, nn_sock ); // send the last, and allocate a new buffer; drops the clone if it was
308 while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) {
309 msg = send_msg( ctx, msg, nn_sock );
315 return msg; // last message caries the status of last/only send attempt
319 Return to sender allows a message to be sent back to the endpoint where it originated.
320 The source information in the message is used to select the socket on which to write
321 the message rather than using the message type and round-robin selection. This
322 should return a message buffer with the state of the send operation set. On success
323 (state is RMR_OK, the caller may use the buffer for another receive operation), and on
324 error it can be passed back to this function to retry the send if desired. On error,
325 errno will likely have the failure reason set by the nanomsg send processing.
326 The following are possible values for the state in the message buffer:
328 Message states returned:
329 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
330 RMR_ERR_NOHDR - message did not have a header
331 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
332 RMR_ERR_SENDFAILED - send failed; errno has nano error code
333 RMR_ERR_RETRY - operation failed, but caller should retry
335 A nil message as the return value is rare, and generally indicates some kind of horrible
336 failure. The value of errno might give a clue as to what is wrong.
339 Like send_msg(), this is non-blocking and will return the msg if there is an error.
340 The caller must check for this and handle.
// Return-to-sender: sends msg back to the endpoint named in its own header.
//   vctx  context pointer (cast to uta_ctx_t*)
//   msg   received message to send back
// Visible behaviour:
//   - NULL context or message: errno = EINVAL, msg->state = RMR_ERR_BADARG
//   - NULL header: msg->state = RMR_ERR_NOHDR
//   - the socket is looked up by the header's srcip when HDR_VERSION() > 2,
//     and by the header's src on the other visible lookup
//   - no endpoint: msg->state = RMR_ERR_NOENDPT and msg is returned
//   - the header's src/srcip are overwritten with this process's name/ip for
//     the send, then the original values are copied back afterwards
// NOTE(review): the release of hold_src/hold_ip, the test guarding the restore
// after send_msg(), and the final return are not visible in this listing.
342 extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
343 int nn_sock = -1; // endpoint socket for send
347 char* hold_src; // we need the original source if send fails
350 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
351 errno = EINVAL; // if msg is null, this is their clue
353 msg->state = RMR_ERR_BADARG;
358 errno = 0; // at this point any bad state is in msg returned
359 if( msg->header == NULL ) {
// NOTE(review): the text below names rmr_send_msg although this is rmr_rts_msg
360 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
361 msg->state = RMR_ERR_NOHDR;
365 if( HDR_VERSION( msg->header ) > 2 ) { // new version uses sender's ip address for rts
366 nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip ); // socket of specific endpoint
369 nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src ); // socket of specific endpoint
371 msg->state = RMR_ERR_NOENDPT;
372 return msg; // preallocated msg can be reused since not given back to nn
// keep private copies of the sender's identity before the header is overwritten
376 hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
377 hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );
378 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
379 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
381 msg = send_msg( ctx, msg, nn_sock );
// put the saved sender identity back into the header of the returned buffer
383 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again
384 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
385 msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
394 Call sends the message based on message routing using the message type, and waits for a
395 response message to arrive with the same transaction id that was in the outgoing message.
396 If, while waiting for the expected response, messages are received which do not have the
397 desired transaction ID, they are queued. Calls to rmr_rcv_msg() will dequeue them in the
398 order that they were received.
400 Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
401 to ensure that no error was encountered. If the state is UTA_BADARG, then the message
402 may be resent (likely the context pointer was nil). If the message is sent, but no
403 response is received, a nil message is returned with errno set to indicate the likley
405 ETIMEDOUT -- too many messages were queued before reciving the expected response
406 ENOBUFS -- the queued message ring is full, messages were dropped
407 EINVAL -- A parameter was not valid
408 EAGAIN -- the underlying message system was interrupted or the device was busy;
409 user should call this function with the message again.
412 QUESTION: should user specify the number of messages to allow to queue?
// Sends a message and then waits for the response carrying the same
// transaction id.
//   vctx  context pointer (cast to uta_ctx_t*)
//   msg   message to send; its xaction field supplies the id to wait for
// Visible behaviour:
//   - NULL context or message: msg->state = RMR_ERR_BADARG
//   - the first RMR_MAX_XID bytes of msg->xaction are copied and terminated so
//     they can be used as a string
//   - a non-NULL result from rmr_send_msg() is treated as a failed send; its
//     state becomes RMR_ERR_CALLFAILED unless it was RMR_ERR_RETRY
//   - otherwise rmr_rcv_specific() is called allowing 20 messages to queue
// NOTE(review): the return inside the failure branch is not visible here.
414 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
416 unsigned char expected_id[RMR_MAX_XID+1]; // the transaction id in the message; we wait for response with same ID
418 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
420 msg->state = RMR_ERR_BADARG;
425 memcpy( expected_id, msg->xaction, RMR_MAX_XID );
426 expected_id[RMR_MAX_XID] = 0; // ensure it's a string
427 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
429 msg->flags |= MFL_NOALLOC; // we don't need a new buffer from send
431 msg = rmr_send_msg( ctx, msg );
432 if( msg ) { // msg should be nil, if not there was a problem; return buffer to user
433 if( msg->state != RMR_ERR_RETRY ) {
434 msg->state = RMR_ERR_CALLFAILED; // don't stomp if send_msg set retry
439 return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 ); // wait for msg allowing 20 to queue ahead
443 The outward facing receive function. When invoked it will pop the oldest message
444 from the receive ring, if any are queued, and return it. If the ring is empty
445 then the receive function is invoked to wait for the next message to arrive (blocking).
447 If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
448 nil, a new one will be allocated. However, the caller should NOT expect to get the same
449 struct back (if a queued message is returned the message struct will be different).
// Public receive: returns a message taken from the context's ring when one can
// be extracted, otherwise falls through to rcv_msg().
//   vctx     context pointer (cast to uta_ctx_t*)
//   old_msg  optional buffer from the caller; it is passed to rcv_msg() on the
//            fall-through path and released with rmr_free_msg() on the visible
//            queued-message path
// With a NULL context, a non-NULL old_msg has its state set to RMR_ERR_BADARG.
// NOTE(review): the test on qm and the return of the queued message are not
// visible in this listing.
451 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
453 rmr_mbuf_t* qm; // message that was queued on the ring
455 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
456 if( old_msg != NULL ) {
457 old_msg->state = RMR_ERR_BADARG;
464 qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued
467 rmr_free_msg( old_msg ); // future: push onto a free list???
473 return rcv_msg( ctx, old_msg ); // nothing queued, wait for one
477 Receive with a timeout. This is a convenience function when sitting on top of
478 nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg().
// Receive with a timeout: applies ms_to as the receive timeout when it differs
// from the cached value, then delegates to rmr_rcv_msg().
//   vctx     context pointer; when NULL the timeout step is skipped and the
//            call still goes to rmr_rcv_msg()
//   old_msg  buffer handed through to rmr_rcv_msg()
//   ms_to    timeout value passed to rmr_set_rtimeout()
// Returns whatever rmr_rcv_msg() returns.
480 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
483 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
484 if( ctx->last_rto != ms_to ) { // avoid call overhead
485 rmr_set_rtimeout( vctx, ms_to );
489 return rmr_rcv_msg( vctx, old_msg );
494 This blocks until the message with the 'expect' ID is received. Messages which are received
495 before the expected message are queued onto the message ring. The function will return
496 a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
497 expected message is received. If the queued message ring fills a nil pointer is returned
498 and errno is set to ENOBUFS.
500 Generally this will be invoked only by the call() function as it waits for a response, but
501 it is exposed to the user application as there is no reason not to.
// Receives until a message whose transaction id matches 'expect' arrives,
// inserting non-matching messages into the context's ring.
//   vctx         context pointer (cast to uta_ctx_t*)
//   msg          optional buffer passed to rcv_msg()
//   expect       id to match; NULL or an empty string turns the call into a
//                plain rmr_rcv_msg()
//   allow2queue  the loop runs while fewer than this many messages were queued
// At most RMR_MAX_XID bytes of 'expect' are compared against msg->xaction.
// NOTE(review): the returns, the errno assignments and the increment of
// 'queued' are not visible in this listing; the result of rcv_msg() is
// dereferenced on the next visible line with no NULL test shown between them.
503 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
505 int queued = 0; // number we pushed into the ring
506 int exp_len = 0; // length of expected ID
508 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
510 msg->state = RMR_ERR_BADARG;
518 if( expect == NULL || ! *expect ) { // nothing expected if nil or empty string, just receive
519 return rmr_rcv_msg( ctx, msg );
// clamp the compare length to the size of the transaction id field
522 exp_len = strlen( expect );
523 if( exp_len > RMR_MAX_XID ) {
524 exp_len = RMR_MAX_XID;
526 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect );
528 while( queued < allow2queue ) {
529 msg = rcv_msg( ctx, msg ); // hard wait for next
530 if( msg->state == RMR_OK ) {
531 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
532 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
536 if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
537 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
542 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
548 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
555 Initialise the message routing environment. Flags are one of the UTAFL_
556 constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
557 (tcp) to be used, then :port is all that is needed.
559 At the moment it seems that TCP really is the only viable protocol, but
560 we'll allow flexibility.
562 The return value is a void pointer which must be passed to most uta functions. On
563 error, a nil pointer is returned and errno should be set.
565 static void* init( char* uproto_port, int max_msg_size, int flags ) {
566 uta_ctx_t* ctx = NULL;
567 char bind_info[NN_SOCKADDR_MAX]; // bind info
568 char* proto = "tcp"; // pointer into the proto/port string user supplied
571 char wbuf[1024]; // work buffer
572 char* tok; // pointer at token in a buffer
574 char* interface = NULL; // interface to bind to pulled from RMR_BIND_IF if set
576 fprintf( stderr, "[INFO] ric message routing library on nanomsg (%s %s.%s.%s built: %s)\n",
577 QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
580 if( uproto_port == NULL ) {
581 proto_port = strdup( "tcp:4567" );
583 proto_port = strdup( uproto_port ); // so we can modify it
586 if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
590 memset( ctx, 0, sizeof( uta_ctx_t ) );
593 ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response
594 ctx->last_rto = -2; // last receive timeout that was set; invalid value to force first to set
596 ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t ); // default max buffer size
597 if( max_msg_size > 0 ) {
598 if( max_msg_size <= ctx->max_plen ) { // user defined len can be smaller
599 ctx->max_plen = max_msg_size;
601 fprintf( stderr, "[WRN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
605 ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t );
607 uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
609 ctx->nn_sock = nn_socket( AF_SP, NN_PULL ); // our 'listen' socket should allow multiple senders to connect
610 if( ctx->nn_sock < 0 ) {
611 fprintf( stderr, "[CRI] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno );
616 if( (port = strchr( proto_port, ':' )) != NULL ) {
617 if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly
620 *(port++) = 0; // term proto string and point at port string
621 proto = proto_port; // user supplied proto so point at it rather than default
624 port = proto_port; // assume something like "1234" was passed
627 if( (gethostname( wbuf, sizeof( wbuf ) )) < 0 ) {
628 fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
631 if( (tok = strchr( wbuf, '.' )) != NULL ) {
632 *tok = 0; // we don't keep domain portion
634 ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
635 if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
636 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
640 if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
641 interface = "0.0.0.0";
643 snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
644 if( nn_bind( ctx->nn_sock, bind_info ) < 0) { // bind and automatically accept client sessions
645 fprintf( stderr, "[CRI] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) );
646 nn_close( ctx->nn_sock );
651 if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
652 if( atoi( tok ) > 0 ) {
653 flags |= RMRFL_NAME_ONLY; // don't allow IP addreess to go out in messages
657 if( flags & RMRFL_NAME_ONLY ) {
658 ctx->my_ip = strdup( ctx->my_name ); // user application or env var has specified that IP address is NOT sent out, use name
659 if( DEBUG ) fprintf( stderr, "[DBUG] name only mode is set; not sending IP address as source\n" );
661 ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
662 ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
663 if( ctx->my_ip == NULL ) {
664 strcpy( ctx->my_ip, ctx->my_name ); // revert to name if we cant suss out ip address
665 fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name as source\n" );
667 if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
671 if( ! (flags & FL_NOTHREAD) ) { // skip if internal context that does not need rout table thread
672 if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rt collector thread
673 fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
682 This sets the default trace length which will be added to any message buffers
683 allocated. It can be set at any time, and if rmr_set_trace() is given a
684 trace len that is different than the default allocated in a message, the message
687 Returns 0 on failure and 1 on success. If failure, then errno will be set.
// Records the default trace length in the context.
//   vctx    context pointer (cast to uta_ctx_t*); a NULL context is tested for first
//   tr_len  value stored in ctx->trace_data_len
// NOTE(review): the return statements are not visible in this listing.
689 extern int rmr_init_trace( void* vctx, int tr_len ) {
693 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
698 ctx->trace_data_len = tr_len;
703 Publicly facing initialisation function. Wrapper for the init() function above
704 as it needs to ensure internal flags are masked off before calling the
707 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
708 return init( uproto_port, max_msg_size, flags & UFL_MASK );
712 Return true if routing table is initialised etc. and app can send/receive.
// Readiness probe: examines whether the context has a route table.
//   vctx  context pointer (cast to uta_ctx_t*); a NULL context is tested for first
// The second test distinguishes a context whose rtable pointer is set.
// NOTE(review): the values returned from each branch are not visible here.
714 extern int rmr_ready( void* vctx ) {
717 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
721 if( ctx->rtable != NULL ) {
729 Provides a non-fatal (compile) interface for the nng only function.
730 Not supported on top of nano, so this always returns -1.
// Placeholder so that code written against the nng flavour still compiles.
//   vctx  context pointer
// NOTE(review): the body is not visible in this listing; the accompanying
// description states the function always returns -1 -- confirm.
732 extern int rmr_get_rcvfd( void* vctx ) {
738 Compatibility (mostly) with NNG.
// Closes the context's nanomsg listen socket.
//   vctx  context pointer (cast to uta_ctx_t*); a NULL context is tested for first
// NOTE(review): anything done after nn_close() is not visible in this listing.
740 extern void rmr_close( void* vctx ) {
743 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
747 nn_close( ctx->nn_sock );