X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Fxapp-utils%2Fxapp_rmr.hpp;h=41390360b019dfde9bfa8c991e4883ec5d1e2025;hb=refs%2Fchanges%2F07%2F4907%2F2;hp=2ba6fb1e3f18d608475cf2712f342e2b816b2e17;hpb=433b7b2a72c174c75ce2c0c75b225fa0bb813d32;p=ric-app%2Fhw.git diff --git a/src/xapp-utils/xapp_rmr.hpp b/src/xapp-utils/xapp_rmr.hpp index 2ba6fb1..4139036 100755 --- a/src/xapp-utils/xapp_rmr.hpp +++ b/src/xapp-utils/xapp_rmr.hpp @@ -1,7 +1,7 @@ /* ================================================================================== - Copyright (c) 2018-2019 AT&T Intellectual Property. + Copyright (c) 2019-2020 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -50,18 +51,19 @@ typedef struct{ int32_t message_type; int32_t state; int32_t payload_length; - unsigned char xid[RMR_MAX_XID]; //space for user transaction id. - unsigned char sid[RMR_MAX_SID]; //sender ID for return to sender needs.(ACKS required) - unsigned char src[RMR_MAX_SRC]; //name of the sender (source) + unsigned char sid[RMR_MAX_SID]; //Subscription ID. + unsigned char src[RMR_MAX_SRC]; //Xapp Name + unsigned char meid[RMR_MAX_MEID]={}; + } xapp_rmr_header; class XappRmr{ private: - std::string _xapp_name; std::string _proto_port; int _nattempts; bool _rmr_is_ready; + bool _listen; void* _xapp_rmr_ctx; rmr_mbuf_t* _xapp_send_buff; // send buffer rmr_mbuf_t* _xapp_received_buff; // received buffer @@ -69,49 +71,97 @@ private: public: - XappRmr(std::string, std::string, int rmrattempts=10); + XappRmr(std::string, int rmrattempts=10); ~XappRmr(void); - void xapp_rmr_init(void); + void xapp_rmr_init(bool); template void xapp_rmr_receive(MessageProcessor&&, XappRmr *parent); + bool xapp_rmr_send(xapp_rmr_header*, void*); - bool xapp_rmr_rts(); + + bool rmr_header(xapp_rmr_header*); + void set_listen(bool); + bool get_listen(void); + int get_is_ready(void); + bool get_isRunning(void); + void* get_rmr_context(void); }; -//RMR receive -template -void XappRmr::xapp_rmr_receive(MessageProcessor&& msgproc, XappRmr *parent){ - char* listen_port; - if( (listen_port = getenv( "RMR_RCV_PORT" )) == NULL ) { - mdclog_write(MDCLOG_ERR,"No Listening port assigned, file= %s, line=%d",__FILE__,__LINE__); - } +// main workhorse thread which does the listen->process->respond loop +template +void XappRmr::xapp_rmr_receive(MsgHandler&& msgproc, XappRmr *parent){ - if(!_rmr_is_ready){ + bool* resend = new bool(false); + // Get the thread id + std::thread::id my_id = std::this_thread::get_id(); + std::stringstream thread_id; + std::stringstream ss; + + thread_id << my_id; + + // Get the rmr context from parent (all threads and parent use same rmr context. rmr context is expected to be thread safe) + if(!parent->get_is_ready()){ mdclog_write( MDCLOG_ERR, "RMR Shows Not Ready in RECEIVER, file= %s, line=%d ",__FILE__,__LINE__); return; - } + } + void *rmr_context = parent->get_rmr_context(); + assert(rmr_context != NULL); + + // Get buffer specific to this thread + this->_xapp_received_buff = NULL; + this->_xapp_received_buff = rmr_alloc_msg(rmr_context, RMR_DEF_SIZE); + assert(this->_xapp_received_buff != NULL); - while( 1 ) { - parent->_xapp_received_buff = rmr_rcv_msg( parent->_xapp_rmr_ctx, parent->_xapp_received_buff ); // block until one arrives - if( parent->_xapp_received_buff->mtype < 0 || parent->_xapp_received_buff->state != RMR_OK ) { - mdclog_write(MDCLOG_ERR, "bad msg: state=%d errno=%d, file= %s, line=%d", parent->_xapp_received_buff->state, errno, __FILE__,__LINE__ ); - return; - } else { - std::cout << "The Message Received is:" << (char*)parent->_xapp_received_buff->payload <mtype <get_listen()) { + mdclog_write(MDCLOG_INFO, "Listening at Thread: %s", thread_id.str().c_str()); + + this->_xapp_received_buff = rmr_rcv_msg( rmr_context, this->_xapp_received_buff ); + //this->_xapp_received_buff = rmr_call( rmr_context, this->_xapp_received_buff); + + if( this->_xapp_received_buff->mtype < 0 || this->_xapp_received_buff->state != RMR_OK ) { + mdclog_write(MDCLOG_ERR, "bad msg: state=%d errno=%d, file= %s, line=%d", this->_xapp_received_buff->state, errno, __FILE__,__LINE__ ); + return; + } + else + { + mdclog_write(MDCLOG_INFO,"RMR Received Message of Type: %d",this->_xapp_received_buff->mtype); + mdclog_write(MDCLOG_INFO,"RMR Received Message: %s",(char*)this->_xapp_received_buff->payload); + + //in case message handler returns true, need to resend the message. + msgproc(this->_xapp_received_buff, resend); + + if(*resend){ + mdclog_write(MDCLOG_INFO,"RMR Return to Sender Message of Type: %d",this->_xapp_received_buff->mtype); + mdclog_write(MDCLOG_INFO,"RMR Return to Sender Message: %s",(char*)this->_xapp_received_buff->payload); + rmr_rts_msg(rmr_context, this->_xapp_received_buff ); + sleep(1); + *resend = false; } + continue; } - return; + + } + // Clean up + try{ + delete resend; + rmr_free_msg(this->_xapp_received_buff); + } + catch(std::runtime_error &e){ + std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; + std::string error_string = identifier = " Error freeing RMR message "; + mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); + } + + return; } + + + #endif /* XAPP_RMR_XAPP_RMR_H_ */