2 ==================================================================================
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
19 /* Author : Ashwin Sridharan
23 #include "adm-ctrl-xapp.hpp"
26 static int RunProg = 1; // keep loop running
29 typedef std::vector<std::unique_ptr<Policy> > plugin_list;
31 std::map<int, Policy *> plugin_rmr_map;
34 int add_subscription(subscription_handler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){
35 unsigned char node_buffer[32];
36 std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
37 node_buffer[gNodeB.length()] = '\0';
38 int res = sub_handler.RequestSubscription(he, he_resp, RIC_SUB_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
43 int delete_subscription(subscription_handler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){
44 unsigned char node_buffer[32];
45 std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
46 node_buffer[gNodeB.length()] = '\0';
48 int res = sub_handler.RequestSubscriptionDelete(he, he_resp, RIC_SUB_DEL_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
53 void policy_handler(int message_type, const char * message, int message_len, std::string & response, bool set){
54 auto it = plugin_rmr_map.find(message_type);
56 if (it != plugin_rmr_map.end()){
58 res = it->second->setPolicy(message, message_len, response);
60 mdclog_write(MDCLOG_INFO, "A1 POLICY SET :: Successfully set A1 Policy\n");
63 mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY SET %s, %d . Unable to set policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
68 res = it->second->getPolicy(message, message_len, response);
70 mdclog_write(MDCLOG_INFO, "A1 POLICY GET : Successfully retreived A1 Policy\n");
73 mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY GET %s, %d . Unable to get policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
79 response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with RMR message type = " + std::to_string(message_type) + "\"}";
84 void metrics_collector(std::string ves_url, plugin_list * plugins, unsigned int interval){
86 // Instantiate the ves collector
87 curl_interface curl_obj(ves_url);
88 std::string metrics_response;
91 for(unsigned int i = 0; i < plugins->size(); i++){
92 res = (*plugins)[i].get()->getMetrics(metrics_response);
94 mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), metrics_response.c_str());
97 res = curl_obj.post_metrics(metrics_response);
99 mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str());
102 mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", metrics_response.c_str());
108 std::cout <<"Stopped metrics collector/reporter .." << std::endl;
112 void EndProgram(int signum){
113 std::cout <<"Signal received. Stopping program ....." << std::endl;
117 void msg_error(rmr_mbuf_t *message){
118 mdclog_write(MDCLOG_ERR, "Error: %s, %d Could not send RMR message of length %d and type %d, Reason %s", __FILE__, __LINE__, message->len, message->mtype, strerror(errno) );
122 int main(int argc, char *argv[]){
124 // initially set log level to INFO
125 init_logger("XaPP", MDCLOG_INFO);
127 configuration my_config;
129 // set config variables from environment
130 // used when deploying via start-up script
131 get_environment_config(my_config);
133 // over-ride with any command line variables if
135 get_command_line_config(argc, argv, my_config);
137 std::unique_ptr<XaPP> my_xapp;
140 // Reset log level based on configuration
141 init_logger(my_config.name, static_cast<mdclog_severity_t>(my_config.log_level));
143 if (my_config.gNodeB_list.size() == 0){
144 mdclog_write(MDCLOG_WARN, "WARNING : gNodeB not set for subscription. Subscription MAY FAIL");
148 std::string operating_mode;
149 // How are we operating ?
150 if (my_config.operating_mode == "CONTROL"){
151 // Full closed loop : process E2AP indication,
152 // E2SM, X2AP and generate response
153 my_config.report_mode_only = false;
154 operating_mode = "CLOSED LOOP CONTROL";
156 else if (my_config.operating_mode == "E2AP_PROC_ONLY"){
157 // Debugging : processing only E2AP indication
158 my_config.processing_level = ProcessingLevelTypes::E2AP_PROC_ONLY;
159 operating_mode = "E2AP PROCESSING ONLY";
161 else if (my_config.operating_mode == "E2SM_PROC_ONLY"){
162 // Debugging : processing only till E2SM indication header
163 my_config.processing_level = ProcessingLevelTypes::E2SM_PROC_ONLY;
164 operating_mode = "E2SM PROCESSING ONLY";
167 // Passive : processing till X2AP but do not generate response
168 my_config.report_mode_only = true;
169 operating_mode = "REPORTING ONLY";
172 mdclog_write(MDCLOG_DEBUG, "Operating mode of Admission Control xAPP is %s\n", operating_mode.c_str());
173 // Finished passing command line/environment arguments
174 //=============================================================
176 // instantiate xapp object
177 if(my_config.num_threads >= 1){
178 mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
179 // Create XaPP that starts with specified number of threads
180 my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024, my_config.num_threads);
183 mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto");
184 //Let XaPP pick threads based on hardware
185 my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024);
189 mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);
190 mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);
193 // Instantiate admission logic handler
194 Plugins.emplace_back(std::make_unique<admission>(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file, 1, my_config.report_mode_only));
196 // Add reference to plugin list . We add twice (once for set policy and once for get policy ids)
197 // Plugin list is used by policy handler and metrics collector
198 plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_INT_CONTROL, Plugins[0].get()));
199 plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_GET_POLICY, Plugins[0].get()));
201 // instantiate curl object for ves
202 std::thread metrics_thread(metrics_collector, my_config.ves_collector_url, &Plugins, my_config.measurement_interval);
205 // Instantiate subscription handler
206 subscription_handler sub_handler;
208 // Instantiate message handlers for RMR
209 // (one for each thread) and registrer
210 // subscription and admission handlers
212 std::vector<std::unique_ptr<message_processor> > message_procs;
213 for(int i = 0; i < my_config.num_threads; i++){
214 std::unique_ptr<message_processor> mp_handler = std::make_unique<message_processor> (my_config.processing_level, my_config.report_mode_only);
215 mp_handler.get()->register_subscription_handler(& sub_handler);
216 mp_handler.get()->register_protector(dynamic_cast<admission *>(Plugins[0].get())->get_protector_instance(0));
217 mp_handler.get()->register_policy_handler (& policy_handler);
218 message_procs.push_back(std::move(mp_handler));
222 // Start the listening loops
223 std::vector<int> thread_ids(my_config.num_threads);
225 for(auto &e: message_procs){
226 thread_ids[i] = (*my_xapp).StartThread(*(e.get()), msg_error);
230 mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");
231 mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());
233 //======================================================
234 // sgnb Subscription spec
236 int request_id = 2; // will be over-written by subscription handler
240 int action_type = my_config.report_mode_only ? 0:1;
242 int message_type = 1;
243 int procedure_code = 27;
244 std::string egnb_id = "Testgnb";
245 std::string plmn_id = "Testplmn";
247 unsigned char event_buf[128];
248 size_t event_buf_len = 128;
252 e2sm_event_trigger_helper trigger_data;
253 e2sm_event_trigger event_trigger;
255 trigger_data.egNB_id = egnb_id;
256 trigger_data.plmn_id = plmn_id;
257 trigger_data.egNB_id_type = 2;
258 trigger_data.interface_direction = 1;
259 trigger_data.procedure_code = procedure_code;
260 trigger_data.message_type = message_type;
261 res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data);
263 mdclog_write(MDCLOG_ERR, "Error : %s, %d: Could not encode subscription Request. Reason = %s\n", __FILE__, __LINE__, event_trigger.get_error().c_str());
268 subscription_helper sgnb_add_subscr_req;
269 subscription_response_helper subscr_response;
271 sgnb_add_subscr_req.clear();
272 sgnb_add_subscr_req.set_request(request_id, req_seq);
273 sgnb_add_subscr_req.set_function_id(function_id);
274 sgnb_add_subscr_req.add_action(action_id, action_type);
277 sgnb_add_subscr_req.set_event_def(&event_buf[0], event_buf_len);
278 mdclog_write(MDCLOG_INFO, "Encoded event trigger definition into PDU of size %lu bytes\n", event_buf_len);
280 //======================================================
281 // Purely for testing purposes ... write subscription ASN binary to file
283 // pfile = fopen("event_trigger.pr", "wb");
284 // fwrite(event_buf, 1, event_buf_len, pfile);
286 //======================================================
289 // keep sending subscription request till successfull for all gnodebs or exceed max attempts ?
290 // or exceed max_iterations
291 auto it = my_config.gNodeB_list.begin();
293 int num_nodes = my_config.gNodeB_list.size();
295 while((loop < num_nodes * my_config.max_sub_loops) && my_config.gNodeB_list.size() > 0 && RunProg){
297 int subscr_result = -1;
300 mdclog_write(MDCLOG_INFO, "Sending subscription request for %s ... Attempt number = %d\n", (*it).c_str(), attempt);
301 subscr_result = add_subscription(sub_handler, my_xapp.get(), sgnb_add_subscr_req, subscr_response, *it);
302 if (subscr_result == SUBSCR_SUCCESS){
307 if (attempt > MAX_SUBSCRIPTION_ATTEMPTS){
312 if(subscr_result == SUBSCR_SUCCESS){
313 mdclog_write(MDCLOG_INFO, "Successfully subscribed for gNodeB %s", (*it).c_str());
314 // remove node from list,
315 // move to next gnobde
316 it = my_config.gNodeB_list.erase(it);
319 if (it == my_config.gNodeB_list.end()){
320 it = my_config.gNodeB_list.begin();
327 if (my_config.gNodeB_list.size() == 0){
328 std::cout <<"SUBSCRIPTION REQUEST :: Successfully subscribed to events for all gNodeBs " << std::endl;
331 std::cerr <<"SUBSCRIPTION REQUEST :: Failed to subscribe for following gNodeBs" << std::endl;
332 for(const auto &e: my_config.gNodeB_list){
333 std::cerr <<"Failed to subscribe for gNodeB " << e << std::endl;
337 //Register signal handler to stop
338 signal(SIGINT, EndProgram);
339 signal(SIGTERM, EndProgram);
348 for(auto &e: message_procs){
349 mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());
350 std::cout << "Thread " << thread_ids[i] << " Number of packets handled = " << e.get()->get_messages() << std::endl;
355 std::cout <<"Stopping all running threads ..." << std::endl;
357 std::cout <<"Stopped RMR processing threads ...." << std::endl;
358 metrics_thread.join();
359 std::cout <<"Stopped Metric collection thread ...." << std::endl;
361 plugin_rmr_map.clear();
362 std::cout <<"Cleared Plugins .." << std::endl;
364 std::cout <<"Finished ... " << std::endl;