2 ==================================================================================
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
21 #include "admission_policy.hpp"
23 using namespace rapidjson;
25 // constructor loads the policy schema, metric schema, and example payload JSON files
26 // instantiates the required number of protector plugins
27 admission::admission (std::string policy_schema_file, std::string samples_file, std::string metrics_schema_file, unsigned int num_instances, std::string xapp_id, bool report_only){
28 if (num_instances < 1 || num_instances > MAX_INSTANCES){
30 ss << "Error !" << __FILE__ << ", " << __LINE__ << " Number of instances must be between " << 1 << " and " << MAX_INSTANCES << ". Specified = " << num_instances << std::endl;
31 error_string = ss.str();
32 throw std::runtime_error(error_string);
35 _xapp_id.assign(xapp_id);
41 std::string schema_key;
43 // Load schema files and extract relevant validators
44 // schema for downstream message
45 bool res = load_schema(policy_schema_file, "/downstream_schema", downstream_schema_ref_, downstream_validator_);
47 throw std::runtime_error(error_string);
49 // schema for notification message
50 res = load_schema(policy_schema_file, "/notify_schema", notify_schema_ref_, notify_validator_);
52 throw std::runtime_error(error_string);
54 // schema for metrics message
55 res = load_schema(metrics_schema_file, "", metrics_schema_ref_, metrics_validator_);
57 throw std::runtime_error(error_string);
61 // load samples file and extract sample messages
63 load_file(samples_file, buffer);
64 if(_doc.Parse(buffer.c_str()).HasParseError()){
65 ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON in file " << samples_file << std::endl;
66 error_string = ss.str();
67 throw std::runtime_error(error_string);
69 mdclog_write(MDCLOG_DEBUG, "Loaded sample file %s\n", samples_file.c_str());
71 StringBuffer s_buffer;
72 Writer<StringBuffer> writer(s_buffer);
74 // extract notify sample message
76 schema_key = "/notify_policy_message";
77 ref = Pointer(schema_key.c_str()).Get(_doc);
79 ss << "Error ! " << __FILE__ << "," << __LINE__ << " Could not find key " << schema_key << std::endl;
80 error_string = ss.str();
81 throw std::runtime_error(error_string);
83 (*ref).Accept(writer);
84 if(notify_message_.Parse(s_buffer.GetString()).HasParseError()){
85 ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON snippet at key : " << schema_key << std::endl;
86 error_string = ss.str();
87 throw std::runtime_error(error_string);
91 mdclog_write(MDCLOG_DEBUG, "Loaded sample message for notification policy");
93 // extract metrics sample message
95 schema_key = "/metrics";
96 ref = Pointer(schema_key.c_str()).Get(_doc);
98 ss << "Error ! " << __FILE__ << "," << __LINE__ << " Could not find key " << schema_key << std::endl;
99 error_string = ss.str();
100 throw std::runtime_error(error_string);
103 writer.Reset(s_buffer);
104 (*ref).Accept(writer);
105 if(metrics_message_.Parse(s_buffer.GetString()).HasParseError()){
106 ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON snippet at key : " << schema_key<< std::endl;
107 error_string = ss.str();
108 throw std::runtime_error(error_string);
112 mdclog_write(MDCLOG_DEBUG, "Loaded sample message for metrics");
114 // set the keys we extract and update
117 //instantiate the core policy plugin
118 for(unsigned int i = 0; i < num_instances; i++){
119 instantiate_protector_plugin(report_only);
126 bool admission::load_schema(const std::string & schema_file, const std::string & schema_key, std::unique_ptr<SchemaDocument> & schema_ref, std::unique_ptr<SchemaValidator> &validator_ref){
131 std::stringstream ss;
134 // load policy schema file
135 res = load_file(schema_file, buffer);
140 if(doc.Parse(buffer.c_str()).HasParseError()){
141 ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON in file " << schema_file << std::endl;
142 error_string = ss.str();
145 mdclog_write(MDCLOG_DEBUG, "Loaded schema file %s\n", schema_file.c_str());
148 Pointer p(schema_key.c_str());
151 ss << "Error ! " << __FILE__ << "," << __LINE__ << " Could not find key " << schema_key << " in file " << schema_file << std::endl;
152 error_string = ss.str();
156 schema_ref = std::make_unique<SchemaDocument>((*ref));
157 validator_ref = std::make_unique<SchemaValidator>((*schema_ref));
158 mdclog_write(MDCLOG_DEBUG, "Loaded schema and validator for %s\n", schema_file.c_str());
164 void admission::instantiate_protector_plugin(bool mode){
165 _plugin_instances.emplace_back(0, 60, 5000, 20, mode);
168 admission::~admission(void){
173 bool admission::load_file(std::string input_file, std::string & contents ){
177 fp = std::fopen(input_file.c_str(), "rb");
179 catch(std::exception &e){
180 error_string = "Error opening input file " + input_file + " Reason = " + e.what();
185 std::fseek(fp, 0, SEEK_END);
186 contents.resize(std::ftell(fp));
188 std::fread(&contents[0], 1, contents.size(), fp);
193 error_string = "Error opening input file " + input_file;
201 std::string admission::getName(void){
202 return std::string("admission control policy");
205 // Function sets path for each key
206 // in json object for set/get policy JSON
207 // and metrics payload string. This way, we do not create
208 // entire JSON from scratch, but rather just update the relevant portions
209 void admission::setPolicyVars(void){
211 // generic variables in the policy that are re-used
212 generic_policy_vars.insert(std::pair<std::string, Pointer>("policy_type_id", Pointer("/policy_type_id")));
213 generic_policy_vars.insert(std::pair<std::string, Pointer>("policy_instance_id", Pointer("/policy_instance_id")));
214 generic_policy_vars.insert(std::pair<std::string, Pointer>("status", Pointer("/status")));
215 generic_policy_vars.insert(std::pair<std::string, Pointer>("message", Pointer("/message")));
216 generic_policy_vars.insert(std::pair<std::string, Pointer>("operation", Pointer("/operation")));
217 generic_policy_vars.insert(std::pair<std::string, Pointer>("xapp_id", Pointer("/handler_id")));
219 //variables in the sliding window policy
220 window_policy_vars.insert(std::pair<std::string, Pointer>("class", Pointer("/payload/class")));
221 window_policy_vars.insert(std::pair<std::string, Pointer>("enforce", Pointer("/payload/enforce")));
222 window_policy_vars.insert(std::pair<std::string, Pointer>("window_length", Pointer("/payload/window_length")));
223 window_policy_vars.insert(std::pair<std::string, Pointer>("trigger_threshold", Pointer("/payload/trigger_threshold")));
224 window_policy_vars.insert(std::pair<std::string, Pointer>("blocking_rate", Pointer("/payload/blocking_rate")));
227 // variables in each metric message
228 metric_vars.insert(std::pair<std::string, Pointer>("class", Pointer("/event/measurementFields/additionalFields/Class Id")));
229 metric_vars.insert(std::pair<std::string, Pointer>("request_count", Pointer("/event/measurementFields/additionalFields/SgNB Request Count")));
230 metric_vars.insert(std::pair<std::string, Pointer>("accept_count", Pointer("/event/measurementFields/additionalFields/SgNB Accept Count")));
231 metric_vars.insert(std::pair<std::string, Pointer>("report_interval", Pointer("/event/measurementFields/measurementInterval")));
232 metric_vars.insert(std::pair<std::string, Pointer>("epoch", Pointer("/event/commonHeader/startEpochMicrosec")));
235 // set xapp id in return message
236 generic_policy_vars["xapp_id"].Set(notify_message_, _xapp_id.c_str());
238 // Set up the counters for metrics
239 auto ts = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now());
240 auto epoch = ts.time_since_epoch();
241 auto val = std::chrono::duration_cast<std::chrono::microseconds>(epoch);
242 prev_time_stamp = val.count();
246 // returns the plugin core
247 protector * admission::get_protector_instance(unsigned int index){
248 if (index > _plugin_instances.size() -1){
249 mdclog_write(MDCLOG_ERR, "%s, %d: Error . Requested index %u exceeds number of plugin instances %lu", __FILE__, __LINE__, index, _plugin_instances.size());
253 return &_plugin_instances[index];
258 // control plane function to set policy in plugin
259 // create policy returns successful if no previous policy with same class id exists and values are valid
260 // delete policy returns successful if policy table contains same policy instance id
261 // update policy returns successful if policy table contains same policy instance id, and values are valid
262 bool admission::setPolicy(const char * message, int message_length, std::string & response){
265 std::string policy_instance_id;
266 std::string status_message, operation;
267 std::stringstream ss;
269 // policy key variables
270 bool enforce = false;
271 int window_size = -1, trigger_threshold = -1, class_id = -1;
272 double blocking_rate = -1;
274 // reset the validator
275 (*downstream_validator_).Reset();
277 // step 1: verify JSON
279 if(doc.Parse(message).HasParseError()){
280 status_message.assign("Invalid JSON");
283 // Validate against our JSON input schema
284 else if (!doc.Accept((*downstream_validator_))){
286 (*downstream_validator_).GetInvalidSchemaPointer().StringifyUriFragment(sb);
287 std::string failed_message = std::string("\"message\": \"Schema Violation:") + std::string(sb.GetString());
288 failed_message += std::string(" Invalid keyword :") + std::string((*downstream_validator_).GetInvalidSchemaKeyword()) + " \"";
290 (*downstream_validator_).GetInvalidDocumentPointer().StringifyUriFragment(sb);
291 failed_message += std::string(" Invalid document :") + std::string(sb.GetString());
292 status_message.assign(failed_message);
297 // step 2: extract the standard keys expected in all downstream
298 // messages : policy_instance
301 for(auto const &e : generic_policy_vars){
303 ref = e.second.Get(doc);
305 // this key can be simultaneously put in notify
306 if (ref != NULL && e.first == "policy_type_id" ){
307 e.second.Set(notify_message_, ref->GetInt());
310 // this key can be simultaneously put in notify
311 else if (ref != NULL && e.first == "policy_instance_id" ){
312 e.second.Set(notify_message_, ref->GetString());
313 policy_instance_id = ref->GetString();
315 else if (ref != NULL && e.first == "operation"){
316 operation = ref->GetString();
321 // do we have this policy ?
322 auto it = policy_table.find(policy_instance_id);
325 // if operation is create and policy already present, simply return with OK ?
326 // we may get the same create policy multiple times due to race conditions when the
328 if(operation == "CREATE" && it != policy_table.end()){
333 if (operation == "DELETE" || operation == "UPDATE"){
334 // don't proceed if policy not found
335 if(it == policy_table.end()){
336 ss <<" No policy instance = " << policy_instance_id << " found. Cannot perform operation = " << operation << std::endl;
337 status_message = ss.str();
341 class_id = it->second; // used if delete
346 // perform the operation
349 if (operation == "DELETE"){
350 res = _plugin_instances[0].delete_policy(class_id);
352 ss <<"Policy instance id " << policy_instance_id << " successfully deleted" << std::endl;
353 status_message = ss.str();
354 policy_table.erase(policy_instance_id);
357 status_message = _plugin_instances[0].get_error();
361 else if (operation == "CREATE" or operation == "UPDATE"){
362 // initialize policy params to invalid
365 trigger_threshold = -1;
368 // get values for policy keys
369 for(auto const& e: window_policy_vars){
371 ref = e.second.Get(doc);
375 if(e.first == "enforce"){
376 enforce = ref->GetBool();
378 else if (e.first == "window_length"){
379 window_size = ref->GetInt();
381 else if (e.first == "blocking_rate"){
382 blocking_rate = ref->GetDouble();
384 else if (e.first == "trigger_threshold"){
385 trigger_threshold = ref->GetInt();
387 else if (e.first == "class"){
388 class_id = ref->GetInt();
393 if(operation == "CREATE"){
394 res = _plugin_instances[0].add_policy(enforce, window_size, trigger_threshold, blocking_rate, class_id);
395 status_message.assign(_plugin_instances[0].get_error());
398 // add to policy list
399 policy_table.insert(std::pair<std::string, int>(policy_instance_id, class_id));
402 else if (operation == "UPDATE"){
403 res = _plugin_instances[0].configure(enforce, window_size, trigger_threshold, blocking_rate, class_id);
404 status_message.assign(_plugin_instances[0].get_error());
412 status_message.assign("SUCCESS");
417 //generic_policy_vars["message"].Set(notify_message_, status_message.c_str());
420 generic_policy_vars["status"].Set(notify_message_, "ERROR");
423 if(operation == "DELETE"){
424 generic_policy_vars["status"].Set(notify_message_, "DELETED");
427 generic_policy_vars["status"].Set(notify_message_, "OK");
432 StringBuffer s_buffer;
433 Writer<StringBuffer> writer(s_buffer);
434 notify_message_.Accept(writer);
435 response.assign(s_buffer.GetString(), s_buffer.GetLength());
436 mdclog_write(MDCLOG_DEBUG, "Set Policy Response = %s\n", response.c_str());
441 // control plane function to retreive policy set in plugin
442 // This is just a placeholder. Still TBD .....
443 bool admission::getPolicy(const char * message, int message_length, std::string & response){
449 // control plane function to retreive metrics from plugin
450 // crafts into ves schema based JSON payload
451 int admission::getMetrics(std::vector<std::string> & response_vector){
454 // the list of active policies on the protector plugin can
455 // dynamically change.
458 // run through active policies
459 for(auto const &e: policy_table){
461 std:: string response;
462 process_counters(id, response);
463 response_vector.emplace_back(response);
466 // also account for default policy
468 std::string response;
469 process_counters(id, response);
470 response_vector.emplace_back(response);
473 // clear out counters for expired policies
474 for (auto const &e : counters){
476 if (e.first != -1 && ! _plugin_instances[0].is_active(e.first)){
477 counters.erase(e.first);
485 void admission::process_counters(int id, std::string & response){
488 long request_count, accept_count, curr_timestamp, time_interval;
490 long int requests = _plugin_instances[0].get_requests(id);
491 long int rejects = _plugin_instances[0].get_rejects(id);
494 std::chrono::time_point<std::chrono::system_clock, std::chrono::microseconds> ts;
495 ts = std::chrono::time_point_cast<std::chrono::microseconds> (std::chrono::system_clock::now());
496 curr_timestamp =ts.time_since_epoch().count();
498 // do we have counters for this policy ?
499 auto prev_it = counters.find(id);
500 if(prev_it == counters.end()){
502 // new policy seeing for first time
503 request_count = requests;
504 accept_count = requests - rejects;
508 counters.insert(std::pair<int, std::vector<long int>>(id, std::vector<long int>({requests, rejects, curr_timestamp})));
511 request_count = requests - prev_it->second[0];
512 accept_count = request_count - (rejects - prev_it->second[1]);
513 time_interval = ceil((curr_timestamp - prev_it->second[2])/1e+6); // seconds
516 prev_it->second[0] = requests;
517 prev_it->second[1] = rejects;
518 prev_it->second[2] = curr_timestamp;
521 // generate string and add to response vector
522 metric_vars["class"].Set(metrics_message_, std::to_string(id).c_str());
523 metric_vars["request_count"].Set(metrics_message_, std::to_string(request_count).c_str());
524 metric_vars["accept_count"].Set(metrics_message_, std::to_string(accept_count).c_str());
525 metric_vars["epoch"].Set(metrics_message_, std::to_string(curr_timestamp).c_str());
526 metric_vars["report_interval"].Set(metrics_message_, std::to_string(time_interval).c_str());
528 StringBuffer sb_buffer;
529 Writer<StringBuffer> writer(sb_buffer);
530 metrics_message_.Accept(writer);
531 response.assign(sb_buffer.GetString(), sb_buffer.GetLength());