3 ==================================================================================
4 Copyright (c) 2020 Nokia
5 Copyright (c) 2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: messenger.cpp
23 Abstract: Message Router Messenger.
26 Author: E. Scott Daniels
33 #include <rmr/RIC_message_types.h>
42 #include "callback.hpp"
43 #include "default_cb.hpp" // default callback prototypes
44 #include "message.hpp"
45 #include "messenger.hpp"
48 // --------------- private -----------------------------------------------------
51 // ---------------- C++ buggerd up way of maintining class constants ----------
52 const int Messenger::MAX_PAYLOAD = (1024*64);
53 const int Messenger::DEFAULT_CALLBACK = -1;
55 // --------------- builders -----------------------------------------------
57 If wait4table is true, then the construction of the object does not
58 complete until the underlying transport has a new copy of the route
61 If port is nil, then the default port is used (4560).
63 Messenger::Messenger( const char* uport, bool wait4table ) {
66 listen_port = strdup( "4560" );
68 listen_port = strdup( uport );
71 gate = new std::mutex();
72 mrc = rmr_init( listen_port, Messenger::MAX_PAYLOAD, 0 );
75 this->Wait_for_cts( 0 );
78 Add_msg_cb( RIC_HEALTH_CHECK_REQ, Health_ck_cb, NULL ); // add our default call backs
84 Move support. We DO allow the instance to be moved as only one copy
85 remains following the move.
86 Given a source object instance (soi) we move the information to
87 the new object, and then DELETE what was moved so that when the
88 user frees the soi, it doesn't destroy what we snarfed.
90 Messenger::Messenger( Messenger&& soi ) {
92 listen_port = soi.listen_port;
93 ok_2_run = soi.ok_2_run;
95 cb_hash = soi.cb_hash; // this seems dodgy
98 soi.listen_port = NULL;
103 Move operator. Given a source object instance, movee it's contents
104 to this insance. We must first clean up this instance.
106 Messenger& Messenger::operator=( Messenger&& soi ) {
107 if( this != &soi ) { // cannot move onto ourself
111 if( listen_port != NULL ) {
116 listen_port = soi.listen_port;
117 ok_2_run = soi.ok_2_run;
119 cb_hash = soi.cb_hash; // this seems dodgy
122 soi.listen_port = NULL;
132 Messenger::~Messenger() {
137 if( listen_port != NULL ) {
143 Allow user to register a callback function invoked when a specific type of
144 message is received. The user may pass an optional data pointer which
145 will be passed to the function when it is called. The function signature
147 void fun( Messenger* mr, rmr_mbuf_t* mbuf, void* data );
149 The user can also invoke this function to set the "default" callback by
150 passing Messenger::DEFAULT_CALLBACK as the mtype. If no other callback
151 is defined for a message type, the default callback function is invoked.
152 If a default is not provided, a non-matching message is silently dropped.
154 void Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
157 cb = new Callback( fun_name, data );
164 Message allocation for user to send. User must destroy the message when
165 finished, but may keep the message for as long as is necessary
166 and reuse it over and over.
168 //Message* Messenger::Alloc_msg( int payload_size ) {
169 std::unique_ptr<Message> Messenger::Alloc_msg( int payload_size ) {
170 return std::unique_ptr<Message>( new Message( mrc, payload_size ) );
173 void Messenger::Listen( ) {
175 rmr_mbuf_t* mbuf = NULL;
176 std::map<int,Callback*>::iterator mi; // map iterator; silly indirect way to point at the value
177 Callback* dcb = NULL; // default callback so we don't search
178 Callback* sel_cb; // callback selected to invoke
179 std::unique_ptr<Message>m;
185 mi = cb_hash.find( DEFAULT_CALLBACK );
186 if( mi != cb_hash.end() ) {
187 dcb = mi->second; // oddly named second field is the address of the callback block
191 mbuf = rmr_torcv_msg( mrc, mbuf, 2000 ); // come up for air every 2 sec to check ok2run
193 if( mbuf->state == RMR_OK ) {
194 m = std::unique_ptr<Message>( new Message( mbuf, mrc ) ); // auto delteted when scope terminates
196 sel_cb = dcb; // start with default
197 if( callbacks && ((mi = cb_hash.find( mbuf->mtype )) != cb_hash.end()) ) {
198 sel_cb = mi->second; // override with user callback
200 if( sel_cb != NULL ) {
201 sel_cb->Drive_cb( *m ); // drive the selected one
202 mbuf = NULL; // not safe to use after given to cb
205 if( mbuf->state != RMR_ERR_TIMEOUT ) {
206 fprintf( stderr, "<LISTENER> got bad status: %d\n", mbuf->state );
214 Wait for the next message, up to a max timout, and return the message received.
216 std::unique_ptr<Message> Messenger::Receive( int timeout ) {
217 rmr_mbuf_t* mbuf = NULL;
218 std::unique_ptr<Message> m = NULL;
221 mbuf = rmr_torcv_msg( mrc, mbuf, timeout ); // future: do we want to reuse the mbuf here?
223 m = std::unique_ptr<Message>( new Message( mbuf, mrc ) );
231 Called to gracefully stop all listeners.
233 void Messenger::Stop( ) {
238 RMR messages must be released by RMR as there might be transport
239 buffers that have to be dealt with. Every callback is expected to
240 call this function when finished with the message.
241 void Messenger::Release_mbuf( void* vmbuf ) {
242 rmr_free_msg( (rmr_mbuf_t *) vmbuf );
247 Wait for clear to send.
248 Until RMR loads a route table, all sends will fail with a
249 "no endpoint" state. This function allows the user application
250 to block until RMR has a viable route table. It does not guarentee
251 that every message that the user app will try to send has an entry.
253 The use of this function by the user application allows for the
254 parallel initialisation of the application while waiting for the
255 route table service to generate a table for the application. The
256 initialisation function may be callsed with "no wait" and this
257 function invoked when the application has completed initialisation
258 and is ready to start sending messages.
260 The max wait parameter is the maximum number of seconds to block.
261 If RMR never reports ready false is returned. A true return
262 incidcates all is ready. If max_wait is 0, then this will only
263 return when RMR is ready to send.
265 bool Messenger::Wait_for_cts( int max_wait ) {
269 block_4ever = max_wait == 0;
270 while( block_4ever || max_wait > 0 ) {
271 if( rmr_ready( mrc ) ) {