2 ==================================================================================
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
19 /* Author : Ashwin Sridharan
23 #include "adm-ctrl-xapp.hpp"
25 bool report_mode_only = true;
26 static int RunProg = 1; // keep loop running
29 typedef std::vector<std::unique_ptr<Policy> > plugin_list;
31 std::map<int, Policy *> plugin_rmr_map;
34 bool add_subscription(SubscriptionHandler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){
35 unsigned char node_buffer[32];
36 std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
37 node_buffer[gNodeB.length()] = '\0';
38 bool res = sub_handler.RequestSubscription(he, he_resp, RIC_SUB_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
43 bool delete_subscription(SubscriptionHandler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){
44 unsigned char node_buffer[32];
45 std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
46 node_buffer[gNodeB.length()] = '\0';
48 bool res = sub_handler.RequestSubscriptionDelete(he, he_resp, RIC_SUB_DEL_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
53 void policy_handler(int message_type, const char * message, int message_len, std::string & response, bool set){
54 auto it = plugin_rmr_map.find(message_type);
56 if (it != plugin_rmr_map.end()){
58 res = it->second->setPolicy(message, message_len, response);
60 mdclog_write(MDCLOG_INFO, "A1 POLICY SET :: Successfully set A1 Policy\n");
63 mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY SET %s, %d . Unable to set policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
68 res = it->second->getPolicy(message, message_len, response);
70 mdclog_write(MDCLOG_INFO, "A1 POLICY GET : Successfully retreived A1 Policy\n");
73 mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY GET %s, %d . Unable to get policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
79 response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with RMR message type = " + std::to_string(message_type) + "\"}";
84 void metrics_collector(std::string ves_url, plugin_list * plugins, unsigned int interval){
86 // Instantiate the ves collector
87 curl_interface curl_obj(ves_url);
88 std::string metrics_response;
91 for(unsigned int i = 0; i < plugins->size(); i++){
92 res = (*plugins)[i].get()->getMetrics(metrics_response);
94 mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), metrics_response.c_str());
97 res = curl_obj.post_metrics(metrics_response);
99 mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str());
102 mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", metrics_response.c_str());
108 std::cout <<"Stopped metrics collector/reporter .." << std::endl;
112 void EndProgram(int signum){
113 std::cout <<"Signal received. Stopping program ....." << std::endl;
117 void msg_error(rmr_mbuf_t *message){
118 mdclog_write(MDCLOG_ERR, "Error: %s, %d Could not send RMR message of length %d and type %d, Reason %s", __FILE__, __LINE__, message->len, message->mtype, strerror(errno) );
122 int main(int argc, char *argv[]){
124 // initially set log level to INFO
125 init_logger("XaPP", MDCLOG_INFO);
127 configuration my_config;
129 // set config variables from environment
130 // used when deploying via start-up script
131 get_environment_config(my_config);
133 // over-ride with any command line variables if
135 get_command_line_config(argc, argv, my_config);
137 std::unique_ptr<XaPP> my_xapp;
140 // Reset log level based on configuration
141 init_logger(my_config.name, static_cast<mdclog_severity_t>(my_config.log_level));
143 if (my_config.gNodeB_list.size() == 0){
144 mdclog_write(MDCLOG_WARN, "WARNING : gNodeB not set for subscription. Subscription MAY FAIL");
147 if (my_config.operating_mode == "CONTROL"){
148 report_mode_only = false;
151 report_mode_only = true;
154 // Finished passing command line/environment arguments
155 //=============================================================
157 // instantiate xapp object
158 if(my_config.num_threads >= 1){
159 mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
160 // Create XaPP that starts with specified number of threads
161 my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024, my_config.num_threads);
164 mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto");
165 //Let XaPP pick threads based on hardware
166 my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024);
170 mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);
171 mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);
174 // Instantiate admission logic handler
175 Plugins.emplace_back(std::make_unique<admission>(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file, 1));
177 // Add reference to plugin list . We add twice (once for set policy and once for get policy ids)
178 // Plugin list is used by policy handler and metrics collector
179 plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_INT_CONTROL, Plugins[0].get()));
180 plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_GET_POLICY, Plugins[0].get()));
182 // instantiate curl object for ves
183 std::thread metrics_thread(metrics_collector, my_config.ves_collector_url, &Plugins, my_config.measurement_interval);
186 // Instantiate subscription handler
187 SubscriptionHandler sub_handler;
189 // Instantiate message handlers for RMR
190 // (one for each thread) and registrer
191 // subscription and admission handlers
193 std::vector<std::unique_ptr<message_processor> > message_procs;
194 for(int i = 0; i < my_config.num_threads; i++){
195 std::unique_ptr<message_processor> mp_handler = std::make_unique<message_processor> ();
196 mp_handler.get()->register_subscription_handler(& sub_handler);
197 mp_handler.get()->register_protector(dynamic_cast<admission *>(Plugins[0].get())->get_protector_instance(0));
198 mp_handler.get()->register_policy_handler (& policy_handler);
199 message_procs.push_back(std::move(mp_handler));
203 // Start the listening loops
204 std::vector<int> thread_ids(my_config.num_threads);
206 for(auto &e: message_procs){
207 thread_ids[i] = (*my_xapp).StartThread(*(e.get()), msg_error);
211 mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");
212 mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());
214 //======================================================
215 // sgnb Subscription spec
217 int request_id = 2; // will be over-written by subscription handler
221 int action_type = report_mode_only ? 0:1;
223 int message_type = 1;
224 int procedure_code = 27;
225 std::string egnb_id = "Testgnb";
226 std::string plmn_id = "Testplmn";
228 unsigned char event_buf[128];
229 size_t event_buf_len = 128;
233 e2sm_event_trigger_helper trigger_data;
234 e2sm_event_trigger event_trigger;
236 trigger_data.egNB_id = egnb_id;
237 trigger_data.plmn_id = plmn_id;
238 trigger_data.egNB_id_type = 2;
239 trigger_data.interface_direction = 1;
240 trigger_data.procedure_code = procedure_code;
241 trigger_data.message_type = message_type;
242 res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data);
244 mdclog_write(MDCLOG_ERR, "Error : %s, %d: Could not encode subscription Request. Reason = %s\n", __FILE__, __LINE__, event_trigger.get_error().c_str());
249 subscription_helper sgnb_add_subscr_req;
250 subscription_response_helper subscr_response;
252 sgnb_add_subscr_req.clear();
253 sgnb_add_subscr_req.set_request(request_id, req_seq);
254 sgnb_add_subscr_req.set_function_id(function_id);
255 sgnb_add_subscr_req.add_action(action_id, action_type);
258 sgnb_add_subscr_req.set_event_def(&event_buf[0], event_buf_len);
259 mdclog_write(MDCLOG_INFO, "Encoded event trigger definition into PDU of size %lu bytes\n", event_buf_len);
261 //======================================================
262 // Purely for testing purposes ... write subscription ASN binary to file
264 // pfile = fopen("event_trigger.pr", "wb");
265 // fwrite(event_buf, 1, event_buf_len, pfile);
267 //======================================================
270 // keep sending subscription request till successfull for all gnodebs ?
271 auto it = my_config.gNodeB_list.begin();
272 while(my_config.gNodeB_list.size() > 0 && RunProg){
277 mdclog_write(MDCLOG_INFO, "Sending subscription request for %s ... Attempt number = %d\n", (*it).c_str(), attempt);
278 res = add_subscription(sub_handler, my_xapp.get(), sgnb_add_subscr_req, subscr_response, *it);
283 if (attempt > MAX_SUBSCRIPTION_ATTEMPTS){
289 mdclog_write(MDCLOG_INFO, "Successfully subscribed for gNodeB %s", (*it).c_str());
290 // remove node from list,
291 // move to next gnobde
292 it = my_config.gNodeB_list.erase(it);
295 if (it == my_config.gNodeB_list.end()){
296 it = my_config.gNodeB_list.begin();
302 std::cout <<"SUBSCRIPTION REQUEST :: Successfully subscribed to events for all gNodeBs " << std::endl;
304 //Register signal handler to stop
305 signal(SIGINT, EndProgram);
306 signal(SIGTERM, EndProgram);
309 // Purely for testing purposes ....
310 // If in test mode, we wait an interval and then send delete subscription request for each gNodeB
311 if(my_config.test_mode){
312 std::cout <<"====================== " << std::endl;
313 std::cout <<"WE ARE IN TEST MODE. " << std::endl;
314 std::cout <<"====================== " << std::endl;
315 std::cout <<"WILL SEND SUBSCRIPTION DELETE REQUEST AFTER " << my_config.measurement_interval << " SECONDS " << std::endl;
316 sleep(my_config.measurement_interval);
318 // keep sending subscription delete request till successfull ?
321 mdclog_write(MDCLOG_INFO, "Sending subscription delete request for id = %d ... Attempt number = %d\n", sgnb_add_subscr_req.get_request_id(), attempt);
322 res = delete_subscription(sub_handler, my_xapp.get(), sgnb_add_subscr_req, subscr_response, my_config.gNodeB_list[0]);
329 std::cout <<"SUBSCRIPTION DELETE REQUEST :: Successfuly deleted subscription request " << request_id << std::endl;
339 for(auto &e: message_procs){
340 mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());
341 std::cout << "Thread " << thread_ids[i] << " Number of packets handled = " << e.get()->get_messages() << std::endl;
346 std::cout <<"Stopping all running threads ..." << std::endl;
348 std::cout <<"Stopped RMR processing threads ...." << std::endl;
349 metrics_thread.join();
350 std::cout <<"Stopped Metric collection thread ...." << std::endl;
352 plugin_rmr_map.clear();
353 std::cout <<"Cleared Plugins .." << std::endl;
355 std::cout <<"Finished ... " << std::endl;