#include "admission_policy.hpp"
-admission::admission (std::string policy_schema_file, std::string samples_file, std::string metrics_schema_file, unsigned int num_instances, bool report_only){
- bool res;
-
- if (num_instances == 0){
- throw std::runtime_error("Error ! Number of instances of admission_policy protector pluging must be > 0");
- }
-
- std::string response;
- std::string buffer;
- std::string error_string;
-
- std::vector<TrieNode> config_schema_path;
-
- config_schema_path.clear();
-
- // path to node in schema to process policy request
- config_schema_path.emplace_back("controls");
- config_schema_path.emplace_back((int)0);
- config_schema_path.emplace_back("message_receives_payload_schema");
- config_schema_path[0].add_child(&config_schema_path[1]);
- config_schema_path[1].add_child(&config_schema_path[2]);
-
- set_policy_req_obj.load_schema(policy_schema_file, &config_schema_path[0]);
- // mdclog_write(MDCLOG_INFO, "Loaded schema for set Policy request");
-
- //path to node in schema to process policy response
- config_schema_path.clear();
- config_schema_path.emplace_back("controls");
- config_schema_path.emplace_back(0);
- config_schema_path.emplace_back("message_sends_payload_schema");
- config_schema_path[0].add_child(&config_schema_path[1]);
- config_schema_path[1].add_child(&config_schema_path[2]);
-
- set_policy_resp_obj.load_schema(policy_schema_file, &config_schema_path[0]);
- mdclog_write(MDCLOG_INFO, "Loaded schema for set Policy response");
-
- // load sample response
- config_schema_path.clear();
- config_schema_path.emplace_back("message_sends_example");
-
- set_policy_resp_obj.load_buffer(samples_file, &config_schema_path[0]);
-
- // verify that our sample conforms to the schema ...
- buffer = set_policy_resp_obj.get_buffer();
- if (! set_policy_resp_obj.is_valid(buffer.c_str(), buffer.length(), response)){
- std::stringstream ss;
- ss << "Error ! Sample loaded for SET policy response = " << buffer << " from file " << samples_file << " not valid. Reason = " << response;
- throw std::runtime_error(ss.str());
- }
- mdclog_write(MDCLOG_INFO, "Verified sample for set Policy response");
-
- // path to node in schema to respond to get policy (current same as set policy)
- config_schema_path.clear();
- config_schema_path.emplace_back("controls");
- config_schema_path.emplace_back(0);
- config_schema_path.emplace_back("message_receives_payload_schema");
- config_schema_path[0].add_child(&config_schema_path[1]);
- config_schema_path[1].add_child(&config_schema_path[2]);
-
- get_policy_resp_obj.load_schema(policy_schema_file, &config_schema_path[0]);
- mdclog_write(MDCLOG_INFO, "Loaded schema for get Policy response");
-
- // sample to respond to get policy
- config_schema_path.clear();
- config_schema_path.emplace_back("message_receives_example");
+using namespace rapidjson;
- get_policy_resp_obj.load_buffer(samples_file, &config_schema_path[0]);
-
- // verify that our sample conforms to schema
- buffer = get_policy_resp_obj.get_buffer();
- if (! get_policy_resp_obj.is_valid(buffer.c_str(), buffer.length(), response)){
+// constructor loads the policy schema, metric schema, and example payload JSON files
+// instantiates the required number of protector plugins
+admission::admission (std::string policy_schema_file, std::string samples_file, std::string metrics_schema_file, unsigned int num_instances, std::string xapp_id, bool report_only){
+ if (num_instances < 1 || num_instances > MAX_INSTANCES){
std::stringstream ss;
- ss << "Error ! Sample loaded for GET policy response = " << buffer << " from file " << samples_file << " not valid. Reason = " << response;
- throw std::runtime_error(ss.str());
+ ss << "Error !" << __FILE__ << ", " << __LINE__ << " Number of instances must be between " << 1 << " and " << MAX_INSTANCES << ". Specified = " << num_instances << std::endl;
+ error_string = ss.str();
+ throw std::runtime_error(error_string);
+ }
+ _xapp_id.assign(xapp_id);
+ std::string response;
+ std::string buffer;
+ std::stringstream ss;
+ Document _doc;
+ Value * ref;
+ std::string schema_key;
+
+ // Load schema files and extract relevant validators
+ // schema for downstream message
+ bool res = load_schema(policy_schema_file, "/downstream_schema", downstream_schema_ref_, downstream_validator_);
+ if(res == false){
+ throw std::runtime_error(error_string);
+ }
+ // schema for notification message
+ res = load_schema(policy_schema_file, "/notify_schema", notify_schema_ref_, notify_validator_);
+ if(res == false){
+ throw std::runtime_error(error_string);
+ }
+ // schema for metrics message
+ res = load_schema(metrics_schema_file, "", metrics_schema_ref_, metrics_validator_);
+ if(res == false){
+ throw std::runtime_error(error_string);
}
-
- mdclog_write(MDCLOG_INFO, "Verified sample for get Policy response");
-
- // schema & sample for metrics
- metrics_obj.load_schema(metrics_schema_file);
- mdclog_write(MDCLOG_INFO, "Loaded schema for ves metrics");
-
- config_schema_path.clear();
- config_schema_path.emplace_back("metrics");
- metrics_obj.load_buffer(samples_file, &config_schema_path[0]);
- // verify sample conforms to schema
- buffer = metrics_obj.get_buffer();
- if (! metrics_obj.is_valid(buffer.c_str(), buffer.length(), response)){
- std::stringstream ss;
- ss << "Error ! Sample loaded for VES = " << buffer << " from file " << samples_file << " not valid. Reason = " << response;
- throw std::runtime_error(ss.str());
+ // load samples file and extract sample messages
+ buffer.erase();
+ load_file(samples_file, buffer);
+ if(_doc.Parse(buffer.c_str()).HasParseError()){
+ ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON in file " << samples_file << std::endl;
+ error_string = ss.str();
+ throw std::runtime_error(error_string);
+ }
+ mdclog_write(MDCLOG_DEBUG, "Loaded sample file %s\n", samples_file.c_str());
+
+ StringBuffer s_buffer;
+ Writer<StringBuffer> writer(s_buffer);
+
+ // extract notify sample message
+ ref = NULL;
+ schema_key = "/notify_policy_message";
+ ref = Pointer(schema_key.c_str()).Get(_doc);
+ if(! ref){
+ ss << "Error ! " << __FILE__ << "," << __LINE__ << " Could not find key " << schema_key << std::endl;
+ error_string = ss.str();
+ throw std::runtime_error(error_string);
}
+ (*ref).Accept(writer);
+ if(notify_message_.Parse(s_buffer.GetString()).HasParseError()){
+ ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON snippet at key : " << schema_key << std::endl;
+ error_string = ss.str();
+ throw std::runtime_error(error_string);
+ }
+
+
+ mdclog_write(MDCLOG_DEBUG, "Loaded sample message for notification policy");
+
+ // extract metrics sample message
+ ref = NULL;
+ schema_key = "/metrics";
+ ref = Pointer(schema_key.c_str()).Get(_doc);
+ if(! ref){
+ ss << "Error ! " << __FILE__ << "," << __LINE__ << " Could not find key " << schema_key << std::endl;
+ error_string = ss.str();
+ throw std::runtime_error(error_string);
+ }
+ s_buffer.Clear();
+ writer.Reset(s_buffer);
+ (*ref).Accept(writer);
+ if(metrics_message_.Parse(s_buffer.GetString()).HasParseError()){
+ ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON snippet at key : " << schema_key<< std::endl;
+ error_string = ss.str();
+ throw std::runtime_error(error_string);
+ }
+
- mdclog_write(MDCLOG_INFO, "Verified sample for metrics");
+ mdclog_write(MDCLOG_DEBUG, "Loaded sample message for metrics");
+ // set the keys we extract and update
setPolicyVars();
-
- //instantiate the core policy object
+ //instantiate the core policy plugin
for(unsigned int i = 0; i < num_instances; i++){
instantiate_protector_plugin(report_only);
}
};
+
+
+bool admission::load_schema(const std::string & schema_file, const std::string & schema_key, std::unique_ptr<SchemaDocument> & schema_ref, std::unique_ptr<SchemaValidator> &validator_ref){
+
+ std::string buffer;
+ Value *ref;
+ Document doc;
+ std::stringstream ss;
+ bool res;
+
+ // load policy schema file
+ res = load_file(schema_file, buffer);
+ if(!res){
+ return false;
+ }
+
+ if(doc.Parse(buffer.c_str()).HasParseError()){
+ ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON in file " << schema_file << std::endl;
+ error_string = ss.str();
+ return false;
+ }
+ mdclog_write(MDCLOG_DEBUG, "Loaded schema file %s\n", schema_file.c_str());
+
+ ref = NULL;
+ Pointer p(schema_key.c_str());
+ ref = p.Get(doc);
+ if(! ref){
+ ss << "Error ! " << __FILE__ << "," << __LINE__ << " Could not find key " << schema_key << " in file " << schema_file << std::endl;
+ error_string = ss.str();
+ return false;
+ }
+
+ schema_ref = std::make_unique<SchemaDocument>((*ref));
+ validator_ref = std::make_unique<SchemaValidator>((*schema_ref));
+ mdclog_write(MDCLOG_DEBUG, "Loaded schema and validator for %s\n", schema_file.c_str());
+
+ return true;
+
+}
+
void admission::instantiate_protector_plugin(bool mode){
- _plugin_instances.emplace_back(bool(current_config["enforce"]), current_config["window_length"], current_config["trigger_threshold"], current_config["blocking_rate"], mode);
+ _plugin_instances.emplace_back(0, 60, 5000, 20, mode);
}
admission::~admission(void){
- prev_values.clear();
- curr_values.clear();
- policy_vars.clear();
- set_policy_response.clear();
- get_policy_response.clear();
- metric_responses.clear();
- policy_pointer.clear();
+ counters.clear();
+
+}
+
+bool admission::load_file(std::string input_file, std::string & contents ){
+
+ std::FILE *fp ;
+ try{
+ fp = std::fopen(input_file.c_str(), "rb");
+ }
+ catch(std::exception &e){
+ error_string = "Error opening input file " + input_file + " Reason = " + e.what();
+ return false;
+ }
+
+ if (fp){
+ std::fseek(fp, 0, SEEK_END);
+ contents.resize(std::ftell(fp));
+ std::rewind(fp);
+ std::fread(&contents[0], 1, contents.size(), fp);
+ std::fclose(fp);
+ }
+
+ else{
+ error_string = "Error opening input file " + input_file;
+ return false;
+ }
+
+ return true;
}
return std::string("admission control policy");
}
+// Function sets path for each key
+// in json object for set/get policy JSON
+// and metrics payload string. This way, we do not create
+// entire JSON from scratch, but rather just update the relevant portions
void admission::setPolicyVars(void){
-
- // keys in request to set policy
- policy_vars.emplace_back("enforce");
- policy_vars.emplace_back("window_length");
- policy_vars.emplace_back("blocking_rate");
- policy_vars.emplace_back("trigger_threshold");
-
+ // generic variables in the policy that are re-used
+ generic_policy_vars.insert(std::pair<std::string, Pointer>("policy_type_id", Pointer("/policy_type_id")));
+ generic_policy_vars.insert(std::pair<std::string, Pointer>("policy_instance_id", Pointer("/policy_instance_id")));
+ generic_policy_vars.insert(std::pair<std::string, Pointer>("status", Pointer("/status")));
+ generic_policy_vars.insert(std::pair<std::string, Pointer>("message", Pointer("/message")));
+ generic_policy_vars.insert(std::pair<std::string, Pointer>("operation", Pointer("/operation")));
+ generic_policy_vars.insert(std::pair<std::string, Pointer>("xapp_id", Pointer("/handler_id")));
- // keys in response to set policy
- set_policy_response.emplace_back("status");
- set_policy_response.emplace_back("message");
+ //variables in the sliding window policy
+ window_policy_vars.insert(std::pair<std::string, Pointer>("class", Pointer("/payload/class")));
+ window_policy_vars.insert(std::pair<std::string, Pointer>("enforce", Pointer("/payload/enforce")));
+ window_policy_vars.insert(std::pair<std::string, Pointer>("window_length", Pointer("/payload/window_length")));
+ window_policy_vars.insert(std::pair<std::string, Pointer>("trigger_threshold", Pointer("/payload/trigger_threshold")));
+ window_policy_vars.insert(std::pair<std::string, Pointer>("blocking_rate", Pointer("/payload/blocking_rate")));
-
- policy_pointer.push_back(&policy_vars[0]);
- policy_pointer.push_back(&policy_vars[1]);
- policy_pointer.push_back(&policy_vars[2]);
- policy_pointer.push_back(&policy_vars[3]);
-
-
- // keys in metric response
- metric_responses.emplace_back("event"); // 0
- metric_responses.emplace_back("commonEventHeader"); // 1
- metric_responses.emplace_back("measurementFields"); // 2
- metric_responses.emplace_back("additionalFields"); // 3
- metric_responses.emplace_back("SgNB Request Rate"); // 4
- metric_responses.emplace_back("SgNB Accept Rate"); // 5
- metric_responses.emplace_back("startEpochMicrosec"); // 6
- metric_responses.emplace_back("measurementInterval"); // 7
- metric_responses.emplace_back("lastEpochMicrosec");//8
- //metric_responses.emplace_back("TS"); //9
-
- metric_responses[0].add_child(&metric_responses[1]);
- metric_responses[0].add_child(&metric_responses[2]);
- metric_responses[1].add_child(&metric_responses[6]);
- metric_responses[1].add_child(&metric_responses[8]);
+ // variables in each metric message
+ metric_vars.insert(std::pair<std::string, Pointer>("class", Pointer("/event/measurementFields/additionalFields/Class Id")));
+ metric_vars.insert(std::pair<std::string, Pointer>("request_count", Pointer("/event/measurementFields/additionalFields/SgNB Request Count")));
+ metric_vars.insert(std::pair<std::string, Pointer>("accept_count", Pointer("/event/measurementFields/additionalFields/SgNB Accept Count")));
+ metric_vars.insert(std::pair<std::string, Pointer>("report_interval", Pointer("/event/measurementFields/measurementInterval")));
+ metric_vars.insert(std::pair<std::string, Pointer>("epoch", Pointer("/event/commonHeader/startEpochMicrosec")));
- metric_responses[2].add_child(&metric_responses[3]);
- metric_responses[2].add_child(&metric_responses[7]);
- metric_responses[3].add_child(&metric_responses[4]);
- metric_responses[3].add_child(&metric_responses[5]);
- //metric_responses[3].add_child(&metric_responses[9]);
-
- // default config map for the policy object
- current_config.insert(std::pair<std::string, int>("enforce", 1));
- current_config.insert(std::pair<std::string, int>("window_length", 60));
- current_config.insert(std::pair<std::string, int>("blocking_rate", 1));
- current_config.insert(std::pair<std::string, int>("trigger_threshold", 1));
+ // set xapp id in return message
+ generic_policy_vars["xapp_id"].Set(notify_message_, _xapp_id.c_str());
- prev_config = current_config;
-
+ // Set up the counters for metrics
auto ts = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now());
auto epoch = ts.time_since_epoch();
auto val = std::chrono::duration_cast<std::chrono::microseconds>(epoch);
prev_time_stamp = val.count();
- prev_values.push_back(0);
- prev_values.push_back(0);
- curr_values = prev_values;
+
}
+// returns the plugin core
protector * admission::get_protector_instance(unsigned int index){
if (index > _plugin_instances.size() -1){
- mdclog_write(MDCLOG_ERR, "%s, %d: Error . Requested index %u exceeds number of plugin instances %u", __FILE__, __LINE__, index, _plugin_instances.size());
+ mdclog_write(MDCLOG_ERR, "%s, %d: Error . Requested index %u exceeds number of plugin instances %lu", __FILE__, __LINE__, index, _plugin_instances.size());
return NULL;
}
else{
};
-
+// control plane function to set policy in plugin
+// create policy returns successful if no previous policy with same class id exists and values are valid
+// delete policy returns successful if policy table contains same policy instance id
+// update policy returns successful if policy table contains same policy instance id, and values are valid
bool admission::setPolicy(const char * message, int message_length, std::string & response){
- // Get the configuration
- std::vector<TrieNode*> available_keys;
- std::vector<TrieNode*> roots;
- bool res;
+ bool res = false;
+ std::string policy_instance_id;
+ std::string status_message, operation;
+ std::stringstream ss;
- std::vector<TrieNode*>resp_pointer;
- resp_pointer.push_back(&set_policy_response[0]);
- resp_pointer.push_back(&set_policy_response[1]);
+ // policy key variables
+ bool enforce = false;
+ int window_size = -1, trigger_threshold = -1, class_id = -1;
+ double blocking_rate = -1;
+
+ // reset the validator
+ (*downstream_validator_).Reset();
- std::string local_response;
- for(unsigned int i = 0; i < policy_vars.size(); i++){
- int res = set_policy_req_obj.get_values(message, message_length, local_response, &policy_vars[i], available_keys);
- if (res != 0 ){
- setError(local_response);
- set_policy_response[0].set_value("FAIL");
- set_policy_response[1].set_value(local_response);
- set_policy_resp_obj.set_values(response, resp_pointer);
-
- return false;
- }
+ // step 1: verify JSON
+ Document doc;
+ if(doc.Parse(message).HasParseError()){
+ status_message.assign("Invalid JSON");
}
- if ( available_keys.size() == 0){
- local_response = "No Keys were found";
- setError(local_response);
- set_policy_response[0].set_value("FAIL");
- set_policy_response[1].set_value(local_response);
-
- set_policy_resp_obj.set_values(response,resp_pointer);
- return false;
+ // Validate against our JSON input schema
+ else if (!doc.Accept((*downstream_validator_))){
+ StringBuffer sb;
+ (*downstream_validator_).GetInvalidSchemaPointer().StringifyUriFragment(sb);
+ std::string failed_message = std::string("\"message\": \"Schema Violation:") + std::string(sb.GetString());
+ failed_message += std::string(" Invalid keyword :") + std::string((*downstream_validator_).GetInvalidSchemaKeyword()) + " \"";
+ sb.Clear();
+ (*downstream_validator_).GetInvalidDocumentPointer().StringifyUriFragment(sb);
+ failed_message += std::string(" Invalid document :") + std::string(sb.GetString());
+ status_message.assign(failed_message);
+
}
-
+ else{
- // Get new config
- prev_config = current_config;
-
- for(std::vector<TrieNode *>::iterator it = available_keys.begin(); it != available_keys.end(); ++it){
- DataContainer const * id = (*it)->get_id();
- DataContainer const * val = (*it)->get_value();
- auto e = current_config.find(id->value.s.c_str());
- if (e != current_config.end()){
- e->second = val->value.i;
+ // step 2: extract the standard keys expected in all downstream
+ // messages : policy_instance
+ Value * ref;
+
+ for(auto const &e : generic_policy_vars){
+ ref = NULL;
+ ref = e.second.Get(doc);
+
+ // this key can be simultaneously put in notify
+ if (ref != NULL && e.first == "policy_type_id" ){
+ e.second.Set(notify_message_, ref->GetInt());
+ }
+
+ // this key can be simultaneously put in notify
+ else if (ref != NULL && e.first == "policy_instance_id" ){
+ e.second.Set(notify_message_, ref->GetString());
+ policy_instance_id = ref->GetString();
+ }
+ else if (ref != NULL && e.first == "operation"){
+ operation = ref->GetString();
+ }
}
- }
- // Note : xapp schema specifies window be in 'minutes'. Sliding window implementation maintains in seconds, hence multiply by 60
- current_config["window_length"] *= 60;
+ // do we have this policy ?
+ auto it = policy_table.find(policy_instance_id);
+ res = true;
- // Apply the config
- res= true;
- for(std::vector<protector>::iterator it_p = _plugin_instances.begin(); it_p != _plugin_instances.end(); ++it_p){
-
- res = (*it_p).configure( bool(current_config["enforce"]), current_config["window_length"], current_config["trigger_threshold"], current_config["blocking_rate"]);
- if (!res){
- mdclog_write(MDCLOG_ERR, "Error ::%s, %d :: Could not configure plugin\n", __FILE__, __LINE__);
- set_policy_response[0].set_value("FAIL");
- set_policy_response[1].set_value("Could not apply configuration");
- break;
+ // if operation is create and policy already present, simply return with OK ?
+ // we may get the same create policy multiple times due to race conditions when the
+ // xapp starts up
+ if(operation == "CREATE" && it != policy_table.end()){
+ res = true;
}
+ else{
+
+ if (operation == "DELETE" || operation == "UPDATE"){
+ // don't proceed if policy not found
+ if(it == policy_table.end()){
+ ss <<" No policy instance = " << policy_instance_id << " found. Cannot perform operation = " << operation << std::endl;
+ status_message = ss.str();
+ res = false;
+ }
+ else{
+ class_id = it->second; // used if delete
+ }
+ }
+
+ if (res){
+ // perform the operation
+ res = false;
+
+ if (operation == "DELETE"){
+ res = _plugin_instances[0].delete_policy(class_id);
+ if(res){
+ ss <<"Policy instance id " << policy_instance_id << " successfully deleted" << std::endl;
+ status_message = ss.str();
+ policy_table.erase(policy_instance_id);
+ }
+ else{
+ status_message = _plugin_instances[0].get_error();
+ }
+ }
+
+ else if (operation == "CREATE" or operation == "UPDATE"){
+ // initialize policy params to invalid
+ window_size = -1;
+ class_id = -1;
+ trigger_threshold = -1;
+ blocking_rate = -1;
+
+ // get values for policy keys
+ for(auto const& e: window_policy_vars){
+ ref = NULL;
+ ref = e.second.Get(doc);
+ if (ref == NULL){
+ continue;
+ }
+ if(e.first == "enforce"){
+ enforce = ref->GetBool();
+ }
+ else if (e.first == "window_length"){
+ window_size = ref->GetInt();
+ }
+ else if (e.first == "blocking_rate"){
+ blocking_rate = ref->GetDouble();
+ }
+ else if (e.first == "trigger_threshold"){
+ trigger_threshold = ref->GetInt();
+ }
+ else if (e.first == "class"){
+ class_id = ref->GetInt();
+ }
+ }
+
+
+ if(operation == "CREATE"){
+ res = _plugin_instances[0].add_policy(enforce, window_size, trigger_threshold, blocking_rate, class_id);
+ status_message.assign(_plugin_instances[0].get_error());
+
+ if(res == true){
+ // add to policy list
+ policy_table.insert(std::pair<std::string, int>(policy_instance_id, class_id));
+ }
+ }
+ else if (operation == "UPDATE"){
+ res = _plugin_instances[0].configure(enforce, window_size, trigger_threshold, blocking_rate, class_id);
+ status_message.assign(_plugin_instances[0].get_error());
+ }
+
+ }
+ }
+ }
+
+ if(res == true)
+ status_message.assign("SUCCESS");
}
+
- if (res){
- set_policy_response[0].set_value("SUCCESS");
- set_policy_response[1].set_value("configuration applied");
+ // generate response
+ //generic_policy_vars["message"].Set(notify_message_, status_message.c_str());
+
+ if(res == false){
+ generic_policy_vars["status"].Set(notify_message_, "ERROR");
+ }
+ else {
+ if(operation == "DELETE"){
+ generic_policy_vars["status"].Set(notify_message_, "DELETED");
+ }
+ else{
+ generic_policy_vars["status"].Set(notify_message_, "OK");
+ }
}
- set_policy_resp_obj.set_values(response,resp_pointer);
- return true;
+
+ StringBuffer s_buffer;
+ Writer<StringBuffer> writer(s_buffer);
+ notify_message_.Accept(writer);
+ response.assign(s_buffer.GetString(), s_buffer.GetLength());
+ mdclog_write(MDCLOG_DEBUG, "Set Policy Response = %s\n", response.c_str());
+ return res;
};
-
-bool admission::getPolicy(const char * message, int message_length, std::string & response){
- // Note : by same token, when returning sliding window length : translate to
- // minutes
- policy_vars[0].set_value(bool(current_config["enforce"]));
- policy_vars[1].set_value((int)( (double)current_config["window_length"]/60.0));
- policy_vars[2].set_value(current_config["blocking_rate"]);
- policy_vars[3].set_value(current_config["trigger_threshold"]);
-
- int res = get_policy_resp_obj.set_values(response, policy_pointer);
+// control plane function to retreive policy set in plugin
+// This is just a placeholder. Still TBD .....
+bool admission::getPolicy(const char * message, int message_length, std::string & response){
return true;
}
-int admission::getMetrics(std::string & response){
- std::vector<TrieNode *> metric_pointers;
- metric_pointers.push_back(&metric_responses[0]);
+// control plane function to retreive metrics from plugin
+// crafts into ves schema based JSON payload
+int admission::getMetrics(std::vector<std::string> & response_vector){
+
+ int res;
+ // the list of active policies on the protector plugin can
+ // dynamically change.
- auto ts = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now());
- auto epoch = ts.time_since_epoch();
- auto val_ms = std::chrono::duration_cast<std::chrono::microseconds>(epoch);
- auto val_s = std::chrono::duration_cast<std::chrono::seconds>(epoch);
-
- long int current_time_stamp = val_ms.count();
- long int current_time = val_s.count();
- long int interval = current_time_stamp - prev_time_stamp;
+ // run through active policies
+ for(auto const &e: policy_table){
+ int id = e.second;
+ std:: string response;
+ process_counters(id, response);
+ response_vector.emplace_back(response);
+ }
- curr_values[0] = _plugin_instances[0].get_requests();
- curr_values[1] = _plugin_instances[0].get_rejects();
+ // also account for default policy
+ int id = -1;
+ std::string response;
+ process_counters(id, response);
+ response_vector.emplace_back(response);
- //curr_values[0] = rand()%100 + prev_values[0];
- //curr_values[1] = rand()%20 + prev_values[1];
-
- //std::cout <<" Accept counter = " << curr_values[0]<< " Reject counter = " << curr_values[1] << " Request Count = " << (curr_values[0] - prev_values[0]) << " Reject count = " << curr_values[1] - prev_values[1] << std::endl;
-
- metric_responses[4].set_value(std::to_string(curr_values[0] - prev_values[0]));
- metric_responses[5].set_value(std::to_string((curr_values[0] - prev_values[0]) - (curr_values[1] - prev_values[1])));
- metric_responses[6].set_value(prev_time_stamp);
- metric_responses[8].set_value(current_time_stamp);
- metric_responses[7].set_value(interval);
- //metric_responses[9].set_value(current_time);
- prev_values = curr_values;
- prev_time_stamp = current_time_stamp;
- int res = metrics_obj.set_values(response, metric_pointers);
+ // clear out counters for expired policies
+ for (auto const &e : counters){
+
+ if (e.first != -1 && ! _plugin_instances[0].is_active(e.first)){
+ counters.erase(e.first);
+ }
+ }
- return res;
+ return 0;
+}
+
+
+void admission::process_counters(int id, std::string & response){
+
+
+ long request_count, accept_count, curr_timestamp, time_interval;
+ long int requests = _plugin_instances[0].get_requests(id);
+ long int rejects = _plugin_instances[0].get_rejects(id);
+
+
+ std::chrono::time_point<std::chrono::system_clock, std::chrono::microseconds> ts;
+ ts = std::chrono::time_point_cast<std::chrono::microseconds> (std::chrono::system_clock::now());
+ curr_timestamp =ts.time_since_epoch().count();
+
+ // do we have counters for this policy ?
+ auto prev_it = counters.find(id);
+ if(prev_it == counters.end()){
+
+ // new policy seeing for first time
+ request_count = requests;
+ accept_count = requests - rejects;
+ time_interval = 0;
+
+ // store
+ counters.insert(std::pair<int, std::vector<long int>>(id, std::vector<long int>({requests, rejects, curr_timestamp})));
+ }
+ else{
+ request_count = requests - prev_it->second[0];
+ accept_count = request_count - (rejects - prev_it->second[1]);
+ time_interval = ceil((curr_timestamp - prev_it->second[2])/1e+6); // seconds
+
+ // update history
+ prev_it->second[0] = requests;
+ prev_it->second[1] = rejects;
+ prev_it->second[2] = curr_timestamp;
+ }
+
+ // generate string and add to response vector
+ metric_vars["class"].Set(metrics_message_, std::to_string(id).c_str());
+ metric_vars["request_count"].Set(metrics_message_, std::to_string(request_count).c_str());
+ metric_vars["accept_count"].Set(metrics_message_, std::to_string(accept_count).c_str());
+ metric_vars["epoch"].Set(metrics_message_, std::to_string(curr_timestamp).c_str());
+ metric_vars["report_interval"].Set(metrics_message_, std::to_string(time_interval).c_str());
+
+ StringBuffer sb_buffer;
+ Writer<StringBuffer> writer(sb_buffer);
+ metrics_message_.Accept(writer);
+ response.assign(sb_buffer.GetString(), sb_buffer.GetLength());
+
+
}