Add API allowing xAPPs to send alarm messages
[ric-plt/xapp-frame-cpp.git] / src / messaging / messenger.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 Nokia
5         Copyright (c) 2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       messenger.cpp
23         Abstract:       Message Router Messenger.
24
25         Date:           10 March 2020
26         Author:         E. Scott Daniels
27 */
28
29 #include <string.h>
30 #include <unistd.h>
31
32 #include <rmr/rmr.h>
33 #include <rmr/RIC_message_types.h>
34
35
36 #include <iostream>
37 #include <string>
38 #include <map>
39 #include <memory>
40 #include <mutex>
41
42 #include "callback.hpp"
43 #include "default_cb.hpp"               // default callback prototypes
44 #include "message.hpp"
45 #include "messenger.hpp"
46 #include "alarm.hpp"
47
48 namespace xapp {
49
50 // --------------- private -----------------------------------------------------
51
52
53 // ---------------- C++ buggerd up way of maintining class constants ----------
54 const int xapp::Messenger::MAX_PAYLOAD = (1024*64);
55 const int xapp::Messenger::DEFAULT_CALLBACK = -1;
56
57 // --------------- builders -----------------------------------------------
58 /*
59         If wait4table is true, then the construction of the object does not
60         complete until the underlying transport has a new copy of the route
61         table.
62
63         If port is nil, then the default port is used (4560).
64 */
65 xapp::Messenger::Messenger( const char* uport, bool wait4table ) {
66
67         if( uport == NULL ) {
68                 listen_port = strdup( "4560" );
69         } else {
70                 listen_port = strdup( uport );
71         }
72
73         gate = new std::mutex();
74         mrc = rmr_init( listen_port, Messenger::MAX_PAYLOAD, 0 );
75
76         if( wait4table ) {
77                 this->Wait_for_cts( 0 );
78         }
79
80         Add_msg_cb( RIC_HEALTH_CHECK_REQ, Health_ck_cb, NULL );         // add our default call backs
81
82         ok_2_run = true;
83 }
84
85 /*
86         Move support. We DO allow the instance to be moved as only one copy
87         remains following the move.
88         Given a source object instance (soi) we move the information to
89         the new object, and then DELETE what was moved so that when the
90         user frees the soi, it doesn't destroy what we snarfed.
91 */
92 xapp::Messenger::Messenger( Messenger&& soi ) {
93         mrc = soi.mrc;
94         listen_port = soi.listen_port;
95         ok_2_run = soi.ok_2_run;
96         gate = soi.gate;
97                 cb_hash = soi.cb_hash;                          // this seems dodgy
98
99         soi.gate = NULL;
100         soi.listen_port = NULL;
101         soi.mrc = NULL;
102 }
103
104 /*
105         Move operator. Given a source object instance, movee it's contents
106         to this insance.  We must first clean up this instance.
107 */
108 xapp::Messenger& Messenger::operator=( Messenger&& soi ) {
109         if( this != &soi ) {                            // cannot move onto ourself
110                 if( mrc != NULL ) {
111                         rmr_close( mrc );
112                 }
113                 if( listen_port != NULL ) {
114                         free( listen_port );
115                 }
116
117                 mrc = soi.mrc;
118                 listen_port = soi.listen_port;
119                 ok_2_run = soi.ok_2_run;
120                 gate = soi.gate;
121                         cb_hash = soi.cb_hash;                          // this seems dodgy
122
123                 soi.gate = NULL;
124                 soi.listen_port = NULL;
125                 soi.mrc = NULL;
126         }
127
128         return *this;
129 }
130
131 /*
132         Destroyer.
133 */
134 xapp::Messenger::~Messenger() {
135         if( mrc != NULL ) {
136                 rmr_close( mrc );
137         }
138
139         if( listen_port != NULL ) {
140                 free( listen_port );
141         }
142 }
143
144 /*
145         Allow user to register a callback function invoked when a specific type of
146         message is received.  The user may pass an optional data pointer which
147         will be passed to the function when it is called.  The function signature
148         must be:
149                 void fun( Messenger* mr, rmr_mbuf_t* mbuf,  void* data );
150
151         The user can also invoke this function to set the "default" callback by
152         passing Messenger::DEFAULT_CALLBACK as the mtype. If no other callback
153         is defined for a message type, the default callback function is invoked.
154         If a default is not provided, a non-matching message is silently dropped.
155 */
156 void xapp::Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
157         Callback*       cb;
158
159         cb = new Callback( fun_name, data );
160         cb_hash[mtype] = cb;
161
162         callbacks = true;
163 }
164
165 /*
166         Message allocation for user to send. User must destroy the message when
167         finished, but may keep the message for as long as is necessary
168         and reuse it over and over.
169 */
170 std::unique_ptr<Message> xapp::Messenger::Alloc_msg( int payload_size ) {
171         return std::unique_ptr<Message>( new Message( mrc, payload_size ) );
172 }
173
174
175 // ----------------- alarm support -----------------------------------------------
176 /*
177         Allocate an alarm object.
178         Alarms must be allocated via the framework becasue we need a wormhole
179         id and to get one of those we need the mrc.  We can easily send with
180         just a message, but to avoid having the user pass the framework
181         object in, we'll just supply a "factory" function.
182 */
183 std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( int prob_id, std::string meid ) {
184         std::shared_ptr<Message> m;
185         Alarm* a;
186
187         m = Alloc_msg( 4096 );
188         a = new Alarm( m, prob_id, meid );
189         a->Set_whid( Wormhole_open( a->Get_endpoint() ) );
190
191         return std::unique_ptr<Alarm>( a );
192 }
193
194 std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( std::string meid ) {
195         return Alloc_alarm( -1, meid );
196 }
197
198 std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( ) {
199         return Alloc_alarm( -1, "" );
200 }
201
202
203 // ------------------- listening support -----------------------------------------------
204
205 /*
206         The Listen function waits for messages and drives the appropriate callback
207         function when one is received. This function will return to the caller
208         only when the ok to run flag in the object has been set to false (likely
209         never, or only at graceful termination). Callers should normally not
210         expect to have controll returned in the calling thread.
211
212         Concurrently executing listeners are allowed.
213 */
214 void xapp::Messenger::Listen( ) {
215         int count = 0;
216         rmr_mbuf_t*     mbuf = NULL;
217         std::map<int,Callback*>::iterator mi;   // map iterator; silly indirect way to point at the value
218         Callback*       dcb = NULL;                                     // default callback so we don't search
219         Callback*       sel_cb;                                         // callback selected to invoke
220         std::unique_ptr<Message> m;
221
222         if( mrc == NULL ) {
223                 return;
224         }
225
226         mi = cb_hash.find( DEFAULT_CALLBACK );
227         if( mi != cb_hash.end() ) {
228                 dcb = mi->second;                                       // oddly named second field is the address of the callback block
229         }
230
231         while( ok_2_run ) {
232                 mbuf = rmr_torcv_msg( mrc, mbuf, 2000 );                // come up for air every 2 sec to check ok2run
233                 if( mbuf != NULL ) {
234                         if( mbuf->state == RMR_OK ) {
235                                 m = std::unique_ptr<Message>( new Message( mbuf, mrc ) );       // auto delteted when scope terminates
236
237                                 sel_cb = dcb;                                                                                   // start with default
238                                 if( callbacks  && ((mi = cb_hash.find( mbuf->mtype )) != cb_hash.end()) ) {
239                                         sel_cb = mi->second;                                                            // override with user callback
240                                 }
241                                 if( sel_cb != NULL ) {
242                                         sel_cb->Drive_cb( *m );                                                 // drive the selected one
243                                         mbuf = NULL;                                                                    // not safe to use after given to cb
244                                 }
245                         } else {
246                                 if( mbuf->state != RMR_ERR_TIMEOUT ) {
247                                         fprintf( stderr, "<LISTENER> got  bad status: %d\n", mbuf->state );
248                                 }
249                         }
250                 }
251         }
252 }
253
254 /*
255         Wait for the next message, up to a max timout, and return the message received.
256         This function allows the user xAPP to implement their own polling loop (no callbacks).
257 */
258 std::unique_ptr<Message>  xapp::Messenger::Receive( int timeout ) {
259         rmr_mbuf_t*     mbuf = NULL;
260         std::unique_ptr<Message> m = NULL;
261
262         if( mrc != NULL ) {
263                 mbuf = rmr_torcv_msg( mrc, mbuf, timeout );             // future: do we want to reuse the mbuf here?
264                 if( mbuf != NULL ) {
265                         m = std::unique_ptr<Message>( new Message( mbuf, mrc ) );
266                 }
267         }
268
269         return m;
270 }
271
272 /*
273         Called to gracefully stop all listeners.
274 */
275 void xapp::Messenger::Stop( ) {
276         ok_2_run = false;
277 }
278
279 /*
280         RMR messages must be released by RMR as there might be transport
281         buffers that have to be dealt with. Every callback is expected to
282         call this function when finished with the message.
283 void xapp::Messenger::Release_mbuf( void* vmbuf ) {
284         rmr_free_msg( (rmr_mbuf_t *)  vmbuf );
285 }
286 */
287
288 /*
289         Wait for clear to send.
290         Until RMR loads a route table, all sends will fail with a
291         "no endpoint" state.  This function allows the user application
292         to block until RMR has a viable route table.  It does not guarentee
293         that every message that the user app will try to send has an entry.
294
295         The use of this function by the user application allows for the
296         parallel initialisation of the application while waiting for the
297         route table service to generate a table for the application. The
298         initialisation function may be callsed with "no wait" and this
299         function invoked when the application has completed initialisation
300         and is ready to start sending messages.
301
302         The max wait parameter is the maximum number of seconds to block.
303         If RMR never reports ready false is returned.  A true return
304         incidcates all is ready.  If max_wait is 0, then this will only
305         return when RMR is ready to send.
306 */
307 bool xapp::Messenger::Wait_for_cts( int max_wait ) {
308         bool block_4ever;
309         bool    state = false;
310
311         block_4ever = max_wait == 0;
312         while( block_4ever || max_wait > 0 ) {
313                 if( rmr_ready( mrc ) ) {
314                         state = true;
315                         break;
316                 }
317
318                 sleep( 1 );
319                 max_wait--;
320         }
321
322         return state;
323 }
324
325 /*
326         Open a wormhole to the indicated endpoint and return the wormhole ID.
327 */
328 int xapp::Messenger::Wormhole_open( std::string endpoint ) {
329         rmr_whid_t whid;
330
331         whid = rmr_wh_open( mrc, endpoint.c_str() );
332
333         return (int) whid;
334 }
335
336
337 } // namespace