3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: A "library" module which allows a programme to easily be a requestor
24 or replier. Some functions are compatable with publishing (mbuf
25 allocation and management). Underlying we use the NN_PAIR and NOT
26 the req/rep model as that model is an inflexible, lock step, exchange
27 which does not lend well for a request that results in more than one
28 response messages, or no response.
30 The user must be aware that once a session is established on the
31 host:port listener, another session will not be accepted until the
32 first is terminated; nano makes no provision for multiple concurrent
33 sesssions with either the PAIR or REQ/RESP models.
35 We also support starting the publisher socket as the buffer and
36 send functions can be used for the publisher too.
38 CAUTION: this is based on nanomsg, not NNG. The underlying protocols
39 are compatable, and because NNG has an emulation mode it is possible
40 to link successsfully with the nng library, BUT that will not
41 work here. Link only with nanomsg.
44 Author: E. Scott Daniels
57 #include <nanomsg/nn.h>
58 #include <nanomsg/pair.h>
59 #include <nanomsg/pipeline.h>
60 #include <nanomsg/pubsub.h>
64 #define NULL_SOCKET 0 // fluff that is treated like a nil pointer check by coverage checker
68 Connect to the host as a requestor. returns context if
71 extern void* rr_connect( char* host, char* port ) {
76 if( host == NULL || port == NULL ) {
81 ctx = (rr_ctx_t *) malloc( sizeof *ctx );
87 //ctx->nn_sock = nn_socket( AF_SP, NN_PAIR );
88 ctx->nn_sock = nn_socket( AF_SP, NN_PUSH );
89 if( ctx->nn_sock < NULL_SOCKET ) {
93 snprintf( wbuf, sizeof( wbuf ), "tcp://%s:%s", host, port );
94 state = nn_connect( ctx->nn_sock, wbuf );
96 fprintf( stderr, "rr_conn: connect failed: %s: %d %s\n", wbuf, errno, strerror( errno ) );
97 nn_close( ctx->nn_sock );
102 //fprintf( stderr, "rr_conn: connect successful: %s\n", wbuf );
108 Set up as a listener on any interface with the given port.
110 extern void* rr_start_listening( char* port ) {
120 ctx = (rr_ctx_t *) malloc( sizeof *ctx );
126 //ctx->nn_sock = nn_socket( AF_SP, NN_PAIR );
127 ctx->nn_sock = nn_socket( AF_SP, NN_PULL );
128 if( ctx->nn_sock < NULL_SOCKET ) {
133 snprintf( wbuf, sizeof( wbuf ), "tcp://0.0.0.0:%s", port );
134 state = nn_bind( ctx->nn_sock, wbuf );
136 nn_close( ctx->nn_sock );
145 Configure and bind the publisher. Port is a string as it's probably read from
146 the command line, so no need to atoi() it for us. We can use the rr_* functions
147 for message buffers and sending, so we reuse their context rather than define our
151 extern void* open_publisher( char* port ) {
153 char conn_info[1024];
155 if( (pctx = (rr_ctx_t *) malloc( sizeof( *pctx )) ) == NULL ) {
159 pctx->nn_sock = nn_socket( AF_SP, NN_PUB ); // publishing socket
160 if( pctx->nn_sock < 0 ) {
161 fprintf( stderr, "[CRI] unable to open publish socket: %s\n", strerror( errno ) );
166 snprintf( conn_info, sizeof( conn_info ), "tcp://0.0.0.0:%s", port ); // listen on any interface
167 if( nn_bind( pctx->nn_sock, conn_info ) < 0) { // bind and automatically accept client sessions
168 fprintf (stderr, "[CRI] unable to bind publising port: %s: %s\n", port, strerror( errno ) );
169 nn_close ( pctx->nn_sock );
174 return (void *) pctx;
177 extern rr_mbuf_t* rr_new_buffer( rr_mbuf_t* mb, int len ) {
180 mb = (rr_mbuf_t *) malloc( sizeof( *mb ) );
184 if( mb->size < len ) { // if requested len is larger than current payload
185 nn_freemsg( mb->payload );
193 if( len > 0 && !mb->payload ) { // allow a payloadless buffer to be allocated
194 mb->payload = nn_allocmsg( len, 0 );
201 Closes the currently open session.
203 extern void rr_close( void* vctx ) {
206 if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
210 if( ctx->nn_sock < NULL_SOCKET ) {
214 nn_close( ctx->nn_sock );
218 extern void rr_free( void* vctx ) {
221 if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
230 extern void rr_free_mbuf( rr_mbuf_t* mbuf ) {
231 if( mbuf->payload ) {
232 nn_freemsg( mbuf->payload );
233 mbuf->payload = NULL;
234 mbuf->used = -2; // just in case they held a pointer and try to use it
240 extern rr_mbuf_t* rr_receive( void* vctx, rr_mbuf_t* mbuf, int len ) {
243 if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
247 if( ctx->nn_sock < 0 ) {
248 errno = ESTALE; // stale/bad socket fd
252 mbuf = rr_new_buffer( mbuf, len );
258 if( (mbuf->used = nn_recv( ctx->nn_sock, mbuf->payload, mbuf->size, 0 )) > 0 ) {
259 errno = 0; // nano doesn't seem to clear errno here
264 extern rr_mbuf_t* rr_send( void* vctx, rr_mbuf_t* mbuf, int alloc_buf ) {
269 if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
274 if( ctx->nn_sock < 0 ) {
275 errno = ESTALE; // stale/bad socket fd
280 errno = ENOBUFS; // not quite right, but close enough
284 if( ! mbuf->payload ) { // no payload????
285 errno = EFAULT; // nil is a bad address after all :)
290 //fprintf( stderr, "rrsend is sending %d bytes....\n", mbuf->used );
291 if( (state = nn_send( ctx->nn_sock, &mbuf->payload, NN_MSG, 0 )) > 0 ) {
292 //fprintf( stderr, "send ok to %d: %d %s\n", ctx->nn_sock, state, strerror( errno ) );
295 mbuf->payload = nn_allocmsg( mbuf->size, 0 ); // allocate the next send buffer
297 mbuf->payload = NULL;
303 fprintf( stderr, "send failed %d %s\n", state, strerror( errno ) );
310 Set the receive timeout to time. If time >100 we assume the time is milliseconds,
311 else we assume seconds. Setting -1 is always block.
312 Returns the nn value (0 on success <0 on error).
314 extern int rr_rcv_to( void* vctx, int time ) {
317 if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
324 time = time * 1000; // assume seconds, nn wants ms
328 return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
332 Set the send timeout to time. If time >100 we assume the time is milliseconds,
333 else we assume seconds. Setting -1 is always block.
334 Returns the nn value (0 on success <0 on error).
336 extern int rr_send_to( void* vctx, int time ) {
339 if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
346 time = time * 1000; // assume seconds, nn wants ms
350 return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) );