--- /dev/null
+/*\r
+==================================================================================\r
+\r
+ Copyright (c) 2018-2019 AT&T Intellectual Property.\r
+\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+==================================================================================\r
+*/\r
+\r
+#include "kpi-xapp.hpp"\r
+\r
+static int RunProg = 1; // keep loop running\r
+\r
+bool add_subscription(SubscriptionHandler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){\r
+ unsigned char node_buffer[32];\r
+ std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);\r
+ node_buffer[gNodeB.length()] = '\0';\r
+ bool res = sub_handler.RequestSubscription(he, he_resp, RIC_SUB_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));\r
+ return res;\r
+};\r
+\r
+void EndProgram(int signum){\r
+ std::cout <<"Signal received. Stopping program ....." << std::endl;\r
+ RunProg = 0;\r
+}\r
+\r
+void msg_error(rmr_mbuf_t *message){\r
+ mdclog_write(MDCLOG_ERR, "Error: %s, %d Could not send RMR message of length %d and type %d, Reason %s", __FILE__, __LINE__, message->len, message->mtype, strerror(errno) );\r
+};\r
+\r
+\r
+int main(int argc, char *argv[]){\r
+\r
+ // initially set log level to INFO\r
+ init_logger("XaPP", MDCLOG_INFO);\r
+ \r
+ configuration my_config;\r
+\r
+ // set config variables from environment\r
+ // used when deploying via start-up script\r
+ get_environment_config(my_config);\r
+\r
+ // over-ride with any command line variables if\r
+ // provided\r
+ get_command_line_config(argc, argv, my_config);\r
+\r
+ std::unique_ptr<XaPP> my_xapp;\r
+\r
+\r
+ // Reset log level based on configuration\r
+ init_logger(my_config.name, static_cast<mdclog_severity_t>(my_config.log_level));\r
+ \r
+ if (my_config.gNodeB_list.size() == 0){\r
+ mdclog_write(MDCLOG_WARN, "WARNING : gNodeB not set for subscription. Subscription MAY FAIL");\r
+ }\r
+ \r
+ // Finished parsing command line/environment arguments \r
+ //=============================================================\r
+\r
+ // instantiate xapp object\r
+ if(my_config.num_threads >= 1){\r
+ mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);\r
+ // Create XaPP that starts with specified number of threads \r
+ my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, my_config.redis_port, 1024, my_config.num_threads);\r
+ }\r
+ else{\r
+ mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto");\r
+ //Let XaPP pick threads based on hardware \r
+ my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, my_config.redis_port, 1024);\r
+ }\r
+ \r
+\r
+ mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);\r
+ mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);\r
+ mdclog_write(MDCLOG_INFO, "XaPP redis port specified = %d", my_config.redis_port);\r
+\r
+ // Instantiate subscription handler\r
+ SubscriptionHandler sub_handler;\r
+\r
+ // Instantiate message handlers for RMR\r
+\r
+ std::vector<std::unique_ptr<message_processor> > message_procs;\r
+ for(int i = 0; i < my_config.num_threads; i++){\r
+ std::unique_ptr<message_processor> mp_handler = std::make_unique<message_processor> ();\r
+ mp_handler.get()->register_subscription_handler(& sub_handler);\r
+ message_procs.push_back(std::move(mp_handler));\r
+ }\r
+ \r
+ \r
+ // Start the listening loops\r
+ std::vector<int> thread_ids(my_config.num_threads);\r
+ unsigned int i = 0;\r
+ for(auto &e: message_procs){\r
+ thread_ids[i] = (*my_xapp).StartThread(*(e.get()), msg_error);\r
+ i++;\r
+ };\r
+\r
+ mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");\r
+ mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());\r
+\r
+ //======================================================\r
+ // sgnb Subscription spec\r
+\r
+ int request_id = 2; // will be over-written by subscription handler\r
+ int req_seq = 1;\r
+ int function_id = 0;\r
+ int action_id = 4;\r
+ int action_type = 0;\r
+\r
+ int message_type = TypeOfMessage_initiating_message;\r
+ int procedure_code = ProcedureCode_id_kPIMonitor;\r
+ std::string egnb_id = "Testgnb";\r
+ std::string plmn_id = "Testplmn";\r
+\r
+ unsigned char event_buf[128];\r
+ size_t event_buf_len = 128;\r
+ bool res;\r
+\r
+\r
+ e2sm_event_trigger_helper trigger_data;\r
+ e2sm_event_trigger event_trigger;\r
+ \r
+ trigger_data.egNB_id = egnb_id;\r
+ trigger_data.plmn_id = plmn_id;\r
+ trigger_data.egNB_id_type = Interface_ID_PR_global_gNB_ID;\r
+ trigger_data.interface_direction = InterfaceDirection_outgoing;\r
+ trigger_data.procedure_code = procedure_code;\r
+ trigger_data.message_type = message_type;\r
+ res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data);\r
+ if (!res){\r
+ mdclog_write(MDCLOG_ERR, "Error : %s, %d: Could not encode subscription Request. Reason = %s\n", __FILE__, __LINE__, event_trigger.get_error().c_str());\r
+ exit(0);\r
+ }\r
+\r
+ subscription_helper sgnb_add_subscr_req;\r
+ subscription_response_helper subscr_response;\r
+ \r
+ sgnb_add_subscr_req.clear();\r
+ sgnb_add_subscr_req.set_request(request_id, req_seq);\r
+ sgnb_add_subscr_req.set_function_id(function_id);\r
+ sgnb_add_subscr_req.add_action(action_id, action_type);\r
+ \r
+ \r
+ sgnb_add_subscr_req.set_event_def(&event_buf[0], event_buf_len);\r
+ mdclog_write(MDCLOG_INFO, "Encoded event trigger definition into PDU of size %lu bytes\n", event_buf_len);\r
+ \r
+ \r
+ // keep sending subscription request till successfull for all gnodebs ?\r
+ auto it = my_config.gNodeB_list.begin();\r
+ while(my_config.gNodeB_list.size() > 0 && RunProg){\r
+ int attempt = 0;\r
+ res = false;\r
+ \r
+ while(!res){\r
+ mdclog_write(MDCLOG_INFO, "Sending subscription request for %s ... Attempt number = %d\n", (*it).c_str(), attempt);\r
+ res = add_subscription(sub_handler, my_xapp.get(), sgnb_add_subscr_req, subscr_response, *it);\r
+ if (!res){\r
+ sleep(5);\r
+ };\r
+ attempt ++;\r
+ if (attempt > MAX_SUBSCRIPTION_ATTEMPTS){\r
+ break;\r
+ }\r
+ }\r
+ \r
+ if(res){\r
+ mdclog_write(MDCLOG_INFO, "Successfully subscribed for gNodeB %s", (*it).c_str());\r
+ // remove node from list,\r
+ // move to next gnobde\r
+ it = my_config.gNodeB_list.erase(it);\r
+ }\r
+\r
+ if (it == my_config.gNodeB_list.end()){\r
+ it = my_config.gNodeB_list.begin();\r
+ }\r
+ \r
+ }\r
+ \r
+ \r
+ std::cout <<"SUBSCRIPTION REQUEST :: Successfully subscribed to events for all gNodeBs " << std::endl;\r
+\r
+ //Register signal handler to stop \r
+ signal(SIGINT, EndProgram);\r
+ signal(SIGTERM, EndProgram);\r
+ \r
+\r
+ //Wait for stop\r
+ while(RunProg){\r
+ sleep(10);\r
+ }\r
+ \r
+ i = 0;\r
+ for(auto &e: message_procs){\r
+ mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());\r
+ std::cout << "Thread " << thread_ids[i] << " Number of packets handled = " << e.get()->get_messages() << std::endl;\r
+ \r
+ i ++ ;\r
+ }\r
+ \r
+ std::cout <<"Stopping all running threads ..." << std::endl;\r
+ (*my_xapp).Stop();\r
+ std::cout <<"Stopped RMR processing threads ...." << std::endl;\r
+\r
+ std::cout <<"Finished ... " << std::endl;\r
+ \r
+ return 0;\r
+};\r