X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fcommon%2Fsrc%2Fwormholes.c;h=a177944682f1763aba867d0a37ff0b5ab3312083;hb=9c2f0c74adb03a21646742702813b6ba4a4ae288;hp=2bc5c9f713e5bfbdd1589c34b6300d16dc7c50af;hpb=ec88d3c0563eeb6ae5f73427edb0b3c4d7acf299;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/common/src/wormholes.c b/src/rmr/common/src/wormholes.c index 2bc5c9f..a177944 100644 --- a/src/rmr/common/src/wormholes.c +++ b/src/rmr/common/src/wormholes.c @@ -105,7 +105,7 @@ static int wh_init( uta_ctx_t* ctx ) { } if( (whm = malloc( sizeof( *whm ) )) == NULL ) { - fprintf( stderr, "mem alloc failed for whm: alloc %d bytes\n", (int) sizeof( *whm ) ); + rmr_vlog( RMR_VL_ERR, "mem alloc failed for whm: alloc %d bytes\n", (int) sizeof( *whm ) ); errno = ENOMEM; return 0; } @@ -113,7 +113,7 @@ static int wh_init( uta_ctx_t* ctx ) { whm->nalloc = 16; alloc_sz = whm->nalloc * sizeof( endpoint_t ); if( (whm->eps = (endpoint_t **) malloc( alloc_sz )) == NULL ) { - fprintf( stderr, "mem alloc failed: alloc %d bytes\n", (int) alloc_sz ); + rmr_vlog( RMR_VL_ERR, "mem alloc failed: alloc %d bytes\n", (int) alloc_sz ); free( whm ); errno = ENOMEM; return 0; @@ -187,6 +187,7 @@ extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ) { rmr_whid_t whid = -1; // wormhole id is the index into the list wh_mgt_t* whm; // easy reference to wh mgt stuff int i; + route_table_t* rt; // the currently active route table if( (ctx = (uta_ctx_t *) vctx) == NULL || target == NULL || *target == 0 ) { @@ -207,9 +208,11 @@ extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ) { whm = ctx->wormholes; - - if( (ep = rt_ensure_ep( ctx->rtable, target )) == NULL ) { // get pointer to ep if there, create new if not - fprintf( stderr, "ensure ep returned bad; setting no memory error\n" ); + rt = get_rt( ctx ); // get and raise ref counter + ep = rt_ensure_ep( rt, target ); // get pointer to ep if there, create new if not + release_rt( ctx, rt ); // release use counter + if( ep == NULL ) { + rmr_vlog( RMR_VL_ERR, "wormhole_open: ensure ep returned bad: target=(%s)\n", target ); return -1; // ensure sets errno } @@ -220,7 +223,12 @@ extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ) { } if( whm->eps[i] == ep ) { - return i; // we're already pointing to it, just send it back again + if( whm->eps[i]->open ) { // we know about it and it's open + return i; // just send back the reference + } + + whid = i; // have it, but not open, reopen + break; } } @@ -249,6 +257,7 @@ extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg uta_ctx_t* ctx; endpoint_t* ep; // enpoint that wormhole ID references wh_mgt_t *whm; + char* d1; // point at the call-id in the header if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -273,18 +282,78 @@ extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg return msg; } - errno = 0; // nng seems not to set errno any longer, so ensure it's clear + errno = 0; if( msg->header == NULL ) { - fprintf( stderr, "[ERR] rmr_wh_send_msg: message had no header\n" ); + rmr_vlog( RMR_VL_ERR, "rmr_wh_send_msg: message had no header\n" ); msg->state = RMR_ERR_NOHDR; errno = EBADMSG; // must ensure it's not eagain return msg; } + d1 = DATA1_ADDR( msg->header ); + d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end + ep = whm->eps[whid]; + if( ! ep->open ) { + rmr_wh_open( ctx, ep->name ); + } return send2ep( ctx, ep, msg ); // send directly to the endpoint } +/* + Send a message directly to an open wormhole and then block until a response has + been received. The return is the same as for rmr_call(); the received buffer + or nil if no response was received. +*/ +extern rmr_mbuf_t* rmr_wh_call( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg, int call_id, int max_wait ) { + uta_ctx_t* ctx; + endpoint_t* ep; // enpoint that wormhole ID references + wh_mgt_t *whm; + char* d1; // point at the call-id in the header + + if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast + errno = EINVAL; // if msg is null, this is their clue + if( msg != NULL ) { + msg->state = RMR_ERR_BADARG; + errno = EINVAL; // must ensure it's not eagain + } + return msg; + } + + msg->state = RMR_OK; + + if( (whm = ctx->wormholes) == NULL ) { + errno = EINVAL; // no wormholes open + msg->state = RMR_ERR_NOWHOPEN; + return msg; + } + + if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) { + errno = EINVAL; + msg->state = RMR_ERR_WHID; + return msg; + } + + errno = 0; + if( msg->header == NULL ) { + rmr_vlog( RMR_VL_ERR, "rmr_wh_call: message had no header\n" ); + msg->state = RMR_ERR_NOHDR; + errno = EBADMSG; // must ensure it's not eagain + return msg; + } + + ep = whm->eps[whid]; + if( ep != NULL ) { + if( ! ep->open ) { + rmr_wh_open( ctx, ep->name ); + } + return mt_call( vctx, msg, call_id, max_wait, ep ); // use main (internal) call to setup and block + } + + msg->state = RMR_ERR_NOENDPT; + return msg; +} + /* This will "close" a wormhole. We don't actually drop the session as that might be needed by others, but we do pull the ep reference from the list. @@ -311,3 +380,43 @@ extern void rmr_wh_close( void* vctx, int whid ) { whm->eps[whid] = NULL; } + +/* + Check the state of an endpoint that is associated with the wormhold ID + passed in. If the state is "open" then we return RMR_OK. Other possible + return codes: + + RMR_ERR_WHID // wormhole id was invalid + RMR_ERR_NOENDPT // the endpoint connection is not open + RMR_ERR_BADARG // context or other arg was invalid + RMR_ERR_NOWHOPEN // wormhole(s) have not been initalised + +*/ +extern int rmr_wh_state( void* vctx, rmr_whid_t whid ) { + uta_ctx_t* ctx; + wh_mgt_t* whm; // easy reference to wh mgt stuff + endpoint_t* ep; // enpoint that wormhole ID references + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { // bad stuff, bail fast + errno = EINVAL; + return RMR_ERR_BADARG; + } + + if( (whm = ctx->wormholes) == NULL ) { + errno = EINVAL; // no wormholes open + return RMR_ERR_NOWHOPEN; + } + + if( whid < 0 || whid >= whm->nalloc || whm->eps[whid] == NULL ) { + errno = EINVAL; + return RMR_ERR_WHID; + } + + errno = 0; + + if( (ep = whm->eps[whid]) != NULL ) { + return ep->open ? RMR_OK : RMR_ERR_NOENDPT; + } + + return RMR_ERR_NOENDPT; +}