2 ==================================================================================
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
19 /* Author : Ashwin Sridharan */
23 #include "adm-ctrl-xapp.hpp"
// Lookup table from policy type id to the plugin that handles that policy.
// main() inserts the entry (keyed by RATE_CONTROL_POLICY_ID) and
// policy_handler() looks plugins up by the policy_type_id found in the
// incoming message. The stored pointer comes from Plugins[0].get() in main(),
// i.e. it is a raw pointer taken from the plugin list.
31 std::map<int, Policy *> plugin_rmr_map;
33 // Policy handler : All plugins (of abstract class Policy) are registered with the plugin list.
34 // The policy handler is registered as a call back to the RMR message handler. When a policy related RMR message
35 // is received, this function is invoked. It finds the appropriate plugin from the plugin list, passes the policy message and then
36 // returns back the response (in the response string)
38 // NOTE : This version of policy handler was written with R1 policy protocol in mind.
39 // Specifically, it was assumed that each message type corresponds to either set/get for a specific xAPP.
40 // It still works with R2, but ideally should be modified since a single policy type could be applied to multiple plugins.
// Arguments, as used by the lines visible in this listing:
//   message_type : not referenced in the visible body
//   message      : policy message text; parsed as JSON and forwarded to the plugin
//   message_len  : forwarded to the plugin setPolicy()/getPolicy() call
//   response     : out-parameter; written by the plugin call, or set to a JSON
//                  FAIL status string when no plugin is mapped to the policy type
//   set          : presumably selects setPolicy() over getPolicy(); the branch
//                  itself is not visible in this listing -- TODO confirm
// NOTE(review): the declaration of res and the conditions guarding the log
// calls below are not visible in this listing.
42 void policy_handler(int message_type, const char * message, int message_len, std::string & response, bool set){
44 // Extract policy id type : for now we assume exactly 1 plugin maps to each policy type
45 rapidjson::Pointer policy_type_ref("/policy_type_id");
46 rapidjson::Document doc;
// NOTE(review): Parse() receives only the pointer, not message_len, so this
// relies on message being NUL-terminated -- confirm with the RMR caller.
// The same holds for printing message with %s in the error logs below.
47 if (doc.Parse(message).HasParseError()){
48 mdclog_write(MDCLOG_ERR, "Error: %s, %d :: Could not decode A1 JSON message %s\n", __FILE__, __LINE__, message);
// Resolve the /policy_type_id JSON pointer against the parsed document.
51 rapidjson::Value * ref = policy_type_ref.Get(doc);
// Logged when the policy type id cannot be extracted (guard not visible here).
53 mdclog_write(MDCLOG_ERR, "Error : %s, %d:: Could not extract policy type id from %s\n", __FILE__, __LINE__, message);
// NOTE(review): GetInt() presumes the value is an integer; no type check is
// visible in this listing -- confirm.
56 int policy_type_id = ref->GetInt();
// Find the plugin registered for this policy type id.
58 auto it = plugin_rmr_map.find(policy_type_id);
60 if (it != plugin_rmr_map.end()){
// Set path: hand the raw message to the plugin; response carries its reply.
62 res = it->second->setPolicy(message, message_len, response);
64 mdclog_write(MDCLOG_INFO, "A1 POLICY SET :: Successfully set A1 Policy\n");
67 mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY SET %s, %d . Unable to set policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
// Get path: same call shape, using getPolicy().
72 res = it->second->getPolicy(message, message_len, response);
74 mdclog_write(MDCLOG_INFO, "A1 POLICY GET : Successfully retreived A1 Policy\n");
77 mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY GET %s, %d . Unable to get policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
// No plugin registered for this id: build a FAIL status response and log it.
83 response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with policy type id = " + std::to_string(policy_type_id) + "\"}";
84 mdclog_write(MDCLOG_ERR, "Error ! %s, %d : %s\n", __FILE__, __LINE__, response.c_str());
90 // polling function that routinely queries all plugins for metrics and then posts them to the VES collector
// Arguments, as used by the lines visible in this listing:
//   ves_url  : passed to the curl_interface constructor and echoed in warnings
//   plugins  : list indexed as (*plugins)[i]; get() on each element yields an
//              object offering getMetrics(), getName() and get_error()
//   interval : not referenced in the visible body -- presumably the polling
//              period; TODO confirm
// NOTE(review): the declaration of res and the conditions guarding the log
// calls below are not visible in this listing.
92 void metrics_collector(std::string ves_url, plugin_list * plugins, unsigned int interval){
94 // Instantiate the ves collector
95 curl_interface curl_obj(ves_url);
96 std::vector<std::string> response_vector;
// Visit every plugin in the list.
99 for(unsigned int i = 0; i < plugins->size(); i++){
// Start from an empty vector so metrics from the previous plugin are not re-sent.
100 response_vector.clear();
101 res = (*plugins)[i].get()->getMetrics(response_vector);
// Warning names the plugin and reports its own error text.
103 mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), (*plugins)[i].get()->get_error().c_str());
106 // send each response
107 for(auto &e: response_vector){
108 res = curl_obj.post_metrics(e);
// Failure to post is only logged as a warning, with the curl object error text.
110 mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str());
113 mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", e.c_str());
// Printed to stdout once the collector stops.
120 std::cout <<"Stopped metrics collector/reporter .." << std::endl;
// Handler installed for SIGINT and SIGTERM by main(). The visible body only
// prints a notice to stdout; signum is not used in the visible lines.
// NOTE(review): iostream output is not async-signal-safe -- confirm this is
// acceptable for this handler.
124 void EndProgram(int signum){
125 std::cout <<"Signal received. Stopping program ....." << std::endl;
129 // ideally should be expanded for rollback purposes etc.
// Passed to XaPP::StartThread() in main() together with each message processor.
// Logs the len and mtype fields of the message plus strerror(errno);
// judging by the log text it is meant for RMR send failures.
// NOTE(review): errno is read at log time, so it describes the failed send
// only if nothing in between has changed it -- confirm.
130 void msg_error(rmr_mbuf_t *message){
131 mdclog_write(MDCLOG_ERR, "Error: %s, %d Could not send RMR message of length %d and type %d, Reason %s", __FILE__, __LINE__, message->len, message->mtype, strerror(errno) );
// Program entry point. In the order visible here it: loads configuration from
// the environment and the command line, selects the operating mode, creates the
// XaPP object and the admission plugin, starts the metrics thread, builds one
// message_processor per configured thread and starts them, installs the signal
// handlers, runs startup(), and finally runs shutdown() and joins the metrics thread.
// NOTE(review): several statements are not visible in this listing (for example
// the declarations of Plugins and of the index i, and whatever waits between
// startup() and shutdown()).
135 int main(int argc, char *argv[]){
137 // initially set log level to INFO
138 init_logger("XaPP", MDCLOG_INFO);
140 configuration my_config;
142 // set config variables from environment
143 // used when deploying via start-up script
144 get_environment_config(my_config);
146 // over-ride with any command line variables if provided
148 get_command_line_config(argc, argv, my_config);
150 std::unique_ptr<XaPP> my_xapp;
153 // Reset log level based on configuration
154 init_logger(my_config.name, static_cast<mdclog_severity_t>(my_config.log_level));
// An empty gNodeB list is only warned about; execution continues.
156 if (my_config.gNodeB_list.size() == 0){
157 mdclog_write(MDCLOG_WARN, "WARNING : gNodeB not set for subscription. Subscription MAY FAIL");
// Human-readable label for the selected mode; only used in the debug log below.
160 std::string operating_mode;
162 // How are we operating ?
163 if (my_config.operating_mode == "CONTROL"){
164 // Full closed loop : process E2AP indication,
165 // E2SM, X2AP and generate response
166 my_config.report_mode_only = false;
167 operating_mode = "CLOSED LOOP CONTROL";
169 else if (my_config.operating_mode == "E2AP_PROC_ONLY"){
170 // Debugging : processing only E2AP indication
171 my_config.processing_level = ProcessingLevelTypes::E2AP_PROC_ONLY;
172 operating_mode = "E2AP PROCESSING ONLY";
174 else if (my_config.operating_mode == "E2SM_PROC_ONLY"){
175 // Debugging : processing only till E2SM indication header
176 my_config.processing_level = ProcessingLevelTypes::E2SM_PROC_ONLY;
177 operating_mode = "E2SM PROCESSING ONLY";
180 // Passive : processing till X2AP but do not generate response
181 my_config.report_mode_only = true;
182 operating_mode = "REPORTING ONLY";
185 mdclog_write(MDCLOG_DEBUG, "Operating mode of Admission Control xAPP is %s\n", operating_mode.c_str());
186 // Finished passing command line/environment arguments
187 //=============================================================
189 // instantiate xapp-rmr-framework object
// NOTE(review): name and port are passed to a printf-style %s below; that is
// only valid if they are C strings rather than std::string -- confirm against
// the configuration definition.
190 mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
191 mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);
192 mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);
194 my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024);
197 // Instantiate admission logic handler (with only one instance for now)
198 int num_instances = 1;
199 Plugins.emplace_back(std::make_unique<admission>(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file, num_instances, my_config.xapp_id, my_config.report_mode_only));
201 // Add reference to plugin list .
202 // Plugin list is used by policy handler and metrics collector
203 plugin_rmr_map.insert(std::pair<int, Policy *>(RATE_CONTROL_POLICY_ID, Plugins[0].get()));
205 // instantiate ves thread (it polls all plugins and sends out their metrics)
// The thread receives a pointer to Plugins; it is joined near the end of main().
206 std::thread metrics_thread(metrics_collector, my_config.ves_collector_url, &Plugins, my_config.measurement_interval);
208 // Instantiate subscription handler
209 subscription_handler sub_handler;
211 // Instantiate message handlers for RMR
212 // (one for each thread) and register
213 // subscription and admission handlers
215 std::vector<std::unique_ptr<message_processor> > message_procs;
216 for(int i = 0; i < my_config.num_threads; i++){
217 std::unique_ptr<message_processor> mp_handler = std::make_unique<message_processor> (my_config.processing_level, my_config.report_mode_only);
218 mp_handler.get()->register_subscription_handler(& sub_handler);
// NOTE(review): the dynamic_cast result is dereferenced without a null check;
// it holds as long as Plugins[0] really is an admission object (it is created
// as one above).
219 mp_handler.get()->register_protector(dynamic_cast<admission *>(Plugins[0].get())->get_protector_instance(0));
220 mp_handler.get()->register_policy_handler (& policy_handler);
221 message_procs.push_back(std::move(mp_handler));
225 // Start the RMR listening loops
226 std::vector<int> thread_ids(my_config.num_threads);
// NOTE(review): the index i used below is not declared or advanced in the
// visible lines of this loop.
228 for(auto &e: message_procs){
229 thread_ids[i] = (*my_xapp).StartThread(*(e.get()), msg_error);
233 mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");
// NOTE(review): %lu matches size() only where size_t is unsigned long.
234 mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());
236 //Register signal handler to stop
237 signal(SIGINT, EndProgram);
238 signal(SIGTERM, EndProgram);
240 // Instantiate startup/shutdown subroutine objects
241 init boot_shutdown((*my_xapp), sub_handler, my_config);
244 // Trigger start functions
245 boot_shutdown.startup();
254 // we are in shutdown mode
255 // send out subscription deletes
256 boot_shutdown.shutdown();
// Report the per-thread message counters to both the log and stdout.
259 for(auto &e: message_procs){
260 mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());
261 std::cout << "Thread " << thread_ids[i] << " Number of packets handled = " << e.get()->get_messages() << std::endl;
266 std::cout <<"Stopping all running threads ..." << std::endl;
268 std::cout <<"Stopped RMR processing threads ...." << std::endl;
// Wait for the metrics collector thread to finish.
269 metrics_thread.join();
270 std::cout <<"Stopped Metric collection thread ...." << std::endl;
// Drop the policy-id-to-plugin entries held in the global map.
272 plugin_rmr_map.clear();
273 std::cout <<"Cleared Plugins .." << std::endl;
275 std::cout <<"Finished ... " << std::endl;