X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Fxapp-utils%2Fxapp_rmr.hpp;fp=src%2Fxapp-utils%2Fxapp_rmr.hpp;h=30aea8ca6271929f4dccf6a46453d70c7fefede2;hb=b85024cd183a527cd8d61353637850cb6d30cf36;hp=2ba6fb1e3f18d608475cf2712f342e2b816b2e17;hpb=d63f834656922171ffa04347e34c7bce8a0d41be;p=ric-app%2Fhw.git diff --git a/src/xapp-utils/xapp_rmr.hpp b/src/xapp-utils/xapp_rmr.hpp index 2ba6fb1..30aea8c 100755 --- a/src/xapp-utils/xapp_rmr.hpp +++ b/src/xapp-utils/xapp_rmr.hpp @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -58,10 +59,10 @@ typedef struct{ class XappRmr{ private: - std::string _xapp_name; std::string _proto_port; int _nattempts; bool _rmr_is_ready; + bool _listen; void* _xapp_rmr_ctx; rmr_mbuf_t* _xapp_send_buff; // send buffer rmr_mbuf_t* _xapp_received_buff; // received buffer @@ -69,7 +70,7 @@ private: public: - XappRmr(std::string, std::string, int rmrattempts=10); + XappRmr(std::string, int rmrattempts=10); ~XappRmr(void); void xapp_rmr_init(void); @@ -78,40 +79,83 @@ public: bool xapp_rmr_send(xapp_rmr_header*, void*); bool xapp_rmr_rts(); + void set_listen(bool); + bool get_listen(void); + int get_is_ready(void); + bool get_isRunning(void); + void* get_rmr_context(void); + }; -//RMR receive -template -void XappRmr::xapp_rmr_receive(MessageProcessor&& msgproc, XappRmr *parent){ - char* listen_port; +// main workhorse thread which does the listen->process->respond loop +template +void XappRmr::xapp_rmr_receive(MsgHandler&& msgproc, XappRmr *parent){ - if( (listen_port = getenv( "RMR_RCV_PORT" )) == NULL ) { - mdclog_write(MDCLOG_ERR,"No Listening port assigned, file= %s, line=%d",__FILE__,__LINE__); - } + bool* resend = new bool(false); + // Get the thread id + std::thread::id my_id = std::this_thread::get_id(); + std::stringstream thread_id; + std::stringstream ss; - if(!_rmr_is_ready){ + thread_id << my_id; + + // Get the rmr context from parent (all threads and parent use same rmr context. rmr context is expected to be thread safe) + if(!parent->get_is_ready()){ mdclog_write( MDCLOG_ERR, "RMR Shows Not Ready in RECEIVER, file= %s, line=%d ",__FILE__,__LINE__); return; - } + } + void *rmr_context = parent->get_rmr_context(); + assert(rmr_context != NULL); + + // Get buffer specific to this thread + this->_xapp_received_buff = NULL; + this->_xapp_received_buff = rmr_alloc_msg(rmr_context, RMR_DEF_SIZE); + assert(this->_xapp_received_buff != NULL); + + mdclog_write(MDCLOG_INFO, "Starting thread %s", thread_id.str().c_str()); + + while(parent->get_listen()) { + mdclog_write(MDCLOG_INFO, "Listening at Thread: %s", thread_id.str().c_str()); - while( 1 ) { - parent->_xapp_received_buff = rmr_rcv_msg( parent->_xapp_rmr_ctx, parent->_xapp_received_buff ); // block until one arrives - if( parent->_xapp_received_buff->mtype < 0 || parent->_xapp_received_buff->state != RMR_OK ) { - mdclog_write(MDCLOG_ERR, "bad msg: state=%d errno=%d, file= %s, line=%d", parent->_xapp_received_buff->state, errno, __FILE__,__LINE__ ); - return; - } else { - std::cout << "The Message Received is:" << (char*)parent->_xapp_received_buff->payload <mtype <_xapp_received_buff = rmr_rcv_msg( rmr_context, this->_xapp_received_buff ); + + if( this->_xapp_received_buff->mtype < 0 || this->_xapp_received_buff->state != RMR_OK ) { + mdclog_write(MDCLOG_ERR, "bad msg: state=%d errno=%d, file= %s, line=%d", this->_xapp_received_buff->state, errno, __FILE__,__LINE__ ); + return; + } + else + { + mdclog_write(MDCLOG_INFO,"RMR Received Message of Type: %d",this->_xapp_received_buff->mtype); + mdclog_write(MDCLOG_INFO,"RMR Received Message: %s",(char*)this->_xapp_received_buff->payload); + + //in case message handler returns true, need to resend the message. + msgproc(this->_xapp_received_buff, resend); + if(*resend){ + rmr_rts_msg(rmr_context, this->_xapp_received_buff ); + sleep(1); + *resend = false; } + continue; } - return; + + } + // Clean up + try{ + delete resend; + rmr_free_msg(this->_xapp_received_buff); + } + catch(std::runtime_error &e){ + std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; + std::string error_string = identifier = " Error freeing RMR message "; + mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); + } + + return; } + + + #endif /* XAPP_RMR_XAPP_RMR_H_ */