#include "adm-ctrl-xapp.hpp"
-bool report_mode_only = true;
-static int RunProg = 1; // keep loop running
+int run_program = 1;
+
+
// list of plugins
-typedef std::vector<std::unique_ptr<Policy> > plugin_list;
plugin_list Plugins;
std::map<int, Policy *> plugin_rmr_map;
+// Policy handler : All plugins (of abstract class Policy) are registered with the plugin list.
+// The policy handler is registered as a call back to the RMR message handler. When a policy related RMR message
+// is received, this function is invoked. It finds the appropriate plugin from the plugin list, passes the policy message and then
+// returns back the response (in the response string)
-bool add_subscription(SubscriptionHandler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){
- unsigned char node_buffer[32];
- std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
- node_buffer[gNodeB.length()] = '\0';
- bool res = sub_handler.RequestSubscription(he, he_resp, RIC_SUB_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
- return res;
-};
-
-
-bool delete_subscription(SubscriptionHandler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){
- unsigned char node_buffer[32];
- std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
- node_buffer[gNodeB.length()] = '\0';
-
- bool res = sub_handler.RequestSubscriptionDelete(he, he_resp, RIC_SUB_DEL_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
- return res;
-};
-
+// NOTE : This version of policy handler was written with R1 policy protocol in mind.
+// Specifically, it was assumed that each message type corresponds to either set/get for a specific xAPP.
+// It still works with R2, but ideally should be modified since a single policy type could be applied to multiple plugins.
void policy_handler(int message_type, const char * message, int message_len, std::string & response, bool set){
- auto it = plugin_rmr_map.find(message_type);
+
+ // Extract policy id type : for now we assume exactly 1 plugin maps to each policy type
+ rapidjson::Pointer policy_type_ref("/policy_type_id");
+ rapidjson::Document doc;
+ if (doc.Parse(message).HasParseError()){
+ mdclog_write(MDCLOG_ERR, "Error: %s, %d :: Could not decode A1 JSON message %s\n", __FILE__, __LINE__, message);
+ return;
+ }
+ rapidjson::Value * ref = policy_type_ref.Get(doc);
+ if (ref == NULL){
+ mdclog_write(MDCLOG_ERR, "Error : %s, %d:: Could not extract policy type id from %s\n", __FILE__, __LINE__, message);
+ return;
+ }
+ int policy_type_id = ref->GetInt();
+
+ auto it = plugin_rmr_map.find(policy_type_id);
bool res;
if (it != plugin_rmr_map.end()){
if (set){
}
}
else{
- response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with RMR message type = " + std::to_string(message_type) + "\"}";
+ response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with policy type id = " + std::to_string(policy_type_id) + "\"}";
+ mdclog_write(MDCLOG_ERR, "Error ! %s, %d : %s\n", __FILE__, __LINE__, response.c_str());
}
};
+
+// polling function that routinely queries all plugins for metrics and then posts them on
+// VES url
void metrics_collector(std::string ves_url, plugin_list * plugins, unsigned int interval){
// Instantiate the ves collector
curl_interface curl_obj(ves_url);
- std::string metrics_response;
+ std::vector<std::string> response_vector;
int res;
- while(RunProg){
+ while(run_program){
for(unsigned int i = 0; i < plugins->size(); i++){
- res = (*plugins)[i].get()->getMetrics(metrics_response);
+ response_vector.clear();
+ res = (*plugins)[i].get()->getMetrics(response_vector);
if (res != 0){
- mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), metrics_response.c_str());
+ mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), (*plugins)[i].get()->get_error().c_str());
}
else{
- res = curl_obj.post_metrics(metrics_response);
- if (!res){
- mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str());
- }
- else{
- mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", metrics_response.c_str());
+ // send each response
+ for(auto &e: response_vector){
+ res = curl_obj.post_metrics(e);
+ if (!res){
+ mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str());
+ }
+ else{
+ mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", e.c_str());
+ }
}
}
}
void EndProgram(int signum){
std::cout <<"Signal received. Stopping program ....." << std::endl;
- RunProg = 0;
+ run_program = 0;
+
}
-
+// ideally should be expanded for rollback purposes etc.
void msg_error(rmr_mbuf_t *message){
mdclog_write(MDCLOG_ERR, "Error: %s, %d Could not send RMR message of length %d and type %d, Reason %s", __FILE__, __LINE__, message->len, message->mtype, strerror(errno) );
};
mdclog_write(MDCLOG_WARN, "WARNING : gNodeB not set for subscription. Subscription MAY FAIL");
}
+ std::string operating_mode;
+
+ // How are we operating ?
if (my_config.operating_mode == "CONTROL"){
- report_mode_only = false;
+ // Full closed loop : process E2AP indication,
+ // E2SM, X2AP and generate response
+ my_config.report_mode_only = false;
+ operating_mode = "CLOSED LOOP CONTROL";
}
- else{
- report_mode_only = true;
+ else if (my_config.operating_mode == "E2AP_PROC_ONLY"){
+ // Debugging : processing only E2AP indication
+ my_config.processing_level = ProcessingLevelTypes::E2AP_PROC_ONLY;
+ operating_mode = "E2AP PROCESSING ONLY";
}
-
- // Finished passing command line/environment arguments
- //=============================================================
-
- // instantiate xapp object
- if(my_config.num_threads >= 1){
- mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
- // Create XaPP that starts with specified number of threads
- my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024, my_config.num_threads);
+ else if (my_config.operating_mode == "E2SM_PROC_ONLY"){
+ // Debugging : processing only till E2SM indication header
+ my_config.processing_level = ProcessingLevelTypes::E2SM_PROC_ONLY;
+ operating_mode = "E2SM PROCESSING ONLY";
}
else{
- mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto");
- //Let XaPP pick threads based on hardware
- my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024);
+ // Passive : processing till X2AP but do not generate response
+ my_config.report_mode_only = true;
+ operating_mode = "REPORTING ONLY";
}
-
+ mdclog_write(MDCLOG_DEBUG, "Operating mode of Admission Control xAPP is %s\n", operating_mode.c_str());
+ // Finished passing command line/environment arguments
+ //=============================================================
+
+ // instantiate xapp-rmr-framework object
+ mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);
mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);
+ my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024);
- // Instantiate admission logic handler
- Plugins.emplace_back(std::make_unique<admission>(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file, 1));
- // Add reference to plugin list . We add twice (once for set policy and once for get policy ids)
- // Plugin list is used by policy handler and metrics collector
- plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_INT_CONTROL, Plugins[0].get()));
- plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_GET_POLICY, Plugins[0].get()));
+ // Instantiate admission logic handler (with only one instance for now)
+ int num_instances = 1;
+ Plugins.emplace_back(std::make_unique<admission>(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file, num_instances, my_config.xapp_id, my_config.report_mode_only));
+
+ // Add reference to plugin list .
+ // Plugin list is used by policy handler and metrics collector
+ plugin_rmr_map.insert(std::pair<int, Policy *>(RATE_CONTROL_POLICY_ID, Plugins[0].get()));
- // instantiate curl object for ves
+ // instantiate ves thread (it polls all plugins and sends out their metrics)
std::thread metrics_thread(metrics_collector, my_config.ves_collector_url, &Plugins, my_config.measurement_interval);
-
// Instantiate subscription handler
- SubscriptionHandler sub_handler;
+ subscription_handler sub_handler;
// Instantiate message handlers for RMR
// (one for each thread) and registrer
std::vector<std::unique_ptr<message_processor> > message_procs;
for(int i = 0; i < my_config.num_threads; i++){
- std::unique_ptr<message_processor> mp_handler = std::make_unique<message_processor> ();
+ std::unique_ptr<message_processor> mp_handler = std::make_unique<message_processor> (my_config.processing_level, my_config.report_mode_only);
mp_handler.get()->register_subscription_handler(& sub_handler);
mp_handler.get()->register_protector(dynamic_cast<admission *>(Plugins[0].get())->get_protector_instance(0));
mp_handler.get()->register_policy_handler (& policy_handler);
}
- // Start the listening loops
+ // Start the RMR listening loops
std::vector<int> thread_ids(my_config.num_threads);
unsigned int i = 0;
for(auto &e: message_procs){
mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");
mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());
- //======================================================
- // sgnb Subscription spec
-
- int request_id = 2; // will be over-written by subscription handler
- int req_seq = 1;
- int function_id = 0;
- int action_id = 4;
- int action_type = report_mode_only ? 0:1;
-
- int message_type = 1;
- int procedure_code = 27;
- std::string egnb_id = "Testgnb";
- std::string plmn_id = "Testplmn";
-
- unsigned char event_buf[128];
- size_t event_buf_len = 128;
- bool res;
-
+ //Register signal handler to stop
+ signal(SIGINT, EndProgram);
+ signal(SIGTERM, EndProgram);
- e2sm_event_trigger_helper trigger_data;
- e2sm_event_trigger event_trigger;
-
- trigger_data.egNB_id = egnb_id;
- trigger_data.plmn_id = plmn_id;
- trigger_data.egNB_id_type = 2;
- trigger_data.interface_direction = 1;
- trigger_data.procedure_code = procedure_code;
- trigger_data.message_type = message_type;
- res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data);
- if (!res){
- mdclog_write(MDCLOG_ERR, "Error : %s, %d: Could not encode subscription Request. Reason = %s\n", __FILE__, __LINE__, event_trigger.get_error().c_str());
- exit(0);
- }
-
+ // Instantiate startup/shutown subroutine objects
+ init boot_shutdown((*my_xapp), sub_handler, my_config);
- subscription_helper sgnb_add_subscr_req;
- subscription_response_helper subscr_response;
-
- sgnb_add_subscr_req.clear();
- sgnb_add_subscr_req.set_request(request_id, req_seq);
- sgnb_add_subscr_req.set_function_id(function_id);
- sgnb_add_subscr_req.add_action(action_id, action_type);
-
-
- sgnb_add_subscr_req.set_event_def(&event_buf[0], event_buf_len);
- mdclog_write(MDCLOG_INFO, "Encoded event trigger definition into PDU of size %lu bytes\n", event_buf_len);
-
- //======================================================
- // Purely for testing purposes ... write subscription ASN binary to file
- // FILE *pfile;
- // pfile = fopen("event_trigger.pr", "wb");
- // fwrite(event_buf, 1, event_buf_len, pfile);
- // fclose(pfile);
- //======================================================
-
-
- // keep sending subscription request till successfull for all gnodebs ?
- auto it = my_config.gNodeB_list.begin();
- while(my_config.gNodeB_list.size() > 0 && RunProg){
- int attempt = 0;
- res = false;
-
- while(!res){
- mdclog_write(MDCLOG_INFO, "Sending subscription request for %s ... Attempt number = %d\n", (*it).c_str(), attempt);
- res = add_subscription(sub_handler, my_xapp.get(), sgnb_add_subscr_req, subscr_response, *it);
- if (!res){
- sleep(5);
- };
- attempt ++;
- if (attempt > MAX_SUBSCRIPTION_ATTEMPTS){
- break;
- }
- }
-
- if(res){
- mdclog_write(MDCLOG_INFO, "Successfully subscribed for gNodeB %s", (*it).c_str());
- // remove node from list,
- // move to next gnobde
- it = my_config.gNodeB_list.erase(it);
- }
-
- if (it == my_config.gNodeB_list.end()){
- it = my_config.gNodeB_list.begin();
- }
-
- }
-
-
- std::cout <<"SUBSCRIPTION REQUEST :: Successfully subscribed to events for all gNodeBs " << std::endl;
- //Register signal handler to stop
- signal(SIGINT, EndProgram);
- signal(SIGTERM, EndProgram);
+ // Trigger start functions
+ boot_shutdown.startup();
-
- // Purely for testing purposes ....
- // If in test mode, we wait an interval and then send delete subscription request for each gNodeB
- if(my_config.test_mode){
- std::cout <<"====================== " << std::endl;
- std::cout <<"WE ARE IN TEST MODE. " << std::endl;
- std::cout <<"====================== " << std::endl;
- std::cout <<"WILL SEND SUBSCRIPTION DELETE REQUEST AFTER " << my_config.measurement_interval << " SECONDS " << std::endl;
- sleep(my_config.measurement_interval);
- res = false;
- // keep sending subscription delete request till successfull ?
- int attempt = 0;
- while(!res){
- mdclog_write(MDCLOG_INFO, "Sending subscription delete request for id = %d ... Attempt number = %d\n", sgnb_add_subscr_req.get_request_id(), attempt);
- res = delete_subscription(sub_handler, my_xapp.get(), sgnb_add_subscr_req, subscr_response, my_config.gNodeB_list[0]);
- if (!res){
- sleep(5);
- };
- attempt ++;
- }
-
- std::cout <<"SUBSCRIPTION DELETE REQUEST :: Successfuly deleted subscription request " << request_id << std::endl;
-
- };
//Wait for stop
- while(RunProg){
+ while(run_program){
sleep(10);
}
-
+
+
+ // we are in shutdown mode
+ // send out subscription deletes
+ boot_shutdown.shutdown();
+
i = 0;
for(auto &e: message_procs){
mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());