/*
==================================================================================
- Copyright (c) 2018-2019 AT&T Intellectual Property.
+ Copyright (c) 2019-2020 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
};
XappRmr::~XappRmr(void){
-
// free memory
if(_xapp_received_buff)
rmr_free_msg(_xapp_received_buff);
};
//Get RMR Context.
-void XappRmr::xapp_rmr_init(){
+void XappRmr::xapp_rmr_init(bool rmr_listen){
+
// Initialize the RMR context
_xapp_rmr_ctx = rmr_init(const_cast<char*>(_proto_port.c_str()), RMR_MAX_RCV_BYTES, RMRFL_NONE);
_rmr_is_ready = true;
mdclog_write(MDCLOG_INFO,"RMR Context is Ready, file= %s, line=%d",__FILE__,__LINE__);
+ //Set the listener requirement
+ _listen = rmr_listen;
return;
}
-//RMR Returning to the sender.
-bool XappRmr::xapp_rmr_rts()
-{
- mdclog_write(MDCLOG_INFO,"RMR Return to sender, file= %s, line=%d",__FILE__,__LINE__);
- if ( _xapp_rmr_ctx == NULL){
- mdclog_write(MDCLOG_ERR,"Error Initializing RMR, file= %s, line=%d",__FILE__,__LINE__);
- }
- while( ! rmr_ready(_xapp_rmr_ctx) ) {
- mdclog_write(MDCLOG_INFO,">>> waiting for RMR, file= %s, line=%d",__FILE__,__LINE__);
- sleep(1);
- }
- rmr_rts_msg(_xapp_rmr_ctx, _xapp_received_buff );
- sleep(1);
+
+bool XappRmr::rmr_header(xapp_rmr_header *hdr){
+
+ _xapp_send_buff->mtype = hdr->message_type;
+ _xapp_send_buff->len = hdr->payload_length;
+ _xapp_send_buff->sub_id = -1;
+ rmr_str2meid(_xapp_send_buff, hdr->meid);
+
+
return true;
}
//RMR Send with payload and header.
bool XappRmr::xapp_rmr_send(xapp_rmr_header *hdr, void *payload){
+ // Get the thread id
+ std::thread::id my_id = std::this_thread::get_id();
+ std::stringstream thread_id;
+ std::stringstream ss;
+
+ thread_id << my_id;
+ mdclog_write(MDCLOG_INFO, "Sending thread %s", thread_id.str().c_str());
+
+
int rmr_attempts = _nattempts;
if( _xapp_send_buff == NULL ) {
_xapp_send_buff = rmr_alloc_msg(_xapp_rmr_ctx, RMR_DEF_SIZE);
}
- _xapp_send_buff->mtype = hdr->message_type;
+
+ bool res = rmr_header(hdr);
+ if(!res){
+ mdclog_write(MDCLOG_ERR,"RMR HEADERS were incorrectly populated, file= %s, line=%d",__FILE__,__LINE__);
+ return false;
+ }
+
memcpy(_xapp_send_buff->payload, payload, hdr->payload_length);
_xapp_send_buff->len = hdr->payload_length;
}
while(rmr_attempts > 0){
- _xapp_send_buff = rmr_send_msg(_xapp_rmr_ctx,_xapp_send_buff);
+ _xapp_send_buff = rmr_send_msg(_xapp_rmr_ctx,_xapp_send_buff);
if(!_xapp_send_buff) {
mdclog_write(MDCLOG_ERR,"Error In Sending Message , file= %s, line=%d, attempt=%d",__FILE__,__LINE__,rmr_attempts);
rmr_attempts--;
else
{
mdclog_write(MDCLOG_INFO,"Need to retry RMR: state=%d, attempt=%d, file=%s, line=%d",_xapp_send_buff->state, rmr_attempts,__FILE__,__LINE__);
- rmr_attempts--;
+ if(_xapp_send_buff->state == RMR_ERR_RETRY){
+ usleep(1); }
+ rmr_attempts--;
}
sleep(1);
}