X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Fmessaging%2Fmessenger.cpp;h=446c3c435548db3d07c9f905925b1c6285f855dc;hb=refs%2Fheads%2Fg-release;hp=4e5a278c4dca1a67dfafaf1d1f50c5ec6ab0266f;hpb=0ac0fb452f47bf32beb4965bf7569034c415be93;p=ric-plt%2Fxapp-frame-cpp.git diff --git a/src/messaging/messenger.cpp b/src/messaging/messenger.cpp index 4e5a278..446c3c4 100644 --- a/src/messaging/messenger.cpp +++ b/src/messaging/messenger.cpp @@ -43,14 +43,17 @@ #include "default_cb.hpp" // default callback prototypes #include "message.hpp" #include "messenger.hpp" +#include "alarm.hpp" +#include "metrics.hpp" +namespace xapp { // --------------- private ----------------------------------------------------- // ---------------- C++ buggerd up way of maintining class constants ---------- -const int Messenger::MAX_PAYLOAD = (1024*64); -const int Messenger::DEFAULT_CALLBACK = -1; +const int xapp::Messenger::MAX_PAYLOAD = (1024*64); +const int xapp::Messenger::DEFAULT_CALLBACK = -1; // --------------- builders ----------------------------------------------- /* @@ -60,7 +63,7 @@ const int Messenger::DEFAULT_CALLBACK = -1; If port is nil, then the default port is used (4560). */ -Messenger::Messenger( const char* uport, bool wait4table ) { +xapp::Messenger::Messenger( const char* uport, bool wait4table ) { if( uport == NULL ) { listen_port = strdup( "4560" ); @@ -87,13 +90,13 @@ Messenger::Messenger( const char* uport, bool wait4table ) { the new object, and then DELETE what was moved so that when the user frees the soi, it doesn't destroy what we snarfed. */ -Messenger::Messenger( Messenger&& soi ) { - mrc = soi.mrc; - listen_port = soi.listen_port; - ok_2_run = soi.ok_2_run; - gate = soi.gate; - cb_hash = soi.cb_hash; // this seems dodgy - +xapp::Messenger::Messenger( Messenger&& soi ) : + mrc( soi.mrc ), + listen_port( soi.listen_port ), + ok_2_run( soi.ok_2_run ), + gate( soi.gate ), + cb_hash( soi.cb_hash ) // this seems dodgy +{ soi.gate = NULL; soi.listen_port = NULL; soi.mrc = NULL; @@ -103,13 +106,13 @@ Messenger::Messenger( Messenger&& soi ) { Move operator. Given a source object instance, movee it's contents to this insance. We must first clean up this instance. */ -Messenger& Messenger::operator=( Messenger&& soi ) { +xapp::Messenger& Messenger::operator=( Messenger&& soi ) { if( this != &soi ) { // cannot move onto ourself if( mrc != NULL ) { rmr_close( mrc ); } if( listen_port != NULL ) { - free( listen_port ); + delete( listen_port ); } mrc = soi.mrc; @@ -129,13 +132,13 @@ Messenger& Messenger::operator=( Messenger&& soi ) { /* Destroyer. */ -Messenger::~Messenger() { +xapp::Messenger::~Messenger() { if( mrc != NULL ) { rmr_close( mrc ); } if( listen_port != NULL ) { - free( listen_port ); + delete( listen_port ); } } @@ -144,14 +147,14 @@ Messenger::~Messenger() { message is received. The user may pass an optional data pointer which will be passed to the function when it is called. The function signature must be: - void fun( Messenger* mr, rmr_mbuf_t* mbuf, void* data ); + void fun( Messenger* mr, rmr_mbuf_t* mbuf, void* data ) The user can also invoke this function to set the "default" callback by passing Messenger::DEFAULT_CALLBACK as the mtype. If no other callback is defined for a message type, the default callback function is invoked. If a default is not provided, a non-matching message is silently dropped. */ -void Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) { +void xapp::Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) { Callback* cb; cb = new Callback( fun_name, data ); @@ -165,18 +168,78 @@ void Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) { finished, but may keep the message for as long as is necessary and reuse it over and over. */ -//Message* Messenger::Alloc_msg( int payload_size ) { -std::unique_ptr Messenger::Alloc_msg( int payload_size ) { +std::unique_ptr xapp::Messenger::Alloc_msg( int payload_size ) { return std::unique_ptr( new Message( mrc, payload_size ) ); } -void Messenger::Listen( ) { - int count = 0; + +// ----------------- alarm support ----------------------------------------------- +/* + Allocate an alarm object. + Alarms must be allocated via the framework becasue we need a wormhole + id and to get one of those we need the mrc. We can easily send with + just a message, but to avoid having the user pass the framework + object in, we'll just supply a "factory" function. +*/ +std::unique_ptr xapp::Messenger::Alloc_alarm( int prob_id, const std::string& meid ) { + std::shared_ptr m; + Alarm* a; + + m = Alloc_msg( 4096 ); + a = new Alarm( m, prob_id, meid ); + a->Set_whid( Wormhole_open( a->Get_endpoint() ) ); + + return std::unique_ptr( a ); +} + +std::unique_ptr xapp::Messenger::Alloc_alarm( const std::string& meid ) { + return Alloc_alarm( -1, meid ); +} + +std::unique_ptr xapp::Messenger::Alloc_alarm( ) { + return Alloc_alarm( -1, "" ); +} + + +// ------------------ metrics support -------------------------------------------------- +std::unique_ptr xapp::Messenger::Alloc_metrics( ) { + std::shared_ptr m; + + m = Alloc_msg( 4096 ); + return std::unique_ptr( new xapp::Metrics( m ) ); +} + +std::unique_ptr xapp::Messenger::Alloc_metrics( const std::string& source ) { + std::shared_ptr m; + + m = Alloc_msg( 4096 ); + return std::unique_ptr( new xapp::Metrics( m, source ) ); +} + +std::unique_ptr xapp::Messenger::Alloc_metrics( const std::string& reporter, const std::string& source ) { + std::shared_ptr m; + + m = Alloc_msg( 4096 ); + return std::unique_ptr( new xapp::Metrics( m, reporter, source ) ); +} + +// ------------------- listening support ----------------------------------------------- + +/* + The Listen function waits for messages and drives the appropriate callback + function when one is received. This function will return to the caller + only when the ok to run flag in the object has been set to false (likely + never, or only at graceful termination). Callers should normally not + expect to have controll returned in the calling thread. + + Concurrently executing listeners are allowed. +*/ +void xapp::Messenger::Listen( ) { rmr_mbuf_t* mbuf = NULL; std::map::iterator mi; // map iterator; silly indirect way to point at the value Callback* dcb = NULL; // default callback so we don't search Callback* sel_cb; // callback selected to invoke - std::unique_ptrm; + std::unique_ptr m; if( mrc == NULL ) { return; @@ -212,8 +275,9 @@ void Messenger::Listen( ) { /* Wait for the next message, up to a max timout, and return the message received. + This function allows the user xAPP to implement their own polling loop (no callbacks). */ -std::unique_ptr Messenger::Receive( int timeout ) { +std::unique_ptr xapp::Messenger::Receive( int timeout ) { rmr_mbuf_t* mbuf = NULL; std::unique_ptr m = NULL; @@ -230,7 +294,7 @@ std::unique_ptr Messenger::Receive( int timeout ) { /* Called to gracefully stop all listeners. */ -void Messenger::Stop( ) { +void xapp::Messenger::Stop( ) { ok_2_run = false; } @@ -238,7 +302,7 @@ void Messenger::Stop( ) { RMR messages must be released by RMR as there might be transport buffers that have to be dealt with. Every callback is expected to call this function when finished with the message. -void Messenger::Release_mbuf( void* vmbuf ) { +void xapp::Messenger::Release_mbuf( void* vmbuf ) { rmr_free_msg( (rmr_mbuf_t *) vmbuf ); } */ @@ -262,7 +326,7 @@ void Messenger::Release_mbuf( void* vmbuf ) { incidcates all is ready. If max_wait is 0, then this will only return when RMR is ready to send. */ -bool Messenger::Wait_for_cts( int max_wait ) { +bool xapp::Messenger::Wait_for_cts( int max_wait ) { bool block_4ever; bool state = false; @@ -279,3 +343,17 @@ bool Messenger::Wait_for_cts( int max_wait ) { return state; } + +/* + Open a wormhole to the indicated endpoint and return the wormhole ID. +*/ +int xapp::Messenger::Wormhole_open( const std::string& endpoint ) { + rmr_whid_t whid; + + whid = rmr_wh_open( mrc, endpoint.c_str() ); + + return (int) whid; +} + + +} // namespace