Incorporating RMR Health check code
[ric-app/hw.git] / src / xapp-utils / xapp_rmr.hpp
index 2ba6fb1..30aea8c 100755 (executable)
@@ -35,6 +35,7 @@
 #include <string.h>
 #include <error.h>
 #include <assert.h>
+#include <sstream>
 #include <thread>
 #include <functional>
 #include <map>
@@ -58,10 +59,10 @@ typedef struct{
 
 class XappRmr{
 private:
-       std::string _xapp_name;
        std::string _proto_port;
        int _nattempts;
        bool _rmr_is_ready;
+    bool _listen;
        void* _xapp_rmr_ctx;
        rmr_mbuf_t*             _xapp_send_buff;                                        // send buffer
        rmr_mbuf_t*             _xapp_received_buff;                                    // received buffer
@@ -69,7 +70,7 @@ private:
 
 public:
 
-       XappRmr(std::string, std::string, int rmrattempts=10);
+       XappRmr(std::string, int rmrattempts=10);
        ~XappRmr(void);
        void xapp_rmr_init(void);
 
@@ -78,40 +79,83 @@ public:
        bool xapp_rmr_send(xapp_rmr_header*, void*);
        bool xapp_rmr_rts();
 
+       void set_listen(bool);
+       bool get_listen(void);
+       int get_is_ready(void);
+       bool get_isRunning(void);
+       void* get_rmr_context(void);
+
 };
 
-//RMR receive
-template <class MessageProcessor>
-void XappRmr::xapp_rmr_receive(MessageProcessor&& msgproc, XappRmr *parent){
-               char*   listen_port;
+// main workhorse thread which does the listen->process->respond loop
+template <class MsgHandler>
+void XappRmr::xapp_rmr_receive(MsgHandler&& msgproc, XappRmr *parent){
 
-               if( (listen_port = getenv( "RMR_RCV_PORT" )) == NULL ) {
-                       mdclog_write(MDCLOG_ERR,"No Listening port assigned, file= %s, line=%d",__FILE__,__LINE__);
-               }
+       bool* resend = new bool(false);
+       // Get the thread id
+       std::thread::id my_id = std::this_thread::get_id();
+       std::stringstream thread_id;
+       std::stringstream ss;
 
-               if(!_rmr_is_ready){
+       thread_id << my_id;
+
+       // Get the rmr context from parent (all threads and parent use same rmr context. rmr context is expected to be thread safe)
+       if(!parent->get_is_ready()){
                        mdclog_write( MDCLOG_ERR, "RMR Shows Not Ready in RECEIVER, file= %s, line=%d ",__FILE__,__LINE__);
                        return;
-               }
+       }
+       void *rmr_context = parent->get_rmr_context();
+       assert(rmr_context != NULL);
+
+       // Get buffer specific to this thread
+       this->_xapp_received_buff = NULL;
+       this->_xapp_received_buff = rmr_alloc_msg(rmr_context, RMR_DEF_SIZE);
+       assert(this->_xapp_received_buff != NULL);
+
+       mdclog_write(MDCLOG_INFO, "Starting thread %s",  thread_id.str().c_str());
+
+       while(parent->get_listen()) {
+               mdclog_write(MDCLOG_INFO, "Listening at Thread: %s",  thread_id.str().c_str());
 
-                 while( 1 ) {
-                         parent->_xapp_received_buff = rmr_rcv_msg( parent->_xapp_rmr_ctx, parent->_xapp_received_buff );                                              // block until one arrives
-                       if( parent->_xapp_received_buff->mtype < 0 || parent->_xapp_received_buff->state != RMR_OK ) {
-                               mdclog_write(MDCLOG_ERR, "bad msg:  state=%d  errno=%d, file= %s, line=%d", parent->_xapp_received_buff->state, errno, __FILE__,__LINE__ );
-                               return;
-                       } else {
-                               std::cout << "The Message Received is:" << (char*)parent->_xapp_received_buff->payload <<std::endl;
-                               std::cout << "The Message Received Type is:" << _xapp_received_buff->mtype <<std::endl;
-                               _xapp_send_buff = msgproc(_xapp_received_buff);
-                               if(_xapp_send_buff !=NULL)
-                                       xapp_rmr_rts();
-                               //sleep(10);
-                               //_xapp_received_buff = NULL;
+               this->_xapp_received_buff = rmr_rcv_msg( rmr_context, this->_xapp_received_buff );
 
+
+               if( this->_xapp_received_buff->mtype < 0 || this->_xapp_received_buff->state != RMR_OK ) {
+                       mdclog_write(MDCLOG_ERR, "bad msg:  state=%d  errno=%d, file= %s, line=%d", this->_xapp_received_buff->state, errno, __FILE__,__LINE__ );
+                       return;
+               }
+               else
+               {
+                       mdclog_write(MDCLOG_INFO,"RMR Received Message of Type: %d",this->_xapp_received_buff->mtype);
+                       mdclog_write(MDCLOG_INFO,"RMR Received Message: %s",(char*)this->_xapp_received_buff->payload);
+
+                       //in case message handler returns true, need to resend the message.
+                       msgproc(this->_xapp_received_buff, resend);
+                       if(*resend){
+                               rmr_rts_msg(rmr_context, this->_xapp_received_buff );
+                               sleep(1);
+                               *resend = false;
                        }
+                       continue;
 
                }
-           return;
+
+       }
+       // Clean up
+       try{
+               delete resend;
+               rmr_free_msg(this->_xapp_received_buff);
+       }
+       catch(std::runtime_error &e){
+               std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ;
+               std::string error_string = identifier = " Error freeing RMR message ";
+               mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
+       }
+
+       return;
 }
 
+
+
+
 #endif /* XAPP_RMR_XAPP_RMR_H_ */