446c3c435548db3d07c9f905925b1c6285f855dc
[ric-plt/xapp-frame-cpp.git] / messenger.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 Nokia
5         Copyright (c) 2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       messenger.cpp
23         Abstract:       Message Router Messenger.
24
25         Date:           10 March 2020
26         Author:         E. Scott Daniels
27 */
28
29 #include <string.h>
30 #include <unistd.h>
31
32 #include <rmr/rmr.h>
33 #include <rmr/RIC_message_types.h>
34
35
36 #include <iostream>
37 #include <string>
38 #include <map>
39 #include <memory>
40 #include <mutex>
41
42 #include "callback.hpp"
43 #include "default_cb.hpp"               // default callback prototypes
44 #include "message.hpp"
45 #include "messenger.hpp"
46 #include "alarm.hpp"
47 #include "metrics.hpp"
48
49 namespace xapp {
50
51 // --------------- private -----------------------------------------------------
52
53
54 // ---------------- C++ buggerd up way of maintining class constants ----------
55 const int xapp::Messenger::MAX_PAYLOAD = (1024*64);
56 const int xapp::Messenger::DEFAULT_CALLBACK = -1;
57
58 // --------------- builders -----------------------------------------------
59 /*
60         If wait4table is true, then the construction of the object does not
61         complete until the underlying transport has a new copy of the route
62         table.
63
64         If port is nil, then the default port is used (4560).
65 */
66 xapp::Messenger::Messenger( const char* uport, bool wait4table ) {
67
68         if( uport == NULL ) {
69                 listen_port = strdup( "4560" );
70         } else {
71                 listen_port = strdup( uport );
72         }
73
74         gate = new std::mutex();
75         mrc = rmr_init( listen_port, Messenger::MAX_PAYLOAD, 0 );
76
77         if( wait4table ) {
78                 this->Wait_for_cts( 0 );
79         }
80
81         Add_msg_cb( RIC_HEALTH_CHECK_REQ, Health_ck_cb, NULL );         // add our default call backs
82
83         ok_2_run = true;
84 }
85
86 /*
87         Move support. We DO allow the instance to be moved as only one copy
88         remains following the move.
89         Given a source object instance (soi) we move the information to
90         the new object, and then DELETE what was moved so that when the
91         user frees the soi, it doesn't destroy what we snarfed.
92 */
93 xapp::Messenger::Messenger( Messenger&& soi ) :
94         mrc(  soi.mrc ),
95         listen_port(  soi.listen_port ),
96         ok_2_run(  soi.ok_2_run ),
97         gate(  soi.gate ),
98         cb_hash(  soi.cb_hash )                 // this seems dodgy
99 {
100         soi.gate = NULL;
101         soi.listen_port = NULL;
102         soi.mrc = NULL;
103 }
104
105 /*
106         Move operator. Given a source object instance, movee it's contents
107         to this insance.  We must first clean up this instance.
108 */
109 xapp::Messenger& Messenger::operator=( Messenger&& soi ) {
110         if( this != &soi ) {                            // cannot move onto ourself
111                 if( mrc != NULL ) {
112                         rmr_close( mrc );
113                 }
114                 if( listen_port != NULL ) {
115                         delete( listen_port );
116                 }
117
118                 mrc = soi.mrc;
119                 listen_port = soi.listen_port;
120                 ok_2_run = soi.ok_2_run;
121                 gate = soi.gate;
122                         cb_hash = soi.cb_hash;                          // this seems dodgy
123
124                 soi.gate = NULL;
125                 soi.listen_port = NULL;
126                 soi.mrc = NULL;
127         }
128
129         return *this;
130 }
131
132 /*
133         Destroyer.
134 */
135 xapp::Messenger::~Messenger() {
136         if( mrc != NULL ) {
137                 rmr_close( mrc );
138         }
139
140         if( listen_port != NULL ) {
141                 delete( listen_port );
142         }
143 }
144
145 /*
146         Allow user to register a callback function invoked when a specific type of
147         message is received.  The user may pass an optional data pointer which
148         will be passed to the function when it is called.  The function signature
149         must be:
150                 void fun( Messenger* mr, rmr_mbuf_t* mbuf,  void* data )
151
152         The user can also invoke this function to set the "default" callback by
153         passing Messenger::DEFAULT_CALLBACK as the mtype. If no other callback
154         is defined for a message type, the default callback function is invoked.
155         If a default is not provided, a non-matching message is silently dropped.
156 */
157 void xapp::Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
158         Callback*       cb;
159
160         cb = new Callback( fun_name, data );
161         cb_hash[mtype] = cb;
162
163         callbacks = true;
164 }
165
166 /*
167         Message allocation for user to send. User must destroy the message when
168         finished, but may keep the message for as long as is necessary
169         and reuse it over and over.
170 */
171 std::unique_ptr<Message> xapp::Messenger::Alloc_msg( int payload_size ) {
172         return std::unique_ptr<Message>( new Message( mrc, payload_size ) );
173 }
174
175
176 // ----------------- alarm support -----------------------------------------------
177 /*
178         Allocate an alarm object.
179         Alarms must be allocated via the framework becasue we need a wormhole
180         id and to get one of those we need the mrc.  We can easily send with
181         just a message, but to avoid having the user pass the framework
182         object in, we'll just supply a "factory" function.
183 */
184 std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( int prob_id, const std::string& meid ) {
185         std::shared_ptr<Message> m;
186         Alarm* a;
187
188         m = Alloc_msg( 4096 );
189         a = new Alarm( m, prob_id, meid );
190         a->Set_whid( Wormhole_open( a->Get_endpoint() ) );
191
192         return std::unique_ptr<Alarm>( a );
193 }
194
195 std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( const std::string& meid ) {
196         return Alloc_alarm( -1, meid );
197 }
198
199 std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( ) {
200         return Alloc_alarm( -1, "" );
201 }
202
203
204 // ------------------ metrics support --------------------------------------------------
205 std::unique_ptr<xapp::Metrics> xapp::Messenger::Alloc_metrics( ) {
206         std::shared_ptr<Message> m;
207
208         m = Alloc_msg( 4096 );
209         return std::unique_ptr<xapp::Metrics>( new xapp::Metrics( m ) );
210 }
211
212 std::unique_ptr<xapp::Metrics> xapp::Messenger::Alloc_metrics( const std::string& source ) {
213         std::shared_ptr<Message> m;
214
215         m = Alloc_msg( 4096 );
216         return std::unique_ptr<xapp::Metrics>( new xapp::Metrics( m, source ) );
217 }
218
219 std::unique_ptr<xapp::Metrics> xapp::Messenger::Alloc_metrics( const std::string& reporter, const std::string& source ) {
220         std::shared_ptr<Message> m;
221
222         m = Alloc_msg( 4096 );
223         return std::unique_ptr<xapp::Metrics>( new xapp::Metrics( m, reporter, source ) );
224 }
225
226 // ------------------- listening support -----------------------------------------------
227
228 /*
229         The Listen function waits for messages and drives the appropriate callback
230         function when one is received. This function will return to the caller
231         only when the ok to run flag in the object has been set to false (likely
232         never, or only at graceful termination). Callers should normally not
233         expect to have controll returned in the calling thread.
234
235         Concurrently executing listeners are allowed.
236 */
237 void xapp::Messenger::Listen( ) {
238         rmr_mbuf_t*     mbuf = NULL;
239         std::map<int,Callback*>::iterator mi;   // map iterator; silly indirect way to point at the value
240         Callback*       dcb = NULL;                                     // default callback so we don't search
241         Callback*       sel_cb;                                         // callback selected to invoke
242         std::unique_ptr<Message> m;
243
244         if( mrc == NULL ) {
245                 return;
246         }
247
248         mi = cb_hash.find( DEFAULT_CALLBACK );
249         if( mi != cb_hash.end() ) {
250                 dcb = mi->second;                                       // oddly named second field is the address of the callback block
251         }
252
253         while( ok_2_run ) {
254                 mbuf = rmr_torcv_msg( mrc, mbuf, 2000 );                // come up for air every 2 sec to check ok2run
255                 if( mbuf != NULL ) {
256                         if( mbuf->state == RMR_OK ) {
257                                 m = std::unique_ptr<Message>( new Message( mbuf, mrc ) );       // auto delteted when scope terminates
258
259                                 sel_cb = dcb;                                                                                   // start with default
260                                 if( callbacks  && ((mi = cb_hash.find( mbuf->mtype )) != cb_hash.end()) ) {
261                                         sel_cb = mi->second;                                                            // override with user callback
262                                 }
263                                 if( sel_cb != NULL ) {
264                                         sel_cb->Drive_cb( *m );                                                 // drive the selected one
265                                         mbuf = NULL;                                                                    // not safe to use after given to cb
266                                 }
267                         } else {
268                                 if( mbuf->state != RMR_ERR_TIMEOUT ) {
269                                         fprintf( stderr, "<LISTENER> got  bad status: %d\n", mbuf->state );
270                                 }
271                         }
272                 }
273         }
274 }
275
276 /*
277         Wait for the next message, up to a max timout, and return the message received.
278         This function allows the user xAPP to implement their own polling loop (no callbacks).
279 */
280 std::unique_ptr<Message>  xapp::Messenger::Receive( int timeout ) {
281         rmr_mbuf_t*     mbuf = NULL;
282         std::unique_ptr<Message> m = NULL;
283
284         if( mrc != NULL ) {
285                 mbuf = rmr_torcv_msg( mrc, mbuf, timeout );             // future: do we want to reuse the mbuf here?
286                 if( mbuf != NULL ) {
287                         m = std::unique_ptr<Message>( new Message( mbuf, mrc ) );
288                 }
289         }
290
291         return m;
292 }
293
294 /*
295         Called to gracefully stop all listeners.
296 */
297 void xapp::Messenger::Stop( ) {
298         ok_2_run = false;
299 }
300
301 /*
302         RMR messages must be released by RMR as there might be transport
303         buffers that have to be dealt with. Every callback is expected to
304         call this function when finished with the message.
305 void xapp::Messenger::Release_mbuf( void* vmbuf ) {
306         rmr_free_msg( (rmr_mbuf_t *)  vmbuf );
307 }
308 */
309
310 /*
311         Wait for clear to send.
312         Until RMR loads a route table, all sends will fail with a
313         "no endpoint" state.  This function allows the user application
314         to block until RMR has a viable route table.  It does not guarentee
315         that every message that the user app will try to send has an entry.
316
317         The use of this function by the user application allows for the
318         parallel initialisation of the application while waiting for the
319         route table service to generate a table for the application. The
320         initialisation function may be callsed with "no wait" and this
321         function invoked when the application has completed initialisation
322         and is ready to start sending messages.
323
324         The max wait parameter is the maximum number of seconds to block.
325         If RMR never reports ready false is returned.  A true return
326         incidcates all is ready.  If max_wait is 0, then this will only
327         return when RMR is ready to send.
328 */
329 bool xapp::Messenger::Wait_for_cts( int max_wait ) {
330         bool block_4ever;
331         bool    state = false;
332
333         block_4ever = max_wait == 0;
334         while( block_4ever || max_wait > 0 ) {
335                 if( rmr_ready( mrc ) ) {
336                         state = true;
337                         break;
338                 }
339
340                 sleep( 1 );
341                 max_wait--;
342         }
343
344         return state;
345 }
346
347 /*
348         Open a wormhole to the indicated endpoint and return the wormhole ID.
349 */
350 int xapp::Messenger::Wormhole_open( const std::string& endpoint ) {
351         rmr_whid_t whid;
352
353         whid = rmr_wh_open( mrc, endpoint.c_str() );
354
355         return (int) whid;
356 }
357
358
359 } // namespace