--- /dev/null
+// vi: ts=4 sw=4 noet:
+/*
+==================================================================================
+ Copyright (c) 2020 Nokia
+ Copyright (c) 2020 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+/*
+ Mnemonic: messenger.cpp
+ Abstract: Message Router Messenger.
+
+ Date: 10 March 2020
+ Author: E. Scott Daniels
+*/
+
+#include <string.h>
+#include <unistd.h>
+
+#include <rmr/rmr.h>
+#include <rmr/RIC_message_types.h>
+
+
+#include <iostream>
+#include <string>
+#include <map>
+#include <memory>
+#include <mutex>
+
+#include "callback.hpp"
+#include "default_cb.hpp" // default callback prototypes
+#include "message.hpp"
+#include "messenger.hpp"
+
+
+// --------------- private -----------------------------------------------------
+
+
+// ---------------- C++ buggerd up way of maintining class constants ----------
+const int Messenger::MAX_PAYLOAD = (1024*64);
+const int Messenger::DEFAULT_CALLBACK = -1;
+
+// --------------- builders -----------------------------------------------
+/*
+ If wait4table is true, then the construction of the object does not
+ complete until the underlying transport has a new copy of the route
+ table.
+
+ If port is nil, then the default port is used (4560).
+*/
+Messenger::Messenger( char* port, bool wait4table ) {
+ if( port == NULL ) {
+ port = (char *) "4560";
+ }
+
+ gate = new std::mutex();
+ listen_port = strdup( port );
+ mrc = rmr_init( listen_port, Messenger::MAX_PAYLOAD, 0 );
+
+ if( wait4table ) {
+ this->Wait_for_cts( 0 );
+ }
+
+ Add_msg_cb( RIC_HEALTH_CHECK_REQ, Health_ck_cb, NULL ); // add our default call backs
+
+ ok_2_run = true;
+}
+
+/*
+ Destroyer.
+*/
+Messenger::~Messenger() {
+ if( mrc != NULL ) {
+ rmr_close( mrc );
+ }
+
+ free( listen_port );
+}
+
+/*
+ Allow user to register a callback function invoked when a specific type of
+ message is received. The user may pass an optional data pointer which
+ will be passed to the function when it is called. The function signature
+ must be:
+ void fun( Messenger* mr, rmr_mbuf_t* mbuf, void* data );
+
+ The user can also invoke this function to set the "default" callback by
+ passing Messenger::DEFAULT_CALLBACK as the mtype. If no other callback
+ is defined for a message type, the default callback function is invoked.
+ If a default is not provided, a non-matching message is silently dropped.
+*/
+void Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
+ Callback* cb;
+
+ cb = new Callback( fun_name, data );
+ cb_hash[mtype] = cb;
+
+ callbacks = true;
+}
+
+/*
+ Message allocation for user to send. User must destroy the message when
+ finished, but may keep the message for as long as is necessary
+ and reuse it over and over.
+*/
+//Message* Messenger::Alloc_msg( int payload_size ) {
+std::unique_ptr<Message> Messenger::Alloc_msg( int payload_size ) {
+ return std::unique_ptr<Message>( new Message( mrc, payload_size ) );
+}
+
+void Messenger::Listen( ) {
+ int count = 0;
+ rmr_mbuf_t* mbuf = NULL;
+ std::map<int,Callback*>::iterator mi; // map iterator; silly indirect way to point at the value
+ Callback* dcb = NULL; // default callback so we don't search
+ Callback* sel_cb; // callback selected to invoke
+ std::unique_ptr<Message>m;
+
+ if( mrc == NULL ) {
+ return;
+ }
+
+ mi = cb_hash.find( DEFAULT_CALLBACK );
+ if( mi != cb_hash.end() ) {
+ dcb = mi->second; // oddly named second field is the address of the callback block
+ }
+
+ while( ok_2_run ) {
+ mbuf = rmr_torcv_msg( mrc, mbuf, 2000 ); // come up for air every 2 sec to check ok2run
+ if( mbuf != NULL ) {
+ if( mbuf->state == RMR_OK ) {
+ m = std::unique_ptr<Message>( new Message( mbuf, mrc ) ); // auto delteted when scope terminates
+
+ sel_cb = dcb; // start with default
+ if( callbacks && ((mi = cb_hash.find( mbuf->mtype )) != cb_hash.end()) ) {
+ sel_cb = mi->second; // override with user callback
+ }
+ if( sel_cb != NULL ) {
+ sel_cb->Drive_cb( *this, *m ); // drive the selected one
+ mbuf = NULL; // not safe to use after given to cb
+ }
+ } else {
+ if( mbuf->state != RMR_ERR_TIMEOUT ) {
+ fprintf( stderr, "<LISTENER> got bad status: %d\n", mbuf->state );
+ }
+ }
+ }
+ }
+}
+
+/*
+ Wait for the next message, up to a max timout, and return the message received.
+*/
+std::unique_ptr<Message> Messenger::Receive( int timeout ) {
+ rmr_mbuf_t* mbuf = NULL;
+ //std::unique_ptr<Message> m;
+
+ if( mrc == NULL ) {
+ return NULL;
+ }
+
+ mbuf = rmr_torcv_msg( mrc, mbuf, timeout ); // future: do we want to reuse the mbuf here?
+ if( mbuf != NULL ) {
+ return std::unique_ptr<Message>( new Message( mbuf, mrc ) );
+ }
+
+ return NULL;
+}
+
+/*
+ Called to gracefully stop all listeners.
+*/
+void Messenger::Stop( ) {
+ ok_2_run = false;
+}
+
+/*
+ RMR messages must be released by RMR as there might be transport
+ buffers that have to be dealt with. Every callback is expected to
+ call this function when finished with the message.
+*/
+void Messenger::Release_mbuf( void* vmbuf ) {
+ rmr_free_msg( (rmr_mbuf_t *) vmbuf );
+}
+
+/*
+ Wait for clear to send.
+ Until RMR loads a route table, all sends will fail with a
+ "no endpoint" state. This function allows the user application
+ to block until RMR has a viable route table. It does not guarentee
+ that every message that the user app will try to send has an entry.
+
+ The use of this function by the user application allows for the
+ parallel initialisation of the application while waiting for the
+ route table service to generate a table for the application. The
+ initialisation function may be callsed with "no wait" and this
+ function invoked when the application has completed initialisation
+ and is ready to start sending messages.
+
+ The max wait parameter is the maximum number of seconds to block.
+ If RMR never reports ready false is returned. A true return
+ incidcates all is ready. If max_wait is 0, then this will only
+ return when RMR is ready to send.
+*/
+bool Messenger::Wait_for_cts( int max_wait ) {
+ bool block_4ever;
+
+ block_4ever = max_wait == 0;
+ while( block_4ever || max_wait > 0 ) {
+ if( rmr_ready( mrc ) ) {
+ return true;
+ }
+
+ sleep( 1 );
+ max_wait--;
+ }
+
+ return false;
+}