#include "adm-ctrl-xapp.hpp"
+int run_program = 1;
+
-static int RunProg = 1; // keep loop running
// list of plugins
-typedef std::vector<std::unique_ptr<Policy> > plugin_list;
plugin_list Plugins;
std::map<int, Policy *> plugin_rmr_map;
+// Policy handler : All plugins (of abstract class Policy) are registered with the plugin list.
+// The policy handler is registered as a call back to the RMR message handler. When a policy related RMR message
+// is received, this function is invoked. It finds the appropriate plugin from the plugin list, passes the policy message and then
+// returns back the response (in the response string)
-int add_subscription(subscription_handler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){
- unsigned char node_buffer[32];
- std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
- node_buffer[gNodeB.length()] = '\0';
- int res = sub_handler.RequestSubscription(he, he_resp, RIC_SUB_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
- return res;
-};
-
-
-int delete_subscription(subscription_handler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){
- unsigned char node_buffer[32];
- std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
- node_buffer[gNodeB.length()] = '\0';
-
- int res = sub_handler.RequestSubscriptionDelete(he, he_resp, RIC_SUB_DEL_REQ, std::bind(static_cast<bool (XaPP::*)(int, int, void *, unsigned char const*)>( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer));
- return res;
-};
-
+// NOTE : This version of policy handler was written with R1 policy protocol in mind.
+// Specifically, it was assumed that each message type corresponds to either set/get for a specific xAPP.
+// It still works with R2, but ideally should be modified since a single policy type could be applied to multiple plugins.
void policy_handler(int message_type, const char * message, int message_len, std::string & response, bool set){
auto it = plugin_rmr_map.find(message_type);
}
else{
response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with RMR message type = " + std::to_string(message_type) + "\"}";
+ mdclog_write(MDCLOG_ERR, "Error ! %s, %d : %s\n", __FILE__, __LINE__, response.c_str());
}
};
+
+// polling function that routinely queries all plugins for metrics and then posts them on
+// VES url
void metrics_collector(std::string ves_url, plugin_list * plugins, unsigned int interval){
// Instantiate the ves collector
curl_interface curl_obj(ves_url);
- std::string metrics_response;
+ std::vector<std::string> response_vector;
int res;
- while(RunProg){
+ while(run_program){
for(unsigned int i = 0; i < plugins->size(); i++){
- res = (*plugins)[i].get()->getMetrics(metrics_response);
+ response_vector.clear();
+ res = (*plugins)[i].get()->getMetrics(response_vector);
if (res != 0){
- mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), metrics_response.c_str());
+ mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), (*plugins)[i].get()->get_error().c_str());
}
else{
- res = curl_obj.post_metrics(metrics_response);
- if (!res){
- mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str());
- }
- else{
- mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", metrics_response.c_str());
+ // send each response
+ for(auto &e: response_vector){
+ res = curl_obj.post_metrics(e);
+ if (!res){
+ mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str());
+ }
+ else{
+ mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", e.c_str());
+ }
}
}
}
void EndProgram(int signum){
std::cout <<"Signal received. Stopping program ....." << std::endl;
- RunProg = 0;
+ run_program = 0;
+
}
-
+// ideally should be expanded for rollback purposes etc.
void msg_error(rmr_mbuf_t *message){
mdclog_write(MDCLOG_ERR, "Error: %s, %d Could not send RMR message of length %d and type %d, Reason %s", __FILE__, __LINE__, message->len, message->mtype, strerror(errno) );
};
mdclog_write(MDCLOG_WARN, "WARNING : gNodeB not set for subscription. Subscription MAY FAIL");
}
-
std::string operating_mode;
+
// How are we operating ?
if (my_config.operating_mode == "CONTROL"){
// Full closed loop : process E2AP indication,
// Finished passing command line/environment arguments
//=============================================================
- // instantiate xapp object
- if(my_config.num_threads >= 1){
- mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
- // Create XaPP that starts with specified number of threads
- my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024, my_config.num_threads);
- }
- else{
- mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto");
- //Let XaPP pick threads based on hardware
- my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024);
- }
-
-
+ // instantiate xapp-rmr-framework object
+ mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads);
mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name);
mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port);
+ my_xapp = std::make_unique<XaPP>(my_config.name, my_config.port, 1024);
- // Instantiate admission logic handler
- Plugins.emplace_back(std::make_unique<admission>(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file, 1, my_config.report_mode_only));
- // Add reference to plugin list . We add twice (once for set policy and once for get policy ids)
- // Plugin list is used by policy handler and metrics collector
- plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_INT_CONTROL, Plugins[0].get()));
- plugin_rmr_map.insert(std::pair<int, Policy *>(DC_ADM_GET_POLICY, Plugins[0].get()));
+ // Instantiate admission logic handler (with only one instance for now)
+ int num_instances = 1;
+ Plugins.emplace_back(std::make_unique<admission>(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file, num_instances, my_config.xapp_id, my_config.report_mode_only));
+
+ // Add reference to plugin list .
+ // Plugin list is used by policy handler and metrics collector
+ plugin_rmr_map.insert(std::pair<int, Policy *>(A1_POLICY_REQ, Plugins[0].get()));
- // instantiate curl object for ves
+ // instantiate ves thread (it polls all plugins and sends out their metrics)
std::thread metrics_thread(metrics_collector, my_config.ves_collector_url, &Plugins, my_config.measurement_interval);
-
// Instantiate subscription handler
subscription_handler sub_handler;
}
- // Start the listening loops
+ // Start the RMR listening loops
std::vector<int> thread_ids(my_config.num_threads);
unsigned int i = 0;
for(auto &e: message_procs){
mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n");
mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size());
- //======================================================
- // sgnb Subscription spec
-
- int request_id = 2; // will be over-written by subscription handler
- int req_seq = 1;
- int function_id = 0;
- int action_id = 4;
- int action_type = my_config.report_mode_only ? 0:1;
-
- int message_type = 1;
- int procedure_code = 27;
- std::string egnb_id = "Testgnb";
- std::string plmn_id = "Testplmn";
-
- unsigned char event_buf[128];
- size_t event_buf_len = 128;
- bool res;
+ //Register signal handler to stop
+ signal(SIGINT, EndProgram);
+ signal(SIGTERM, EndProgram);
+ // Instantiate startup/shutown subroutine objects
+ init boot_shutdown((*my_xapp), sub_handler, my_config);
- e2sm_event_trigger_helper trigger_data;
- e2sm_event_trigger event_trigger;
-
- trigger_data.egNB_id = egnb_id;
- trigger_data.plmn_id = plmn_id;
- trigger_data.egNB_id_type = 2;
- trigger_data.interface_direction = 1;
- trigger_data.procedure_code = procedure_code;
- trigger_data.message_type = message_type;
- res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data);
- if (!res){
- mdclog_write(MDCLOG_ERR, "Error : %s, %d: Could not encode subscription Request. Reason = %s\n", __FILE__, __LINE__, event_trigger.get_error().c_str());
- exit(0);
- }
-
- subscription_helper sgnb_add_subscr_req;
- subscription_response_helper subscr_response;
-
- sgnb_add_subscr_req.clear();
- sgnb_add_subscr_req.set_request(request_id, req_seq);
- sgnb_add_subscr_req.set_function_id(function_id);
- sgnb_add_subscr_req.add_action(action_id, action_type);
-
-
- sgnb_add_subscr_req.set_event_def(&event_buf[0], event_buf_len);
- mdclog_write(MDCLOG_INFO, "Encoded event trigger definition into PDU of size %lu bytes\n", event_buf_len);
-
- //======================================================
- // Purely for testing purposes ... write subscription ASN binary to file
- // FILE *pfile;
- // pfile = fopen("event_trigger.pr", "wb");
- // fwrite(event_buf, 1, event_buf_len, pfile);
- // fclose(pfile);
- //======================================================
-
-
- // keep sending subscription request till successfull for all gnodebs or exceed max attempts ?
- // or exceed max_iterations
- auto it = my_config.gNodeB_list.begin();
- int loop = 0;
- int num_nodes = my_config.gNodeB_list.size();
-
- while((loop < num_nodes * my_config.max_sub_loops) && my_config.gNodeB_list.size() > 0 && RunProg){
- int attempt = 0;
- int subscr_result = -1;
-
- while(1){
- mdclog_write(MDCLOG_INFO, "Sending subscription request for %s ... Attempt number = %d\n", (*it).c_str(), attempt);
- subscr_result = add_subscription(sub_handler, my_xapp.get(), sgnb_add_subscr_req, subscr_response, *it);
- if (subscr_result == SUBSCR_SUCCESS){
- break;
- }
- sleep(5);
- attempt ++;
- if (attempt > MAX_SUBSCRIPTION_ATTEMPTS){
- break;
- }
- }
-
- if(subscr_result == SUBSCR_SUCCESS){
- mdclog_write(MDCLOG_INFO, "Successfully subscribed for gNodeB %s", (*it).c_str());
- // remove node from list,
- // move to next gnobde
- it = my_config.gNodeB_list.erase(it);
- }
-
- if (it == my_config.gNodeB_list.end()){
- it = my_config.gNodeB_list.begin();
- }
-
- loop++;
-
- }
-
- if (my_config.gNodeB_list.size() == 0){
- std::cout <<"SUBSCRIPTION REQUEST :: Successfully subscribed to events for all gNodeBs " << std::endl;
- }
- else{
- std::cerr <<"SUBSCRIPTION REQUEST :: Failed to subscribe for following gNodeBs" << std::endl;
- for(const auto &e: my_config.gNodeB_list){
- std::cerr <<"Failed to subscribe for gNodeB " << e << std::endl;
- }
- }
-
- //Register signal handler to stop
- signal(SIGINT, EndProgram);
- signal(SIGTERM, EndProgram);
+ // Trigger start functions
+ boot_shutdown.startup();
//Wait for stop
- while(RunProg){
+ while(run_program){
sleep(10);
}
-
+
+
+ // we are in shutdown mode
+ // send out subscription deletes
+ boot_shutdown.shutdown();
+
i = 0;
for(auto &e: message_procs){
mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());