User story RICPLT-2620
[ric-app/admin.git] / src / adm-ctrl-xapp.cc
index f7ddb30..387ca1d 100644 (file)
 
 #include "adm-ctrl-xapp.hpp"
 
+int run_program = 1; 
+
 
-static int RunProg = 1;  // keep loop running
 
 // list of plugins 
-typedef  std::vector<std::unique_ptr<Policy> > plugin_list;
 plugin_list Plugins;
 std::map<int, Policy *> plugin_rmr_map;
 
+// Policy handler : All plugins (of abstract class Policy) are registered with the plugin list.
+// The policy handler  is registered as a call back to the RMR message handler. When a policy related RMR message
+// is received, this function is invoked. It finds the appropriate plugin from the plugin list, passes the policy message and then
+// returns back the response (in the response string)
 
-int add_subscription(subscription_handler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){
-  unsigned char node_buffer[32];
-  std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
-  node_buffer[gNodeB.length()] = '\0';
-  int res = sub_handler.RequestSubscription(he, he_resp,  RIC_SUB_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
-  return res;
-};
-
-
-int delete_subscription(subscription_handler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper  he_resp, std::string & gNodeB){
-  unsigned char node_buffer[32];
-  std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
-  node_buffer[gNodeB.length()] = '\0';
-
-  int res = sub_handler.RequestSubscriptionDelete(he, he_resp, RIC_SUB_DEL_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
-  return res;
-};
-
+// NOTE : This version of policy handler was written with R1 policy protocol in mind.
+// Specifically, it was assumed that each message type corresponds to either set/get for a specific xAPP.
+// It still works with R2, but ideally should be modified since a single policy type could be applied to multiple plugins.
 
 void  policy_handler(int message_type, const char * message, int message_len, std::string & response, bool set){
   auto it = plugin_rmr_map.find(message_type);
@@ -77,29 +66,37 @@ void  policy_handler(int message_type, const char * message, int message_len, st
   }
   else{
     response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with RMR message type = " + std::to_string(message_type) + "\"}";
+    mdclog_write(MDCLOG_ERR, "Error ! %s, %d : %s\n", __FILE__, __LINE__, response.c_str());
   }
 };
 
 
+
+// polling function that routinely queries all plugins for metrics and then posts them on
+// VES url
 void metrics_collector(std::string ves_url, plugin_list * plugins,  unsigned int interval){
   
   // Instantiate the ves collector 
   curl_interface curl_obj(ves_url);
-  std::string metrics_response;
+  std::vector<std::string> response_vector;
   int res;
-  while(RunProg){
+  while(run_program){
     for(unsigned int i = 0; i < plugins->size(); i++){
-      res = (*plugins)[i].get()->getMetrics(metrics_response);
+      response_vector.clear();
+      res =  (*plugins)[i].get()->getMetrics(response_vector);
       if (res != 0){
-       mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), metrics_response.c_str());
+       mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), (*plugins)[i].get()->get_error().c_str());
       }
       else{
-       res = curl_obj.post_metrics(metrics_response);
-       if (!res){
-         mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str());
-       }
-       else{
-         mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", metrics_response.c_str());
+       // send each response
+       for(auto  &e: response_vector){
+         res = curl_obj.post_metrics(e);
+         if (!res){
+           mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str());
+         }
+         else{
+           mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", e.c_str());
+         }
        }
       }
     }
@@ -111,9 +108,10 @@ void metrics_collector(std::string ves_url, plugin_list * plugins,  unsigned int
 
 void EndProgram(int signum){
   std::cout <<"Signal received. Stopping program ....." << std::endl;
-  RunProg = 0;
+  run_program = 0;
+  
 }
-
+// ideally should be expanded for rollback purposes etc.
 void msg_error(rmr_mbuf_t *message){
   mdclog_write(MDCLOG_ERR, "Error: %s, %d  Could not send RMR message of length %d and type %d, Reason %s",  __FILE__, __LINE__, message->len,  message->mtype, strerror(errno) );
 };
@@ -144,8 +142,8 @@ int main(int argc, char *argv[]){
     mdclog_write(MDCLOG_WARN, "WARNING  : gNodeB not set for subscription. Subscription MAY FAIL");
   }
 
-
   std::string operating_mode;
+
   // How are we operating ? 
   if (my_config.operating_mode == "CONTROL"){
     // Full closed loop : process E2AP indication,
@@ -173,35 +171,25 @@ int main(int argc, char *argv[]){
   // Finished passing command line/environment arguments 
   //=============================================================
 
-   // instantiate xapp object
-   if(my_config.num_threads >= 1){
-    mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
-    // Create XaPP that starts with specified number of threads 
-    my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024, my_config.num_threads);
-  }
-  else{
-    mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto");
-    //Let XaPP pick threads based on hardware 
-    my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024);
-  }
-  
-
+  // instantiate xapp-rmr-framework  object
+  mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
   mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);
   mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);
 
+  my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024);
   
-   // Instantiate admission logic handler
-  Plugins.emplace_back(std::make_unique<admission>(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file,  1, my_config.report_mode_only));
   
-   // Add reference to plugin list . We add twice (once for set policy  and once for get policy  ids)
-   // Plugin list is used by policy handler and metrics collector
-   plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_INT_CONTROL, Plugins[0].get()));
-   plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_GET_POLICY,  Plugins[0].get()));
+  // Instantiate admission logic handler (with only one instance for now)
+  int num_instances = 1;
+  Plugins.emplace_back(std::make_unique<admission>(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file,  num_instances, my_config.xapp_id, my_config.report_mode_only));
+  
+  // Add reference to plugin list . 
+  // Plugin list is used by policy handler and metrics collector
+   plugin_rmr_map.insert(std::pair<int, Policy *>(A1_POLICY_REQ, Plugins[0].get()));
    
