Initial commit of Admission Control xAPP and E2AP/X2AP definitions
[ric-app/admin.git] / src / adm-ctrl-xapp.cc
diff --git a/src/adm-ctrl-xapp.cc b/src/adm-ctrl-xapp.cc
new file mode 100644 (file)
index 0000000..f9994a3
--- /dev/null
@@ -0,0 +1,358 @@
+/*
+==================================================================================
+
+        Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+/* Author : Ashwin Sridharan
+   Date    : Feb 2019
+*/
+
+#include "adm-ctrl-xapp.hpp"
+
+bool report_mode_only = true;
+static int RunProg = 1;  // keep loop running
+
+// list of plugins 
+typedef  std::vector<std::unique_ptr<Policy> > plugin_list;
+plugin_list Plugins;
+std::map<int, Policy *> plugin_rmr_map;
+
+
+bool add_subscription(SubscriptionHandler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){
+  unsigned char node_buffer[32];
+  std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
+  node_buffer[gNodeB.length()] = '\0';
+  bool res = sub_handler.RequestSubscription(he, he_resp,  RIC_SUB_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
+  return res;
+};
+
+
+bool delete_subscription(SubscriptionHandler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper  he_resp, std::string & gNodeB){
+  unsigned char node_buffer[32];
+  std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
+  node_buffer[gNodeB.length()] = '\0';
+
+  bool res = sub_handler.RequestSubscriptionDelete(he, he_resp, RIC_SUB_DEL_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
+  return res;
+};
+
+
+void  policy_handler(int message_type, const char * message, int message_len, std::string & response, bool set){
+  auto it = plugin_rmr_map.find(message_type);
+  bool res;
+  if (it != plugin_rmr_map.end()){
+    if (set){
+      res = it->second->setPolicy(message, message_len, response);
+      if (res){
+       mdclog_write(MDCLOG_INFO, "A1 POLICY SET :: Successfully set A1 Policy\n");
+      }
+      else{
+       mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY SET   %s, %d . Unable to set policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
+      }
+       
+    }
+    else{
+      res = it->second->getPolicy(message, message_len, response);
+      if (res){
+       mdclog_write(MDCLOG_INFO, "A1 POLICY GET : Successfully retreived  A1 Policy\n");
+      }
+      else{
+       mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY GET  %s, %d . Unable to get policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
+      }
+      
+    }
+  }
+  else{
+    response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with RMR message type = " + std::to_string(message_type) + "\"}";
+  }
+};
+
+
+void metrics_collector(std::string ves_url, plugin_list * plugins,  unsigned int interval){
+  
+  // Instantiate the ves collector 
+  curl_interface curl_obj(ves_url);
+  std::string metrics_response;
+  int res;
+  while(RunProg){
+    for(unsigned int i = 0; i < plugins->size(); i++){
+      res = (*plugins)[i].get()->getMetrics(metrics_response);
+      if (res != 0){
+       mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), metrics_response.c_str());
+      }
+      else{
+       res = curl_obj.post_metrics(metrics_response);
+       if (!res){
+         mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str());
+       }
+       else{
+         mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", metrics_response.c_str());
+       }
+      }
+    }
+    sleep(interval);
+  }
+  std::cout <<"Stopped metrics collector/reporter .." << std::endl;
+};
+    
+
+void EndProgram(int signum){
+  std::cout <<"Signal received. Stopping program ....." << std::endl;
+  RunProg = 0;
+}
+
+void msg_error(rmr_mbuf_t *message){
+  mdclog_write(MDCLOG_ERR, "Error: %s, %d  Could not send RMR message of length %d and type %d, Reason %s",  __FILE__, __LINE__, message->len,  message->mtype, strerror(errno) );
+};
+
+
+int main(int argc, char *argv[]){
+
+  // initially set log level to INFO
+  init_logger("XaPP", MDCLOG_INFO);
+  
+  configuration my_config;
+
+  // set config variables from environment
+  // used when deploying via start-up script
+  get_environment_config(my_config);
+
+  // over-ride with any command line variables if
+  // provided
+  get_command_line_config(argc, argv, my_config);
+
+  std::unique_ptr<XaPP> my_xapp;
+
+
+  // Reset log level based on configuration
+  init_logger(my_config.name, static_cast<mdclog_severity_t>(my_config.log_level));
+  
+  if (my_config.gNodeB_list.size() == 0){
+    mdclog_write(MDCLOG_WARN, "WARNING  : gNodeB not set for subscription. Subscription MAY FAIL");
+  }
+
+  if (my_config.operating_mode == "CONTROL"){
+    report_mode_only = false;
+  }
+  else{
+    report_mode_only = true;
+  }
+  
+  // Finished passing command line/environment arguments 
+  //=============================================================
+
+   // instantiate xapp object
+   if(my_config.num_threads >= 1){
+    mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
+    // Create XaPP that starts with specified number of threads 
+    my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024, my_config.num_threads);
+  }
+  else{
+    mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto");
+    //Let XaPP pick threads based on hardware 
+    my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024);
+  }
+  
+
+  mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);
+  mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);
+
+  
+   // Instantiate admission logic handler
+   Plugins.emplace_back(std::make_unique<admission>(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file,  1));
+  
+   // Add reference to plugin list . We add twice (once for set policy  and once for get policy  ids)
+   // Plugin list is used by policy handler and metrics collector
+   plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_INT_CONTROL, Plugins[0].get()));
+   plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_GET_POLICY,  Plugins[0].get()));
+   
+   // instantiate curl object for ves 
+   std::thread metrics_thread(metrics_collector, my_config.ves_collector_url, &Plugins, my_config.measurement_interval);
+
+
+   // Instantiate subscription handler
+   SubscriptionHandler sub_handler;
+
+   // Instantiate message handlers for RMR
+   // (one for each thread) and registrer
+   // subscription and admission handlers
+   
+   std::vector<std::unique_ptr<message_processor> > message_procs;
+   for(int i = 0; i < my_config.num_threads; i++){
+     std::unique_ptr<message_processor> mp_handler = std::make_unique<message_processor> ();
+     mp_handler.get()->register_subscription_handler(& sub_handler);
+     mp_handler.get()->register_protector(dynamic_cast<admission *>(Plugins[0].get())->get_protector_instance(0));
+     mp_handler.get()->register_policy_handler (& policy_handler);
+     message_procs.push_back(std::move(mp_handler));
+   }
+  
+  
+   // Start the listening loops
+   std::vector<int> thread_ids(my_config.num_threads);
+   unsigned int i = 0;
+   for(auto  &e: message_procs){
+     thread_ids[i] = (*my_xapp).StartThread(*(e.get()), msg_error);
+     i++;
+   };
+
+   mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");
+   mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());
+
+   //======================================================
+   // sgnb Subscription spec
+
+   int request_id = 2; // will be over-written by subscription handler
+   int req_seq = 1;
+   int function_id = 0;
+   int action_id = 4;
+   int action_type = report_mode_only ? 0:1;
+   
+   int message_type = 1;
+   int procedure_code = 27;
+   std::string egnb_id = "Testgnb";
+   std::string plmn_id = "Testplmn";
+
+   unsigned char event_buf[128];
+   size_t event_buf_len = 128;
+   bool res;
+
+
+   e2sm_event_trigger_helper trigger_data;
+   e2sm_event_trigger event_trigger;
+  
+   trigger_data.egNB_id = egnb_id;
+   trigger_data.plmn_id = plmn_id;
+   trigger_data.egNB_id_type = 2;
+   trigger_data.interface_direction = 1;
+   trigger_data.procedure_code = procedure_code;
+   trigger_data.message_type = message_type;
+   res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data);
+   if (!res){
+     mdclog_write(MDCLOG_ERR, "Error : %s, %d: Could not encode subscription Request. Reason = %s\n", __FILE__, __LINE__, event_trigger.get_error().c_str());
+     exit(0);
+   }
+  
+
+   subscription_helper sgnb_add_subscr_req;
+   subscription_response_helper subscr_response;
+  
+   sgnb_add_subscr_req.clear();
+   sgnb_add_subscr_req.set_request(request_id, req_seq);
+   sgnb_add_subscr_req.set_function_id(function_id);
+   sgnb_add_subscr_req.add_action(action_id, action_type);
+  
+  
+   sgnb_add_subscr_req.set_event_def(&event_buf[0], event_buf_len);
+   mdclog_write(MDCLOG_INFO, "Encoded event trigger definition into PDU of size %lu bytes\n", event_buf_len);
+
+   //======================================================
+   // Purely for testing purposes ... write subscription ASN binary to file 
+   // FILE *pfile;
+   // pfile = fopen("event_trigger.pr", "wb");
+   // fwrite(event_buf, 1, event_buf_len,  pfile);
+   // fclose(pfile);
+   //======================================================
+  
+   
+   // keep sending subscription request till successfull for all gnodebs ?
+   auto it = my_config.gNodeB_list.begin();
+   while(my_config.gNodeB_list.size() > 0 && RunProg){
+     int attempt = 0;
+     res = false;
+      
+     while(!res){
+       mdclog_write(MDCLOG_INFO, "Sending subscription request for %s ... Attempt number = %d\n", (*it).c_str(), attempt);
+       res = add_subscription(sub_handler, my_xapp.get(),  sgnb_add_subscr_req, subscr_response, *it);
+       if (!res){
+        sleep(5);
+       };
+       attempt ++;
+       if (attempt > MAX_SUBSCRIPTION_ATTEMPTS){
+        break;
+       }
+     }
+     
+     if(res){
+       mdclog_write(MDCLOG_INFO, "Successfully subscribed for gNodeB %s", (*it).c_str());
+       // remove node from list,
+       // move to next gnobde
+       it = my_config.gNodeB_list.erase(it);
+     }
+
+     if (it == my_config.gNodeB_list.end()){
+       it = my_config.gNodeB_list.begin();
+     }
+     
+   }
+   
+   
+   std::cout <<"SUBSCRIPTION REQUEST :: Successfully subscribed to events for all gNodeBs " << std::endl;
+
+   //Register signal handler to stop 
+   signal(SIGINT, EndProgram);
+   signal(SIGTERM, EndProgram);
+   
+
+   // Purely for testing purposes ....
+   // If in test mode, we wait an interval and then send delete subscription request for each gNodeB
+   if(my_config.test_mode){
+     std::cout <<"====================== " << std::endl;
+     std::cout <<"WE ARE IN TEST MODE. " << std::endl;
+     std::cout <<"====================== " << std::endl;
+     std::cout <<"WILL SEND SUBSCRIPTION DELETE REQUEST AFTER " << my_config.measurement_interval << " SECONDS " << std::endl;
+     sleep(my_config.measurement_interval); 
+     res = false;
+     // keep sending subscription delete request till successfull ? 
+     int attempt = 0;
+     while(!res){
+       mdclog_write(MDCLOG_INFO, "Sending subscription delete request for id = %d ... Attempt number = %d\n", sgnb_add_subscr_req.get_request_id(), attempt);
+       res = delete_subscription(sub_handler, my_xapp.get(),  sgnb_add_subscr_req, subscr_response, my_config.gNodeB_list[0]);
+       if (!res){
+        sleep(5);
+       };
+       attempt ++;
+     }
+
+     std::cout <<"SUBSCRIPTION DELETE REQUEST :: Successfuly deleted subscription request " << request_id << std::endl;
+     
+   };
+   
+   //Wait for stop
+   while(RunProg){
+     sleep(10);
+   }
+  
+   i = 0;
+   for(auto  &e: message_procs){
+     mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());
+     std::cout << "Thread " << thread_ids[i] << "  Number of packets handled = " <<  e.get()->get_messages() << std::endl;
+     
+     i ++ ;
+   }
+   
+   std::cout <<"Stopping all running threads ..." << std::endl;
+   (*my_xapp).Stop();
+   std::cout <<"Stopped RMR processing threads ...." << std::endl;
+   metrics_thread.join();
+   std::cout <<"Stopped Metric collection thread ...." << std::endl;
+   Plugins.clear();
+   plugin_rmr_map.clear();
+   std::cout <<"Cleared Plugins .." << std::endl;
+  
+   std::cout <<"Finished ... " << std::endl;
+   return 0;
+};