// File-scope state shared by the RMR wrapper functions.
// epoll_to is handed to epoll_wait() as its timeout in rmr_poll_for_message().
1 int epoll_to = 1;// global default epoll timeout -- 1ms
// eparm receives the result of getenv() while rmr_init_wrapper() reads its settings.
2 char* eparm; // generic env pointer
// NOTE(review): the declarations below carry the same names as the members the
// functions reach through `struct rmr_context *` (rcv_fd, epe, events,
// max_payload_size, sbuf, rbuf). They look like the body of that struct, but no
// enclosing `struct rmr_context {` line is visible in this listing -- confirm.
6 int rcv_fd; // pollable fd
7 struct epoll_event epe; // event definition for event to listen to
8 struct epoll_event events[10]; // wait on 10 possible events
// Size used when the send buffer is allocated and when the payload is formatted.
10 int max_payload_size; // ++SCOTT
11 rmr_mbuf_t* sbuf; // send buffer
12 rmr_mbuf_t* rbuf; // received buffer
// Allocate a struct rmr_context and prepare it for sending and for polled receives.
//
// Steps visible in this listing:
//   1. malloc() the context and call rmr_init() with `lport`, storing the result in mrc.
//   2. Fetch the receive fd with rmr_get_rcvfd(), create an epoll instance and
//      register that fd for EPOLLIN.
//   3. Pick max_payload_size (240 unless DEMO_MAX_PAYLOAD_BYTES is set, in which
//      case atoi() of that value) and allocate the send buffer with rmr_alloc_msg().
//
// lport: string passed to rmr_init() and echoed in the startup message.
//
// NOTE(review): what each [FAIL] branch does after its fprintf(), and the value
// returned by the function, are not visible in this listing -- confirm before
// relying on a particular failure contract.
// NOTE(review): the malloc() result is dereferenced with no visible NULL check,
// and the results of rmr_init() and rmr_alloc_msg() are not visibly checked either.
16 struct rmr_context * rmr_init_wrapper(char* lport){
18 struct rmr_context *rmr_c = malloc(sizeof (struct rmr_context));
19 fprintf( stderr, "[INFO] glistening for replies on %s\n", lport );
22 rmr_c->mrc = rmr_init(lport, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // setup RMr and get a context (rmr_c.mrc)
25 //polling related initializations
26 rmr_c->rcv_fd = rmr_get_rcvfd( rmr_c->mrc );// get the fd to poll for messages received
27 if( rmr_c->rcv_fd < 0 ) {
28 fprintf( stderr, "[FAIL] unable to set up polling fd\n" );
// the epoll fd is assigned inside the condition so the failure test and the store happen together
32 if( (rmr_c->ep_fd = epoll_create1( 0 )) < 0 ) {
33 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
// watch the RMR receive fd for readability only
36 rmr_c->epe.events = EPOLLIN;
37 rmr_c->epe.data.fd = rmr_c->rcv_fd;
39 if( epoll_ctl( rmr_c->ep_fd, EPOLL_CTL_ADD, rmr_c->rcv_fd, &rmr_c->epe ) != 0 ) {
40 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
43 //end of polling related initializations
45 //++SCOTT------ next lines until !!SCOTT
46 rmr_c->max_payload_size = 240; //default
// NOTE(review): atoi() reports no errors; in the visible code a non-numeric or
// negative environment value flows straight into rmr_alloc_msg() below.
47 if( (eparm = getenv( "DEMO_MAX_PAYLOAD_BYTES" )) != NULL ) {
48 rmr_c->max_payload_size = atoi(eparm); // override with env
53 rmr_c->sbuf = rmr_alloc_msg( rmr_c->mrc, rmr_c->max_payload_size ); // allocate largest payload to send
59 void rmr_pure_send (struct rmr_context *rmr_c, int mtype, char* message) {
60 //--SCOTT int max_payload_size = 240; //default
61 //--SCOTT if( (eparm = getenv( "DEMO_MAX_PAYLOAD_BYTES" )) != NULL ) {
62 //--SCOTT max_payload_size = atoi(eparm);
66 snprintf( rmr_c->sbuf->payload, rmr_c->max_payload_size, "%s", message); // simple send message -- replace with real content
68 rmr_c->sbuf->mtype = mtype; // fill in message meta data
69 rmr_c->sbuf->len = strlen( rmr_c->sbuf->payload ) + 1; // actual length of payload (count the nil end of string)
70 rmr_c->sbuf->state = 0;
71 rmr_send_msg( rmr_c->mrc, rmr_c->sbuf );
72 // //retry send for a few times before giving up
73 // long natter = 0; // natter on for errors only once in a while
74 // if( (rmr_c->sbuf = rmr_send_msg( rmr_c->mrc, rmr_c->sbuf )) != NULL ) {// unlikely, but might get a null pointer back if NNG really is buggered
75 // if( rmr_c->sbuf->state != RMR_OK ) {
76 // if( errno == EAGAIN ) {
77 // while( rmr_c->sbuf->state != RMR_OK && errno == EAGAIN ) {// NNG likes to refuse sends, just keep trying on eagain
78 // rmr_send_msg( rmr_c->mrc, rmr_c->sbuf );
80 // } else {// most likely connection refused, don't natter on
81 // if( time( NULL ) > natter ) {
82 // fprintf( stderr, "[WARN] send failed, pausing (%s)\n", strerror( errno ) );
83 // natter = time( NULL ) + 2;
89 // if( time( NULL ) > natter ) {
90 // fprintf( stderr, "[WARN] send failed, pausing (%s)\n", strerror( errno ) );
91 // natter = time( NULL ) + 2;
// Poll the wrapper's epoll fd once for an incoming RMR message.
//
// Waits in epoll_wait() with epoll_to as the timeout, then walks the ready
// events. When an event belongs to the RMR receive fd, rmr_rcv_msg() is called,
// its result is stored in rmr_c->rbuf, and return_type is set to 1.
//
// NOTE(review): the declarations of i, nready and return_type, and the
// function's return statement, are not visible in this listing. Presumably
// return_type (1 = message received) is what is returned -- confirm.
97 int rmr_poll_for_message (struct rmr_context *rmr_c) {
98 //start polling the channel to read the acknowledgment
104 nready = epoll_wait( rmr_c->ep_fd, rmr_c->events, 10, epoll_to ); // wait up to epoll_to ms for a response
// the second bound keeps the index inside the 10 slots passed to epoll_wait() above
105 for( i = 0; i < nready && i < 10; i++ ) { // loop through to find what is ready
106 if( rmr_c->events[i].data.fd == rmr_c->rcv_fd ) { // RMr has something
// the previous receive buffer is passed back in and replaced by whatever rmr_rcv_msg() returns
108 rmr_c->rbuf = rmr_rcv_msg( rmr_c->mrc, rmr_c->rbuf ); // something ready; this should not block
110 //fprintf( stderr, "<TEST> acknowledgment received:%s\n",rmr_c->rbuf->payload);
111 return_type = 1; //the message has been acknowledged
// Shut down the RMR context held by the wrapper by calling rmr_close() on rmr_c->mrc.
//
// NOTE(review): nothing visible here releases the rmr_context allocation, the
// send/receive buffers or the epoll fd; the rest of the function lies outside
// this listing -- confirm whether that cleanup happens elsewhere.
117 void rmr_close_wrapper (struct rmr_context *rmr_c){
119 rmr_close( rmr_c->mrc );