2 ==================================================================================
\r
4 Copyright (c) 2018-2019 SAMSUNG and AT&T Intellectual Property.
\r
6 Licensed under the Apache License, Version 2.0 (the "License");
\r
7 you may not use this file except in compliance with the License.
\r
8 You may obtain a copy of the License at
\r
10 http://www.apache.org/licenses/LICENSE-2.0
\r
12 Unless required by applicable law or agreed to in writing, software
\r
13 distributed under the License is distributed on an "AS IS" BASIS,
\r
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
15 See the License for the specific language governing permissions and
\r
16 limitations under the License.
\r
17 ==================================================================================
\r
20 #include "kpi-xapp.hpp"
\r
22 static int RunProg = 1; // keep loop running
\r
24 bool add_subscription(SubscriptionHandler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){
\r
25 unsigned char node_buffer[32];
\r
26 std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
\r
27 node_buffer[gNodeB.length()] = '\0';
\r
28 bool res = sub_handler.RequestSubscription(he, he_resp, RIC_SUB_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
\r
32 void EndProgram(int signum){
\r
33 std::cout <<"Signal received. Stopping program ....." << std::endl;
\r
37 void msg_error(rmr_mbuf_t *message){
\r
38 mdclog_write(MDCLOG_ERR, "Error: %s, %d Could not send RMR message of length %d and type %d, Reason %s", __FILE__, __LINE__, message->len, message->mtype, strerror(errno) );
\r
42 int main(int argc, char *argv[]){
\r
44 // initially set log level to INFO
\r
45 init_logger("XaPP", MDCLOG_INFO);
\r
47 configuration my_config;
\r
49 // set config variables from environment
\r
50 // used when deploying via start-up script
\r
51 get_environment_config(my_config);
\r
53 // over-ride with any command line variables if
\r
55 get_command_line_config(argc, argv, my_config);
\r
57 std::unique_ptr<XaPP> my_xapp;
\r
60 // Reset log level based on configuration
\r
61 init_logger(my_config.name, static_cast<mdclog_severity_t>(my_config.log_level));
\r
63 if (my_config.gNodeB_list.size() == 0){
\r
64 mdclog_write(MDCLOG_WARN, "WARNING : gNodeB not set for subscription. Subscription MAY FAIL");
\r
67 // Finished parsing command line/environment arguments
\r
68 //=============================================================
\r
70 // instantiate xapp object
\r
71 if(my_config.num_threads >= 1){
\r
72 mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
\r
73 // Create XaPP that starts with specified number of threads
\r
74 my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, my_config.redis_port, 1024, my_config.num_threads);
\r
77 mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto");
\r
78 //Let XaPP pick threads based on hardware
\r
79 my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, my_config.redis_port, 1024);
\r
83 mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);
\r
84 mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);
\r
85 mdclog_write(MDCLOG_INFO, "XaPP redis port specified = %d", my_config.redis_port);
\r
87 // Instantiate subscription handler
\r
88 SubscriptionHandler sub_handler;
\r
90 // Instantiate message handlers for RMR
\r
92 std::vector<std::unique_ptr<message_processor> > message_procs;
\r
93 for(int i = 0; i < my_config.num_threads; i++){
\r
94 std::unique_ptr<message_processor> mp_handler = std::make_unique<message_processor> ();
\r
95 mp_handler.get()->register_subscription_handler(& sub_handler);
\r
96 message_procs.push_back(std::move(mp_handler));
\r
100 // Start the listening loops
\r
101 std::vector<int> thread_ids(my_config.num_threads);
\r
102 unsigned int i = 0;
\r
103 for(auto &e: message_procs){
\r
104 thread_ids[i] = (*my_xapp).StartThread(*(e.get()), msg_error);
\r
108 mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");
\r
109 mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());
\r
111 //======================================================
\r
112 // sgnb Subscription spec
\r
114 int request_id = 2; // will be over-written by subscription handler
\r
116 int function_id = 0;
\r
118 int action_type = 0;
\r
120 int message_type = TypeOfMessage_initiating_message;
\r
121 int procedure_code = ProcedureCode_id_kPIMonitor;
\r
122 std::string egnb_id = "Testgnb";
\r
123 std::string plmn_id = "Testplmn";
\r
125 unsigned char event_buf[128];
\r
126 size_t event_buf_len = 128;
\r
130 e2sm_event_trigger_helper trigger_data;
\r
131 e2sm_event_trigger event_trigger;
\r
133 trigger_data.egNB_id = egnb_id;
\r
134 trigger_data.plmn_id = plmn_id;
\r
135 trigger_data.egNB_id_type = Interface_ID_PR_global_gNB_ID;
\r
136 trigger_data.interface_direction = InterfaceDirection_outgoing;
\r
137 trigger_data.procedure_code = procedure_code;
\r
138 trigger_data.message_type = message_type;
\r
139 res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data);
\r
141 mdclog_write(MDCLOG_ERR, "Error : %s, %d: Could not encode subscription Request. Reason = %s\n", __FILE__, __LINE__, event_trigger.get_error().c_str());
\r
145 subscription_helper sgnb_add_subscr_req;
\r
146 subscription_response_helper subscr_response;
\r
148 sgnb_add_subscr_req.clear();
\r
149 sgnb_add_subscr_req.set_request(request_id, req_seq);
\r
150 sgnb_add_subscr_req.set_function_id(function_id);
\r
151 sgnb_add_subscr_req.add_action(action_id, action_type);
\r
154 sgnb_add_subscr_req.set_event_def(&event_buf[0], event_buf_len);
\r
155 mdclog_write(MDCLOG_INFO, "Encoded event trigger definition into PDU of size %lu bytes\n", event_buf_len);
\r
158 // keep sending subscription request till successful for all gnodebs ?
\r
159 auto it = my_config.gNodeB_list.begin();
\r
160 while(my_config.gNodeB_list.size() > 0 && RunProg){
\r
165 mdclog_write(MDCLOG_INFO, "Sending subscription request for %s ... Attempt number = %d\n", (*it).c_str(), attempt);
\r
166 res = add_subscription(sub_handler, my_xapp.get(), sgnb_add_subscr_req, subscr_response, *it);
\r
171 if (attempt > MAX_SUBSCRIPTION_ATTEMPTS){
\r
177 mdclog_write(MDCLOG_INFO, "Successfully subscribed for gNodeB %s", (*it).c_str());
\r
178 // remove node from list,
\r
179 // move to next gnodeb
\r
180 it = my_config.gNodeB_list.erase(it);
\r
183 if (it == my_config.gNodeB_list.end()){
\r
184 it = my_config.gNodeB_list.begin();
\r
190 std::cout <<"SUBSCRIPTION REQUEST :: Successfully subscribed to events for all gNodeBs " << std::endl;
\r
192 //Register signal handler to stop
\r
193 signal(SIGINT, EndProgram);
\r
194 signal(SIGTERM, EndProgram);
\r
203 for(auto &e: message_procs){
\r
204 mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());
\r
205 std::cout << "Thread " << thread_ids[i] << " Number of packets handled = " << e.get()->get_messages() << std::endl;
\r
210 std::cout <<"Stopping all running threads ..." << std::endl;
\r
212 std::cout <<"Stopped RMR processing threads ...." << std::endl;
\r
214 std::cout <<"Finished ... " << std::endl;
\r