2 ==================================================================================
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
19 /* Author : Ashwin Sridharan
23 #include "adm-ctrl-xapp.hpp"
31 std::map<int, Policy *> plugin_rmr_map;
33 // Policy handler : All plugins (of abstract class Policy) are registered with the plugin list.
34 // The policy handler is registered as a call back to the RMR message handler. When a policy related RMR message
35 // is received, this function is invoked. It finds the appropriate plugin from the plugin list, passes the policy message and then
36 // returns back the response (in the response string)
38 // NOTE : This version of policy handler was written with R1 policy protocol in mind.
39 // Specifically, it was assumed that each message type corresponds to either set/get for a specific xAPP.
40 // It still works with R2, but ideally should be modified since a single policy type could be applied to multiple plugins.
42 void policy_handler(int message_type, const char * message, int message_len, std::string & response, bool set){
43 auto it = plugin_rmr_map.find(message_type);
45 if (it != plugin_rmr_map.end()){
47 res = it->second->setPolicy(message, message_len, response);
49 mdclog_write(MDCLOG_INFO, "A1 POLICY SET :: Successfully set A1 Policy\n");
52 mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY SET %s, %d . Unable to set policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
57 res = it->second->getPolicy(message, message_len, response);
59 mdclog_write(MDCLOG_INFO, "A1 POLICY GET : Successfully retreived A1 Policy\n");
62 mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY GET %s, %d . Unable to get policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
68 response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with RMR message type = " + std::to_string(message_type) + "\"}";
69 mdclog_write(MDCLOG_ERR, "Error ! %s, %d : %s\n", __FILE__, __LINE__, response.c_str());
75 // polling function that routinely queries all plugins for metrics and then posts them on
77 void metrics_collector(std::string ves_url, plugin_list * plugins, unsigned int interval){
79 // Instantiate the ves collector
80 curl_interface curl_obj(ves_url);
81 std::vector<std::string> response_vector;
84 for(unsigned int i = 0; i < plugins->size(); i++){
85 response_vector.clear();
86 res = (*plugins)[i].get()->getMetrics(response_vector);
88 mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), (*plugins)[i].get()->get_error().c_str());
92 for(auto &e: response_vector){
93 res = curl_obj.post_metrics(e);
95 mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str());
98 mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", e.c_str());
105 std::cout <<"Stopped metrics collector/reporter .." << std::endl;
109 void EndProgram(int signum){
110 std::cout <<"Signal received. Stopping program ....." << std::endl;
114 // ideally should be expanded for rollback purposes etc.
115 void msg_error(rmr_mbuf_t *message){
116 mdclog_write(MDCLOG_ERR, "Error: %s, %d Could not send RMR message of length %d and type %d, Reason %s", __FILE__, __LINE__, message->len, message->mtype, strerror(errno) );
120 int main(int argc, char *argv[]){
122 // initially set log level to INFO
123 init_logger("XaPP", MDCLOG_INFO);
125 configuration my_config;
127 // set config variables from environment
128 // used when deploying via start-up script
129 get_environment_config(my_config);
131 // over-ride with any command line variables if
133 get_command_line_config(argc, argv, my_config);
135 std::unique_ptr<XaPP> my_xapp;
138 // Reset log level based on configuration
139 init_logger(my_config.name, static_cast<mdclog_severity_t>(my_config.log_level));
141 if (my_config.gNodeB_list.size() == 0){
142 mdclog_write(MDCLOG_WARN, "WARNING : gNodeB not set for subscription. Subscription MAY FAIL");
145 std::string operating_mode;
147 // How are we operating ?
148 if (my_config.operating_mode == "CONTROL"){
149 // Full closed loop : process E2AP indication,
150 // E2SM, X2AP and generate response
151 my_config.report_mode_only = false;
152 operating_mode = "CLOSED LOOP CONTROL";
154 else if (my_config.operating_mode == "E2AP_PROC_ONLY"){
155 // Debugging : processing only E2AP indication
156 my_config.processing_level = ProcessingLevelTypes::E2AP_PROC_ONLY;
157 operating_mode = "E2AP PROCESSING ONLY";
159 else if (my_config.operating_mode == "E2SM_PROC_ONLY"){
160 // Debugging : processing only till E2SM indication header
161 my_config.processing_level = ProcessingLevelTypes::E2SM_PROC_ONLY;
162 operating_mode = "E2SM PROCESSING ONLY";
165 // Passive : processing till X2AP but do not generate response
166 my_config.report_mode_only = true;
167 operating_mode = "REPORTING ONLY";
170 mdclog_write(MDCLOG_DEBUG, "Operating mode of Admission Control xAPP is %s\n", operating_mode.c_str());
171 // Finished passing command line/environment arguments
172 //=============================================================
174 // instantiate xapp-rmr-framework object
175 mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
176 mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);
177 mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);
179 my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024);
182 // Instantiate admission logic handler (with only one instance for now)
183 int num_instances = 1;
184 Plugins.emplace_back(std::make_unique<admission>(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file, num_instances, my_config.xapp_id, my_config.report_mode_only));
186 // Add reference to plugin list .
187 // Plugin list is used by policy handler and metrics collector
188 plugin_rmr_map.insert(std::pair<int, Policy *>(A1_POLICY_REQ, Plugins[0].get()));
190 // instantiate ves thread (it polls all plugins and sends out their metrics)
191 std::thread metrics_thread(metrics_collector, my_config.ves_collector_url, &Plugins, my_config.measurement_interval);
193 // Instantiate subscription handler
194 subscription_handler sub_handler;
196 // Instantiate message handlers for RMR
197 // (one for each thread) and registrer
198 // subscription and admission handlers
200 std::vector<std::unique_ptr<message_processor> > message_procs;
201 for(int i = 0; i < my_config.num_threads; i++){
202 std::unique_ptr<message_processor> mp_handler = std::make_unique<message_processor> (my_config.processing_level, my_config.report_mode_only);
203 mp_handler.get()->register_subscription_handler(& sub_handler);
204 mp_handler.get()->register_protector(dynamic_cast<admission *>(Plugins[0].get())->get_protector_instance(0));
205 mp_handler.get()->register_policy_handler (& policy_handler);
206 message_procs.push_back(std::move(mp_handler));
210 // Start the RMR listening loops
211 std::vector<int> thread_ids(my_config.num_threads);
213 for(auto &e: message_procs){
214 thread_ids[i] = (*my_xapp).StartThread(*(e.get()), msg_error);
218 mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");
219 mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());
221 //Register signal handler to stop
222 signal(SIGINT, EndProgram);
223 signal(SIGTERM, EndProgram);
225 // Instantiate startup/shutown subroutine objects
226 init boot_shutdown((*my_xapp), sub_handler, my_config);
229 // Trigger start functions
230 boot_shutdown.startup();
239 // we are in shutdown mode
240 // send out subscription deletes
241 boot_shutdown.shutdown();
244 for(auto &e: message_procs){
245 mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());
246 std::cout << "Thread " << thread_ids[i] << " Number of packets handled = " << e.get()->get_messages() << std::endl;
251 std::cout <<"Stopping all running threads ..." << std::endl;
253 std::cout <<"Stopped RMR processing threads ...." << std::endl;
254 metrics_thread.join();
255 std::cout <<"Stopped Metric collection thread ...." << std::endl;
257 plugin_rmr_map.clear();
258 std::cout <<"Cleared Plugins .." << std::endl;
260 std::cout <<"Finished ... " << std::endl;