-   // instantiate curl object for ves 
+   // instantiate ves thread (it polls all plugins and sends out their metrics) 
    std::thread metrics_thread(metrics_collector, my_config.ves_collector_url, &Plugins, my_config.measurement_interval);
 
-
    // Instantiate subscription handler
    subscription_handler sub_handler;
 
@@ -219,7 +207,7 @@ int main(int argc, char *argv[]){
    }
   
   
-   // Start the listening loops
+   // Start the RMR listening loops
    std::vector<int> thread_ids(my_config.num_threads);
    unsigned int i = 0;
    for(auto  &e: message_procs){
@@ -230,120 +218,28 @@ int main(int argc, char *argv[]){
    mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");
    mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());
 
-   //======================================================
-   // sgnb Subscription spec
-
-   int request_id = 2; // will be over-written by subscription handler
-   int req_seq = 1;
-   int function_id = 0;
-   int action_id = 4;
-   int action_type = my_config.report_mode_only ? 0:1;
-   
-   int message_type = 1;
-   int procedure_code = 27;
-   std::string egnb_id = "Testgnb";
-   std::string plmn_id = "Testplmn";
-
-   unsigned char event_buf[128];
-   size_t event_buf_len = 128;
-   bool res;
+   //Register signal handler to stop
+   signal(SIGINT, EndProgram);
+   signal(SIGTERM, EndProgram);
 
+   // Instantiate startup/shutown subroutine objects
+   init boot_shutdown((*my_xapp), sub_handler, my_config);
 
-   e2sm_event_trigger_helper trigger_data;
-   e2sm_event_trigger event_trigger;
-  
-   trigger_data.egNB_id = egnb_id;
-   trigger_data.plmn_id = plmn_id;
-   trigger_data.egNB_id_type = 2;
-   trigger_data.interface_direction = 1;
-   trigger_data.procedure_code = procedure_code;
-   trigger_data.message_type = message_type;
-   res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data);
-   if (!res){
-     mdclog_write(MDCLOG_ERR, "Error : %s, %d: Could not encode subscription Request. Reason = %s\n", __FILE__, __LINE__, event_trigger.get_error().c_str());
-     exit(0);
-   }
-  
 
-   subscription_helper sgnb_add_subscr_req;
-   subscription_response_helper subscr_response;
-  
-   sgnb_add_subscr_req.clear();
-   sgnb_add_subscr_req.set_request(request_id, req_seq);
-   sgnb_add_subscr_req.set_function_id(function_id);
-   sgnb_add_subscr_req.add_action(action_id, action_type);
-  
-  
-   sgnb_add_subscr_req.set_event_def(&event_buf[0], event_buf_len);
-   mdclog_write(MDCLOG_INFO, "Encoded event trigger definition into PDU of size %lu bytes\n", event_buf_len);
-
-   //======================================================
-   // Purely for testing purposes ... write subscription ASN binary to file 
-   // FILE *pfile;
-   // pfile = fopen("event_trigger.pr", "wb");
-   // fwrite(event_buf, 1, event_buf_len,  pfile);
-   // fclose(pfile);
-   //======================================================
-  
-   
-   // keep sending subscription request till successfull for all gnodebs or exceed max attempts  ?
-   // or exceed max_iterations
-   auto it = my_config.gNodeB_list.begin();
-   int loop = 0;
-   int num_nodes = my_config.gNodeB_list.size();
-   
-   while((loop < num_nodes * my_config.max_sub_loops) && my_config.gNodeB_list.size() > 0 && RunProg){
-     int attempt = 0;
-     int subscr_result = -1;
-      
-     while(1){
-       mdclog_write(MDCLOG_INFO, "Sending subscription request for %s ... Attempt number = %d\n", (*it).c_str(), attempt);
-       subscr_result = add_subscription(sub_handler, my_xapp.get(),  sgnb_add_subscr_req, subscr_response, *it);
-       if (subscr_result == SUBSCR_SUCCESS){
-        break;
-       }
-       sleep(5);
-       attempt ++;
-       if (attempt > MAX_SUBSCRIPTION_ATTEMPTS){
-        break;
-       }
-     }
-     
-     if(subscr_result == SUBSCR_SUCCESS){
-       mdclog_write(MDCLOG_INFO, "Successfully subscribed for gNodeB %s", (*it).c_str());
-       // remove node from list,
-       // move to next gnobde
-       it = my_config.gNodeB_list.erase(it);
-     }
-
-     if (it == my_config.gNodeB_list.end()){
-       it = my_config.gNodeB_list.begin();
-     }
-
-     loop++;
-     
-   }
-   
-   if (my_config.gNodeB_list.size() == 0){
-     std::cout <<"SUBSCRIPTION REQUEST :: Successfully subscribed to events for all gNodeBs " << std::endl;
-   }
-   else{
-     std::cerr <<"SUBSCRIPTION REQUEST :: Failed to subscribe for following gNodeBs" << std::endl;
-     for(const auto &e: my_config.gNodeB_list){
-       std::cerr <<"Failed to subscribe for gNodeB " << e << std::endl;
-     }
-   }
-   
-   //Register signal handler to stop 
-   signal(SIGINT, EndProgram);
-   signal(SIGTERM, EndProgram);
+   // Trigger start functions
+   boot_shutdown.startup();
    
    
    //Wait for stop
-   while(RunProg){
+   while(run_program){
      sleep(10);
    }
-  
+
+
+   // we are in shutdown mode
+   // send out subscription deletes
+   boot_shutdown.shutdown();
+
    i = 0;
    for(auto  &e: message_procs){
      mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());