User story RICPLT-2620
[ric-app/admin.git] / src / adm-ctrl-xapp.cc
1 /*
2 ==================================================================================
3
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19 /* Author : Ashwin Sridharan
20    Date    : Feb 2019
21 */
22
23 #include "adm-ctrl-xapp.hpp"
24
25 int run_program = 1; 
26
27
28
29 // list of plugins 
30 plugin_list Plugins;
31 std::map<int, Policy *> plugin_rmr_map;
32
33 // Policy handler : All plugins (of abstract class Policy) are registered with the plugin list.
34 // The policy handler  is registered as a call back to the RMR message handler. When a policy related RMR message
35 // is received, this function is invoked. It finds the appropriate plugin from the plugin list, passes the policy message and then
36 // returns back the response (in the response string)
37
38 // NOTE : This version of policy handler was written with R1 policy protocol in mind.
39 // Specifically, it was assumed that each message type corresponds to either set/get for a specific xAPP.
40 // It still works with R2, but ideally should be modified since a single policy type could be applied to multiple plugins.
41
42 void  policy_handler(int message_type, const char * message, int message_len, std::string & response, bool set){
43   auto it = plugin_rmr_map.find(message_type);
44   bool res;
45   if (it != plugin_rmr_map.end()){
46     if (set){
47       res = it->second->setPolicy(message, message_len, response);
48       if (res){
49         mdclog_write(MDCLOG_INFO, "A1 POLICY SET :: Successfully set A1 Policy\n");
50       }
51       else{
52         mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY SET   %s, %d . Unable to set policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
53       }
54         
55     }
56     else{
57       res = it->second->getPolicy(message, message_len, response);
58       if (res){
59         mdclog_write(MDCLOG_INFO, "A1 POLICY GET : Successfully retreived  A1 Policy\n");
60       }
61       else{
62         mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY GET  %s, %d . Unable to get policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
63       }
64       
65     }
66   }
67   else{
68     response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with RMR message type = " + std::to_string(message_type) + "\"}";
69     mdclog_write(MDCLOG_ERR, "Error ! %s, %d : %s\n", __FILE__, __LINE__, response.c_str());
70   }
71 };
72
73
74
75 // polling function that routinely queries all plugins for metrics and then posts them on
76 // VES url
77 void metrics_collector(std::string ves_url, plugin_list * plugins,  unsigned int interval){
78   
79   // Instantiate the ves collector 
80   curl_interface curl_obj(ves_url);
81   std::vector<std::string> response_vector;
82   int res;
83   while(run_program){
84     for(unsigned int i = 0; i < plugins->size(); i++){
85       response_vector.clear();
86       res =  (*plugins)[i].get()->getMetrics(response_vector);
87       if (res != 0){
88         mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), (*plugins)[i].get()->get_error().c_str());
89       }
90       else{
91         // send each response
92         for(auto  &e: response_vector){
93           res = curl_obj.post_metrics(e);
94           if (!res){
95             mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str());
96           }
97           else{
98             mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", e.c_str());
99           }
100         }
101       }
102     }
103     sleep(interval);
104   }
105   std::cout <<"Stopped metrics collector/reporter .." << std::endl;
106 };
107     
108
109 void EndProgram(int signum){
110   std::cout <<"Signal received. Stopping program ....." << std::endl;
111   run_program = 0;
112   
113 }
114 // ideally should be expanded for rollback purposes etc.
115 void msg_error(rmr_mbuf_t *message){
116   mdclog_write(MDCLOG_ERR, "Error: %s, %d  Could not send RMR message of length %d and type %d, Reason %s",  __FILE__, __LINE__, message->len,  message->mtype, strerror(errno) );
117 };
118
119
120 int main(int argc, char *argv[]){
121
122   // initially set log level to INFO
123   init_logger("XaPP", MDCLOG_INFO);
124   
125   configuration my_config;
126
127   // set config variables from environment
128   // used when deploying via start-up script
129   get_environment_config(my_config);
130
131   // over-ride with any command line variables if
132   // provided
133   get_command_line_config(argc, argv, my_config);
134
135   std::unique_ptr<XaPP> my_xapp;
136
137
138   // Reset log level based on configuration
139   init_logger(my_config.name, static_cast<mdclog_severity_t>(my_config.log_level));
140   
141   if (my_config.gNodeB_list.size() == 0){
142     mdclog_write(MDCLOG_WARN, "WARNING  : gNodeB not set for subscription. Subscription MAY FAIL");
143   }
144
145   std::string operating_mode;
146
147   // How are we operating ? 
148   if (my_config.operating_mode == "CONTROL"){
149     // Full closed loop : process E2AP indication,
150     // E2SM, X2AP and generate response
151     my_config.report_mode_only = false;
152     operating_mode = "CLOSED LOOP CONTROL";
153   }
154   else if (my_config.operating_mode == "E2AP_PROC_ONLY"){
155     // Debugging : processing only E2AP indication 
156     my_config.processing_level = ProcessingLevelTypes::E2AP_PROC_ONLY;
157     operating_mode = "E2AP PROCESSING ONLY";
158   }
159   else if (my_config.operating_mode == "E2SM_PROC_ONLY"){
160     // Debugging : processing only till E2SM indication header
161     my_config.processing_level = ProcessingLevelTypes::E2SM_PROC_ONLY;
162     operating_mode = "E2SM PROCESSING ONLY";
163   }
164   else{
165     // Passive : processing till X2AP but do not generate response
166     my_config.report_mode_only = true;
167     operating_mode = "REPORTING ONLY";
168   }
169
170   mdclog_write(MDCLOG_DEBUG, "Operating mode of Admission Control xAPP is %s\n", operating_mode.c_str());
171   // Finished passing command line/environment arguments 
172   //=============================================================
173
174   // instantiate xapp-rmr-framework  object
175   mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
176   mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);
177   mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);
178
179   my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024);
180   
181   
182   // Instantiate admission logic handler (with only one instance for now)
183   int num_instances = 1;
184   Plugins.emplace_back(std::make_unique<admission>(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file,  num_instances, my_config.xapp_id, my_config.report_mode_only));
185   
186   // Add reference to plugin list . 
187   // Plugin list is used by policy handler and metrics collector
188    plugin_rmr_map.insert(std::pair<int, Policy *>(A1_POLICY_REQ, Plugins[0].get()));
189    
190    // instantiate ves thread (it polls all plugins and sends out their metrics) 
191    std::thread metrics_thread(metrics_collector, my_config.ves_collector_url, &Plugins, my_config.measurement_interval);
192
193    // Instantiate subscription handler
194    subscription_handler sub_handler;
195
196    // Instantiate message handlers for RMR
197    // (one for each thread) and registrer
198    // subscription and admission handlers
199    
200    std::vector<std::unique_ptr<message_processor> > message_procs;
201    for(int i = 0; i < my_config.num_threads; i++){
202      std::unique_ptr<message_processor> mp_handler = std::make_unique<message_processor> (my_config.processing_level, my_config.report_mode_only);
203      mp_handler.get()->register_subscription_handler(& sub_handler);
204      mp_handler.get()->register_protector(dynamic_cast<admission *>(Plugins[0].get())->get_protector_instance(0));
205      mp_handler.get()->register_policy_handler (& policy_handler);
206      message_procs.push_back(std::move(mp_handler));
207    }
208   
209   
210    // Start the RMR listening loops
211    std::vector<int> thread_ids(my_config.num_threads);
212    unsigned int i = 0;
213    for(auto  &e: message_procs){
214      thread_ids[i] = (*my_xapp).StartThread(*(e.get()), msg_error);
215      i++;
216    };
217
218    mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");
219    mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());
220
221    //Register signal handler to stop
222    signal(SIGINT, EndProgram);
223    signal(SIGTERM, EndProgram);
224
225    // Instantiate startup/shutown subroutine objects
226    init boot_shutdown((*my_xapp), sub_handler, my_config);
227
228
229    // Trigger start functions
230    boot_shutdown.startup();
231    
232    
233    //Wait for stop
234    while(run_program){
235      sleep(10);
236    }
237
238
239    // we are in shutdown mode
240    // send out subscription deletes
241    boot_shutdown.shutdown();
242
243    i = 0;
244    for(auto  &e: message_procs){
245      mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());
246      std::cout << "Thread " << thread_ids[i] << "  Number of packets handled = " <<  e.get()->get_messages() << std::endl;
247      
248      i ++ ;
249    }
250    
251    std::cout <<"Stopping all running threads ..." << std::endl;
252    (*my_xapp).Stop();
253    std::cout <<"Stopped RMR processing threads ...." << std::endl;
254    metrics_thread.join();
255    std::cout <<"Stopped Metric collection thread ...." << std::endl;
256    Plugins.clear();
257    plugin_rmr_map.clear();
258    std::cout <<"Cleared Plugins .." << std::endl;
259   
260    std::cout <<"Finished ... " << std::endl;
261  
262    return 0;
263 };