Initial commit of Admission Control xAPP and E2AP/X2AP definitions
[ric-app/admin.git] / src / adm-ctrl-xapp.cc
1 /*
2 ==================================================================================
3
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19 /* Author : Ashwin Sridharan
20    Date    : Feb 2019
21 */
22
23 #include "adm-ctrl-xapp.hpp"
24
25 bool report_mode_only = true;
26 static int RunProg = 1;  // keep loop running
27
28 // list of plugins 
29 typedef  std::vector<std::unique_ptr<Policy> > plugin_list;
30 plugin_list Plugins;
31 std::map<int, Policy *> plugin_rmr_map;
32
33
34 bool add_subscription(SubscriptionHandler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){
35   unsigned char node_buffer[32];
36   std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
37   node_buffer[gNodeB.length()] = '\0';
38   bool res = sub_handler.RequestSubscription(he, he_resp,  RIC_SUB_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
39   return res;
40 };
41
42
43 bool delete_subscription(SubscriptionHandler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper  he_resp, std::string & gNodeB){
44   unsigned char node_buffer[32];
45   std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
46   node_buffer[gNodeB.length()] = '\0';
47
48   bool res = sub_handler.RequestSubscriptionDelete(he, he_resp, RIC_SUB_DEL_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
49   return res;
50 };
51
52
53 void  policy_handler(int message_type, const char * message, int message_len, std::string & response, bool set){
54   auto it = plugin_rmr_map.find(message_type);
55   bool res;
56   if (it != plugin_rmr_map.end()){
57     if (set){
58       res = it->second->setPolicy(message, message_len, response);
59       if (res){
60         mdclog_write(MDCLOG_INFO, "A1 POLICY SET :: Successfully set A1 Policy\n");
61       }
62       else{
63         mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY SET   %s, %d . Unable to set policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
64       }
65         
66     }
67     else{
68       res = it->second->getPolicy(message, message_len, response);
69       if (res){
70         mdclog_write(MDCLOG_INFO, "A1 POLICY GET : Successfully retreived  A1 Policy\n");
71       }
72       else{
73         mdclog_write(MDCLOG_ERR, "Error :: A1 POLICY GET  %s, %d . Unable to get policy. Reason = %s\n", __FILE__, __LINE__, response.c_str());
74       }
75       
76     }
77   }
78   else{
79     response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with RMR message type = " + std::to_string(message_type) + "\"}";
80   }
81 };
82
83
84 void metrics_collector(std::string ves_url, plugin_list * plugins,  unsigned int interval){
85   
86   // Instantiate the ves collector 
87   curl_interface curl_obj(ves_url);
88   std::string metrics_response;
89   int res;
90   while(RunProg){
91     for(unsigned int i = 0; i < plugins->size(); i++){
92       res = (*plugins)[i].get()->getMetrics(metrics_response);
93       if (res != 0){
94         mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), metrics_response.c_str());
95       }
96       else{
97         res = curl_obj.post_metrics(metrics_response);
98         if (!res){
99           mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str());
100         }
101         else{
102           mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", metrics_response.c_str());
103         }
104       }
105     }
106     sleep(interval);
107   }
108   std::cout <<"Stopped metrics collector/reporter .." << std::endl;
109 };
110     
111
112 void EndProgram(int signum){
113   std::cout <<"Signal received. Stopping program ....." << std::endl;
114   RunProg = 0;
115 }
116
117 void msg_error(rmr_mbuf_t *message){
118   mdclog_write(MDCLOG_ERR, "Error: %s, %d  Could not send RMR message of length %d and type %d, Reason %s",  __FILE__, __LINE__, message->len,  message->mtype, strerror(errno) );
119 };
120
121
122 int main(int argc, char *argv[]){
123
124   // initially set log level to INFO
125   init_logger("XaPP", MDCLOG_INFO);
126   
127   configuration my_config;
128
129   // set config variables from environment
130   // used when deploying via start-up script
131   get_environment_config(my_config);
132
133   // over-ride with any command line variables if
134   // provided
135   get_command_line_config(argc, argv, my_config);
136
137   std::unique_ptr<XaPP> my_xapp;
138
139
140   // Reset log level based on configuration
141   init_logger(my_config.name, static_cast<mdclog_severity_t>(my_config.log_level));
142   
143   if (my_config.gNodeB_list.size() == 0){
144     mdclog_write(MDCLOG_WARN, "WARNING  : gNodeB not set for subscription. Subscription MAY FAIL");
145   }
146
147   if (my_config.operating_mode == "CONTROL"){
148     report_mode_only = false;
149   }
150   else{
151     report_mode_only = true;
152   }
153   
154   // Finished passing command line/environment arguments 
155   //=============================================================
156
157    // instantiate xapp object
158    if(my_config.num_threads >= 1){
159     mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
160     // Create XaPP that starts with specified number of threads 
161     my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024, my_config.num_threads);
162   }
163   else{
164     mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto");
165     //Let XaPP pick threads based on hardware 
166     my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024);
167   }
168   
169
170   mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);
171   mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);
172
173   
174    // Instantiate admission logic handler
175    Plugins.emplace_back(std::make_unique<admission>(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file,  1));
176   
177    // Add reference to plugin list . We add twice (once for set policy  and once for get policy  ids)
178    // Plugin list is used by policy handler and metrics collector
179    plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_INT_CONTROL, Plugins[0].get()));
180    plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_GET_POLICY,  Plugins[0].get()));
181    
182    // instantiate curl object for ves 
183    std::thread metrics_thread(metrics_collector, my_config.ves_collector_url, &Plugins, my_config.measurement_interval);
184
185
186    // Instantiate subscription handler
187    SubscriptionHandler sub_handler;
188
189    // Instantiate message handlers for RMR
190    // (one for each thread) and registrer
191    // subscription and admission handlers
192    
193    std::vector<std::unique_ptr<message_processor> > message_procs;
194    for(int i = 0; i < my_config.num_threads; i++){
195      std::unique_ptr<message_processor> mp_handler = std::make_unique<message_processor> ();
196      mp_handler.get()->register_subscription_handler(& sub_handler);
197      mp_handler.get()->register_protector(dynamic_cast<admission *>(Plugins[0].get())->get_protector_instance(0));
198      mp_handler.get()->register_policy_handler (& policy_handler);
199      message_procs.push_back(std::move(mp_handler));
200    }
201   
202   
203    // Start the listening loops
204    std::vector<int> thread_ids(my_config.num_threads);
205    unsigned int i = 0;
206    for(auto  &e: message_procs){
207      thread_ids[i] = (*my_xapp).StartThread(*(e.get()), msg_error);
208      i++;
209    };
210
211    mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");
212    mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());
213
214    //======================================================
215    // sgnb Subscription spec
216
217    int request_id = 2; // will be over-written by subscription handler
218    int req_seq = 1;
219    int function_id = 0;
220    int action_id = 4;
221    int action_type = report_mode_only ? 0:1;
222    
223    int message_type = 1;
224    int procedure_code = 27;
225    std::string egnb_id = "Testgnb";
226    std::string plmn_id = "Testplmn";
227
228    unsigned char event_buf[128];
229    size_t event_buf_len = 128;
230    bool res;
231
232
233    e2sm_event_trigger_helper trigger_data;
234    e2sm_event_trigger event_trigger;
235   
236    trigger_data.egNB_id = egnb_id;
237    trigger_data.plmn_id = plmn_id;
238    trigger_data.egNB_id_type = 2;
239    trigger_data.interface_direction = 1;
240    trigger_data.procedure_code = procedure_code;
241    trigger_data.message_type = message_type;
242    res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data);
243    if (!res){
244      mdclog_write(MDCLOG_ERR, "Error : %s, %d: Could not encode subscription Request. Reason = %s\n", __FILE__, __LINE__, event_trigger.get_error().c_str());
245      exit(0);
246    }
247   
248
249    subscription_helper sgnb_add_subscr_req;
250    subscription_response_helper subscr_response;
251   
252    sgnb_add_subscr_req.clear();
253    sgnb_add_subscr_req.set_request(request_id, req_seq);
254    sgnb_add_subscr_req.set_function_id(function_id);
255    sgnb_add_subscr_req.add_action(action_id, action_type);
256   
257   
258    sgnb_add_subscr_req.set_event_def(&event_buf[0], event_buf_len);
259    mdclog_write(MDCLOG_INFO, "Encoded event trigger definition into PDU of size %lu bytes\n", event_buf_len);
260
261    //======================================================
262    // Purely for testing purposes ... write subscription ASN binary to file 
263    // FILE *pfile;
264    // pfile = fopen("event_trigger.pr", "wb");
265    // fwrite(event_buf, 1, event_buf_len,  pfile);
266    // fclose(pfile);
267    //======================================================
268   
269    
270    // keep sending subscription request till successfull for all gnodebs ?
271    auto it = my_config.gNodeB_list.begin();
272    while(my_config.gNodeB_list.size() > 0 && RunProg){
273      int attempt = 0;
274      res = false;
275       
276      while(!res){
277        mdclog_write(MDCLOG_INFO, "Sending subscription request for %s ... Attempt number = %d\n", (*it).c_str(), attempt);
278        res = add_subscription(sub_handler, my_xapp.get(),  sgnb_add_subscr_req, subscr_response, *it);
279        if (!res){
280          sleep(5);
281        };
282        attempt ++;
283        if (attempt > MAX_SUBSCRIPTION_ATTEMPTS){
284          break;
285        }
286      }
287      
288      if(res){
289        mdclog_write(MDCLOG_INFO, "Successfully subscribed for gNodeB %s", (*it).c_str());
290        // remove node from list,
291        // move to next gnobde
292        it = my_config.gNodeB_list.erase(it);
293      }
294
295      if (it == my_config.gNodeB_list.end()){
296        it = my_config.gNodeB_list.begin();
297      }
298      
299    }
300    
301    
302    std::cout <<"SUBSCRIPTION REQUEST :: Successfully subscribed to events for all gNodeBs " << std::endl;
303
304    //Register signal handler to stop 
305    signal(SIGINT, EndProgram);
306    signal(SIGTERM, EndProgram);
307    
308
309    // Purely for testing purposes ....
310    // If in test mode, we wait an interval and then send delete subscription request for each gNodeB
311    if(my_config.test_mode){
312      std::cout <<"====================== " << std::endl;
313      std::cout <<"WE ARE IN TEST MODE. " << std::endl;
314      std::cout <<"====================== " << std::endl;
315      std::cout <<"WILL SEND SUBSCRIPTION DELETE REQUEST AFTER " << my_config.measurement_interval << " SECONDS " << std::endl;
316      sleep(my_config.measurement_interval); 
317      res = false;
318      // keep sending subscription delete request till successfull ? 
319      int attempt = 0;
320      while(!res){
321        mdclog_write(MDCLOG_INFO, "Sending subscription delete request for id = %d ... Attempt number = %d\n", sgnb_add_subscr_req.get_request_id(), attempt);
322        res = delete_subscription(sub_handler, my_xapp.get(),  sgnb_add_subscr_req, subscr_response, my_config.gNodeB_list[0]);
323        if (!res){
324          sleep(5);
325        };
326        attempt ++;
327      }
328
329      std::cout <<"SUBSCRIPTION DELETE REQUEST :: Successfuly deleted subscription request " << request_id << std::endl;
330      
331    };
332    
333    //Wait for stop
334    while(RunProg){
335      sleep(10);
336    }
337   
338    i = 0;
339    for(auto  &e: message_procs){
340      mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());
341      std::cout << "Thread " << thread_ids[i] << "  Number of packets handled = " <<  e.get()->get_messages() << std::endl;
342      
343      i ++ ;
344    }
345    
346    std::cout <<"Stopping all running threads ..." << std::endl;
347    (*my_xapp).Stop();
348    std::cout <<"Stopped RMR processing threads ...." << std::endl;
349    metrics_thread.join();
350    std::cout <<"Stopped Metric collection thread ...." << std::endl;
351    Plugins.clear();
352    plugin_rmr_map.clear();
353    std::cout <<"Cleared Plugins .." << std::endl;
354   
355    std::cout <<"Finished ... " << std::endl;
356  
357    return 0;
358 };