Adding initial code jy.oak@samsung.com
[ric-app/kpimon.git] / src / kpi-xapp.cc
diff --git a/src/kpi-xapp.cc b/src/kpi-xapp.cc
new file mode 100755 (executable)
index 0000000..8625510
--- /dev/null
@@ -0,0 +1,217 @@
+/*\r
+==================================================================================\r
+\r
+        Copyright (c) 2018-2019 AT&T Intellectual Property.\r
+\r
+   Licensed under the Apache License, Version 2.0 (the "License");\r
+   you may not use this file except in compliance with the License.\r
+   You may obtain a copy of the License at\r
+\r
+       http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+   Unless required by applicable law or agreed to in writing, software\r
+   distributed under the License is distributed on an "AS IS" BASIS,\r
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+   See the License for the specific language governing permissions and\r
+   limitations under the License.\r
+==================================================================================\r
+*/\r
+\r
+#include "kpi-xapp.hpp"\r
+\r
+static int RunProg = 1;  // keep loop running\r
+\r
+bool add_subscription(SubscriptionHandler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){\r
+  unsigned char node_buffer[32];\r
+  std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);\r
+  node_buffer[gNodeB.length()] = '\0';\r
+  bool res = sub_handler.RequestSubscription(he, he_resp,  RIC_SUB_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));\r
+  return res;\r
+};\r
+\r
+void EndProgram(int signum){\r
+  std::cout <<"Signal received. Stopping program ....." << std::endl;\r
+  RunProg = 0;\r
+}\r
+\r
+void msg_error(rmr_mbuf_t *message){\r
+  mdclog_write(MDCLOG_ERR, "Error: %s, %d  Could not send RMR message of length %d and type %d, Reason %s",  __FILE__, __LINE__, message->len,  message->mtype, strerror(errno) );\r
+};\r
+\r
+\r
+int main(int argc, char *argv[]){\r
+\r
+  // initially set log level to INFO\r
+  init_logger("XaPP", MDCLOG_INFO);\r
+  \r
+  configuration my_config;\r
+\r
+  // set config variables from environment\r
+  // used when deploying via start-up script\r
+  get_environment_config(my_config);\r
+\r
+  // over-ride with any command line variables if\r
+  // provided\r
+  get_command_line_config(argc, argv, my_config);\r
+\r
+  std::unique_ptr<XaPP> my_xapp;\r
+\r
+\r
+  // Reset log level based on configuration\r
+  init_logger(my_config.name, static_cast<mdclog_severity_t>(my_config.log_level));\r
+  \r
+  if (my_config.gNodeB_list.size() == 0){\r
+    mdclog_write(MDCLOG_WARN, "WARNING  : gNodeB not set for subscription. Subscription MAY FAIL");\r
+  }\r
+  \r
+  // Finished parsing command line/environment arguments \r
+  //=============================================================\r
+\r
+   // instantiate xapp object\r
+   if(my_config.num_threads >= 1){\r
+    mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);\r
+    // Create XaPP that starts with specified number of threads \r
+    my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, my_config.redis_port, 1024, my_config.num_threads);\r
+  }\r
+  else{\r
+    mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto");\r
+    //Let XaPP pick threads based on hardware \r
+    my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, my_config.redis_port, 1024);\r
+  }\r
+  \r
+\r
+  mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);\r
+  mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);\r
+  mdclog_write(MDCLOG_INFO, "XaPP redis port specified = %d", my_config.redis_port);\r
+\r
+   // Instantiate subscription handler\r
+   SubscriptionHandler sub_handler;\r
+\r
+   // Instantiate message handlers for RMR\r
+\r
+   std::vector<std::unique_ptr<message_processor> > message_procs;\r
+   for(int i = 0; i < my_config.num_threads; i++){\r
+     std::unique_ptr<message_processor> mp_handler = std::make_unique<message_processor> ();\r
+     mp_handler.get()->register_subscription_handler(& sub_handler);\r
+     message_procs.push_back(std::move(mp_handler));\r
+   }\r
+  \r
+  \r
+   // Start the listening loops\r
+   std::vector<int> thread_ids(my_config.num_threads);\r
+   unsigned int i = 0;\r
+   for(auto  &e: message_procs){\r
+     thread_ids[i] = (*my_xapp).StartThread(*(e.get()), msg_error);\r
+     i++;\r
+   };\r
+\r
+   mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");\r
+   mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());\r
+\r
+   //======================================================\r
+   // sgnb Subscription spec\r
+\r
+   int request_id = 2; // will be over-written by subscription handler\r
+   int req_seq = 1;\r
+   int function_id = 0;\r
+   int action_id = 4;\r
+   int action_type = 0;\r
+\r
+   int message_type = TypeOfMessage_initiating_message;\r
+   int procedure_code = ProcedureCode_id_kPIMonitor;\r
+   std::string egnb_id = "Testgnb";\r
+   std::string plmn_id = "Testplmn";\r
+\r
+   unsigned char event_buf[128];\r
+   size_t event_buf_len = 128;\r
+   bool res;\r
+\r
+\r
+   e2sm_event_trigger_helper trigger_data;\r
+   e2sm_event_trigger event_trigger;\r
+  \r
+   trigger_data.egNB_id = egnb_id;\r
+   trigger_data.plmn_id = plmn_id;\r
+   trigger_data.egNB_id_type = Interface_ID_PR_global_gNB_ID;\r
+   trigger_data.interface_direction = InterfaceDirection_outgoing;\r
+   trigger_data.procedure_code = procedure_code;\r
+   trigger_data.message_type = message_type;\r
+   res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data);\r
+   if (!res){\r
+     mdclog_write(MDCLOG_ERR, "Error : %s, %d: Could not encode subscription Request. Reason = %s\n", __FILE__, __LINE__, event_trigger.get_error().c_str());\r
+     exit(0);\r
+   }\r
+\r
+   subscription_helper sgnb_add_subscr_req;\r
+   subscription_response_helper subscr_response;\r
+  \r
+   sgnb_add_subscr_req.clear();\r
+   sgnb_add_subscr_req.set_request(request_id, req_seq);\r
+   sgnb_add_subscr_req.set_function_id(function_id);\r
+   sgnb_add_subscr_req.add_action(action_id, action_type);\r
+  \r
+  \r
+   sgnb_add_subscr_req.set_event_def(&event_buf[0], event_buf_len);\r
+   mdclog_write(MDCLOG_INFO, "Encoded event trigger definition into PDU of size %lu bytes\n", event_buf_len);\r
+  \r
+   \r
+   // keep sending subscription request till successfull for all gnodebs ?\r
+   auto it = my_config.gNodeB_list.begin();\r
+   while(my_config.gNodeB_list.size() > 0 && RunProg){\r
+     int attempt = 0;\r
+     res = false;\r
+      \r
+     while(!res){\r
+       mdclog_write(MDCLOG_INFO, "Sending subscription request for %s ... Attempt number = %d\n", (*it).c_str(), attempt);\r
+       res = add_subscription(sub_handler, my_xapp.get(),  sgnb_add_subscr_req, subscr_response, *it);\r
+       if (!res){\r
+        sleep(5);\r
+       };\r
+       attempt ++;\r
+       if (attempt > MAX_SUBSCRIPTION_ATTEMPTS){\r
+        break;\r
+       }\r
+     }\r
+     \r
+     if(res){\r
+       mdclog_write(MDCLOG_INFO, "Successfully subscribed for gNodeB %s", (*it).c_str());\r
+       // remove node from list,\r
+       // move to next gnobde\r
+       it = my_config.gNodeB_list.erase(it);\r
+     }\r
+\r
+     if (it == my_config.gNodeB_list.end()){\r
+       it = my_config.gNodeB_list.begin();\r
+     }\r
+     \r
+   }\r
+   \r
+   \r
+   std::cout <<"SUBSCRIPTION REQUEST :: Successfully subscribed to events for all gNodeBs " << std::endl;\r
+\r
+   //Register signal handler to stop \r
+   signal(SIGINT, EndProgram);\r
+   signal(SIGTERM, EndProgram);\r
+   \r
+\r
+   //Wait for stop\r
+   while(RunProg){\r
+     sleep(10);\r
+   }\r
+  \r
+   i = 0;\r
+   for(auto  &e: message_procs){\r
+     mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());\r
+     std::cout << "Thread " << thread_ids[i] << "  Number of packets handled = " <<  e.get()->get_messages() << std::endl;\r
+     \r
+     i ++ ;\r
+   }\r
+   \r
+   std::cout <<"Stopping all running threads ..." << std::endl;\r
+   (*my_xapp).Stop();\r
+   std::cout <<"Stopped RMR processing threads ...." << std::endl;\r
+\r
+   std::cout <<"Finished ... " << std::endl;\r
\r
+   return 0;\r
+};\r