Add initial codes
[it/test.git] / ons_2019_demo / a1_med / dummy_a1_rmr_wrapper.h
1 int epoll_to = 1;// global default epoll timout -- 1ms
2 char*   eparm;                                                  // generic env pointer
3
4 struct rmr_context {
5         void* mrc;
6         int rcv_fd;     // pollable fd
7         struct epoll_event epe;                 // event definition for event to listen to
8         struct epoll_event events[10];          // wait on 10 possible events
9         int     ep_fd;
10         int max_payload_size;                   // ++SCOTT
11         rmr_mbuf_t*             sbuf;                                   // send buffer
12         rmr_mbuf_t*             rbuf;                                   // received buffer
13 };
14
15
16 struct rmr_context * rmr_init_wrapper(char* lport){
17
18         struct rmr_context *rmr_c = malloc(sizeof (struct rmr_context));
19         fprintf( stderr, "[INFO] glistening for replies on %s\n", lport );
20
21
22         rmr_c->mrc = rmr_init(lport, RMR_MAX_RCV_BYTES, RMRFL_NONE );                   // setup RMr and get a context (rmr_c.mrc)
23         rmr_c->ep_fd=-1;
24
25         //polling related initializations
26         rmr_c->rcv_fd = rmr_get_rcvfd( rmr_c->mrc );// get the fd to poll for messages received
27         if( rmr_c->rcv_fd < 0 ) {
28                 fprintf( stderr, "[FAIL] unable to set up polling fd\n" );
29                 exit( 1 );
30         }
31
32         if( (rmr_c->ep_fd = epoll_create1( 0 )) < 0 ) {
33                 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
34                 exit( 1 );
35         }
36         rmr_c->epe.events = EPOLLIN;
37         rmr_c->epe.data.fd = rmr_c->rcv_fd;
38
39         if( epoll_ctl( rmr_c->ep_fd, EPOLL_CTL_ADD, rmr_c->rcv_fd, &rmr_c->epe ) != 0 )  {
40                 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
41                 exit( 1 );
42         }
43         //end of polling related initializations
44
45         //++SCOTT------ next lines until !!SCOTT
46         rmr_c->max_payload_size = 240;                                                                          //default
47         if( (eparm = getenv( "DEMO_MAX_PAYLOAD_BYTES" )) != NULL ) {
48                 rmr_c->max_payload_size = atoi(eparm);                                          // override with env
49         }
50         //!!SCOTT
51
52         //~~SCOTT next line
53         rmr_c->sbuf = rmr_alloc_msg( rmr_c->mrc, rmr_c->max_payload_size );             // allocate largest payload to send
54         rmr_c->rbuf = NULL;
55
56         return rmr_c;
57 }
58
59 void rmr_pure_send (struct rmr_context *rmr_c, int mtype, char* message) {
60         //--SCOTT int max_payload_size = 240; //default
61         //--SCOTT if( (eparm = getenv( "DEMO_MAX_PAYLOAD_BYTES" )) != NULL ) {
62                 //--SCOTT max_payload_size = atoi(eparm);
63         //--SCOTT }
64
65         //~~SCOTT  next line
66         snprintf( rmr_c->sbuf->payload, rmr_c->max_payload_size, "%s", message);                        // simple send message -- replace with real content
67
68         rmr_c->sbuf->mtype = mtype;                                                                             // fill in message meta data
69         rmr_c->sbuf->len =  strlen( rmr_c->sbuf->payload ) + 1;                                 // actual length of payload (count the nil end of string)
70         rmr_c->sbuf->state = 0;
71         rmr_send_msg( rmr_c->mrc, rmr_c->sbuf );
72 //      //retry send for a few times before giving up
73 //      long    natter = 0;                                             // natter on for errors only once in a while
74 //      if( (rmr_c->sbuf = rmr_send_msg( rmr_c->mrc, rmr_c->sbuf )) != NULL ) {// unlikely, but might get a null pointer back if NNG really is buggered
75 //              if( rmr_c->sbuf->state != RMR_OK ) {
76 //                      if( errno == EAGAIN ) {
77 //                              while( rmr_c->sbuf->state != RMR_OK && errno == EAGAIN ) {// NNG likes to refuse sends, just keep trying on eagain
78 //                                      rmr_send_msg( rmr_c->mrc, rmr_c->sbuf );
79 //                              }
80 //                      } else {// most likely connection refused, don't natter on
81 //                              if( time( NULL ) > natter ) {
82 //                                      fprintf( stderr, "[WARN] send failed, pausing (%s)\n", strerror( errno ) );
83 //                                      natter = time( NULL ) + 2;
84 //                              }
85 //                              sleep( 1 );
86 //                      }
87 //              }
88 //      } else {
89 //              if( time( NULL ) > natter ) {
90 //                      fprintf( stderr, "[WARN] send failed, pausing (%s)\n", strerror( errno ) );
91 //                      natter = time( NULL ) + 2;
92 //              }
93 //              sleep( 1 );
94 //      }
95 }
96
97 int rmr_poll_for_message (struct rmr_context *rmr_c) {
98         //start polling the channel to read the acknowledgment
99         //~~SCOTT  next line
100         int     nready;
101         int     i;
102         int return_type =0;
103
104         nready = epoll_wait( rmr_c->ep_fd, rmr_c->events, 10, epoll_to );       // wait up to epoll_to ms for a response
105         for( i = 0; i < nready && i < 10; i++ ) {           // loop through to find what is ready
106                 if( rmr_c->events[i].data.fd == rmr_c->rcv_fd ) {             // RMr has something
107                         errno = 0;
108                         rmr_c->rbuf = rmr_rcv_msg( rmr_c->mrc, rmr_c->rbuf );                   // something ready; this should not block
109                         if( rmr_c->rbuf ) {
110                                 //fprintf( stderr, "<TEST> acknowledgment received:%s\n",rmr_c->rbuf->payload);
111                                 return_type = 1; //the message has been acknowledged
112                         }
113                 }
114         }
115         return return_type;
116 }
117 void rmr_close_wrapper (struct rmr_context *rmr_c){
118
119         rmr_close( rmr_c->mrc );
120         free(rmr_c);
121 }