enhance(API): Add multi-threaded call
[ric-plt/lib/rmr.git] / test / test_nng_em.c
1 /*
2 ==================================================================================
3         Copyright (c) 2019 Nokia
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10            http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 /*
21         Mnemonic:       test_nng_em.c
22         Abstract:       A nano/NNG message emulator for testing without needing to
23                                 actually have nanomsg, nng, or external processes.
24                                 We also emulate the epoll_wait() function for controlled
25                                 poll related testing.
26
27                                 This module must be directly included to be used.
28         Date:           11 February 2019
29         Author:         E. Scott Daniels
30 */
31
32
33 #include "rmr.h"                                // we use some of rmr defs in building dummy messages, so we need these
34 #include "rmr_agnostic.h"
35
36 // ---------------------- emulated nng functions ---------------------------
37
38
39 #ifndef _em_nn
40 #define _em_nn
41
42 #include <pthread.h>
43
44 static int em_send_failures = 0;        // test programme can set this to emulate eagain send failures
45 static int em_timeout = -1;                     // set by set socket option
46 static int em_mtc_msgs = 0;                     // set to generate 'received' messages with mt-call header data
47 static int return_value = 0;            // functions should return this value
48 static int rcv_count = 0;                       // receive counter for transaction id to allow test to rest
49 static int rcv_delay = 0;                       // forced delay before call to rcvmsg starts to work
50
51 static int gates_ok = 0;
52 static pthread_mutex_t rcv_gate;
53
54
55 // ----------- epoll emulation ---------------------------------------------
56
57 // CAUTION: sys/epoll.h must be included before this define and function will properly compile.
58 #define epoll_wait em_wait
59 /*
60         Every other call returns 1 ready; alternate calls return 0 ready.
61         Mostly for testing the timeout receive call. First call should return
62         something ready and the second should return nothing ready so we can
63         drive both cases.
64 */
65 static int em_wait( int fd, void* events, int n, int to ) {
66         static int ready = 0;
67
68         ready = !ready;
69         return ready;
70 }
71
72
73
74 /*
75         Simulated v1 message for receive to return. This needs to match the RMr header
76         so that we can fill in length, type and xaction id things.
77 #define MSG_VER 1
78 struct em_msg {
79         int32_t mtype;                                          // message type  ("long" network integer)
80         int32_t plen;                                           // payload length
81         int32_t rmr_ver;                                        // our internal message version number
82         unsigned char xid[32];                          // space for user transaction id or somesuch
83         unsigned char sid[32];                          // sender ID for return to sender needs
84         unsigned char src[16];                          // name of the sender (source)
85         unsigned char meid[32];                         // managed element id.
86         struct timespec ts;                                     // timestamp ???
87 };
88 */
89
90 /*
91         v2 message; should be able to use it for everything that is set up here as
92         we don't add a payload even if setting a v1 type.
93 */
94 #define ALT_MSG_VER 1   // alternate every so often
95 #define MSG_VER 2               // default version to insert
96 struct em_msg {
97         int32_t mtype;                                          // message type  ("long" network integer)
98         int32_t plen;                                           // payload length
99         int32_t rmr_ver;                                        // our internal message version number
100         unsigned char xid[32];                          // space for user transaction id or somesuch
101         unsigned char sid[32];                          // sender ID for return to sender needs
102         unsigned char src[64];                          // name of the sender (source)
103         unsigned char meid[32];                         // managed element id.
104         struct timespec ts;                                     // timestamp ???
105
106                                             // V2 extension
107         int32_t flags;                      // HFL_* constants
108         int32_t len0;                       // length of the RMr header data
109         int32_t len1;                       // length of the tracing data
110         int32_t len2;                       // length of data 1 (d1)
111         int32_t len3;                       // length of data 2 (d2)
112         int32_t sub_id;                                         // subscription id (-1 invalid)
113 };
114
115
116
117 // --  emulation control functions ------------------------------------------------------
118
119 /*
120         Test app can call this to have all emulated functions return failure instead
121         of success.
122 */
123 static void en_set_return( int rv ) {
124         return_value = rv;
125 }
126
127
128
129 static int em_nng_foo() {
130         fprintf( stderr, "emulated functions in play" );
131 }
132
133
134 /*
135         Turns on/off the generation of multi-threaded call messages
136 */
137 static int em_set_mtc_msgs( int state ) {
138         em_mtc_msgs = state;
139 }
140
141 /*
142         Returns the size of the header we inserted
143 */
144 static int em_hdr_size() {
145         if( em_mtc_msgs ) {
146                 return (int) sizeof( struct em_msg ) + 4;
147         }
148
149         return (int) sizeof( struct em_msg );
150 }
151
152 static void em_set_rcvcount( int v ) {
153         rcv_count = v;
154 }
155
156 static void em_set_rcvdelay( int v ) {
157         rcv_delay = v;
158 }
159
160 static void em_start() {
161         if( ! gates_ok ) {
162                 pthread_mutex_init( &rcv_gate, NULL );
163                 gates_ok = 1;
164         }
165 }
166
167 //--------------------------------------------------------------------------
168 #ifdef EMULATE_NNG
169 struct nn_msghdr {
170         int boo;
171 };
172
173
174 /*
175         Receive message must allocate a new buffer and return the pointer into *m.
176         Every 9 messages or so we'll simulate an old version message
177
178         If em_mtc_msgs is set, then we add a non-zero d1 field with
179         the call-id set to 2, and alternate the call flag
180 */
181 static int em_nng_recvmsg( nng_socket s, nng_msg ** m, int i ) {
182         static int call_flag = 0;
183
184         void* b;
185         struct em_msg* msg;
186         int trace_size = 0;
187         int d1_size = 0;
188         unsigned char* d1;
189
190         if( rcv_delay > 0 ) {
191                 sleep( rcv_delay );
192         }
193
194         if( em_mtc_msgs ) {
195                 d1_size = 4;
196         }
197
198         b = (void *) malloc( 2048 );
199         if( m != NULL ) {
200                 memset( b, 0, 2048 );
201
202                 *m = (nng_msg *) b;
203                 msg = (struct em_msg *) b;
204                 if( ! em_mtc_msgs  &&  (rcv_count % 10) == 9 ) {
205                         msg->rmr_ver = ALT_MSG_VER;                                                     // allow emulation the bug in RMr v1
206                 } else {
207                         msg->rmr_ver = htonl( MSG_VER );
208                 }
209
210                 msg->mtype = htonl( 1 );
211                 msg->plen = htonl( 220 );
212                 msg->len0 = htonl( sizeof( struct em_msg ) );
213                 msg->len1 = htonl( trace_size );
214                 msg->len2 = htonl( d1_size );
215                 msg->len3 = htonl( 0 );
216
217                 pthread_mutex_lock( &rcv_gate );        // hold lock to update counter/flag
218                 if( em_mtc_msgs ) {
219                         d1 = DATA1_ADDR( msg );
220                         d1[0] = 2;                                                                      // simulated msgs always on chute 2
221                         if( call_flag ) {
222                                 rcv_count++;
223                                 msg->flags |= HFL_CALL_MSG;
224                         }
225                         if( rcv_delay > 0 ) {
226                                 fprintf( stderr, "<EM>    count=%d flag=%d %02x \n", rcv_count, call_flag, msg->flags );
227                         }
228                         call_flag = !call_flag;
229                 } else {
230                         rcv_count++;
231                 }
232                 pthread_mutex_unlock( &rcv_gate );
233                 snprintf( msg->xid, 32, "%015d", rcv_count );           // simple transaction id so we can test receive specific and ring stuff
234                 snprintf( msg->src, 16, "localhost:4562" );             // set src id (unrealistic) so that rts() can be tested
235         }
236
237         //fprintf( stderr, ">>> simulated received message: %s\n", msg->xid );
238         return return_value;
239 }
240
241 static void* em_msg_body( nng_msg* msg ) {
242         return (void *) msg;                                                            // we don't manage a real msg, so body is just the buffer we allocated
243 }
244
245 static size_t em_msg_len( const nng_msg* msg ) {
246         if( msg ) {
247                 return  2048;
248         }
249
250         return 0;
251 }
252
253
254 static int em_nng_pull_open(nng_socket * s ) {
255         return return_value;
256 }
257 static int em_nng_pull0_open(nng_socket * s ) {
258         return return_value;
259 }
260 static int em_nng_listen(nng_socket s, const char * c, nng_listener * l, int i ) {
261         return return_value;
262 }
263 static int em_nng_close(nng_socket s ) {
264         return return_value;
265 }
266 static int em_nng_push0_open(nng_socket * s ) {
267         return return_value;
268 }
269 static int em_nng_dial(nng_socket s, const char * c, nng_dialer * d, int i ) {
270         //fprintf( stderr, "<info> === simulated dialing: %s\n", c );
271         return return_value;
272 }
273 static int em_nng_setopt(nng_socket s, const char * c, const void * p, size_t t ) {
274         return return_value;
275 }
276 static int em_nng_sub_open(nng_socket * s ) {
277         return return_value;
278 }
279 static int em_nng_sub0_open(nng_socket * s ) {
280         return return_value;
281 }
282 static int em_nng_recv(nng_socket s, void * v, size_t * t, int i ) {
283
284         return return_value;
285 }
286 static int em_nng_send( nng_socket s, void* m, int l, int f ) {
287         return return_value;
288 }
289
290 /*
291         Emulate sending a message. If the global em_send_failures is set,
292         then every so often we fail with an EAGAIN to drive that part
293         of the code in RMr.
294 */
295 static int em_sendmsg( nng_socket s, nng_msg* m, int i ) {
296         static int count = 0;
297
298         if( em_send_failures && (count++ % 15 == 14) ) {
299                 //fprintf( stderr, ">>>> failing send\n\n" );
300                 return NNG_EAGAIN;
301         }
302
303         return return_value;
304 }
305
306 static void* em_nng_alloc( size_t len ) {
307         return malloc( len );
308 }
309
310 static int em_nng_msg_alloc( nng_msg** mp, size_t l ) {
311         void*   p;
312
313         if( !mp || return_value != 0  ) {
314                 return -1;
315         }
316
317         p = (void *) malloc( sizeof( char ) * l );
318         *mp = (nng_msg *) p;
319
320         return return_value;
321 }
322
323 /*
324         We just free the buffer here as it was a simple malloc.
325 */
326 static void em_nng_free( void* p, size_t l ) {
327         if( p ) {
328                 //fprintf( stderr, ">>>>> not freed: %p\n", p );
329                 free( p );
330         }
331 }
332 static void em_nng_msg_free( void* p ) {
333         if( p ) {
334                 //fprintf( stderr, ">>>>> not freed: %p\n", p );
335                 free( p );
336         }
337 }
338
339 static int em_dialer_create( void* d, nng_socket s, char* stuff ) {
340         //fprintf( stderr, ">>>> emulated dialer create\n\n" );
341         return 0;
342 }
343
344 static int em_dialer_start( nng_dialer d, int i ) {
345         //fprintf( stderr, ">>>> emulated dialer start\n\n" );
346         return return_value;
347 }
348
349
350 static int em_dialer_setopt_ms( nng_dialer dialer, void* option, int ms ) {
351         return return_value;
352 }
353
354 static int em_nng_getopt_int( nng_socket s, void* con, int* target ) {
355         if( target ) {
356                 *target = 0;
357         }
358         return return_value;
359 }
360
361
362
363 // nng redefines some of these to point directly to various 'versions' of the function (ugg, function versions, really?)
364 #undef nng_recvmsg
365 #undef nng_free
366 #undef nng_pull_open
367 #undef nng_pull0_open
368 #undef nng_listen
369 #undef nng_close
370 #undef nng_getopt_int
371 #undef nng_push0_open
372 #undef nng_dial
373 #undef nng_setopt
374 #undef nng_sub_open
375 #undef nng_sub0_open
376 #undef nng_recv
377 #undef nng_alloc
378
379 #define nng_msg_alloc em_nng_msg_alloc
380 #define nng_recvmsg em_nng_recvmsg
381 #define nng_free em_nng_free
382 #define nng_free em_nng_free
383 #define nng_msg_free em_nng_msg_free
384 #define nng_pull_open em_nng_pull_open
385 #define nng_pull0_open em_nng_pull0_open
386 #define nng_listen em_nng_listen
387 #define nng_close em_nng_close
388 #define nng_getopt_int em_nng_getopt_int
389 #define nng_push0_open em_nng_push0_open
390 #define nng_dial em_nng_dial
391 #define nng_setopt em_nng_setopt
392 #define nng_sub_open em_nng_sub_open
393 #define nng_sub0_open em_nng_sub0_open
394 #define nng_recv em_nng_recv
395 #define nng_send em_nng_send
396 #define nng_sendmsg em_sendmsg
397 #define nng_alloc em_nng_alloc
398 #define nng_free em_nng_free
399 #define nng_dialer_setopt_ms em_dialer_setopt_ms
400 #define nng_dialer_start em_dialer_start
401 #define nng_dialer_create em_dialer_create
402 #define nng_msg_body em_msg_body
403 #define nng_msg_len em_msg_len
404
405
406 #else
407
408
409 // ----------------------- emulated nano functions --------------------------
410 struct em_nn_msghdr {
411         int dummy;
412 };
413
414 static int em_nn_socket (int domain, int protocol ) {
415         static int s = 1;
416
417         return ++s;
418 }
419
420 static int em_nn_close (int s ) {
421         return 1;
422 }
423
424 //static int em_nn_setsockopt (int s, int level, int option, const void *optval, size_t optvallen ) {
425         //return 1;
426 //}
427
428 static int em_nn_getsockopt (int s, int level, int option, void *optval, size_t *optvallen ) {
429         return 1;
430 }
431
432 static int em_nn_bind (int s, const char *addr ) {
433         //      fprintf( stderr, ">>> ===== emulated bind called ====\n" );
434         return 1;
435 }
436
437 static int em_nn_connect (int s, const char *addr ) {
438         return 1;
439 }
440
441 static int em_nn_shutdown (int s, int how ) {
442         return 1;
443 }
444
445 static int em_nn_send (int s, const void *buf, size_t len, int flags ) {
446         return 1;
447 }
448
449 static int em_nn_recv (int s, void *m, size_t len, int flags ) {
450         void* b;
451         struct em_msg* msg;
452         static int count = 0;                   // we'll simulate a message going in by dropping an rmr-ish msg with transaction id only
453         int trace_size = 0;
454         static int counter = 0;                         // if timeout value is set; we return timeout (eagain) every 3 calls
455
456         if( em_timeout > 0 ) {
457                 counter++;
458                 if( counter % 3 == 0 ) {
459                         return EAGAIN;
460                 }
461         }
462
463         b = (void *) malloc( 2048 );
464         if( m != NULL ) {                                               // blindly we assume this is 2k or bigger
465                 memset( m, 0, 2048 );
466                 msg = (struct em_msg *) m;
467                 if( count % 10  == 9 ) {
468                         //msg->rmr_ver = htonl( MSG_VER );
469                         msg->rmr_ver = ALT_MSG_VER;             // emulate the bug in RMr v1
470                 } else {
471                         msg->rmr_ver = htonl( MSG_VER );
472                 }
473                 msg->mtype = htonl( 1 );
474                 msg->plen = htonl( 220 );
475                 msg->len0 = htonl( sizeof( struct em_msg ) );
476                 msg->len1 = htonl( trace_size );
477                 snprintf( msg->xid, 32, "%015d", count++ );             // simple transaction id so we can test receive specific and ring stuff
478                 snprintf( msg->src, 16, "localhost:4562" );             // set src id (unrealistic) so that rts() can be tested
479                 //fprintf( stderr, "<EM>   returning message len=%d\n\n", ntohl( msg->plen ) );
480         } else {
481                 fprintf( stderr, "<EM>   message was nil\n\n" );
482         }
483
484         //fprintf( stderr, ">>> simulated received message: %s len=%d\n", msg->xid, msg->plen );
485         return 2048;
486 }
487
488 static int em_sendmsg (int s, const struct em_nn_msghdr *msghdr, int flags ) {
489         return 1;
490 }
491
492 static int em_nn_recvmsg (int s, struct nn_msghdr *msghdr, int flags ) {
493         return 1;
494 }
495
496 static void em_nn_freemsg( void* ptr ) {
497         return;
498 }
499
500 /*
501         Hacky implementation of set sock opt. We assume value is a pointer to int and ignore size.
502 */
503 static int em_setsockopt( int sock, int foo, int action, int* value, int size ) {
504         if( action ==  NN_RCVTIMEO ) {
505                 em_timeout = *value;
506         }
507 }
508
509
510 // nanomsg
511 #define nn_socket  em_nn_socket
512 #define nn_close  em_nn_close
513 //#define nn_setsockopt  em_nn_setsockopt
514 #define nn_getsockopt  em_nn_getsockopt
515 #define nn_bind  em_nn_bind
516 #define nn_connect  em_nn_connect
517 #define nn_shutdown  em_nn_shutdown
518 #define nn_send  em_nn_send
519 #define nn_recv  em_nn_recv
520 #define nn_sendmsg  em_nn_sendmsg
521 #define nn_recvmsg  em_nn_recvmsg
522 #define nn_setsockopt  em_setsockopt
523 #define nn_freemsg  em_nn_freemsg
524
525 #endif
526
527
528 #endif