067f3853e665c219ac81e1f09742344f69e39f64
[ric-app/admin.git] / src / protector-plugin / admission_policy.cc
1 /*
2 ==================================================================================
3
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20
21 #include "admission_policy.hpp"
22
23 using namespace rapidjson;
24
25 // constructor loads the policy schema, metric schema, and example payload JSON files
26 // instantiates the required number of protector plugins 
27 admission::admission (std::string policy_schema_file, std::string samples_file,  std::string metrics_schema_file, unsigned int num_instances, std::string xapp_id, bool report_only){
28   if (num_instances < 1 || num_instances > MAX_INSTANCES){
29     std::stringstream ss;
30     ss << "Error !" << __FILE__ << ", " << __LINE__ << " Number of instances must be between " << 1 << " and " << MAX_INSTANCES << ". Specified = " << num_instances << std::endl;
31     error_string = ss.str();
32     throw std::runtime_error(error_string);
33   }
34
35   _xapp_id.assign(xapp_id);
36   std::string response;
37   std::string buffer;
38   std::stringstream ss;
39   Document _doc;
40   Value * ref;
41   std::string schema_key;
42
43   // Load schema files and extract relevant validators
44   // schema for downstream message
45   bool res = load_schema(policy_schema_file, "/downstream_schema", downstream_schema_ref_, downstream_validator_);
46   if(res == false){
47     throw std::runtime_error(error_string);
48   }
49   // schema for notification message
50   res = load_schema(policy_schema_file, "/notify_schema", notify_schema_ref_, notify_validator_);
51   if(res == false){
52     throw std::runtime_error(error_string);
53   }
54   // schema for metrics message
55   res = load_schema(metrics_schema_file, "", metrics_schema_ref_, metrics_validator_);
56   if(res == false){
57     throw std::runtime_error(error_string);
58   }
59
60   
61   // load samples file and extract sample messages
62   buffer.erase();
63   load_file(samples_file, buffer);
64   if(_doc.Parse(buffer.c_str()).HasParseError()){
65     ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON in file " << samples_file << std::endl;
66     error_string = ss.str();
67     throw std::runtime_error(error_string);
68   }
69   mdclog_write(MDCLOG_DEBUG, "Loaded sample file %s\n", samples_file.c_str());
70
71   StringBuffer s_buffer;
72   Writer<StringBuffer> writer(s_buffer);
73   
74   // extract notify sample message
75   ref = NULL;
76   schema_key = "/notify_policy_message";
77   ref = Pointer(schema_key.c_str()).Get(_doc);
78   if(! ref){
79     ss << "Error ! " << __FILE__ << "," << __LINE__ << " Could not find key " << schema_key << std::endl;
80     error_string = ss.str();
81     throw std::runtime_error(error_string);
82   }
83   (*ref).Accept(writer);
84   if(notify_message_.Parse(s_buffer.GetString()).HasParseError()){
85     ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON snippet at key :  " << schema_key  << std::endl;
86     error_string = ss.str();
87     throw std::runtime_error(error_string);
88   }
89       
90                                                     
91   mdclog_write(MDCLOG_DEBUG, "Loaded sample message for notification policy");
92   
93   // extract metrics sample message
94   ref = NULL;
95   schema_key = "/metrics";
96   ref = Pointer(schema_key.c_str()).Get(_doc);
97   if(! ref){
98     ss << "Error ! " << __FILE__ << "," << __LINE__ << " Could not find key " << schema_key << std::endl;
99     error_string = ss.str();
100     throw std::runtime_error(error_string);
101   }
102   s_buffer.Clear();
103   writer.Reset(s_buffer);
104   (*ref).Accept(writer);
105   if(metrics_message_.Parse(s_buffer.GetString()).HasParseError()){
106     ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON snippet at key :  " << schema_key<< std::endl;
107     error_string = ss.str();
108     throw std::runtime_error(error_string);
109   }
110
111   
112   mdclog_write(MDCLOG_DEBUG, "Loaded sample message for metrics");
113   
114   // set the keys we extract and update 
115   setPolicyVars();
116   
117   //instantiate the core policy plugin
118   for(unsigned int i = 0; i < num_instances; i++){
119     instantiate_protector_plugin(report_only);
120   }
121   
122 };
123
124
125
126 bool admission::load_schema(const std::string & schema_file, const std::string & schema_key, std::unique_ptr<SchemaDocument> & schema_ref, std::unique_ptr<SchemaValidator> &validator_ref){
127
128   std::string buffer;
129   Value *ref;
130   Document doc;
131   std::stringstream ss;
132   bool res;
133   
134   // load policy schema file
135   res = load_file(schema_file, buffer);
136   if(!res){
137     return false;
138   }
139   
140   if(doc.Parse(buffer.c_str()).HasParseError()){
141     ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON in file " << schema_file << std::endl;
142     error_string = ss.str();
143     return false;
144   }
145   mdclog_write(MDCLOG_DEBUG, "Loaded schema file %s\n", schema_file.c_str());
146
147   ref = NULL;
148   Pointer p(schema_key.c_str());
149   ref = p.Get(doc);
150   if(! ref){
151     ss << "Error ! " << __FILE__ << "," << __LINE__ << " Could not find key " << schema_key << " in file " << schema_file <<  std::endl;
152     error_string = ss.str();
153     return false;
154   }
155
156   schema_ref = std::make_unique<SchemaDocument>((*ref));
157   validator_ref = std::make_unique<SchemaValidator>((*schema_ref));  
158   mdclog_write(MDCLOG_DEBUG, "Loaded schema and validator for %s\n", schema_file.c_str());
159
160   return true;
161
162 }
163
164 void admission::instantiate_protector_plugin(bool mode){
165   _plugin_instances.emplace_back(0, 60, 5000, 20, mode);
166 }
167
168 admission::~admission(void){
169   counters.clear();
170  
171 }
172
173 bool admission::load_file(std::string input_file, std::string &  contents ){
174   
175   std::FILE *fp ;
176   try{
177     fp = std::fopen(input_file.c_str(), "rb");
178   }
179   catch(std::exception &e){
180     error_string = "Error opening input  file " + input_file + " Reason = " + e.what();
181     return false;
182   } 
183   
184   if (fp){
185     std::fseek(fp, 0, SEEK_END);
186     contents.resize(std::ftell(fp));
187     std::rewind(fp);
188     std::fread(&contents[0], 1, contents.size(), fp);
189     std::fclose(fp);
190   }
191   
192   else{
193     error_string = "Error opening input  file " + input_file;
194     return false;
195   }
196
197   return true;
198 }
199
200
201 std::string admission::getName(void){
202   return std::string("admission control policy");
203 }
204
205 // Function sets path for each key
206 // in json object for set/get policy JSON
207 // and metrics payload string. This way, we do not create
208 // entire JSON from scratch, but rather just update the relevant portions
209 void admission::setPolicyVars(void){
210
211   // generic variables in the policy that are re-used
212   generic_policy_vars.insert(std::pair<std::string, Pointer>("policy_type_id", Pointer("/policy_type_id")));
213   generic_policy_vars.insert(std::pair<std::string, Pointer>("policy_instance_id", Pointer("/policy_instance_id")));
214   generic_policy_vars.insert(std::pair<std::string, Pointer>("status", Pointer("/status")));
215   generic_policy_vars.insert(std::pair<std::string, Pointer>("message", Pointer("/message")));
216   generic_policy_vars.insert(std::pair<std::string, Pointer>("operation", Pointer("/operation")));
217   generic_policy_vars.insert(std::pair<std::string, Pointer>("xapp_id", Pointer("/handler_id")));
218   
219   //variables in the sliding window policy
220   window_policy_vars.insert(std::pair<std::string, Pointer>("class", Pointer("/payload/class")));
221   window_policy_vars.insert(std::pair<std::string, Pointer>("enforce", Pointer("/payload/enforce")));
222   window_policy_vars.insert(std::pair<std::string, Pointer>("window_length", Pointer("/payload/window_length")));
223   window_policy_vars.insert(std::pair<std::string, Pointer>("trigger_threshold", Pointer("/payload/trigger_threshold")));
224   window_policy_vars.insert(std::pair<std::string, Pointer>("blocking_rate", Pointer("/payload/blocking_rate")));
225
226
227   // variables in each metric message
228   metric_vars.insert(std::pair<std::string, Pointer>("class", Pointer("/event/measurementFields/additionalFields/Class Id")));
229   metric_vars.insert(std::pair<std::string, Pointer>("request_count", Pointer("/event/measurementFields/additionalFields/SgNB Request Count")));
230   metric_vars.insert(std::pair<std::string, Pointer>("accept_count", Pointer("/event/measurementFields/additionalFields/SgNB Accept Count")));
231   metric_vars.insert(std::pair<std::string, Pointer>("report_interval", Pointer("/event/measurementFields/measurementInterval")));
232   metric_vars.insert(std::pair<std::string, Pointer>("epoch", Pointer("/event/commonHeader/startEpochMicrosec")));
233
234
235   // set xapp id in return message
236   generic_policy_vars["xapp_id"].Set(notify_message_, _xapp_id.c_str());
237   
238   // Set up the counters for metrics
239   auto ts = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now());
240   auto epoch = ts.time_since_epoch();
241   auto val = std::chrono::duration_cast<std::chrono::microseconds>(epoch);
242   prev_time_stamp = val.count();
243
244 }
245
246 // returns the plugin core 
247 protector * admission::get_protector_instance(unsigned int index){
248   if (index > _plugin_instances.size() -1){
249     mdclog_write(MDCLOG_ERR, "%s, %d: Error . Requested index %u exceeds number of plugin instances %lu", __FILE__, __LINE__, index, _plugin_instances.size());
250     return NULL;
251   }
252   else{
253     return &_plugin_instances[index];
254   }
255 };
256
257
258 // control plane function to set policy in plugin
259 // create policy returns successful if no previous policy with same class id exists and values are valid
260 // delete policy returns successful if policy table contains same policy instance id
261 // update policy returns successful if policy table contains same policy instance id,  and values are valid
262 bool admission::setPolicy(const char * message, int message_length, std::string & response){
263
264   bool res = false;
265   std::string policy_instance_id;
266   std::string status_message, operation;
267   std::stringstream ss;
268   
269   // policy key variables
270   bool enforce = false;
271   int window_size = -1, trigger_threshold = -1, class_id = -1;
272   double blocking_rate = -1;
273
274   // reset the validator
275   (*downstream_validator_).Reset();
276   
277   // step 1: verify JSON
278   Document doc;
279   if(doc.Parse(message).HasParseError()){
280     status_message.assign("Invalid JSON");
281   }
282   
283   // Validate against our JSON input schema
284   else if (!doc.Accept((*downstream_validator_))){
285     StringBuffer sb;
286     (*downstream_validator_).GetInvalidSchemaPointer().StringifyUriFragment(sb);
287     std::string failed_message = std::string("\"message\": \"Schema Violation:") + std::string(sb.GetString());
288     failed_message += std::string(" Invalid keyword :") + std::string((*downstream_validator_).GetInvalidSchemaKeyword()) + " \"";
289     sb.Clear();
290     (*downstream_validator_).GetInvalidDocumentPointer().StringifyUriFragment(sb);
291     failed_message += std::string(" Invalid document :") + std::string(sb.GetString());
292     status_message.assign(failed_message);
293
294   }
295   else{
296
297     // step 2: extract the standard keys expected in all downstream
298     // messages : policy_instance
299     Value * ref;
300     
301     for(auto const &e : generic_policy_vars){
302       ref = NULL;
303       ref = e.second.Get(doc);
304
305       // this key can be simultaneously put in notify 
306       if (ref != NULL && e.first == "policy_type_id" ){
307         e.second.Set(notify_message_, ref->GetInt());
308       }
309
310       // this key can be simultaneously put in notify 
311       else if (ref != NULL && e.first == "policy_instance_id" ){
312         e.second.Set(notify_message_, ref->GetString());
313         policy_instance_id = ref->GetString();
314       }
315       else if (ref != NULL && e.first == "operation"){
316         operation = ref->GetString();
317       }
318     }
319
320
321     // do we have this policy ?
322     auto it = policy_table.find(policy_instance_id);
323     res = true;
324
325     // if operation is create and policy already present, simply return with OK ?
326     // we may get the same create policy multiple times due to race conditions when the
327     // xapp starts up
328     if(operation == "CREATE" && it != policy_table.end()){
329       res = true;
330     }
331     else{
332       
333       if (operation == "DELETE" || operation == "UPDATE"){
334         // don't proceed if policy not found 
335         if(it == policy_table.end()){
336           ss <<" No policy instance = " << policy_instance_id << " found. Cannot perform operation = " << operation << std::endl;
337           status_message = ss.str();
338           res = false;
339         }
340         else{
341           class_id = it->second; // used if delete 
342         }
343       }
344       
345       if (res){
346         // perform the operation
347         res = false;
348         
349         if (operation == "DELETE"){
350           res = _plugin_instances[0].delete_policy(class_id);
351           if(res){
352             ss <<"Policy instance id " << policy_instance_id << " successfully deleted" << std::endl;
353             status_message = ss.str();
354             policy_table.erase(policy_instance_id);
355           }
356           else{
357             status_message = _plugin_instances[0].get_error();
358           }
359         }
360         
361         else if (operation == "CREATE" or operation == "UPDATE"){
362           // initialize policy params to invalid
363           window_size = -1;
364           class_id = -1;
365           trigger_threshold = -1;
366           blocking_rate = -1;
367           
368           // get values for policy keys
369           for(auto const& e: window_policy_vars){
370             ref = NULL;
371             ref = e.second.Get(doc);
372             if (ref == NULL){
373               continue;
374             }
375             if(e.first == "enforce"){
376               enforce = ref->GetBool();
377             }
378             else if (e.first == "window_length"){
379               window_size = ref->GetInt();
380             }
381             else if (e.first == "blocking_rate"){
382               blocking_rate = ref->GetDouble();
383             }
384             else if (e.first == "trigger_threshold"){
385               trigger_threshold = ref->GetInt();
386             }
387             else if (e.first == "class"){
388               class_id = ref->GetInt();
389             } 
390           }
391           
392           
393           if(operation == "CREATE"){
394             res = _plugin_instances[0].add_policy(enforce, window_size, trigger_threshold, blocking_rate, class_id);
395             status_message.assign(_plugin_instances[0].get_error());
396             
397             if(res == true){
398               // add to policy list
399               policy_table.insert(std::pair<std::string, int>(policy_instance_id, class_id));
400             }
401           }
402           else if (operation == "UPDATE"){
403             res = _plugin_instances[0].configure(enforce, window_size, trigger_threshold, blocking_rate, class_id);
404             status_message.assign(_plugin_instances[0].get_error()); 
405           }
406           
407         }
408       }
409     }
410     
411     if(res == true)
412       status_message.assign("SUCCESS");
413   }
414   
415
416   // generate response 
417   //generic_policy_vars["message"].Set(notify_message_, status_message.c_str());
418   
419   if(res == false){
420     generic_policy_vars["status"].Set(notify_message_, "ERROR");
421   }
422   else {
423     if(operation == "DELETE"){
424       generic_policy_vars["status"].Set(notify_message_, "DELETED");
425     }
426     else{
427       generic_policy_vars["status"].Set(notify_message_, "OK");
428     }
429   }
430   
431
432   StringBuffer s_buffer;
433   Writer<StringBuffer> writer(s_buffer);
434   notify_message_.Accept(writer);
435   response.assign(s_buffer.GetString(), s_buffer.GetLength());
436   mdclog_write(MDCLOG_DEBUG, "Set Policy Response = %s\n", response.c_str());
437   return res;
438   
439 };
440
441 // control plane function to retreive policy set in plugin
442 // This is just a placeholder. Still TBD ..... 
443 bool admission::getPolicy(const char * message, int message_length, std::string & response){
444   return true;
445                                                                        
446 }
447
448
449 // control plane function to retreive metrics from plugin
450 // crafts into ves schema based JSON payload
451 int admission::getMetrics(std::vector<std::string> & response_vector){
452
453   int res;
454   // the list of active policies on the protector plugin can
455   // dynamically change.
456
457
458   // run through active policies
459   for(auto const &e: policy_table){
460     int id = e.second;
461     std:: string response;
462     process_counters(id, response);
463     response_vector.emplace_back(response);
464   }
465
466   // also account for default policy
467   int id = -1;
468   std::string response;
469   process_counters(id, response);
470   response_vector.emplace_back(response);
471
472   
473   // clear out counters for expired policies
474   for (auto const &e : counters){
475     
476     if (e.first != -1 && ! _plugin_instances[0].is_active(e.first)){
477       counters.erase(e.first);
478     }
479   }
480
481   return 0;
482 }
483
484
485 void admission::process_counters(int id, std::string & response){
486
487
488   long request_count, accept_count, curr_timestamp,  time_interval;
489   
490   long int requests = _plugin_instances[0].get_requests(id);
491   long int rejects = _plugin_instances[0].get_rejects(id);
492     
493    
494   std::chrono::time_point<std::chrono::system_clock, std::chrono::microseconds> ts;
495   ts = std::chrono::time_point_cast<std::chrono::microseconds> (std::chrono::system_clock::now());
496   curr_timestamp =ts.time_since_epoch().count();
497   
498   // do we have counters for this policy ?
499   auto prev_it = counters.find(id);
500   if(prev_it == counters.end()){
501
502     // new policy seeing for first time 
503     request_count = requests;
504     accept_count = requests - rejects;
505     time_interval = 0;
506     
507     // store
508     counters.insert(std::pair<int, std::vector<long int>>(id, std::vector<long int>({requests, rejects, curr_timestamp})));
509   }
510   else{
511     request_count = requests - prev_it->second[0];
512     accept_count = request_count - (rejects - prev_it->second[1]);
513     time_interval = ceil((curr_timestamp - prev_it->second[2])/1e+6); // seconds
514     
515     // update history
516     prev_it->second[0] = requests;
517     prev_it->second[1] = rejects;
518     prev_it->second[2] = curr_timestamp;
519   }
520   
521   // generate string and add to response vector
522   metric_vars["class"].Set(metrics_message_, std::to_string(id).c_str());
523   metric_vars["request_count"].Set(metrics_message_, std::to_string(request_count).c_str());
524   metric_vars["accept_count"].Set(metrics_message_, std::to_string(accept_count).c_str());
525   metric_vars["epoch"].Set(metrics_message_, std::to_string(curr_timestamp).c_str());
526   metric_vars["report_interval"].Set(metrics_message_, std::to_string(time_interval).c_str());
527   
528   StringBuffer sb_buffer;
529   Writer<StringBuffer> writer(sb_buffer);  
530   metrics_message_.Accept(writer);
531   response.assign(sb_buffer.GetString(), sb_buffer.GetLength());
532   
533
534 }