X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Frtg_sim%2Freq_resp.c;fp=test%2Frtg_sim%2Freq_resp.c;h=9d58472c58924a1bd20d5c5e0a182d0721735940;hb=28b0c411053c155cb849cd269acb71ec701d79ac;hp=0000000000000000000000000000000000000000;hpb=29d871650240d6b4fba0ca46d7f13a90c004003c;p=ric-plt%2Flib%2Frmr.git diff --git a/test/rtg_sim/req_resp.c b/test/rtg_sim/req_resp.c new file mode 100644 index 0000000..9d58472 --- /dev/null +++ b/test/rtg_sim/req_resp.c @@ -0,0 +1,352 @@ +// :vi ts=4 sw=4 noet: +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* + Mnemonic: req_rep.c + Abstract: A "library" module which allows a programme to easily be a requestor + or replier. Some functions are compatable with publishing (mbuf + allocation and management). Underlying we use the NN_PAIR and NOT + the req/rep model as that model is an inflexible, lock step, exchange + which does not lend well for a request that results in more than one + response messages, or no response. + + The user must be aware that once a session is established on the + host:port listener, another session will not be accepted until the + first is terminated; nano makes no provision for multiple concurrent + sesssions with either the PAIR or REQ/RESP models. + + We also support starting the publisher socket as the buffer and + send functions can be used for the publisher too. + + CAUTION: this is based on nanomsg, not NNG. The underlying protocols + are compatable, and because NNG has an emulation mode it is possible + to link successsfully with the nng library, BUT that will not + work here. Link only with nanomsg. + + Date: 18 January 2018 + Author: E. Scott Daniels + +*/ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "req_resp.h" + +#define NULL_SOCKET 0 // fluff that is treated like a nil pointer check by coverage checker + + +/* + Connect to the host as a requestor. returns context if + successful. +*/ +extern void* rr_connect( char* host, char* port ) { + rr_ctx_t* ctx = NULL; + char wbuf[1024]; + int state; + + if( host == NULL || port == NULL ) { + errno = EINVAL; + return NULL; + } + + ctx = (rr_ctx_t *) malloc( sizeof *ctx ); + if( ctx == NULL ) { + errno = ENOMEM; + return NULL; + } + + //ctx->nn_sock = nn_socket( AF_SP, NN_PAIR ); + ctx->nn_sock = nn_socket( AF_SP, NN_PUSH ); + if( ctx->nn_sock < NULL_SOCKET ) { + free( ctx ); + return NULL; + } + snprintf( wbuf, sizeof( wbuf ), "tcp://%s:%s", host, port ); + state = nn_connect( ctx->nn_sock, wbuf ); + if( state < 0 ) { + fprintf( stderr, "rr_conn: connect failed: %s: %d %s\n", wbuf, errno, strerror( errno ) ); + nn_close( ctx->nn_sock ); + free( ctx ); + return NULL; + } + + //fprintf( stderr, "rr_conn: connect successful: %s\n", wbuf ); + return (void *) ctx; +} + + +/* + Set up as a listener on any interface with the given port. +*/ +extern void* rr_start_listening( char* port ) { + rr_ctx_t* ctx; + char wbuf[1024]; + int state; + + if( port == NULL ) { + errno = EINVAL; + return NULL; + } + + ctx = (rr_ctx_t *) malloc( sizeof *ctx ); + if( ctx == NULL ) { + errno = EINVAL; + return NULL; + } + + //ctx->nn_sock = nn_socket( AF_SP, NN_PAIR ); + ctx->nn_sock = nn_socket( AF_SP, NN_PULL ); + if( ctx->nn_sock < NULL_SOCKET ) { + free( ctx ); + return NULL; + } + + snprintf( wbuf, sizeof( wbuf ), "tcp://0.0.0.0:%s", port ); + state = nn_bind( ctx->nn_sock, wbuf ); + if( state < 0 ) { + nn_close( ctx->nn_sock ); + free( ctx ); + return NULL; + } + + return (void *) ctx; +} + +/* + Configure and bind the publisher. Port is a string as it's probably read from + the command line, so no need to atoi() it for us. We can use the rr_* functions + for message buffers and sending, so we reuse their context rather than define our + own. + +*/ +extern void* open_publisher( char* port ) { + rr_ctx_t* pctx; + char conn_info[1024]; + + if( (pctx = (rr_ctx_t *) malloc( sizeof( *pctx )) ) == NULL ) { + return NULL; + } + + pctx->nn_sock = nn_socket( AF_SP, NN_PUB ); // publishing socket + if( pctx->nn_sock < 0 ) { + fprintf( stderr, "[CRI] unable to open publish socket: %s\n", strerror( errno ) ); + free( pctx ); + return NULL; + } + + snprintf( conn_info, sizeof( conn_info ), "tcp://0.0.0.0:%s", port ); // listen on any interface + if( nn_bind( pctx->nn_sock, conn_info ) < 0) { // bind and automatically accept client sessions + fprintf (stderr, "[CRI] unable to bind publising port: %s: %s\n", port, strerror( errno ) ); + nn_close ( pctx->nn_sock ); + free( pctx ); + return NULL; + } + + return (void *) pctx; +} + +extern rr_mbuf_t* rr_new_buffer( rr_mbuf_t* mb, int len ) { + + if( ! mb ) { + mb = (rr_mbuf_t *) malloc( sizeof( *mb ) ); + mb->size = len; + mb->payload = NULL; + } else { + if( mb->size < len ) { // if requested len is larger than current payload + nn_freemsg( mb->payload ); + mb->payload = NULL; + } else { + len = mb->size; + } + } + mb->used = 0; + + if( len > 0 && !mb->payload ) { // allow a payloadless buffer to be allocated + mb->payload = nn_allocmsg( len, 0 ); + } + + return mb; +} + +/* + Closes the currently open session. +*/ +extern void rr_close( void* vctx ) { + rr_ctx_t* ctx; + + if( (ctx = (rr_ctx_t *) vctx) == NULL ) { + return; + } + + if( ctx->nn_sock < NULL_SOCKET ) { + return; + } + + nn_close( ctx->nn_sock ); + ctx->nn_sock = -1; +} + +extern void rr_free( void* vctx ) { + rr_ctx_t* ctx; + + if( (ctx = (rr_ctx_t *) vctx) == NULL ) { + return; + } + + rr_close( ctx ); + nn_term(); + free( ctx ); +} + +extern void rr_free_mbuf( rr_mbuf_t* mbuf ) { + if( mbuf->payload ) { + nn_freemsg( mbuf->payload ); + mbuf->payload = NULL; + mbuf->used = -2; // just in case they held a pointer and try to use it + } + + free( mbuf ); +} + +extern rr_mbuf_t* rr_receive( void* vctx, rr_mbuf_t* mbuf, int len ) { + rr_ctx_t* ctx; + + if( (ctx = (rr_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + return NULL; + } + if( ctx->nn_sock < 0 ) { + errno = ESTALE; // stale/bad socket fd + return NULL; + } + + mbuf = rr_new_buffer( mbuf, len ); + if( mbuf == NULL ) { + return NULL; + } + + *mbuf->payload = 0; + if( (mbuf->used = nn_recv( ctx->nn_sock, mbuf->payload, mbuf->size, 0 )) > 0 ) { + errno = 0; // nano doesn't seem to clear errno here + } + return mbuf; +} + +extern rr_mbuf_t* rr_send( void* vctx, rr_mbuf_t* mbuf, int alloc_buf ) { + rr_ctx_t* ctx; + int len; + int state; + + if( (ctx = (rr_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + return NULL; + } + + if( ctx->nn_sock < 0 ) { + errno = ESTALE; // stale/bad socket fd + return NULL; + } + + if( ! mbuf ) { + errno = ENOBUFS; // not quite right, but close enough + return NULL; + } + + if( ! mbuf->payload ) { // no payload???? + errno = EFAULT; // nil is a bad address after all :) + return mbuf; + } + + errno = 0; + //fprintf( stderr, "rrsend is sending %d bytes....\n", mbuf->used ); + if( (state = nn_send( ctx->nn_sock, &mbuf->payload, NN_MSG, 0 )) > 0 ) { + //fprintf( stderr, "send ok to %d: %d %s\n", ctx->nn_sock, state, strerror( errno ) ); + mbuf->used = 0; + if( alloc_buf ) { + mbuf->payload = nn_allocmsg( mbuf->size, 0 ); // allocate the next send buffer + } else { + mbuf->payload = NULL; + mbuf->used = -1; + } + + errno = 0; + } else { + fprintf( stderr, "send failed %d %s\n", state, strerror( errno ) ); + } + + return mbuf; +} + +/* + Set the receive timeout to time. If time >100 we assume the time is milliseconds, + else we assume seconds. Setting -1 is always block. + Returns the nn value (0 on success <0 on error). +*/ +extern int rr_rcv_to( void* vctx, int time ) { + rr_ctx_t* ctx; + + if( (ctx = (rr_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + return -1; + } + + if( time > 0 ) { + if( time < 100 ) { + time = time * 1000; // assume seconds, nn wants ms + } + } + + return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) ); +} + +/* + Set the send timeout to time. If time >100 we assume the time is milliseconds, + else we assume seconds. Setting -1 is always block. + Returns the nn value (0 on success <0 on error). +*/ +extern int rr_send_to( void* vctx, int time ) { + rr_ctx_t* ctx; + + if( (ctx = (rr_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + return -1; + } + + if( time > 0 ) { + if( time < 100 ) { + time = time * 1000; // assume seconds, nn wants ms + } + } + + return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) ); +} +