Initialization script now passes signal to xapp process
[ric-app/admin.git] / src / xapp_utils.cc
1 /*
2 ==================================================================================
3
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 /*  Author : Ashwin Sridharan
21     Date    : Feb 2019
22 */
23
24
25 #include "xapp_utils.hpp"
26
27 // Constructor
28 XaPP::XaPP(char *xapp_name, char *proto_port, int msg_size): _is_ready(0), _listen(false),  _msg_size(msg_size) {
29
30   strcpy(_xapp_name, xapp_name);
31   strcpy(_proto_port, proto_port);
32   init(_msg_size);
33   get_routes();
34
35 };
36   
37 // Destructor closes rmr context if available
38 XaPP::~XaPP(void){
39
40   // Call stop to clear thread table
41   Stop();
42
43   // free memory
44   rmr_free_msg(_rmr_tx_message);
45   
46   if (_rmr_ctx){
47     rmr_close(_rmr_ctx);
48   }
49   
50   // delete mutex
51   delete _transmit;
52 };
53
54 // Get the RMR context 
55 void XaPP::init(int msg_size){
56
57   if (msg_size > RMR_BUFFER_SIZE or msg_size <= 0){
58     std::stringstream ss;
59     ss << "Error ::" << __FILE__ << "," << __LINE__ << " Invalid buffer size  " << msg_size << " for RMR initialization context. Must be between " << 1 << " and " << RMR_BUFFER_SIZE << std::endl;
60     ss << " To change max buffer requested, update RMR_BUFFER_SIZE " << std::endl;
61     mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
62     throw ss.str();
63   }
64   
65   // Initialze the rmr context
66   _rmr_ctx = rmr_init(_proto_port, msg_size, RMRFL_NONE);
67   assert(_rmr_ctx != NULL);
68   
69 }
70
71
72 // Blocks till routing table is received
73 void XaPP::get_routes(void){
74   
75   // Wait for getting a routing table ...
76   int i = 0;
77   _is_ready = 0;
78   while( i < MAX_WAIT_TIME){
79     std::cout <<"Waiting for RMR to be ready " << std::endl;
80     if ((_is_ready = rmr_ready(_rmr_ctx)) == 1){
81       break;
82     }
83     sleep(1);
84     i++;
85   };
86
87   if(!_is_ready){
88     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; 
89     std::string error_string = identifier + " Error getting routing table";
90     throw std::runtime_error(error_string);
91   }
92   
93
94   // Get a tx buffer in case we need to do a transmit from the main thread itself
95   _rmr_tx_message = rmr_alloc_msg(_rmr_ctx, RMR_BUFFER_SIZE);
96   assert(_rmr_tx_message != NULL);
97   std::cout <<"Route Table received. Send buffer allocated" << std::endl;
98   _transmit = new std::mutex();
99
100 }
101
102 // Send method that takes TLV (type/length/value) input
103 bool XaPP::Send(int type,  size_t payload_len, void *payload, link_types mode, tx_types send_type){
104
105   if (likely(payload_len <= RMR_BUFFER_SIZE)){
106     _rmr_tx_message->mtype  = type;
107     memcpy(_rmr_tx_message->payload, payload, payload_len);
108     _rmr_tx_message->len = payload_len;
109     return Send(_rmr_tx_message, mode, send_type);
110   }
111   else{
112     std::stringstream ss;
113     ss << __FILE__ << "," << __LINE__ << " message payload length " << payload_len << " exceeds maximum allowed size " << RMR_BUFFER_SIZE << std::endl;
114     mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
115   }
116
117   return false;
118 }
119
120 // Send method that takes TLV (type/length/value) input + MEID
121 bool XaPP::Send(int type,  size_t payload_len, void *payload, unsigned char const * meid, link_types mode, tx_types send_type){
122   if (likely(payload_len <= RMR_BUFFER_SIZE)){
123     _rmr_tx_message->mtype  = type;  
124     memcpy(_rmr_tx_message->payload, payload, payload_len);
125     _rmr_tx_message->len = payload_len;
126     rmr_str2meid(_rmr_tx_message, meid);
127     return Send(_rmr_tx_message, mode, send_type);
128   }
129   else{
130     std::stringstream ss;
131     ss << __FILE__ << "," << __LINE__ << " message payload length " << payload_len << " exceeds maximum allowed size " << RMR_BUFFER_SIZE << std::endl;
132     mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
133   }
134
135   return false;
136 }
137
138
139 // Send method that takes a buffer
140 bool XaPP::Send(rmr_mbuf_t * rmr_tx_message, link_types mode, tx_types send_type){
141
142   if(likely(_is_ready && rmr_tx_message->len <= RMR_BUFFER_SIZE  && rmr_tx_message->len > 0)){
143     unsigned int i = 0;
144     rmr_tx_message->sub_id = RMR_VOID_SUBID;
145     
146     while(i <= link_retries[mode]){
147       
148       //rmr_tx_message->state = 0; // fix for nng
149       // how to send
150       if(likely(send_type == ROUTE)){
151         rmr_tx_message = rmr_send_msg(_rmr_ctx, rmr_tx_message);
152       }
153       else{
154         rmr_tx_message = rmr_rts_msg(_rmr_ctx, rmr_tx_message);
155       }
156       
157       if (! rmr_tx_message){
158         // CRITICAL EROR .. log it and return
159         std::stringstream ss;
160         ss << __FILE__ << "," << __LINE__ << " RMR send function returned NULL. Reason = " << strerror(errno) << std::endl;
161         mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
162         return false;
163       }
164       else if (rmr_tx_message->state == RMR_OK){
165         return true;
166       }
167       else  if(rmr_tx_message->state == RMR_ERR_RETRY){
168         i++;
169         std::this_thread::sleep_for(std::chrono::milliseconds(link_delays[mode]));
170       }
171       else {
172         mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_tx_message->state, strerror(errno));
173         return false;
174       }
175     
176     }
177   }
178   else{
179     mdclog_write(MDCLOG_ERR, "Error: %s, %d  Invalid state/message for RMR tx. Ready = %d, Message len = %d\n", __FILE__, __LINE__, _is_ready, rmr_tx_message->len);
180     return false;
181   }
182   
183   std::stringstream ss;
184   ss << "Error ::" << __FILE__ << "," << __LINE__ << " Could not send message of type " << rmr_tx_message->mtype << " and size " << rmr_tx_message->len << ". Reason = " << strerror(errno);
185   mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
186   
187   return false;
188 }
189
190
191
192
193 void XaPP::Stop(void){
194   // Get the mutex lock 
195   std::lock_guard<std::mutex> guard(*_transmit);
196   _listen = false;
197
198   //Wait for all threads to stop
199   for(auto & t: thread_group){
200     std::thread::id my_id = t.second.get_id();
201     std::stringstream thread_id;
202     thread_id << my_id;
203     t.second.join();
204
205   }
206
207   // Clear thread table ...
208   thread_group.clear();
209   
210 }
211
212 // default error handler if non specified by user
213 // pass through for now
214 void XaPP::_error_handler(rmr_mbuf_t *message){
215 };
216
217
218
219
220 //----------------------------------------
221 // Some get/set methods
222 //---------------------------------------
223
224 std::string XaPP::get_name(void){
225   return std::string(_xapp_name);
226 }
227
228 int XaPP::get_status(void){
229   return _is_ready;
230 }
231
232 bool XaPP::_isRunning(void){
233   return _listen;
234 }
235
236
237 void * XaPP::get_rmr_context(void){
238   return _rmr_ctx;
239 }
240
241
242 void init_logger(const char  *AppName, mdclog_severity_t log_level)
243 {
244     mdclog_attr_t *attr;
245     mdclog_attr_init(&attr);
246     mdclog_attr_set_ident(attr, AppName);
247     mdclog_init(attr);
248     mdclog_level_set(log_level);
249     mdclog_attr_destroy(attr);
250 }