Add const qualifier to constructor port parameter
[ric-plt/xapp-frame-cpp.git] / src / messaging / messenger.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 Nokia
5         Copyright (c) 2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       messenger.cpp
23         Abstract:       Message Router Messenger.
24
25         Date:           10 March 2020
26         Author:         E. Scott Daniels
27 */
28
29 #include <string.h>
30 #include <unistd.h>
31
32 #include <rmr/rmr.h>
33 #include <rmr/RIC_message_types.h>
34
35
36 #include <iostream>
37 #include <string>
38 #include <map>
39 #include <memory>
40 #include <mutex>
41
42 #include "callback.hpp"
43 #include "default_cb.hpp"               // default callback prototypes
44 #include "message.hpp"
45 #include "messenger.hpp"
46
47
48 // --------------- private -----------------------------------------------------
49
50
51 // ---------------- C++ buggerd up way of maintining class constants ----------
52 const int Messenger::MAX_PAYLOAD = (1024*64);
53 const int Messenger::DEFAULT_CALLBACK = -1;
54
55 // --------------- builders -----------------------------------------------
56 /*
57         If wait4table is true, then the construction of the object does not
58         complete until the underlying transport has a new copy of the route
59         table.
60
61         If port is nil, then the default port is used (4560).
62 */
63 Messenger::Messenger( const char* uport, bool wait4table ) {
64
65         if( uport == NULL ) {
66                 listen_port = strdup( "4560" );
67         } else {
68                 listen_port = strdup( uport );
69         }
70
71         gate = new std::mutex();
72         mrc = rmr_init( listen_port, Messenger::MAX_PAYLOAD, 0 );
73
74         if( wait4table ) {
75                 this->Wait_for_cts( 0 );
76         }
77
78         Add_msg_cb( RIC_HEALTH_CHECK_REQ, Health_ck_cb, NULL );         // add our default call backs
79
80         ok_2_run = true;
81 }
82
83 /*
84         Move support. We DO allow the instance to be moved as only one copy
85         remains following the move.
86         Given a source object instance (soi) we move the information to
87         the new object, and then DELETE what was moved so that when the
88         user frees the soi, it doesn't destroy what we snarfed.
89 */
90 Messenger::Messenger( Messenger&& soi ) {
91         mrc = soi.mrc;
92         listen_port = soi.listen_port;
93         ok_2_run = soi.ok_2_run;
94         gate = soi.gate;
95                 cb_hash = soi.cb_hash;                          // this seems dodgy
96
97         soi.gate = NULL;
98         soi.listen_port = NULL;
99         soi.mrc = NULL;
100 }
101
102 /*
103         Move operator. Given a source object instance, movee it's contents
104         to this insance.  We must first clean up this instance.
105 */
106 Messenger& Messenger::operator=( Messenger&& soi ) {
107         if( this != &soi ) {                            // cannot move onto ourself
108                 if( mrc != NULL ) {
109                         rmr_close( mrc );
110                 }
111                 if( listen_port != NULL ) {
112                         free( listen_port );
113                 }
114
115                 mrc = soi.mrc;
116                 listen_port = soi.listen_port;
117                 ok_2_run = soi.ok_2_run;
118                 gate = soi.gate;
119                         cb_hash = soi.cb_hash;                          // this seems dodgy
120
121                 soi.gate = NULL;
122                 soi.listen_port = NULL;
123                 soi.mrc = NULL;
124         }
125
126         return *this;
127 }
128
129 /*
130         Destroyer.
131 */
132 Messenger::~Messenger() {
133         if( mrc != NULL ) {
134                 rmr_close( mrc );
135         }
136
137         if( listen_port != NULL ) {
138                 free( listen_port );
139         }
140 }
141
142 /*
143         Allow user to register a callback function invoked when a specific type of
144         message is received.  The user may pass an optional data pointer which
145         will be passed to the function when it is called.  The function signature
146         must be:
147                 void fun( Messenger* mr, rmr_mbuf_t* mbuf,  void* data );
148
149         The user can also invoke this function to set the "default" callback by
150         passing Messenger::DEFAULT_CALLBACK as the mtype. If no other callback
151         is defined for a message type, the default callback function is invoked.
152         If a default is not provided, a non-matching message is silently dropped.
153 */
154 void Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
155         Callback*       cb;
156
157         cb = new Callback( fun_name, data );
158         cb_hash[mtype] = cb;
159
160         callbacks = true;
161 }
162
163 /*
164         Message allocation for user to send. User must destroy the message when
165         finished, but may keep the message for as long as is necessary
166         and reuse it over and over.
167 */
168 //Message* Messenger::Alloc_msg( int payload_size ) {
169 std::unique_ptr<Message> Messenger::Alloc_msg( int payload_size ) {
170         return std::unique_ptr<Message>( new Message( mrc, payload_size ) );
171 }
172
173 void Messenger::Listen( ) {
174         int count = 0;
175         rmr_mbuf_t*     mbuf = NULL;
176         std::map<int,Callback*>::iterator mi;   // map iterator; silly indirect way to point at the value
177         Callback*       dcb = NULL;                                     // default callback so we don't search
178         Callback*       sel_cb;                                         // callback selected to invoke
179         std::unique_ptr<Message>m;
180
181         if( mrc == NULL ) {
182                 return;
183         }
184
185         mi = cb_hash.find( DEFAULT_CALLBACK );
186         if( mi != cb_hash.end() ) {
187                 dcb = mi->second;                                       // oddly named second field is the address of the callback block
188         }
189
190         while( ok_2_run ) {
191                 mbuf = rmr_torcv_msg( mrc, mbuf, 2000 );                // come up for air every 2 sec to check ok2run
192                 if( mbuf != NULL ) {
193                         if( mbuf->state == RMR_OK ) {
194                                 m = std::unique_ptr<Message>( new Message( mbuf, mrc ) );       // auto delteted when scope terminates
195
196                                 sel_cb = dcb;                                                                                   // start with default
197                                 if( callbacks  && ((mi = cb_hash.find( mbuf->mtype )) != cb_hash.end()) ) {
198                                         sel_cb = mi->second;                                                            // override with user callback
199                                 }
200                                 if( sel_cb != NULL ) {
201                                         sel_cb->Drive_cb( *m );                                                 // drive the selected one
202                                         mbuf = NULL;                                                                    // not safe to use after given to cb
203                                 }
204                         } else {
205                                 if( mbuf->state != RMR_ERR_TIMEOUT ) {
206                                         fprintf( stderr, "<LISTENER> got  bad status: %d\n", mbuf->state );
207                                 }
208                         }
209                 }
210         }
211 }
212
213 /*
214         Wait for the next message, up to a max timout, and return the message received.
215 */
216 std::unique_ptr<Message>  Messenger::Receive( int timeout ) {
217         rmr_mbuf_t*     mbuf = NULL;
218         std::unique_ptr<Message> m = NULL;
219
220         if( mrc != NULL ) {
221                 mbuf = rmr_torcv_msg( mrc, mbuf, timeout );             // future: do we want to reuse the mbuf here?
222                 if( mbuf != NULL ) {
223                         m = std::unique_ptr<Message>( new Message( mbuf, mrc ) );
224                 }
225         }
226
227         return m;
228 }
229
230 /*
231         Called to gracefully stop all listeners.
232 */
233 void Messenger::Stop( ) {
234         ok_2_run = false;
235 }
236
237 /*
238         RMR messages must be released by RMR as there might be transport
239         buffers that have to be dealt with. Every callback is expected to
240         call this function when finished with the message.
241 void Messenger::Release_mbuf( void* vmbuf ) {
242         rmr_free_msg( (rmr_mbuf_t *)  vmbuf );
243 }
244 */
245
246 /*
247         Wait for clear to send.
248         Until RMR loads a route table, all sends will fail with a
249         "no endpoint" state.  This function allows the user application
250         to block until RMR has a viable route table.  It does not guarentee
251         that every message that the user app will try to send has an entry.
252
253         The use of this function by the user application allows for the
254         parallel initialisation of the application while waiting for the
255         route table service to generate a table for the application. The
256         initialisation function may be callsed with "no wait" and this
257         function invoked when the application has completed initialisation
258         and is ready to start sending messages.
259
260         The max wait parameter is the maximum number of seconds to block.
261         If RMR never reports ready false is returned.  A true return
262         incidcates all is ready.  If max_wait is 0, then this will only
263         return when RMR is ready to send.
264 */
265 bool Messenger::Wait_for_cts( int max_wait ) {
266         bool block_4ever;
267         bool    state = false;
268
269         block_4ever = max_wait == 0;
270         while( block_4ever || max_wait > 0 ) {
271                 if( rmr_ready( mrc ) ) {
272                         state = true;
273                         break;
274                 }
275
276                 sleep( 1 );
277                 max_wait--;
278         }
279
280         return state;
281 }