Initial source commit
[ric-plt/xapp-frame-cpp.git] / src / messaging / messenger.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 Nokia
5         Copyright (c) 2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       messenger.cpp
23         Abstract:       Message Router Messenger.
24
25         Date:           10 March 2020
26         Author:         E. Scott Daniels
27 */
28
29 #include <string.h>
30 #include <unistd.h>
31
32 #include <rmr/rmr.h>
33 #include <rmr/RIC_message_types.h>
34
35
36 #include <iostream>
37 #include <string>
38 #include <map>
39 #include <memory>
40 #include <mutex>
41
42 #include "callback.hpp"
43 #include "default_cb.hpp"               // default callback prototypes
44 #include "message.hpp"
45 #include "messenger.hpp"
46
47
48 // --------------- private -----------------------------------------------------
49
50
51 // ---------------- C++ buggerd up way of maintining class constants ----------
52 const int Messenger::MAX_PAYLOAD = (1024*64);
53 const int Messenger::DEFAULT_CALLBACK = -1;
54
55 // --------------- builders -----------------------------------------------
56 /*
57         If wait4table is true, then the construction of the object does not
58         complete until the underlying transport has a new copy of the route
59         table.
60
61         If port is nil, then the default port is used (4560).
62 */
63 Messenger::Messenger( char* port, bool wait4table ) {
64         if( port == NULL ) {
65                 port = (char *) "4560";
66         }
67
68         gate = new std::mutex();
69         listen_port = strdup( port );
70         mrc = rmr_init( listen_port, Messenger::MAX_PAYLOAD, 0 );
71
72         if( wait4table ) {
73                 this->Wait_for_cts( 0 );
74         }
75
76         Add_msg_cb( RIC_HEALTH_CHECK_REQ, Health_ck_cb, NULL );         // add our default call backs
77
78         ok_2_run = true;
79 }
80
81 /*
82         Destroyer.
83 */
84 Messenger::~Messenger() {
85         if( mrc != NULL ) {
86                 rmr_close( mrc );
87         }
88
89         free( listen_port );
90 }
91
92 /*
93         Allow user to register a callback function invoked when a specific type of
94         message is received.  The user may pass an optional data pointer which
95         will be passed to the function when it is called.  The function signature
96         must be:
97                 void fun( Messenger* mr, rmr_mbuf_t* mbuf,  void* data );
98
99         The user can also invoke this function to set the "default" callback by
100         passing Messenger::DEFAULT_CALLBACK as the mtype. If no other callback
101         is defined for a message type, the default callback function is invoked.
102         If a default is not provided, a non-matching message is silently dropped.
103 */
104 void Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
105         Callback*       cb;
106
107         cb = new Callback( fun_name, data );
108         cb_hash[mtype] = cb;
109
110         callbacks = true;
111 }
112
113 /*
114         Message allocation for user to send. User must destroy the message when
115         finished, but may keep the message for as long as is necessary
116         and reuse it over and over.
117 */
118 //Message* Messenger::Alloc_msg( int payload_size ) {
119 std::unique_ptr<Message> Messenger::Alloc_msg( int payload_size ) {
120         return std::unique_ptr<Message>( new Message( mrc, payload_size ) );
121 }
122
123 void Messenger::Listen( ) {
124         int count = 0;
125         rmr_mbuf_t*     mbuf = NULL;
126         std::map<int,Callback*>::iterator mi;   // map iterator; silly indirect way to point at the value
127         Callback*       dcb = NULL;                                     // default callback so we don't search
128         Callback*       sel_cb;                                         // callback selected to invoke
129         std::unique_ptr<Message>m;
130
131         if( mrc == NULL ) {
132                 return;
133         }
134
135         mi = cb_hash.find( DEFAULT_CALLBACK );
136         if( mi != cb_hash.end() ) {
137                 dcb = mi->second;                                       // oddly named second field is the address of the callback block
138         }
139
140         while( ok_2_run ) {
141                 mbuf = rmr_torcv_msg( mrc, mbuf, 2000 );                // come up for air every 2 sec to check ok2run
142                 if( mbuf != NULL ) {
143                         if( mbuf->state == RMR_OK ) {
144                                 m = std::unique_ptr<Message>( new Message( mbuf, mrc ) );       // auto delteted when scope terminates
145
146                                 sel_cb = dcb;                                                                                   // start with default
147                                 if( callbacks  && ((mi = cb_hash.find( mbuf->mtype )) != cb_hash.end()) ) {
148                                         sel_cb = mi->second;                                                            // override with user callback
149                                 }
150                                 if( sel_cb != NULL ) {
151                                         sel_cb->Drive_cb( *this, *m );                                          // drive the selected one
152                                         mbuf = NULL;                                                                            // not safe to use after given to cb
153                                 }
154                         } else {
155                                 if( mbuf->state != RMR_ERR_TIMEOUT ) {
156                                         fprintf( stderr, "<LISTENER> got  bad status: %d\n", mbuf->state );
157                                 }
158                         }
159                 }
160         }
161 }
162
163 /*
164         Wait for the next message, up to a max timout, and return the message received.
165 */
166 std::unique_ptr<Message>  Messenger::Receive( int timeout ) {
167         rmr_mbuf_t*     mbuf = NULL;
168         //std::unique_ptr<Message> m;
169
170         if( mrc == NULL ) {
171                 return NULL;
172         }
173
174         mbuf = rmr_torcv_msg( mrc, mbuf, timeout );             // future: do we want to reuse the mbuf here?
175         if( mbuf != NULL ) {
176                 return std::unique_ptr<Message>( new Message( mbuf, mrc ) );
177         }
178
179         return NULL;
180 }
181
182 /*
183         Called to gracefully stop all listeners.
184 */
185 void Messenger::Stop( ) {
186         ok_2_run = false;
187 }
188
189 /*
190         RMR messages must be released by RMR as there might be transport
191         buffers that have to be dealt with. Every callback is expected to
192         call this function when finished with the message.
193 */
194 void Messenger::Release_mbuf( void* vmbuf ) {
195         rmr_free_msg( (rmr_mbuf_t *)  vmbuf );
196 }
197
198 /*
199         Wait for clear to send.
200         Until RMR loads a route table, all sends will fail with a
201         "no endpoint" state.  This function allows the user application
202         to block until RMR has a viable route table.  It does not guarentee
203         that every message that the user app will try to send has an entry.
204
205         The use of this function by the user application allows for the
206         parallel initialisation of the application while waiting for the
207         route table service to generate a table for the application. The
208         initialisation function may be callsed with "no wait" and this
209         function invoked when the application has completed initialisation
210         and is ready to start sending messages.
211
212         The max wait parameter is the maximum number of seconds to block.
213         If RMR never reports ready false is returned.  A true return
214         incidcates all is ready.  If max_wait is 0, then this will only
215         return when RMR is ready to send.
216 */
217 bool Messenger::Wait_for_cts( int max_wait ) {
218         bool block_4ever;
219
220         block_4ever = max_wait == 0;
221         while( block_4ever || max_wait > 0 ) {
222                 if( rmr_ready( mrc ) ) {
223                         return true;
224                 }
225
226                 sleep( 1 );
227                 max_wait--;
228         }
229
230         return false;
231 }