1. Transitioned to using latest asn1c compiler
[ric-app/admin.git] / src / adm-ctrl-xapp.cc
1 /*
2 ==================================================================================
3
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19 /* Author : Ashwin Sridharan
20    Date    : Feb 2019
21 */
22
23 #include "adm-ctrl-xapp.hpp"
24
25
26 static int RunProg = 1;  // keep loop running
27
28 // list of plugins 
29 typedef  std::vector<std::unique_ptr<Policy> > plugin_list;
30 plugin_list Plugins;
31 std::map<int, Policy *> plugin_rmr_map;
32
33
34 int add_subscription(subscription_handler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){
35   unsigned char node_buffer[32];
36   std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
37   node_buffer[gNodeB.length()] = '\0';
38   int res = sub_handler.RequestSubscription(he, he_resp,  RIC_SUB_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
39   return res;
40 };
41
42
43 int delete_subscription(subscription_handler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper  he_resp, std::string & gNodeB){
44   unsigned char node_buffer[32];
45   std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
46   node_buffer[gNodeB.length()] = '\0';
47
48   int res = sub_handler.RequestSubscriptionDelete(he, he_resp, RIC_SUB_DEL_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
49   return res;
50 };
51
52
53 void  policy_handler(int message_type, const char * message, int message_len, std::string & response, bool set){
54   auto it = plugin_rmr_map.find(message_type);
55   bool res;
56   if (it != plugin_rmr_map.end()){
57     if (set){
58       res = it->second->setPolicy(message, message_len, response);
59       if (res){
60         mdclog_write(MDCLOG_INFO, "A1 POLICY SET :: Successfully set A1 Policy\n");
61       }
62       else{
63         mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY SET   %s, %d . Unable to set policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
64       }
65         
66     }
67     else{
68       res = it->second->getPolicy(message, message_len, response);
69       if (res){
70         mdclog_write(MDCLOG_INFO, "A1 POLICY GET : Successfully retreived  A1 Policy\n");
71       }
72       else{
73         mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY GET  %s, %d . Unable to get policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
74       }
75       
76     }
77   }
78   else{
79     response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with RMR message type = " + std::to_string(message_type) + "\"}";
80   }
81 };
82
83
84 void metrics_collector(std::string ves_url, plugin_list * plugins,  unsigned int interval){
85   
86   // Instantiate the ves collector 
87   curl_interface curl_obj(ves_url);
88   std::string metrics_response;
89   int res;
90   while(RunProg){
91     for(unsigned int i = 0; i < plugins->size(); i++){
92       res = (*plugins)[i].get()->getMetrics(metrics_response);
93       if (res != 0){
94         mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), metrics_response.c_str());
95       }
96       else{
97         res = curl_obj.post_metrics(metrics_response);
98         if (!res){
99           mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str());
100         }
101         else{
102           mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", metrics_response.c_str());
103         }
104       }
105     }
106     sleep(interval);
107   }
108   std::cout <<"Stopped metrics collector/reporter .." << std::endl;
109 };
110     
111
112 void EndProgram(int signum){
113   std::cout <<"Signal received. Stopping program ....." << std::endl;
114   RunProg = 0;
115 }
116
117 void msg_error(rmr_mbuf_t *message){
118   mdclog_write(MDCLOG_ERR, "Error: %s, %d  Could not send RMR message of length %d and type %d, Reason %s",  __FILE__, __LINE__, message->len,  message->mtype, strerror(errno) );
119 };
120
121
122 int main(int argc, char *argv[]){
123
124   // initially set log level to INFO
125   init_logger("XaPP", MDCLOG_INFO);
126   
127   configuration my_config;
128
129   // set config variables from environment
130   // used when deploying via start-up script
131   get_environment_config(my_config);
132
133   // over-ride with any command line variables if
134   // provided
135   get_command_line_config(argc, argv, my_config);
136
137   std::unique_ptr<XaPP> my_xapp;
138
139
140   // Reset log level based on configuration
141   init_logger(my_config.name, static_cast<mdclog_severity_t>(my_config.log_level));
142   
143   if (my_config.gNodeB_list.size() == 0){
144     mdclog_write(MDCLOG_WARN, "WARNING  : gNodeB not set for subscription. Subscription MAY FAIL");
145   }
146
147
148   std::string operating_mode;
149   // How are we operating ? 
150   if (my_config.operating_mode == "CONTROL"){
151     // Full closed loop : process E2AP indication,
152     // E2SM, X2AP and generate response
153     my_config.report_mode_only = false;
154     operating_mode = "CLOSED LOOP CONTROL";
155   }
156   else if (my_config.operating_mode == "E2AP_PROC_ONLY"){
157     // Debugging : processing only E2AP indication 
158     my_config.processing_level = ProcessingLevelTypes::E2AP_PROC_ONLY;
159     operating_mode = "E2AP PROCESSING ONLY";
160   }
161   else if (my_config.operating_mode == "E2SM_PROC_ONLY"){
162     // Debugging : processing only till E2SM indication header
163     my_config.processing_level = ProcessingLevelTypes::E2SM_PROC_ONLY;
164     operating_mode = "E2SM PROCESSING ONLY";
165   }
166   else{
167     // Passive : processing till X2AP but do not generate response
168     my_config.report_mode_only = true;
169     operating_mode = "REPORTING ONLY";
170   }
171
172   mdclog_write(MDCLOG_DEBUG, "Operating mode of Admission Control xAPP is %s\n", operating_mode.c_str());
173   // Finished passing command line/environment arguments 
174   //=============================================================
175
176    // instantiate xapp object
177    if(my_config.num_threads >= 1){
178     mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
179     // Create XaPP that starts with specified number of threads 
180     my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024, my_config.num_threads);
181   }
182   else{
183     mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto");
184     //Let XaPP pick threads based on hardware 
185     my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024);
186   }
187   
188
189   mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);
190   mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);
191
192   
193    // Instantiate admission logic handler
194   Plugins.emplace_back(std::make_unique<admission>(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file,  1, my_config.report_mode_only));
195   
196    // Add reference to plugin list . We add twice (once for set policy  and once for get policy  ids)
197    // Plugin list is used by policy handler and metrics collector
198    plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_INT_CONTROL, Plugins[0].get()));
199    plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_GET_POLICY,  Plugins[0].get()));
200    
201    // instantiate curl object for ves 
202    std::thread metrics_thread(metrics_collector, my_config.ves_collector_url, &Plugins, my_config.measurement_interval);
203
204
205    // Instantiate subscription handler
206    subscription_handler sub_handler;
207
208    // Instantiate message handlers for RMR
209    // (one for each thread) and registrer
210    // subscription and admission handlers
211    
212    std::vector<std::unique_ptr<message_processor> > message_procs;
213    for(int i = 0; i < my_config.num_threads; i++){
214      std::unique_ptr<message_processor> mp_handler = std::make_unique<message_processor> (my_config.processing_level, my_config.report_mode_only);
215      mp_handler.get()->register_subscription_handler(& sub_handler);
216      mp_handler.get()->register_protector(dynamic_cast<admission *>(Plugins[0].get())->get_protector_instance(0));
217      mp_handler.get()->register_policy_handler (& policy_handler);
218      message_procs.push_back(std::move(mp_handler));
219    }
220   
221   
222    // Start the listening loops
223    std::vector<int> thread_ids(my_config.num_threads);
224    unsigned int i = 0;
225    for(auto  &e: message_procs){
226      thread_ids[i] = (*my_xapp).StartThread(*(e.get()), msg_error);
227      i++;
228    };
229
230    mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");
231    mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());
232
233    //======================================================
234    // sgnb Subscription spec
235
236    int request_id = 2; // will be over-written by subscription handler
237    int req_seq = 1;
238    int function_id = 0;
239    int action_id = 4;
240    int action_type = my_config.report_mode_only ? 0:1;
241    
242    int message_type = 1;
243    int procedure_code = 27;
244    std::string egnb_id = "Testgnb";
245    std::string plmn_id = "Testplmn";
246
247    unsigned char event_buf[128];
248    size_t event_buf_len = 128;
249    bool res;
250
251
252    e2sm_event_trigger_helper trigger_data;
253    e2sm_event_trigger event_trigger;
254   
255    trigger_data.egNB_id = egnb_id;
256    trigger_data.plmn_id = plmn_id;
257    trigger_data.egNB_id_type = 2;
258    trigger_data.interface_direction = 1;
259    trigger_data.procedure_code = procedure_code;
260    trigger_data.message_type = message_type;
261    res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data);
262    if (!res){
263      mdclog_write(MDCLOG_ERR, "Error : %s, %d: Could not encode subscription Request. Reason = %s\n", __FILE__, __LINE__, event_trigger.get_error().c_str());
264      exit(0);
265    }
266   
267
268    subscription_helper sgnb_add_subscr_req;
269    subscription_response_helper subscr_response;
270   
271    sgnb_add_subscr_req.clear();
272    sgnb_add_subscr_req.set_request(request_id, req_seq);
273    sgnb_add_subscr_req.set_function_id(function_id);
274    sgnb_add_subscr_req.add_action(action_id, action_type);
275   
276   
277    sgnb_add_subscr_req.set_event_def(&event_buf[0], event_buf_len);
278    mdclog_write(MDCLOG_INFO, "Encoded event trigger definition into PDU of size %lu bytes\n", event_buf_len);
279
280    //======================================================
281    // Purely for testing purposes ... write subscription ASN binary to file 
282    // FILE *pfile;
283    // pfile = fopen("event_trigger.pr", "wb");
284    // fwrite(event_buf, 1, event_buf_len,  pfile);
285    // fclose(pfile);
286    //======================================================
287   
288    
289    // keep sending subscription request till successfull for all gnodebs or exceed max attempts  ?
290    // or exceed max_iterations
291    auto it = my_config.gNodeB_list.begin();
292    int loop = 0;
293    int num_nodes = my_config.gNodeB_list.size();
294    
295    while((loop < num_nodes * my_config.max_sub_loops) && my_config.gNodeB_list.size() > 0 && RunProg){
296      int attempt = 0;
297      int subscr_result = -1;
298       
299      while(1){
300        mdclog_write(MDCLOG_INFO, "Sending subscription request for %s ... Attempt number = %d\n", (*it).c_str(), attempt);
301        subscr_result = add_subscription(sub_handler, my_xapp.get(),  sgnb_add_subscr_req, subscr_response, *it);
302        if (subscr_result == SUBSCR_SUCCESS){
303          break;
304        }
305        sleep(5);
306        attempt ++;
307        if (attempt > MAX_SUBSCRIPTION_ATTEMPTS){
308          break;
309        }
310      }
311      
312      if(subscr_result == SUBSCR_SUCCESS){
313        mdclog_write(MDCLOG_INFO, "Successfully subscribed for gNodeB %s", (*it).c_str());
314        // remove node from list,
315        // move to next gnobde
316        it = my_config.gNodeB_list.erase(it);
317      }
318
319      if (it == my_config.gNodeB_list.end()){
320        it = my_config.gNodeB_list.begin();
321      }
322
323      loop++;
324      
325    }
326    
327    if (my_config.gNodeB_list.size() == 0){
328      std::cout <<"SUBSCRIPTION REQUEST :: Successfully subscribed to events for all gNodeBs " << std::endl;
329    }
330    else{
331      std::cerr <<"SUBSCRIPTION REQUEST :: Failed to subscribe for following gNodeBs" << std::endl;
332      for(const auto &e: my_config.gNodeB_list){
333        std::cerr <<"Failed to subscribe for gNodeB " << e << std::endl;
334      }
335    }
336    
337    //Register signal handler to stop 
338    signal(SIGINT, EndProgram);
339    signal(SIGTERM, EndProgram);
340    
341    
342    //Wait for stop
343    while(RunProg){
344      sleep(10);
345    }
346   
347    i = 0;
348    for(auto  &e: message_procs){
349      mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());
350      std::cout << "Thread " << thread_ids[i] << "  Number of packets handled = " <<  e.get()->get_messages() << std::endl;
351      
352      i ++ ;
353    }
354    
355    std::cout <<"Stopping all running threads ..." << std::endl;
356    (*my_xapp).Stop();
357    std::cout <<"Stopped RMR processing threads ...." << std::endl;
358    metrics_thread.join();
359    std::cout <<"Stopped Metric collection thread ...." << std::endl;
360    Plugins.clear();
361    plugin_rmr_map.clear();
362    std::cout <<"Cleared Plugins .." << std::endl;
363   
364    std::cout <<"Finished ... " << std::endl;
365  
366    return 0;
367 };