3 ==================================================================================
4 Copyright (c) 2020 Nokia
5 Copyright (c) 2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: messenger.cpp
23 Abstract: Message Router Messenger.
26 Author: E. Scott Daniels
33 #include <rmr/RIC_message_types.h>
42 #include "callback.hpp"
43 #include "default_cb.hpp" // default callback prototypes
44 #include "message.hpp"
45 #include "messenger.hpp"
50 // --------------- private -----------------------------------------------------
53 // ---------------- C++ buggered up way of maintaining class constants ----------
54 const int xapp::Messenger::MAX_PAYLOAD = (1024*64);	// 65536; handed to rmr_init() by the constructor
55 const int xapp::Messenger::DEFAULT_CALLBACK = -1;	// pseudo message type under which the "default" callback is registered and looked up
57 // --------------- builders -----------------------------------------------
59 If wait4table is true, then the construction of the object does not
60 complete until the underlying transport has a new copy of the route
63 If uport is NULL, then the default port (4560) is used.
65 xapp::Messenger::Messenger( const char* uport, bool wait4table ) {
68 listen_port = strdup( "4560" );
70 listen_port = strdup( uport );
73 gate = new std::mutex();
74 mrc = rmr_init( listen_port, Messenger::MAX_PAYLOAD, 0 );
77 this->Wait_for_cts( 0 );
80 Add_msg_cb( RIC_HEALTH_CHECK_REQ, Health_ck_cb, NULL ); // add our default call backs
86 Move support. We DO allow the instance to be moved as only one copy
87 remains following the move.
88 Given a source object instance (soi) we move the information to
89 the new object, and then DELETE what was moved so that when the
90 user frees the soi, it doesn't destroy what we snarfed.
92 xapp::Messenger::Messenger( Messenger&& soi ) {
94 listen_port = soi.listen_port;
95 ok_2_run = soi.ok_2_run;
97 cb_hash = soi.cb_hash; // this seems dodgy
100 soi.listen_port = NULL;
105 Move operator. Given a source object instance, move its contents
106 to this instance. We must first clean up this instance.
108 xapp::Messenger& Messenger::operator=( Messenger&& soi ) {
109 if( this != &soi ) { // cannot move onto ourself
113 if( listen_port != NULL ) {
118 listen_port = soi.listen_port;
119 ok_2_run = soi.ok_2_run;
121 cb_hash = soi.cb_hash; // this seems dodgy
124 soi.listen_port = NULL;
134 xapp::Messenger::~Messenger() {
139 if( listen_port != NULL ) {
145 Allow user to register a callback function invoked when a specific type of
146 message is received. The user may pass an optional data pointer which
147 will be passed to the function when it is called. The function signature
149 void fun( Messenger* mr, rmr_mbuf_t* mbuf, void* data );
151 The user can also invoke this function to set the "default" callback by
152 passing Messenger::DEFAULT_CALLBACK as the mtype. If no other callback
153 is defined for a message type, the default callback function is invoked.
154 If a default is not provided, a non-matching message is silently dropped.
156 void xapp::Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
159 cb = new Callback( fun_name, data );
166 Message allocation for user to send. User must destroy the message when
167 finished, but may keep the message for as long as is necessary
168 and reuse it over and over.
170 std::unique_ptr<Message> xapp::Messenger::Alloc_msg( int payload_size ) {
171 return std::unique_ptr<Message>( new Message( mrc, payload_size ) );
175 // ----------------- alarm support -----------------------------------------------
177 Allocate an alarm object.
178 Alarms must be allocated via the framework because we need a wormhole
179 id and to get one of those we need the mrc. We can easily send with
180 just a message, but to avoid having the user pass the framework
181 object in, we'll just supply a "factory" function.
183 std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( int prob_id, std::string meid ) {
184 std::shared_ptr<Message> m;
187 m = Alloc_msg( 4096 );
188 a = new Alarm( m, prob_id, meid );
189 a->Set_whid( Wormhole_open( a->Get_endpoint() ) );
191 return std::unique_ptr<Alarm>( a );
194 std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( std::string meid ) {
195 return Alloc_alarm( -1, meid );
198 std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( ) {
199 return Alloc_alarm( -1, "" );
203 // ------------------- listening support -----------------------------------------------
206 The Listen function waits for messages and drives the appropriate callback
207 function when one is received. This function will return to the caller
208 only when the ok to run flag in the object has been set to false (likely
209 never, or only at graceful termination). Callers should normally not
210 expect to have control returned in the calling thread.
212 Concurrently executing listeners are allowed.
214 void xapp::Messenger::Listen( ) {
216 rmr_mbuf_t* mbuf = NULL;
217 std::map<int,Callback*>::iterator mi; // map iterator; silly indirect way to point at the value
218 Callback* dcb = NULL; // default callback so we don't search
219 Callback* sel_cb; // callback selected to invoke
220 std::unique_ptr<Message> m;
226 mi = cb_hash.find( DEFAULT_CALLBACK );
227 if( mi != cb_hash.end() ) {
228 dcb = mi->second; // oddly named second field is the address of the callback block
232 mbuf = rmr_torcv_msg( mrc, mbuf, 2000 ); // come up for air every 2 sec to check ok2run
234 if( mbuf->state == RMR_OK ) {
235 m = std::unique_ptr<Message>( new Message( mbuf, mrc ) ); // auto delteted when scope terminates
237 sel_cb = dcb; // start with default
238 if( callbacks && ((mi = cb_hash.find( mbuf->mtype )) != cb_hash.end()) ) {
239 sel_cb = mi->second; // override with user callback
241 if( sel_cb != NULL ) {
242 sel_cb->Drive_cb( *m ); // drive the selected one
243 mbuf = NULL; // not safe to use after given to cb
246 if( mbuf->state != RMR_ERR_TIMEOUT ) {
247 fprintf( stderr, "<LISTENER> got bad status: %d\n", mbuf->state );
255 Wait for the next message, up to a max timeout, and return the message received.
256 This function allows the user xAPP to implement their own polling loop (no callbacks).
258 std::unique_ptr<Message> xapp::Messenger::Receive( int timeout ) {
259 rmr_mbuf_t* mbuf = NULL;
260 std::unique_ptr<Message> m = NULL;
263 mbuf = rmr_torcv_msg( mrc, mbuf, timeout ); // future: do we want to reuse the mbuf here?
265 m = std::unique_ptr<Message>( new Message( mbuf, mrc ) );
273 Called to gracefully stop all listeners.
275 void xapp::Messenger::Stop( ) {
280 RMR messages must be released by RMR as there might be transport
281 buffers that have to be dealt with. Every callback is expected to
282 call this function when finished with the message.
283 void xapp::Messenger::Release_mbuf( void* vmbuf ) {
284 rmr_free_msg( (rmr_mbuf_t *) vmbuf );
289 Wait for clear to send.
290 Until RMR loads a route table, all sends will fail with a
291 "no endpoint" state. This function allows the user application
292 to block until RMR has a viable route table. It does not guarantee
293 that every message that the user app will try to send has an entry.
295 The use of this function by the user application allows for the
296 parallel initialisation of the application while waiting for the
297 route table service to generate a table for the application. The
298 initialisation function may be called with "no wait" and this
299 function invoked when the application has completed initialisation
300 and is ready to start sending messages.
302 The max wait parameter is the maximum number of seconds to block.
303 If RMR never reports ready, false is returned. A true return
304 indicates all is ready. If max_wait is 0, then this will only
305 return when RMR is ready to send.
307 bool xapp::Messenger::Wait_for_cts( int max_wait ) {
311 block_4ever = max_wait == 0;
312 while( block_4ever || max_wait > 0 ) {
313 if( rmr_ready( mrc ) ) {
326 Open a wormhole to the indicated endpoint and return the wormhole ID.
328 int xapp::Messenger::Wormhole_open( std::string endpoint ) {
331 whid = rmr_wh_open( mrc, endpoint.c_str() );