#include "default_cb.hpp" // default callback prototypes
#include "message.hpp"
#include "messenger.hpp"
+#include "alarm.hpp"
+#include "metrics.hpp"
+namespace xapp {
// --------------- private -----------------------------------------------------
// ---------------- C++ buggerd up way of maintining class constants ----------
-const int Messenger::MAX_PAYLOAD = (1024*64);
-const int Messenger::DEFAULT_CALLBACK = -1;
+const int xapp::Messenger::MAX_PAYLOAD = (1024*64);
+const int xapp::Messenger::DEFAULT_CALLBACK = -1;
// --------------- builders -----------------------------------------------
/*
If port is nil, then the default port is used (4560).
*/
-Messenger::Messenger( char* port, bool wait4table ) {
- if( port == NULL ) {
- port = (char *) "4560";
+xapp::Messenger::Messenger( const char* uport, bool wait4table ) {
+
+ if( uport == NULL ) {
+ listen_port = strdup( "4560" );
+ } else {
+ listen_port = strdup( uport );
}
gate = new std::mutex();
- listen_port = strdup( port );
mrc = rmr_init( listen_port, Messenger::MAX_PAYLOAD, 0 );
if( wait4table ) {
}
/*
- Move support. We DO allow the instance to be moved as only one copy
+ Move support. We DO allow the instance to be moved as only one copy
remains following the move.
- Given a source object instance (soi) we move the information to
+ Given a source object instance (soi) we move the information to
the new object, and then DELETE what was moved so that when the
user frees the soi, it doesn't destroy what we snarfed.
*/
-Messenger::Messenger( Messenger&& soi ) {
+xapp::Messenger::Messenger( Messenger&& soi ) {
mrc = soi.mrc;
listen_port = soi.listen_port;
ok_2_run = soi.ok_2_run;
gate = soi.gate;
cb_hash = soi.cb_hash; // this seems dodgy
-
+
soi.gate = NULL;
soi.listen_port = NULL;
soi.mrc = NULL;
Move operator. Given a source object instance, movee it's contents
to this insance. We must first clean up this instance.
*/
-Messenger& Messenger::operator=( Messenger&& soi ) {
+xapp::Messenger& Messenger::operator=( Messenger&& soi ) {
if( this != &soi ) { // cannot move onto ourself
if( mrc != NULL ) {
rmr_close( mrc );
ok_2_run = soi.ok_2_run;
gate = soi.gate;
cb_hash = soi.cb_hash; // this seems dodgy
-
+
soi.gate = NULL;
soi.listen_port = NULL;
soi.mrc = NULL;
/*
Destroyer.
*/
-Messenger::~Messenger() {
+xapp::Messenger::~Messenger() {
if( mrc != NULL ) {
rmr_close( mrc );
}
is defined for a message type, the default callback function is invoked.
If a default is not provided, a non-matching message is silently dropped.
*/
-void Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
+void xapp::Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
Callback* cb;
cb = new Callback( fun_name, data );
finished, but may keep the message for as long as is necessary
and reuse it over and over.
*/
-//Message* Messenger::Alloc_msg( int payload_size ) {
-std::unique_ptr<Message> Messenger::Alloc_msg( int payload_size ) {
+std::unique_ptr<Message> xapp::Messenger::Alloc_msg( int payload_size ) {
return std::unique_ptr<Message>( new Message( mrc, payload_size ) );
}
-void Messenger::Listen( ) {
+
+// ----------------- alarm support -----------------------------------------------
+/*
+ Allocate an alarm object.
+ Alarms must be allocated via the framework becasue we need a wormhole
+ id and to get one of those we need the mrc. We can easily send with
+ just a message, but to avoid having the user pass the framework
+ object in, we'll just supply a "factory" function.
+*/
+std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( int prob_id, std::string meid ) {
+ std::shared_ptr<Message> m;
+ Alarm* a;
+
+ m = Alloc_msg( 4096 );
+ a = new Alarm( m, prob_id, meid );
+ a->Set_whid( Wormhole_open( a->Get_endpoint() ) );
+
+ return std::unique_ptr<Alarm>( a );
+}
+
+std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( std::string meid ) {
+ return Alloc_alarm( -1, meid );
+}
+
+std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( ) {
+ return Alloc_alarm( -1, "" );
+}
+
+
+// ------------------ metrics support --------------------------------------------------
+std::unique_ptr<xapp::Metrics> xapp::Messenger::Alloc_metrics( ) {
+ std::shared_ptr<Message> m;
+
+ m = Alloc_msg( 4096 );
+ return std::unique_ptr<xapp::Metrics>( new xapp::Metrics( m ) );
+}
+
+std::unique_ptr<xapp::Metrics> xapp::Messenger::Alloc_metrics( std::string source ) {
+ std::shared_ptr<Message> m;
+
+ m = Alloc_msg( 4096 );
+ return std::unique_ptr<xapp::Metrics>( new xapp::Metrics( m, source ) );
+}
+
+std::unique_ptr<xapp::Metrics> xapp::Messenger::Alloc_metrics( std::string reporter, std::string source ) {
+ std::shared_ptr<Message> m;
+
+ m = Alloc_msg( 4096 );
+ return std::unique_ptr<xapp::Metrics>( new xapp::Metrics( m, reporter, source ) );
+}
+
+// ------------------- listening support -----------------------------------------------
+
+/*
+ The Listen function waits for messages and drives the appropriate callback
+ function when one is received. This function will return to the caller
+ only when the ok to run flag in the object has been set to false (likely
+ never, or only at graceful termination). Callers should normally not
+ expect to have controll returned in the calling thread.
+
+ Concurrently executing listeners are allowed.
+*/
+void xapp::Messenger::Listen( ) {
int count = 0;
rmr_mbuf_t* mbuf = NULL;
std::map<int,Callback*>::iterator mi; // map iterator; silly indirect way to point at the value
Callback* dcb = NULL; // default callback so we don't search
Callback* sel_cb; // callback selected to invoke
- std::unique_ptr<Message>m;
+ std::unique_ptr<Message> m;
if( mrc == NULL ) {
return;
/*
Wait for the next message, up to a max timout, and return the message received.
+ This function allows the user xAPP to implement their own polling loop (no callbacks).
*/
-std::unique_ptr<Message> Messenger::Receive( int timeout ) {
+std::unique_ptr<Message> xapp::Messenger::Receive( int timeout ) {
rmr_mbuf_t* mbuf = NULL;
std::unique_ptr<Message> m = NULL;
/*
Called to gracefully stop all listeners.
*/
-void Messenger::Stop( ) {
+void xapp::Messenger::Stop( ) {
ok_2_run = false;
}
RMR messages must be released by RMR as there might be transport
buffers that have to be dealt with. Every callback is expected to
call this function when finished with the message.
-void Messenger::Release_mbuf( void* vmbuf ) {
+void xapp::Messenger::Release_mbuf( void* vmbuf ) {
rmr_free_msg( (rmr_mbuf_t *) vmbuf );
}
*/
incidcates all is ready. If max_wait is 0, then this will only
return when RMR is ready to send.
*/
-bool Messenger::Wait_for_cts( int max_wait ) {
+bool xapp::Messenger::Wait_for_cts( int max_wait ) {
bool block_4ever;
bool state = false;
return state;
}
+
+/*
+ Open a wormhole to the indicated endpoint and return the wormhole ID.
+*/
+int xapp::Messenger::Wormhole_open( std::string endpoint ) {
+ rmr_whid_t whid;
+
+ whid = rmr_wh_open( mrc, endpoint.c_str() );
+
+ return (int) whid;
+}
+
+
+} // namespace