Updated documentation for mock a1 tool
[ric-app/admin.git] / src / adm-ctrl-xapp.cc
1 /*
2 ==================================================================================
3
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19 /* Author : Ashwin Sridharan
20    Date    : Feb 2019
21 */
22
23 #include "adm-ctrl-xapp.hpp"
24
25 int run_program = 1; 
26
27
28
29 // list of plugins 
30 plugin_list Plugins;
31 std::map<int, Policy *> plugin_rmr_map;
32
33 // Policy handler : All plugins (of abstract class Policy) are registered with the plugin list.
34 // The policy handler  is registered as a call back to the RMR message handler. When a policy related RMR message
35 // is received, this function is invoked. It finds the appropriate plugin from the plugin list, passes the policy message and then
36 // returns back the response (in the response string)
37
38 // NOTE : This version of policy handler was written with R1 policy protocol in mind.
39 // Specifically, it was assumed that each message type corresponds to either set/get for a specific xAPP.
40 // It still works with R2, but ideally should be modified since a single policy type could be applied to multiple plugins.
41
42 void  policy_handler(int message_type, const char * message, int message_len, std::string & response, bool set){
43   
44   // Extract policy id type : for now we assume exactly 1 plugin maps to each policy type
45   rapidjson::Pointer policy_type_ref("/policy_type_id");
46   rapidjson::Document doc;
47   if (doc.Parse(message).HasParseError()){
48     mdclog_write(MDCLOG_ERR, "Error: %s, %d :: Could not decode A1 JSON message %s\n", __FILE__, __LINE__, message);
49     return;
50   }
51   rapidjson::Value * ref = policy_type_ref.Get(doc);
52   if (ref == NULL){
53     mdclog_write(MDCLOG_ERR, "Error : %s, %d:: Could not extract policy type id from %s\n", __FILE__, __LINE__, message);
54     return;
55   }
56   int policy_type_id = ref->GetInt();
57
58   auto it = plugin_rmr_map.find(policy_type_id);
59   bool res;
60   if (it != plugin_rmr_map.end()){
61     if (set){
62       res = it->second->setPolicy(message, message_len, response);
63       if (res){
64         mdclog_write(MDCLOG_INFO, "A1 POLICY SET :: Successfully set A1 Policy\n");
65       }
66       else{
67         mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY SET   %s, %d . Unable to set policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
68       }
69         
70     }
71     else{
72       res = it->second->getPolicy(message, message_len, response);
73       if (res){
74         mdclog_write(MDCLOG_INFO, "A1 POLICY GET : Successfully retreived  A1 Policy\n");
75       }
76       else{
77         mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY GET  %s, %d . Unable to get policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
78       }
79       
80     }
81   }
82   else{
83     response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with policy type id  = " + std::to_string(policy_type_id) + "\"}";
84     mdclog_write(MDCLOG_ERR, "Error ! %s, %d : %s\n", __FILE__, __LINE__, response.c_str());
85   }
86 };
87
88
89
90 // polling function that routinely queries all plugins for metrics and then posts them on
91 // VES url
92 void metrics_collector(std::string ves_url, plugin_list * plugins,  unsigned int interval){
93   
94   // Instantiate the ves collector 
95   curl_interface curl_obj(ves_url);
96   std::vector<std::string> response_vector;
97   int res;
98   while(run_program){
99     for(unsigned int i = 0; i < plugins->size(); i++){
100       response_vector.clear();
101       res =  (*plugins)[i].get()->getMetrics(response_vector);
102       if (res != 0){
103         mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), (*plugins)[i].get()->get_error().c_str());
104       }
105       else{
106         // send each response
107         for(auto  &e: response_vector){
108           res = curl_obj.post_metrics(e);
109           if (!res){
110             mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str());
111           }
112           else{
113             mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", e.c_str());
114           }
115         }
116       }
117     }
118     sleep(interval);
119   }
120   std::cout <<"Stopped metrics collector/reporter .." << std::endl;
121 };
122     
123
124 void EndProgram(int signum){
125   std::cout <<"Signal received. Stopping program ....." << std::endl;
126   run_program = 0;
127   
128 }
129 // ideally should be expanded for rollback purposes etc.
130 void msg_error(rmr_mbuf_t *message){
131   mdclog_write(MDCLOG_ERR, "Error: %s, %d  Could not send RMR message of length %d and type %d, Reason %s",  __FILE__, __LINE__, message->len,  message->mtype, strerror(errno) );
132 };
133
134
135 int main(int argc, char *argv[]){
136
137   // initially set log level to INFO
138   init_logger("XaPP", MDCLOG_INFO);
139   
140   configuration my_config;
141
142   // set config variables from environment
143   // used when deploying via start-up script
144   get_environment_config(my_config);
145
146   // over-ride with any command line variables if
147   // provided
148   get_command_line_config(argc, argv, my_config);
149
150   std::unique_ptr<XaPP> my_xapp;
151
152
153   // Reset log level based on configuration
154   init_logger(my_config.name, static_cast<mdclog_severity_t>(my_config.log_level));
155   
156   if (my_config.gNodeB_list.size() == 0){
157     mdclog_write(MDCLOG_WARN, "WARNING  : gNodeB not set for subscription. Subscription MAY FAIL");
158   }
159
160   std::string operating_mode;
161
162   // How are we operating ? 
163   if (my_config.operating_mode == "CONTROL"){
164     // Full closed loop : process E2AP indication,
165     // E2SM, X2AP and generate response
166     my_config.report_mode_only = false;
167     operating_mode = "CLOSED LOOP CONTROL";
168   }
169   else if (my_config.operating_mode == "E2AP_PROC_ONLY"){
170     // Debugging : processing only E2AP indication 
171     my_config.processing_level = ProcessingLevelTypes::E2AP_PROC_ONLY;
172     operating_mode = "E2AP PROCESSING ONLY";
173   }
174   else if (my_config.operating_mode == "E2SM_PROC_ONLY"){
175     // Debugging : processing only till E2SM indication header
176     my_config.processing_level = ProcessingLevelTypes::E2SM_PROC_ONLY;
177     operating_mode = "E2SM PROCESSING ONLY";
178   }
179   else{
180     // Passive : processing till X2AP but do not generate response
181     my_config.report_mode_only = true;
182     operating_mode = "REPORTING ONLY";
183   }
184
185   mdclog_write(MDCLOG_DEBUG, "Operating mode of Admission Control xAPP is %s\n", operating_mode.c_str());
186   // Finished passing command line/environment arguments 
187   //=============================================================
188
189   // instantiate xapp-rmr-framework  object
190   mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
191   mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);
192   mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);
193
194   my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024);
195   
196   
197   // Instantiate admission logic handler (with only one instance for now)
198   int num_instances = 1;
199   Plugins.emplace_back(std::make_unique<admission>(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file,  num_instances, my_config.xapp_id, my_config.report_mode_only));
200   
201   // Add reference to plugin list . 
202   // Plugin list is used by policy handler and metrics collector
203    plugin_rmr_map.insert(std::pair<int, Policy *>(RATE_CONTROL_POLICY_ID, Plugins[0].get()));
204    
205    // instantiate ves thread (it polls all plugins and sends out their metrics) 
206    std::thread metrics_thread(metrics_collector, my_config.ves_collector_url, &Plugins, my_config.measurement_interval);
207
208    // Instantiate subscription handler
209    subscription_handler sub_handler;
210
211    // Instantiate message handlers for RMR
212    // (one for each thread) and registrer
213    // subscription and admission handlers
214    
215    std::vector<std::unique_ptr<message_processor> > message_procs;
216    for(int i = 0; i < my_config.num_threads; i++){
217      std::unique_ptr<message_processor> mp_handler = std::make_unique<message_processor> (my_config.processing_level, my_config.report_mode_only);
218      mp_handler.get()->register_subscription_handler(& sub_handler);
219      mp_handler.get()->register_protector(dynamic_cast<admission *>(Plugins[0].get())->get_protector_instance(0));
220      mp_handler.get()->register_policy_handler (& policy_handler);
221      message_procs.push_back(std::move(mp_handler));
222    }
223   
224   
225    // Start the RMR listening loops
226    std::vector<int> thread_ids(my_config.num_threads);
227    unsigned int i = 0;
228    for(auto  &e: message_procs){
229      thread_ids[i] = (*my_xapp).StartThread(*(e.get()), msg_error);
230      i++;
231    };
232
233    mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");
234    mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());
235
236    //Register signal handler to stop
237    signal(SIGINT, EndProgram);
238    signal(SIGTERM, EndProgram);
239
240    // Instantiate startup/shutown subroutine objects
241    init boot_shutdown((*my_xapp), sub_handler, my_config);
242
243
244    // Trigger start functions
245    boot_shutdown.startup();
246    
247    
248    //Wait for stop
249    while(run_program){
250      sleep(10);
251    }
252
253
254    // we are in shutdown mode
255    // send out subscription deletes
256    boot_shutdown.shutdown();
257
258    i = 0;
259    for(auto  &e: message_procs){
260      mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());
261      std::cout << "Thread " << thread_ids[i] << "  Number of packets handled = " <<  e.get()->get_messages() << std::endl;
262      
263      i ++ ;
264    }
265    
266    std::cout <<"Stopping all running threads ..." << std::endl;
267    (*my_xapp).Stop();
268    std::cout <<"Stopped RMR processing threads ...." << std::endl;
269    metrics_thread.join();
270    std::cout <<"Stopped Metric collection thread ...." << std::endl;
271    Plugins.clear();
272    plugin_rmr_map.clear();
273    std::cout <<"Cleared Plugins .." << std::endl;
274   
275    std::cout <<"Finished ... " << std::endl;
276  
277    return 0;
278 };