X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fxapp-utils%2Fxapp_rmr.hpp;h=ab3e77a256028174b459cb4174338cfd094641f4;hb=60af3c9e625d903c1659c703bd1e41167a972c59;hp=2ba6fb1e3f18d608475cf2712f342e2b816b2e17;hpb=6df19a4dacb4fcb6edb35a32af9c8f5c07c95e37;p=ric-app%2Fhw.git diff --git a/src/xapp-utils/xapp_rmr.hpp b/src/xapp-utils/xapp_rmr.hpp index 2ba6fb1..ab3e77a 100755 --- a/src/xapp-utils/xapp_rmr.hpp +++ b/src/xapp-utils/xapp_rmr.hpp @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -58,10 +59,10 @@ typedef struct{ class XappRmr{ private: - std::string _xapp_name; std::string _proto_port; int _nattempts; bool _rmr_is_ready; + bool _listen; void* _xapp_rmr_ctx; rmr_mbuf_t* _xapp_send_buff; // send buffer rmr_mbuf_t* _xapp_received_buff; // received buffer @@ -69,49 +70,93 @@ private: public: - XappRmr(std::string, std::string, int rmrattempts=10); + XappRmr(std::string, int rmrattempts=10); ~XappRmr(void); void xapp_rmr_init(void); template void xapp_rmr_receive(MessageProcessor&&, XappRmr *parent); bool xapp_rmr_send(xapp_rmr_header*, void*); - bool xapp_rmr_rts(); + + void set_listen(bool); + bool get_listen(void); + int get_is_ready(void); + bool get_isRunning(void); + void* get_rmr_context(void); }; -//RMR receive -template -void XappRmr::xapp_rmr_receive(MessageProcessor&& msgproc, XappRmr *parent){ - char* listen_port; +// main workhorse thread which does the listen->process->respond loop +template +void XappRmr::xapp_rmr_receive(MsgHandler&& msgproc, XappRmr *parent){ - if( (listen_port = getenv( "RMR_RCV_PORT" )) == NULL ) { - mdclog_write(MDCLOG_ERR,"No Listening port assigned, file= %s, line=%d",__FILE__,__LINE__); - } + bool* resend = new bool(false); + // Get the thread id + std::thread::id my_id = std::this_thread::get_id(); + std::stringstream thread_id; + std::stringstream ss; - if(!_rmr_is_ready){ + thread_id << my_id; + + // Get the rmr context from parent (all threads and parent use same rmr context. rmr context is expected to be thread safe) + if(!parent->get_is_ready()){ mdclog_write( MDCLOG_ERR, "RMR Shows Not Ready in RECEIVER, file= %s, line=%d ",__FILE__,__LINE__); return; - } + } + void *rmr_context = parent->get_rmr_context(); + assert(rmr_context != NULL); + + // Get buffer specific to this thread + this->_xapp_received_buff = NULL; + this->_xapp_received_buff = rmr_alloc_msg(rmr_context, RMR_DEF_SIZE); + assert(this->_xapp_received_buff != NULL); + + mdclog_write(MDCLOG_INFO, "Starting receiver thread %s", thread_id.str().c_str()); + + while(parent->get_listen()) { + mdclog_write(MDCLOG_INFO, "Listening at Thread: %s", thread_id.str().c_str()); - while( 1 ) { - parent->_xapp_received_buff = rmr_rcv_msg( parent->_xapp_rmr_ctx, parent->_xapp_received_buff ); // block until one arrives - if( parent->_xapp_received_buff->mtype < 0 || parent->_xapp_received_buff->state != RMR_OK ) { - mdclog_write(MDCLOG_ERR, "bad msg: state=%d errno=%d, file= %s, line=%d", parent->_xapp_received_buff->state, errno, __FILE__,__LINE__ ); - return; - } else { - std::cout << "The Message Received is:" << (char*)parent->_xapp_received_buff->payload <mtype <_xapp_received_buff = rmr_rcv_msg( rmr_context, this->_xapp_received_buff ); + + if( this->_xapp_received_buff->mtype < 0 || this->_xapp_received_buff->state != RMR_OK ) { + mdclog_write(MDCLOG_ERR, "bad msg: state=%d errno=%d, file= %s, line=%d", this->_xapp_received_buff->state, errno, __FILE__,__LINE__ ); + return; + } + else + { + mdclog_write(MDCLOG_INFO,"RMR Received Message of Type: %d",this->_xapp_received_buff->mtype); + mdclog_write(MDCLOG_INFO,"RMR Received Message: %s",(char*)this->_xapp_received_buff->payload); + + //in case message handler returns true, need to resend the message. + msgproc(this->_xapp_received_buff, resend); + if(*resend){ + //mdclog_write(MDCLOG_INFO,"RMR Return to Sender Message of Type: %d",this->_xapp_received_buff->mtype); + //mdclog_write(MDCLOG_INFO,"RMR Return to Sender Message: %s",(char*)this->_xapp_received_buff->payload); + rmr_rts_msg(rmr_context, this->_xapp_received_buff ); + sleep(1); + *resend = false; } + continue; } - return; + + } + // Clean up + try{ + delete resend; + rmr_free_msg(this->_xapp_received_buff); + } + catch(std::runtime_error &e){ + std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; + std::string error_string = identifier = " Error freeing RMR message "; + mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); + } + + return; } + + + #endif /* XAPP_RMR_XAPP_RMR_H_ */