Enable multi-thread receiver support
[ric-plt/lib/rmr.git] / test / test_nng_em.c
1 /*
2 ==================================================================================
3         Copyright (c) 2019 Nokia
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10            http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 /*
21         Mnemonic:       test_nng_em.c
22         Abstract:       A nano/NNG message emulator for testing without needing to
23                                 actually have nanomsg, nng, or external processes.
24                                 We also emulate the epoll_wait() function for controlled
25                                 poll related testing.
26
27                                 This module must be directly included to be used.
28         Date:           11 February 2019
29         Author:         E. Scott Daniels
30 */
31
32
33 #include "rmr.h"                                // we use some of rmr defs in building dummy messages, so we need these
34 #include "rmr_agnostic.h"
35
36 // ---------------------- emulated nng functions ---------------------------
37
38
39 #ifndef _em_nn
40 #define _em_nn
41
42 #include <pthread.h>
43
44 static int em_send_failures = 0;        // test programme can set this to emulate eagain send failures
45 static int em_timeout = -1;                     // set by set socket option
46 static int em_mtc_msgs = 0;                     // set to generate 'received' messages with mt-call header data
47 static int return_value = 0;            // functions should return this value
48 static int rcv_count = 0;                       // receive counter for transaction id to allow test to rest
49 static int rcv_delay = 0;                       // forced delay before call to rcvmsg starts to work
50
51 static int gates_ok = 0;
52 static pthread_mutex_t rcv_gate;
53 static int em_gen_long_hostname = 0;            // if set the emulated hostname generates a longer name (>40 char)
54
55
56 // ----------- gethostname emulation ---------------------------------------
57 #define gethostname  em_gethostname
58 static int em_gethostname( char* buf, size_t len ) {
59         if( len < 1 ) {
60                 errno = EINVAL;
61                 return 1;
62         }
63
64         if( em_gen_long_hostname ) {
65                 snprintf( buf, len, "hostname-which-is-long-a860430b890219-dfw82" );
66         } else {
67                 snprintf( buf, len, "em-hostname" );
68         }
69
70         return 0;
71 }
72
73 static int em_set_long_hostname( int v ) {
74         em_gen_long_hostname = !!v;
75 }
76
77 // ----------- epoll emulation ---------------------------------------------
78
79 // CAUTION: sys/epoll.h must be included before these define and function will properly compile.
80 #define epoll_wait em_wait
81 #define epoll_ctl  em_ep_ctl
82 #define epoll_create  em_ep_create
83
84 /*
85         Every other call returns 1 ready; alternate calls return 0 ready.
86         Mostly for testing the timeout receive call. First call should return
87         something ready and the second should return nothing ready so we can
88         drive both cases.
89 */
90 static int em_wait( int fd, void* events, int n, int to ) {
91         static int ready = 0;
92
93         ready = !ready;
94         return ready;
95 }
96
97 int em_ep_ctl( int epfd, int op, int fd, struct epoll_event *event ) {
98         return 0;
99 }
100
101 int em_ep_create( int size ) {
102         return 0;
103 }
104
105
106
107 /*
108         Simulated v1 message for receive to return. This needs to match the RMr header
109         so that we can fill in length, type and xaction id things.
110 #define MSG_VER 1
111 struct em_msg {
112         int32_t mtype;                                          // message type  ("long" network integer)
113         int32_t plen;                                           // payload length
114         int32_t rmr_ver;                                        // our internal message version number
115         unsigned char xid[32];                          // space for user transaction id or somesuch
116         unsigned char sid[32];                          // sender ID for return to sender needs
117         unsigned char src[16];                          // name of the sender (source)
118         unsigned char meid[32];                         // managed element id.
119         struct timespec ts;                                     // timestamp ???
120 };
121 */
122
123 /*
124         v2 message; should be able to use it for everything that is set up here as
125         we don't add a payload even if setting a v1 type.
126 */
127 #define ALT_MSG_VER 1   // alternate every so often
128 #define MSG_VER 3               // default version to insert
129 struct em_msg {
130         int32_t mtype;                                          // message type  ("long" network integer)
131         int32_t plen;                                           // payload length
132         int32_t rmr_ver;                                        // our internal message version number
133         unsigned char xid[32];                          // space for user transaction id or somesuch
134         unsigned char sid[32];                          // sender ID for return to sender needs
135         unsigned char src[64];                          // name of the sender (source)
136         unsigned char meid[32];                         // managed element id.
137         struct timespec ts;                                     // timestamp ???
138
139                                             // V2 extension
140         int32_t flags;                      // HFL_* constants
141         int32_t len0;                       // length of the RMr header data
142         int32_t len1;                       // length of the tracing data
143         int32_t len2;                       // length of data 1 (d1)
144         int32_t len3;                       // length of data 2 (d2)
145         int32_t sub_id;                                         // subscription id (-1 invalid)
146
147                                                                                 // V3 stuff
148         unsigned char srcip[64];                                // sender ID for return to sender needs
149 };
150
151
152
153 // --  emulation control functions ------------------------------------------------------
154
155 /*
156         Test app can call this to have all emulated functions return failure instead
157         of success.
158 */
159 static void en_set_return( int rv ) {
160         return_value = rv;
161 }
162
163
164
165 static int em_nng_foo() {
166         fprintf( stderr, "emulated functions in play" );
167 }
168
169
170 /*
171         Turns on/off the generation of multi-threaded call messages
172 */
173 static int em_set_mtc_msgs( int state ) {
174         em_mtc_msgs = state;
175 }
176
177 /*
178         Returns the size of the header we inserted
179 */
180 static int em_hdr_size() {
181         if( em_mtc_msgs ) {
182                 return (int) sizeof( struct em_msg ) + 4;
183         }
184
185         return (int) sizeof( struct em_msg );
186 }
187
188 static void em_set_rcvcount( int v ) {
189         rcv_count = v;
190 }
191
192 static void em_set_rcvdelay( int v ) {
193         if( v < 0 ) {
194                 fprintf( stderr, "<EM>   ##ERR## attempt to set receive delay with invalid value was ignored: %d seconds\n", v );
195                 return;
196         }
197         fprintf( stderr, "<EM>   receive delay is now %d seconds\n", v );
198         rcv_delay = v;
199 }
200
201 static void em_start() {
202         if( ! gates_ok ) {
203                 pthread_mutex_init( &rcv_gate, NULL );
204                 gates_ok = 1;
205         }
206 }
207
208 //--------------------------------------------------------------------------
209 #ifdef EMULATE_NNG
210 struct nn_msghdr {
211         int boo;
212 };
213
214
215 /*
216         Receive message must allocate a new buffer and return the pointer into *m.
217         Every 9 messages or so we'll simulate an old version message
218
219         If em_mtc_msgs is set, then we add a non-zero d1 field with
220         the call-id set to 2, and alternate the call flag
221 */
222 static int em_nng_recvmsg( nng_socket s, nng_msg ** m, int i ) {
223         static int call_flag = 0;
224
225         void* b;
226         struct em_msg* msg;
227         int trace_size = 0;
228         int d1_size = 0;
229         unsigned char* d1;
230
231         if( rcv_delay > 0 ) {
232                 sleep( rcv_delay );
233         }
234
235         if( em_mtc_msgs ) {
236                 d1_size = 4;
237         }
238
239         if( m != NULL ) {
240                 b = (void *) malloc( 2048 );
241                 memset( b, 0, 2048 );
242
243                 *m = (nng_msg *) b;
244                 msg = (struct em_msg *) b;
245                 if( ! em_mtc_msgs  &&  (rcv_count % 10) == 9 ) {
246                         msg->rmr_ver = ALT_MSG_VER;                                                     // allow emulation the bug in RMr v1
247                 } else {
248                         msg->rmr_ver = htonl( MSG_VER );
249                 }
250
251                 msg->mtype = htonl( 1 );
252                 msg->plen = htonl( 220 );
253                 msg->len0 = htonl( sizeof( struct em_msg ) );
254                 msg->len1 = htonl( trace_size );
255                 msg->len2 = htonl( d1_size );
256                 msg->len3 = htonl( 0 );
257
258                 pthread_mutex_lock( &rcv_gate );        // hold lock to update counter/flag
259                 if( em_mtc_msgs ) {
260                         d1 = DATA1_ADDR( msg );
261                         d1[0] = 2;                                                                      // simulated msgs always on chute 2
262                         if( call_flag ) {
263                                 rcv_count++;
264                                 msg->flags |= HFL_CALL_MSG;
265                         }
266                         if( rcv_delay > 0 ) {
267                                 fprintf( stderr, "<EM>    count=%d flag=%d %02x \n", rcv_count, call_flag, msg->flags );
268                         }
269                         call_flag = !call_flag;
270                 } else {
271                         rcv_count++;
272                 }
273                 pthread_mutex_unlock( &rcv_gate );
274                 snprintf( msg->xid, 32, "%015d", rcv_count );           // simple transaction id so we can test receive specific and ring stuff
275                 snprintf( msg->src, 64, "localhost:4562" );             // set src id (unrealistic) so that rts() can be tested
276                 snprintf( msg->srcip, 64, "89.2.19.19:4562" );          // set src ip for rts testing
277
278                 //fprintf( stderr, ">>> simulated received message: %s %s p=%p len0=%d\n", msg->src, msg->srcip, msg, (int) ntohl( msg->len0 ) );
279         } else {
280                 fprintf( stderr, "<WARN> em: simulated receive no msg pointer provided\n" );
281         }
282
283         return return_value;
284 }
285
286 static void* em_msg_body( nng_msg* msg ) {
287         return (void *) msg;                                                            // we don't manage a real msg, so body is just the buffer we allocated
288 }
289
290 static size_t em_msg_len( const nng_msg* msg ) {
291         if( msg ) {
292                 return  2048;
293         }
294
295         return 0;
296 }
297
298
299 static int em_nng_pull_open(nng_socket * s ) {
300         return return_value;
301 }
302 static int em_nng_pull0_open(nng_socket * s ) {
303         return return_value;
304 }
305 static int em_nng_listen(nng_socket s, const char * c, nng_listener * l, int i ) {
306         return return_value;
307 }
308 static int em_nng_close(nng_socket s ) {
309         return return_value;
310 }
311 static int em_nng_push0_open(nng_socket * s ) {
312         return return_value;
313 }
314 static int em_nng_dial(nng_socket s, const char * c, nng_dialer * d, int i ) {
315         //fprintf( stderr, "<info> === simulated dialing: %s\n", c );
316         return return_value;
317 }
318 static int em_nng_setopt(nng_socket s, const char * c, const void * p, size_t t ) {
319         return return_value;
320 }
321 static int em_nng_sub_open(nng_socket * s ) {
322         return return_value;
323 }
324 static int em_nng_sub0_open(nng_socket * s ) {
325         return return_value;
326 }
327 static int em_nng_recv(nng_socket s, void * v, size_t * t, int i ) {
328         return return_value;
329 }
330 static int em_nng_send( nng_socket s, void* m, int l, int f ) {
331         free( m );                                      // we must ditch the message as nng does (or reuses)
332         return return_value;
333 }
334
335 /*
336         Emulate sending a message. If the global em_send_failures is set,
337         then every so often we fail with an EAGAIN to drive that part
338         of the code in RMr.
339 */
340 static int em_sendmsg( nng_socket s, nng_msg* m, int i ) {
341         static int count = 0;
342
343         if( em_send_failures && (count++ % 15 == 14) ) {
344                 //fprintf( stderr, ">>>> failing send\n\n" );
345                 return NNG_EAGAIN;
346         }
347
348         return return_value;
349 }
350
351 static void* em_nng_alloc( size_t len ) {
352         return malloc( len );
353 }
354
355 static int em_nng_msg_alloc( nng_msg** mp, size_t l ) {
356         void*   p;
357
358         if( !mp || return_value != 0  ) {
359                 return -1;
360         }
361
362         p = (void *) malloc( sizeof( char ) * l );
363         *mp = (nng_msg *) p;
364
365         return return_value;
366 }
367
368 /*
369         We just free the buffer here as it was a simple malloc.
370 */
371 static void em_nng_free( void* p, size_t l ) {
372         if( p ) {
373                 free( p );
374         }
375 }
376 static void em_nng_msg_free( void* p ) {
377         if( p ) {
378                 free( p );
379         }
380 }
381
382 static int em_dialer_create( void* d, nng_socket s, char* stuff ) {
383         //fprintf( stderr, ">>>> emulated dialer create\n\n" );
384         return 0;
385 }
386
387 static int em_dialer_start( nng_dialer d, int i ) {
388         //fprintf( stderr, ">>>> emulated dialer start\n\n" );
389         return return_value;
390 }
391
392
393 static int em_dialer_setopt_ms( nng_dialer dialer, void* option, int ms ) {
394         return return_value;
395 }
396
397 static int em_nng_getopt_int( nng_socket s, void* con, int* target ) {
398         if( target ) {
399                 *target = 0;
400         }
401         return return_value;
402 }
403
404
405
406 // nng redefines some of these to point directly to various 'versions' of the function (ugg, function versions, really?)
407 #undef nng_recvmsg
408 #undef nng_free
409 #undef nng_pull_open
410 #undef nng_pull0_open
411 #undef nng_listen
412 #undef nng_close
413 #undef nng_getopt_int
414 #undef nng_push0_open
415 #undef nng_dial
416 #undef nng_setopt
417 #undef nng_sub_open
418 #undef nng_sub0_open
419 #undef nng_recv
420 #undef nng_alloc
421
422 #define nng_msg_alloc em_nng_msg_alloc
423 #define nng_recvmsg em_nng_recvmsg
424 #define nng_free em_nng_free
425 #define nng_free em_nng_free
426 #define nng_msg_free em_nng_msg_free
427 #define nng_pull_open em_nng_pull_open
428 #define nng_pull0_open em_nng_pull0_open
429 #define nng_listen em_nng_listen
430 #define nng_close em_nng_close
431 #define nng_getopt_int em_nng_getopt_int
432 #define nng_push0_open em_nng_push0_open
433 #define nng_dial em_nng_dial
434 #define nng_setopt em_nng_setopt
435 #define nng_sub_open em_nng_sub_open
436 #define nng_sub0_open em_nng_sub0_open
437 #define nng_recv em_nng_recv
438 #define nng_send em_nng_send
439 #define nng_sendmsg em_sendmsg
440 #define nng_alloc em_nng_alloc
441 #define nng_free em_nng_free
442 #define nng_dialer_setopt_ms em_dialer_setopt_ms
443 #define nng_dialer_start em_dialer_start
444 #define nng_dialer_create em_dialer_create
445 #define nng_msg_body em_msg_body
446 #define nng_msg_len em_msg_len
447
448
449 #else
450
451
452 // ----------------------- emulated nano functions --------------------------
453 struct em_nn_msghdr {
454         int dummy;
455 };
456
457 static int em_nn_socket (int domain, int protocol ) {
458         static int s = 1;
459
460         return ++s;
461 }
462
463 static int em_nn_close (int s ) {
464         return 1;
465 }
466
467 //static int em_nn_setsockopt (int s, int level, int option, const void *optval, size_t optvallen ) {
468         //return 1;
469 //}
470
471 static int em_nn_getsockopt (int s, int level, int option, void *optval, size_t *optvallen ) {
472         return 1;
473 }
474
475 static int em_nn_bind (int s, const char *addr ) {
476         //      fprintf( stderr, ">>> ===== emulated bind called ====\n" );
477         return 1;
478 }
479
480 static int em_nn_connect (int s, const char *addr ) {
481         return 1;
482 }
483
484 static int em_nn_shutdown (int s, int how ) {
485         return 1;
486 }
487
488 static int em_nn_send (int s, const void *buf, size_t len, int flags ) {
489         return 1;
490 }
491
492 static int em_nn_recv (int s, void *m, size_t len, int flags ) {
493         void* b;
494         struct em_msg* msg;
495         static int count = 0;                   // we'll simulate a message going in by dropping an rmr-ish msg with transaction id only
496         int trace_size = 0;
497         static int counter = 0;                         // if timeout value is set; we return timeout (eagain) every 3 calls
498         int d1_size = 0;
499
500         if( em_timeout > 0 ) {
501                 counter++;
502                 if( counter % 3 == 0 ) {
503                         return EAGAIN;
504                 }
505         }
506
507         if( em_mtc_msgs ) {
508                 d1_size = 4;
509         }
510
511         b = (void *) malloc( 2048 );
512         if( m != NULL ) {                                               // blindly we assume this is 2k or bigger
513                 memset( m, 0, 2048 );
514                 msg = (struct em_msg *) m;
515                 if( count % 10  == 9 ) {
516                         //msg->rmr_ver = htonl( MSG_VER );
517                         msg->rmr_ver = ALT_MSG_VER;             // emulate the bug in RMr v1
518                 } else {
519                         msg->rmr_ver = htonl( MSG_VER );
520                 }
521                 msg->mtype = htonl( 1 );
522                 msg->plen = htonl( 220 );
523                 msg->len0 = htonl( sizeof( struct em_msg ) );
524                 msg->len1 = htonl( trace_size );
525                 msg->len2 = htonl( d1_size );
526                 msg->len3 = htonl( 0 );
527                 snprintf( msg->xid, 32, "%015d", count++ );             // simple transaction id so we can test receive specific and ring stuff
528                 snprintf( msg->src, 64, "localhost:4562" );             // set src id (unrealistic) so that rts() can be tested
529                 snprintf( msg->srcip, 64, "89.2.19.19:4562" );          // set src ip for rts testing
530                 //fprintf( stderr, "<EM>   returning message len=%d\n\n", ntohl( msg->plen ) );
531         } else {
532                 fprintf( stderr, "<EM>   message was nil\n\n" );
533         }
534
535         //fprintf( stderr, ">>> simulated received message: %s %s len=%d p=%p\n", msg->src, msg->srcip, ntohl( msg->plen ), m );
536         return 2048;
537 }
538
539 static int em_sendmsg (int s, const struct em_nn_msghdr *msghdr, int flags ) {
540         return 1;
541 }
542
543 static int em_nn_recvmsg (int s, struct nn_msghdr *msghdr, int flags ) {
544         return 1;
545 }
546
547 static void em_nn_freemsg( void* ptr ) {
548         free( ptr );
549         return;
550 }
551
552 /*
553         Hacky implementation of set sock opt. We assume value is a pointer to int and ignore size.
554 */
555 static int em_setsockopt( int sock, int foo, int action, int* value, int size ) {
556         if( action ==  NN_RCVTIMEO ) {
557                 em_timeout = *value;
558         }
559 }
560
561
562 // nanomsg
563 #define nn_socket  em_nn_socket
564 #define nn_close  em_nn_close
565 //#define nn_setsockopt  em_nn_setsockopt
566 #define nn_getsockopt  em_nn_getsockopt
567 #define nn_bind  em_nn_bind
568 #define nn_connect  em_nn_connect
569 #define nn_shutdown  em_nn_shutdown
570 #define nn_send  em_nn_send
571 #define nn_recv  em_nn_recv
572 #define nn_sendmsg  em_nn_sendmsg
573 #define nn_recvmsg  em_nn_recvmsg
574 #define nn_setsockopt  em_setsockopt
575 #define nn_freemsg  em_nn_freemsg
576
577 #endif
578
579
580 #endif