3 ==================================================================================
4 Copyright (c) 2020 Nokia
5 Copyright (c) 2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: messenger.cpp
23 Abstract: Message Router Messenger.
26 Author: E. Scott Daniels
#include <unistd.h>

#include <cstdlib>
#include <utility>

#include <rmr/RIC_message_types.h>

#include "callback.hpp"
#include "default_cb.hpp"		// default callback prototypes
#include "message.hpp"
#include "messenger.hpp"
#include "metrics.hpp"
51 // --------------- private -----------------------------------------------------
54 // ---------------- C++ buggerd up way of maintining class constants ----------
55 const int xapp::Messenger::MAX_PAYLOAD = (1024*64);
56 const int xapp::Messenger::DEFAULT_CALLBACK = -1;
58 // --------------- builders -----------------------------------------------
60 If wait4table is true, then the construction of the object does not
61 complete until the underlying transport has a new copy of the route
64 If port is nil, then the default port is used (4560).
66 xapp::Messenger::Messenger( const char* uport, bool wait4table ) {
69 listen_port = strdup( "4560" );
71 listen_port = strdup( uport );
74 gate = new std::mutex();
75 mrc = rmr_init( listen_port, Messenger::MAX_PAYLOAD, 0 );
78 this->Wait_for_cts( 0 );
81 Add_msg_cb( RIC_HEALTH_CHECK_REQ, Health_ck_cb, NULL ); // add our default call backs
87 Move support. We DO allow the instance to be moved as only one copy
88 remains following the move.
89 Given a source object instance (soi) we move the information to
90 the new object, and then DELETE what was moved so that when the
91 user frees the soi, it doesn't destroy what we snarfed.
93 xapp::Messenger::Messenger( Messenger&& soi ) :
95 listen_port( soi.listen_port ),
96 ok_2_run( soi.ok_2_run ),
98 cb_hash( soi.cb_hash ) // this seems dodgy
101 soi.listen_port = NULL;
106 Move operator. Given a source object instance, movee it's contents
107 to this insance. We must first clean up this instance.
109 xapp::Messenger& Messenger::operator=( Messenger&& soi ) {
110 if( this != &soi ) { // cannot move onto ourself
114 if( listen_port != NULL ) {
115 delete( listen_port );
119 listen_port = soi.listen_port;
120 ok_2_run = soi.ok_2_run;
122 cb_hash = soi.cb_hash; // this seems dodgy
125 soi.listen_port = NULL;
135 xapp::Messenger::~Messenger() {
140 if( listen_port != NULL ) {
141 delete( listen_port );
146 Allow user to register a callback function invoked when a specific type of
147 message is received. The user may pass an optional data pointer which
148 will be passed to the function when it is called. The function signature
150 void fun( Messenger* mr, rmr_mbuf_t* mbuf, void* data )
152 The user can also invoke this function to set the "default" callback by
153 passing Messenger::DEFAULT_CALLBACK as the mtype. If no other callback
154 is defined for a message type, the default callback function is invoked.
155 If a default is not provided, a non-matching message is silently dropped.
157 void xapp::Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
160 cb = new Callback( fun_name, data );
167 Message allocation for user to send. User must destroy the message when
168 finished, but may keep the message for as long as is necessary
169 and reuse it over and over.
171 std::unique_ptr<Message> xapp::Messenger::Alloc_msg( int payload_size ) {
172 return std::unique_ptr<Message>( new Message( mrc, payload_size ) );
176 // ----------------- alarm support -----------------------------------------------
178 Allocate an alarm object.
179 Alarms must be allocated via the framework becasue we need a wormhole
180 id and to get one of those we need the mrc. We can easily send with
181 just a message, but to avoid having the user pass the framework
182 object in, we'll just supply a "factory" function.
184 std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( int prob_id, const std::string& meid ) {
185 std::shared_ptr<Message> m;
188 m = Alloc_msg( 4096 );
189 a = new Alarm( m, prob_id, meid );
190 a->Set_whid( Wormhole_open( a->Get_endpoint() ) );
192 return std::unique_ptr<Alarm>( a );
195 std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( const std::string& meid ) {
196 return Alloc_alarm( -1, meid );
199 std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( ) {
200 return Alloc_alarm( -1, "" );
204 // ------------------ metrics support --------------------------------------------------
205 std::unique_ptr<xapp::Metrics> xapp::Messenger::Alloc_metrics( ) {
206 std::shared_ptr<Message> m;
208 m = Alloc_msg( 4096 );
209 return std::unique_ptr<xapp::Metrics>( new xapp::Metrics( m ) );
212 std::unique_ptr<xapp::Metrics> xapp::Messenger::Alloc_metrics( const std::string& source ) {
213 std::shared_ptr<Message> m;
215 m = Alloc_msg( 4096 );
216 return std::unique_ptr<xapp::Metrics>( new xapp::Metrics( m, source ) );
219 std::unique_ptr<xapp::Metrics> xapp::Messenger::Alloc_metrics( const std::string& reporter, const std::string& source ) {
220 std::shared_ptr<Message> m;
222 m = Alloc_msg( 4096 );
223 return std::unique_ptr<xapp::Metrics>( new xapp::Metrics( m, reporter, source ) );
226 // ------------------- listening support -----------------------------------------------
229 The Listen function waits for messages and drives the appropriate callback
230 function when one is received. This function will return to the caller
231 only when the ok to run flag in the object has been set to false (likely
232 never, or only at graceful termination). Callers should normally not
233 expect to have controll returned in the calling thread.
235 Concurrently executing listeners are allowed.
237 void xapp::Messenger::Listen( ) {
238 rmr_mbuf_t* mbuf = NULL;
239 std::map<int,Callback*>::iterator mi; // map iterator; silly indirect way to point at the value
240 Callback* dcb = NULL; // default callback so we don't search
241 Callback* sel_cb; // callback selected to invoke
242 std::unique_ptr<Message> m;
248 mi = cb_hash.find( DEFAULT_CALLBACK );
249 if( mi != cb_hash.end() ) {
250 dcb = mi->second; // oddly named second field is the address of the callback block
254 mbuf = rmr_torcv_msg( mrc, mbuf, 2000 ); // come up for air every 2 sec to check ok2run
256 if( mbuf->state == RMR_OK ) {
257 m = std::unique_ptr<Message>( new Message( mbuf, mrc ) ); // auto delteted when scope terminates
259 sel_cb = dcb; // start with default
260 if( callbacks && ((mi = cb_hash.find( mbuf->mtype )) != cb_hash.end()) ) {
261 sel_cb = mi->second; // override with user callback
263 if( sel_cb != NULL ) {
264 sel_cb->Drive_cb( *m ); // drive the selected one
265 mbuf = NULL; // not safe to use after given to cb
268 if( mbuf->state != RMR_ERR_TIMEOUT ) {
269 fprintf( stderr, "<LISTENER> got bad status: %d\n", mbuf->state );
277 Wait for the next message, up to a max timout, and return the message received.
278 This function allows the user xAPP to implement their own polling loop (no callbacks).
280 std::unique_ptr<Message> xapp::Messenger::Receive( int timeout ) {
281 rmr_mbuf_t* mbuf = NULL;
282 std::unique_ptr<Message> m = NULL;
285 mbuf = rmr_torcv_msg( mrc, mbuf, timeout ); // future: do we want to reuse the mbuf here?
287 m = std::unique_ptr<Message>( new Message( mbuf, mrc ) );
295 Called to gracefully stop all listeners.
297 void xapp::Messenger::Stop( ) {
302 RMR messages must be released by RMR as there might be transport
303 buffers that have to be dealt with. Every callback is expected to
304 call this function when finished with the message.
305 void xapp::Messenger::Release_mbuf( void* vmbuf ) {
306 rmr_free_msg( (rmr_mbuf_t *) vmbuf );
311 Wait for clear to send.
312 Until RMR loads a route table, all sends will fail with a
313 "no endpoint" state. This function allows the user application
314 to block until RMR has a viable route table. It does not guarentee
315 that every message that the user app will try to send has an entry.
317 The use of this function by the user application allows for the
318 parallel initialisation of the application while waiting for the
319 route table service to generate a table for the application. The
320 initialisation function may be callsed with "no wait" and this
321 function invoked when the application has completed initialisation
322 and is ready to start sending messages.
324 The max wait parameter is the maximum number of seconds to block.
325 If RMR never reports ready false is returned. A true return
326 incidcates all is ready. If max_wait is 0, then this will only
327 return when RMR is ready to send.
329 bool xapp::Messenger::Wait_for_cts( int max_wait ) {
333 block_4ever = max_wait == 0;
334 while( block_4ever || max_wait > 0 ) {
335 if( rmr_ready( mrc ) ) {
348 Open a wormhole to the indicated endpoint and return the wormhole ID.
350 int xapp::Messenger::Wormhole_open( const std::string& endpoint ) {
353 whid = rmr_wh_open( mrc, endpoint.c_str() );