X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fprotector-plugin%2Fadmission_policy.cc;fp=src%2Fprotector-plugin%2Fadmission_policy.cc;h=067f3853e665c219ac81e1f09742344f69e39f64;hb=4e545a8b013e60f2ff59254cb3fe435012d8fe5a;hp=6278197c2a28f0182e2d1ea840e833668291a315;hpb=82ba4b9999ca8e09461315a919b36a66641a6c7d;p=ric-app%2Fadmin.git diff --git a/src/protector-plugin/admission_policy.cc b/src/protector-plugin/admission_policy.cc index 6278197..067f385 100644 --- a/src/protector-plugin/admission_policy.cc +++ b/src/protector-plugin/admission_policy.cc @@ -19,127 +19,182 @@ #include "admission_policy.hpp" -admission::admission (std::string policy_schema_file, std::string samples_file, std::string metrics_schema_file, unsigned int num_instances, bool report_only){ - bool res; - - if (num_instances == 0){ - throw std::runtime_error("Error ! Number of instances of admission_policy protector pluging must be > 0"); - } - - std::string response; - std::string buffer; - std::string error_string; - - std::vector config_schema_path; - - config_schema_path.clear(); - - // path to node in schema to process policy request - config_schema_path.emplace_back("controls"); - config_schema_path.emplace_back((int)0); - config_schema_path.emplace_back("message_receives_payload_schema"); - config_schema_path[0].add_child(&config_schema_path[1]); - config_schema_path[1].add_child(&config_schema_path[2]); - - set_policy_req_obj.load_schema(policy_schema_file, &config_schema_path[0]); - // mdclog_write(MDCLOG_INFO, "Loaded schema for set Policy request"); - - //path to node in schema to process policy response - config_schema_path.clear(); - config_schema_path.emplace_back("controls"); - config_schema_path.emplace_back(0); - config_schema_path.emplace_back("message_sends_payload_schema"); - config_schema_path[0].add_child(&config_schema_path[1]); - config_schema_path[1].add_child(&config_schema_path[2]); - - set_policy_resp_obj.load_schema(policy_schema_file, &config_schema_path[0]); - mdclog_write(MDCLOG_INFO, "Loaded schema for set Policy response"); - - // load sample response - config_schema_path.clear(); - config_schema_path.emplace_back("message_sends_example"); - - set_policy_resp_obj.load_buffer(samples_file, &config_schema_path[0]); - - // verify that our sample conforms to the schema ... - buffer = set_policy_resp_obj.get_buffer(); - if (! set_policy_resp_obj.is_valid(buffer.c_str(), buffer.length(), response)){ - std::stringstream ss; - ss << "Error ! Sample loaded for SET policy response = " << buffer << " from file " << samples_file << " not valid. Reason = " << response; - throw std::runtime_error(ss.str()); - } - mdclog_write(MDCLOG_INFO, "Verified sample for set Policy response"); - - // path to node in schema to respond to get policy (current same as set policy) - config_schema_path.clear(); - config_schema_path.emplace_back("controls"); - config_schema_path.emplace_back(0); - config_schema_path.emplace_back("message_receives_payload_schema"); - config_schema_path[0].add_child(&config_schema_path[1]); - config_schema_path[1].add_child(&config_schema_path[2]); - - get_policy_resp_obj.load_schema(policy_schema_file, &config_schema_path[0]); - mdclog_write(MDCLOG_INFO, "Loaded schema for get Policy response"); - - // sample to respond to get policy - config_schema_path.clear(); - config_schema_path.emplace_back("message_receives_example"); +using namespace rapidjson; - get_policy_resp_obj.load_buffer(samples_file, &config_schema_path[0]); - - // verify that our sample conforms to schema - buffer = get_policy_resp_obj.get_buffer(); - if (! get_policy_resp_obj.is_valid(buffer.c_str(), buffer.length(), response)){ +// constructor loads the policy schema, metric schema, and example payload JSON files +// instantiates the required number of protector plugins +admission::admission (std::string policy_schema_file, std::string samples_file, std::string metrics_schema_file, unsigned int num_instances, std::string xapp_id, bool report_only){ + if (num_instances < 1 || num_instances > MAX_INSTANCES){ std::stringstream ss; - ss << "Error ! Sample loaded for GET policy response = " << buffer << " from file " << samples_file << " not valid. Reason = " << response; - throw std::runtime_error(ss.str()); + ss << "Error !" << __FILE__ << ", " << __LINE__ << " Number of instances must be between " << 1 << " and " << MAX_INSTANCES << ". Specified = " << num_instances << std::endl; + error_string = ss.str(); + throw std::runtime_error(error_string); + } + _xapp_id.assign(xapp_id); + std::string response; + std::string buffer; + std::stringstream ss; + Document _doc; + Value * ref; + std::string schema_key; + + // Load schema files and extract relevant validators + // schema for downstream message + bool res = load_schema(policy_schema_file, "/downstream_schema", downstream_schema_ref_, downstream_validator_); + if(res == false){ + throw std::runtime_error(error_string); + } + // schema for notification message + res = load_schema(policy_schema_file, "/notify_schema", notify_schema_ref_, notify_validator_); + if(res == false){ + throw std::runtime_error(error_string); + } + // schema for metrics message + res = load_schema(metrics_schema_file, "", metrics_schema_ref_, metrics_validator_); + if(res == false){ + throw std::runtime_error(error_string); } - - mdclog_write(MDCLOG_INFO, "Verified sample for get Policy response"); - - // schema & sample for metrics - metrics_obj.load_schema(metrics_schema_file); - mdclog_write(MDCLOG_INFO, "Loaded schema for ves metrics"); - - config_schema_path.clear(); - config_schema_path.emplace_back("metrics"); - metrics_obj.load_buffer(samples_file, &config_schema_path[0]); - // verify sample conforms to schema - buffer = metrics_obj.get_buffer(); - if (! metrics_obj.is_valid(buffer.c_str(), buffer.length(), response)){ - std::stringstream ss; - ss << "Error ! Sample loaded for VES = " << buffer << " from file " << samples_file << " not valid. Reason = " << response; - throw std::runtime_error(ss.str()); + // load samples file and extract sample messages + buffer.erase(); + load_file(samples_file, buffer); + if(_doc.Parse(buffer.c_str()).HasParseError()){ + ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON in file " << samples_file << std::endl; + error_string = ss.str(); + throw std::runtime_error(error_string); + } + mdclog_write(MDCLOG_DEBUG, "Loaded sample file %s\n", samples_file.c_str()); + + StringBuffer s_buffer; + Writer writer(s_buffer); + + // extract notify sample message + ref = NULL; + schema_key = "/notify_policy_message"; + ref = Pointer(schema_key.c_str()).Get(_doc); + if(! ref){ + ss << "Error ! " << __FILE__ << "," << __LINE__ << " Could not find key " << schema_key << std::endl; + error_string = ss.str(); + throw std::runtime_error(error_string); } + (*ref).Accept(writer); + if(notify_message_.Parse(s_buffer.GetString()).HasParseError()){ + ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON snippet at key : " << schema_key << std::endl; + error_string = ss.str(); + throw std::runtime_error(error_string); + } + + + mdclog_write(MDCLOG_DEBUG, "Loaded sample message for notification policy"); + + // extract metrics sample message + ref = NULL; + schema_key = "/metrics"; + ref = Pointer(schema_key.c_str()).Get(_doc); + if(! ref){ + ss << "Error ! " << __FILE__ << "," << __LINE__ << " Could not find key " << schema_key << std::endl; + error_string = ss.str(); + throw std::runtime_error(error_string); + } + s_buffer.Clear(); + writer.Reset(s_buffer); + (*ref).Accept(writer); + if(metrics_message_.Parse(s_buffer.GetString()).HasParseError()){ + ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON snippet at key : " << schema_key<< std::endl; + error_string = ss.str(); + throw std::runtime_error(error_string); + } + - mdclog_write(MDCLOG_INFO, "Verified sample for metrics"); + mdclog_write(MDCLOG_DEBUG, "Loaded sample message for metrics"); + // set the keys we extract and update setPolicyVars(); - - //instantiate the core policy object + //instantiate the core policy plugin for(unsigned int i = 0; i < num_instances; i++){ instantiate_protector_plugin(report_only); } }; + + +bool admission::load_schema(const std::string & schema_file, const std::string & schema_key, std::unique_ptr & schema_ref, std::unique_ptr &validator_ref){ + + std::string buffer; + Value *ref; + Document doc; + std::stringstream ss; + bool res; + + // load policy schema file + res = load_file(schema_file, buffer); + if(!res){ + return false; + } + + if(doc.Parse(buffer.c_str()).HasParseError()){ + ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON in file " << schema_file << std::endl; + error_string = ss.str(); + return false; + } + mdclog_write(MDCLOG_DEBUG, "Loaded schema file %s\n", schema_file.c_str()); + + ref = NULL; + Pointer p(schema_key.c_str()); + ref = p.Get(doc); + if(! ref){ + ss << "Error ! " << __FILE__ << "," << __LINE__ << " Could not find key " << schema_key << " in file " << schema_file << std::endl; + error_string = ss.str(); + return false; + } + + schema_ref = std::make_unique((*ref)); + validator_ref = std::make_unique((*schema_ref)); + mdclog_write(MDCLOG_DEBUG, "Loaded schema and validator for %s\n", schema_file.c_str()); + + return true; + +} + void admission::instantiate_protector_plugin(bool mode){ - _plugin_instances.emplace_back(bool(current_config["enforce"]), current_config["window_length"], current_config["trigger_threshold"], current_config["blocking_rate"], mode); + _plugin_instances.emplace_back(0, 60, 5000, 20, mode); } admission::~admission(void){ - prev_values.clear(); - curr_values.clear(); - policy_vars.clear(); - set_policy_response.clear(); - get_policy_response.clear(); - metric_responses.clear(); - policy_pointer.clear(); + counters.clear(); + +} + +bool admission::load_file(std::string input_file, std::string & contents ){ + + std::FILE *fp ; + try{ + fp = std::fopen(input_file.c_str(), "rb"); + } + catch(std::exception &e){ + error_string = "Error opening input file " + input_file + " Reason = " + e.what(); + return false; + } + + if (fp){ + std::fseek(fp, 0, SEEK_END); + contents.resize(std::ftell(fp)); + std::rewind(fp); + std::fread(&contents[0], 1, contents.size(), fp); + std::fclose(fp); + } + + else{ + error_string = "Error opening input file " + input_file; + return false; + } + + return true; } @@ -147,72 +202,51 @@ std::string admission::getName(void){ return std::string("admission control policy"); } +// Function sets path for each key +// in json object for set/get policy JSON +// and metrics payload string. This way, we do not create +// entire JSON from scratch, but rather just update the relevant portions void admission::setPolicyVars(void){ - - // keys in request to set policy - policy_vars.emplace_back("enforce"); - policy_vars.emplace_back("window_length"); - policy_vars.emplace_back("blocking_rate"); - policy_vars.emplace_back("trigger_threshold"); - + // generic variables in the policy that are re-used + generic_policy_vars.insert(std::pair("policy_type_id", Pointer("/policy_type_id"))); + generic_policy_vars.insert(std::pair("policy_instance_id", Pointer("/policy_instance_id"))); + generic_policy_vars.insert(std::pair("status", Pointer("/status"))); + generic_policy_vars.insert(std::pair("message", Pointer("/message"))); + generic_policy_vars.insert(std::pair("operation", Pointer("/operation"))); + generic_policy_vars.insert(std::pair("xapp_id", Pointer("/handler_id"))); - // keys in response to set policy - set_policy_response.emplace_back("status"); - set_policy_response.emplace_back("message"); + //variables in the sliding window policy + window_policy_vars.insert(std::pair("class", Pointer("/payload/class"))); + window_policy_vars.insert(std::pair("enforce", Pointer("/payload/enforce"))); + window_policy_vars.insert(std::pair("window_length", Pointer("/payload/window_length"))); + window_policy_vars.insert(std::pair("trigger_threshold", Pointer("/payload/trigger_threshold"))); + window_policy_vars.insert(std::pair("blocking_rate", Pointer("/payload/blocking_rate"))); - - policy_pointer.push_back(&policy_vars[0]); - policy_pointer.push_back(&policy_vars[1]); - policy_pointer.push_back(&policy_vars[2]); - policy_pointer.push_back(&policy_vars[3]); - - - // keys in metric response - metric_responses.emplace_back("event"); // 0 - metric_responses.emplace_back("commonEventHeader"); // 1 - metric_responses.emplace_back("measurementFields"); // 2 - metric_responses.emplace_back("additionalFields"); // 3 - metric_responses.emplace_back("SgNB Request Rate"); // 4 - metric_responses.emplace_back("SgNB Accept Rate"); // 5 - metric_responses.emplace_back("startEpochMicrosec"); // 6 - metric_responses.emplace_back("measurementInterval"); // 7 - metric_responses.emplace_back("lastEpochMicrosec");//8 - //metric_responses.emplace_back("TS"); //9 - - metric_responses[0].add_child(&metric_responses[1]); - metric_responses[0].add_child(&metric_responses[2]); - metric_responses[1].add_child(&metric_responses[6]); - metric_responses[1].add_child(&metric_responses[8]); + // variables in each metric message + metric_vars.insert(std::pair("class", Pointer("/event/measurementFields/additionalFields/Class Id"))); + metric_vars.insert(std::pair("request_count", Pointer("/event/measurementFields/additionalFields/SgNB Request Count"))); + metric_vars.insert(std::pair("accept_count", Pointer("/event/measurementFields/additionalFields/SgNB Accept Count"))); + metric_vars.insert(std::pair("report_interval", Pointer("/event/measurementFields/measurementInterval"))); + metric_vars.insert(std::pair("epoch", Pointer("/event/commonHeader/startEpochMicrosec"))); - metric_responses[2].add_child(&metric_responses[3]); - metric_responses[2].add_child(&metric_responses[7]); - metric_responses[3].add_child(&metric_responses[4]); - metric_responses[3].add_child(&metric_responses[5]); - //metric_responses[3].add_child(&metric_responses[9]); - - // default config map for the policy object - current_config.insert(std::pair("enforce", 1)); - current_config.insert(std::pair("window_length", 60)); - current_config.insert(std::pair("blocking_rate", 1)); - current_config.insert(std::pair("trigger_threshold", 1)); + // set xapp id in return message + generic_policy_vars["xapp_id"].Set(notify_message_, _xapp_id.c_str()); - prev_config = current_config; - + // Set up the counters for metrics auto ts = std::chrono::time_point_cast(std::chrono::system_clock::now()); auto epoch = ts.time_since_epoch(); auto val = std::chrono::duration_cast(epoch); prev_time_stamp = val.count(); - prev_values.push_back(0); - prev_values.push_back(0); - curr_values = prev_values; + } +// returns the plugin core protector * admission::get_protector_instance(unsigned int index){ if (index > _plugin_instances.size() -1){ - mdclog_write(MDCLOG_ERR, "%s, %d: Error . Requested index %u exceeds number of plugin instances %u", __FILE__, __LINE__, index, _plugin_instances.size()); + mdclog_write(MDCLOG_ERR, "%s, %d: Error . Requested index %u exceeds number of plugin instances %lu", __FILE__, __LINE__, index, _plugin_instances.size()); return NULL; } else{ @@ -221,129 +255,280 @@ protector * admission::get_protector_instance(unsigned int index){ }; - +// control plane function to set policy in plugin +// create policy returns successful if no previous policy with same class id exists and values are valid +// delete policy returns successful if policy table contains same policy instance id +// update policy returns successful if policy table contains same policy instance id, and values are valid bool admission::setPolicy(const char * message, int message_length, std::string & response){ - // Get the configuration - std::vector available_keys; - std::vector roots; - bool res; + bool res = false; + std::string policy_instance_id; + std::string status_message, operation; + std::stringstream ss; - std::vectorresp_pointer; - resp_pointer.push_back(&set_policy_response[0]); - resp_pointer.push_back(&set_policy_response[1]); + // policy key variables + bool enforce = false; + int window_size = -1, trigger_threshold = -1, class_id = -1; + double blocking_rate = -1; + + // reset the validator + (*downstream_validator_).Reset(); - std::string local_response; - for(unsigned int i = 0; i < policy_vars.size(); i++){ - int res = set_policy_req_obj.get_values(message, message_length, local_response, &policy_vars[i], available_keys); - if (res != 0 ){ - setError(local_response); - set_policy_response[0].set_value("FAIL"); - set_policy_response[1].set_value(local_response); - set_policy_resp_obj.set_values(response, resp_pointer); - - return false; - } + // step 1: verify JSON + Document doc; + if(doc.Parse(message).HasParseError()){ + status_message.assign("Invalid JSON"); } - if ( available_keys.size() == 0){ - local_response = "No Keys were found"; - setError(local_response); - set_policy_response[0].set_value("FAIL"); - set_policy_response[1].set_value(local_response); - - set_policy_resp_obj.set_values(response,resp_pointer); - return false; + // Validate against our JSON input schema + else if (!doc.Accept((*downstream_validator_))){ + StringBuffer sb; + (*downstream_validator_).GetInvalidSchemaPointer().StringifyUriFragment(sb); + std::string failed_message = std::string("\"message\": \"Schema Violation:") + std::string(sb.GetString()); + failed_message += std::string(" Invalid keyword :") + std::string((*downstream_validator_).GetInvalidSchemaKeyword()) + " \""; + sb.Clear(); + (*downstream_validator_).GetInvalidDocumentPointer().StringifyUriFragment(sb); + failed_message += std::string(" Invalid document :") + std::string(sb.GetString()); + status_message.assign(failed_message); + } - + else{ - // Get new config - prev_config = current_config; - - for(std::vector::iterator it = available_keys.begin(); it != available_keys.end(); ++it){ - DataContainer const * id = (*it)->get_id(); - DataContainer const * val = (*it)->get_value(); - auto e = current_config.find(id->value.s.c_str()); - if (e != current_config.end()){ - e->second = val->value.i; + // step 2: extract the standard keys expected in all downstream + // messages : policy_instance + Value * ref; + + for(auto const &e : generic_policy_vars){ + ref = NULL; + ref = e.second.Get(doc); + + // this key can be simultaneously put in notify + if (ref != NULL && e.first == "policy_type_id" ){ + e.second.Set(notify_message_, ref->GetInt()); + } + + // this key can be simultaneously put in notify + else if (ref != NULL && e.first == "policy_instance_id" ){ + e.second.Set(notify_message_, ref->GetString()); + policy_instance_id = ref->GetString(); + } + else if (ref != NULL && e.first == "operation"){ + operation = ref->GetString(); + } } - } - // Note : xapp schema specifies window be in 'minutes'. Sliding window implementation maintains in seconds, hence multiply by 60 - current_config["window_length"] *= 60; + // do we have this policy ? + auto it = policy_table.find(policy_instance_id); + res = true; - // Apply the config - res= true; - for(std::vector::iterator it_p = _plugin_instances.begin(); it_p != _plugin_instances.end(); ++it_p){ - - res = (*it_p).configure( bool(current_config["enforce"]), current_config["window_length"], current_config["trigger_threshold"], current_config["blocking_rate"]); - if (!res){ - mdclog_write(MDCLOG_ERR, "Error ::%s, %d :: Could not configure plugin\n", __FILE__, __LINE__); - set_policy_response[0].set_value("FAIL"); - set_policy_response[1].set_value("Could not apply configuration"); - break; + // if operation is create and policy already present, simply return with OK ? + // we may get the same create policy multiple times due to race conditions when the + // xapp starts up + if(operation == "CREATE" && it != policy_table.end()){ + res = true; } + else{ + + if (operation == "DELETE" || operation == "UPDATE"){ + // don't proceed if policy not found + if(it == policy_table.end()){ + ss <<" No policy instance = " << policy_instance_id << " found. Cannot perform operation = " << operation << std::endl; + status_message = ss.str(); + res = false; + } + else{ + class_id = it->second; // used if delete + } + } + + if (res){ + // perform the operation + res = false; + + if (operation == "DELETE"){ + res = _plugin_instances[0].delete_policy(class_id); + if(res){ + ss <<"Policy instance id " << policy_instance_id << " successfully deleted" << std::endl; + status_message = ss.str(); + policy_table.erase(policy_instance_id); + } + else{ + status_message = _plugin_instances[0].get_error(); + } + } + + else if (operation == "CREATE" or operation == "UPDATE"){ + // initialize policy params to invalid + window_size = -1; + class_id = -1; + trigger_threshold = -1; + blocking_rate = -1; + + // get values for policy keys + for(auto const& e: window_policy_vars){ + ref = NULL; + ref = e.second.Get(doc); + if (ref == NULL){ + continue; + } + if(e.first == "enforce"){ + enforce = ref->GetBool(); + } + else if (e.first == "window_length"){ + window_size = ref->GetInt(); + } + else if (e.first == "blocking_rate"){ + blocking_rate = ref->GetDouble(); + } + else if (e.first == "trigger_threshold"){ + trigger_threshold = ref->GetInt(); + } + else if (e.first == "class"){ + class_id = ref->GetInt(); + } + } + + + if(operation == "CREATE"){ + res = _plugin_instances[0].add_policy(enforce, window_size, trigger_threshold, blocking_rate, class_id); + status_message.assign(_plugin_instances[0].get_error()); + + if(res == true){ + // add to policy list + policy_table.insert(std::pair(policy_instance_id, class_id)); + } + } + else if (operation == "UPDATE"){ + res = _plugin_instances[0].configure(enforce, window_size, trigger_threshold, blocking_rate, class_id); + status_message.assign(_plugin_instances[0].get_error()); + } + + } + } + } + + if(res == true) + status_message.assign("SUCCESS"); } + - if (res){ - set_policy_response[0].set_value("SUCCESS"); - set_policy_response[1].set_value("configuration applied"); + // generate response + //generic_policy_vars["message"].Set(notify_message_, status_message.c_str()); + + if(res == false){ + generic_policy_vars["status"].Set(notify_message_, "ERROR"); + } + else { + if(operation == "DELETE"){ + generic_policy_vars["status"].Set(notify_message_, "DELETED"); + } + else{ + generic_policy_vars["status"].Set(notify_message_, "OK"); + } } - set_policy_resp_obj.set_values(response,resp_pointer); - return true; + + StringBuffer s_buffer; + Writer writer(s_buffer); + notify_message_.Accept(writer); + response.assign(s_buffer.GetString(), s_buffer.GetLength()); + mdclog_write(MDCLOG_DEBUG, "Set Policy Response = %s\n", response.c_str()); + return res; }; - -bool admission::getPolicy(const char * message, int message_length, std::string & response){ - // Note : by same token, when returning sliding window length : translate to - // minutes - policy_vars[0].set_value(bool(current_config["enforce"])); - policy_vars[1].set_value((int)( (double)current_config["window_length"]/60.0)); - policy_vars[2].set_value(current_config["blocking_rate"]); - policy_vars[3].set_value(current_config["trigger_threshold"]); - - int res = get_policy_resp_obj.set_values(response, policy_pointer); +// control plane function to retreive policy set in plugin +// This is just a placeholder. Still TBD ..... +bool admission::getPolicy(const char * message, int message_length, std::string & response){ return true; } -int admission::getMetrics(std::string & response){ - std::vector metric_pointers; - metric_pointers.push_back(&metric_responses[0]); +// control plane function to retreive metrics from plugin +// crafts into ves schema based JSON payload +int admission::getMetrics(std::vector & response_vector){ + + int res; + // the list of active policies on the protector plugin can + // dynamically change. - auto ts = std::chrono::time_point_cast(std::chrono::system_clock::now()); - auto epoch = ts.time_since_epoch(); - auto val_ms = std::chrono::duration_cast(epoch); - auto val_s = std::chrono::duration_cast(epoch); - - long int current_time_stamp = val_ms.count(); - long int current_time = val_s.count(); - long int interval = current_time_stamp - prev_time_stamp; + // run through active policies + for(auto const &e: policy_table){ + int id = e.second; + std:: string response; + process_counters(id, response); + response_vector.emplace_back(response); + } - curr_values[0] = _plugin_instances[0].get_requests(); - curr_values[1] = _plugin_instances[0].get_rejects(); + // also account for default policy + int id = -1; + std::string response; + process_counters(id, response); + response_vector.emplace_back(response); - //curr_values[0] = rand()%100 + prev_values[0]; - //curr_values[1] = rand()%20 + prev_values[1]; - - //std::cout <<" Accept counter = " << curr_values[0]<< " Reject counter = " << curr_values[1] << " Request Count = " << (curr_values[0] - prev_values[0]) << " Reject count = " << curr_values[1] - prev_values[1] << std::endl; - - metric_responses[4].set_value(std::to_string(curr_values[0] - prev_values[0])); - metric_responses[5].set_value(std::to_string((curr_values[0] - prev_values[0]) - (curr_values[1] - prev_values[1]))); - metric_responses[6].set_value(prev_time_stamp); - metric_responses[8].set_value(current_time_stamp); - metric_responses[7].set_value(interval); - //metric_responses[9].set_value(current_time); - prev_values = curr_values; - prev_time_stamp = current_time_stamp; - int res = metrics_obj.set_values(response, metric_pointers); + // clear out counters for expired policies + for (auto const &e : counters){ + + if (e.first != -1 && ! _plugin_instances[0].is_active(e.first)){ + counters.erase(e.first); + } + } - return res; + return 0; +} + + +void admission::process_counters(int id, std::string & response){ + + + long request_count, accept_count, curr_timestamp, time_interval; + long int requests = _plugin_instances[0].get_requests(id); + long int rejects = _plugin_instances[0].get_rejects(id); + + + std::chrono::time_point ts; + ts = std::chrono::time_point_cast (std::chrono::system_clock::now()); + curr_timestamp =ts.time_since_epoch().count(); + + // do we have counters for this policy ? + auto prev_it = counters.find(id); + if(prev_it == counters.end()){ + + // new policy seeing for first time + request_count = requests; + accept_count = requests - rejects; + time_interval = 0; + + // store + counters.insert(std::pair>(id, std::vector({requests, rejects, curr_timestamp}))); + } + else{ + request_count = requests - prev_it->second[0]; + accept_count = request_count - (rejects - prev_it->second[1]); + time_interval = ceil((curr_timestamp - prev_it->second[2])/1e+6); // seconds + + // update history + prev_it->second[0] = requests; + prev_it->second[1] = rejects; + prev_it->second[2] = curr_timestamp; + } + + // generate string and add to response vector + metric_vars["class"].Set(metrics_message_, std::to_string(id).c_str()); + metric_vars["request_count"].Set(metrics_message_, std::to_string(request_count).c_str()); + metric_vars["accept_count"].Set(metrics_message_, std::to_string(accept_count).c_str()); + metric_vars["epoch"].Set(metrics_message_, std::to_string(curr_timestamp).c_str()); + metric_vars["report_interval"].Set(metrics_message_, std::to_string(time_interval).c_str()); + + StringBuffer sb_buffer; + Writer writer(sb_buffer); + metrics_message_.Accept(writer); + response.assign(sb_buffer.GetString(), sb_buffer.GetLength()); + + }