Add API allowing xAPPs to send alarm messages
[ric-plt/xapp-frame-cpp.git] / src / messaging / messenger.cpp
index 6951f7d..67d5e2d 100644 (file)
 #include "default_cb.hpp"              // default callback prototypes
 #include "message.hpp"
 #include "messenger.hpp"
+#include "alarm.hpp"
 
+namespace xapp {
 
 // --------------- private -----------------------------------------------------
 
 
 // ---------------- C++ buggerd up way of maintining class constants ----------
-const int Messenger::MAX_PAYLOAD = (1024*64);
-const int Messenger::DEFAULT_CALLBACK = -1;
+const int xapp::Messenger::MAX_PAYLOAD = (1024*64);
+const int xapp::Messenger::DEFAULT_CALLBACK = -1;
 
 // --------------- builders -----------------------------------------------
 /*
@@ -60,13 +62,15 @@ const int Messenger::DEFAULT_CALLBACK = -1;
 
        If port is nil, then the default port is used (4560).
 */
-Messenger::Messenger( char* port, bool wait4table ) {
-       if( port == NULL ) {
-               port = (char *) "4560";
+xapp::Messenger::Messenger( const char* uport, bool wait4table ) {
+
+       if( uport == NULL ) {
+               listen_port = strdup( "4560" );
+       } else {
+               listen_port = strdup( uport );
        }
 
        gate = new std::mutex();
-       listen_port = strdup( port );
        mrc = rmr_init( listen_port, Messenger::MAX_PAYLOAD, 0 );
 
        if( wait4table ) {
@@ -78,15 +82,63 @@ Messenger::Messenger( char* port, bool wait4table ) {
        ok_2_run = true;
 }
 
+/*
+       Move support. We DO allow the instance to be moved as only one copy
+       remains following the move.
+       Given a source object instance (soi) we move the information to
+       the new object, and then DELETE what was moved so that when the
+       user frees the soi, it doesn't destroy what we snarfed.
+*/
+xapp::Messenger::Messenger( Messenger&& soi ) {
+       mrc = soi.mrc;
+       listen_port = soi.listen_port;
+       ok_2_run = soi.ok_2_run;
+       gate = soi.gate;
+               cb_hash = soi.cb_hash;                          // this seems dodgy
+
+       soi.gate = NULL;
+       soi.listen_port = NULL;
+       soi.mrc = NULL;
+}
+
+/*
+       Move operator. Given a source object instance, movee it's contents
+       to this insance.  We must first clean up this instance.
+*/
+xapp::Messenger& Messenger::operator=( Messenger&& soi ) {
+       if( this != &soi ) {                            // cannot move onto ourself
+               if( mrc != NULL ) {
+                       rmr_close( mrc );
+               }
+               if( listen_port != NULL ) {
+                       free( listen_port );
+               }
+
+               mrc = soi.mrc;
+               listen_port = soi.listen_port;
+               ok_2_run = soi.ok_2_run;
+               gate = soi.gate;
+                       cb_hash = soi.cb_hash;                          // this seems dodgy
+
+               soi.gate = NULL;
+               soi.listen_port = NULL;
+               soi.mrc = NULL;
+       }
+
+       return *this;
+}
+
 /*
        Destroyer.
 */
-Messenger::~Messenger() {
+xapp::Messenger::~Messenger() {
        if( mrc != NULL ) {
                rmr_close( mrc );
        }
 
-       free( listen_port );
+       if( listen_port != NULL ) {
+               free( listen_port );
+       }
 }
 
 /*
@@ -101,7 +153,7 @@ Messenger::~Messenger() {
        is defined for a message type, the default callback function is invoked.
        If a default is not provided, a non-matching message is silently dropped.
 */
-void Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
+void xapp::Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
        Callback*       cb;
 
        cb = new Callback( fun_name, data );
@@ -115,18 +167,57 @@ void Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
        finished, but may keep the message for as long as is necessary
        and reuse it over and over.
 */
-//Message* Messenger::Alloc_msg( int payload_size ) {
-std::unique_ptr<Message> Messenger::Alloc_msg( int payload_size ) {
+std::unique_ptr<Message> xapp::Messenger::Alloc_msg( int payload_size ) {
        return std::unique_ptr<Message>( new Message( mrc, payload_size ) );
 }
 
-void Messenger::Listen( ) {
+
+// ----------------- alarm support -----------------------------------------------
+/*
+       Allocate an alarm object.
+       Alarms must be allocated via the framework becasue we need a wormhole
+       id and to get one of those we need the mrc.  We can easily send with
+       just a message, but to avoid having the user pass the framework
+       object in, we'll just supply a "factory" function.
+*/
+std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( int prob_id, std::string meid ) {
+       std::shared_ptr<Message> m;
+       Alarm* a;
+
+       m = Alloc_msg( 4096 );
+       a = new Alarm( m, prob_id, meid );
+       a->Set_whid( Wormhole_open( a->Get_endpoint() ) );
+
+       return std::unique_ptr<Alarm>( a );
+}
+
+std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( std::string meid ) {
+       return Alloc_alarm( -1, meid );
+}
+
+std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( ) {
+       return Alloc_alarm( -1, "" );
+}
+
+
+// ------------------- listening support -----------------------------------------------
+
+/*
+       The Listen function waits for messages and drives the appropriate callback
+       function when one is received. This function will return to the caller
+       only when the ok to run flag in the object has been set to false (likely
+       never, or only at graceful termination). Callers should normally not
+       expect to have controll returned in the calling thread.
+
+       Concurrently executing listeners are allowed.
+*/
+void xapp::Messenger::Listen( ) {
        int count = 0;
        rmr_mbuf_t*     mbuf = NULL;
        std::map<int,Callback*>::iterator mi;   // map iterator; silly indirect way to point at the value
        Callback*       dcb = NULL;                                     // default callback so we don't search
        Callback*       sel_cb;                                         // callback selected to invoke
-       std::unique_ptr<Message>m;
+       std::unique_ptr<Message> m;
 
        if( mrc == NULL ) {
                return;
@@ -162,27 +253,26 @@ void Messenger::Listen( ) {
 
 /*
        Wait for the next message, up to a max timout, and return the message received.
+       This function allows the user xAPP to implement their own polling loop (no callbacks).
 */
-std::unique_ptr<Message>  Messenger::Receive( int timeout ) {
+std::unique_ptr<Message>  xapp::Messenger::Receive( int timeout ) {
        rmr_mbuf_t*     mbuf = NULL;
-       //std::unique_ptr<Message> m;
-
-       if( mrc == NULL ) {
-               return NULL;
-       }
+       std::unique_ptr<Message> m = NULL;
 
-       mbuf = rmr_torcv_msg( mrc, mbuf, timeout );             // future: do we want to reuse the mbuf here?
-       if( mbuf != NULL ) {
-               return std::unique_ptr<Message>( new Message( mbuf, mrc ) );
+       if( mrc != NULL ) {
+               mbuf = rmr_torcv_msg( mrc, mbuf, timeout );             // future: do we want to reuse the mbuf here?
+               if( mbuf != NULL ) {
+                       m = std::unique_ptr<Message>( new Message( mbuf, mrc ) );
+               }
        }
 
-       return NULL;
+       return m;
 }
 
 /*
        Called to gracefully stop all listeners.
 */
-void Messenger::Stop( ) {
+void xapp::Messenger::Stop( ) {
        ok_2_run = false;
 }
 
@@ -190,7 +280,7 @@ void Messenger::Stop( ) {
        RMR messages must be released by RMR as there might be transport
        buffers that have to be dealt with. Every callback is expected to
        call this function when finished with the message.
-void Messenger::Release_mbuf( void* vmbuf ) {
+void xapp::Messenger::Release_mbuf( void* vmbuf ) {
        rmr_free_msg( (rmr_mbuf_t *)  vmbuf );
 }
 */
@@ -214,7 +304,7 @@ void Messenger::Release_mbuf( void* vmbuf ) {
        incidcates all is ready.  If max_wait is 0, then this will only
        return when RMR is ready to send.
 */
-bool Messenger::Wait_for_cts( int max_wait ) {
+bool xapp::Messenger::Wait_for_cts( int max_wait ) {
        bool block_4ever;
        bool    state = false;
 
@@ -231,3 +321,17 @@ bool Messenger::Wait_for_cts( int max_wait ) {
 
        return state;
 }
+
+/*
+       Open a wormhole to the indicated endpoint and return the wormhole ID.
+*/
+int xapp::Messenger::Wormhole_open( std::string endpoint ) {
+       rmr_whid_t whid;
+
+       whid = rmr_wh_open( mrc, endpoint.c_str() );
+
+       return (int) whid;
+}
+
+
+} // namespace