2 ==================================================================================
3 Copyright (c) 2019 Nokia
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
21 Mnemonic: test_nng_em.c
22 Abstract: A nano/NNG message emulator for testing without needing to
23 actually have nanomsg, nng, or external processes.
24 We also emulate the epoll_wait() function for controlled
27 This module must be directly included to be used.
28 Date: 11 February 2019
29 Author: E. Scott Daniels
33 #include "rmr.h" // we use some of rmr defs in building dummy messages, so we need these
34 #include "rmr_agnostic.h"
36 // ---------------------- emulated nng functions ---------------------------
43 #include "test_common_em.c" // things common to all emulation code
45 //--------------------------------------------------------------------------
51 #define SOCKET_TYPE nng_socket // socket representation is different in each transport
54 Receive message must allocate a new buffer and return the pointer into *m.
55 Every 9 messages or so we'll simulate an old version message
57 If em_mtc_msgs is set, then we add a non-zero d1 field with
58 the call-id set to 2, and alternate the call flag
60 static int em_nng_recvmsg( nng_socket s, nng_msg ** m, int i ) {
61 static int call_flag = 0;
78 b = (void *) malloc( 2048 );
82 msg = (struct em_msg *) b;
83 if( ! em_mtc_msgs && (rcv_count % 10) == 9 ) {
84 msg->rmr_ver = ALT_MSG_VER; // allow emulation the bug in RMr v1
86 msg->rmr_ver = htonl( MSG_VER );
89 msg->mtype = htonl( 1 );
90 msg->plen = htonl( 220 );
91 msg->len0 = htonl( sizeof( struct em_msg ) );
92 msg->len1 = htonl( trace_size );
93 msg->len2 = htonl( d1_size );
94 msg->len3 = htonl( 0 );
96 pthread_mutex_lock( &rcv_gate ); // hold lock to update counter/flag
98 d1 = DATA1_ADDR( msg );
99 d1[0] = 2; // simulated msgs always on chute 2
102 msg->flags |= HFL_CALL_MSG;
104 if( rcv_delay > 0 ) {
105 fprintf( stderr, "<EM> count=%d flag=%d %02x \n", rcv_count, call_flag, msg->flags );
107 call_flag = !call_flag;
111 pthread_mutex_unlock( &rcv_gate );
112 snprintf( msg->xid, 32, "%015d", rcv_count ); // simple transaction id so we can test receive specific and ring stuff
113 snprintf( msg->src, 64, "localhost:4562" ); // set src id (unrealistic) so that rts() can be tested
114 snprintf( msg->srcip, 64, "89.2.19.19:4562" ); // set src ip for rts testing
116 //fprintf( stderr, ">>> simulated received message: %s %s p=%p len0=%d\n", msg->src, msg->srcip, msg, (int) ntohl( msg->len0 ) );
118 fprintf( stderr, "<WARN> em: simulated receive no msg pointer provided\n" );
124 static void* em_msg_body( nng_msg* msg ) {
125 return (void *) msg; // we don't manage a real msg, so body is just the buffer we allocated
128 static size_t em_msg_len( const nng_msg* msg ) {
137 static int em_nng_pull_open(nng_socket * s ) {
140 static int em_nng_pull0_open(nng_socket * s ) {
143 static int em_nng_listen(nng_socket s, const char * c, nng_listener * l, int i ) {
146 static int em_nng_close(nng_socket s ) {
149 static int em_nng_push0_open(nng_socket * s ) {
152 static int em_nng_dial(nng_socket s, const char * c, nng_dialer * d, int i ) {
153 //fprintf( stderr, "<info> === simulated dialing: %s\n", c );
156 static int em_nng_setopt(nng_socket s, const char * c, const void * p, size_t t ) {
159 static int em_nng_sub_open(nng_socket * s ) {
162 static int em_nng_sub0_open(nng_socket * s ) {
165 static int em_nng_recv(nng_socket s, void * v, size_t * t, int i ) {
168 static int em_nng_send( nng_socket s, void* m, int l, int f ) {
169 free( m ); // we must ditch the message as nng does (or reuses)
174 Emulate sending a message. If the global em_send_failures is set,
175 then every so often we fail with an EAGAIN to drive that part
178 static int em_sendmsg( nng_socket s, nng_msg* m, int i ) {
179 static int count = 0;
181 if( em_send_failures && (count++ % 15 == 14) ) {
182 //fprintf( stderr, ">>>> failing send\n\n" );
189 static void* em_nng_alloc( size_t len ) {
190 return malloc( len );
193 static int em_nng_msg_alloc( nng_msg** mp, size_t l ) {
196 if( !mp || return_value != 0 ) {
200 p = (void *) malloc( sizeof( char ) * l );
207 We just free the buffer here as it was a simple malloc.
209 static void em_nng_free( void* p, size_t l ) {
214 static void em_nng_msg_free( void* p ) {
220 static int em_dialer_create( void* d, nng_socket s, char* stuff ) {
221 //fprintf( stderr, ">>>> emulated dialer create\n\n" );
225 static int em_dialer_start( nng_dialer d, int i ) {
226 //fprintf( stderr, ">>>> emulated dialer start\n\n" );
231 static int em_dialer_setopt_ms( nng_dialer dialer, void* option, int ms ) {
235 static int em_nng_getopt_int( nng_socket s, void* con, int* target ) {
244 // nng redefines some of these to point directly to various 'versions' of the function (ugg, function versions, really?)
248 #undef nng_pull0_open
251 #undef nng_getopt_int
252 #undef nng_push0_open
260 #define nng_msg_alloc em_nng_msg_alloc
261 #define nng_recvmsg em_nng_recvmsg
262 #define nng_free em_nng_free
263 #define nng_free em_nng_free
264 #define nng_msg_free em_nng_msg_free
265 #define nng_pull_open em_nng_pull_open
266 #define nng_pull0_open em_nng_pull0_open
267 #define nng_listen em_nng_listen
268 #define nng_close em_nng_close
269 #define nng_getopt_int em_nng_getopt_int
270 #define nng_push0_open em_nng_push0_open
271 #define nng_dial em_nng_dial
272 #define nng_setopt em_nng_setopt
273 #define nng_sub_open em_nng_sub_open
274 #define nng_sub0_open em_nng_sub0_open
275 #define nng_recv em_nng_recv
276 #define nng_send em_nng_send
277 #define nng_sendmsg em_sendmsg
278 #define nng_alloc em_nng_alloc
279 #define nng_free em_nng_free
280 #define nng_dialer_setopt_ms em_dialer_setopt_ms
281 #define nng_dialer_start em_dialer_start
282 #define nng_dialer_create em_dialer_create
283 #define nng_msg_body em_msg_body
284 #define nng_msg_len em_msg_len
289 #define SOCKET_TYPE int // socket representation is different in each transport
292 // ----------------------- emulated nano functions --------------------------
293 struct em_nn_msghdr {
297 static int em_nn_socket (int domain, int protocol ) {
303 static int em_nn_close (int s ) {
307 //static int em_nn_setsockopt (int s, int level, int option, const void *optval, size_t optvallen ) {
311 static int em_nn_getsockopt (int s, int level, int option, void *optval, size_t *optvallen ) {
315 static int em_nn_bind (int s, const char *addr ) {
316 // fprintf( stderr, ">>> ===== emulated bind called ====\n" );
320 static int em_nn_connect (int s, const char *addr ) {
324 static int em_nn_shutdown (int s, int how ) {
328 static int em_nn_send (int s, const void *buf, size_t len, int flags ) {
332 static int em_nn_recv (int s, void *m, size_t len, int flags ) {
335 static int count = 0; // we'll simulate a message going in by dropping an rmr-ish msg with transaction id only
337 static int counter = 0; // if timeout value is set; we return timeout (eagain) every 3 calls
340 if( em_timeout > 0 ) {
342 if( counter % 3 == 0 ) {
351 b = (void *) malloc( 2048 );
352 if( m != NULL ) { // blindly we assume this is 2k or bigger
353 memset( m, 0, 2048 );
354 msg = (struct em_msg *) m;
355 if( count % 10 == 9 ) {
356 //msg->rmr_ver = htonl( MSG_VER );
357 msg->rmr_ver = ALT_MSG_VER; // emulate the bug in RMr v1
359 msg->rmr_ver = htonl( MSG_VER );
361 msg->mtype = htonl( 1 );
362 msg->plen = htonl( 220 );
363 msg->len0 = htonl( sizeof( struct em_msg ) );
364 msg->len1 = htonl( trace_size );
365 msg->len2 = htonl( d1_size );
366 msg->len3 = htonl( 0 );
367 snprintf( msg->xid, 32, "%015d", count++ ); // simple transaction id so we can test receive specific and ring stuff
368 snprintf( msg->src, 64, "localhost:4562" ); // set src id (unrealistic) so that rts() can be tested
369 snprintf( msg->srcip, 64, "89.2.19.19:4562" ); // set src ip for rts testing
370 //fprintf( stderr, "<EM> returning message len=%d\n\n", ntohl( msg->plen ) );
372 fprintf( stderr, "<EM> message was nil\n\n" );
375 //fprintf( stderr, ">>> simulated received message: %s %s len=%d p=%p\n", msg->src, msg->srcip, ntohl( msg->plen ), m );
379 static int em_sendmsg (int s, const struct em_nn_msghdr *msghdr, int flags ) {
383 static int em_nn_recvmsg (int s, struct nn_msghdr *msghdr, int flags ) {
387 static void em_nn_freemsg( void* ptr ) {
393 Hacky implementation of set sock opt. We assume value is a pointer to int and ignore size.
395 static int em_setsockopt( int sock, int foo, int action, int* value, int size ) {
396 if( action == NN_RCVTIMEO ) {
403 #define nn_socket em_nn_socket
404 #define nn_close em_nn_close
405 //#define nn_setsockopt em_nn_setsockopt
406 #define nn_getsockopt em_nn_getsockopt
407 #define nn_bind em_nn_bind
408 #define nn_connect em_nn_connect
409 #define nn_shutdown em_nn_shutdown
410 #define nn_send em_nn_send
411 #define nn_recv em_nn_recv
412 #define nn_sendmsg em_nn_sendmsg
413 #define nn_recvmsg em_nn_recvmsg
414 #define nn_setsockopt em_setsockopt
415 #define nn_freemsg em_nn_freemsg