X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fadm-ctrl-xapp.cc;h=a5abeab1777a373bb96bce41161d57bcf9701287;hb=HEAD;hp=f9994a3f0829ac362cc7e680d2760d677d38d1fe;hpb=b9d7e9c232a4371ddfed51c58e5a57f87b057229;p=ric-app%2Fadmin.git diff --git a/src/adm-ctrl-xapp.cc b/src/adm-ctrl-xapp.cc index f9994a3..a5abeab 100644 --- a/src/adm-ctrl-xapp.cc +++ b/src/adm-ctrl-xapp.cc @@ -22,36 +22,40 @@ #include "adm-ctrl-xapp.hpp" -bool report_mode_only = true; -static int RunProg = 1; // keep loop running +int run_program = 1; + + // list of plugins -typedef std::vector > plugin_list; plugin_list Plugins; std::map plugin_rmr_map; +// Policy handler : All plugins (of abstract class Policy) are registered with the plugin list. +// The policy handler is registered as a call back to the RMR message handler. When a policy related RMR message +// is received, this function is invoked. It finds the appropriate plugin from the plugin list, passes the policy message and then +// returns back the response (in the response string) -bool add_subscription(SubscriptionHandler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){ - unsigned char node_buffer[32]; - std::copy(gNodeB.begin(), gNodeB.end(), node_buffer); - node_buffer[gNodeB.length()] = '\0'; - bool res = sub_handler.RequestSubscription(he, he_resp, RIC_SUB_REQ, std::bind(static_cast( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer)); - return res; -}; - - -bool delete_subscription(SubscriptionHandler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){ - unsigned char node_buffer[32]; - std::copy(gNodeB.begin(), gNodeB.end(), node_buffer); - node_buffer[gNodeB.length()] = '\0'; - - bool res = sub_handler.RequestSubscriptionDelete(he, he_resp, RIC_SUB_DEL_REQ, std::bind(static_cast( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer)); - return res; -}; - +// NOTE : This version of policy handler was written with R1 policy protocol in mind. +// Specifically, it was assumed that each message type corresponds to either set/get for a specific xAPP. +// It still works with R2, but ideally should be modified since a single policy type could be applied to multiple plugins. void policy_handler(int message_type, const char * message, int message_len, std::string & response, bool set){ - auto it = plugin_rmr_map.find(message_type); + + // Extract policy id type : for now we assume exactly 1 plugin maps to each policy type + rapidjson::Pointer policy_type_ref("/policy_type_id"); + rapidjson::Document doc; + if (doc.Parse(message).HasParseError()){ + mdclog_write(MDCLOG_ERR, "Error: %s, %d :: Could not decode A1 JSON message %s\n", __FILE__, __LINE__, message); + return; + } + rapidjson::Value * ref = policy_type_ref.Get(doc); + if (ref == NULL){ + mdclog_write(MDCLOG_ERR, "Error : %s, %d:: Could not extract policy type id from %s\n", __FILE__, __LINE__, message); + return; + } + int policy_type_id = ref->GetInt(); + + auto it = plugin_rmr_map.find(policy_type_id); bool res; if (it != plugin_rmr_map.end()){ if (set){ @@ -76,30 +80,38 @@ void policy_handler(int message_type, const char * message, int message_len, st } } else{ - response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with RMR message type = " + std::to_string(message_type) + "\"}"; + response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with policy type id = " + std::to_string(policy_type_id) + "\"}"; + mdclog_write(MDCLOG_ERR, "Error ! %s, %d : %s\n", __FILE__, __LINE__, response.c_str()); } }; + +// polling function that routinely queries all plugins for metrics and then posts them on +// VES url void metrics_collector(std::string ves_url, plugin_list * plugins, unsigned int interval){ // Instantiate the ves collector curl_interface curl_obj(ves_url); - std::string metrics_response; + std::vector response_vector; int res; - while(RunProg){ + while(run_program){ for(unsigned int i = 0; i < plugins->size(); i++){ - res = (*plugins)[i].get()->getMetrics(metrics_response); + response_vector.clear(); + res = (*plugins)[i].get()->getMetrics(response_vector); if (res != 0){ - mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), metrics_response.c_str()); + mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), (*plugins)[i].get()->get_error().c_str()); } else{ - res = curl_obj.post_metrics(metrics_response); - if (!res){ - mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str()); - } - else{ - mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", metrics_response.c_str()); + // send each response + for(auto &e: response_vector){ + res = curl_obj.post_metrics(e); + if (!res){ + mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str()); + } + else{ + mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", e.c_str()); + } } } } @@ -111,9 +123,10 @@ void metrics_collector(std::string ves_url, plugin_list * plugins, unsigned int void EndProgram(int signum){ std::cout <<"Signal received. Stopping program ....." << std::endl; - RunProg = 0; + run_program = 0; + } - +// ideally should be expanded for rollback purposes etc. void msg_error(rmr_mbuf_t *message){ mdclog_write(MDCLOG_ERR, "Error: %s, %d Could not send RMR message of length %d and type %d, Reason %s", __FILE__, __LINE__, message->len, message->mtype, strerror(errno) ); }; @@ -144,47 +157,56 @@ int main(int argc, char *argv[]){ mdclog_write(MDCLOG_WARN, "WARNING : gNodeB not set for subscription. Subscription MAY FAIL"); } + std::string operating_mode; + + // How are we operating ? if (my_config.operating_mode == "CONTROL"){ - report_mode_only = false; + // Full closed loop : process E2AP indication, + // E2SM, X2AP and generate response + my_config.report_mode_only = false; + operating_mode = "CLOSED LOOP CONTROL"; } - else{ - report_mode_only = true; + else if (my_config.operating_mode == "E2AP_PROC_ONLY"){ + // Debugging : processing only E2AP indication + my_config.processing_level = ProcessingLevelTypes::E2AP_PROC_ONLY; + operating_mode = "E2AP PROCESSING ONLY"; } - - // Finished passing command line/environment arguments - //============================================================= - - // instantiate xapp object - if(my_config.num_threads >= 1){ - mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads); - // Create XaPP that starts with specified number of threads - my_xapp = std::make_unique(my_config.name, my_config.port, 1024, my_config.num_threads); + else if (my_config.operating_mode == "E2SM_PROC_ONLY"){ + // Debugging : processing only till E2SM indication header + my_config.processing_level = ProcessingLevelTypes::E2SM_PROC_ONLY; + operating_mode = "E2SM PROCESSING ONLY"; } else{ - mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto"); - //Let XaPP pick threads based on hardware - my_xapp = std::make_unique(my_config.name, my_config.port, 1024); + // Passive : processing till X2AP but do not generate response + my_config.report_mode_only = true; + operating_mode = "REPORTING ONLY"; } - + mdclog_write(MDCLOG_DEBUG, "Operating mode of Admission Control xAPP is %s\n", operating_mode.c_str()); + // Finished passing command line/environment arguments + //============================================================= + + // instantiate xapp-rmr-framework object + mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads); mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name); mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port); + my_xapp = std::make_unique(my_config.name, my_config.port, 1024); - // Instantiate admission logic handler - Plugins.emplace_back(std::make_unique(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file, 1)); - // Add reference to plugin list . We add twice (once for set policy and once for get policy ids) - // Plugin list is used by policy handler and metrics collector - plugin_rmr_map.insert(std::pair(DC_ADM_INT_CONTROL, Plugins[0].get())); - plugin_rmr_map.insert(std::pair(DC_ADM_GET_POLICY, Plugins[0].get())); + // Instantiate admission logic handler (with only one instance for now) + int num_instances = 1; + Plugins.emplace_back(std::make_unique(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file, num_instances, my_config.xapp_id, my_config.report_mode_only)); + + // Add reference to plugin list . + // Plugin list is used by policy handler and metrics collector + plugin_rmr_map.insert(std::pair(RATE_CONTROL_POLICY_ID, Plugins[0].get())); - // instantiate curl object for ves + // instantiate ves thread (it polls all plugins and sends out their metrics) std::thread metrics_thread(metrics_collector, my_config.ves_collector_url, &Plugins, my_config.measurement_interval); - // Instantiate subscription handler - SubscriptionHandler sub_handler; + subscription_handler sub_handler; // Instantiate message handlers for RMR // (one for each thread) and registrer @@ -192,7 +214,7 @@ int main(int argc, char *argv[]){ std::vector > message_procs; for(int i = 0; i < my_config.num_threads; i++){ - std::unique_ptr mp_handler = std::make_unique (); + std::unique_ptr mp_handler = std::make_unique (my_config.processing_level, my_config.report_mode_only); mp_handler.get()->register_subscription_handler(& sub_handler); mp_handler.get()->register_protector(dynamic_cast(Plugins[0].get())->get_protector_instance(0)); mp_handler.get()->register_policy_handler (& policy_handler); @@ -200,7 +222,7 @@ int main(int argc, char *argv[]){ } - // Start the listening loops + // Start the RMR listening loops std::vector thread_ids(my_config.num_threads); unsigned int i = 0; for(auto &e: message_procs){ @@ -211,130 +233,28 @@ int main(int argc, char *argv[]){ mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n"); mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size()); - //====================================================== - // sgnb Subscription spec - - int request_id = 2; // will be over-written by subscription handler - int req_seq = 1; - int function_id = 0; - int action_id = 4; - int action_type = report_mode_only ? 0:1; - - int message_type = 1; - int procedure_code = 27; - std::string egnb_id = "Testgnb"; - std::string plmn_id = "Testplmn"; - - unsigned char event_buf[128]; - size_t event_buf_len = 128; - bool res; - + //Register signal handler to stop + signal(SIGINT, EndProgram); + signal(SIGTERM, EndProgram); - e2sm_event_trigger_helper trigger_data; - e2sm_event_trigger event_trigger; - - trigger_data.egNB_id = egnb_id; - trigger_data.plmn_id = plmn_id; - trigger_data.egNB_id_type = 2; - trigger_data.interface_direction = 1; - trigger_data.procedure_code = procedure_code; - trigger_data.message_type = message_type; - res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data); - if (!res){ - mdclog_write(MDCLOG_ERR, "Error : %s, %d: Could not encode subscription Request. Reason = %s\n", __FILE__, __LINE__, event_trigger.get_error().c_str()); - exit(0); - } - + // Instantiate startup/shutown subroutine objects + init boot_shutdown((*my_xapp), sub_handler, my_config); - subscription_helper sgnb_add_subscr_req; - subscription_response_helper subscr_response; - - sgnb_add_subscr_req.clear(); - sgnb_add_subscr_req.set_request(request_id, req_seq); - sgnb_add_subscr_req.set_function_id(function_id); - sgnb_add_subscr_req.add_action(action_id, action_type); - - - sgnb_add_subscr_req.set_event_def(&event_buf[0], event_buf_len); - mdclog_write(MDCLOG_INFO, "Encoded event trigger definition into PDU of size %lu bytes\n", event_buf_len); - - //====================================================== - // Purely for testing purposes ... write subscription ASN binary to file - // FILE *pfile; - // pfile = fopen("event_trigger.pr", "wb"); - // fwrite(event_buf, 1, event_buf_len, pfile); - // fclose(pfile); - //====================================================== - - - // keep sending subscription request till successfull for all gnodebs ? - auto it = my_config.gNodeB_list.begin(); - while(my_config.gNodeB_list.size() > 0 && RunProg){ - int attempt = 0; - res = false; - - while(!res){ - mdclog_write(MDCLOG_INFO, "Sending subscription request for %s ... Attempt number = %d\n", (*it).c_str(), attempt); - res = add_subscription(sub_handler, my_xapp.get(), sgnb_add_subscr_req, subscr_response, *it); - if (!res){ - sleep(5); - }; - attempt ++; - if (attempt > MAX_SUBSCRIPTION_ATTEMPTS){ - break; - } - } - - if(res){ - mdclog_write(MDCLOG_INFO, "Successfully subscribed for gNodeB %s", (*it).c_str()); - // remove node from list, - // move to next gnobde - it = my_config.gNodeB_list.erase(it); - } - - if (it == my_config.gNodeB_list.end()){ - it = my_config.gNodeB_list.begin(); - } - - } - - - std::cout <<"SUBSCRIPTION REQUEST :: Successfully subscribed to events for all gNodeBs " << std::endl; - //Register signal handler to stop - signal(SIGINT, EndProgram); - signal(SIGTERM, EndProgram); + // Trigger start functions + boot_shutdown.startup(); - - // Purely for testing purposes .... - // If in test mode, we wait an interval and then send delete subscription request for each gNodeB - if(my_config.test_mode){ - std::cout <<"====================== " << std::endl; - std::cout <<"WE ARE IN TEST MODE. " << std::endl; - std::cout <<"====================== " << std::endl; - std::cout <<"WILL SEND SUBSCRIPTION DELETE REQUEST AFTER " << my_config.measurement_interval << " SECONDS " << std::endl; - sleep(my_config.measurement_interval); - res = false; - // keep sending subscription delete request till successfull ? - int attempt = 0; - while(!res){ - mdclog_write(MDCLOG_INFO, "Sending subscription delete request for id = %d ... Attempt number = %d\n", sgnb_add_subscr_req.get_request_id(), attempt); - res = delete_subscription(sub_handler, my_xapp.get(), sgnb_add_subscr_req, subscr_response, my_config.gNodeB_list[0]); - if (!res){ - sleep(5); - }; - attempt ++; - } - - std::cout <<"SUBSCRIPTION DELETE REQUEST :: Successfuly deleted subscription request " << request_id << std::endl; - - }; //Wait for stop - while(RunProg){ + while(run_program){ sleep(10); } - + + + // we are in shutdown mode + // send out subscription deletes + boot_shutdown.shutdown(); + i = 0; for(auto &e: message_procs){ mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages());