Unit test improvements, add example programme
[ric-plt/xapp-frame-cpp.git] / src / messaging / messenger.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 Nokia
5         Copyright (c) 2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       messenger.cpp
23         Abstract:       Message Router Messenger.
24
25         Date:           10 March 2020
26         Author:         E. Scott Daniels
27 */
28
29 #include <string.h>
30 #include <unistd.h>
31
32 #include <rmr/rmr.h>
33 #include <rmr/RIC_message_types.h>
34
35
36 #include <iostream>
37 #include <string>
38 #include <map>
39 #include <memory>
40 #include <mutex>
41
42 #include "callback.hpp"
43 #include "default_cb.hpp"               // default callback prototypes
44 #include "message.hpp"
45 #include "messenger.hpp"
46
47
48 // --------------- private -----------------------------------------------------
49
50
51 // ---------------- C++ buggerd up way of maintining class constants ----------
52 const int Messenger::MAX_PAYLOAD = (1024*64);
53 const int Messenger::DEFAULT_CALLBACK = -1;
54
55 // --------------- builders -----------------------------------------------
56 /*
57         If wait4table is true, then the construction of the object does not
58         complete until the underlying transport has a new copy of the route
59         table.
60
61         If port is nil, then the default port is used (4560).
62 */
63 Messenger::Messenger( char* port, bool wait4table ) {
64         if( port == NULL ) {
65                 port = (char *) "4560";
66         }
67
68         gate = new std::mutex();
69         listen_port = strdup( port );
70         mrc = rmr_init( listen_port, Messenger::MAX_PAYLOAD, 0 );
71
72         if( wait4table ) {
73                 this->Wait_for_cts( 0 );
74         }
75
76         Add_msg_cb( RIC_HEALTH_CHECK_REQ, Health_ck_cb, NULL );         // add our default call backs
77
78         ok_2_run = true;
79 }
80
81 /*
82         Move support. We DO allow the instance to be moved as only one copy 
83         remains following the move.
84         Given a source object instance (soi) we move the information to 
85         the new object, and then DELETE what was moved so that when the
86         user frees the soi, it doesn't destroy what we snarfed.
87 */
88 Messenger::Messenger( Messenger&& soi ) {
89         mrc = soi.mrc;
90         listen_port = soi.listen_port;
91         ok_2_run = soi.ok_2_run;
92         gate = soi.gate;
93                 cb_hash = soi.cb_hash;                          // this seems dodgy
94         
95         soi.gate = NULL;
96         soi.listen_port = NULL;
97         soi.mrc = NULL;
98 }
99
100 /*
101         Move operator. Given a source object instance, movee it's contents
102         to this insance.  We must first clean up this instance.
103 */
104 Messenger& Messenger::operator=( Messenger&& soi ) {
105         if( this != &soi ) {                            // cannot move onto ourself
106                 if( mrc != NULL ) {
107                         rmr_close( mrc );
108                 }
109                 if( listen_port != NULL ) {
110                         free( listen_port );
111                 }
112
113                 mrc = soi.mrc;
114                 listen_port = soi.listen_port;
115                 ok_2_run = soi.ok_2_run;
116                 gate = soi.gate;
117                         cb_hash = soi.cb_hash;                          // this seems dodgy
118                 
119                 soi.gate = NULL;
120                 soi.listen_port = NULL;
121                 soi.mrc = NULL;
122         }
123
124         return *this;
125 }
126
127 /*
128         Destroyer.
129 */
130 Messenger::~Messenger() {
131         if( mrc != NULL ) {
132                 rmr_close( mrc );
133         }
134
135         if( listen_port != NULL ) {
136                 free( listen_port );
137         }
138 }
139
140 /*
141         Allow user to register a callback function invoked when a specific type of
142         message is received.  The user may pass an optional data pointer which
143         will be passed to the function when it is called.  The function signature
144         must be:
145                 void fun( Messenger* mr, rmr_mbuf_t* mbuf,  void* data );
146
147         The user can also invoke this function to set the "default" callback by
148         passing Messenger::DEFAULT_CALLBACK as the mtype. If no other callback
149         is defined for a message type, the default callback function is invoked.
150         If a default is not provided, a non-matching message is silently dropped.
151 */
152 void Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
153         Callback*       cb;
154
155         cb = new Callback( fun_name, data );
156         cb_hash[mtype] = cb;
157
158         callbacks = true;
159 }
160
161 /*
162         Message allocation for user to send. User must destroy the message when
163         finished, but may keep the message for as long as is necessary
164         and reuse it over and over.
165 */
166 //Message* Messenger::Alloc_msg( int payload_size ) {
167 std::unique_ptr<Message> Messenger::Alloc_msg( int payload_size ) {
168         return std::unique_ptr<Message>( new Message( mrc, payload_size ) );
169 }
170
171 void Messenger::Listen( ) {
172         int count = 0;
173         rmr_mbuf_t*     mbuf = NULL;
174         std::map<int,Callback*>::iterator mi;   // map iterator; silly indirect way to point at the value
175         Callback*       dcb = NULL;                                     // default callback so we don't search
176         Callback*       sel_cb;                                         // callback selected to invoke
177         std::unique_ptr<Message>m;
178
179         if( mrc == NULL ) {
180                 return;
181         }
182
183         mi = cb_hash.find( DEFAULT_CALLBACK );
184         if( mi != cb_hash.end() ) {
185                 dcb = mi->second;                                       // oddly named second field is the address of the callback block
186         }
187
188         while( ok_2_run ) {
189                 mbuf = rmr_torcv_msg( mrc, mbuf, 2000 );                // come up for air every 2 sec to check ok2run
190                 if( mbuf != NULL ) {
191                         if( mbuf->state == RMR_OK ) {
192                                 m = std::unique_ptr<Message>( new Message( mbuf, mrc ) );       // auto delteted when scope terminates
193
194                                 sel_cb = dcb;                                                                                   // start with default
195                                 if( callbacks  && ((mi = cb_hash.find( mbuf->mtype )) != cb_hash.end()) ) {
196                                         sel_cb = mi->second;                                                            // override with user callback
197                                 }
198                                 if( sel_cb != NULL ) {
199                                         sel_cb->Drive_cb( *m );                                                 // drive the selected one
200                                         mbuf = NULL;                                                                    // not safe to use after given to cb
201                                 }
202                         } else {
203                                 if( mbuf->state != RMR_ERR_TIMEOUT ) {
204                                         fprintf( stderr, "<LISTENER> got  bad status: %d\n", mbuf->state );
205                                 }
206                         }
207                 }
208         }
209 }
210
211 /*
212         Wait for the next message, up to a max timout, and return the message received.
213 */
214 std::unique_ptr<Message>  Messenger::Receive( int timeout ) {
215         rmr_mbuf_t*     mbuf = NULL;
216         std::unique_ptr<Message> m = NULL;
217
218         if( mrc != NULL ) {
219                 mbuf = rmr_torcv_msg( mrc, mbuf, timeout );             // future: do we want to reuse the mbuf here?
220                 if( mbuf != NULL ) {
221                         m = std::unique_ptr<Message>( new Message( mbuf, mrc ) );
222                 }
223         }
224
225         return m;
226 }
227
228 /*
229         Called to gracefully stop all listeners.
230 */
231 void Messenger::Stop( ) {
232         ok_2_run = false;
233 }
234
235 /*
236         RMR messages must be released by RMR as there might be transport
237         buffers that have to be dealt with. Every callback is expected to
238         call this function when finished with the message.
239 void Messenger::Release_mbuf( void* vmbuf ) {
240         rmr_free_msg( (rmr_mbuf_t *)  vmbuf );
241 }
242 */
243
244 /*
245         Wait for clear to send.
246         Until RMR loads a route table, all sends will fail with a
247         "no endpoint" state.  This function allows the user application
248         to block until RMR has a viable route table.  It does not guarentee
249         that every message that the user app will try to send has an entry.
250
251         The use of this function by the user application allows for the
252         parallel initialisation of the application while waiting for the
253         route table service to generate a table for the application. The
254         initialisation function may be callsed with "no wait" and this
255         function invoked when the application has completed initialisation
256         and is ready to start sending messages.
257
258         The max wait parameter is the maximum number of seconds to block.
259         If RMR never reports ready false is returned.  A true return
260         incidcates all is ready.  If max_wait is 0, then this will only
261         return when RMR is ready to send.
262 */
263 bool Messenger::Wait_for_cts( int max_wait ) {
264         bool block_4ever;
265         bool    state = false;
266
267         block_4ever = max_wait == 0;
268         while( block_4ever || max_wait > 0 ) {
269                 if( rmr_ready( mrc ) ) {
270                         state = true;
271                         break;
272                 }
273
274                 sleep( 1 );
275                 max_wait--;
276         }
277
278         return state;
279 }