CI: Add silent cmake SonarCloud scan
[ric-plt/lib/rmr.git] / test / rtg_sim / req_resp.c
1 // :vi ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       req_rep.c
23         Abstract:       A "library" module which allows a programme to easily be a requestor
24                                 or replier.  Some functions are compatable with publishing (mbuf
25                                 allocation and management).  Underlying we use the NN_PAIR and NOT
26                                 the req/rep model as that model is an inflexible, lock step, exchange
27                                 which does not lend well for a request that results in more than one
28                                 response messages, or no response.
29
30                                 The user must be aware that once a session is established on the
31                                 host:port listener, another session will not be accepted until the
32                                 first is terminated; nano makes no provision for multiple concurrent
33                                 sesssions with either the PAIR or REQ/RESP models.
34
35                                 We also support starting the publisher socket as the buffer and
36                                 send functions can be used for the publisher too.
37
38                                 CAUTION:  this is based on nanomsg, not NNG. The underlying protocols
39                                         are compatable, and because NNG has an emulation mode it is possible
40                                         to link successsfully with the nng library, BUT that will not
41                                         work here.   Link only with nanomsg.
42
43         Date:           18 January 2018
44         Author:         E. Scott Daniels
45
46 */
47
48 #include <ctype.h>
49 #include <stdio.h>
50 #include <stdlib.h>
51 #include <string.h>
52 #include <time.h>
53 #include <unistd.h>
54 #include <errno.h>
55 #include <stdint.h>
56
57 #include <nanomsg/nn.h>
58 #include <nanomsg/pair.h>
59 #include <nanomsg/pipeline.h>
60 #include <nanomsg/pubsub.h>
61
62 #include "req_resp.h"
63
64 #define NULL_SOCKET 0           // fluff that is treated like a nil pointer check by coverage checker
65
66
67 /*
68         Connect to the host as a requestor. returns context if
69         successful.
70 */
71 extern void* rr_connect( char* host, char* port ) {
72         rr_ctx_t* ctx = NULL;
73         char    wbuf[1024];
74         int             state;
75
76         if( host == NULL || port == NULL ) {
77                 errno = EINVAL;
78                 return NULL;
79         }
80
81         ctx = (rr_ctx_t *) malloc( sizeof *ctx );
82         if( ctx == NULL ) {
83                 errno = ENOMEM;
84                 return NULL;
85         }
86
87         //ctx->nn_sock = nn_socket( AF_SP, NN_PAIR );
88         ctx->nn_sock = nn_socket( AF_SP, NN_PUSH );
89         if( ctx->nn_sock < NULL_SOCKET ) {
90                 free( ctx );
91                 return NULL;
92         }
93         snprintf( wbuf, sizeof( wbuf ), "tcp://%s:%s", host, port );
94         state = nn_connect( ctx->nn_sock, wbuf );
95         if( state < 0 ) {
96                 fprintf( stderr, "rr_conn: connect failed: %s: %d %s\n", wbuf, errno, strerror( errno ) );
97                 nn_close( ctx->nn_sock );
98                 free( ctx );
99                 return NULL;
100         }
101
102         //fprintf( stderr, "rr_conn: connect successful: %s\n", wbuf );
103         return (void *) ctx;
104 }
105
106
107 /*
108         Set up as a listener on any interface with the given port.
109 */
110 extern void* rr_start_listening( char* port ) {
111         rr_ctx_t* ctx;
112         char    wbuf[1024];
113         int             state;
114
115         if( port == NULL ) {
116                 errno = EINVAL;
117                 return NULL;
118         }
119
120         ctx = (rr_ctx_t *) malloc( sizeof *ctx );
121         if( ctx == NULL ) {
122                 errno = EINVAL;
123                 return NULL;
124         }
125
126         //ctx->nn_sock = nn_socket( AF_SP, NN_PAIR );
127         ctx->nn_sock = nn_socket( AF_SP, NN_PULL );
128         if( ctx->nn_sock < NULL_SOCKET ) {
129                 free( ctx );
130                 return NULL;
131         }
132
133         snprintf( wbuf, sizeof( wbuf ), "tcp://0.0.0.0:%s", port );
134         state = nn_bind( ctx->nn_sock, wbuf );
135         if( state < 0 ) {
136                 nn_close( ctx->nn_sock );
137                 free( ctx );
138                 return NULL;
139         }
140
141         return (void *) ctx;
142 }
143
144 /*
145         Configure and bind the publisher. Port is a string as it's probably read from
146         the command line, so no need to atoi() it for us.  We can use the rr_* functions
147         for message buffers and sending, so we reuse their context rather than define our
148         own.
149
150 */
151 extern void*  open_publisher( char*  port ) {
152         rr_ctx_t*       pctx;
153         char            conn_info[1024];
154
155         if( (pctx = (rr_ctx_t *) malloc( sizeof( *pctx )) ) == NULL ) {
156                 return NULL;
157         }
158
159     pctx->nn_sock = nn_socket( AF_SP, NN_PUB );         // publishing socket
160     if( pctx->nn_sock < 0 ) {
161         fprintf( stderr, "[CRI] unable to open publish socket: %s\n", strerror( errno ) );
162                 free( pctx );
163         return NULL;
164     }
165
166         snprintf( conn_info, sizeof( conn_info ), "tcp://0.0.0.0:%s", port );                   // listen on any interface
167     if( nn_bind( pctx->nn_sock, conn_info ) < 0) {                                                                      // bind and automatically accept client sessions
168         fprintf (stderr, "[CRI] unable to bind publising port: %s: %s\n", port, strerror( errno ) );
169         nn_close ( pctx->nn_sock );
170                 free( pctx );
171         return NULL;
172     }
173
174         return (void *) pctx;
175 }
176
177 extern rr_mbuf_t* rr_new_buffer( rr_mbuf_t* mb, int len ) {
178
179         if( ! mb ) {
180                 mb = (rr_mbuf_t *) malloc( sizeof( *mb ) );
181                 mb->size = len;
182                 mb->payload = NULL;
183         } else {
184                 if( mb->size < len ) {                                  // if requested len is larger than current payload
185                         nn_freemsg( mb->payload );
186                         mb->payload = NULL;
187                 } else {
188                         len = mb->size;
189                 }
190         }
191         mb->used = 0;
192
193         if( len > 0 && !mb->payload ) {                                                         // allow a payloadless buffer to be allocated
194                 mb->payload = nn_allocmsg( len, 0 );
195         }
196
197         return mb;
198 }
199
200 /*
201         Closes the currently open session.
202 */
203 extern void rr_close( void* vctx ) {
204         rr_ctx_t* ctx;
205
206         if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
207                 return;
208         }
209
210         if( ctx->nn_sock < NULL_SOCKET ) {
211                 return;
212         }
213
214         nn_close( ctx->nn_sock );
215         ctx->nn_sock = -1;
216 }
217
218 extern void rr_free( void* vctx ) {
219         rr_ctx_t* ctx;
220
221         if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
222                 return;
223         }
224
225         rr_close( ctx );
226         nn_term();
227         free( ctx );
228 }
229
230 extern void rr_free_mbuf( rr_mbuf_t* mbuf ) {
231         if( mbuf->payload ) {
232                 nn_freemsg( mbuf->payload );
233                 mbuf->payload = NULL;
234                 mbuf->used = -2;                                // just in case they held a pointer and try to use it
235         }
236
237         free( mbuf );
238 }
239
240 extern rr_mbuf_t*  rr_receive( void* vctx, rr_mbuf_t* mbuf, int len ) {
241         rr_ctx_t* ctx;
242
243         if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
244                 errno = EINVAL;
245                 return NULL;
246         }
247         if( ctx->nn_sock < 0 ) {
248                 errno = ESTALE;                                 // stale/bad socket fd
249                 return NULL;
250         }
251
252         mbuf = rr_new_buffer( mbuf, len );
253         if( mbuf == NULL ) {
254                 return NULL;
255         }
256
257         *mbuf->payload = 0;
258         if( (mbuf->used = nn_recv( ctx->nn_sock, mbuf->payload, mbuf->size, 0 )) > 0 ) {
259                 errno = 0;                                              // nano doesn't seem to clear errno here
260         }
261         return mbuf;
262 }
263
264 extern rr_mbuf_t* rr_send( void* vctx, rr_mbuf_t* mbuf, int alloc_buf ) {
265         rr_ctx_t* ctx;
266         int len;
267         int state;
268
269         if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
270                 errno = EINVAL;
271                 return NULL;
272         }
273
274         if( ctx->nn_sock < 0 ) {
275                 errno = ESTALE;                                 // stale/bad socket fd
276                 return NULL;
277         }
278
279         if( ! mbuf ) {
280                 errno = ENOBUFS;                        // not quite right, but close enough
281                 return NULL;
282         }
283
284         if( ! mbuf->payload ) {                 // no payload????
285                 errno = EFAULT;                         // nil is a bad address after all :)
286                 return mbuf;
287         }
288
289         errno = 0;
290         //fprintf( stderr, "rrsend is sending %d bytes....\n", mbuf->used );
291         if( (state = nn_send( ctx->nn_sock, &mbuf->payload, NN_MSG, 0 )) > 0 ) {
292                 //fprintf( stderr, "send ok to %d:  %d %s\n", ctx->nn_sock, state, strerror( errno ) );
293                 mbuf->used = 0;
294                 if( alloc_buf ) {
295                         mbuf->payload = nn_allocmsg( mbuf->size, 0 );                                   // allocate the next send buffer
296                 } else {
297                         mbuf->payload = NULL;
298                         mbuf->used = -1;
299                 }
300
301                 errno = 0;
302         } else {
303                 fprintf( stderr, "send failed %d %s\n", state, strerror( errno ) );
304         }
305
306         return mbuf;
307 }
308
309 /*
310         Set the receive timeout to time. If time >100 we assume the time is milliseconds,
311         else we assume seconds. Setting -1 is always block.
312         Returns the nn value (0 on success <0 on error).
313 */
314 extern int rr_rcv_to( void* vctx, int time ) {
315         rr_ctx_t* ctx;
316
317         if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
318                 errno = EINVAL;
319                 return -1;
320         }
321
322         if( time > 0 ) {
323                 if( time < 100 ) {
324                         time = time * 1000;                     // assume seconds, nn wants ms
325                 }
326         }
327
328         return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
329 }
330
331 /*
332         Set the send timeout to time. If time >100 we assume the time is milliseconds,
333         else we assume seconds. Setting -1 is always block.
334         Returns the nn value (0 on success <0 on error).
335 */
336 extern int rr_send_to( void* vctx, int time ) {
337         rr_ctx_t* ctx;
338
339         if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
340                 errno = EINVAL;
341                 return -1;
342         }
343
344         if( time > 0 ) {
345                 if( time < 100 ) {
346                         time = time * 1000;                     // assume seconds, nn wants ms
347                 }
348         }
349
350         return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) );
351 }
352