Add initial codes
[it/test.git] / ons_2019_demo / load_gen / load_gen.c
1 /*
2  *
3  * Copyright 2019 AT&T Intellectual Property
4  * Copyright 2019 Nokia
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  */
19
20 // :vim ts=4 sw=4 noet:
21
22 /*
23         Mnemonic:       load_gen.c
24         Abstract:       Very simple load generator. Reads the message rate from a
25                                 file (argv[1]) which is expected to be between 0 and 100000.
26                                 Messages are sent with a fixed mtype of 104.
27
28         Date:           24 March 2019
29         Author:         E. Scott Daniels
30 */
31
32 #include <unistd.h>
33 #include <errno.h>
34 #include <string.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <fcntl.h>
38 #include <sys/epoll.h>
39 #include <time.h>
40
41 #include <rmr/rmr.h>
42
43 /*
44         Rewinds, reads and converts the value in the file to the number of microseconds (usleep units)
45         that the caller should sleep between messages. If the effective mps is 0, then we block here
46         until the rate goes up. We'll check 1/sec if blocked.
47 */
48 int             mps = 1;                        // msg/sec rate from file  (sloppy, but global helps prevent time calls)
49 static inline int read_delay( int fd ) {
50         char    rbuf[128];              // read buffer
51         int     n;                              // bytes read
52         int             mus = 0;                // mu sec delay
53         double  v;
54
55         if( fd < 0 ) {
56                 return 0;
57         }
58
59         do {
60                 lseek( fd, 0, SEEK_SET );
61                 if( (n = read( fd, rbuf, sizeof( rbuf ) )) >0 ) {
62                         mps = atoi( rbuf );
63                         v = 1.0 / ((double) mps / 1000000.0);           // msg/microsec
64
65                         mus = (int) v;
66                 }
67
68                 if( mus <= 0 ) {
69                         //fprintf( stderr, "<LGEN> sending blocked\n" );
70                         sleep( 1 );
71                 }
72         } while( mus <= 0 );
73
74         return mus;
75 }
76
77 int main( int argc, char** argv ) {
78         int delay_fd;           // file des of the delay file
79     int rcv_fd;     // pollable fd
80     void* mrc;      //msg router context
81     struct epoll_event events[10];          // wait on 10 possible events
82     struct epoll_event epe;                 // event definition for event to listen to
83     int     ep_fd = -1;
84         int nready;
85         int i;
86         int mtype = 104;                                                // we loop through several message types
87         rmr_mbuf_t*             sbuf;                                   // send buffer
88         rmr_mbuf_t*             rbuf;                                   // received buffer
89         int     epoll_to = 1;                                           // epoll timout -- 0 don't call
90         char wbuf[2048];
91         char* lport = "12036";                                  // default listen port
92         int     delay = 2000000;                                        // microsecond delay between messages; default to very slow
93         int     next_read = 0;                                          // counter for next read of delay
94         char*   eparm;
95         char*   rate_file = "rate_file";
96
97
98         if( (eparm = getenv( "DEMO_LOAD_GEN_PORT" )) != NULL ) {
99                 lport = strdup( eparm );
100         }
101
102     mrc = rmr_init( lport, 1400, RMRFL_NONE );
103     rcv_fd = rmr_get_rcvfd( mrc );
104         if( rcv_fd < 0 ) {
105                 fprintf( stderr, "[FAIL] unable to set up polling fd\n" );
106                 exit( 1 );
107         }
108
109         if( argc > 1 ) {
110                 rate_file = argv[1];
111         }
112
113         if( (delay_fd = open( rate_file, O_RDONLY )) < 0 ) {
114                 fprintf( stderr, "abort: unable to open delay file: %s: %d\n", rate_file, errno );
115                 exit( 1 );
116         }
117
118         if( (ep_fd = epoll_create1( 0 )) < 0 ) {
119                 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
120                 exit( 1 );
121         }
122     epe.events = EPOLLIN;
123     epe.data.fd = rcv_fd;
124
125     if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
126                 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
127                 exit( 1 );
128         }
129
130         sbuf = rmr_alloc_msg( mrc, 256 );
131         rbuf = NULL;
132
133         while( ! rmr_ready( mrc ) ) {
134                 fprintf( stderr, "waiting for RMr to show ready\n" );
135                 sleep( 1 );
136         }
137         fprintf( stderr, "<LGEN> rmr shows ready\n" );
138
139
140         mps = 1;
141     while( 1 ) {
142                 if( next_read <= 0 ) {
143                         delay = read_delay( delay_fd );
144                         next_read = mps;
145                         //fprintf( stderr, "<LGEN> next_read=%d delay=%d\n", next_read, delay );
146                 }
147
148                 snprintf( sbuf->payload, 200, "msg from load generator %d\n", next_read );
149
150                 sbuf->mtype = mtype;
151                 sbuf->len =  strlen( sbuf->payload );
152                 sbuf->state = 0;
153                 sbuf = rmr_send_msg( mrc, sbuf );                       // we send, we dont care about success or failure, but on failure, have a break
154                 if( sbuf->state != RMR_OK ) {
155                         sleep( 1 );
156                         next_read = 0;                                                  // mostly for testing
157                 }
158
159                 nready = epoll_wait( ep_fd, events, 10, 0 );            // we shouldn't have anything, but prevent queue full issues
160                 for( i = 0; i < nready && i < 10; i++ ) {           // loop through to find what is ready
161                         if( events[i].data.fd == rcv_fd ) {             // RMr has something
162                                 errno = 0;
163                                 rbuf = rmr_rcv_msg( mrc, rbuf );
164                         }
165                 }
166
167                 next_read--;
168                 usleep( delay );
169     }
170 }