3 ==================================================================================
4 Copyright (c) 2020 Nokia
5 Copyright (c) 2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: messenger.cpp
23 Abstract: Message Router Messenger.
26 Author: E. Scott Daniels
33 #include <rmr/RIC_message_types.h>
42 #include "callback.hpp"
43 #include "default_cb.hpp" // default callback prototypes
44 #include "message.hpp"
45 #include "messenger.hpp"
48 // --------------- private -----------------------------------------------------
51 // ---------------- C++'s awkward way of maintaining class constants ----------
// Out-of-class definitions of the Messenger class constants.
52 const int Messenger::MAX_PAYLOAD = (1024*64); // 64 KiB; handed to rmr_init() by the constructor
53 const int Messenger::DEFAULT_CALLBACK = -1; // key Listen() uses to look up the default callback in cb_hash
55 // --------------- builders -----------------------------------------------
57 If wait4table is true, then the construction of the object does not
58 complete until the underlying transport has a new copy of the route
61 If port is nil, then the default port is used (4560).
// Builds a Messenger: creates the mutex, keeps a private copy of the port
// string, initialises RMR on that port and registers the health check callback.
// NOTE(review): this excerpt does not show every original line (the closing
// brace and, presumably, the conditions guarding the default-port assignment
// and the Wait_for_cts() call are missing) -- confirm against the full file.
63 Messenger::Messenger( char* port, bool wait4table ) {
65 port = (char *) "4560"; // fall back to the default port
68 gate = new std::mutex();
69 listen_port = strdup( port ); // duplicate so the object does not depend on the caller's buffer
70 mrc = rmr_init( listen_port, Messenger::MAX_PAYLOAD, 0 ); // RMR context used by the receive/alloc methods
73 this->Wait_for_cts( 0 ); // a max_wait of 0 makes Wait_for_cts block until RMR is ready
76 Add_msg_cb( RIC_HEALTH_CHECK_REQ, Health_ck_cb, NULL ); // add our default call backs
84 Messenger::~Messenger() {
93 Allow user to register a callback function invoked when a specific type of
94 message is received. The user may pass an optional data pointer which
95 will be passed to the function when it is called. The function signature
97 void fun( Messenger* mr, rmr_mbuf_t* mbuf, void* data );
99 The user can also invoke this function to set the "default" callback by
100 passing Messenger::DEFAULT_CALLBACK as the mtype. If no other callback
101 is defined for a message type, the default callback function is invoked.
102 If a default is not provided, a non-matching message is silently dropped.
104 void Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
// Wrap the user's function and data pointer in a new Callback object.
// NOTE(review): the declaration of cb and the code that stores it under mtype
// are not visible in this excerpt -- confirm against the full file.
107 cb = new Callback( fun_name, data );
114 Message allocation for user to send. User must destroy the message when
115 finished, but may keep the message for as long as is necessary
116 and reuse it over and over.
// Creates a new Message from this object's RMR context (mrc) and the requested
// payload size, and returns it wrapped in a std::unique_ptr.
118 //Message* Messenger::Alloc_msg( int payload_size ) {
119 std::unique_ptr<Message> Messenger::Alloc_msg( int payload_size ) {
120 return std::unique_ptr<Message>( new Message( mrc, payload_size ) );
// Receive loop: waits for a message with a 2000 timeout, wraps it in a Message
// and drives the callback registered for its type, falling back to the default
// callback when one was registered.
// NOTE(review): this excerpt omits some original lines (the loop header, the
// closing braces and possibly a NULL check on mbuf) -- confirm against the
// full file before relying on the control flow described here.
123 void Messenger::Listen( ) {
125 rmr_mbuf_t* mbuf = NULL;
126 std::map<int,Callback*>::iterator mi; // map iterator; silly indirect way to point at the value
127 Callback* dcb = NULL; // default callback so we don't search
128 Callback* sel_cb; // callback selected to invoke
129 std::unique_ptr<Message>m;
// Look up the default callback once so it need not be searched for per message.
135 mi = cb_hash.find( DEFAULT_CALLBACK );
136 if( mi != cb_hash.end() ) {
137 dcb = mi->second; // oddly named second field is the address of the callback block
141 mbuf = rmr_torcv_msg( mrc, mbuf, 2000 ); // come up for air every 2 sec to check ok2run
// NOTE(review): mbuf is dereferenced below; verify a NULL check exists in the full file.
143 if( mbuf->state == RMR_OK ) {
144 m = std::unique_ptr<Message>( new Message( mbuf, mrc ) ); // auto deleted when scope terminates
146 sel_cb = dcb; // start with default
147 if( callbacks && ((mi = cb_hash.find( mbuf->mtype )) != cb_hash.end()) ) {
148 sel_cb = mi->second; // override with user callback
150 if( sel_cb != NULL ) {
151 sel_cb->Drive_cb( *this, *m ); // drive the selected one
// NOTE(review): mbuf is only cleared when a callback was driven. Whether the
// Message built above also takes ownership of mbuf is not visible here --
// verify mbuf is not reused after m is destroyed when no callback matches.
152 mbuf = NULL; // not safe to use after given to cb
155 if( mbuf->state != RMR_ERR_TIMEOUT ) { // timeouts are expected, so only other states are reported
156 fprintf( stderr, "<LISTENER> got bad status: %d\n", mbuf->state );
164 Wait for the next message, up to a max timeout, and return the message received.
// Performs one timed receive and returns the resulting buffer wrapped in a
// Message. timeout is forwarded unchanged to rmr_torcv_msg().
// NOTE(review): some original lines are not visible in this excerpt; any
// check of mbuf for NULL or of its state before wrapping is not shown here.
166 std::unique_ptr<Message> Messenger::Receive( int timeout ) {
167 rmr_mbuf_t* mbuf = NULL;
168 //std::unique_ptr<Message> m;
174 mbuf = rmr_torcv_msg( mrc, mbuf, timeout ); // future: do we want to reuse the mbuf here?
176 return std::unique_ptr<Message>( new Message( mbuf, mrc ) );
183 Called to gracefully stop all listeners.
185 void Messenger::Stop( ) {
190 RMR messages must be released by RMR as there might be transport
191 buffers that have to be dealt with. Every callback is expected to
192 call this function when finished with the message.
// Hands a message buffer back to RMR to be freed. The buffer arrives as an
// untyped pointer and is cast to rmr_mbuf_t* for the rmr_free_msg() call.
194 void Messenger::Release_mbuf( void* vmbuf ) {
195 rmr_free_msg( (rmr_mbuf_t *) vmbuf );
199 Wait for clear to send.
200 Until RMR loads a route table, all sends will fail with a
201 "no endpoint" state. This function allows the user application
202 to block until RMR has a viable route table. It does not guarantee
203 that every message that the user app will try to send has an entry.
205 The use of this function by the user application allows for the
206 parallel initialisation of the application while waiting for the
207 route table service to generate a table for the application. The
208 initialisation function may be called with "no wait" and this
209 function invoked when the application has completed initialisation
210 and is ready to start sending messages.
212 The max wait parameter is the maximum number of seconds to block.
213 If RMR never reports ready, false is returned. A true return
214 indicates all is ready. If max_wait is 0, then this will only
215 return when RMR is ready to send.
217 bool Messenger::Wait_for_cts( int max_wait ) {
220 block_4ever = max_wait == 0;
221 while( block_4ever || max_wait > 0 ) {
222 if( rmr_ready( mrc ) ) {