Updated INFO.yaml file
[ric-app/kpimon.git] / src / kpi-xapp.cc
1 /*\r
2 ==================================================================================\r
3 \r
4         Copyright (c) 2018-2019 SAMSUNG and AT&T Intellectual Property.\r
5 \r
6    Licensed under the Apache License, Version 2.0 (the "License");\r
7    you may not use this file except in compliance with the License.\r
8    You may obtain a copy of the License at\r
9 \r
10        http://www.apache.org/licenses/LICENSE-2.0\r
11 \r
12    Unless required by applicable law or agreed to in writing, software\r
13    distributed under the License is distributed on an "AS IS" BASIS,\r
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
15    See the License for the specific language governing permissions and\r
16    limitations under the License.\r
17 ==================================================================================\r
18 */\r
19 \r
20 #include "kpi-xapp.hpp"\r
21 \r
22 static int RunProg = 1;  // keep loop running\r
23 \r
24 bool add_subscription(SubscriptionHandler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){\r
25   unsigned char node_buffer[32];\r
26   std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);\r
27   node_buffer[gNodeB.length()] = '\0';\r
28   bool res = sub_handler.RequestSubscription(he, he_resp,  RIC_SUB_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));\r
29   return res;\r
30 };\r
31 \r
32 void EndProgram(int signum){\r
33   std::cout <<"Signal received. Stopping program ....." << std::endl;\r
34   RunProg = 0;\r
35 }\r
36 \r
37 void msg_error(rmr_mbuf_t *message){\r
38   mdclog_write(MDCLOG_ERR, "Error: %s, %d  Could not send RMR message of length %d and type %d, Reason %s",  __FILE__, __LINE__, message->len,  message->mtype, strerror(errno) );\r
39 };\r
40 \r
41 \r
42 int main(int argc, char *argv[]){\r
43 \r
44   // initially set log level to INFO\r
45   init_logger("XaPP", MDCLOG_INFO);\r
46   \r
47   configuration my_config;\r
48 \r
49   // set config variables from environment\r
50   // used when deploying via start-up script\r
51   get_environment_config(my_config);\r
52 \r
53   // over-ride with any command line variables if\r
54   // provided\r
55   get_command_line_config(argc, argv, my_config);\r
56 \r
57   std::unique_ptr<XaPP> my_xapp;\r
58 \r
59 \r
60   // Reset log level based on configuration\r
61   init_logger(my_config.name, static_cast<mdclog_severity_t>(my_config.log_level));\r
62   \r
63   if (my_config.gNodeB_list.size() == 0){\r
64     mdclog_write(MDCLOG_WARN, "WARNING  : gNodeB not set for subscription. Subscription MAY FAIL");\r
65   }\r
66   \r
67   // Finished parsing command line/environment arguments \r
68   //=============================================================\r
69 \r
70    // instantiate xapp object\r
71    if(my_config.num_threads >= 1){\r
72     mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);\r
73     // Create XaPP that starts with specified number of threads \r
74     my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, my_config.redis_port, 1024, my_config.num_threads);\r
75   }\r
76   else{\r
77     mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto");\r
78     //Let XaPP pick threads based on hardware \r
79     my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, my_config.redis_port, 1024);\r
80   }\r
81   \r
82 \r
83   mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);\r
84   mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);\r
85   mdclog_write(MDCLOG_INFO, "XaPP redis port specified = %d", my_config.redis_port);\r
86 \r
87    // Instantiate subscription handler\r
88    SubscriptionHandler sub_handler;\r
89 \r
90    // Instantiate message handlers for RMR\r
91 \r
92    std::vector<std::unique_ptr<message_processor> > message_procs;\r
93    for(int i = 0; i < my_config.num_threads; i++){\r
94      std::unique_ptr<message_processor> mp_handler = std::make_unique<message_processor> ();\r
95      mp_handler.get()->register_subscription_handler(& sub_handler);\r
96      message_procs.push_back(std::move(mp_handler));\r
97    }\r
98   \r
99   \r
100    // Start the listening loops\r
101    std::vector<int> thread_ids(my_config.num_threads);\r
102    unsigned int i = 0;\r
103    for(auto  &e: message_procs){\r
104      thread_ids[i] = (*my_xapp).StartThread(*(e.get()), msg_error);\r
105      i++;\r
106    };\r
107 \r
108    mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");\r
109    mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());\r
110 \r
111    //======================================================\r
112    // sgnb Subscription spec\r
113 \r
114    int request_id = 2; // will be over-written by subscription handler\r
115    int req_seq = 1;\r
116    int function_id = 0;\r
117    int action_id = 4;\r
118    int action_type = 0;\r
119 \r
120    int message_type = TypeOfMessage_initiating_message;\r
121    int procedure_code = ProcedureCode_id_kPIMonitor;\r
122    std::string egnb_id = "Testgnb";\r
123    std::string plmn_id = "Testplmn";\r
124 \r
125    unsigned char event_buf[128];\r
126    size_t event_buf_len = 128;\r
127    bool res;\r
128 \r
129 \r
130    e2sm_event_trigger_helper trigger_data;\r
131    e2sm_event_trigger event_trigger;\r
132   \r
133    trigger_data.egNB_id = egnb_id;\r
134    trigger_data.plmn_id = plmn_id;\r
135    trigger_data.egNB_id_type = Interface_ID_PR_global_gNB_ID;\r
136    trigger_data.interface_direction = InterfaceDirection_outgoing;\r
137    trigger_data.procedure_code = procedure_code;\r
138    trigger_data.message_type = message_type;\r
139    res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data);\r
140    if (!res){\r
141      mdclog_write(MDCLOG_ERR, "Error : %s, %d: Could not encode subscription Request. Reason = %s\n", __FILE__, __LINE__, event_trigger.get_error().c_str());\r
142      exit(0);\r
143    }\r
144 \r
145    subscription_helper sgnb_add_subscr_req;\r
146    subscription_response_helper subscr_response;\r
147   \r
148    sgnb_add_subscr_req.clear();\r
149    sgnb_add_subscr_req.set_request(request_id, req_seq);\r
150    sgnb_add_subscr_req.set_function_id(function_id);\r
151    sgnb_add_subscr_req.add_action(action_id, action_type);\r
152   \r
153   \r
154    sgnb_add_subscr_req.set_event_def(&event_buf[0], event_buf_len);\r
155    mdclog_write(MDCLOG_INFO, "Encoded event trigger definition into PDU of size %lu bytes\n", event_buf_len);\r
156   \r
157    \r
158    // keep sending subscription request till successfull for all gnodebs ?\r
159    auto it = my_config.gNodeB_list.begin();\r
160    while(my_config.gNodeB_list.size() > 0 && RunProg){\r
161      int attempt = 0;\r
162      res = false;\r
163       \r
164      while(!res){\r
165        mdclog_write(MDCLOG_INFO, "Sending subscription request for %s ... Attempt number = %d\n", (*it).c_str(), attempt);\r
166        res = add_subscription(sub_handler, my_xapp.get(),  sgnb_add_subscr_req, subscr_response, *it);\r
167        if (!res){\r
168          sleep(5);\r
169        };\r
170        attempt ++;\r
171        if (attempt > MAX_SUBSCRIPTION_ATTEMPTS){\r
172          break;\r
173        }\r
174      }\r
175      \r
176      if(res){\r
177        mdclog_write(MDCLOG_INFO, "Successfully subscribed for gNodeB %s", (*it).c_str());\r
178        // remove node from list,\r
179        // move to next gnobde\r
180        it = my_config.gNodeB_list.erase(it);\r
181      }\r
182 \r
183      if (it == my_config.gNodeB_list.end()){\r
184        it = my_config.gNodeB_list.begin();\r
185      }\r
186      \r
187    }\r
188    \r
189    \r
190    std::cout <<"SUBSCRIPTION REQUEST :: Successfully subscribed to events for all gNodeBs " << std::endl;\r
191 \r
192    //Register signal handler to stop \r
193    signal(SIGINT, EndProgram);\r
194    signal(SIGTERM, EndProgram);\r
195    \r
196 \r
197    //Wait for stop\r
198    while(RunProg){\r
199      sleep(10);\r
200    }\r
201   \r
202    i = 0;\r
203    for(auto  &e: message_procs){\r
204      mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());\r
205      std::cout << "Thread " << thread_ids[i] << "  Number of packets handled = " <<  e.get()->get_messages() << std::endl;\r
206      \r
207      i ++ ;\r
208    }\r
209    \r
210    std::cout <<"Stopping all running threads ..." << std::endl;\r
211    (*my_xapp).Stop();\r
212    std::cout <<"Stopped RMR processing threads ...." << std::endl;\r
213 \r
214    std::cout <<"Finished ... " << std::endl;\r
215  \r
216    return 0;\r
217 };\r