From 4e545a8b013e60f2ff59254cb3fe435012d8fe5a Mon Sep 17 00:00:00 2001 From: Ashwin Sridharan Date: Wed, 11 Dec 2019 09:53:23 -0500 Subject: [PATCH] User story RICPLT-2620 Modified the AC xAPP to support the R3 A1 workflow with multiple policies. AC xAPP now supports multiple policies (policy type 21000) wherein different admission policies can be set based on the subscriber profile ID in the sgnb addition request. Updated to use RMR v 1.12. which supports new A1 messages Signed-off-by: Ashwin Sridharan Change-Id: Ic42b4785bf49f5203cebae15af9cc74ebeb5ac15 User story RICPLT-2620 Modified the AC xAPP to support the R3 A1 workflow with multiple policies. AC xAPP now supports multiple policies (policy type 21000) wherein different admission policies can be set based on the subscriber profile ID in the sgnb addition request. Signed-off-by: Ashwin Sridharan Change-Id: I069f6521d9297fe1624a513a1236052979a30fd0 Signed-off-by: Ashwin Sridharan --- Dockerfile | 2 +- init/README.md | 16 - init/config-file.json | 11 +- schemas/README.md | 17 +- schemas/adm-ctrl-xapp-policy-schema.json | 98 --- schemas/rate-control-policy.json | 117 ++++ schemas/samples.json | 75 ++- src/E2AP-c/subscription/subscription_handler.cc | 115 ++-- src/E2AP-c/subscription/subscription_handler.hpp | 189 ++++-- src/Makefile | 23 +- src/README.md | 3 +- src/X2AP/sgnb_addition_request.cc | 2 +- src/adm-ctrl-xapp.cc | 218 ++---- src/adm-ctrl-xapp.hpp | 26 + src/admission_init_routines.cc | 255 +++++++ src/get_config.cc | 13 +- src/json/json_handler.cc | 686 ------------------- src/json/json_handler.hpp | 150 ----- src/message_processor_class.cc | 30 +- src/message_processor_class.hpp | 2 +- src/plugin-interface/plugin-interface.cc | 6 - src/plugin-interface/plugin-interface.hpp | 12 +- src/protector-plugin/NetworkProtector.cc | 282 ++++++-- src/protector-plugin/NetworkProtector.h | 58 +- src/protector-plugin/admission_policy.cc | 669 ++++++++++++------- src/protector-plugin/admission_policy.hpp | 49 +- src/protector-plugin/sliding_window.cc | 2 +- src/run_xapp.sh | 7 +- src/xapp_utils.cc | 141 ++-- src/xapp_utils.hpp | 208 ++---- test/Makefile | 22 +- test/README.md | 21 - test/mock_a1_mediator.cc | 220 +++--- test/mock_e2term_server.cc | 72 +- test/mock_ves_collector.py | 22 +- test/run_tests.sh | 22 +- .../X2AP-PDU-SgNBAdditionRequest_SubId_10.per | Bin 0 -> 368 bytes .../X2AP-PDU-SgNBAdditionRequest_SubId_180.per | Bin 0 -> 368 bytes .../X2AP-PDU-SgNBAdditionRequest_SubId_210.per | Bin 0 -> 368 bytes .../X2AP-PDU-SgNBAdditionRequest_SubId_23.per | Bin 0 -> 368 bytes .../X2AP-PDU-SgNBAdditionRequest_SubId_29.per | Bin 0 -> 368 bytes .../X2AP-PDU-SgNBAdditionRequest_SubId_34.per | Bin 0 -> 368 bytes test/unit_test_admission_policy.cc | 280 ++++++-- test/unit_test_protector_plugin.cc | 254 +++++-- test/unit_test_sgnb_addition_request.cc | 3 + test/unit_test_subscription_flow.cc | 736 +++++++++++++-------- test/unit_test_xapp.cc | 422 ++++++------ test/uta_rtg.rt | 2 +- 48 files changed, 2888 insertions(+), 2670 deletions(-) delete mode 100644 schemas/adm-ctrl-xapp-policy-schema.json create mode 100644 schemas/rate-control-policy.json create mode 100644 src/admission_init_routines.cc delete mode 100644 src/json/json_handler.cc delete mode 100644 src/json/json_handler.hpp create mode 100644 test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_10.per create mode 100644 test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_180.per create mode 100644 test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_210.per create mode 100644 test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_23.per create mode 100644 test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_29.per create mode 100644 test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_34.per diff --git a/Dockerfile b/Dockerfile index 6795e55..9d52ba3 100755 --- a/Dockerfile +++ b/Dockerfile @@ -39,7 +39,7 @@ RUN dpkg -i mdclog_${MDC_VER}_amd64.deb RUN dpkg -i mdclog-dev_${MDC_VER}_amd64.deb # Install RMr using debian package hosted at packagecloud.io -ARG RMR_VER=1.3.0 +ARG RMR_VER=1.12.1 RUN wget -nv --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_${RMR_VER}_amd64.deb/download.deb RUN wget -nv --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_${RMR_VER}_amd64.deb/download.deb RUN dpkg -i rmr_${RMR_VER}_amd64.deb diff --git a/init/README.md b/init/README.md index 471dcdc..24ef4a3 100644 --- a/init/README.md +++ b/init/README.md @@ -1,19 +1,3 @@ -#================================================================================== -# Copyright (c) 2018-2019 AT&T Intellectual Property. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -#================================================================================== - Configuration parameters for the AC xAPP process may be provided via command line or environment variables. When testing via command line, they can be set using the src/run_xapp.sh script for convenience (routes however must be set via the RMR_SEED_RT file) as documented under src/README.md diff --git a/init/config-file.json b/init/config-file.json index cc432f7..414e808 100644 --- a/init/config-file.json +++ b/init/config-file.json @@ -8,21 +8,22 @@ "protPort": "tcp:4560", "maxSize": 2072, "numWorkers": 1, - "txMessages": ["RIC_SUB_REQ", "RIC_SUB_DEL_REQ", "RIC_CONTROL_REQ"], - "rxMessages": ["RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_SUB_DEL_FAILURE", "RIC_INDICATION", "RIC_CONTROL_ACK", "RIC_CONTROL_FAILURE"], + "txMessages": ["RIC_SUB_REQ", "RIC_SUB_DEL_REQ", "RIC_CONTROL_REQ", "A1_POLICY_RESP", "A1_POLICY_QUERY"], + "rxMessages": ["RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_SUB_DEL_FAILURE", "RIC_INDICATION", "RIC_CONTROL_ACK", "RIC_CONTROL_FAILURE", "A1_POLICY_REQ"], "file_path":"/tmp/routeinfo/routes.txt", - "contents": "newrt|start\nrte|0|localhost:4560\nrte|2|localhost:38000\nrte|10002|localhost:4560\nrte|10005|localhost:4560\nrte|10003|localhost:38000\nrte|12010|localhost:38000\nrte|12020|localhost:38000\nrte|12011|localhost:4560\nrte|12012|localhost:4560\nrte|12021|localhost:4560\nrte|12022|localhost:4560\nrte|20000|localhost:4560\nrte|12040|localhost:38000\nrte|20001|localhost:4566\nnewrt|end" + "contents": "newrt|start\nrte|0|localhost:4560\nrte|2|localhost:38000\nrte|10002|localhost:4560\nrte|10005|localhost:4560\nrte|10003|localhost:38000\nrte|12010|localhost:38000\nrte|12020|localhost:38000\nrte|12011|localhost:4560\nrte|12012|localhost:4560\nrte|12021|localhost:4560\nrte|12022|localhost:4560\nrte|20000|localhost:4560\nrte|12040|localhost:38000\nrte|20001|localhost:4566\nrte|20011|localhost:4560\nrte|20012|localhost:4560\nnewrt|end" }, "envs":{ "GNODEB":"NYC123", "THREADS":"1", - "A1_SCHEMA_FILE":"/etc/xapp/adm-ctrl-xapp-policy-schema.json", + "XAPP_ID":"3489-e9r892k-92389", + "A1_SCHEMA_FILE":"/etc/xapp/rate-control-policy.json", "VES_SCHEMA_FILE":"/etc/xapp/ves_schema.json", "SAMPLE_FILE":"/etc/xapp/samples.json", "VES_COLLECTOR_URL":"127.0.0.1:6350", "VES_MEASUREMENT_INTERVAL":"10", "LOG_LEVEL":"MDCLOG_ERR", - "OPERATING_MODE":"REPORT" + "OPERATING_MODE":"CONTROL" } } diff --git a/schemas/README.md b/schemas/README.md index 1c12c77..2994760 100644 --- a/schemas/README.md +++ b/schemas/README.md @@ -1,22 +1,7 @@ -#================================================================================== -# Copyright (c) 2018-2019 AT&T Intellectual Property. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -#================================================================================== This directory contains the various JSON payloads received/sent by the xAPP and the associated schemas. 1. Policy messages are set (and requested) via RMR. The A1 mediator which sends these requests verifies that the messages conform to schema specified by the xAPP developer (the AC xAPP also validates this). The schema for the policies is provided by the xAPP developer - - The AC xAPP policy schema file is adm-ctrl-xapp-policy-schema.json + - The AC xAPP policy schema file is rate-control-policy.json 2. Similarly, metrics posted to the VES collector must conform to the global schema specified by the VES collector. - the ves schema file is ves_schema.json diff --git a/schemas/adm-ctrl-xapp-policy-schema.json b/schemas/adm-ctrl-xapp-policy-schema.json deleted file mode 100644 index 451d1f2..0000000 --- a/schemas/adm-ctrl-xapp-policy-schema.json +++ /dev/null @@ -1,98 +0,0 @@ -{ - "name":"admission_control_xapp", - "owner":"att:app_paas", - "description":"Admission Control logic for dual connection", - "controls":[ - { - "name":"admission_control_policy", - "description":"various parameters to control admission of dual connection", - "message_receives_rmr_type":"DC_ADMISSION_INTERVAL_CONTROL", - "message_receives_payload_schema":{ - "$schema":"http://json-schema.org/draft-07/schema#", - "type":"object", - "properties":{ - "enforce":{ - "type":"boolean", - "default":true - }, - "window_length":{ - "type":"integer", - "default":1, - "minimum":1, - "maximum":60, - "description":"Sliding window length (in minutes)" - }, - "blocking_rate":{ - "type":"number", - "default":10, - "minimum":1, - "maximum":100, - "description":"% Connections to block" - }, - "trigger_threshold":{ - "type":"integer", - "default":10, - "minimum":1, - "description":"Minimum number of events in window to trigger blocking" - } - }, - - "required":["enforce", "blocking_rate", "trigger_threshold", "window_length"], - "additionalProperties":false - }, - "message_sends_rmr_type":"DC_ADMISSION_INTERVAL_CONTROL_ACK", - "message_sends_payload_schema":{ - "$schema":"http://json-schema.org/draft-07/schema#", - "type":"object", - "properties":{ - "status":{ - "type":"string", - "enum":[ - "SUCCESS", - "FAIL" - ] - }, - "message":{ - "type":"string" - } - }, - "required":["status"], - "additionalProperties":false - } - } - ], - "configuration":[ - { - "name":"rnib-endpoint", - "type":"string", - "description":"rnib namespace", - "default_value":"127.0.0.1:6379" - }, - { - "name":"spid", - "type":"NUMBER", - "description":"spid values for admitting dual connection", - "default_value":"34,35" - } - ], - "metrics":[ - { - "name":"num_accepted_dc_reqs", - "type":"COUNTER", - "unit":"", - "description":"Number of accepted dc requests since the start of the xapp" - }, - { - "name":"num_rejected_dc_reqs", - "type":"COUNTER", - "unit":"", - "description":"Number of rejected dc requests since the start of the xapp" - }, - { - "name":"num_dc_reqs_per_min", - "type":"GUAGE", - "description":"Number of dc requests received by this xAPP per min", - "unit":"Number per min" - } - ] -} diff --git a/schemas/rate-control-policy.json b/schemas/rate-control-policy.json new file mode 100644 index 0000000..06bb5dd --- /dev/null +++ b/schemas/rate-control-policy.json @@ -0,0 +1,117 @@ +{ + "name": "Policy for Rate Control", + "policy_type_id":21000, + "description":"This policy is associated with rate control. Entities which support this policy type must accept the following policy inputs (see the payload for more specifics) : class, which represents the class of traffic for which the policy is being enforced", + + "create_schema":{ + "$schema":"http://json-schema.org/draft-07/schema#", + "type":"object", + "additionalProperties":false, + "required":["class"], + "properties":{ + "class":{ + "type":"integer", + "minimum":1, + "maximum":256, + "description":"integer id representing class to which we are applying policy" + }, + "enforce":{ + "type":"boolean", + "description": "Whether to enable or disable enforcement of policy on this class" + }, + "window_length":{ + "type":"integer", + "minimum":15, + "maximum":300, + "description":"Sliding window length in seconds" + }, + "trigger_threshold":{ + "type":"integer", + "minimum":1 + }, + "blocking_rate":{ + "type":"number", + "minimum":0, + "maximum":100 + } + + } + }, + + "downstream_schema":{ + "type":"object", + "additionalProperties":false, + "required":["policy_type_id", "policy_instance_id", "operation"], + "properties":{ + "policy_type_id":{ + "type":"integer", + "enum":[21000] + }, + "policy_instance_id":{ + "type":"string" + }, + "operation":{ + "type":"string", + "enum":["CREATE", "UPDATE", "DELETE"] + }, + "payload":{ + "$schema":"http://json-schema.org/draft-07/schema#", + "type":"object", + "additionalProperties":false, + "required":["class"], + "properties":{ + "class":{ + "type":"integer", + "minimum":1, + "maximum":256, + "description":"integer id representing class to which we are applying policy" + }, + "enforce":{ + "type":"boolean", + "description": "Whether to enable or disable enforcement of policy on this class" + }, + "window_length":{ + "type":"integer", + "minimum":15, + "maximum":300, + "description":"Sliding window length in seconds" + }, + "trigger_threshold":{ + "type":"integer", + "minimum":1 + }, + "blocking_rate":{ + "type":"number", + "minimum":0, + "maximum":100 + } + + + } + } + } + }, + "notify_schema":{ + "type":"object", + "additionalProperties":false, + "required":["policy_type_id", "policy_instance_id", "handler_id", "status"], + "properties":{ + "policy_type_id":{ + "type":"integer", + "enum":[21000] + }, + "policy_instance_id":{ + "type":"string" + }, + "handler_id":{ + "type":"string" + }, + "status":{ + "type":"string", + "enum":["OK", "ERROR", "DELETED"] + } + } + } +} + + diff --git a/schemas/samples.json b/schemas/samples.json index d9ba40a..a781624 100644 --- a/schemas/samples.json +++ b/schemas/samples.json @@ -1,40 +1,49 @@ { - "message_receives_example": { + "downstream_policy_message": { + "policy_type_id":21000, + "operation":"CREATE", + "policy_instance_id":"Hello-world", + "payload":{ + "class":12, "enforce":true, - "window_length":10, + "window_length":20, "blocking_rate":20, "trigger_threshold":10 - }, - - "message_sends_example": { - "status":"SUCCESS", - "message":"Config applied" - }, - - "metrics": { - "event": { - "commonEventHeader": { - "startEpochMicrosec": 1542231546086613, - "eventId": "sGNB Requests", - "eventType": "sGNB Request Rate", - "priority": "Normal", - "version": "4.0.1", - "reportingEntityName": "AC xAPP", - "sequence": 0, - "domain": "measurement", - "lastEpochMicrosec": 1542231556086613, - "eventName": "Measurement_vGMUX", - "sourceName": "adm-ctrl-xapp", - "vesEventListenerVersion": "7.0.1" + } }, - "measurementFields": { - "additionalFields":{ - "SgNB Request Rate":"100", - "SgNB Accept Rate":"10" - }, - "measurementInterval":60, - "measurementFieldsVersion":"4.0" + + "notify_policy_message": { + "policy_instance_id":"Hello-world", + "policy_type_id":21000, + "status":"OK", + "handler_id":"ACxAPP1234" + }, + + "metrics": { + "event": { + "commonEventHeader": { + "startEpochMicrosec": 1542231546086613, + "eventId": "sGNB Requests", + "eventType": "sGNB Request Rate", + "priority": "Normal", + "version": "4.0.1", + "reportingEntityName": "AC xAPP", + "sequence": 0, + "domain": "measurement", + "lastEpochMicrosec": 1542231556086613, + "eventName": "Measurement_vGMUX", + "sourceName": "adm-ctrl-xapp", + "vesEventListenerVersion": "7.0.1" + }, + "measurementFields": { + "additionalFields":{ + "Class Id":1, + "SgNB Request Count":"100", + "SgNB Accept Count":"10" + }, + "measurementInterval":60, + "measurementFieldsVersion":"4.0" + } + } } - } - } } diff --git a/src/E2AP-c/subscription/subscription_handler.cc b/src/E2AP-c/subscription/subscription_handler.cc index d81027d..effbb78 100644 --- a/src/E2AP-c/subscription/subscription_handler.cc +++ b/src/E2AP-c/subscription/subscription_handler.cc @@ -24,45 +24,6 @@ #include #include -subscription_handler::subscription_handler(void){ - - init(); - _time_out = std::chrono::seconds(5); - _num_retries = 2; - - // bool res; - // unsigned char buffer[128]; - // size_t buf_len = 128; - - // E2N_E2AP_PDU_t e2ap_pdu; - // subscription_request e2ap_sub_req; - - // int request_id = 2; - // int req_seq = 1; - // int function_id = 0; - // int action_id = 0; - // int action_type = 0; - // int message_type = 1; - - // subscription_helper sgnb_add_subscr_req; - - // //sgnb_add_subscr_req.clear(); - // sgnb_add_subscr_req.set_request(request_id, req_seq); - // sgnb_add_subscr_req.set_function_id(function_id); - // sgnb_add_subscr_req.add_action(action_id, action_type); - // std::string test = "This is a test"; - // sgnb_add_subscr_req.set_event_def(test.c_str(), test.length()); - // std::cout <<"Constructor ........" << std::endl; - // // generate the request pdu - // res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, &e2ap_pdu, sgnb_add_subscr_req); - // if(! res){ - // mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__); - - // } - // std::cout <<"Encoded subscription request pdu " << std::endl; - - -} subscription_handler::subscription_handler(unsigned int timeout_seconds, unsigned int num_tries):_time_out(std::chrono::seconds(timeout_seconds)), _num_retries(num_tries){ init(); @@ -102,13 +63,8 @@ void subscription_handler::set_num_retries(unsigned int num_tries){ }; -unsigned int subscription_handler::get_next_id(void){ - std::lock_guard lock(*(_data_lock).get()); - unique_request_id ++; - return unique_request_id; -} -bool subscription_handler::add_request_entry(int id, int status){ +bool subscription_handler::add_request_entry(subscription_identifier id, int status){ // add entry in hash table if it does not exist auto search = requests_table.find(id); @@ -121,10 +77,9 @@ bool subscription_handler::add_request_entry(int id, int status){ }; -bool subscription_handler::set_request_status(int id, int status){ +bool subscription_handler::set_request_status(subscription_identifier id, int status){ // change status of a request only if it exists. - auto search = requests_table.find(id); if(search != requests_table.end()){ requests_table[id] = status; @@ -136,7 +91,7 @@ bool subscription_handler::set_request_status(int id, int status){ }; -bool subscription_handler::delete_request_entry(int id){ +bool subscription_handler::delete_request_entry(subscription_identifier id){ auto search = requests_table.find(id); if (search != requests_table.end()){ @@ -147,7 +102,7 @@ bool subscription_handler::delete_request_entry(int id){ return false; }; -bool subscription_handler::add_subscription_entry(int id, subscription_response_helper &he){ +bool subscription_handler::add_subscription_entry(subscription_identifier id, subscription_response_helper &he){ auto search = subscription_responses.find(id); if (search == subscription_responses.end()){ @@ -159,7 +114,7 @@ bool subscription_handler::add_subscription_entry(int id, subscription_response_ } -bool subscription_handler::delete_subscription_entry(int id){ +bool subscription_handler::delete_subscription_entry(subscription_identifier id){ auto search = subscription_responses.find(id); if(search == subscription_responses.end()){ @@ -172,7 +127,7 @@ bool subscription_handler::delete_subscription_entry(int id){ } -subscription_response_helper * const subscription_handler::get_subscription(int id){ +subscription_response_helper * const subscription_handler::get_subscription(subscription_identifier id){ auto search = subscription_responses.find(id); if(search == subscription_responses.end()){ return NULL; @@ -184,10 +139,10 @@ subscription_response_helper * const subscription_handler::get_subscription(int // Handles responses from RMR -void subscription_handler::Response(int message_type, unsigned char *payload, int payload_length){ +void subscription_handler::Response(int message_type, unsigned char *payload, int payload_length, const char * node_id){ bool res; - int id; + std::string node(node_id); int type; int procedureCode; bool valid_response =false; @@ -222,11 +177,11 @@ void subscription_handler::Response(int message_type, unsigned char *payload, in // subscription response // decode the message sub_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response); - { std::lock_guard lock(*(_data_lock.get())); // get the id - id = he_response.get_request_id(); + subscription_identifier id = std::make_tuple (node, he_response.get_request_id()); + // get status of id int req_status = get_request_status(id); if (req_status == request_pending ){ @@ -236,7 +191,7 @@ void subscription_handler::Response(int message_type, unsigned char *payload, in else{ set_request_status(id, request_duplicate); - mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d seems to be a duplicate\n", __FILE__, __LINE__, id); + mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s, %d seems to be a duplicate. Subscription already present in subscription table\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id)); } valid_response = true; @@ -244,11 +199,11 @@ void subscription_handler::Response(int message_type, unsigned char *payload, in else if (req_status > 0){ // we don't change status of response since it was not in pending // we simply fail - mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status); + mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s,%d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id), req_status); } else{ - mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %d in request queue for subscription", __FILE__, __LINE__, id); + mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %s, %d in request queue for subscription", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id)); } } @@ -260,8 +215,10 @@ void subscription_handler::Response(int message_type, unsigned char *payload, in res = sub_del_resp.get_fields(e2ap_recv->choice.successfulOutcome, he_response); { std::lock_guard lock(*(_data_lock.get())); + // get the id - id = he_response.get_request_id(); + subscription_identifier id = std::make_tuple (node, he_response.get_request_id()); + int req_status = get_request_status(id); if (req_status == delete_request_pending ){ // Remove the subscription from the table @@ -272,18 +229,17 @@ void subscription_handler::Response(int message_type, unsigned char *payload, in } else{ set_request_status(id, delete_request_failed); - std::string error_string = "Error deleting subscription entry for id = "; - mdclog_write(MDCLOG_ERR, "%s, %d: %s, %d", __FILE__, __LINE__, error_string.c_str(), id); + mdclog_write(MDCLOG_ERR, "%s, %d: Error deleting subscription entry for %s, %d", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id)); valid_response = true; } } else if (req_status > 0){ // we don't change status since it was not in pending // we simply fail - mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d for deletion is not in delete_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status); + mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s, %d for deletion is not in delete_pending state, is in State = %d\n", __FILE__, __LINE__, id, std::get<0>(id).c_str(), std::get<1>(id)); } else{ - mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %d in request queue for deletion ", __FILE__, __LINE__, id); + mdclog_write(MDCLOG_ERR, "%s, %d: Could not find request id %s, %d in request queue for deletion ", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id)); } } @@ -304,8 +260,11 @@ void subscription_handler::Response(int message_type, unsigned char *payload, in sub_resp.get_fields(e2ap_recv->choice.unsuccessfulOutcome, he_response); { - std::lock_guard lock(*(_data_lock.get())); - id = he_response.get_request_id(); + std::lock_guard lock(*(_data_lock.get())); + + // get the id + subscription_identifier id = std::make_tuple (node, he_response.get_request_id()); + int req_status = get_request_status(id); if(req_status == request_pending){ set_request_status(id, request_failed); @@ -315,10 +274,10 @@ void subscription_handler::Response(int message_type, unsigned char *payload, in else if (req_status > 0){ // we don't changet status since it was not in pending // we simply fail - mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status); + mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s, %d is not in request_pending state, is in State = %d\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id), req_status); } else{ - mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %d in request queue for subscription ", __FILE__, __LINE__, id); + mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %s, %d in request queue for subscription ", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id)); } } } @@ -329,18 +288,19 @@ void subscription_handler::Response(int message_type, unsigned char *payload, in { std::lock_guard lock(*(_data_lock.get())); // get the id - id = he_response.get_request_id(); + subscription_identifier id = std::make_tuple (node, he_response.get_request_id()); + int req_status = get_request_status(id); if(req_status == delete_request_pending){ set_request_status(id, delete_request_failed); - mdclog_write(MDCLOG_INFO, "Subscription delete request %d failed", id); + mdclog_write(MDCLOG_INFO, "Subscription delete request %s,%d failed", std::get<0>(id).c_str(), std::get<1>(id)); valid_response = true; } else if (req_status > 0){ - mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %d for deletion is not in delete_pending state, is in State = %d\n", __FILE__, __LINE__, id, req_status); + mdclog_write(MDCLOG_ERR, "Error:: %s, %d: Request %s,%d for deletion is not in delete_pending state, is in State = %d\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id), req_status); } else{ - mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %d in request queue for deletion ", __FILE__, __LINE__, id); + mdclog_write(MDCLOG_ERR, "%s, %d: Could not find id %s,%d in request queue for deletion ", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id)); } } @@ -365,7 +325,7 @@ void subscription_handler::Response(int message_type, unsigned char *payload, in } -int const subscription_handler::get_request_status(int id){ +int const subscription_handler::get_request_status(subscription_identifier id){ auto search = requests_table.find(id); if (search == requests_table.end()){ return -1; @@ -374,7 +334,7 @@ int const subscription_handler::get_request_status(int id){ return search->second; } - bool subscription_handler::is_subscription_entry(int id){ + bool subscription_handler::is_subscription_entry(subscription_identifier id){ auto search = subscription_responses.find(id); if (search != subscription_responses.end()) return true; @@ -382,10 +342,17 @@ int const subscription_handler::get_request_status(int id){ return false; } -bool subscription_handler::is_request_entry(int id){ +bool subscription_handler::is_request_entry(subscription_identifier id){ auto search = requests_table.find(id); if (search != requests_table.end()) return true; else return false; } + + +void subscription_handler::get_subscription_keys(std::vector & key_list){ + for(auto & e: subscription_responses){ + key_list.push_back(e.first); + } +} diff --git a/src/E2AP-c/subscription/subscription_handler.hpp b/src/E2AP-c/subscription/subscription_handler.hpp index fcfd77f..0dd78a8 100644 --- a/src/E2AP-c/subscription/subscription_handler.hpp +++ b/src/E2AP-c/subscription/subscription_handler.hpp @@ -26,11 +26,13 @@ #ifndef SUBSCRIPTION_HANDLER #define SUBSCRIPTION_HANDLER +#include #include #include #include #include #include +#include #include #include @@ -58,39 +60,73 @@ typedef enum { request_duplicate }Subscription_Status_Types; +using subscription_identifier = std::tuple; /* Class to process subscription related messages - each subscription request is assigned a unique internally -generated request id for tracking purposes. this is because -the e2 subscription request does not carry any gnodeb id information + The class provides mechanism to send and process + subscriptions and subscription deletes. + + NOTE 1: It is currently unclear how an xAPP should identify a + subscription request/response pair uniquely in the absence/presence of + subscription manager. Ideally, the subscription manager should be + transperent to the xAPP but that may not be the case, i.e the + subscription manager may the subscription request id fields. + The xAPP needs to identify uniquely not just the subscription response, but + also when it needs to send a delete for the corresponding request. + From that perspective, the fields present in both the subscription response and + a delete request are the RICrequestId and RANfunctionId. However, the subscription manager + may require that the RICrequestId fields be set to a specific value (TBD). Hence + for current purposes, a RIC subscription request is uniquely identified by the + tuple . This is not ideal, since potentially the same RANfunctionID + may be subscribed to in different modes, but for now this is the constraint. + + + NOTE 2: There is discussion on tracking subscription request/repsonse using the RMR transaction ID. + However, a conscious choice made with the subscription_handler is that it be agnostic to the transmission + medium(RMR) for purposes of design isolation. Consequently, the subscription handler is not aware of any RMR + related semantics, but simply accepts a function to send the request/delete request that accepts a signature + Type, Length, Value . This also means in its current design, we cannot use transaction id to track request/response. + + + NOTE 3: The subscription handler is thread-safe, i.e multiple elements + can request subscriptions/subscription deletes from multiple threads. However + this does not preclude conflict if multiple threads are trying to make + subscriptions based on the same triplet (in which cases, results will be internally + consistent, but may yield errors to calling agent). */ -class subscription_handler { +struct subscription_hasher { + size_t operator()(const subscription_identifier & key) const { + return std::hash{}(std::get<0>(key) + std::to_string(std::get<1>(key))); + } +}; +class subscription_handler { + public: - subscription_handler(void); - subscription_handler(unsigned int, unsigned int); + + subscription_handler(unsigned int timeout_seconds = 5, unsigned int num_retries = 2); void init(void); + template - int RequestSubscription(subscription_helper &, subscription_response_helper &, int , Transmitter &&); + int request_subscription(subscription_helper &, subscription_response_helper &, std::string, int , Transmitter &&); template - int RequestSubscriptionDelete(subscription_helper &, subscription_response_helper &, int , Transmitter &&); - - - void Response(int, unsigned char *, int); - int const get_request_status(int); - subscription_response_helper * const get_subscription(int); + int request_subscription_delete(subscription_helper &, subscription_response_helper &, std::string, int , Transmitter &&); + void Response(int, unsigned char *, int, const char *); + int const get_request_status(subscription_identifier); + subscription_response_helper * const get_subscription(subscription_identifier); + unsigned int get_next_id(void); void set_timeout(unsigned int); void set_num_retries(unsigned int); - bool is_subscription_entry(int); - bool is_request_entry(int); - + bool is_subscription_entry(subscription_identifier); + bool is_request_entry(subscription_identifier); + void get_subscription_keys(std::vector &); void clear(void); size_t num_pending(void) const; size_t num_complete(void) const ; @@ -99,16 +135,17 @@ public: private: - bool add_request_entry(int, int); - bool set_request_status(int, int); - bool delete_request_entry(int); + + bool add_request_entry(subscription_identifier, int); + bool set_request_status(subscription_identifier, int); + bool delete_request_entry(subscription_identifier); - bool get_subscription_entry(int, int); - bool add_subscription_entry(int, subscription_response_helper &he); - bool delete_subscription_entry(int); + bool get_subscription_entry(subscription_identifier); + bool add_subscription_entry(subscription_identifier, subscription_response_helper &he); + bool delete_subscription_entry(subscription_identifier); - std::unordered_map requests_table; - std::unordered_map subscription_responses; // stores list of successful subscriptions + std::unordered_map requests_table; + std::unordered_map subscription_responses; // stores list of successful subscriptions std::unique_ptr _data_lock; std::unique_ptr _cv; @@ -120,19 +157,19 @@ private: }; template -int subscription_handler::RequestSubscription(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){ +int subscription_handler::request_subscription(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){ int res; unsigned char buffer[512]; size_t buf_len = 512; - // get a new unique request id ... - unsigned int new_req_id = get_next_id(); - mdclog_write(MDCLOG_INFO, "%s, %d:: Generated new request id %d\n", __FILE__, __LINE__, new_req_id); - he.set_request(new_req_id, he.get_req_seq()); - - + // As per current design, request id and request sequence number + // must be set to zero ... + he.set_request(0, 0); subscription_request e2ap_sub_req; + + // generate subscription identifier + subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id()); // generate the request pdu res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, he); @@ -144,10 +181,10 @@ int subscription_handler::RequestSubscription(subscription_helper &he, subscript // put entry in request table { std::lock_guard lock(*(_data_lock.get())); - res = add_request_entry(he.get_request_id(), request_pending); + res = add_request_entry(sub_id, request_pending); if(! res){ - mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %d to queue", __FILE__, __LINE__, he.get_request_id()); - return SUBSCR_ERR_UNKNOWN; + mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s, %d to queue because request with identical key already present", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + return SUBSCR_ERR_DUPLICATE; } } @@ -159,8 +196,8 @@ int subscription_handler::RequestSubscription(subscription_helper &he, subscript res = tx(TxCode, buf_len, buffer); if (!res){ // clear state - delete_request_entry(he.get_request_id()); - mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %d", __FILE__, __LINE__, he.get_request_id()); + delete_request_entry(sub_id); + mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) ); return SUBSCR_ERR_TX; }; @@ -170,80 +207,84 @@ int subscription_handler::RequestSubscription(subscription_helper &he, subscript res = SUBSCR_ERR_UNKNOWN; while(1){ - - // release lock and wait to be woken up _cv.get()->wait_for(_local_lock, _time_out); // we have woken and acquired data_lock // check status and return appropriate object - int status = get_request_status(he.get_request_id()); + int status = get_request_status(sub_id); if (status == request_success){ - // retreive & store the subscription response - response = subscription_responses[he.get_request_id()]; - mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %d", he.get_request_id()); + response = subscription_responses[sub_id]; + mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_SUCCESS; break; } if (status == request_pending){ - // woken up spuriously or timed out auto end = std::chrono::system_clock::now(); std::chrono::duration f = end - start; if ( f > _num_retries * _time_out){ - mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %d timed out waiting for response ", __FILE__, __LINE__, he.get_request_id()); + mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %s, %d timed out waiting for response ", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_ERR_TIMEOUT; - std::cout <<"Set res = " << res << " for " << he.get_request_id() << std::endl; break; } else{ - mdclog_write(MDCLOG_INFO, "Subscription request %d Waiting for response ....", he.get_request_id()); + mdclog_write(MDCLOG_INFO, "Subscription request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); continue; } } if(status == request_failed){ - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %d got failure response .. \n", __FILE__, __LINE__, he.get_request_id()); + mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_ERR_FAIL; break; } + if (status == request_duplicate){ + mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + res = SUBSCR_ERR_DUPLICATE; + break; + + } + // if we are here, some spurious - // status obtained or request failed . we return false - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status); + // status obtained or request failed . we return appropriate error code + mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status); res = SUBSCR_ERR_UNKNOWN; break; }; - delete_request_entry(he.get_request_id()); + delete_request_entry(sub_id); // release data lock _local_lock.unlock(); - std::cout <<"Returning res = " << res << " for " << he.get_request_id() << std::endl; + std::cout <<"Returning res = " << res << " for request = " << std::get<0>(sub_id).c_str() << "," << std::get<1>(sub_id) << std::endl; return res; }; template -int subscription_handler::RequestSubscriptionDelete(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){ +int subscription_handler::request_subscription_delete(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){ int res; - + // generate subscription identifier + subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id()); + // First check if we have this subscription - if(! is_subscription_entry(he.get_request_id())){ - mdclog_write(MDCLOG_ERR, "subscription with id %d does not exist. Cannot be deleted", he.get_request_id()); + if(! is_subscription_entry(sub_id)){ + mdclog_write(MDCLOG_ERR, "subscription with id %s, %d does not exist. Cannot be deleted",std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); return SUBSCR_ERR_MISSING; } // Also check if such a request is queued - if (is_request_entry(he.get_request_id())){ - mdclog_write(MDCLOG_ERR, "Subscription delete request with id %d already in queue", he.get_request_id()); - return SUBSCR_ERR_UNKNOWN; + if (is_request_entry(sub_id)){ + mdclog_write(MDCLOG_ERR, "Subscription delete request with id %s, %d already in queue",std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); + return SUBSCR_ERR_DUPLICATE; } subscription_delete e2ap_sub_req_del; @@ -259,10 +300,13 @@ int subscription_handler::RequestSubscriptionDelete(subscription_helper &he, su } // put entry in request table - res = add_request_entry(he.get_request_id(), delete_request_pending); - if(! res){ - mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue", he.get_request_id()); - return SUBSCR_ERR_UNKNOWN; + { + std::lock_guard lock(*(_data_lock.get())); + res = add_request_entry(sub_id, delete_request_pending); + if(!res){ + mdclog_write(MDCLOG_ERR, "%s, %d: Duplicate subscription delete request = %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) ); + return SUBSCR_ERR_DUPLICATE; + } } std::unique_lock _local_lock(*(_data_lock.get())); @@ -271,8 +315,8 @@ int subscription_handler::RequestSubscriptionDelete(subscription_helper &he, su res = tx(TxCode, buf_len, buffer); if (!res){ - delete_request_entry(he.get_request_id()); - mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %d", he.get_request_id()); + delete_request_entry(sub_id); + mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); return SUBSCR_ERR_TX; }; @@ -287,9 +331,9 @@ int subscription_handler::RequestSubscriptionDelete(subscription_helper &he, su _cv.get()->wait_for(_local_lock, _time_out); // check status and return appropriate object - int status = get_request_status(he.get_request_id()); + int status = get_request_status(sub_id); if (status == delete_request_success){ - mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %d", he.get_request_id()); + mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_SUCCESS; break; } @@ -300,36 +344,37 @@ int subscription_handler::RequestSubscriptionDelete(subscription_helper &he, su std::chrono::duration f = end - start; if (f > _num_retries * _time_out){ - mdclog_write(MDCLOG_ERR, "Subscription delete request %d timed out waiting for response ", he.get_request_id()); + mdclog_write(MDCLOG_ERR, "Subscription delete request %s, %d timed out waiting for response ", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_ERR_TIMEOUT; break; } else{ - mdclog_write(MDCLOG_INFO, "Subscription delete request %d Waiting for response ....", he.get_request_id()); + mdclog_write(MDCLOG_INFO, "Subscription delete request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); } continue; } if(status == delete_request_failed){ - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %d got failure response .. \n", __FILE__, __LINE__, he.get_request_id()); + mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %s, %d got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); res = SUBSCR_ERR_FAIL; break; } + // if we are here, some spurious // status obtained. we return false - mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of delete request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status); + mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of delete request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__,std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status); res = SUBSCR_ERR_UNKNOWN; break; }; - delete_request_entry(he.get_request_id()); + delete_request_entry(sub_id); // release data lock _local_lock.unlock(); - std::cout <<"Returning res = " << res << " for " << he.get_request_id() << std::endl; + std::cout <<"Returning res = " << res << " for " << std::get<0>(sub_id) << "," << std::get<1>(sub_id) << std::endl; return res; }; diff --git a/src/Makefile b/src/Makefile index 7246061..828db9d 100644 --- a/src/Makefile +++ b/src/Makefile @@ -6,7 +6,7 @@ E2AP_c:=./E2AP-c E2SM := ./E2SM X2AP:=./X2AP SUBSCR:=$(E2AP_c)/subscription -JSON := ./json + PLUGININTERFACE :=./plugin-interface CURL := ./curl PLUGIN:=./protector-plugin @@ -23,7 +23,7 @@ BASEFLAGS= -Wall -std=c++14 $(CLOGFLAGS) XAPPFLAGS= -I./ ASN1C_FLAGS=-I$(ASN1C_DEFS) -DASN_DISABLE_OER_SUPPORT SUBSCRFLAGS= -I$(SUBSCR) -JSONFLAGS= -I$(JSON) + X2FLAGS= -I$(X2AP) PLUGINFLAGS = -I$(PLUGIN) PLUGININTERFACEFLAGS = -I$(PLUGININTERFACE) @@ -38,10 +38,11 @@ COV_FLAGS= -fprofile-arcs -ftest-coverage XAPP_SRC= adm-ctrl-xapp.cc \ xapp_utils.cc \ message_processor_class.cc \ - get_config.cc + get_config.cc \ + admission_init_routines.cc CURL_SRC=$(wildcard $(CURL)/*.cc) -JSON_SRC=$(wildcard $(JSON)/*.cc) + SUBSCR_SRC= $(wildcard $(SUBSCR)/*.cc) X2AP_SRC = $(wildcard $(X2AP)/*.cc) PLUGINS_SRC = $(wildcard $(PLUGIN)/*.cc) @@ -51,7 +52,7 @@ E2SM_SRC = $(wildcard $(E2SM)/*.cc) ASN1C_SRC= $(wildcard $(ASN1C_DEFS)/*.c) CURL_OBJ = $(CURL_SRC:.cc=.o) -JSON_OBJ = $(JSON_SRC:.cc=.o) + XAPP_OBJ= ${XAPP_SRC:.cc=.o} SUBSCR_OBJ= ${SUBSCR_SRC:.cc=.o} X2AP_OBJ= ${X2AP_SRC:.cc=.o} @@ -63,16 +64,16 @@ ASN1C_MODULES = $(ASN1C_SRC:.c=.o) $(ASN1C_MODULES): export CFLAGS = $(C_BASEFLAGS) $(ASN1C_FLAGS) $(CURL_OBJ):export CPPFLAGS=$(BASEFLAGS) $(CURLFLAGS) -$(JSON_OBJ):export CPPFLAGS= $(BASEFLAGS) $(JSONFLAGS) -$(PLUGININTERFACE_OBJ):export CPPFLAGS=$(BASEFLAGS) $(PLUGININTERFACEFLAGS) $(JSONFLAGS) + +$(PLUGININTERFACE_OBJ):export CPPFLAGS=$(BASEFLAGS) $(PLUGININTERFACEFLAGS) $(SUBSCR_OBJ): export CPPFLAGS=$(BASEFLAGS) $(ASN1C_FLAGS) $(E2AP_C_FLAGS) $(SUBSCRFLAGS) $(E2AP_C_OBJ): export CPPFLAGS = $(BASEFLAGS) $(ASN1C_FLAGS) $(E2AP_C_FLAGS) $(E2SM_OBJ): export CPPFLAGS = $(BASEFLAGS) $(ASN1C_FLAGS) $(E2SM_FLAGS) $(X2AP_OBJ): export CPPFLAGS = $(BASEFLAGS) $(ASN1C_FLAGS) $(X2FLAGS) -$(PLUGINS_OBJ): export CPPFLAGS = $(BASEFLAGS) $(ASN1C_FLAGS) $(X2FLAGS) $(PLUGINFLAGS) $(PLUGININTERFACEFLAGS) $(JSONFLAGS) -$(XAPP_OBJ):export CPPFLAGS=$(BASEFLAGS) $(JSONFLAGS) $(SUBSCRFLAGS) $(PLUGINFLAGS) $(CURLFLAGS) $(E2AP_C_FLAGS) $(ASN1C_FLAGS) $(E2SM_FLAGS) $(X2FLAGS) $(XAPPFLAGS) $(PLUGININTERFACEFLAGS) +$(PLUGINS_OBJ): export CPPFLAGS = $(BASEFLAGS) $(ASN1C_FLAGS) $(X2FLAGS) $(PLUGINFLAGS) $(PLUGININTERFACEFLAGS) +$(XAPP_OBJ):export CPPFLAGS=$(BASEFLAGS) $(SUBSCRFLAGS) $(PLUGINFLAGS) $(CURLFLAGS) $(E2AP_C_FLAGS) $(ASN1C_FLAGS) $(E2SM_FLAGS) $(X2FLAGS) $(XAPPFLAGS) $(PLUGININTERFACEFLAGS) -OBJ= $(SUBSCR_OBJ) $(XAPP_OBJ) $(JSON_OBJ) $(PLUGINS_OBJ) $(CURL_OBJ) $(E2AP_C_OBJ) $(ASN1C_MODULES) $(E2SM_OBJ) $(X2AP_OBJ) $(PLUGININTERFACE_OBJ) +OBJ= $(SUBSCR_OBJ) $(XAPP_OBJ) $(PLUGINS_OBJ) $(CURL_OBJ) $(E2AP_C_OBJ) $(ASN1C_MODULES) $(E2SM_OBJ) $(X2AP_OBJ) $(PLUGININTERFACE_OBJ) adm-ctrl-xapp: $(OBJ) $(CXX) -o $@ $(OBJ) $(LIBS) $(CPPFLAGS) $(CLOGFLAGS) @@ -81,4 +82,4 @@ install: adm-ctrl-xapp install -D adm-ctrl-xapp /usr/local/bin/adm-ctrl-xapp clean: - -rm *.o $(JSON)/*.o $(E2AP_c)/*.o $(SUBSCR)/*.o $(PLUGIN)/*.o $(E2SM)/*.o $(X2AP)/*.o $(CURL)/*.o $(PLUGININTERFACE)/*.o adm-ctrl-xapp + -rm *.o $(E2AP_c)/*.o $(SUBSCR)/*.o $(PLUGIN)/*.o $(E2SM)/*.o $(X2AP)/*.o $(CURL)/*.o $(PLUGININTERFACE)/*.o adm-ctrl-xapp diff --git a/src/README.md b/src/README.md index 3630d6f..8c30d4d 100644 --- a/src/README.md +++ b/src/README.md @@ -73,6 +73,7 @@ A sample script ./run_xapp.sh is provided to illustrate and execute the xAPP -u : URL path for VES collector -g : comma separated list of gNodeBs to send subscription request to -i : time interval (in seconds) at which to post metrics to the VES collector +-x : xapp-id string used to identify in A1 work flows -c : operation model (CONTROL, REPORT, E2AP_PROC_ONLY) * in the E2AP_PROC_ONLY mode, the Admission Control xAPP will simply process only the E2AP Indication part of the RMR message. It does not process the X2 portion of the message or invoke the sliding window algorithm or send a response back. This is useful to just test basic functionality without a valid X2 message. @@ -162,4 +163,4 @@ Other Components - get_config.cc : processing command line and environment variables at start up. - - message_processor_class.cc : The RMR message processing engine. It listens for messages on RMR and invokes appropriate handler (e.g subscriptions, E2AP, Policy). + - message_processor_class.cc : The RMR message processing engine. It listens for messages on RMR and invokes appropriate handler (e.g subscriptions, E2AP, Policy). It also periodically posts processing latency metrics to stdout diff --git a/src/X2AP/sgnb_addition_request.cc b/src/X2AP/sgnb_addition_request.cc index 20d4168..004f1ab 100644 --- a/src/X2AP/sgnb_addition_request.cc +++ b/src/X2AP/sgnb_addition_request.cc @@ -479,7 +479,7 @@ bool sgnb_addition_request::get_fields(X2N_InitiatingMessage_t * init_msg, sgnb_ break; case (X2N_ProtocolIE_ID_id_SubscriberProfileIDforRFP): - //sgnb->subscriberprofileidforrfp = memb_ptr->value.choice.SubscriberProfileIDforRFP; + dout.subscriber_profile_id = memb_ptr->value.choice.SubscriberProfileIDforRFP; break; case (X2N_ProtocolIE_ID_id_MeNBCell_ID): diff --git a/src/adm-ctrl-xapp.cc b/src/adm-ctrl-xapp.cc index f7ddb30..387ca1d 100644 --- a/src/adm-ctrl-xapp.cc +++ b/src/adm-ctrl-xapp.cc @@ -22,33 +22,22 @@ #include "adm-ctrl-xapp.hpp" +int run_program = 1; + -static int RunProg = 1; // keep loop running // list of plugins -typedef std::vector > plugin_list; plugin_list Plugins; std::map plugin_rmr_map; +// Policy handler : All plugins (of abstract class Policy) are registered with the plugin list. +// The policy handler is registered as a call back to the RMR message handler. When a policy related RMR message +// is received, this function is invoked. It finds the appropriate plugin from the plugin list, passes the policy message and then +// returns back the response (in the response string) -int add_subscription(subscription_handler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){ - unsigned char node_buffer[32]; - std::copy(gNodeB.begin(), gNodeB.end(), node_buffer); - node_buffer[gNodeB.length()] = '\0'; - int res = sub_handler.RequestSubscription(he, he_resp, RIC_SUB_REQ, std::bind(static_cast( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer)); - return res; -}; - - -int delete_subscription(subscription_handler & sub_handler, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){ - unsigned char node_buffer[32]; - std::copy(gNodeB.begin(), gNodeB.end(), node_buffer); - node_buffer[gNodeB.length()] = '\0'; - - int res = sub_handler.RequestSubscriptionDelete(he, he_resp, RIC_SUB_DEL_REQ, std::bind(static_cast( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer)); - return res; -}; - +// NOTE : This version of policy handler was written with R1 policy protocol in mind. +// Specifically, it was assumed that each message type corresponds to either set/get for a specific xAPP. +// It still works with R2, but ideally should be modified since a single policy type could be applied to multiple plugins. void policy_handler(int message_type, const char * message, int message_len, std::string & response, bool set){ auto it = plugin_rmr_map.find(message_type); @@ -77,29 +66,37 @@ void policy_handler(int message_type, const char * message, int message_len, st } else{ response = "{\"status\":\"FAIL\", \"message\":\"Could not find plugin associated with RMR message type = " + std::to_string(message_type) + "\"}"; + mdclog_write(MDCLOG_ERR, "Error ! %s, %d : %s\n", __FILE__, __LINE__, response.c_str()); } }; + +// polling function that routinely queries all plugins for metrics and then posts them on +// VES url void metrics_collector(std::string ves_url, plugin_list * plugins, unsigned int interval){ // Instantiate the ves collector curl_interface curl_obj(ves_url); - std::string metrics_response; + std::vector response_vector; int res; - while(RunProg){ + while(run_program){ for(unsigned int i = 0; i < plugins->size(); i++){ - res = (*plugins)[i].get()->getMetrics(metrics_response); + response_vector.clear(); + res = (*plugins)[i].get()->getMetrics(response_vector); if (res != 0){ - mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), metrics_response.c_str()); + mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d: could not get metrics from plugin %s. Reason = %s", __FILE__, __LINE__, (*plugins)[i].get()->getName().c_str(), (*plugins)[i].get()->get_error().c_str()); } else{ - res = curl_obj.post_metrics(metrics_response); - if (!res){ - mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str()); - } - else{ - mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", metrics_response.c_str()); + // send each response + for(auto &e: response_vector){ + res = curl_obj.post_metrics(e); + if (!res){ + mdclog_write(MDCLOG_WARN, "VES :: Warning : %s, %d , could not post metrics to %s. Reason = %s\n", __FILE__, __LINE__, ves_url.c_str(), curl_obj.getError().c_str()); + } + else{ + mdclog_write(MDCLOG_INFO, "VES :: Successfully posted metrics %s to VES collector\n", e.c_str()); + } } } } @@ -111,9 +108,10 @@ void metrics_collector(std::string ves_url, plugin_list * plugins, unsigned int void EndProgram(int signum){ std::cout <<"Signal received. Stopping program ....." << std::endl; - RunProg = 0; + run_program = 0; + } - +// ideally should be expanded for rollback purposes etc. void msg_error(rmr_mbuf_t *message){ mdclog_write(MDCLOG_ERR, "Error: %s, %d Could not send RMR message of length %d and type %d, Reason %s", __FILE__, __LINE__, message->len, message->mtype, strerror(errno) ); }; @@ -144,8 +142,8 @@ int main(int argc, char *argv[]){ mdclog_write(MDCLOG_WARN, "WARNING : gNodeB not set for subscription. Subscription MAY FAIL"); } - std::string operating_mode; + // How are we operating ? if (my_config.operating_mode == "CONTROL"){ // Full closed loop : process E2AP indication, @@ -173,35 +171,25 @@ int main(int argc, char *argv[]){ // Finished passing command line/environment arguments //============================================================= - // instantiate xapp object - if(my_config.num_threads >= 1){ - mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads); - // Create XaPP that starts with specified number of threads - my_xapp = std::make_unique(my_config.name, my_config.port, 1024, my_config.num_threads); - } - else{ - mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto"); - //Let XaPP pick threads based on hardware - my_xapp = std::make_unique(my_config.name, my_config.port, 1024); - } - - + // instantiate xapp-rmr-framework object + mdclog_write(MDCLOG_INFO, "XaPP listener threads specified = %d", my_config.num_threads); mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", my_config.name); mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", my_config.port); + my_xapp = std::make_unique(my_config.name, my_config.port, 1024); - // Instantiate admission logic handler - Plugins.emplace_back(std::make_unique(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file, 1, my_config.report_mode_only)); - // Add reference to plugin list . We add twice (once for set policy and once for get policy ids) - // Plugin list is used by policy handler and metrics collector - plugin_rmr_map.insert(std::pair(DC_ADM_INT_CONTROL, Plugins[0].get())); - plugin_rmr_map.insert(std::pair(DC_ADM_GET_POLICY, Plugins[0].get())); + // Instantiate admission logic handler (with only one instance for now) + int num_instances = 1; + Plugins.emplace_back(std::make_unique(my_config.a1_schema_file, my_config.sample_file, my_config.ves_schema_file, num_instances, my_config.xapp_id, my_config.report_mode_only)); + + // Add reference to plugin list . + // Plugin list is used by policy handler and metrics collector + plugin_rmr_map.insert(std::pair(A1_POLICY_REQ, Plugins[0].get())); - // instantiate curl object for ves + // instantiate ves thread (it polls all plugins and sends out their metrics) std::thread metrics_thread(metrics_collector, my_config.ves_collector_url, &Plugins, my_config.measurement_interval); - // Instantiate subscription handler subscription_handler sub_handler; @@ -219,7 +207,7 @@ int main(int argc, char *argv[]){ } - // Start the listening loops + // Start the RMR listening loops std::vector thread_ids(my_config.num_threads); unsigned int i = 0; for(auto &e: message_procs){ @@ -230,120 +218,28 @@ int main(int argc, char *argv[]){ mdclog_write(MDCLOG_INFO, "xAPP is UP and Listening on RMR. ...\n"); mdclog_write(MDCLOG_INFO, "Number of message processors = %lu", message_procs.size()); - //====================================================== - // sgnb Subscription spec - - int request_id = 2; // will be over-written by subscription handler - int req_seq = 1; - int function_id = 0; - int action_id = 4; - int action_type = my_config.report_mode_only ? 0:1; - - int message_type = 1; - int procedure_code = 27; - std::string egnb_id = "Testgnb"; - std::string plmn_id = "Testplmn"; - - unsigned char event_buf[128]; - size_t event_buf_len = 128; - bool res; + //Register signal handler to stop + signal(SIGINT, EndProgram); + signal(SIGTERM, EndProgram); + // Instantiate startup/shutown subroutine objects + init boot_shutdown((*my_xapp), sub_handler, my_config); - e2sm_event_trigger_helper trigger_data; - e2sm_event_trigger event_trigger; - - trigger_data.egNB_id = egnb_id; - trigger_data.plmn_id = plmn_id; - trigger_data.egNB_id_type = 2; - trigger_data.interface_direction = 1; - trigger_data.procedure_code = procedure_code; - trigger_data.message_type = message_type; - res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data); - if (!res){ - mdclog_write(MDCLOG_ERR, "Error : %s, %d: Could not encode subscription Request. Reason = %s\n", __FILE__, __LINE__, event_trigger.get_error().c_str()); - exit(0); - } - - subscription_helper sgnb_add_subscr_req; - subscription_response_helper subscr_response; - - sgnb_add_subscr_req.clear(); - sgnb_add_subscr_req.set_request(request_id, req_seq); - sgnb_add_subscr_req.set_function_id(function_id); - sgnb_add_subscr_req.add_action(action_id, action_type); - - - sgnb_add_subscr_req.set_event_def(&event_buf[0], event_buf_len); - mdclog_write(MDCLOG_INFO, "Encoded event trigger definition into PDU of size %lu bytes\n", event_buf_len); - - //====================================================== - // Purely for testing purposes ... write subscription ASN binary to file - // FILE *pfile; - // pfile = fopen("event_trigger.pr", "wb"); - // fwrite(event_buf, 1, event_buf_len, pfile); - // fclose(pfile); - //====================================================== - - - // keep sending subscription request till successfull for all gnodebs or exceed max attempts ? - // or exceed max_iterations - auto it = my_config.gNodeB_list.begin(); - int loop = 0; - int num_nodes = my_config.gNodeB_list.size(); - - while((loop < num_nodes * my_config.max_sub_loops) && my_config.gNodeB_list.size() > 0 && RunProg){ - int attempt = 0; - int subscr_result = -1; - - while(1){ - mdclog_write(MDCLOG_INFO, "Sending subscription request for %s ... Attempt number = %d\n", (*it).c_str(), attempt); - subscr_result = add_subscription(sub_handler, my_xapp.get(), sgnb_add_subscr_req, subscr_response, *it); - if (subscr_result == SUBSCR_SUCCESS){ - break; - } - sleep(5); - attempt ++; - if (attempt > MAX_SUBSCRIPTION_ATTEMPTS){ - break; - } - } - - if(subscr_result == SUBSCR_SUCCESS){ - mdclog_write(MDCLOG_INFO, "Successfully subscribed for gNodeB %s", (*it).c_str()); - // remove node from list, - // move to next gnobde - it = my_config.gNodeB_list.erase(it); - } - - if (it == my_config.gNodeB_list.end()){ - it = my_config.gNodeB_list.begin(); - } - - loop++; - - } - - if (my_config.gNodeB_list.size() == 0){ - std::cout <<"SUBSCRIPTION REQUEST :: Successfully subscribed to events for all gNodeBs " << std::endl; - } - else{ - std::cerr <<"SUBSCRIPTION REQUEST :: Failed to subscribe for following gNodeBs" << std::endl; - for(const auto &e: my_config.gNodeB_list){ - std::cerr <<"Failed to subscribe for gNodeB " << e << std::endl; - } - } - - //Register signal handler to stop - signal(SIGINT, EndProgram); - signal(SIGTERM, EndProgram); + // Trigger start functions + boot_shutdown.startup(); //Wait for stop - while(RunProg){ + while(run_program){ sleep(10); } - + + + // we are in shutdown mode + // send out subscription deletes + boot_shutdown.shutdown(); + i = 0; for(auto &e: message_procs){ mdclog_write(MDCLOG_INFO, "Thread %d : Number of packets handled = %lu", thread_ids[i], e.get()->get_messages()); diff --git a/src/adm-ctrl-xapp.hpp b/src/adm-ctrl-xapp.hpp index 8faf2dd..5f9dc33 100644 --- a/src/adm-ctrl-xapp.hpp +++ b/src/adm-ctrl-xapp.hpp @@ -43,10 +43,16 @@ #define DEFAULT_VES_SCHEMA_FILE "/etc/xapp/ves-schema.json" #define DEFAULT_SAMPLE_FILE "/etc/xapp/samples.json" #define DEFAULT_VES_COLLECTOR_URL "127.0.0.1:6350" +#define DEFAULT_XAPP_ID "ac-xapp-123" #define DEFAULT_VES_MEASUREMENT_INTERVAL 10 #define MAX_SUBSCRIPTION_ATTEMPTS 10 //================================================ +// convenient typedef for the list of plugins to be loaded +// currently only the admission control plugin. any plugin +// should be inheriting from the Policy abstract class +typedef std::vector > plugin_list; + // configuration parameters struct configuration { @@ -67,6 +73,7 @@ struct configuration { bool report_mode_only = true; std::string operating_mode = "REPORT"; int max_sub_loops = 2; + std::string xapp_id; void fill_gnodeb_list(char * gNodeB_string){ gNodeB_list.clear(); char * gnb = strtok(gNodeB_string, ","); @@ -81,9 +88,28 @@ struct configuration { }; +// class that handles startup and shutdown operations +class init { +public: + init(XaPP & , subscription_handler & , configuration &); + void startup(void); + void shutdown(void); + +private: + void startup_subscribe_requests(void ); + void shutdown_subscribe_deletes(void); + void startup_get_policies(void ); + + subscription_handler * sub_handler_ref; + XaPP * xapp_ref; + configuration * config_ref; +}; + void usage(char *command); void get_environment_config(configuration & config_instance); void get_command_line_config(int argc, char **argv, configuration &config_instance); + +extern int run_program; extern bool report_mode_only; #endif diff --git a/src/admission_init_routines.cc b/src/admission_init_routines.cc new file mode 100644 index 0000000..4958343 --- /dev/null +++ b/src/admission_init_routines.cc @@ -0,0 +1,255 @@ +/* +================================================================================== + + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ +/* Author : Ashwin Sridharan + Date : Dec 2019 +*/ + + +/* + holds all functionality related to message exchange upon startup + - subscription requests + - policy requests + + NOTE : This module only sends out requests. Responses are assumed to be + handled on RMR listening threads that are expected to already running in + main + +*/ + +#include "adm-ctrl-xapp.hpp" + + +// function to call to add subscriptions +// Note 1 : it is synchronous. will block till it succeeds or fails +// Note 2: we bind and pass the xapp tx function to separate out RMR from subscription process + +int add_subscription(subscription_handler *sub_handler_ref, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){ + unsigned char node_buffer[32]; + std::copy(gNodeB.begin(), gNodeB.end(), node_buffer); + node_buffer[gNodeB.length()] = '\0'; + + int res = sub_handler_ref->request_subscription(he, he_resp, gNodeB, RIC_SUB_REQ, std::bind(static_cast( &XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer, link_types::HIGH_RELIABILITY, tx_types::ROUTE)); + return res; +}; + + +// function to call to delete subscription +// Note 1 : it is synchronous. will block till it succeeds or fails +// Note 2: we bind and pass the xapp tx function to separate out RMR from subscription process + +int delete_subscription(subscription_handler *sub_handler_ref, XaPP * xapp_ref, subscription_helper & he, subscription_response_helper he_resp, std::string & gNodeB){ + unsigned char node_buffer[32]; + std::copy(gNodeB.begin(), gNodeB.end(), node_buffer); + node_buffer[gNodeB.length()] = '\0'; + + int res = sub_handler_ref->request_subscription_delete(he, he_resp, gNodeB, RIC_SUB_DEL_REQ, std::bind(static_cast(&XaPP::Send), xapp_ref, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, node_buffer, link_types::HIGH_RELIABILITY, tx_types::ROUTE)); + return res; +}; + + + +init::init (XaPP & xapp, subscription_handler & sub_handler, configuration & my_config){ + xapp_ref = &xapp; + sub_handler_ref = &sub_handler; + config_ref = &my_config; +} + + +// Main handle to subscribe to requests +// AC xAPP basically subscribes to just one subscription (SgNB Addition Request), but can be extended to subscribe to +// multiple too. +void init::startup_subscribe_requests(void ){ + + //====================================================== + // sgnb Subscription spec + + int request_id = 2; // will be over-written by subscription handler + int req_seq = 1; + int function_id = 0; + int action_id = 4; + int action_type = config_ref->report_mode_only ? 0:1; + + int message_type = 1; + int procedure_code = 27; + std::string egnb_id = "Testgnb"; + std::string plmn_id = "Testplmn"; + + unsigned char event_buf[128]; + size_t event_buf_len = 128; + bool res; + + + e2sm_event_trigger_helper trigger_data; + e2sm_event_trigger event_trigger; + + trigger_data.egNB_id = egnb_id; + trigger_data.plmn_id = plmn_id; + trigger_data.egNB_id_type = 2; + trigger_data.interface_direction = 1; + trigger_data.procedure_code = procedure_code; + trigger_data.message_type = message_type; + //====================================================== + + // Encode the event trigger definition + res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data); + if (!res){ + mdclog_write(MDCLOG_ERR, "Error : %s, %d: Could not encode subscription Request. Reason = %s\n", __FILE__, __LINE__, event_trigger.get_error().c_str()); + exit(0); + } + mdclog_write(MDCLOG_INFO, "Encoded event trigger definition into PDU of size %lu bytes\n", event_buf_len); + + // create the subscription + subscription_helper sgnb_add_subscr_req; + subscription_response_helper subscr_response; + + sgnb_add_subscr_req.clear(); + sgnb_add_subscr_req.set_request(request_id, req_seq); + sgnb_add_subscr_req.set_function_id(function_id); + sgnb_add_subscr_req.add_action(action_id, action_type); + + sgnb_add_subscr_req.set_event_def(&event_buf[0], event_buf_len); + + + //====================================================== + // Purely for testing purposes ... write subscription ASN binary to file + // FILE *pfile; + // pfile = fopen("event_trigger.pr", "wb"); + // fwrite(event_buf, 1, event_buf_len, pfile); + // fclose(pfile); + //====================================================== + + + // for each gNodeB, try MAX_SUBSCRIPTION_ATTEMPTS + // record gNodeBs for which we could not subscribe. + // note that there could be multiple subscriptions for each gNodeB. + // for AC xAPP we are doing just one ... + std::vector failed_gNodeBs; + + for(auto &it: config_ref->gNodeB_list){ + int attempt = 0; + int subscr_result = -1; + + while(1){ + + if(!run_program){ + std::cout <<"Shutdown signal received during subscription process. Quitting ....." << std::endl; + break; + } + + mdclog_write(MDCLOG_INFO, "Sending subscription request for %s ... Attempt number = %d\n", it.c_str(), attempt); + subscr_result = add_subscription(sub_handler_ref, xapp_ref, sgnb_add_subscr_req, subscr_response, it); + if (subscr_result == SUBSCR_SUCCESS){ + break; + } + sleep(5); + attempt ++; + if (attempt > MAX_SUBSCRIPTION_ATTEMPTS){ + break; + } + } + + if(subscr_result == SUBSCR_SUCCESS){ + mdclog_write(MDCLOG_INFO, "Successfully subscribed for gNodeB %s", (it).c_str()); + } + else{ + failed_gNodeBs.push_back(it); + } + } + + if (failed_gNodeBs.size() == 0){ + std::cout <<"SUBSCRIPTION REQUEST :: Successfully subscribed to events for all gNodeBs " << std::endl; + } + else{ + std::cerr <<"SUBSCRIPTION REQUEST :: Failed to subscribe for following gNodeBs" << std::endl; + for(const auto &e: failed_gNodeBs){ + std::cerr <<"Failed to subscribe for gNodeB " << e << std::endl; + } + } + + +} + + +// Main handle to delete subscription requests +// Called upon shutdown +void init::shutdown_subscribe_deletes(){ + std::vector sub_ids; + + subscription_helper sub_helper; + subscription_response_helper subscr_response; + + // get list of subscriptions + sub_handler_ref->get_subscription_keys(sub_ids); + + // send delete for each one .. + // this is synchronous, hence will block ... + for(auto & id: sub_ids){ + std::string gnodeb_id = std::get<0>(id); + subscription_response_helper * sub_info = sub_handler_ref->get_subscription(id); + int subscr_result = -1; + if(sub_info != NULL){ + sub_helper.set_request(0, 0); // requirement of subscription manager ... ? + sub_helper.set_function_id(sub_info->get_function_id()); + mdclog_write(MDCLOG_INFO, "Sending subscription delete for gNodeB %s\n", gnodeb_id.c_str()); + subscr_result = delete_subscription(sub_handler_ref, xapp_ref, sub_helper, subscr_response, gnodeb_id); + if(subscr_result == SUBSCR_SUCCESS){ + mdclog_write(MDCLOG_INFO, "Successfully deleted subscription for %s, %d\n", gnodeb_id.c_str(), sub_helper.get_function_id()); + } + else{ + mdclog_write(MDCLOG_ERR, "Error : %s, %d. Could not delete subcription for %s, %d. Reason = %d\n", __FILE__, __LINE__, gnodeb_id.c_str(), sub_helper.get_function_id(), subscr_result); + } + } + else{ + mdclog_write(MDCLOG_ERR, "Error : %s, %d. Could not get subscription for %s, %d\n", __FILE__, __LINE__, std::get<0>(id).c_str(), std::get<1>(id)); + } + } + +} + + +//Request policies on start up +// This is async : once query is sent. responses from A1 are handled on RMR threads +void init::startup_get_policies(void){ + + int policy_id = 21000; + + // we simply create json from scratch for now since it is quite simple + std::string policy_query = "{\"policy_id\":" + std::to_string(policy_id) + "}"; + unsigned char * message = (unsigned char *)calloc(policy_query.length(), sizeof(unsigned char)); + memcpy(message, policy_query.c_str(), policy_query.length()); + mdclog_write(MDCLOG_INFO, "Sending request for policy id %d\n", policy_id); + xapp_ref->Send(A1_POLICY_QUERY, policy_query.length(), message, link_types::HIGH_RELIABILITY); + free(message); + +} + + +// start up subroutines go hear +void init::startup(void){ + startup_subscribe_requests(); + startup_get_policies(); + +} + +// shutdown subroutines go here +void init::shutdown(void ){ + std::cout <<"Initiating shutdown subroutines ..." << std::endl; + shutdown_subscribe_deletes(); +} + diff --git a/src/get_config.cc b/src/get_config.cc index c0733c3..f711bd3 100644 --- a/src/get_config.cc +++ b/src/get_config.cc @@ -109,6 +109,11 @@ void get_environment_config(configuration & config_instance){ mdclog_write(MDCLOG_INFO, "xAPP set to Test Mode state %d from Environment Variable", config_instance.test_mode); } + if (const char *id = std::getenv("XAPP_ID")){ + config_instance.xapp_id.assign(id); + mdclog_write(MDCLOG_INFO, "xAPP ID set to Test Mode state %d from Environment Variable", config_instance.xapp_id); + } + if (const char *log_env = std::getenv("LOG_LEVEL")){ if (!strcmp(log_env, "MDCLOG_INFO")){ config_instance.log_level = MDCLOG_INFO; @@ -146,6 +151,7 @@ void get_command_line_config(int argc, char **argv, configuration &config_instan {"interval", required_argument, 0, 'i'}, {"gNodeB", required_argument, 0, 'g'}, {"opmode", required_argument, 0, 'c'}, + {"xappid", required_argument, 0, 'x'}, {"verbose", no_argument, &config_instance.log_level, MDCLOG_DEBUG}, {"test", no_argument, &config_instance.test_mode, 1}, @@ -155,7 +161,7 @@ void get_command_line_config(int argc, char **argv, configuration &config_instan while(1) { int option_index = 0; - char c = getopt_long(argc, argv, "n:p:t:s:g:a:v:u:i:c:", long_options, &option_index); + char c = getopt_long(argc, argv, "n:p:t:s:g:a:v:u:i:c:x:", long_options, &option_index); if(c == -1){ break; @@ -207,6 +213,11 @@ void get_command_line_config(int argc, char **argv, configuration &config_instan mdclog_write(MDCLOG_INFO, "VES collector url set to %s from command line ", config_instance.ves_collector_url.c_str()); break; + case 'x': + config_instance.xapp_id.assign(optarg); + mdclog_write(MDCLOG_INFO, "XAPP ID set to %s from command line ", config_instance.xapp_id.c_str()); + break; + case 'i': config_instance.measurement_interval = atoi(optarg); if (config_instance.measurement_interval < 1 || config_instance.measurement_interval > MAX_SLEEP){ diff --git a/src/json/json_handler.cc b/src/json/json_handler.cc deleted file mode 100644 index a14b341..0000000 --- a/src/json/json_handler.cc +++ /dev/null @@ -1,686 +0,0 @@ -/* -================================================================================== - - Copyright (c) 2018-2019 AT&T Intellectual Property. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -================================================================================== -*/ -/* - Author : Ashwin Sridharan - - -*/ - -#include -#include - -TrieNode::TrieNode(int val): val_type(-1){ - _id.tag = DataContainer::Types::integer; - _id.value.i = val; -} - -TrieNode::TrieNode( std::string val) : val_type(-1){ - _id.tag = DataContainer::Types::str; - _id.value.s.assign(val); -} - -void TrieNode::set_value(const char * val){ - _val.tag = DataContainer::Types::str; - _val.value.s.assign(val); -} - - -void TrieNode::set_value(bool val){ - _val.tag = DataContainer::Types::boolean; - _val.value.b = val; - -} - -void TrieNode::set_value(int val){ - _val.tag = DataContainer::Types::integer; - _val.value.i = val; - //std::cout <<"Assigned integer " << val << std::endl; - - -} - -void TrieNode::set_value(unsigned int val){ - _val.tag = DataContainer::Types::uinteger; - _val.value.ui = val; - -} - -void TrieNode::set_value( long int val){ - _val.tag = DataContainer::Types::big_integer; - _val.value.l = val; -} - -void TrieNode::set_value(unsigned long int val){ - _val.tag = DataContainer::Types::ubig_integer; - _val.value.ul = val; -} - -void TrieNode::set_value(double val){ - _val.tag = DataContainer::Types::real; - _val.value.f = val; - -} - - -void TrieNode::set_value(std::string val){ - _val.tag = DataContainer::Types::str; - _val.value.s.assign(val); -} - -void TrieNode::set_value(const char * c, size_t len){ - _val.tag = DataContainer::Types::str; - _val.value.s.assign(c, len); -} - -void TrieNode::add_child(TrieNode * node){ - _children.push_back(node); -}; - - - -void TrieNode::print_id(void){ - switch(_id.tag){ - - case DataContainer::Types::integer : - std::cout <<"Type = " << _id.tag << " Value = " << _id.value.i << std::endl; - break; - case DataContainer::Types::str : - std::cout <<"Type = " << _id.tag << " Value = " << _id.value.s << std::endl; - break; - default: - std::cerr<< "Error ! ID not set or unknown type " << _id.tag; - } -}; - -void TrieNode::print_value(void){ - switch(_val.tag){ - - case DataContainer::Types::boolean : - std::cout <<"Type = " << _val.tag << " Value = " << _val.value.b << std::endl; - break; - case DataContainer::Types::integer : - std::cout <<"Type = " << _val.tag << " Value = " << _val.value.i << std::endl; - break; - case DataContainer::Types::uinteger : - std::cout <<"Type = " << _val.tag << " Value = " << _val.value.ui << std::endl; - break; - case DataContainer::Types::big_integer : - std::cout <<"Type = " << _val.tag << " Value = " << _val.value.l << std::endl; - break; - case DataContainer::Types::ubig_integer : - std::cout <<"Type = " << _val.tag << " Value = " << _val.value.ul << std::endl; - break; - case DataContainer::Types::real : - std::cout <<"Type = " << _val.tag << " Value = " << _val.value.f << std::endl; - break; - case DataContainer::Types::str : - std::cout <<"Type = " << _val.tag << " Value = " << _val.value.s << std::endl; - break; - default: - std::cerr<< "Error ! Value not set or unknown type " << _val.tag; - } -}; - - -jsonHandler::jsonHandler(void):_is_root(false), _is_schema(false), _is_buffer(false){ - -}; - - - - -void jsonHandler::load_file(std::string input_file, std::string & contents ){ - - std::FILE *fp ; - try{ - fp = std::fopen(input_file.c_str(), "rb"); - } - catch(std::exception &e){ - std::string error_string = "Error opening input schema file " + input_file; - throw std::runtime_error(error_string); - } - - if (fp){ - std::fseek(fp, 0, SEEK_END); - contents.resize(std::ftell(fp)); - std::rewind(fp); - std::fread(&contents[0], 1, contents.size(), fp); - std::fclose(fp); - } - - else{ - std::string error_string = "Error opening input file " + input_file; - throw std::runtime_error(error_string); - } - -} - - - -void jsonHandler::load_schema(std::string input_file){ - - load_file(input_file, _contents); - Document _doc; - if (_doc.Parse(_contents.c_str()).HasParseError()){ - std::string error_string = input_file + " is invalid JSON" ; - throw std::runtime_error(error_string); - } - - _ref_schema_doc= std::make_unique(_doc); - _is_schema = true; - - -} - -void jsonHandler::load_schema(std::string input_file, TrieNode * root){ - - load_file(input_file, _contents); - std::string response; - Document _doc; - if (_doc.Parse(_contents.c_str()).HasParseError()){ - std::string error_string = input_file + " is invalid JSON" ; - throw std::runtime_error(error_string); - } - - // Get message schema - bool res; - Value _schema_root; - Value &_root = _doc; - res = find_member(_root, response, root, _schema_root); - if (res == false){ - throw std::runtime_error(response); - } - - _ref_schema_doc= std::make_unique(_schema_root); - _is_schema = true; - -} - -void jsonHandler::load_buffer(std::string input_file){ - - load_file(input_file, _buffer); - Document _doc; - if (_doc.Parse(_buffer.c_str()).HasParseError()){ - std::string error_string = input_file + " is invalid JSON" ; - throw std::runtime_error(error_string); - } - _is_buffer = true; - -} - -void jsonHandler::load_buffer(std::string input_file, TrieNode * root){ - - load_file(input_file, _buffer); - Document _doc; - std::string response; - - if (_doc.Parse(_buffer.c_str()).HasParseError()){ - std::string error_string = input_file + " is invalid JSON" ; - throw std::runtime_error(error_string); - } - - bool res; - Value _buffer_root; - res = find_member(_doc, response, root, _buffer_root); - if(res == false){ - throw std::runtime_error(response); - } - - StringBuffer out_buffer; - Writer writer(out_buffer); - _buffer_root.Accept(writer); - _buffer.assign(out_buffer.GetString(), out_buffer.GetLength()); - _is_buffer = true; -} - - -std::string jsonHandler::get_buffer(void){ - std::string response; - if (_is_buffer){ - response.assign(_buffer); - } - else{ - response = ""; - } - - return response; -} - - - - - - -bool jsonHandler::find_member(const std::string schema, std::string & response, TrieNode * root, Value & TargetVal){ - - Document doc; - std::string contents(schema); - - if(doc.Parse(contents.c_str()).HasParseError()){ - response.assign("Error Parsing JSON File"); - return false; - } - - return find_member(doc, response, root, TargetVal); - return true; -}; - - -bool jsonHandler::find_member(Value & doc_root, std::string & response, TrieNode * root, Value & TargetVal){ - - if (!root){ - response.assign("Null Trie root node"); - return false; - } - //std::cout <<"LOoking for schema root" << std::endl; - - Value & json_node = doc_root; - TrieNode * trie_node = root; - Value::MemberIterator itr; - - while(1){ - - DataContainer const * d = trie_node->get_id(); - if (! d){ - response.assign("Error could not find any id for trie node "); - return false; - } - - if (d->tag == DataContainer::Types::integer && json_node.IsArray()){ - if (json_node.Size() < d->value.i){ - response.assign("Error json array size "); - response += std::to_string(json_node.Size()) + " is smaller than trie node index " + std::to_string( d->value.i); - return false; - } - - if (trie_node->is_child()){ - response.assign("Error child trie points to an array ? "); - return false; - } - - trie_node = trie_node->get_children()[0]; - json_node = json_node[d->value.i]; - } - else if (d->tag == DataContainer::Types::str && json_node.IsObject()){ - - itr = json_node.FindMember(d->value.s.c_str()); - if (itr == json_node.MemberEnd()){ - response.assign("Error ! Could not find key = "); - response += d->value.s; - return false; - } - if (trie_node->is_child()){ - // reached end of trie - if (itr->value.IsObject()){ - TargetVal = itr->value.GetObject(); - //std::cout <<"Reached root = " << itr->name.GetString() << std::endl; - } - else if (itr->value.IsArray()){ - TargetVal = itr->value.GetArray(); - } - else{ - response.assign("Error ! JSON node selected must be object or array in current version"); - std::cerr << response << std::endl; - return false; - } - break; - } - else{ - trie_node = trie_node->get_children()[0]; - trie_node->print_id(); - - if (itr->value.IsObject()){ - json_node = itr->value.GetObject(); - } - else if (itr->value.IsArray()){ - json_node = itr->value.GetArray(); - } - else{ - std::string error_string= " Path must be an object or array"; - response.assign(error_string); - return false; - } - } - } - else{ - std::string error_string = "Mismatch when setting root : Trie node is of type = " + std::to_string (d->tag) + " and json node is of type = " + std::to_string(json_node.GetType()); - response.assign(error_string); - return false; - } - - } - - return true; - -} - -bool jsonHandler::is_valid(const char *message, int message_length, std::string & response){ - - Document doc; - if (! _is_schema){ - return false; - } - - SchemaValidator validator(*(_ref_schema_doc.get())); - - // ensure message has terminator by translating to string ? - std::string message_s(message, message_length); - - - // validate json - if (doc.Parse(message_s.c_str()).HasParseError()){ - - // return error message - std::string failed_message = "\"message\": \"Invalid JSON\""; - response.assign( failed_message ); - return false; - } - - - // Validate against our JSON input schema - if (!doc.Accept(validator)){ - - StringBuffer sb; - validator.GetInvalidSchemaPointer().StringifyUriFragment(sb); - std::string failed_message = std::string("\"message\": \"Schema Violation:") + std::string(sb.GetString()); - failed_message += std::string(" Invalid keyword :") + std::string(validator.GetInvalidSchemaKeyword()) + " \""; - response.assign(failed_message); - return false; - - } - response.assign("SUCCESS"); - return true; - - -} - - - -// should be thread safe since it can be expected to be called from multiple threads -// only static external variable referenced is the schema (which should be read-only and hence ok ?) - -// Returns 0 if success -// -1 if invalid json -// -2 if invalid schema (assuming schema provided) -// -3 unknown key -// -4 no buffer available - -int jsonHandler::get_values(const char *message, int message_length, std::string & response, TrieNode * root, std::vector & response_list){ - - Document doc; - - // ensure message has terminator by translating to string ? - std::string message_s(message, message_length); - - // validate json - if (doc.Parse(message_s.c_str()).HasParseError()){ - - // return error message - response.assign("Invalid JSON"); - return -1; - } - - // Validate against our JSON input schema - if ( _is_schema){ - SchemaValidator validator(*(_ref_schema_doc.get())); - - if (!doc.Accept(validator)){ - - StringBuffer sb; - validator.GetInvalidSchemaPointer().StringifyUriFragment(sb); - response = std::string("Schema Violation ") + std::string(sb.GetString()) ; - response += std::string(" Invalid keyword = ") + std::string(validator.GetInvalidSchemaKeyword()) + " \""; - return -2; - } - - } - - Value & doc_root = doc; - bool res = traverse_doc(doc_root, root, true, response, response_list); - if (!res){ - return -3; - } - - response.assign("SUCCESS"); - return 0; -} - - -int jsonHandler::get_values( std::string & response, TrieNode * root, std::vector & response_list){ - int res; - if (_is_buffer){ - Document _doc; - _doc.Parse(_buffer.c_str()); - Value & _buffer_root = _doc; - - res = traverse_doc(_buffer_root, root, true, response, response_list); - return res; - } - else{ - response = "Error ! No buffer loaded in json object for get"; - return -4; - } -} - - -// If in get mode, return all values we can get -// If in set mode, return false if we cannot set a value -bool jsonHandler::traverse_doc(Value & json_node, TrieNode * trie_node, bool get, std::string & response, std::vector & response_list ){ - - if (!trie_node){ - response.assign(" Null Trie node "); - return false; - } - - bool res; - - DataContainer const * d = trie_node->get_id(); - if (! d){ - response.assign(" Error could not find any id for trie node"); - return false; - } - - Value::MemberIterator itr; - - - if (d->tag == DataContainer::Types::integer && json_node.IsArray()){ - if (json_node.Size() < d->value.i){ - response = "Error json array size " + std::to_string( json_node.Size()) + " is smaller than trie node index " + std::to_string( d->value.i); - return false; - } - - if (trie_node->is_child()){ - response.assign("Error child trie points to an array ? "); - return false; - } - - for (auto & e: trie_node->get_children()){ - res = traverse_doc(json_node[d->value.i], e, get, response, response_list); - if (!res && ! get){ - // if not in get mode and we hit a not found - // don't go any further, else move to next ... - return res; - } - } - } - - else if (d->tag == DataContainer::Types::str && json_node.IsObject()){ - itr = json_node.FindMember(d->value.s.c_str()); - - if (itr == json_node.MemberEnd()){ - response = "Error ! Could not find key " + d->value.s; - return false; - } - - if (trie_node->is_child()){ - // end of the line : do we get or set values ? - bool is_set = false; - if (get){ - if (itr->value.IsBool()){ - trie_node->set_value(itr->value.GetBool()); - is_set = true; - } - else if (itr->value.IsInt()){ - trie_node->set_value(itr->value.GetInt()); - is_set = true; - } - else if(itr->value.IsUint()){ - trie_node->set_value(static_cast(itr->value.GetUint())); - is_set = true; - } - else if(itr->value.IsUint64()){ - trie_node->set_value(static_cast(itr->value.GetUint64())); - is_set = true; - } - else if (itr->value.IsInt64()){ - trie_node->set_value(static_cast(itr->value.GetInt64())); - is_set = true; - } - else if ( itr->value.IsDouble()){ - trie_node->set_value(itr->value.GetDouble()); - is_set = true; - } - else if ( itr->value.IsString()){ - trie_node->set_value(itr->value.GetString(), itr->value.GetStringLength()); - is_set = true; - } - else{ - response = " json node corresponding to child node key must of type bool, int or string. Is of type = " + std::to_string(itr->value.GetType()); - return false; - } - - if (is_set){ - response_list.push_back(trie_node); - } - - //std::cout <<"Set value of child node with key = " << d->value.s.c_str() << " Type = " << trie_node->get_type() << std::endl; - - } - else{ - DataContainer const * d_val = trie_node->get_value(); - if (d_val->tag == DataContainer::Types::boolean){ - itr->value.SetBool(d_val->value.b); - } - else if (d_val->tag == DataContainer::Types::integer){ - itr->value.SetInt(d_val->value.i); - - } - else if (d_val->tag == DataContainer::Types::uinteger){ - itr->value.SetUint(d_val->value.ui); - - } - else if (d_val->tag == DataContainer::Types::big_integer){ - itr->value.SetInt64(d_val->value.l); - } - else if (d_val->tag == DataContainer::Types::ubig_integer){ - itr->value.SetUint64(d_val->value.ul); - } - else if (d_val->tag == DataContainer::Types::real){ - itr->value.SetDouble(d_val->value.f); - } - else if (d_val->tag == DataContainer::Types::str){ - itr->value.SetString(d_val->value.s.c_str(), d_val->value.s.length()); - } - else{ - response = " unknown type for child node value = " + std::to_string(d_val->tag) + " cannot set json node key = " + d->value.s; - return false; - } - } - return true; - } - else{ - for (auto & e: trie_node->get_children()){ - res = traverse_doc(itr->value, e, get, response, response_list); - if(res == false && ! get){ - return false; - } - } - } - } - - else{ - response = "Mismatch : Trie node is of type = " + std::to_string(d->tag) + " while json node is of type = " + std::to_string( json_node.GetType()); - return false; - } - - return true; - - -} - - -int jsonHandler::set_values(const char * buffer, int len, std::string & response, std::vector root_nodes){ - - Document doc; - std::string message_s(buffer, len); - - // validate json - if (doc.Parse(message_s.c_str()).HasParseError()){ - // return error message - response.assign("Invalid JSON"); - return -1; - } - - - Value & doc_root = doc; - // fake list to maintain signature for re-using traverse_doc - // since we don't return trie nodes when setting ... - std::vector fake_list; - for(auto const & e: root_nodes){ - bool res = traverse_doc(doc_root, e, false, response, fake_list); - if (!res){ - return -3; - } - } - - StringBuffer out_buffer; - Writer writer(out_buffer); - doc_root.Accept(writer); - response.assign(out_buffer.GetString(), out_buffer.GetLength()); - return 0; - } - - -// wrapper if instead of providing buffer, we simply use stored json object and use it -int jsonHandler::set_values(std::string & response, std::vector root_nodes){ - if (_is_buffer){ - std::vector fake_list; - Document _doc; - _doc.Parse(_buffer.c_str()); - Value & _buffer_root = _doc; - - for(auto const & e: root_nodes){ - bool res = traverse_doc(_buffer_root, e, false, response, fake_list); - if (!res){ - return -3; - } - } - - StringBuffer out_buffer; - Writer writer(out_buffer); - _buffer_root.Accept(writer); - response.assign(out_buffer.GetString(), out_buffer.GetLength()); - return 0; - } - else{ - response = "Error ! " + std::string( __FILE__) + "," + std::to_string(__LINE__) + " : No buffer loaded in json object to set"; - return -1; - } - -} diff --git a/src/json/json_handler.hpp b/src/json/json_handler.hpp deleted file mode 100644 index 95765ef..0000000 --- a/src/json/json_handler.hpp +++ /dev/null @@ -1,150 +0,0 @@ - -/* -================================================================================== - - Copyright (c) 2018-2019 AT&T Intellectual Property. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -================================================================================== -*/ -/* - Author : Ashwin Sridharan - - -*/ - -#pragma once -#ifndef JSON_HANDLER -#define JSON_HANDLER - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -#define MAX_QUEUE_SIZE 100 - -using namespace rapidjson; - - -struct DataContainer { - enum Types {boolean, integer, uinteger, big_integer, ubig_integer, real, str} tag ; - struct { - union { - bool b; - int i; - unsigned int ui; - long int l; - unsigned long int ul; - double f; - } ; - std::string s; - } value; -}; - -class TrieNode; - -class TrieNode{ -public: - - TrieNode (int); - TrieNode( std::string ); - - void set_type(DataContainer::Types); - int get_type(void) const {return _val.tag;} ; - - void set_value(bool val); - void set_value(int val); - void set_value(unsigned int val); - - void set_value(long int ); - void set_value(unsigned long int ); - void set_value(double val); - void set_value(const char *); - void set_value(std::string val); - void set_value(const char * , size_t ); - - void print_id(void); - void print_value(void); - void add_child(TrieNode *); - - DataContainer const * get_value(void) const { return & _val; }; - DataContainer const * get_id(void) const { return & _id;}; - - std::vector & get_children(void){ return _children;} ; - std::string & name(void) {return _name; }; - bool is_child(void){ return _children.size() ? false : true; }; - -private: - std::vector _children; - int val_type; - std::string _name; - DataContainer _id; - DataContainer _val; -}; - -class jsonHandler { - -public: - jsonHandler(void); - - - void load_schema(std::string); - void load_schema(std::string, TrieNode *root); - void load_schema(std::string , std::vector & ); - - void load_buffer(std::string); - void load_buffer(std::string, TrieNode * ); - std::string get_buffer(void); - - int get_values(const char *, int , std::string & , TrieNode *, std::vector & ); - int get_values( std::string & , TrieNode *, std::vector & ); - - int set_values (const char *, int, std::string & , std::vector); - int set_values (std::string & , std::vector); - - bool is_valid(const char *, int, std::string &); - -private: - - void load_file(std::string, std::string &); - - bool traverse_doc(Value &, TrieNode *, bool, std::string &, std::vector & ); - - bool find_member(const std::string, std::string &, std::vector &, Value & ); - - bool find_member(const std::string, std::string &, TrieNode *, Value &); - bool find_member(Value &, std::string &, TrieNode *, Value &); - - bool add_array_objects(std::queue &, Value &); - - bool _is_root, _is_schema, _is_buffer; - - std::unique_ptr _ref_schema_doc; - std::map _key_value_pairs; - std::string _contents; - std::string _buffer; - -}; - - -#endif diff --git a/src/message_processor_class.cc b/src/message_processor_class.cc index a5a9685..e2edaf4 100644 --- a/src/message_processor_class.cc +++ b/src/message_processor_class.cc @@ -23,7 +23,7 @@ int verbose_flag = 0; -message_processor::message_processor(int mode, bool report_mode, size_t buffer_length, size_t reporting_interval): _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){ +message_processor::message_processor(int mode, bool report_mode, size_t buffer_length): _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){ processing_level = mode; report_mode_only = report_mode; _buffer_size = buffer_length; @@ -75,6 +75,8 @@ bool message_processor::operator()(rmr_mbuf_t *message){ return false; } + // start measurement + auto start = std::chrono::high_resolution_clock::now(); // main message processing code switch(message->mtype){ @@ -245,8 +247,11 @@ bool message_processor::operator()(rmr_mbuf_t *message){ case (RIC_SUB_FAILURE): case ( RIC_SUB_DEL_FAILURE ): if (_ref_sub_handler != NULL){ + // extract meid .. + unsigned char meid[32]; + rmr_get_meid(message, meid); mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype); - _ref_sub_handler->Response(message->mtype, message->payload, message->len); + _ref_sub_handler->Response(message->mtype, message->payload, message->len, (const char *)meid); } else{ state = MISSING_HANDLER_ERROR; @@ -256,16 +261,16 @@ bool message_processor::operator()(rmr_mbuf_t *message){ break; - case DC_ADM_INT_CONTROL: + case A1_POLICY_REQ: { if(_ref_policy_handler != NULL){ // Need to apply config. Since config may need to be // applied across all threads, we do a callback to the parent thread. // wait for config to be applied and then send response - _ref_policy_handler(DC_ADM_INT_CONTROL, (const char *) message->payload, message->len, response, true); + _ref_policy_handler(A1_POLICY_REQ, (const char *) message->payload, message->len, response, true); std::memcpy( (char *) message->payload, response.c_str(), response.length()); message->len = response.length(); - message->mtype = DC_ADM_INT_CONTROL_ACK; + message->mtype = A1_POLICY_RESP; send_msg = true; } else{ @@ -275,21 +280,6 @@ bool message_processor::operator()(rmr_mbuf_t *message){ } break; - case DC_ADM_GET_POLICY: - { - if(_ref_policy_handler != NULL){ - _ref_policy_handler(DC_ADM_GET_POLICY, (const char *) message->payload, message->len, response, false); - std::memcpy((char *)message->payload, response.c_str(), response.length()); - message->len = response.length(); - message->mtype = DC_ADM_GET_POLICY_ACK; - send_msg = true; - } - else{ - state = MISSING_HANDLER_ERROR; - mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Policy handler not assigned in message processor !", __FILE__, __LINE__); - } - } - break; default: mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype); diff --git a/src/message_processor_class.hpp b/src/message_processor_class.hpp index 247c594..5ff3961 100644 --- a/src/message_processor_class.hpp +++ b/src/message_processor_class.hpp @@ -67,7 +67,7 @@ typedef enum { class message_processor { public: - message_processor(int mode=ALL, bool rep=true, size_t buffer_length = 2048, size_t reporting_interval = 100); + message_processor(int mode=ALL, bool rep=true, size_t buffer_length = 2048); ~message_processor(void); bool operator() (rmr_mbuf_t *); diff --git a/src/plugin-interface/plugin-interface.cc b/src/plugin-interface/plugin-interface.cc index a8ae20b..d7e6292 100644 --- a/src/plugin-interface/plugin-interface.cc +++ b/src/plugin-interface/plugin-interface.cc @@ -21,12 +21,6 @@ #include "plugin-interface.hpp" -std::string Policy::getError(void) const{ - return _error; -} -void Policy::setError(std::string & error){ - _error.assign(error); -} Policy::~Policy(void){ }; diff --git a/src/plugin-interface/plugin-interface.hpp b/src/plugin-interface/plugin-interface.hpp index e124d53..88750c8 100644 --- a/src/plugin-interface/plugin-interface.hpp +++ b/src/plugin-interface/plugin-interface.hpp @@ -20,7 +20,10 @@ #pragma once #ifndef POLICY_BASE #define POLICY_BASE -#include + +#include +#include + // Base abstract Class that provides interface to manage plugins. // Interface for following actions // -- configure policy @@ -37,12 +40,13 @@ class Policy public: virtual bool setPolicy(const char *, int, std::string &) = 0; virtual bool getPolicy(const char *, int , std::string & ) = 0; - virtual int getMetrics(std::string & ) = 0; + virtual int getMetrics(std::vector & ) = 0; virtual std::string getName(void) = 0; + virtual std::string get_error(void) const = 0; virtual ~Policy(void) = 0; - std::string getError(void) const; - void setError(std::string &); + + diff --git a/src/protector-plugin/NetworkProtector.cc b/src/protector-plugin/NetworkProtector.cc index b3e82f3..bdd6016 100644 --- a/src/protector-plugin/NetworkProtector.cc +++ b/src/protector-plugin/NetworkProtector.cc @@ -26,19 +26,31 @@ #include // For srand() and rand() -protector::protector(bool enforce, int windowSize_, int threshold_, double blockRate_, bool report): m_enforce(enforce), m_windowSize(windowSize_), m_threshold(threshold_), m_blockRate(blockRate_), m_req(0), m_rej(0) -{ - m_counter = 0; - m_window_ref = std::make_unique(m_windowSize); +protector::protector( bool report){ m_access = std::make_unique(); report_mode_only = report; + + // there is always a default policy with id 0 (never gets deleted, can only be re-configured) + // default values from policy constructor will be used. + policy_list.insert(std::pair(0, protector_policy())); + } +// constructor that over-rides default values for policy 0 +protector::protector( bool enforce, int window_size, int threshold, double blocking_rate, bool report){ + m_access = std::make_unique(); + report_mode_only = report; -bool protector::operator()(unsigned char *msg_ref, size_t msg_size, unsigned char * buffer, size_t *buf_size ) -{ + // there is always a default policy with id 0 (never gets deleted, can only be re-configured) + policy_list.insert(std::pair(0, protector_policy(enforce, window_size, threshold, blocking_rate))); + +} + + +bool protector::operator()(unsigned char *msg_ref, size_t msg_size, unsigned char * buffer, size_t *buf_size ){ bool res = true; + protector_policy * policy_ref; std::lock_guard lck(*(m_access.get())); @@ -78,7 +90,7 @@ bool protector::operator()(unsigned char *msg_ref, size_t msg_size, unsigned cha } if (res){ - + mdclog_write(MDCLOG_DEBUG, "Extracting SgNB Addition Request fields..."); //std::cout <<"%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << std::endl; @@ -91,23 +103,35 @@ bool protector::operator()(unsigned char *msg_ref, size_t msg_size, unsigned cha } if (res){ - mdclog_write(MDCLOG_DEBUG, "Decoded and extracted X2AP PDU data. Number of erabs = %u\n", sgnb_data.get_list()->size()); + mdclog_write(MDCLOG_DEBUG, "Decoded and extracted X2AP PDU data. Number of erabs = %lu\n", sgnb_data.get_list()->size()); mdclog_write(MDCLOG_DEBUG, "Applying admission control logic ..."); - // Admission control - m_req ++; + // Find if policy associated with this subscription id + auto it_policy = policy_list.find(sgnb_data.subscriber_profile_id); + + if (it_policy == policy_list.end()){ + // apply default policy + policy_ref = & policy_list[0]; + } + else{ + policy_ref = & (it_policy->second); + } + + net_requests ++; + policy_ref->_req++; + policy_ref->_window_ref.get()->update_window(1); - // update sliding window - m_window_ref.get()->update_window(1); - if ( m_enforce && m_window_ref.get()->net_events > m_threshold){ - res = selectiveBlock(); + // apply blocking probability if m_enforce + if ( policy_ref->_enforce && policy_ref->_window_ref.get()->net_events > policy_ref->_threshold){ + res = selectiveBlock(policy_ref->_block_rate); } else{ res = true; } if (!res){ - m_rej ++; + policy_ref->_rej ++; + net_rejects ++; } mdclog_write(MDCLOG_DEBUG, "Plugin decision for sgnb request = %d\n", res); @@ -138,56 +162,240 @@ bool protector::operator()(unsigned char *msg_ref, size_t msg_size, unsigned cha } } } - + + ASN_STRUCT_FREE(asn_DEF_X2N_X2AP_PDU, x2ap_recv); return res; } - -bool protector::configure(bool enforce, int windowSize_, int threshold_, double blockRate_) -{ + +// configure an existing policy +bool protector::configure(bool enforce, int windowSize_, int threshold_, double blockRate_, int id){ std::lock_guard lck(*(m_access.get())); + std::stringstream ss; + // basic validation of input + if(windowSize_ <= 0){ + ss << "Illegal value for window size = " << windowSize_ << " when configuring policy " << std::endl; + error_string = ss.str(); + return false; + } + + if(threshold_ < 0){ + ss << "Illegal value for trigger threshold = " << threshold_ << " when configuring policy " << std::endl; + error_string = ss.str(); + return false; + } + if(blockRate_ < 0 || blockRate_ > 100){ + ss << "Illegal value for blocking rate = " << blockRate_ << " when configuring policy " << std::endl; + error_string = ss.str(); + return false; + } + if (id < 0){ + ss << "Illegal value for class id = " << id << " when configuring policy " << std::endl; + error_string = ss.str(); + return false; + } + + // find policy + auto policy_it = policy_list.find(id); + if (policy_it == policy_list.end()){ + mdclog_write(MDCLOG_ERR, "Error : %s, %d : No policy with id %d found for configuration\n", __FILE__, __LINE__, id); + return false; + } - m_windowSize=windowSize_; - bool res = m_window_ref.get()->resize_window(m_windowSize); + bool res = policy_it->second._window_ref.get()->resize_window(windowSize_); if (!res){ + error_string = policy_it->second._window_ref.get()->get_error(); + return false; + } + policy_it->second._window_size = windowSize_; + + // enforce is set globally + policy_it->second._enforce = enforce; + + policy_it->second._threshold=threshold_; + policy_it->second._block_rate=blockRate_; + mdclog_write(MDCLOG_DEBUG, "Configured policy with id %d with enforce=%d, window size = %d, threshold = %d, blocking rate = %f\n", id, policy_it->second._enforce, policy_it->second._window_size, policy_it->second._threshold, policy_it->second._block_rate); + + return true; +} + +// add a policy +bool protector::add_policy(bool enforce, int windowSize_, int threshold_, double blockRate_, int id){ + std::lock_guard lck(*(m_access.get())); + std::stringstream ss; + + + // find policy + auto policy_it = policy_list.find(id); + if (policy_it != policy_list.end()){ + ss <<"Error : " << __FILE__ << "," << __LINE__ << ": " << "Policy with id " << id << " already exists. Cannot be added" << std::endl; + error_string = ss.str(); + mdclog_write(MDCLOG_ERR, "%s\n", error_string.c_str()); return false; } - m_enforce = enforce; - m_threshold=threshold_; - m_blockRate=blockRate_; + // basic validation of input + if(windowSize_ <= 0){ + ss << "Illegal value for window size = " << windowSize_ << " when adding policy " << std::endl; + error_string = ss.str(); + return false; + } + if(threshold_ < 0){ + ss << "Illegal value for trigger threshold = " << threshold_ << " when adding policy " << std::endl; + error_string = ss.str(); + return false; + } + + if(blockRate_ < 0 || blockRate_ > 100){ + ss << "Illegal value for blocking rate = " << blockRate_ << " when adding policy " << std::endl; + error_string = ss.str(); + return false; + } + if (id < 0){ + ss << "Illegal value for class id = " << id << " when adding policy " << std::endl; + error_string = ss.str(); + return false; + } + + + // create the policy + try{ + policy_list.insert(std::pair (id, protector_policy(enforce, windowSize_, threshold_, blockRate_))); + } + catch(std::exception &e){ + ss <<"Error : " << __FILE__ << "," << __LINE__ << ": " << "Error creating policy. Reason = " << e.what() << std::endl; + error_string = ss.str(); + mdclog_write(MDCLOG_ERR, "%s\n", error_string.c_str()); + return false; + } + + mdclog_write(MDCLOG_DEBUG, "Added new policy with id %d with enforce=%d, window size = %d, threshold = %d, blocking rate = %f\n", id, enforce, windowSize_, threshold_, blockRate_); - mdclog_write(MDCLOG_INFO, "Policy : Enforce mode set to %d\n", m_enforce); - mdclog_write(MDCLOG_INFO, "Policy: Trigger threshold set to %d\n", m_threshold); - mdclog_write(MDCLOG_INFO, "Policy : Blocking rate set to %f\n", m_blockRate); - mdclog_write(MDCLOG_INFO, "Policy : Window Size set to %d\n", m_windowSize); return true; } -unsigned long int protector::get_requests(void) const { - return m_req; +// delete a policy +bool protector::delete_policy(int id){ + std::lock_guard lck(*(m_access.get())); + std::stringstream ss; + auto policy_it = policy_list.find(id); + if (policy_it == policy_list.end()){ + ss <<"Error : " << __FILE__ << "," << __LINE__ << ": " << " No policy with id = " << id << " found" << std::endl; + error_string = ss.str(); + mdclog_write(MDCLOG_ERR, "%s\n", error_string.c_str()); + return false; + } + + policy_list.erase(policy_it); + mdclog_write(MDCLOG_DEBUG, "Deleted policy %d\n", id); + return true; +} + + +// query a policy : responsibility of caller to ensure +// vector is empty +// returns parameters of policy in the vector +bool protector::query_policy(int id, std::vector & info){ + + std::lock_guard lck(*(m_access.get())); + auto policy_it = policy_list.find(id); + if (policy_it == policy_list.end()){ + return false; + } + + info.push_back(policy_it->second._enforce); + info.push_back(policy_it->second._window_size); + info.push_back(policy_it->second._threshold); + info.push_back(policy_it->second._block_rate); + return true; } -unsigned long int protector::get_rejects(void) const { - return m_rej; +// returns requests that fall under a policy +// if id is -1, returns total requests +// if non-existent policy, returns -1 +// counters are cumulative +long int protector::get_requests(int id) const { + if (id == -1){ + return net_requests; + } + + std::lock_guard lck(*(m_access.get())); + auto policy_it = policy_list.find(id); + if (policy_it == policy_list.end()){ + return -1; + } + else{ + return policy_it->second._req; + } + } +// returns requests that fall under a policy +// if id is -1 , returns total rejects +// if non-existent policy, returns -1 +// counters are cumulative +long int protector::get_rejects(int id) const { + + if (id == -1){ + return net_rejects; + } + + std::lock_guard lck(*(m_access.get())); + auto policy_it = policy_list.find(id); + if (policy_it == policy_list.end()){ + return -1; + } + else{ + return policy_it->second._rej; + } + +} + +// returns list of active policies in +// supplied vector (policy is indexed by subscriber profile id) +void protector::get_active_policies(std::vector & active){ + std::lock_guard lck(*(m_access.get())); + for (const auto &e : policy_list){ + active.push_back(e.first); + } +} + +// returns true if policy active else false +bool protector::is_active(int id){ + auto policy_it = policy_list.find(id); + if (policy_it == policy_list.end()){ + return false; + } + else{ + return true; + } +} + +// clears counters for all policies void protector::clear() { + std::lock_guard lck(*(m_access.get())); - m_req = 0; - m_rej = 0; - m_window_ref.get()->clear(); + + for(auto &e : policy_list){ + e.second._window_ref.get()->clear(); + e.second._counter = 0; + e.second._req = 0; + e.second._rej = 0; + } + + net_requests = 0; + net_rejects = 0; } -bool protector::selectiveBlock() +bool protector::selectiveBlock(double block_rate) { unsigned int num = (rand() % 100) + 1; - if (num > m_blockRate) //not blocking + if (num > block_rate) //not blocking return true; else //blocking return false; diff --git a/src/protector-plugin/NetworkProtector.h b/src/protector-plugin/NetworkProtector.h index e8750db..8c782e2 100644 --- a/src/protector-plugin/NetworkProtector.h +++ b/src/protector-plugin/NetworkProtector.h @@ -23,6 +23,7 @@ #include "sliding_window.hpp" #include +#include #include // to decode the X2AP payload #include // to respond @@ -34,39 +35,62 @@ #define unlikely(x) (x) #endif -extern bool report_mode_only; +// each policy corresponds to a specific X2 subscriber profile ID +// and applies sliding window logic to UEs in that class (if enforce) +class protector_policy { +public: + protector_policy(bool enforce=true, int window_size=60, int threshold=10, double block_rate=10): _enforce(enforce), _window_size(window_size), _threshold(threshold), _block_rate(block_rate){ + _counter = 0; + _req = 0; + _rej = 0; + _window_ref = std::make_unique(_window_size); + }; + bool _enforce; // do we enforce policy ? + int _counter; // count the # of attaching access + int _window_size; // time in seconds window for the # of counts + int _threshold; // count above which we start enforcing if enforce set + double _block_rate; // % of rejecting rate for counter > threshold + std::unique_ptr _window_ref; + unsigned long int _req; // number of requests + unsigned long int _rej; // number of rejects + +}; + + class protector { public: - protector( bool , int , int , double , bool rep=true); - + protector( bool rep=true); + protector(bool enforce, int window_size, int threshold, double block_rate, bool rep=true); bool operator()(unsigned char *, size_t , unsigned char *, size_t *); - bool configure(bool enforce, int windowSize_, int threshold_, double blockRate_); + bool configure(bool enforce, int windowSize_, int threshold_, double blockRate_, int id); + bool add_policy (bool enforce, int windowSize_, int threshold_, double blockRate_, int id); + bool delete_policy(int id); + bool query_policy(int , std::vector &); + void get_active_policies(std::vector & ); + bool is_active(int id); + void clear(); - bool selectiveBlock(); + bool selectiveBlock(double); - unsigned long int get_requests(void) const; - unsigned long int get_rejects(void) const; + long int get_requests(int id) const; + long int get_rejects(int id) const; std::string get_error(void) { return error_string;}; private: - bool m_enforce; // whether to execute logic or not - int m_counter; // count the # of attaching access - int m_windowSize; // time in seconds window for the # of counts - int m_threshold; // count above which we start enforcing if enforce set - double m_blockRate; // % of rejecting rate for counter > threshold - time_t m_timeWindow; // time active window started - unsigned long int m_req; // number of requests - unsigned long int m_rej; // number of rejects - - std::unique_ptr m_window_ref; + + std::map policy_list; std::unique_ptr m_access; + sgnb_addition_helper sgnb_data; sgnb_addition_request sgnb_req; sgnb_addition_response sgnb_resp; + + unsigned long int net_requests = 0; + unsigned long int net_rejects = 0; std::string error_string; bool report_mode_only; diff --git a/src/protector-plugin/admission_policy.cc b/src/protector-plugin/admission_policy.cc index 6278197..067f385 100644 --- a/src/protector-plugin/admission_policy.cc +++ b/src/protector-plugin/admission_policy.cc @@ -19,127 +19,182 @@ #include "admission_policy.hpp" -admission::admission (std::string policy_schema_file, std::string samples_file, std::string metrics_schema_file, unsigned int num_instances, bool report_only){ - bool res; - - if (num_instances == 0){ - throw std::runtime_error("Error ! Number of instances of admission_policy protector pluging must be > 0"); - } - - std::string response; - std::string buffer; - std::string error_string; - - std::vector config_schema_path; - - config_schema_path.clear(); - - // path to node in schema to process policy request - config_schema_path.emplace_back("controls"); - config_schema_path.emplace_back((int)0); - config_schema_path.emplace_back("message_receives_payload_schema"); - config_schema_path[0].add_child(&config_schema_path[1]); - config_schema_path[1].add_child(&config_schema_path[2]); - - set_policy_req_obj.load_schema(policy_schema_file, &config_schema_path[0]); - // mdclog_write(MDCLOG_INFO, "Loaded schema for set Policy request"); - - //path to node in schema to process policy response - config_schema_path.clear(); - config_schema_path.emplace_back("controls"); - config_schema_path.emplace_back(0); - config_schema_path.emplace_back("message_sends_payload_schema"); - config_schema_path[0].add_child(&config_schema_path[1]); - config_schema_path[1].add_child(&config_schema_path[2]); - - set_policy_resp_obj.load_schema(policy_schema_file, &config_schema_path[0]); - mdclog_write(MDCLOG_INFO, "Loaded schema for set Policy response"); - - // load sample response - config_schema_path.clear(); - config_schema_path.emplace_back("message_sends_example"); - - set_policy_resp_obj.load_buffer(samples_file, &config_schema_path[0]); - - // verify that our sample conforms to the schema ... - buffer = set_policy_resp_obj.get_buffer(); - if (! set_policy_resp_obj.is_valid(buffer.c_str(), buffer.length(), response)){ - std::stringstream ss; - ss << "Error ! Sample loaded for SET policy response = " << buffer << " from file " << samples_file << " not valid. Reason = " << response; - throw std::runtime_error(ss.str()); - } - mdclog_write(MDCLOG_INFO, "Verified sample for set Policy response"); - - // path to node in schema to respond to get policy (current same as set policy) - config_schema_path.clear(); - config_schema_path.emplace_back("controls"); - config_schema_path.emplace_back(0); - config_schema_path.emplace_back("message_receives_payload_schema"); - config_schema_path[0].add_child(&config_schema_path[1]); - config_schema_path[1].add_child(&config_schema_path[2]); - - get_policy_resp_obj.load_schema(policy_schema_file, &config_schema_path[0]); - mdclog_write(MDCLOG_INFO, "Loaded schema for get Policy response"); - - // sample to respond to get policy - config_schema_path.clear(); - config_schema_path.emplace_back("message_receives_example"); +using namespace rapidjson; - get_policy_resp_obj.load_buffer(samples_file, &config_schema_path[0]); - - // verify that our sample conforms to schema - buffer = get_policy_resp_obj.get_buffer(); - if (! get_policy_resp_obj.is_valid(buffer.c_str(), buffer.length(), response)){ +// constructor loads the policy schema, metric schema, and example payload JSON files +// instantiates the required number of protector plugins +admission::admission (std::string policy_schema_file, std::string samples_file, std::string metrics_schema_file, unsigned int num_instances, std::string xapp_id, bool report_only){ + if (num_instances < 1 || num_instances > MAX_INSTANCES){ std::stringstream ss; - ss << "Error ! Sample loaded for GET policy response = " << buffer << " from file " << samples_file << " not valid. Reason = " << response; - throw std::runtime_error(ss.str()); + ss << "Error !" << __FILE__ << ", " << __LINE__ << " Number of instances must be between " << 1 << " and " << MAX_INSTANCES << ". Specified = " << num_instances << std::endl; + error_string = ss.str(); + throw std::runtime_error(error_string); + } + _xapp_id.assign(xapp_id); + std::string response; + std::string buffer; + std::stringstream ss; + Document _doc; + Value * ref; + std::string schema_key; + + // Load schema files and extract relevant validators + // schema for downstream message + bool res = load_schema(policy_schema_file, "/downstream_schema", downstream_schema_ref_, downstream_validator_); + if(res == false){ + throw std::runtime_error(error_string); + } + // schema for notification message + res = load_schema(policy_schema_file, "/notify_schema", notify_schema_ref_, notify_validator_); + if(res == false){ + throw std::runtime_error(error_string); + } + // schema for metrics message + res = load_schema(metrics_schema_file, "", metrics_schema_ref_, metrics_validator_); + if(res == false){ + throw std::runtime_error(error_string); } - - mdclog_write(MDCLOG_INFO, "Verified sample for get Policy response"); - - // schema & sample for metrics - metrics_obj.load_schema(metrics_schema_file); - mdclog_write(MDCLOG_INFO, "Loaded schema for ves metrics"); - - config_schema_path.clear(); - config_schema_path.emplace_back("metrics"); - metrics_obj.load_buffer(samples_file, &config_schema_path[0]); - // verify sample conforms to schema - buffer = metrics_obj.get_buffer(); - if (! metrics_obj.is_valid(buffer.c_str(), buffer.length(), response)){ - std::stringstream ss; - ss << "Error ! Sample loaded for VES = " << buffer << " from file " << samples_file << " not valid. Reason = " << response; - throw std::runtime_error(ss.str()); + // load samples file and extract sample messages + buffer.erase(); + load_file(samples_file, buffer); + if(_doc.Parse(buffer.c_str()).HasParseError()){ + ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON in file " << samples_file << std::endl; + error_string = ss.str(); + throw std::runtime_error(error_string); + } + mdclog_write(MDCLOG_DEBUG, "Loaded sample file %s\n", samples_file.c_str()); + + StringBuffer s_buffer; + Writer writer(s_buffer); + + // extract notify sample message + ref = NULL; + schema_key = "/notify_policy_message"; + ref = Pointer(schema_key.c_str()).Get(_doc); + if(! ref){ + ss << "Error ! " << __FILE__ << "," << __LINE__ << " Could not find key " << schema_key << std::endl; + error_string = ss.str(); + throw std::runtime_error(error_string); } + (*ref).Accept(writer); + if(notify_message_.Parse(s_buffer.GetString()).HasParseError()){ + ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON snippet at key : " << schema_key << std::endl; + error_string = ss.str(); + throw std::runtime_error(error_string); + } + + + mdclog_write(MDCLOG_DEBUG, "Loaded sample message for notification policy"); + + // extract metrics sample message + ref = NULL; + schema_key = "/metrics"; + ref = Pointer(schema_key.c_str()).Get(_doc); + if(! ref){ + ss << "Error ! " << __FILE__ << "," << __LINE__ << " Could not find key " << schema_key << std::endl; + error_string = ss.str(); + throw std::runtime_error(error_string); + } + s_buffer.Clear(); + writer.Reset(s_buffer); + (*ref).Accept(writer); + if(metrics_message_.Parse(s_buffer.GetString()).HasParseError()){ + ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON snippet at key : " << schema_key<< std::endl; + error_string = ss.str(); + throw std::runtime_error(error_string); + } + - mdclog_write(MDCLOG_INFO, "Verified sample for metrics"); + mdclog_write(MDCLOG_DEBUG, "Loaded sample message for metrics"); + // set the keys we extract and update setPolicyVars(); - - //instantiate the core policy object + //instantiate the core policy plugin for(unsigned int i = 0; i < num_instances; i++){ instantiate_protector_plugin(report_only); } }; + + +bool admission::load_schema(const std::string & schema_file, const std::string & schema_key, std::unique_ptr & schema_ref, std::unique_ptr &validator_ref){ + + std::string buffer; + Value *ref; + Document doc; + std::stringstream ss; + bool res; + + // load policy schema file + res = load_file(schema_file, buffer); + if(!res){ + return false; + } + + if(doc.Parse(buffer.c_str()).HasParseError()){ + ss << "Error ! " << __FILE__ << "," << __LINE__ << " Invalid JSON in file " << schema_file << std::endl; + error_string = ss.str(); + return false; + } + mdclog_write(MDCLOG_DEBUG, "Loaded schema file %s\n", schema_file.c_str()); + + ref = NULL; + Pointer p(schema_key.c_str()); + ref = p.Get(doc); + if(! ref){ + ss << "Error ! " << __FILE__ << "," << __LINE__ << " Could not find key " << schema_key << " in file " << schema_file << std::endl; + error_string = ss.str(); + return false; + } + + schema_ref = std::make_unique((*ref)); + validator_ref = std::make_unique((*schema_ref)); + mdclog_write(MDCLOG_DEBUG, "Loaded schema and validator for %s\n", schema_file.c_str()); + + return true; + +} + void admission::instantiate_protector_plugin(bool mode){ - _plugin_instances.emplace_back(bool(current_config["enforce"]), current_config["window_length"], current_config["trigger_threshold"], current_config["blocking_rate"], mode); + _plugin_instances.emplace_back(0, 60, 5000, 20, mode); } admission::~admission(void){ - prev_values.clear(); - curr_values.clear(); - policy_vars.clear(); - set_policy_response.clear(); - get_policy_response.clear(); - metric_responses.clear(); - policy_pointer.clear(); + counters.clear(); + +} + +bool admission::load_file(std::string input_file, std::string & contents ){ + + std::FILE *fp ; + try{ + fp = std::fopen(input_file.c_str(), "rb"); + } + catch(std::exception &e){ + error_string = "Error opening input file " + input_file + " Reason = " + e.what(); + return false; + } + + if (fp){ + std::fseek(fp, 0, SEEK_END); + contents.resize(std::ftell(fp)); + std::rewind(fp); + std::fread(&contents[0], 1, contents.size(), fp); + std::fclose(fp); + } + + else{ + error_string = "Error opening input file " + input_file; + return false; + } + + return true; } @@ -147,72 +202,51 @@ std::string admission::getName(void){ return std::string("admission control policy"); } +// Function sets path for each key +// in json object for set/get policy JSON +// and metrics payload string. This way, we do not create +// entire JSON from scratch, but rather just update the relevant portions void admission::setPolicyVars(void){ - - // keys in request to set policy - policy_vars.emplace_back("enforce"); - policy_vars.emplace_back("window_length"); - policy_vars.emplace_back("blocking_rate"); - policy_vars.emplace_back("trigger_threshold"); - + // generic variables in the policy that are re-used + generic_policy_vars.insert(std::pair("policy_type_id", Pointer("/policy_type_id"))); + generic_policy_vars.insert(std::pair("policy_instance_id", Pointer("/policy_instance_id"))); + generic_policy_vars.insert(std::pair("status", Pointer("/status"))); + generic_policy_vars.insert(std::pair("message", Pointer("/message"))); + generic_policy_vars.insert(std::pair("operation", Pointer("/operation"))); + generic_policy_vars.insert(std::pair("xapp_id", Pointer("/handler_id"))); - // keys in response to set policy - set_policy_response.emplace_back("status"); - set_policy_response.emplace_back("message"); + //variables in the sliding window policy + window_policy_vars.insert(std::pair("class", Pointer("/payload/class"))); + window_policy_vars.insert(std::pair("enforce", Pointer("/payload/enforce"))); + window_policy_vars.insert(std::pair("window_length", Pointer("/payload/window_length"))); + window_policy_vars.insert(std::pair("trigger_threshold", Pointer("/payload/trigger_threshold"))); + window_policy_vars.insert(std::pair("blocking_rate", Pointer("/payload/blocking_rate"))); - - policy_pointer.push_back(&policy_vars[0]); - policy_pointer.push_back(&policy_vars[1]); - policy_pointer.push_back(&policy_vars[2]); - policy_pointer.push_back(&policy_vars[3]); - - - // keys in metric response - metric_responses.emplace_back("event"); // 0 - metric_responses.emplace_back("commonEventHeader"); // 1 - metric_responses.emplace_back("measurementFields"); // 2 - metric_responses.emplace_back("additionalFields"); // 3 - metric_responses.emplace_back("SgNB Request Rate"); // 4 - metric_responses.emplace_back("SgNB Accept Rate"); // 5 - metric_responses.emplace_back("startEpochMicrosec"); // 6 - metric_responses.emplace_back("measurementInterval"); // 7 - metric_responses.emplace_back("lastEpochMicrosec");//8 - //metric_responses.emplace_back("TS"); //9 - - metric_responses[0].add_child(&metric_responses[1]); - metric_responses[0].add_child(&metric_responses[2]); - metric_responses[1].add_child(&metric_responses[6]); - metric_responses[1].add_child(&metric_responses[8]); + // variables in each metric message + metric_vars.insert(std::pair("class", Pointer("/event/measurementFields/additionalFields/Class Id"))); + metric_vars.insert(std::pair("request_count", Pointer("/event/measurementFields/additionalFields/SgNB Request Count"))); + metric_vars.insert(std::pair("accept_count", Pointer("/event/measurementFields/additionalFields/SgNB Accept Count"))); + metric_vars.insert(std::pair("report_interval", Pointer("/event/measurementFields/measurementInterval"))); + metric_vars.insert(std::pair("epoch", Pointer("/event/commonHeader/startEpochMicrosec"))); - metric_responses[2].add_child(&metric_responses[3]); - metric_responses[2].add_child(&metric_responses[7]); - metric_responses[3].add_child(&metric_responses[4]); - metric_responses[3].add_child(&metric_responses[5]); - //metric_responses[3].add_child(&metric_responses[9]); - - // default config map for the policy object - current_config.insert(std::pair("enforce", 1)); - current_config.insert(std::pair("window_length", 60)); - current_config.insert(std::pair("blocking_rate", 1)); - current_config.insert(std::pair("trigger_threshold", 1)); + // set xapp id in return message + generic_policy_vars["xapp_id"].Set(notify_message_, _xapp_id.c_str()); - prev_config = current_config; - + // Set up the counters for metrics auto ts = std::chrono::time_point_cast(std::chrono::system_clock::now()); auto epoch = ts.time_since_epoch(); auto val = std::chrono::duration_cast(epoch); prev_time_stamp = val.count(); - prev_values.push_back(0); - prev_values.push_back(0); - curr_values = prev_values; + } +// returns the plugin core protector * admission::get_protector_instance(unsigned int index){ if (index > _plugin_instances.size() -1){ - mdclog_write(MDCLOG_ERR, "%s, %d: Error . Requested index %u exceeds number of plugin instances %u", __FILE__, __LINE__, index, _plugin_instances.size()); + mdclog_write(MDCLOG_ERR, "%s, %d: Error . Requested index %u exceeds number of plugin instances %lu", __FILE__, __LINE__, index, _plugin_instances.size()); return NULL; } else{ @@ -221,129 +255,280 @@ protector * admission::get_protector_instance(unsigned int index){ }; - +// control plane function to set policy in plugin +// create policy returns successful if no previous policy with same class id exists and values are valid +// delete policy returns successful if policy table contains same policy instance id +// update policy returns successful if policy table contains same policy instance id, and values are valid bool admission::setPolicy(const char * message, int message_length, std::string & response){ - // Get the configuration - std::vector available_keys; - std::vector roots; - bool res; + bool res = false; + std::string policy_instance_id; + std::string status_message, operation; + std::stringstream ss; - std::vectorresp_pointer; - resp_pointer.push_back(&set_policy_response[0]); - resp_pointer.push_back(&set_policy_response[1]); + // policy key variables + bool enforce = false; + int window_size = -1, trigger_threshold = -1, class_id = -1; + double blocking_rate = -1; + + // reset the validator + (*downstream_validator_).Reset(); - std::string local_response; - for(unsigned int i = 0; i < policy_vars.size(); i++){ - int res = set_policy_req_obj.get_values(message, message_length, local_response, &policy_vars[i], available_keys); - if (res != 0 ){ - setError(local_response); - set_policy_response[0].set_value("FAIL"); - set_policy_response[1].set_value(local_response); - set_policy_resp_obj.set_values(response, resp_pointer); - - return false; - } + // step 1: verify JSON + Document doc; + if(doc.Parse(message).HasParseError()){ + status_message.assign("Invalid JSON"); } - if ( available_keys.size() == 0){ - local_response = "No Keys were found"; - setError(local_response); - set_policy_response[0].set_value("FAIL"); - set_policy_response[1].set_value(local_response); - - set_policy_resp_obj.set_values(response,resp_pointer); - return false; + // Validate against our JSON input schema + else if (!doc.Accept((*downstream_validator_))){ + StringBuffer sb; + (*downstream_validator_).GetInvalidSchemaPointer().StringifyUriFragment(sb); + std::string failed_message = std::string("\"message\": \"Schema Violation:") + std::string(sb.GetString()); + failed_message += std::string(" Invalid keyword :") + std::string((*downstream_validator_).GetInvalidSchemaKeyword()) + " \""; + sb.Clear(); + (*downstream_validator_).GetInvalidDocumentPointer().StringifyUriFragment(sb); + failed_message += std::string(" Invalid document :") + std::string(sb.GetString()); + status_message.assign(failed_message); + } - + else{ - // Get new config - prev_config = current_config; - - for(std::vector::iterator it = available_keys.begin(); it != available_keys.end(); ++it){ - DataContainer const * id = (*it)->get_id(); - DataContainer const * val = (*it)->get_value(); - auto e = current_config.find(id->value.s.c_str()); - if (e != current_config.end()){ - e->second = val->value.i; + // step 2: extract the standard keys expected in all downstream + // messages : policy_instance + Value * ref; + + for(auto const &e : generic_policy_vars){ + ref = NULL; + ref = e.second.Get(doc); + + // this key can be simultaneously put in notify + if (ref != NULL && e.first == "policy_type_id" ){ + e.second.Set(notify_message_, ref->GetInt()); + } + + // this key can be simultaneously put in notify + else if (ref != NULL && e.first == "policy_instance_id" ){ + e.second.Set(notify_message_, ref->GetString()); + policy_instance_id = ref->GetString(); + } + else if (ref != NULL && e.first == "operation"){ + operation = ref->GetString(); + } } - } - // Note : xapp schema specifies window be in 'minutes'. Sliding window implementation maintains in seconds, hence multiply by 60 - current_config["window_length"] *= 60; + // do we have this policy ? + auto it = policy_table.find(policy_instance_id); + res = true; - // Apply the config - res= true; - for(std::vector::iterator it_p = _plugin_instances.begin(); it_p != _plugin_instances.end(); ++it_p){ - - res = (*it_p).configure( bool(current_config["enforce"]), current_config["window_length"], current_config["trigger_threshold"], current_config["blocking_rate"]); - if (!res){ - mdclog_write(MDCLOG_ERR, "Error ::%s, %d :: Could not configure plugin\n", __FILE__, __LINE__); - set_policy_response[0].set_value("FAIL"); - set_policy_response[1].set_value("Could not apply configuration"); - break; + // if operation is create and policy already present, simply return with OK ? + // we may get the same create policy multiple times due to race conditions when the + // xapp starts up + if(operation == "CREATE" && it != policy_table.end()){ + res = true; } + else{ + + if (operation == "DELETE" || operation == "UPDATE"){ + // don't proceed if policy not found + if(it == policy_table.end()){ + ss <<" No policy instance = " << policy_instance_id << " found. Cannot perform operation = " << operation << std::endl; + status_message = ss.str(); + res = false; + } + else{ + class_id = it->second; // used if delete + } + } + + if (res){ + // perform the operation + res = false; + + if (operation == "DELETE"){ + res = _plugin_instances[0].delete_policy(class_id); + if(res){ + ss <<"Policy instance id " << policy_instance_id << " successfully deleted" << std::endl; + status_message = ss.str(); + policy_table.erase(policy_instance_id); + } + else{ + status_message = _plugin_instances[0].get_error(); + } + } + + else if (operation == "CREATE" or operation == "UPDATE"){ + // initialize policy params to invalid + window_size = -1; + class_id = -1; + trigger_threshold = -1; + blocking_rate = -1; + + // get values for policy keys + for(auto const& e: window_policy_vars){ + ref = NULL; + ref = e.second.Get(doc); + if (ref == NULL){ + continue; + } + if(e.first == "enforce"){ + enforce = ref->GetBool(); + } + else if (e.first == "window_length"){ + window_size = ref->GetInt(); + } + else if (e.first == "blocking_rate"){ + blocking_rate = ref->GetDouble(); + } + else if (e.first == "trigger_threshold"){ + trigger_threshold = ref->GetInt(); + } + else if (e.first == "class"){ + class_id = ref->GetInt(); + } + } + + + if(operation == "CREATE"){ + res = _plugin_instances[0].add_policy(enforce, window_size, trigger_threshold, blocking_rate, class_id); + status_message.assign(_plugin_instances[0].get_error()); + + if(res == true){ + // add to policy list + policy_table.insert(std::pair(policy_instance_id, class_id)); + } + } + else if (operation == "UPDATE"){ + res = _plugin_instances[0].configure(enforce, window_size, trigger_threshold, blocking_rate, class_id); + status_message.assign(_plugin_instances[0].get_error()); + } + + } + } + } + + if(res == true) + status_message.assign("SUCCESS"); } + - if (res){ - set_policy_response[0].set_value("SUCCESS"); - set_policy_response[1].set_value("configuration applied"); + // generate response + //generic_policy_vars["message"].Set(notify_message_, status_message.c_str()); + + if(res == false){ + generic_policy_vars["status"].Set(notify_message_, "ERROR"); + } + else { + if(operation == "DELETE"){ + generic_policy_vars["status"].Set(notify_message_, "DELETED"); + } + else{ + generic_policy_vars["status"].Set(notify_message_, "OK"); + } } - set_policy_resp_obj.set_values(response,resp_pointer); - return true; + + StringBuffer s_buffer; + Writer writer(s_buffer); + notify_message_.Accept(writer); + response.assign(s_buffer.GetString(), s_buffer.GetLength()); + mdclog_write(MDCLOG_DEBUG, "Set Policy Response = %s\n", response.c_str()); + return res; }; - -bool admission::getPolicy(const char * message, int message_length, std::string & response){ - // Note : by same token, when returning sliding window length : translate to - // minutes - policy_vars[0].set_value(bool(current_config["enforce"])); - policy_vars[1].set_value((int)( (double)current_config["window_length"]/60.0)); - policy_vars[2].set_value(current_config["blocking_rate"]); - policy_vars[3].set_value(current_config["trigger_threshold"]); - - int res = get_policy_resp_obj.set_values(response, policy_pointer); +// control plane function to retreive policy set in plugin +// This is just a placeholder. Still TBD ..... +bool admission::getPolicy(const char * message, int message_length, std::string & response){ return true; } -int admission::getMetrics(std::string & response){ - std::vector metric_pointers; - metric_pointers.push_back(&metric_responses[0]); +// control plane function to retreive metrics from plugin +// crafts into ves schema based JSON payload +int admission::getMetrics(std::vector & response_vector){ + + int res; + // the list of active policies on the protector plugin can + // dynamically change. - auto ts = std::chrono::time_point_cast(std::chrono::system_clock::now()); - auto epoch = ts.time_since_epoch(); - auto val_ms = std::chrono::duration_cast(epoch); - auto val_s = std::chrono::duration_cast(epoch); - - long int current_time_stamp = val_ms.count(); - long int current_time = val_s.count(); - long int interval = current_time_stamp - prev_time_stamp; + // run through active policies + for(auto const &e: policy_table){ + int id = e.second; + std:: string response; + process_counters(id, response); + response_vector.emplace_back(response); + } - curr_values[0] = _plugin_instances[0].get_requests(); - curr_values[1] = _plugin_instances[0].get_rejects(); + // also account for default policy + int id = -1; + std::string response; + process_counters(id, response); + response_vector.emplace_back(response); - //curr_values[0] = rand()%100 + prev_values[0]; - //curr_values[1] = rand()%20 + prev_values[1]; - - //std::cout <<" Accept counter = " << curr_values[0]<< " Reject counter = " << curr_values[1] << " Request Count = " << (curr_values[0] - prev_values[0]) << " Reject count = " << curr_values[1] - prev_values[1] << std::endl; - - metric_responses[4].set_value(std::to_string(curr_values[0] - prev_values[0])); - metric_responses[5].set_value(std::to_string((curr_values[0] - prev_values[0]) - (curr_values[1] - prev_values[1]))); - metric_responses[6].set_value(prev_time_stamp); - metric_responses[8].set_value(current_time_stamp); - metric_responses[7].set_value(interval); - //metric_responses[9].set_value(current_time); - prev_values = curr_values; - prev_time_stamp = current_time_stamp; - int res = metrics_obj.set_values(response, metric_pointers); + // clear out counters for expired policies + for (auto const &e : counters){ + + if (e.first != -1 && ! _plugin_instances[0].is_active(e.first)){ + counters.erase(e.first); + } + } - return res; + return 0; +} + + +void admission::process_counters(int id, std::string & response){ + + + long request_count, accept_count, curr_timestamp, time_interval; + long int requests = _plugin_instances[0].get_requests(id); + long int rejects = _plugin_instances[0].get_rejects(id); + + + std::chrono::time_point ts; + ts = std::chrono::time_point_cast (std::chrono::system_clock::now()); + curr_timestamp =ts.time_since_epoch().count(); + + // do we have counters for this policy ? + auto prev_it = counters.find(id); + if(prev_it == counters.end()){ + + // new policy seeing for first time + request_count = requests; + accept_count = requests - rejects; + time_interval = 0; + + // store + counters.insert(std::pair>(id, std::vector({requests, rejects, curr_timestamp}))); + } + else{ + request_count = requests - prev_it->second[0]; + accept_count = request_count - (rejects - prev_it->second[1]); + time_interval = ceil((curr_timestamp - prev_it->second[2])/1e+6); // seconds + + // update history + prev_it->second[0] = requests; + prev_it->second[1] = rejects; + prev_it->second[2] = curr_timestamp; + } + + // generate string and add to response vector + metric_vars["class"].Set(metrics_message_, std::to_string(id).c_str()); + metric_vars["request_count"].Set(metrics_message_, std::to_string(request_count).c_str()); + metric_vars["accept_count"].Set(metrics_message_, std::to_string(accept_count).c_str()); + metric_vars["epoch"].Set(metrics_message_, std::to_string(curr_timestamp).c_str()); + metric_vars["report_interval"].Set(metrics_message_, std::to_string(time_interval).c_str()); + + StringBuffer sb_buffer; + Writer writer(sb_buffer); + metrics_message_.Accept(writer); + response.assign(sb_buffer.GetString(), sb_buffer.GetLength()); + + } diff --git a/src/protector-plugin/admission_policy.hpp b/src/protector-plugin/admission_policy.hpp index afb5843..298352e 100644 --- a/src/protector-plugin/admission_policy.hpp +++ b/src/protector-plugin/admission_policy.hpp @@ -29,6 +29,14 @@ #pragma once #ifndef ADMISSION_CTRL #define ADMISSION_CTRL + +#include +#include +#include +#include + +#include +#include #include #include #include @@ -36,44 +44,45 @@ #include #include +#define MAX_INSTANCES 10 + class admission: virtual public Policy { public: - admission(std::string, std::string, std::string, unsigned int, bool report_only=true); + admission(std::string, std::string, std::string, unsigned int, std::string, bool report_only=true); ~admission(void); protector * get_protector_instance(unsigned int); bool setPolicy(const char *, int , std::string & ); bool getPolicy(const char * , int, std::string & ); std::string getName(void); - int getMetrics(std::string & ) ; - + int getMetrics(std::vector & ) ; + std::string get_error(void) const {return error_string;}; + unsigned int get_num_policies(void) {return policy_table.size();}; private: void storePolicy(void); void init_log(void); void setPolicyVars(void); void instantiate_protector_plugin(bool); + bool load_file(std::string, std::string &) ; + bool load_schema(const std::string & , const std::string & , std::unique_ptr & , std::unique_ptr &); + void process_counters(int , std::string & ); - std::map current_config; - std::map prev_config; std::vector _plugin_instances; + std::map policy_table; + std::map window_policy_vars; + std::map generic_policy_vars; + std::map metric_vars; - std::vector policy_vars; - std::vector set_policy_response; - std::vector get_policy_response; - std::vector metric_responses; - std::vector policy_pointer; - jsonHandler set_policy_req_obj; - jsonHandler set_policy_resp_obj; - - jsonHandler get_policy_req_obj; - jsonHandler get_policy_resp_obj; - - jsonHandler metrics_obj; - - std::vector prev_values; - std::vector curr_values; + std::unique_ptr downstream_schema_ref_, notify_schema_ref_, metrics_schema_ref_; + std::unique_ptr downstream_validator_, notify_validator_, metrics_validator_; + rapidjson::Document notify_message_, metrics_message_; + std::string _xapp_id; + + // stores past count of requests, reject and time stamp + std::map> counters; + std::string error_string; unsigned long int prev_time_stamp; }; diff --git a/src/protector-plugin/sliding_window.cc b/src/protector-plugin/sliding_window.cc index bdcf736..f278be4 100644 --- a/src/protector-plugin/sliding_window.cc +++ b/src/protector-plugin/sliding_window.cc @@ -95,7 +95,7 @@ bool sliding_window::update_window(unsigned int events){ bool sliding_window::resize_window(unsigned int window_size){ if (window_size < MIN_WINDOW_SIZE || window_size > MAX_WINDOW_SIZE){ std::stringstream ss; - ss << "Error ::" << __FILE__ << ","<< __LINE__ << " window size must be in [ " << MIN_WINDOW_SIZE << "," << MAX_WINDOW_SIZE << "]" << std::endl; + ss << "Error ::" << __FILE__ << ","<< __LINE__ << " Illegal window size ! window size must be in [ " << MIN_WINDOW_SIZE << "," << MAX_WINDOW_SIZE << "]" << std::endl; error_string = ss.str(); return false; } diff --git a/src/run_xapp.sh b/src/run_xapp.sh index 1848824..ec8ba55 100755 --- a/src/run_xapp.sh +++ b/src/run_xapp.sh @@ -1,12 +1,13 @@ #! /bin/bash SAMPLES=../schemas/samples.json -A1_SCHEMA=../schemas/adm-ctrl-xapp-schema.json +A1_SCHEMA=../schemas/rate-control-policy.json VES_SCHEMA=../schemas/ves_schema.json VES_POST_INTERVAL=10 #VES_URL=http://192.168.100.34:9200/acxapp/_doc VES_URL=http://127.0.0.1:6350 THREADS=1 -gNodeB=NYC123,ABC345,CDR331 +gNodeB=NYC123 OP_MODE=REPORT +XAPP_ID="ric-xapp-1-sjwn"; #echo "Running ./adm-ctrl-xapp -s $SAMPLES -a $A1_SCHEMA -i $VES_POST_INTERVAL -g $gNodeB -v $VES_SCHEMA -t $THREADS"; -./adm-ctrl-xapp -s $SAMPLES -a $A1_SCHEMA -u $VES_URL -i $VES_POST_INTERVAL -g $gNodeB -v $VES_SCHEMA -t $THREADS -c $OP_MODE +./adm-ctrl-xapp -s $SAMPLES -a $A1_SCHEMA -u $VES_URL -i $VES_POST_INTERVAL -g $gNodeB -v $VES_SCHEMA -t $THREADS -c $OP_MODE -x $XAPP_ID --verbose diff --git a/src/xapp_utils.cc b/src/xapp_utils.cc index 8130a1d..68394bc 100644 --- a/src/xapp_utils.cc +++ b/src/xapp_utils.cc @@ -24,21 +24,8 @@ #include "xapp_utils.hpp" -// Constructor that automatically determines number of threads -XaPP::XaPP(char *xapp_name, char *proto_port, int msg_size): _is_ready(0), _listen(false), _num_retries(2), _msg_size(msg_size), _num_attempts(0), _num_fails(0){ - - _num_threads = std::thread::hardware_concurrency(); - strcpy(_xapp_name, xapp_name); - strcpy(_proto_port, proto_port); - init(_msg_size); - get_routes(); - -}; - - - -// Constructor that takes number of threads as argument -XaPP::XaPP(char *xapp_name, char *proto_port, int msg_size, int num_threads): _is_ready(0), _listen(false), _num_retries(2), _msg_size(msg_size), _num_threads(num_threads),_num_attempts(0), _num_fails(0) { +// Constructor +XaPP::XaPP(char *xapp_name, char *proto_port, int msg_size): _is_ready(0), _listen(false), _msg_size(msg_size) { strcpy(_xapp_name, xapp_name); strcpy(_proto_port, proto_port); @@ -59,6 +46,7 @@ XaPP::~XaPP(void){ if (_rmr_ctx){ rmr_close(_rmr_ctx); } + // delete mutex delete _transmit; }; @@ -75,13 +63,8 @@ void XaPP::init(int msg_size){ } // Initialze the rmr context - if ( (_rmr_ctx = rmr_init(_proto_port, msg_size, RMRFL_NONE)) == NULL){ - // throw exception here .. - std::stringstream ss; - ss << "Error ::" << __FILE__ << "," << __LINE__ << " Error initiatilizing RMR context for " << _xapp_name << " on port " << _proto_port << " Reason = " << strerror(errno) << std::endl; - mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); - throw ss.str(); - } + _rmr_ctx = rmr_init(_proto_port, msg_size, RMRFL_NONE); + assert(_rmr_ctx != NULL); } @@ -109,75 +92,73 @@ void XaPP::get_routes(void){ // Get a tx buffer in case we need to do a transmit from the main thread itself - if ( (_rmr_tx_message = rmr_alloc_msg(_rmr_ctx, RMR_BUFFER_SIZE)) == NULL){ - // throw exception here .. - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string error_string = identifier + " Error getting a send buffer"; - throw std::runtime_error(error_string); - } - + _rmr_tx_message = rmr_alloc_msg(_rmr_ctx, RMR_BUFFER_SIZE); + assert(_rmr_tx_message != NULL); std::cout <<"Route Table received. Send buffer allocated" << std::endl; _transmit = new std::mutex(); } // Send method that takes TLV (type/length/value) input -bool XaPP::Send(int type, int payload_len, void *payload){ - - if (likely(_is_ready)){ - if (likely(payload_len <= RMR_BUFFER_SIZE)){ - _rmr_tx_message->mtype = type; - memcpy(_rmr_tx_message->payload, payload, payload_len); - _rmr_tx_message->len = payload_len; - return Send(_rmr_tx_message); - } - else{ - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string error_string = identifier + " message payload len " + std::to_string(payload_len) + " exceeds maximum buffer size " + std::to_string(RMR_BUFFER_SIZE); - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - } +bool XaPP::Send(int type, size_t payload_len, void *payload, link_types mode, tx_types send_type){ + + if (likely(payload_len <= RMR_BUFFER_SIZE)){ + _rmr_tx_message->mtype = type; + memcpy(_rmr_tx_message->payload, payload, payload_len); + _rmr_tx_message->len = payload_len; + return Send(_rmr_tx_message, mode, send_type); } else{ - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string error_string = identifier + " rmr not ready to send"; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); + std::stringstream ss; + ss << __FILE__ << "," << __LINE__ << " message payload length " << payload_len << " exceeds maximum allowed size " << RMR_BUFFER_SIZE << std::endl; + mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); } - + return false; } // Send method that takes TLV (type/length/value) input + MEID -bool XaPP::Send(int type, int payload_len, void *payload, unsigned char const * meid){ - if (!_is_ready){ - return false; +bool XaPP::Send(int type, size_t payload_len, void *payload, unsigned char const * meid, link_types mode, tx_types send_type){ + if (likely(payload_len <= RMR_BUFFER_SIZE)){ + _rmr_tx_message->mtype = type; + memcpy(_rmr_tx_message->payload, payload, payload_len); + _rmr_tx_message->len = payload_len; + rmr_str2meid(_rmr_tx_message, meid); + return Send(_rmr_tx_message, mode, send_type); + } + else{ + std::stringstream ss; + ss << __FILE__ << "," << __LINE__ << " message payload length " << payload_len << " exceeds maximum allowed size " << RMR_BUFFER_SIZE << std::endl; + mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); } - _rmr_tx_message->mtype = type; - memcpy(_rmr_tx_message->payload, payload, payload_len); - _rmr_tx_message->len = payload_len; - rmr_str2meid(_rmr_tx_message, meid); - return Send(_rmr_tx_message); - + return false; } // Send method that takes a buffer -bool XaPP::Send(rmr_mbuf_t * rmr_tx_message){ +bool XaPP::Send(rmr_mbuf_t * rmr_tx_message, link_types mode, tx_types send_type){ if(likely(_is_ready && rmr_tx_message->len <= RMR_BUFFER_SIZE && rmr_tx_message->len > 0)){ - int i = 0; + unsigned int i = 0; rmr_tx_message->sub_id = RMR_VOID_SUBID; - while(i <= _num_retries){ + + while(i <= link_retries[mode]){ //rmr_tx_message->state = 0; // fix for nng - rmr_tx_message = rmr_send_msg(_rmr_ctx, rmr_tx_message); - _num_attempts ++; + // how to send + if(likely(send_type == ROUTE)){ + rmr_tx_message = rmr_send_msg(_rmr_ctx, rmr_tx_message); + } + else{ + rmr_tx_message = rmr_rts_msg(_rmr_ctx, rmr_tx_message); + } if (! rmr_tx_message){ - // CRITICAL EROR .. log it - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string error_string = identifier + " rmr_send returned NULL"; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); + // CRITICAL EROR .. log it and return + std::stringstream ss; + ss << __FILE__ << "," << __LINE__ << " RMR send function returned NULL. Reason = " << strerror(errno) << std::endl; + mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); return false; } else if (rmr_tx_message->state == RMR_OK){ @@ -185,7 +166,7 @@ bool XaPP::Send(rmr_mbuf_t * rmr_tx_message){ } else if(rmr_tx_message->state == RMR_ERR_RETRY){ i++; - _num_fails++; + std::this_thread::sleep_for(std::chrono::milliseconds(link_delays[mode])); } else { mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_tx_message->state, strerror(errno)); @@ -240,11 +221,11 @@ void XaPP::_error_handler(rmr_mbuf_t *message){ // Some get/set methods //--------------------------------------- -std::string XaPP::getName(void){ +std::string XaPP::get_name(void){ return std::string(_xapp_name); } -int XaPP::getStatus(void){ +int XaPP::get_status(void){ return _is_ready; } @@ -257,30 +238,6 @@ void * XaPP::get_rmr_context(void){ return _rmr_ctx; } -void XaPP::set_num_retries(int num_retries){ - if (num_retries < 0 || num_retries > MAX_RETRIES){ - throw "[xapp_utils] : Illegal value of num_retries. Must be positive integer between 0 and MAX_RETRIES\n"; - } - - _num_retries = num_retries; -} - -int XaPP::get_num_retries(void){ - return _num_retries; -} - - -unsigned long XaPP::get_Send_attempts(void){ - return _num_attempts; -}; - - -unsigned long XaPP::get_Send_fails(void){ - return _num_fails; -}; - - - void init_logger(const char *AppName, mdclog_severity_t log_level) { diff --git a/src/xapp_utils.hpp b/src/xapp_utils.hpp index fd9843c..4537ebc 100644 --- a/src/xapp_utils.hpp +++ b/src/xapp_utils.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -36,7 +37,6 @@ #ifndef XAPP_UTIL # define XAPP_UTIL -#define DEBUG 0 #define XAPP_NAME_LENGTH 128 #define PROTO_PORT_LENGTH 16 @@ -55,6 +55,15 @@ #define unlikely(x) (x) #endif + +// define RMR Send behaviour for different link-types +// controls how often we try and delay between tries, as well as method + +enum link_types {LOW_LATENCY, HIGH_RELIABILITY}; +static const unsigned int link_delays[] = {1, 10}; // milli-seconds to wait before retries +static const unsigned int link_retries[] = {4, 15}; // number of times to retry +enum tx_types {ROUTE, RTS}; // regular rmr or rts + void init_logger(const char *AppName, mdclog_severity_t log_level); @@ -63,46 +72,37 @@ class XaPP { public: XaPP(char *, char *, int); - XaPP(char *, char *, int, int); ~XaPP(void); XaPP(XaPP &&) = default; // destructor freaks out with non-copyable thread otherwise .. - std::string getName(void); - int getStatus(void); + std::string get_name(void); + + int get_status(void); + + size_t get_num_active_threads(void) const { return thread_group.size(); }; + // ideally can reduce tempate definitions to just two // but for now leaving it open ... - // template definition to allow a user defined - // processor to be started in multiple threads - template - void Start(messageProcessor &&); - - // template definition to allow a user defined - // processor and error handler if a send fails - // to be started in multiple threads - template - void Start(messageProcessor &&, errorHandler &&); - // Template to allow a user defined processor to start - // on a single thread each time it is invoked + // on a thread template unsigned int StartThread(messageProcessor &&); - // Template to allow a user defined processor and + // Template to allow a user defined processor AND // error handle to start // on a single thread each time it // is invoked template unsigned int StartThread(messageProcessor &&, errorHandler &&); void Stop(void); - bool Send(int type, int payload_len, void *payload); - bool Send(int type, int payload_len, void *payload, unsigned char const *meid); - bool Send(rmr_mbuf_t * rmr_tx_message); + + // various flavours of send : first two finally call the last + bool Send(int type, size_t payload_len, void *payload, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE); + bool Send(int type, size_t payload_len, void *payload, unsigned char const *meid, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE); + bool Send(rmr_mbuf_t * rmr_tx_message, link_types mode = link_types::LOW_LATENCY, tx_types send_type = tx_types::ROUTE); + void * get_rmr_context(void); - void set_num_retries(int ); - int get_num_retries(void ); - unsigned long get_Send_attempts(void); - unsigned long get_Send_fails(void); private: @@ -119,10 +119,9 @@ private: int _is_ready; bool _listen; int _num_retries; + int _retry_interval; int _msg_size; unsigned int _num_threads; - unsigned long _num_attempts; - unsigned long _num_fails; void* _rmr_ctx; std::mutex *_transmit; @@ -133,7 +132,7 @@ private: }; - +// main workhorse thread which does the listen->process->respond loop template void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler, XaPP *parent){ @@ -141,6 +140,8 @@ void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler // Get the thread id std::thread::id my_id = std::this_thread::get_id(); std::stringstream thread_id; + std::stringstream ss; + thread_id << my_id; // Stats counters @@ -148,65 +149,40 @@ void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler unsigned long attempts = 0; unsigned long fails = 0; - // Get the rmr context + // Get the rmr context from parent (all threads and parent use same rmr context. rmr context is expected to be thread safe) void *rmr_context = parent->get_rmr_context(); - if (!rmr_context){ - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string error_string = identifier + " Thread : " + thread_id.str() + " Listener cannot run : no context available"; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - throw error_string; - } - + assert(rmr_context != NULL); + // Get buffer specific to this thread - rmr_mbuf_t *rmr_message; - if ( (rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE)) == NULL){ - // throw exception here .. - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string reason = strerror(errno); - std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a buffer : " + reason; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - throw error_string; - } - + rmr_mbuf_t *rmr_message = NULL; + rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE); + assert(rmr_message != NULL); // Create an epoll instance int rcv_fd, ep_fd; struct epoll_event eve, trigger; - if( (rcv_fd = rmr_get_rcvfd(rmr_context)) < 0){ - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string reason = strerror(errno); - std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a receive file descriptor : " + reason; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - throw error_string; - } - - if( (ep_fd = epoll_create1(0) ) < 0){ - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string reason = strerror(errno); - std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting an epoll file descriptor :" + reason; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - throw error_string; - } - + rcv_fd = rmr_get_rcvfd(rmr_context); + assert(rcv_fd > 0); + + ep_fd = epoll_create1(0); + assert(ep_fd > 0); + trigger.events = EPOLLIN|EPOLLET|EPOLLONESHOT; trigger.data.fd = rcv_fd; if (epoll_ctl (ep_fd, EPOLL_CTL_ADD, rcv_fd, &trigger) < 0){ - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string reason = strerror(errno); - std::string error_string = identifier + " Thread: " + thread_id.str() + " Error registering epoll file descriptor : " + reason; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - throw error_string; + ss << __FILE__ << "," << __LINE__ << " Thread " << thread_id.str() << " Error registering epoll file descriptor" << " Reason = " << strerror(errno) << std::endl; + mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); + throw std::runtime_error(ss.str()); } - int num_retries = this->get_num_retries(); - int i = 0; int num_fds = 0; bool send_ok; mdclog_write(MDCLOG_INFO, "Starting thread %s", thread_id.str().c_str()); - + + // the workhorse loop while(parent->_isRunning()){ num_fds = epoll_wait(ep_fd, &eve, 1, EPOLL_TIMEOUT); @@ -216,11 +192,9 @@ void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler // Re-arm the trigger if (epoll_ctl (ep_fd, EPOLL_CTL_MOD, rcv_fd, &trigger) < 0){ - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string reason = strerror(errno); - std::string error_string = identifier + " Thread: " + thread_id.str() + " Error re-arming epoll : " + reason; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - throw error_string; + ss << __FILE__ << "," << __LINE__ << " Thread " << thread_id.str() << " Error re-arming epoll" << " Reason = " << strerror(errno) << std::endl; + mdclog_write(MDCLOG_ERR, ss.str().c_str(), ""); + throw std::runtime_error(ss.str()); } } @@ -233,54 +207,25 @@ void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler recvs++; bool res = msg_fn(rmr_message); - // is there anything to send + // is there anything to send ? if (res && rmr_message != NULL && likely (rmr_message->len > 0 && rmr_message->len <= RMR_BUFFER_SIZE)){ - i = 0; - rmr_message->sub_id = RMR_VOID_SUBID; + + rmr_message->sub_id = RMR_VOID_SUBID; // do we change this ? send_ok = false; - while(i < num_retries){ - - // Need to handle differently depending on whether message - // is for A1 (determined by type) or non-A1. - // For now, A1 requires we bypass the routing table and send - // directly back to originator using rmr_rts_msg rather than - // over the bus - - if (unlikely(rmr_message->mtype == DC_ADM_INT_CONTROL_ACK || rmr_message->mtype == DC_ADM_GET_POLICY_ACK)){ - rmr_message = rmr_rts_msg(rmr_context, rmr_message); - } - else{ - rmr_message->state = 0; // fix for nng - rmr_message = rmr_send_msg(rmr_context, rmr_message); - } - attempts ++; - - if (! rmr_message){ - // CRITICAL error. break out of loop - std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; - std::string error_string = identifier + " rmr_send returned NULL"; - mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); - break; - } - else if (rmr_message->state == RMR_OK){ - send_ok = true; - break; - } - - else if(rmr_message->state == RMR_ERR_RETRY){ - i++; - fails++; - } - else{ - mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_message->state, strerror(errno)); - break; - } - + if (unlikely(rmr_message->mtype == A1_POLICY_RESP)){ + // for a1 messages we use send in high reliability mode and RTS + send_ok = Send(rmr_message, HIGH_RELIABILITY, RTS); + } + else{ + rmr_message->state = 0; // fix for nng + send_ok = Send(rmr_message); } - + attempts ++; + if (send_ok == false){ error_handler(rmr_message); + fails ++; } } @@ -302,37 +247,6 @@ void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler mdclog_write(MDCLOG_INFO, "Finished thread %s : Recv = %lu, Tx Attempts = %lu, Tx Fail = %lu", thread_id.str().c_str(), recvs, attempts, fails); } - -template -void XaPP::Start(messageProcessor && msg_fn){ - - std::lock_guard guard(*_transmit); - _listen = true; - - // Spin up the the workThreads ..... - for(unsigned int i = 0; i < _num_threads; i++){ - thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);})))); - } - -}; - -// template if calling function provides an error handler also -template -void XaPP::Start(messageProcessor && msg_fn, errorHandler && error_handler){ - - std::lock_guard guard(*_transmit); - _listen = true; - - // Spin up the the workThreads ..... - for(unsigned int i = 0; i < _num_threads; i++){ - //std::cout << "Starting thread number " << i << std::endl; - thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, error_handler, this);})))); - - } - - -}; - // Template to allow a user defined processor to start // on a specific thread template diff --git a/test/Makefile b/test/Makefile index 8ffb30a..c75471c 100644 --- a/test/Makefile +++ b/test/Makefile @@ -30,7 +30,7 @@ E2AP_C_DIR=../src/E2AP-c SUBSCR_DIR = ../src/E2AP-c/subscription E2SM_DIR=../src/E2SM X2AP_DIR=../src/X2AP -JSON_DIR=../src/json +#JSON_DIR=../src/json A1= ./a1 PROTECTOR=../src/protector-plugin @@ -44,7 +44,7 @@ SUBSCRFLAGS= -I$(SUBSCR_DIR) -I$(ASN1C_DIR) E2SMFLAGS = -I$(E2SM_DIR) -I$(ASN1C_DIR)/ X2FLAGS= -I$(X2AP_DIR) -I$(ASN1C_DIR)/ A1FLAGS = -I$(A1) -JSONFLAGS= -I$(JSON_DIR) +#JSONFLAGS= -I$(JSON_DIR) PROTECTORFLAGS= -I$(PROTECTOR) LIBS= -lrmr_nng -lnng -lpthread -lm @@ -63,7 +63,7 @@ SUBSCR_SRC= $(wildcard $(SUBSCR_DIR)/*.cc) E2SM_SRC= $(wildcard $(E2SM_DIR)/*.cc) X2AP_SRC = $(wildcard $(X2AP_DIR)/*.cc) ASN1C_SRC = $(wildcard $(ASN1C_DIR)/*.c) -JSON_SRC = $(wildcard $(JSON_DIR)/*.cc) +#JSON_SRC = $(wildcard $(JSON_DIR)/*.cc) PROTECTOR_SRC = $(wildcard $(PROTECTOR)/*.cc) #=========================================== @@ -75,7 +75,7 @@ SUBSCRIPTION_FLOW_UNIT_TEST = unit_test_subscription_flow.o E2AP_IND_UNIT_TEST= unit_test_e2ap_indication.o E2AP_CTRL_UNIT_TEST= unit_test_e2ap_control.o E2SM_UNIT_TEST=unit_test_e2sm.o -JSON_UNIT_TEST = unit_test_json.o +#JSON_UNIT_TEST = unit_test_json.o SGNB_ADD_REQ_UNIT_TEST = unit_test_sgnb_addition_request.o MESSAGE_PROC_UNIT_TEST = unit_test_message_processor.o SLIDING_WINDOW_UNIT_TEST = unit_test_sliding_window.o @@ -98,7 +98,7 @@ E2AP_C_OBJ = $(E2AP_C_SRC:.cc=.o) E2SM_OBJ = $(E2SM_SRC:.cc=.o) ASN1C_MODULES = $(ASN1C_SRC:.c=.o) X2AP_OBJ = $(X2AP_SRC:.cc=.o) -JSON_OBJ = $(JSON_SRC:.cc=.o) +#JSON_OBJ = $(JSON_SRC:.cc=.o) PROTECTOR_OBJ = $(PROTECTOR_SRC:.cc=.o) #=========================================== @@ -107,7 +107,7 @@ $(XAPP_UTILS_OBJ) $(XAPP_UNIT_TEST) :export CPPFLAGS= $(BASEFLAGS) $(XAPPFLAGS) $(A1_OBJ) $(MOCK_A1SERVER_OBJ) $(JSON_UNIT_TEST) :export CPPFLAGS= $(BASEFLAGS) $(A1FLAGS) $(XAPPFLAGS) -$(XAPP_MPROC_OBJ) $(MESSAGE_PROC_UNIT_TEST) $(MOCK_SUB_CLIENT_OBJ): export CPPFLAGS=$(BASEFLAGS) $(JSONFLAGS) $(XAPPFLAGS) $(E2FLAGS) $(X2FLAGS) $(SUBSCRFLAGS) $(PLUGINFLAGS) $(PROTECTORFLAGS) $(E2SMFLAGS) +$(XAPP_MPROC_OBJ) $(MESSAGE_PROC_UNIT_TEST) $(MOCK_SUB_CLIENT_OBJ): export CPPFLAGS=$(BASEFLAGS) $(XAPPFLAGS) $(E2FLAGS) $(X2FLAGS) $(SUBSCRFLAGS) $(PLUGINFLAGS) $(PROTECTORFLAGS) $(E2SMFLAGS) $(SUBSCRIPTION_FLOW_UNIT_TEST): export CPPFLAGS=$(BASEFLAGS) $(SUBSCRFLAGS) $(E2SMFLAGS) @@ -125,9 +125,9 @@ $(PLUGINS_OBJ) : export CPPFLAGS = $(BASEFLAGS) $(JSONFLAGS) $(MOCK_E2TERM_OBJ): export CPPFLAGS = $(BASEFLAGS) $(SUBSCRFLAGS) $(E2FLAGS) $(E2SMFLAGS) $(XAPPFLAGS) $(X2FLAGS) -$(JSON_OBJ) $(JSON_UNIT_TEST): export CPPFLAGS= $(BASEFLAGS) $(JSONFLAGS) +#$(JSON_OBJ) $(JSON_UNIT_TEST): export CPPFLAGS= $(BASEFLAGS) $(JSONFLAGS) -$(PROTECTOR_OBJ) $(SLIDING_WINDOW_UNIT_TEST) $(PROTECTOR_UNIT_TEST) $(ADMISSION_UNIT_TEST): export CPPFLAGS = $(BASEFLAGS) $(PLUGINFLAGS) $(JSONFLAGS) $(PROTECTORFLAGS) $(X2FLAGS) +$(PROTECTOR_OBJ) $(SLIDING_WINDOW_UNIT_TEST) $(PROTECTOR_UNIT_TEST) $(ADMISSION_UNIT_TEST): export CPPFLAGS = $(BASEFLAGS) $(PLUGINFLAGS) $(PROTECTORFLAGS) $(X2FLAGS) TEST_XAPP: $(XAPP_UTILS_OBJ) $(XAPP_UNIT_TEST) @@ -157,7 +157,7 @@ TEST_JSON: $(JSON_OBJ) $(JSON_UNIT_TEST) TEST_MESSAGE_PROCESSOR: $(MESSAGE_PROC_UNIT_TEST) $(XAPP_MPROC_OBJ) $(ASN1C_MODULES) $(E2AP_C_OBJ) $(PROTECTOR_OBJ) $(E2SM_OBJ) $(X2AP_OBJ) $(SUBSCR_OBJ) $(JSON_OBJ) $(PLUGINS_OBJ) $(CXX) -o $@ $^ $(LIBS) $(LOGLIBS) -TEST_ADMISSION: $(ADMISSION_UNIT_TEST) $(PROTECTOR_OBJ) $(X2AP_OBJ) $(ASN1C_MODULES) $(PLUGINS_OBJ) $(JSON_OBJ) +TEST_ADMISSION: $(ADMISSION_UNIT_TEST) $(PROTECTOR_OBJ) $(X2AP_OBJ) $(ASN1C_MODULES) $(PLUGINS_OBJ) $(CXX) -o $@ $^ $(LIBS) $(LOGLIBS) TEST_PROTECTOR_PLUGIN: $(PROTECTOR_UNIT_TEST) $(PROTECTOR_OBJ) $(X2AP_OBJ) $(ASN1C_MODULES) $(PLUGINS_OBJ) $(JSON_OBJ) @@ -171,7 +171,7 @@ mock-e2term-server: $(XAPP_UTILS_OBJ) $(E2AP_C_OBJ) $(SUBSCR_OBJ) $(E2SM_OBJ) $( $(CXX) -o $@ $^ $(LIBS) $(LOGLIBS) $(CPPFLAGS) -mock-a1-server: $(A1_OBJ) $(MOCK_A1SERVER_OBJ) $(XAPP_UTILS_OBJ) +mock-a1-tool: $(A1_OBJ) $(MOCK_A1SERVER_OBJ) $(XAPP_UTILS_OBJ) $(CXX) -o $@ $^ $(LIBS) $(LOGLIBS) all_tests: TEST_XAPP TEST_SUBSCRIPTION TEST_ADMISSION TEST_E2AP_INDICATION TEST_E2AP_CONTROL TEST_E2SM TEST_X2_SGNB TEST_JSON TEST_SUBSCRIPTION_FLOW TEST_MESSAGE_PROCESSOR TEST_SLIDING_WINDOW TEST_PROTECTOR_PLUGIN @@ -180,5 +180,5 @@ all_tests: TEST_XAPP TEST_SUBSCRIPTION TEST_ADMISSION TEST_E2AP_INDICATION TEST_ install: mock-e2term-server install -D mock-e2term-server /usr/local/bin/mock-e2term-server clean: - -rm TEST_* *.o e2e-test-client mock-e2term-server e2e-perf-client e2e-perf-server mock-a1-server + -rm TEST_* *.o e2e-test-client mock-e2term-server e2e-perf-client e2e-perf-server mock-a1-tool -cd ../src && make -f ../src/Makefile clean diff --git a/test/README.md b/test/README.md index 621f42e..0171512 100644 --- a/test/README.md +++ b/test/README.md @@ -1,19 +1,3 @@ -#================================================================================== -# Copyright (c) 2018-2019 AT&T Intellectual Property. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -#================================================================================== - This directory contains various unit tests as well as mock-ups for integration testing. Several unit test cases are provided to test and illustrate usage of various components of the AC xAPP. The unit tests use the CATCH framework(https://github.com/catchorg/Catch2). It is a header only framework which can be installed simply by retreiving the header and placing it in the search path. To install : @@ -44,8 +28,3 @@ Pre-requisities for running the mock ves collector are : 2. cherrypy module (pip install cherrypy) The mock ves collector can be simply invoked as python ./mock_ves_collector.py - - -3. A mock A1 mediator : The mock up a1 mediator executable sends various JSON (valid/invalid) payloads in sequence to illustrate the behaviour and response of the AC xAPP. It can be compiled as 'make mock-a1-server' -To execute the mock-a1-server, it requires as argument a port to listen and send on (can be any port that does not conflict with others). The A1 mediator uses rts send/receive and hence bypasses the routing table. -./mock-a1-server -p diff --git a/test/mock_a1_mediator.cc b/test/mock_a1_mediator.cc index 4eae5df..e94c92b 100644 --- a/test/mock_a1_mediator.cc +++ b/test/mock_a1_mediator.cc @@ -19,7 +19,7 @@ /* Author : Ashwin Sridharan A sample test client to demonstrate A1 functionality. - Sends different kind of policy requests (valid/invalid) and prints out response + Sends different kind of policy requests (valid/invalid), create/update/delete and prints out response */ #include @@ -27,23 +27,31 @@ #include #include #include +#include +#include +#include +#include #include #include #include +#define MAX_TIMEOUTS 2 + std::string gNodeB = ""; +std::mutex notify_lock; +std::condition_variable notify_var; bool rcv_message(rmr_mbuf_t *message){ - std::string response; switch(message->mtype){ - case DC_ADM_INT_CONTROL_ACK: - std::cout <<"Received response = " << (char *)message->payload << " of len = " << strlen((char *)message->payload) << "Actual = " << message->len << std::endl; + case A1_POLICY_RESP: + { + std::lock_guard lck(notify_lock); + std::cout <<"A1 Mediator received response = " << (char *)message->payload << " of len = " << strlen((char *)message->payload) << std::endl; + } + // released the lock. notify the sleep thread (if any) + notify_var.notify_all(); break; - case DC_ADM_GET_POLICY_ACK: - std::cout <<"Received Policy = " << (char *)message->payload << " of len = " << strlen((char *)message->payload) << "Actual = " << message->len << std::endl; - break; - default: std::cout <<"Unknown RMR message of type " << message->mtype << " received" << std::endl; } @@ -69,57 +77,108 @@ void msg_error(rmr_mbuf_t *message){ int main(int argc, char *argv[]){ + char name[128] = "test_a1_client"; - char port[16] = "tcp:4560"; + char port[16] = "tcp:9000"; unsigned int num_threads = 1; std::unique_ptr my_xapp; + std::string schema_file; + + enum OPERATIONS{CREATE, UPDATE, DELETE}; + static const char * op_strings[] = {"CREATE", "UPDATE", "DELETE"}; + + OPERATIONS op = CREATE; + std::string instance_id = "ac-xapp-1"; + int class_id = 5; + int enforce = 0; + int blocking_rate = 90; // percentage + int window_length = 60; // seconds + int trigger_threshold = 40; + + std::chrono::seconds time_out(1); // Parse command line options static struct option long_options[] = { - /* Thse options require arguments */ - {"name", required_argument, 0, 'n'}, - {"port", required_argument, 0, 'p'}, - - }; - + /* Thse options require arguments */ + {"name", required_argument, 0, 'n'}, + {"port", required_argument, 0, 'p'}, + {"window", required_argument, 0, 'w'}, + {"blockrate", required_argument, 0, 'b'}, + {"trigger", required_argument, 0, 't'}, + {"class", required_argument, 0, 'c'}, + {"op", required_argument, 0, 'o'}, + {"instance", required_argument, 0, 'i'}, + {"enforce", no_argument, &enforce, 1}, - while(1) { + }; - int option_index = 0; - char c = getopt_long(argc, argv, "n:p:", long_options, &option_index); - if(c == -1){ - break; - } + while(1) { - switch(c) - { + int option_index = 0; + char c = getopt_long(argc, argv, "n:p:w:b:t:c:o:i:", long_options, &option_index); - case 0: - /* An option flag was set. - Do nothing for now */ - break; - - case 'n': - strcpy(name, optarg); - break; + if(c == -1){ + break; + } + + switch(c) + { + + case 0: + /* An option flag was set. + Do nothing for now */ + break; - case 'p': - strcpy(port, optarg); - break; + case 'n': + strcpy(name, optarg); + break; + case 'p': + strcpy(port, optarg); + break; - case 'h': - usage(argv[0]); - exit(0); + case 'w': + window_length = atoi(optarg); + break; + + case 't': + trigger_threshold = atoi(optarg); + break; + + case 'b': + blocking_rate = atof(optarg); + break; + + case 'o': + op = static_cast(atoi(optarg)); + break; + + case 'i': + instance_id.assign(optarg); + break; + + case 'c': + class_id = atoi(optarg); + break; + + case 'h': + usage(argv[0]); + exit(0); - default: - usage(argv[0]); - exit(1); - } - }; + default: + usage(argv[0]); + exit(1); + } + }; + + int log_level = MDCLOG_INFO; + init_logger(name, static_cast(log_level)); + + mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", name); + mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", port); init_logger(name, MDCLOG_INFO); @@ -127,66 +186,57 @@ int main(int argc, char *argv[]){ mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", port); mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto"); - my_xapp = std::make_unique(name, port, 1024, 1); + my_xapp = std::make_unique(name, port, 16384); // Start receiving loop ... std::vector thread_ids(num_threads); for(unsigned int i = 0; i < num_threads; i++){ - thread_ids[i] = (*my_xapp).StartThread(&rcv_message, msg_error); + thread_ids[i] = (*my_xapp).StartThread(rcv_message, msg_error); i++; }; - bool enforce = true; - int block_rate = 2; - int window_length = 20; - int trigger_threshold = 40; + char buffer[1024]; - while(1){ + std::string message_string ; + std::stringstream policy; + std::stringstream msg; + bool res = false; + switch(op){ - // Send a valid config - std::string message_string ; - std::string start = "{"; - std::string end = "}"; - - message_string = start + "\"enforce\":" + (enforce? "true":"false") + ","; - message_string += std::string("\"blocking_rate\":") + std::to_string(block_rate) + ","; - message_string += std::string("\"window_length\":") + std::to_string(window_length) + ","; - message_string += std::string("\"trigger_threshold\":") + std::to_string(trigger_threshold) + end; - memcpy(buffer, message_string.c_str(), message_string.length()); - my_xapp.get()->Send(DC_ADM_INT_CONTROL, message_string.length(), buffer); - + case CREATE: + case UPDATE: + policy <<"{ " << "\"enforce\":true, " << "\"window_length\":" << window_length << " , \"trigger_threshold\":" << trigger_threshold << ", \"blocking_rate\":" << blocking_rate << ", \"class\":" << class_id << " }" ; - sleep(2); + // Send a create/update + msg << "{ " << "\"policy_type_id\":" << 21000 << "," << "\"policy_instance_id\":\"" << instance_id << "\", \"operation\":\"" << op_strings[op] << "\", \"payload\" :" << policy.str() << "}"; + res = true; + break; - // // Send an invalid config - message_string = start + "\"enfce\":" + (enforce? "true":"false") + ","; - message_string += std::string("\"blocking_rate\":") + std::to_string(block_rate) + ","; - message_string += std::string("\"window_length\":") + std::to_string(window_length) + end; - memcpy(buffer, message_string.c_str(), message_string.length()); - my_xapp.get()->Send(DC_ADM_INT_CONTROL, message_string.length(), buffer); - sleep(2); + case DELETE: + // send a delete + msg << "{ " << "\"policy_type_id\":" << 21000 << "," << "\"policy_instance_id\": \"" << instance_id << "\", \"operation\": \"" << op_strings[op] << "\" }"; + res = true; + break; - // Send invalid JSON - message_string.assign("\"enforce\":false,"); - message_string += std::string("\"blocking_rate\":") + std::to_string(block_rate) + ","; - message_string += std::string("\"window_length\":") + std::to_string(window_length) + end; - memcpy(buffer, message_string.c_str(), message_string.length()); - my_xapp.get()->Send(DC_ADM_INT_CONTROL, message_string.length(), buffer); - - sleep(2); - + default: + std::cerr <<"Not yet supported " << std::endl; - // Send request for policy - // we don't care about contents of request for now ... - std::cout <<"Sending request to get policy" << std::endl; - my_xapp.get()->Send(DC_ADM_GET_POLICY, message_string.length(), buffer); - sleep(2); + } - window_length += 1; - + if(res){ + message_string = msg.str(); + std::cout <<"Sending message = " << message_string << std::endl; + memcpy(buffer, message_string.c_str(), message_string.length()); + my_xapp.get()->Send(A1_POLICY_REQ, message_string.length(), buffer, link_types::HIGH_RELIABILITY); } + + std::unique_lock lck(notify_lock); + // release lock and got to sleep waiting to be notified + notify_var.wait_for(lck, std::chrono::seconds(5)); + + // finish (*my_xapp).Stop(); } diff --git a/test/mock_e2term_server.cc b/test/mock_e2term_server.cc index 90aef8f..3224758 100644 --- a/test/mock_e2term_server.cc +++ b/test/mock_e2term_server.cc @@ -43,6 +43,7 @@ #include #include #include +#include #include #include #include @@ -59,7 +60,7 @@ #include #include -#define X2_SGNB_ADDITION_REQUEST "test-data/X2AP-PDU-SgNBAdditionRequest.per" + unsigned long int num_indications = 0; @@ -73,7 +74,8 @@ bool RunProgram = true; bool subscription_active = false; int action_type = E2N_RICindicationType::E2N_RICindicationType_report; - +// PRE-ENCODED X2AP SGNB ADDITION REQUESTS +static const std::vector x2ap_files = {"test-data/X2AP-PDU-SgNBAdditionRequest_SubId_10.per", "test-data/X2AP-PDU-SgNBAdditionRequest_SubId_23.per", "test-data/X2AP-PDU-SgNBAdditionRequest_SubId_29.per", "test-data/X2AP-PDU-SgNBAdditionRequest_SubId_180.per", "test-data/X2AP-PDU-SgNBAdditionRequest_SubId_210.per"}; void usage(char *command){ std::cout <<"Usage : " << command << " "; @@ -404,8 +406,8 @@ int main(int argc, char *argv[]){ } init_logger(name, static_cast(log_level)); - XaPP my_xapp = XaPP(name, port, 16384, 1); - my_xapp.Start(Message_Handler); + XaPP my_xapp = XaPP(name, port, 16384); + my_xapp.StartThread(Message_Handler); @@ -430,19 +432,30 @@ int main(int argc, char *argv[]){ exit(-1); } - //====== x2ap sgnb addition request created by us - unsigned char x2ap_buf[1024]; - size_t x2ap_buf_size = 1024; - pfile = fopen(X2_SGNB_ADDITION_REQUEST, "r"); - if(pfile == NULL){ - std::cerr <<"Error ! Could not find test per file " << X2_SGNB_ADDITION_REQUEST << std::endl; - exit(-1); + //====== load the various x2ap requests + unsigned char ** x2ap_bufs = (unsigned char **)calloc(x2ap_files.size(), sizeof(unsigned char *)); + size_t * x2ap_buf_sizes = (size_t *)calloc(x2ap_files.size(), sizeof(size_t)); + + assert(x2ap_bufs != 0); + assert(x2ap_buf_sizes != 0); + + for(int i = 0; i < x2ap_files.size(); i++){ + std::ifstream in(x2ap_files[i], std::ifstream::ate | std::ifstream::binary); + x2ap_buf_sizes[i] = in.tellg(); + std::cout <<"Assigned " << x2ap_buf_sizes[i] << " bytes of memory for file " << x2ap_files[i] << std::endl; + x2ap_bufs[i] = (unsigned char *)calloc(x2ap_buf_sizes[i], sizeof(unsigned char)); + assert(x2ap_bufs[i] != 0); + + pfile = fopen(x2ap_files[i].c_str(), "r"); + if(pfile == NULL){ + std::cerr <<"Error ! Could not find test per file " << x2ap_files[i] << std::endl; + exit(-1); + } + + fread((char *)x2ap_bufs[i], sizeof(char), x2ap_buf_sizes[i], pfile); + fclose(pfile); } - x2ap_buf_size = fread((char *)x2ap_buf, sizeof(char), 1024, pfile); - fclose(pfile); - - //==== e2ap indication for generated x2ap pdus ric_indication_helper dinput ; dinput.action_id = 100; @@ -458,21 +471,11 @@ int main(int argc, char *argv[]){ unsigned char data[data_size]; ric_indication indication_pdu; + - - // prepare packet to send. we send - // same packet every time for now dinput.indication_header = buf_header; dinput.indication_header_size = buf_header_size; - dinput.indication_msg = x2ap_buf; - dinput.indication_msg_size = x2ap_buf_size; - dinput.indication_type = 1; // for now always ask for control back - res = indication_pdu.encode_e2ap_indication(&data[0], &data_size, dinput); - if (!res){ - std::cout <<"Error encoding E2AP Indication PDU. Reason = " << indication_pdu.get_error().c_str() << std::endl; - exit(-1); - } - + //Register signal handler to stop signal(SIGINT, EndProgram); @@ -490,12 +493,25 @@ int main(int argc, char *argv[]){ auto start_time = std::chrono::steady_clock::now(); int count = 0; - + int packet_index = 0; while(RunProgram){ if ( subscription_active && rate > 0 ){ my_xapp.Send(RIC_INDICATION, data_size, data); num_indications ++; + + + // choose packet to encode + dinput.indication_msg = x2ap_bufs[packet_index]; + dinput.indication_msg_size = x2ap_buf_sizes[packet_index]; + dinput.indication_type = 1; // for now always ask for control back + res = indication_pdu.encode_e2ap_indication(&data[0], &data_size, dinput); + if (!res){ + std::cout <<"Error encoding E2AP Indication PDU. Reason = " << indication_pdu.get_error().c_str() << std::endl; + exit(-1); + } + packet_index ++; + packet_index = packet_index % x2ap_files.size(); } std::this_thread::sleep_for(std::chrono::milliseconds(interval)); diff --git a/test/mock_ves_collector.py b/test/mock_ves_collector.py index 0c7122f..3ead24b 100755 --- a/test/mock_ves_collector.py +++ b/test/mock_ves_collector.py @@ -2,6 +2,8 @@ import cherrypy import json import os; +Rates = {}; + @cherrypy.expose class VES(object): def __init__ (self): @@ -14,6 +16,7 @@ class VES(object): measurement_interval = -1; sgnb_req_count = 0; sgnb_rej_count = 0; + class_id = None; timestamp = 0; if 'event' in json_data: @@ -22,18 +25,19 @@ class VES(object): measurement_interval = float(json_data['event']['measurementFields']['measurementInterval']); if 'additionalFields' in json_data['event']['measurementFields']: - if 'SgNB Request Rate' in json_data['event']['measurementFields']['additionalFields']: - sgnb_req_count = float(json_data['event']['measurementFields']['additionalFields']['SgNB Request Rate']); + if 'SgNB Request Count' in json_data['event']['measurementFields']['additionalFields']: + sgnb_req_count = float(json_data['event']['measurementFields']['additionalFields']['SgNB Request Count']); - if 'SgNB Accept Rate' in json_data['event']['measurementFields']['additionalFields']: - sgnb_accpt_count = float(json_data['event']['measurementFields']['additionalFields']['SgNB Accept Rate']); + if 'SgNB Accept Count' in json_data['event']['measurementFields']['additionalFields']: + sgnb_accpt_count = float(json_data['event']['measurementFields']['additionalFields']['SgNB Accept Count']); - if 'TS' in json_data['event']['measurementFields']['additionalFields']: - timestamp = json_data['event']['measurementFields']['additionalFields']['TS']; + if 'Class Id' in json_data['event']['measurementFields']['additionalFields']: + class_id = int(json_data['event']['measurementFields']['additionalFields']['Class Id']); + - print("{2}: Sgnb Request Rate = {0}, SgnB Accepts Rate = {1}\n".format( sgnb_req_count * 1000000.0 /measurement_interval, sgnb_accpt_count * 1000000.0/measurement_interval, timestamp)); - #print "Received = ", json_data; - + if measurement_interval > 0: + print("Class:{0}|Request Rate = {1}|Accept Rate = {2}\n".format(class_id, sgnb_req_count/float(measurement_interval), sgnb_accpt_count/float(measurement_interval))); + #============================= diff --git a/test/run_tests.sh b/test/run_tests.sh index f5dc0d5..13ba4bf 100755 --- a/test/run_tests.sh +++ b/test/run_tests.sh @@ -1,6 +1,8 @@ #! /bin/bash -test_cases=( "TEST_MESSAGE_PROCESSOR" "TEST_ADMISSION" "TEST_PROTECTOR_PLUGIN" "TEST_SUBSCRIPTION_FLOW" "TEST_SUBSCRIPTION" "TEST_E2AP_INDICATION" "TEST_E2AP_CONTROL" "TEST_E2SM" "TEST_JSON" "TEST_X2_SGNB" "TEST_SLIDING_WINDOW" "TEST_XAPP" ) +#test_cases=( "TEST_MESSAGE_PROCESSOR" "TEST_ADMISSION" "TEST_PROTECTOR_PLUGIN" "TEST_SUBSCRIPTION_FLOW" "TEST_SUBSCRIPTION" "TEST_E2AP_INDICATION" "TEST_E2AP_CONTROL" "TEST_E2SM" "TEST_JSON" "TEST_X2_SGNB" "TEST_SLIDING_WINDOW" ) + +test_cases=( "TEST_PROTECTOR_PLUGIN" ) # Run through test cases for((i = 0; i < ${#test_cases[@]}; i++)); do @@ -15,18 +17,18 @@ for((i = 0; i < ${#test_cases[@]}; i++)); do echo -e "UNIT TEST CASE: ${test} \e[31m FAILED \e[0m"; fi - # valgrind -q --tool=memcheck --leak-check=yes --track-origins=yes --leak-check=full ./${test} > /dev/null - # if [ $? -eq 0 ] - # then - # echo -e "Valgrind Test on ${test} \e[32m OK \e[0m "; - # else - # echo -e "Valgrind Test on ${test} \e[31m FAILED \e[0m"; - # fi + valgrind -q --tool=memcheck --leak-check=yes --track-origins=yes --leak-check=full ./${test} > /dev/null + if [ $? -eq 0 ] + then + echo -e "Valgrind Test on ${test} \e[32m OK \e[0m "; + else + echo -e "Valgrind Test on ${test} \e[31m FAILED \e[0m"; + fi done #=============================== # Generate coverage report -cd ../ -gcovr -r . --html > coverage_report.html +#cd ../ +#gcovr -r . --html > coverage_report.html diff --git a/test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_10.per b/test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_10.per new file mode 100644 index 0000000000000000000000000000000000000000..f9896041f4a649c644c8954f202cbf09aa7ad3dd GIT binary patch literal 368 zcmZROc4*9IVBlcLXJBIBVfewo%Hi8~&4J-GgMy)vv5BdfxrHST2E!Q!HV*a+3d}DU z&N1+Cd*-F(7v(0FWaj5FiUSQaHZlZiH((TIVB};t%h1Tm$#9WDjDsqr?RAdB5Mn({qjEEvG86iVlGAxd`WS9bR$w(#QlHn@ECBxQ;ONMJAHW>h~ CwOP0T literal 0 HcmV?d00001 diff --git a/test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_180.per b/test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_180.per new file mode 100644 index 0000000000000000000000000000000000000000..7d68e45b4e98943ca3b60672a16456aa8e4b7908 GIT binary patch literal 368 zcmZROc4*9IVBlcLXJBIBVfewo%Hi8~&4J-GgMy)vv5BdfxrHST2E!Q!HV*a+3d}DU z&N1+Cd*-F(7v(0FWaj5FiUSQaHZlZiH((TIVBE}bmZ6cAli?zR7zbkj1BW8#14lI84*QXGD3#9WLO+=$uI@tl95WpCBs#SONOlxmkifLY%&00 C5?W#a literal 0 HcmV?d00001 diff --git a/test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_210.per b/test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_210.per new file mode 100644 index 0000000000000000000000000000000000000000..0b8b80afcab745b703bf9b4b0d01ccc52295ffd0 GIT binary patch literal 368 zcmZROc4*9IVBlcLXJBIBVfewo%Hi8~&4J-GgMy)vv5BdfxrHST2E!Q!HV*a+3d}DU z&N1+Cd*-F(7v(0FWaj5FiUSQaHZlZiH((TIV7$n1mZ6cAli?zR7zbkj1BW8#14lI84*QXGD3#9WLO+=$uI@tl95WpCBs#SONOlxmkifLY%&0V CTv~tt literal 0 HcmV?d00001 diff --git a/test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_23.per b/test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_23.per new file mode 100644 index 0000000000000000000000000000000000000000..36769aa3fc5aa0e3100f9b98104f18b3c76cd404 GIT binary patch literal 368 zcmZROc4*9IVBlcLXJBIBVfewo%Hi8~&4J-GgMy)vv5BdfxrHST2E!Q!HV*a+3d}DU z&N1+Cd*-F(7v(0FWaj5FiUSQaHZlZiH((TIU=(9G%h1Tm$#9WDjDsqr?RAdB5Mn({qjEEvG86iVlGAxd`WS9bR$w(#QlHn@ECBxQ;ONMJAHW>iD CL|MfE literal 0 HcmV?d00001 diff --git a/test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_29.per b/test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_29.per new file mode 100644 index 0000000000000000000000000000000000000000..ec74f2c834a028423add2515a4e340fe50f31cd5 GIT binary patch literal 368 zcmZROc4*9IVBlcLXJBIBVfewo%Hi8~&4J-GgMy)vv5BdfxrHST2E!Q!HV*a+3d}DU z&N1+Cd*-F(7v(0FWaj5FiUSQaHZlZiH((TIV3c7v%h1Tm$#9WDjDsqr?RAdB5Mn({qjEEvG86iVlGAxd`WS9bR$w(#QlHn@ECBxQ;ONMJAHW>iJ ChFQx1 literal 0 HcmV?d00001 diff --git a/test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_34.per b/test/test-data/X2AP-PDU-SgNBAdditionRequest_SubId_34.per new file mode 100644 index 0000000000000000000000000000000000000000..80f284aa3d6c07be3ee227875801f6a82db0cb75 GIT binary patch literal 368 zcmZROc4*9IVBlcLXJBIBVfewo%Hi8~&4J-GgMy)vv5BdfxrHST2E!Q!HV*a+3d}DU z&N1+Cd*-F(7v(0FWaj5FiUSQaHZlZiH((TIU{qu{%h1Tm$#9WDjDsqr?RAdB5Mn({qjEEvG86iVlGAxd`WS9bR$w(#QlHn@ECBxQ;ONMJAHW>iO Cy;;xz literal 0 HcmV?d00001 diff --git a/test/unit_test_admission_policy.cc b/test/unit_test_admission_policy.cc index 88cb3a5..81ab0ad 100644 --- a/test/unit_test_admission_policy.cc +++ b/test/unit_test_admission_policy.cc @@ -27,10 +27,11 @@ #include -#define SCHEMA_FILE "../schemas/adm-ctrl-xapp-policy-schema.json" +#define SCHEMA_FILE "../schemas/rate-control-policy.json" #define SAMPLE_FILE "../schemas/samples.json" #define VES_FILE "../schemas/ves_schema.json" - +#define INVALID_JSON "./test-data/invalid.json" +#define VALID_JSON "./test-data/valid.json" bool report_mode_only = false; @@ -40,99 +41,274 @@ TEST_CASE("Admission Policy Wrapper", "[acxapp]"){ mdclog_attr_init(&attr); mdclog_attr_set_ident(attr, "UNIT TEST MESSAGE PROCESSOR "); mdclog_init(attr); - mdclog_level_set(MDCLOG_INFO); + mdclog_level_set(MDCLOG_DEBUG); mdclog_attr_destroy(attr); + std::string xapp_id = "ac-xapp-test123"; + + SECTION("Invalid number of instances requested"){ + REQUIRE_THROWS( admission(SCHEMA_FILE, SAMPLE_FILE, VES_FILE, 0, xapp_id)); + REQUIRE_THROWS( admission(SCHEMA_FILE, SAMPLE_FILE, VES_FILE, -1, xapp_id)); + REQUIRE_THROWS( admission(SCHEMA_FILE, SAMPLE_FILE, VES_FILE, 1000, xapp_id)); + } + + SECTION("Invalid AC xAPP Schema file "){ + REQUIRE_THROWS( admission("hello there", SAMPLE_FILE, VES_FILE, 1, xapp_id)); + } + + SECTION("Invalid sample file"){ + REQUIRE_THROWS(admission(SCHEMA_FILE, "hello there", VES_FILE, 1, xapp_id)); + } - SECTION("Invalid AC xAPP Schema"){ - REQUIRE_THROWS( admission("hello there", SAMPLE_FILE, VES_FILE, 1)); + SECTION("Invalid VES schema file "){ + REQUIRE_THROWS(admission(SCHEMA_FILE, SAMPLE_FILE, "hello there", 1, xapp_id)); } - SECTION("Invalid sample file"){ - REQUIRE_THROWS(admission(SCHEMA_FILE, "hello there", VES_FILE, 1)); - REQUIRE_THROWS(admission(SCHEMA_FILE, "random.txt", VES_FILE, 1)); + SECTION("Invalid JSON in a schema"){ + REQUIRE_THROWS(admission(INVALID_JSON, SAMPLE_FILE, VES_FILE, 1, xapp_id)); } - SECTION("Invalid VES schema"){ - REQUIRE_THROWS(admission(SCHEMA_FILE, SAMPLE_FILE, "hello there", 1)); + SECTION("Valid JSON, but no valid schema key"){ + REQUIRE_THROWS(admission(VALID_JSON, SAMPLE_FILE, VES_FILE, 1, xapp_id)); + REQUIRE_THROWS(admission(SCHEMA_FILE, VALID_JSON, VES_FILE, 1, xapp_id)); + } - - SECTION("Set policy : invalid"){ - std::string invalid_policy1 = "{\"blocking_rate\":20\"}"; - std::string invalid_policy2 = "{\"blocking_rate\":120, \"enforce\":\"true\", \"window_length\":50, \"trigger_threshold\":10}"; + + SECTION("Test policy schema validation"){ bool res; std::string response; - admission adm_plugin(SCHEMA_FILE, SAMPLE_FILE, VES_FILE, 1); - res = adm_plugin.setPolicy(invalid_policy1.c_str(), invalid_policy1.length(), response); + std::string valid_policy = "{\"policy_type_id\":21000, \"policy_instance_id\":\"hello-there\", \"operation\":\"CREATE\", \"payload\":{\"blocking_rate\":20, \"enforce\":true, \"window_length\":50, \"trigger_threshold\":10, \"class\":14}}"; + + std::string invalid_policy1 = "{\"policy_type_id\":21000, \"policy_instance_id\":\"hello-there\", \"operation\":\"CREATE\", \"payload\":{\"blocking_rate\":20, \"enforce\":true, \"window_length\":50, \"trigger_threshold\":10, \"class\":5000}}"; + + std::string invalid_policy2 = "{\"policy_instance_id\":\"hello-there\", \"operation\":\"CREATE\", \"payload\":{\"blocking_rate\":20, \"enforce\":true, \"window_length\":50, \"trigger_threshold\":10, \"class\":14}}"; + + std::string invalid_policy3 = "{\"policy_type_id\":21000, \"policy_instance_id\":\"hello-there\", \"operation\":\"CREATED\", \"payload\":{\"blocking_rate\":20, \"enforce\":true, \"window_length\":50, \"trigger_threshold\":10, \"class\":14}}"; + std::string invalid_policy4 = "{\"policy_type_id\":21000, \"policy_instance_id\":\"hello-there\", \"operation\":\"CREATE\", \"payload\":{\"blocking_rate\":20, \"enforce\":true, \"window_length\":50, \"trigger_threshold\":10}}"; + + std::string invalid_policy5 = "hello-there"; + + admission adm_plugin(SCHEMA_FILE, SAMPLE_FILE, VES_FILE, 1, xapp_id); + + REQUIRE_THAT(adm_plugin.getName(), Catch::Matchers::Equals("admission control policy")); + + res = adm_plugin.setPolicy(invalid_policy1.c_str(), invalid_policy1.length(), response); REQUIRE(res == false); - REQUIRE_THAT(response, Catch::Matchers::Contains("FAIL")); res = adm_plugin.setPolicy(invalid_policy2.c_str(), invalid_policy2.length(), response); REQUIRE(res == false); - REQUIRE_THAT(response, Catch::Matchers::Contains("FAIL")); - } + res = adm_plugin.setPolicy(invalid_policy3.c_str(), invalid_policy3.length(), response); + REQUIRE(res == false); + + res = adm_plugin.setPolicy(invalid_policy4.c_str(), invalid_policy4.length(), response); + REQUIRE(res == false); + + res = adm_plugin.setPolicy(invalid_policy5.c_str(), invalid_policy5.length(), response); + REQUIRE(res == false); - SECTION("Set policy : valid"){ - std::string valid_policy = "{\"blocking_rate\":20, \"enforce\":true, \"window_length\":50, \"trigger_threshold\":10}"; - bool res; - std::string response; - - admission adm_plugin(SCHEMA_FILE, SAMPLE_FILE, VES_FILE, 1); res = adm_plugin.setPolicy(valid_policy.c_str(), valid_policy.length(), response); - std::cout <<"Response = " << response << " for " << valid_policy << std::endl; REQUIRE(res == true); - REQUIRE_THAT(response, Catch::Matchers::Contains("SUCCESS")); + + } + + SECTION("Test Set/configure/delete policy"){ + std::string policy = "{\"policy_type_id\":21000, \"policy_instance_id\":\"hello-there\", \"operation\":\"CREATE\", \"payload\":{\"blocking_rate\":20, \"enforce\":true, \"window_length\":50, \"trigger_threshold\":10, \"class\":14}}"; + std::string delete_policy = "{\"policy_type_id\":21000, \"policy_instance_id\":\"hello-there\", \"operation\":\"DELETE\"}"; + + + bool res; + std::string response, test_policy, policy_instance_id; + rapidjson::Document doc; + rapidjson::StringBuffer s_buffer; + rapidjson::Writer writer(s_buffer); + + int num_policies = 10; + admission adm_plugin(SCHEMA_FILE, SAMPLE_FILE, VES_FILE, 1, xapp_id); + doc.Parse(policy.c_str()); + for(int i = 1; i <= num_policies; i++){ + policy_instance_id = "rate_control_policy_" + std::to_string(i); + rapidjson::SetValueByPointer(doc, "/policy_instance_id", policy_instance_id.c_str()); + rapidjson::SetValueByPointer(doc, "/payload/class", i); + s_buffer.Clear(); + writer.Reset(s_buffer); + doc.Accept(writer); + test_policy.assign(s_buffer.GetString(), s_buffer.GetLength()); + + // valid policy should succeed + res = adm_plugin.setPolicy(test_policy.c_str(), test_policy.length(), response); + std::cout <<"Response1 = " << response << std::endl; + REQUIRE(res == true); + + // trying to add policy with same class as before should also return true + res = adm_plugin.setPolicy(test_policy.c_str(), test_policy.length(), response); + REQUIRE(res == true); + } + + REQUIRE(adm_plugin.get_num_policies() == num_policies); + + // configure existing policy should work + rapidjson::SetValueByPointer(doc, "/operation", "UPDATE"); + for(int i = 1; i <= num_policies; i++){ + policy_instance_id = "rate_control_policy_" + std::to_string(i); + rapidjson::SetValueByPointer(doc, "/policy_instance_id", policy_instance_id.c_str()); + rapidjson::SetValueByPointer(doc, "/payload/class", i); + rapidjson::SetValueByPointer(doc, "/payload/blocking_rate", 55.0); + s_buffer.Clear(); + writer.Reset(s_buffer); + doc.Accept(writer); + test_policy.assign(s_buffer.GetString(), s_buffer.GetLength()); + + // valid policy should succeed + res = adm_plugin.setPolicy(test_policy.c_str(), test_policy.length(), response); + REQUIRE(res == true); + + } + // configure non-existing policy should fail + policy_instance_id = "rate_control_policy_" + std::to_string(1000); + rapidjson::SetValueByPointer(doc, "/policy_instance_id", policy_instance_id.c_str()); + s_buffer.Clear(); + writer.Reset(s_buffer); + doc.Accept(writer); + test_policy.assign(s_buffer.GetString(), s_buffer.GetLength()); + res = adm_plugin.setPolicy(test_policy.c_str(), test_policy.length(), response); + REQUIRE(res == false); + + // configure exsting policy but with non-existing class should fail + policy_instance_id = "rate_control_policy_" + std::to_string(1); + rapidjson::SetValueByPointer(doc, "/policy_instance_id", policy_instance_id.c_str()); + rapidjson::SetValueByPointer(doc, "/payload/class", 200); + s_buffer.Clear(); + writer.Reset(s_buffer); + doc.Accept(writer); + test_policy.assign(s_buffer.GetString(), s_buffer.GetLength()); + res = adm_plugin.setPolicy(test_policy.c_str(), test_policy.length(), response); + REQUIRE(res == false); + + // delete existing policy should work + doc.Parse(delete_policy.c_str()); + for(int i = 1; i <= num_policies; i++){ + std::string policy_instance_id = "rate_control_policy_" + std::to_string(i); + rapidjson::SetValueByPointer(doc, "/policy_instance_id", policy_instance_id.c_str()); + s_buffer.Clear(); + writer.Reset(s_buffer); + doc.Accept(writer); + test_policy.assign(s_buffer.GetString(), s_buffer.GetLength()); + + // delete policy should succeed since these policies were created + res = adm_plugin.setPolicy(test_policy.c_str(), test_policy.length(), response); + REQUIRE(res == true); + } + + REQUIRE(adm_plugin.get_num_policies() == 0); + + //delete non-existing policy should fail + res = adm_plugin.setPolicy(test_policy.c_str(), test_policy.length(), response); + REQUIRE(res == false); + + } - SECTION("Get policy"){ + // SECTION("Get policy"){ - std::string valid_policy = "{\"blocking_rate\":20, \"enforce\":true, \"window_length\":50, \"trigger_threshold\":10}"; - bool res; - std::string response; + // std::string valid_policy = "{\"blocking_rate\":20, \"enforce\":true, \"window_length\":50, \"trigger_threshold\":10}"; + // bool res; + // std::string response; - // first apply policy - admission adm_plugin(SCHEMA_FILE, SAMPLE_FILE, VES_FILE, 1); - res = adm_plugin.setPolicy(valid_policy.c_str(), valid_policy.length(), response); - std::cout <<"Response = " << response << std::endl; - REQUIRE(res == true); - REQUIRE_THAT(response, Catch::Matchers::Contains("SUCCESS")); + // // first apply policy + // admission adm_plugin(SCHEMA_FILE, SAMPLE_FILE, VES_FILE, 1, xapp_id); + // res = adm_plugin.setPolicy(valid_policy.c_str(), valid_policy.length(), response); + // std::cout <<"Response = " << response << std::endl; + // REQUIRE(res == true); + // REQUIRE_THAT(response, Catch::Matchers::Contains("SUCCESS")); - // now retreive policy and check - res = adm_plugin.getPolicy(valid_policy.c_str(), valid_policy.length(), response); - REQUIRE(res == true); + // // now retreive policy and check + // res = adm_plugin.getPolicy(valid_policy.c_str(), valid_policy.length(), response); + // REQUIRE(res == true); - REQUIRE_THAT(response, Catch::Matchers::Contains("\"trigger_threshold\":10")); + // REQUIRE_THAT(response, Catch::Matchers::Contains("\"trigger_threshold\":10")); - } + // } SECTION("Metrics"){ - admission adm_plugin(SCHEMA_FILE, SAMPLE_FILE, VES_FILE, 1); - int res; - std::string response; + + std::string policy = "{\"policy_type_id\":21000, \"policy_instance_id\":\"hello-there\", \"operation\":\"CREATE\", \"payload\":{\"blocking_rate\":20, \"enforce\":true, \"window_length\":50, \"trigger_threshold\":10, \"class\":14}}"; + + bool res; + std::string response, test_policy, policy_instance_id; + rapidjson::Document doc; + rapidjson::StringBuffer s_buffer; + rapidjson::Writer writer(s_buffer); + + int num_policies = 10; + admission adm_plugin(SCHEMA_FILE, SAMPLE_FILE, VES_FILE, 1, xapp_id); + doc.Parse(policy.c_str()); + // Add some policies first + for(int i = 1; i <= num_policies; i++){ + policy_instance_id = "rate_control_policy_" + std::to_string(i); + rapidjson::SetValueByPointer(doc, "/policy_instance_id", policy_instance_id.c_str()); + rapidjson::SetValueByPointer(doc, "/payload/class", i); + s_buffer.Clear(); + writer.Reset(s_buffer); + doc.Accept(writer); + test_policy.assign(s_buffer.GetString(), s_buffer.GetLength()); + + // valid policy should succeed + res = adm_plugin.setPolicy(test_policy.c_str(), test_policy.length(), response); + REQUIRE(res == true); + //REQUIRE_THAT(response, Catch::Matchers::Contains("SUCCESS")); + } + + + // now get metrics + // First time, measurementInterval should be zero + std::vector metrics; + adm_plugin.getMetrics(metrics); + REQUIRE(metrics.size() == num_policies + 1); + for(auto const e : metrics){ + std::cout << e<< std::endl; + REQUIRE_THAT(e, Catch::Matchers::Contains("\"Class Id\"")); + REQUIRE_THAT(e, Catch::Matchers::Contains("\"measurementInterval\":\"0\"")); + } + + int interval = 5; + // wait for 'x' seconds and try again + std::this_thread::sleep_for(std::chrono::seconds(interval)); + rapidjson::Pointer int_ref("/event/measurementFields/measurementInterval"); + + metrics.clear(); + adm_plugin.getMetrics(metrics); + REQUIRE(metrics.size() == num_policies + 1); + for(auto const e : metrics){ + REQUIRE(doc.Parse(e.c_str()).HasParseError() == 0); + std::cout << e<< std::endl; + REQUIRE_THAT(e, Catch::Matchers::Contains("\"Class Id\"")); + // extract measurement interval + rapidjson::Value *interval_val = int_ref.Get(doc); + REQUIRE(interval_val != NULL); + double read_interval_approx = atof(interval_val->GetString()); + REQUIRE(read_interval_approx == Approx(interval).margin(1)); + } + - res = adm_plugin.getMetrics(response); - REQUIRE(res == 0); - std::cout << "Metrics response = " << response << std::endl; - REQUIRE_THAT(response, Catch::Matchers::Contains("\"SgNB Request Rate\":\"0\"")); } - SECTION("Get plugin"){ - admission adm_plugin(SCHEMA_FILE, SAMPLE_FILE, VES_FILE, 1);; +SECTION("Get plugin"){ + admission adm_plugin(SCHEMA_FILE, SAMPLE_FILE, VES_FILE, 1, xapp_id); protector * p = NULL; p = adm_plugin.get_protector_instance(100); REQUIRE( p == NULL); p = adm_plugin.get_protector_instance(0); REQUIRE(p != NULL); - REQUIRE(p->get_requests() == 0); + REQUIRE(p->get_requests(0) == 0); } diff --git a/test/unit_test_protector_plugin.cc b/test/unit_test_protector_plugin.cc index 64daa3a..b6597fa 100644 --- a/test/unit_test_protector_plugin.cc +++ b/test/unit_test_protector_plugin.cc @@ -18,7 +18,7 @@ */ /* Author : Ashwin Sridharan - Date : Sept 2019 + Date : Nov 2019 */ #define CATCH_CONFIG_MAIN @@ -32,15 +32,18 @@ TEST_CASE("Protector Plugin", "Test Input types"){ mdclog_attr_t *attr; mdclog_attr_init(&attr); - mdclog_attr_set_ident(attr, "UNIT TEST MESSAGE PROCESSOR "); + mdclog_attr_set_ident(attr, "UNIT TEST PROTECTOR PLUGIN "); mdclog_init(attr); - mdclog_level_set(MDCLOG_INFO); + mdclog_level_set(MDCLOG_ERR); mdclog_attr_destroy(attr); FILE *pfile; unsigned char scratch_buf[512]; size_t scratch_buf_size = 512; - + + // Load buffers with different types of X2AP PDUs to test + // behaviour if valid/invalid + //======================================================== // valid x2 sgnb addition request unsigned char x2ap_buf[256]; size_t x2ap_buf_size ; @@ -76,10 +79,10 @@ TEST_CASE("Protector Plugin", "Test Input types"){ SECTION("Valid X2 SgnB Addition Request"){ - protector n_plugin(1, 20, 5, 0, false); + protector n_plugin(false); bool res = n_plugin(x2ap_buf, x2ap_buf_size, scratch_buf, &scratch_buf_size); REQUIRE(res == true); - REQUIRE(n_plugin.get_requests() == 1); + REQUIRE(n_plugin.get_requests(0) == 1); // todo: need to validate response ... // decode x2ap response @@ -91,7 +94,7 @@ TEST_CASE("Protector Plugin", "Test Input types"){ protector n_plugin(1, 20, 5, 0, false); bool res = n_plugin(incorrect_x2ap_buf, incorrect_x2ap_buf_size, scratch_buf, &scratch_buf_size); REQUIRE(res == false); - REQUIRE(n_plugin.get_requests() == 0); + REQUIRE(n_plugin.get_requests(0) == 0); } SECTION("Valid X2 but not Initating message"){ @@ -99,7 +102,7 @@ TEST_CASE("Protector Plugin", "Test Input types"){ protector n_plugin(1, 20, 5, 0, false); bool res = n_plugin(x2ap_sgnb_ack, x2ap_sgnb_ack_size, scratch_buf, &scratch_buf_size); REQUIRE(res == false); - REQUIRE(n_plugin.get_requests() == 0); + REQUIRE(n_plugin.get_requests(0) == 0); } @@ -108,7 +111,7 @@ TEST_CASE("Protector Plugin", "Test Input types"){ protector n_plugin(1, 20, 5, 0, false); bool res = n_plugin(x2ap_resource_req_buf, x2ap_resource_req_buf_size, scratch_buf, &scratch_buf_size); REQUIRE(res == false); - REQUIRE(n_plugin.get_requests() == 0); + REQUIRE(n_plugin.get_requests(0) == 0); } @@ -123,7 +126,7 @@ TEST_CASE("Protector Plugin", "Test Input types"){ res = n_plugin(x2ap_buf, x2ap_buf_size, scratch_buf, &scratch_buf_size); REQUIRE(res == true); num_valid ++; - + res = n_plugin(x2ap_sgnb_ack, x2ap_sgnb_ack_size, scratch_buf, &scratch_buf_size); REQUIRE(res == false); @@ -135,77 +138,250 @@ TEST_CASE("Protector Plugin", "Test Input types"){ } - REQUIRE(num_valid == n_plugin.get_requests()); - REQUIRE(n_plugin.get_rejects() == 0); + REQUIRE(num_valid == n_plugin.get_requests(0)); + REQUIRE(num_valid == n_plugin.get_requests(-1)); + REQUIRE(n_plugin.get_rejects(0) == 0); } - SECTION("No enforce"){ + + SECTION("No blocking"){ bool res; int num_packets = 1000; - protector n_plugin(0, 20, 5, 100, false); + protector n_plugin(1, 20, 5, 0, false); for(int i = 0; i < num_packets; i++){ res = n_plugin(x2ap_buf, x2ap_buf_size, scratch_buf, &scratch_buf_size); REQUIRE(res == true); } - REQUIRE(n_plugin.get_requests() == num_packets); - REQUIRE(n_plugin.get_rejects() == 0); + REQUIRE(n_plugin.get_requests(0) == num_packets); + REQUIRE(n_plugin.get_rejects(0) == 0); } - - SECTION("No blocking"){ + + SECTION("All blocking"){ bool res; int num_packets = 1000; - protector n_plugin(1, 20, 5, 0, false); + protector n_plugin(1, 20, 0, 100, false); for(int i = 0; i < num_packets; i++){ + scratch_buf_size = 512; res = n_plugin(x2ap_buf, x2ap_buf_size, scratch_buf, &scratch_buf_size); REQUIRE(res == true); } - REQUIRE(n_plugin.get_requests() == num_packets); - REQUIRE(n_plugin.get_rejects() == 0); - + REQUIRE(n_plugin.get_requests(0) == num_packets); + REQUIRE(n_plugin.get_rejects(0) == num_packets); + + REQUIRE(n_plugin.get_requests(-1) == num_packets); + REQUIRE(n_plugin.get_rejects(-1) == num_packets); } + + SECTION("Add/delete/configure/query policies"){ + bool res; + int policy_id = 5; + std::vector info; + std::vector active_list; + + protector n_plugin(1, 20, 0, 100, false); + + // metrics query default policy always returns + REQUIRE(n_plugin.get_requests(0) == 0); + REQUIRE(n_plugin.get_rejects(0) == 0); - SECTION("All blocking"){ + //metrics invalid policy returns -1; + REQUIRE(n_plugin.get_requests(100) == -1); + REQUIRE(n_plugin.get_rejects(100) == -1); + + // 1 active policy + n_plugin.get_active_policies(active_list); + REQUIRE(active_list.size() == 1); + + // verify returned policy + REQUIRE(n_plugin.is_active(active_list[0]) == true); + + // add a policy + res = n_plugin.add_policy(1, 35, 10, 10, policy_id); + REQUIRE(res == true); + + active_list.clear(); + n_plugin.get_active_policies(active_list); + REQUIRE(active_list.size() == 2); + + // query default policy + res = n_plugin.query_policy(0, info); + REQUIRE(res == true); + REQUIRE(info.size() == 4); + REQUIRE(info[0] == 1); + REQUIRE(info[1] == 20); + REQUIRE(info[2] == 0); + REQUIRE(info[3] == 100); + + // query new policy + info.clear(); + res = n_plugin.query_policy(policy_id, info); + REQUIRE(res == true); + REQUIRE(info.size() == 4); + REQUIRE(info[0] == 1); + REQUIRE(info[1] == 35); + REQUIRE(info[2] == 10); + REQUIRE(info[3] == 10); + + // try adding same policy + res = n_plugin.add_policy(1, 200, 10, 10, policy_id); + REQUIRE(res == false); + REQUIRE_THAT(n_plugin.get_error(), Catch::Matchers::Contains("already exists")); + + // delete the policy + res = n_plugin.delete_policy(policy_id); + REQUIRE(res == true); + + // delete non-existent policy + res = n_plugin.delete_policy(policy_id); + REQUIRE(res == false); + REQUIRE_THAT(n_plugin.get_error(), Catch::Matchers::Contains("No policy with id")); + + // configure non-existent policy + res = n_plugin.configure(1, 20, 0, 100, policy_id); + REQUIRE(res == false); + + // invalid window size to configure policy + res = n_plugin.configure(0, -1, 0, 100, 0); + REQUIRE(res == false); + REQUIRE_THAT(n_plugin.get_error(), Catch::Matchers::Contains("Illegal value for window")); + + // invalid trigger in configure policy + res = n_plugin.configure(0, 12, -1, 100, 0); + REQUIRE(res == false); + REQUIRE_THAT(n_plugin.get_error(), Catch::Matchers::Contains("Illegal")); + + // invalid class in configure policy + res = n_plugin.configure(0, 20, 1, 100, -25); + REQUIRE(res == false); + REQUIRE_THAT(n_plugin.get_error(), Catch::Matchers::Contains("Illegal")); + + // invalid blocking rate in configure policy + res = n_plugin.configure(0, 21, 0, 105, 0); + REQUIRE(res == false); + REQUIRE_THAT(n_plugin.get_error(), Catch::Matchers::Contains("Illegal")); + + + // invalid window size in add policy + res = n_plugin.add_policy(0, -1, 0, 100, 25); + REQUIRE(res == false); + REQUIRE_THAT(n_plugin.get_error(), Catch::Matchers::Contains("Illegal")); + + // invalid trigger in add policy + res = n_plugin.add_policy(0, 12, -1, 100, 25); + REQUIRE(res == false); + REQUIRE_THAT(n_plugin.get_error(), Catch::Matchers::Contains("Illegal")); + + // invalid class in add policy + res = n_plugin.add_policy(0, 20, 1, 100, -25); + REQUIRE(res == false); + REQUIRE_THAT(n_plugin.get_error(), Catch::Matchers::Contains("Illegal")); + + // invalid blocking rate in add policy + res = n_plugin.add_policy(0, 21, 0, 105, 25); + REQUIRE(res == false); + REQUIRE_THAT(n_plugin.get_error(), Catch::Matchers::Contains("Illegal")); + + // query a non-existant policy + res = n_plugin.query_policy(200, info); + REQUIRE(res == false); + + + } + + SECTION("Test turning enforcement on off "){ bool res; - int num_packets = 1000; protector n_plugin(1, 20, 0, 100, false); + int num_packets = 10; + for(int i = 0; i < num_packets; i++){ scratch_buf_size = 512; res = n_plugin(x2ap_buf, x2ap_buf_size, scratch_buf, &scratch_buf_size); REQUIRE(res == true); } - REQUIRE(n_plugin.get_requests() == num_packets); - REQUIRE(n_plugin.get_rejects() == num_packets); + // enforcement with 100% blocking. all should be rejected + REQUIRE(n_plugin.get_requests(0) == num_packets); + REQUIRE(n_plugin.get_rejects(0) == num_packets); + + n_plugin.clear(); + REQUIRE(n_plugin.get_requests(0) == 0); + REQUIRE(n_plugin.get_rejects(0) == 0); + + // no enforcement even if blocking set to 100% + // all should be accepted + n_plugin.configure(0, 20, 0, 100, 0); + for(int i = 0; i < num_packets; i++){ + scratch_buf_size = 512; + res = n_plugin(x2ap_buf, x2ap_buf_size, scratch_buf, &scratch_buf_size); + REQUIRE(res == true); + } + + REQUIRE(n_plugin.get_requests(0) == num_packets); + REQUIRE(n_plugin.get_rejects(0) == 0); } - SECTION("Configuration"){ + SECTION("Test using various policies against pdus with different subscriber profile ids"){ + + // use pre-generated X2AP PDUs with two different subscriber profile ids + unsigned char x2ap1_buf[512]; + size_t x2ap1_size; + unsigned char x2ap2_buf[512]; + size_t x2ap2_size; + + pfile = fopen("test-data/X2AP-PDU-SgNBAdditionRequest_SubId_29.per", "r"); + REQUIRE(pfile != NULL); + x2ap1_size = fread((char *)x2ap1_buf, sizeof(char), 512, pfile); + fclose(pfile); + + pfile = fopen("test-data/X2AP-PDU-SgNBAdditionRequest_SubId_34.per", "r"); + REQUIRE(pfile != NULL); + x2ap2_size = fread((char *)x2ap2_buf, sizeof(char), 512, pfile); + fclose(pfile); + + bool res; protector n_plugin(1, 20, 0, 100, false); - res = n_plugin(x2ap_buf, x2ap_buf_size, scratch_buf, &scratch_buf_size); + // test with just default policy + scratch_buf_size = 512; + res = n_plugin(x2ap1_buf, x2ap1_size, scratch_buf, &scratch_buf_size); REQUIRE(res == true); - REQUIRE(n_plugin.get_requests() == 1); - REQUIRE(n_plugin.get_rejects() == 1); - + scratch_buf_size = 512; + res = n_plugin(x2ap2_buf, x2ap2_size, scratch_buf, &scratch_buf_size); + REQUIRE(res == true); + REQUIRE(n_plugin.get_requests(0) == 2); + n_plugin.clear(); - REQUIRE(n_plugin.get_requests() == 0); - REQUIRE(n_plugin.get_rejects() == 0); - n_plugin.configure(0, 20, 0, 100); + // add a policy and test + n_plugin.add_policy(1, 20, 0, 100, 29); scratch_buf_size = 512; - res = n_plugin(x2ap_buf, x2ap_buf_size, scratch_buf, &scratch_buf_size); + res = n_plugin(x2ap1_buf, x2ap1_size, scratch_buf, &scratch_buf_size); REQUIRE(res == true); - REQUIRE(n_plugin.get_requests() == 1); - REQUIRE(n_plugin.get_rejects() == 0); + scratch_buf_size = 512; + res = n_plugin(x2ap2_buf, x2ap2_size, scratch_buf, &scratch_buf_size); + REQUIRE(res == true); + REQUIRE(n_plugin.get_requests(0) == 1); + REQUIRE(n_plugin.get_requests(29) == 1); - res = n_plugin.configure(0, -1, 0, 100); - REQUIRE(res == false); + // add another policy and test + n_plugin.add_policy(1, 20, 0, 100, 34); + scratch_buf_size = 512; + res = n_plugin(x2ap2_buf, x2ap2_size, scratch_buf, &scratch_buf_size); + REQUIRE(res == true); + scratch_buf_size = 512; + res = n_plugin(x2ap1_buf, x2ap1_size, scratch_buf, &scratch_buf_size); + REQUIRE(res == true); + REQUIRE(n_plugin.get_requests(34) == 1); + REQUIRE(n_plugin.get_requests(29) == 2); + REQUIRE(n_plugin.get_requests(-1) == 4); // 1 for 0, 2 for 29 and 1 for 34 + } } diff --git a/test/unit_test_sgnb_addition_request.cc b/test/unit_test_sgnb_addition_request.cc index c061af2..595eeb3 100644 --- a/test/unit_test_sgnb_addition_request.cc +++ b/test/unit_test_sgnb_addition_request.cc @@ -166,6 +166,7 @@ TEST_CASE("X2AP PDUs", "[X2AP SgNB Addition Request]"){ REQUIRE(encode_data.menb_ue_x2ap_id == decode_data.menb_ue_x2ap_id); REQUIRE(encode_data.bit_rate_max_up == decode_data.bit_rate_max_up); REQUIRE(encode_data.bit_rate_max_dn == decode_data.bit_rate_max_dn); + REQUIRE(encode_data.subscriber_profile_id == decode_data.subscriber_profile_id); ASN_STRUCT_FREE(asn_DEF_X2N_X2AP_PDU, x2ap_pdu_obj); } @@ -231,6 +232,8 @@ TEST_CASE("X2AP PDUs", "[X2AP SgNB Addition Request]"){ REQUIRE(encode_data.menb_ue_x2ap_id == decode_data.menb_ue_x2ap_id); REQUIRE(encode_data.bit_rate_max_up == decode_data.bit_rate_max_up); REQUIRE(encode_data.bit_rate_max_dn == decode_data.bit_rate_max_dn); + REQUIRE(encode_data.subscriber_profile_id == decode_data.subscriber_profile_id); + ASN_STRUCT_FREE(asn_DEF_X2N_X2AP_PDU, x2ap_pdu_obj); } diff --git a/test/unit_test_subscription_flow.cc b/test/unit_test_subscription_flow.cc index 9c99f77..995188f 100644 --- a/test/unit_test_subscription_flow.cc +++ b/test/unit_test_subscription_flow.cc @@ -42,24 +42,37 @@ #include +// globally list gnodeb-id we use +std::string gNodeBID = "abc123"; + // global queue for testing -std::queue message_bus; +// acts like a channel +std::queue> message_bus; // global lock for testing std::mutex get_object ; bool is_running = true; -bool mock_fail(int mtype, size_t len, void * payload, int mode){ + +// ================================================== +// various mock transmission functions that simulate underlying +// transmission layer behaviour + +// this function immediately fails +// simulates a channel that is not available +bool mock_fail(int mtype, size_t len, void * payload, std::string gNodeB_id, int mode){ return false; } -bool mock_silent(int mtype, size_t len, void * payload, int mode){ +// silently returns without actually doing any transmission +// simulates a lost transmission +bool mock_silent(int mtype, size_t len, void * payload, std::string gNodeB_id, int mode){ return true; } - -bool mock_tx(int mytpe, size_t len, void *payload, int mode){ +// simulates a working transmission channel +bool mock_tx(int mytpe, size_t len, void *payload, std::string gNodeB_id, int mode){ bool res; int i; @@ -170,14 +183,13 @@ bool mock_tx(int mytpe, size_t len, void *payload, int mode){ std::lock_guard guard(get_object); std::string msg((char *)buffer, buf_size); //std::cout <<"Pushed to queue" << std::endl; - message_bus.push(msg); + message_bus.push(std::make_pair(gNodeB_id, msg)); } ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_pdu_recv); if(msg_ok) - return true; - + return true; else return false; @@ -186,71 +198,19 @@ bool mock_tx(int mytpe, size_t len, void *payload, int mode){ -// Randomly generate number of subscription response and delete -// response packets and push to queue -void random_tx(int num_packets){ - subscription_response_helper he_resp; - subscription_response sub_resp; - subscription_delete sub_del_req; - subscription_delete_response sub_del_resp; - bool res; - unsigned char buffer[256]; - size_t buf_size = 256; - - he_resp.add_action(10); - - // generate subscription responses - for(int i = 0; i < num_packets; i++){ - - // set up response object - he_resp.set_request(i, 1); - he_resp.set_function_id(0); - buf_size = 256; - res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, true); - { - std::lock_guard guard(get_object); - std::string msg((char *)buffer, buf_size); - message_bus.push(msg); - } - - buf_size = 256; - res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, false); - { - std::lock_guard guard(get_object); - std::string msg((char *)buffer, buf_size); - message_bus.push(msg); - } - - buf_size = 256; - res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, true); - { - std::lock_guard guard(get_object); - std::string msg((char *)buffer, buf_size); - message_bus.push(msg); - } - - buf_size = 256; - res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, false); - { - std::lock_guard guard(get_object); - std::string msg((char *)buffer, buf_size); - message_bus.push(msg); - } - - - } -} +// simulates response :takes what is in the queue, processes it and then invokes +// subscription_handler response +void mock_RAN (subscription_handler &_ref_sub_handler, int delay){ -void mock_RAN (subscription_handler &_ref_sub_handler){ // Behaviour : - + std::string gNodeB_id; unsigned char incorrect_e2ap[128]; size_t incorrect_e2ap_size = 128; for(int i = 0; i < 128; i++){ incorrect_e2ap[i] = 'b'; } - + FILE *pfile = fopen("test-data/e2ap_indication_test.per", "r"); if(pfile == NULL){ std::cout << "Error opening e2ap_indication_test.per" << std::endl; @@ -261,40 +221,46 @@ void mock_RAN (subscription_handler &_ref_sub_handler){ fclose(pfile); unsigned char message_buf[512]; - std::string pdu; - + std::pair pdu; + + bool is_resp; while(is_running){ - // send some random data, i.e incorrect E2AP - _ref_sub_handler.Response(RIC_INDICATION, incorrect_e2ap, incorrect_e2ap_size); + + // test illegal packet response : send some random data, i.e incorrect E2AP + _ref_sub_handler.Response(RIC_INDICATION, incorrect_e2ap, incorrect_e2ap_size, gNodeBID.c_str()); //std::cout <<"Sent random data to subscription handler" << std::endl; - // send an E2AP which is not subscription request - _ref_sub_handler.Response(RIC_INDICATION, e2ap_msg, e2ap_msg_size); + //test incorrect packet response : send an E2AP which is not subscription request + _ref_sub_handler.Response(RIC_INDICATION, e2ap_msg, e2ap_msg_size, gNodeBID.c_str()); //std::cout <<"Sent incorrect e2ap to subscription handler" << std::endl; // now look in the queue, pop it and send the data - // finally send correct payload + // finally send correct payload if queue not empty + is_resp = false; { std::lock_guard guard(get_object); if(! message_bus.empty()){ pdu = message_bus.front(); - memcpy(message_buf, pdu.c_str(), pdu.length()); + gNodeB_id = pdu.first; + memcpy(message_buf, pdu.second.c_str(), pdu.second.length()); message_bus.pop(); + is_resp =true; } } - - _ref_sub_handler.Response(RIC_SUB_RESP, message_buf, pdu.length()); - //std::cout <<"Sent response to subscription handler" << std::endl; - - - - sleep(1); + + if(is_resp){ + sleep(delay); + _ref_sub_handler.Response(RIC_SUB_RESP, message_buf, pdu.second.length(), gNodeB_id.c_str()); + //std::cout <<"Sent response to subscription handler" << std::endl; + } + } } +// wrapper function that we use to test sending subscriptions with various channels +void send_request(subscription_handler &subscription_manager, std::vector & status_vector, std::vector & gNodeBs, int index, bool (*tx)(int, size_t, void *, std::string, int), int mode ){ -void send_request(subscription_handler &subscription_manager, std::vector & status_vector, int index, bool (*tx)(int, size_t, void *, int), int mode ){ subscription_helper subscription_info; subscription_request sub_req; subscription_response_helper subscription_response_info; @@ -333,12 +299,11 @@ void send_request(subscription_handler &subscription_manager, std::vector & subscription_info.add_action(action_id, action_type); subscription_info.set_event_def(&event_buf[0], event_buf_len); - auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, mode); - res = subscription_manager.RequestSubscription(subscription_info, subscription_response_info , RIC_SUB_REQ, transmitter); + auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, gNodeBs[index], mode); + res = subscription_manager.request_subscription(subscription_info, subscription_response_info , gNodeBs[index], RIC_SUB_REQ, transmitter); if (res == SUBSCR_SUCCESS ){ - // store -ve of request id - status_vector[index] = -1 * subscription_info.get_request_id(); + status_vector[index] = -1 ; } else{ status_vector[index] = res; @@ -347,24 +312,26 @@ void send_request(subscription_handler &subscription_manager, std::vector & std::cout <<"Subscription = " << subscription_info.get_request_id() << " Result = " << res << std::endl; } -void delete_request(subscription_handler &subscription_manager, std::vector & status_vector, int index, int request_id, bool ( *tx)(int, size_t, void *, int), int mode ){ + +// wrapper function that we use to test sending delete requests with various channels +void delete_request(subscription_handler &subscription_manager, std::vector & status_vector, std::vector & gNodeBs, int index, int request_id, bool ( *tx)(int, size_t, void *, std::string, int), int mode ){ subscription_helper subscription_info; subscription_response_helper subscription_response_info; //verify subscription deleted - subscription_info.set_request(request_id, 1); + subscription_info.set_request(0, 0); subscription_info.set_function_id(0); - auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, mode); - status_vector[index] = subscription_manager.RequestSubscriptionDelete(subscription_info, subscription_response_info, RIC_SUB_DEL_REQ, transmitter); + auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, gNodeBs[index], mode); + status_vector[index] = subscription_manager.request_subscription_delete(subscription_info, subscription_response_info, gNodeBs[index], RIC_SUB_DEL_REQ, transmitter); }; -TEST_CASE("Test subscription work flow", "E2AP Subscription"){ +TEST_CASE("Test various channel responses", "E2AP Subscription"){ subscription_handler subscription_manager; @@ -372,18 +339,10 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ mdclog_attr_init(&attr); mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW "); mdclog_init(attr); - mdclog_level_set(MDCLOG_DEBUG); + mdclog_level_set(MDCLOG_INFO); mdclog_attr_destroy(attr); - unsigned char node_buffer[32]; - std::string gNodeB = "TEST_GNOBDE"; - - std::copy(gNodeB.begin(), gNodeB.end(), node_buffer); - node_buffer[gNodeB.length()] = '\0'; - - - //==================================== - + //==================================== SECTION("Verify behaviour if no listener "){ std::cout <<"+++++++++" << std::endl << "TEST WITH NO LISTENER " << std::endl; @@ -392,9 +351,13 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ subscription_manager.clear(); std::vector source_list; + std::vector gNodeBs; + for(int i = 0; i < num_sources; i++){ + gNodeBs.push_back("test-gnodeb-" + std::to_string(i)); + } for(int i = 0; i < num_sources; i++){ - source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_fail), 0)); + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_fail), 0)); } for(int i = 0; i < num_sources; i++){ @@ -415,9 +378,13 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ subscription_manager.clear(); std::vector source_list; - + std::vector gNodeBs; + for(int i = 0; i < num_sources; i++){ + gNodeBs.push_back("test-gnodeb-" + std::to_string(i)); + } + for(int i = 0; i < num_sources; i++){ - source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_silent), 0)); + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_silent), 0)); } for(int i = 0; i < num_sources; i++){ @@ -433,6 +400,20 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ } +} + + +TEST_CASE("Test config", "E2AP Subscription"){ + + subscription_handler subscription_manager; + + mdclog_attr_t *attr; + mdclog_attr_init(&attr); + mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW "); + mdclog_init(attr); + mdclog_level_set(MDCLOG_INFO); + mdclog_attr_destroy(attr); + SECTION("Verify timeout behaviour if listener does not response"){ std::cout <<"+++++++++" << std::endl << "TEST TIMEOUT BEHAVIOUR " << std::endl; @@ -441,13 +422,18 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ int timeout_val = 2; int num_tries = 1; std::vector status_vector(num_sources, 0); - + + std::vector gNodeBs; + for(int i = 0; i < num_sources; i++){ + gNodeBs.push_back("test-gnodeb-" + std::to_string(i)); + } + subscription_manager.clear(); subscription_manager.set_timeout(timeout_val); subscription_manager.set_num_retries(num_tries); auto start = std::chrono::steady_clock::now(); - send_request(subscription_manager, status_vector, 0, mock_silent, 0); + send_request(subscription_manager, status_vector, gNodeBs, 0, mock_silent, 0); auto end = std::chrono::steady_clock::now(); auto diff = end - start; @@ -460,7 +446,7 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ status_vector.clear(); start = std::chrono::steady_clock::now(); - send_request(subscription_manager, status_vector, 0, mock_silent, 0); + send_request(subscription_manager, status_vector, gNodeBs, 0, mock_silent, 0); end = std::chrono::steady_clock::now(); diff = end - start; @@ -470,7 +456,184 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ } +} + +TEST_CASE("Test sunny day scenarios", "E2AP Subscription"){ + + subscription_handler subscription_manager; + + mdclog_attr_t *attr; + mdclog_attr_init(&attr); + mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW "); + mdclog_init(attr); + mdclog_level_set(MDCLOG_INFO); + mdclog_attr_destroy(attr); + + SECTION("Verify subscription request/response success"){ + std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION SUCCESS " << std::endl; + + subscription_manager.clear(); + + int num_sources = 10; + int num_sinks = 5; + + std::vector source_list; + std::vector sink_list; + + std::vector status_vector(num_sources, 0); + + // First Test null cases in queries,i.e non-existing request + subscription_identifier id = std::make_tuple (gNodeBID, 10); + REQUIRE(subscription_manager.is_subscription_entry(id) == false); + id = std::make_tuple(gNodeBID, 1); + REQUIRE(subscription_manager.is_request_entry(id) == false); + REQUIRE(subscription_manager.get_request_status(id) == -1); + + // start up the sinks + is_running = true; + for(int i = 0; i < num_sinks; i++){ + sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1)); + } + + // generate the gnodeb list for which we are subscribing + // default ran_function_id is zero + std::vector gNodeBs; + for(int i = 0; i < num_sources; i++){ + gNodeBs.push_back("test-gnodeb-" + std::to_string(i)); + } + + for(int i = 0; i < num_sources; i++){ + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0)); + } + + + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + REQUIRE(status_vector[i] < 0); + id = std::make_tuple(gNodeBs[i], 0); + REQUIRE(subscription_manager.is_subscription_entry(id) == true); + } + + // stop the sinks + is_running =false; + for(int i = 0; i < num_sinks; i++){ + sink_list[i].join(); + } + + REQUIRE(subscription_manager.num_complete() == num_sources); + REQUIRE(subscription_manager.num_pending() == 0); + + // test getting subscription : + // case 1: fake request + id = std::make_tuple(gNodeBID, 0); + const subscription_response_helper * sub_info = subscription_manager.get_subscription(id); + REQUIRE(sub_info == NULL); + + // case 2: valid request : get all the keys and use them + std::vector key_list; + subscription_manager.get_subscription_keys(key_list); + REQUIRE(key_list.size() == subscription_manager.num_complete()); + for(auto &e: key_list){ + sub_info = subscription_manager.get_subscription(e); + REQUIRE(sub_info != NULL); + } + } + + + + SECTION("Delete requests that have succeeeded"){ + std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES " << std::endl; + + subscription_manager.clear(); + + int num_sources = 10; + int num_sinks = 5; + const subscription_response_helper * sub_resp_info; + + std::vector source_list; + std::vector sink_list; + + std::vector status_vector(num_sources, 0); + std::vector gNodeBs; + + // generate the gnodeb list for which we are subscribing + // default ran_function_id is zero + for(int i = 0; i < num_sources; i++){ + gNodeBs.push_back("test-gnodeb-" + std::to_string(i)); + } + + // start up the sinks + is_running = true; + for(int i = 0; i < num_sinks; i++){ + sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1)); + } + + // First do subscriptions ... + for(int i = 0; i < num_sources; i++){ + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0)); + } + + + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + REQUIRE(status_vector[i] < 0 ); + subscription_identifier id = std::make_tuple (gNodeBs[i], 0); + REQUIRE(subscription_manager.is_subscription_entry(id) == true); + sub_resp_info = subscription_manager.get_subscription(id); + REQUIRE(sub_resp_info != NULL); + REQUIRE(sub_resp_info->get_request_id() == 0); + + } + + REQUIRE(subscription_manager.num_complete() == num_sources); + REQUIRE(subscription_manager.num_pending() == 0); + + + // Store ids .. + std::vector completed_requests = status_vector; + + // Delete successes + source_list.clear(); + for(int i = 0; i < num_sources; i++){ + source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, -1 * completed_requests[i], std::ref(mock_tx), 0)); + } + + + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + REQUIRE(status_vector[i] == SUBSCR_SUCCESS); + } + + REQUIRE(subscription_manager.num_pending() == 0); + + + // stop the sinks + is_running =false; + for(int i = 0; i < num_sinks; i++){ + sink_list[i].join(); + } + + REQUIRE(subscription_manager.num_complete() == 0); + REQUIRE(subscription_manager.num_pending() == 0); + + } + + +} + +TEST_CASE("Test rainy day scenarios", "E2AP Subscription"){ + + subscription_handler subscription_manager; + + mdclog_attr_t *attr; + mdclog_attr_init(&attr); + mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW "); + mdclog_init(attr); + mdclog_level_set(MDCLOG_INFO); + mdclog_attr_destroy(attr); + SECTION("Verify rejection of illegal pdus"){ + std::cout <<"+++++++++" << std::endl <<"TEST WITH ILLEGAL PDU PARAMS" << std::endl; subscription_helper subscription_info; subscription_response_helper subscription_response_info; @@ -483,8 +646,6 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ int request_id = 1; int req_seq = 1; - int message_type = 1; - int procedure_code = 27; unsigned char event_buf[] = "Hello world"; size_t event_buf_len = strlen((const char *)event_buf); @@ -495,13 +656,11 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ subscription_info.add_action(action_id, action_type); subscription_info.set_event_def(&event_buf[0], event_buf_len); - - auto transmitter = std::bind(mock_silent, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, 0); - res = subscription_manager.RequestSubscription(subscription_info, subscription_response_info , RIC_SUB_REQ, transmitter); + std::vector gNodeBs; + auto transmitter = std::bind(mock_silent, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, gNodeBID, 0); + res = subscription_manager.request_subscription(subscription_info, subscription_response_info , gNodeBID, RIC_SUB_REQ, transmitter); REQUIRE(res == SUBSCR_ERR_ENCODE); - - } SECTION("Verify subscription request/response fail"){ @@ -516,15 +675,22 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ std::vector sink_list; std::vector status_vector(num_sources, 0); + + // generate the gnodeb list for which we are subscribing + // default ran_function_id is zero + std::vector gNodeBs; + for(int i = 0; i < num_sources; i++){ + gNodeBs.push_back("test-gnodeb-" + std::to_string(i)); + } // start up the sinks is_running = true; for(int i = 0; i < num_sinks; i++){ - sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); + sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1)); } for(int i = 0; i < num_sources; i++){ - source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), -1)); + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), -1)); } @@ -544,76 +710,31 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ } - SECTION("Verify subscription request/response success"){ - std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION SUCCESS " << std::endl; + SECTION("Delete requests for non-existent subscription requests"){ + std::cout <<"+++++++++" << std::endl << "TEST SUBSCRIPTION DELETE WITH NO CORRESPONDING REQUEST" << std::endl; - - subscription_manager.clear(); + subscription_manager.clear(); + subscription_identifier id = std::make_tuple (gNodeBID, 0); + REQUIRE(subscription_manager.get_request_status(id) == -1); + REQUIRE(subscription_manager.is_subscription_entry(id) == false); + REQUIRE(subscription_manager.is_request_entry(id) == false); int num_sources = 10; - int num_sinks = 5; - - std::vector source_list; - std::vector sink_list; + std::vector source_list; std::vector status_vector(num_sources, 0); + srand(100); + std::vector gNodeBs; - // Test null cases in queries - REQUIRE(subscription_manager.is_subscription_entry(10) == false); - REQUIRE(subscription_manager.is_request_entry(1) == false); - REQUIRE(subscription_manager.get_request_status(1) == -1); - - // start up the sinks - is_running = true; - for(int i = 0; i < num_sinks; i++){ - sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); - } - - for(int i = 0; i < num_sources; i++){ - source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0)); - } - - + // generate the gnodeb list for which we are subscribing + // default ran_function_id is zero for(int i = 0; i < num_sources; i++){ - source_list[i].join(); - REQUIRE(status_vector[i] < 0); - REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true); - } - - // stop the sinks - is_running =false; - for(int i = 0; i < num_sinks; i++){ - sink_list[i].join(); + gNodeBs.push_back("test-gnodeb-" + std::to_string(i)); } - - REQUIRE(subscription_manager.num_complete() == num_sources); - REQUIRE(subscription_manager.num_pending() == 0); - - const subscription_response_helper * sub_info = subscription_manager.get_subscription(-1); - REQUIRE(sub_info == NULL); - - sub_info = subscription_manager.get_subscription(-1 * status_vector[0]); - REQUIRE(sub_info != NULL); - REQUIRE(sub_info->get_request_id() == -1 * status_vector[0]); - - } - SECTION("Delete requests for non-existent subscription requests"){ - std::cout <<"+++++++++" << std::endl << "TEST SUBSCRIPTION DELETE WITH NO CORRESPONDING REQUEST" << std::endl; - - subscription_manager.clear(); - REQUIRE(subscription_manager.get_request_status(0) == -1); - REQUIRE(subscription_manager.is_subscription_entry(0) == false); - REQUIRE(subscription_manager.is_request_entry(0) == false); - - int num_sources = 10; - - std::vector source_list; - std::vector status_vector(num_sources, 0); - srand(100); for(int i = 0; i < num_sources; i++){ int req_id = rand()%1000; - source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i,req_id , std::ref(mock_tx), 0)); + source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i,req_id , std::ref(mock_tx), 0)); } @@ -622,79 +743,10 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ REQUIRE(status_vector[i] == SUBSCR_ERR_MISSING); } - } - - - SECTION("Delete requests that have succeeeded"){ - std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES " << std::endl; - - subscription_manager.clear(); - - int num_sources = 10; - int num_sinks = 5; - const subscription_response_helper * sub_resp_info; - - std::vector source_list; - std::vector sink_list; - - std::vector status_vector(num_sources, 0); - - // start up the sinks - is_running = true; - for(int i = 0; i < num_sinks; i++){ - sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); - } - - // First do subscriptions ... - for(int i = 0; i < num_sources; i++){ - source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0)); - } - - - for(int i = 0; i < num_sources; i++){ - source_list[i].join(); - REQUIRE(status_vector[i] < 0 ); - REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true); - sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]); - REQUIRE(sub_resp_info != NULL); - REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]); - - } - - REQUIRE(subscription_manager.num_complete() == num_sources); - REQUIRE(subscription_manager.num_pending() == 0); - - - // Store ids .. - std::vector completed_requests = status_vector; - - // Delete successes - source_list.clear(); - for(int i = 0; i < num_sources; i++){ - source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i], std::ref(mock_tx), 0)); - } - - - for(int i = 0; i < num_sources; i++){ - source_list[i].join(); - REQUIRE(status_vector[i] == SUBSCR_SUCCESS); - } - - REQUIRE(subscription_manager.num_pending() == 0); - - - // stop the sinks - is_running =false; - for(int i = 0; i < num_sinks; i++){ - sink_list[i].join(); - } - REQUIRE(subscription_manager.num_complete() == 0); - REQUIRE(subscription_manager.num_pending() == 0); - - } + SECTION("Deletes that fail"){ std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES THAT FAIL " << std::endl; @@ -712,22 +764,30 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ // start up the sinks is_running = true; for(int i = 0; i < num_sinks; i++){ - sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); + sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1)); + } + + // generate the gnodeb list for which we are subscribing + // default ran_function_id is zero + std::vector gNodeBs; + for(int i = 0; i < num_sources; i++){ + gNodeBs.push_back("test-gnodeb-" + std::to_string(i)); } // First do subscriptions ... for(int i = 0; i < num_sources; i++){ - source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0)); + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0)); } for(int i = 0; i < num_sources; i++){ source_list[i].join(); REQUIRE(status_vector[i] < 0 ); - REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true); - sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]); + subscription_identifier id = std::make_tuple(gNodeBs[i], 0); + REQUIRE(subscription_manager.is_subscription_entry(id) == true); + sub_resp_info = subscription_manager.get_subscription(id); REQUIRE(sub_resp_info != NULL); - REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]); + REQUIRE(sub_resp_info->get_request_id() == 0); } @@ -735,13 +795,13 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ REQUIRE(subscription_manager.num_pending() == 0); - // Store ids .. + // Store status results std::vector completed_requests = status_vector; - // Delete failures + // Delete failures : mock_tx set to respond with failure source_list.clear(); for(int i = 0; i < num_sources; i++){ - source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i], std::ref(mock_tx), 1)); + source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, -1 * completed_requests[i], std::ref(mock_tx), 1)); } for(int i = 0; i < num_sources; i++){ @@ -756,7 +816,7 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ sink_list[i].join(); } - // subscriptions are still there + // subscriptions are still there (did not get deleted) REQUIRE(subscription_manager.num_complete() == num_sources); REQUIRE(subscription_manager.num_pending() == 0); @@ -777,26 +837,34 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ std::vector sink_list; std::vector status_vector(num_sources, 0); + std::vector gNodeBs; + + // generate the gnodeb list for which we are subscribing + // default ran_function_id is zero + for(int i = 0; i < num_sources; i++){ + gNodeBs.push_back("test-gnodeb-" + std::to_string(i)); + } // start up the sinks is_running = true; for(int i = 0; i < num_sinks; i++){ - sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); + sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1)); } // First do subscriptions ... for(int i = 0; i < num_sources; i++){ - source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0)); + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0)); } for(int i = 0; i < num_sources; i++){ source_list[i].join(); REQUIRE(status_vector[i] < 0 ); - REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true); - sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]); + subscription_identifier id = std::make_tuple(gNodeBs[i], 0); + REQUIRE(subscription_manager.is_subscription_entry(id) == true); + sub_resp_info = subscription_manager.get_subscription(id); REQUIRE(sub_resp_info != NULL); - REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]); + REQUIRE(sub_resp_info->get_request_id() == 0); } @@ -810,7 +878,7 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ // Delete with time-outs source_list.clear(); for(int i = 0; i < num_sources; i++){ - source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i], std::ref(mock_silent), 0)); + source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, -1 * completed_requests[i], std::ref(mock_silent), 0)); } @@ -830,33 +898,98 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ } - SECTION("Spurious messages"){ - std::cout <<"+++++++++" << std::endl << "TEST WITH SPURIOUS RESPONSES" << std::endl; + - // In this section, we basically inject - // spurious messages to subscription handler. - // There are no outcomes. basically - // handler should be able to ignore these messages + SECTION("Verify timeout behaviour if transmitter sends after delay"){ + std::cout <<"+++++++++" << std::endl << "TEST DELAYED ARRIVAL OF SUBSCRIPTIONS " << std::endl; + + subscription_manager.clear(); + int num_sources = 10; + int num_sinks = 5; + + std::vector source_list; + std::vector sink_list; + + std::vector status_vector(num_sources, 0); - int num_packets = 50; - int num_sinks = 10; - std::vector sink_list; + // set subscription manager timeout on short fuse + int time_out = 1; + int num_tries = 1; + subscription_manager.set_timeout(time_out); + subscription_manager.set_num_retries(num_tries); + + // start up the sinks with delayed response + is_running = true; + int delay = 5; + for(int i = 0; i < num_sinks; i++){ + sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), delay)); + } + + + // generate the gnodeb list for which we are subscribing + // default ran_function_id is zero + std::vector gNodeBs; + for(int i = 0; i < num_sources; i++){ + gNodeBs.push_back("test-gnodeb-" + std::to_string(i)); + } + + for(int i = 0; i < num_sources; i++){ + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0)); + } + + + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + REQUIRE(status_vector[i] == SUBSCR_ERR_TIMEOUT); + } + + // stop the sinks + is_running =false; + for(int i = 0; i < num_sinks; i++){ + sink_list[i].join(); + } + + REQUIRE(subscription_manager.num_complete() == 0); + REQUIRE(subscription_manager.num_pending() == 0); + + } + + SECTION("Duplicate requests"){ + std::cout <<"+++++++++" << std::endl << "TEST DUPLICATE SUBSCRIPTION REQUESTS " << std::endl; subscription_manager.clear(); - std::cout <<"Message queue size prior to fill = " << message_bus.size() << std::endl; - random_tx(num_packets); - std::cout <<"Message queue size post fill = " << message_bus.size() << std::endl; - + + int num_sources = 20; + int num_sinks = 5; + + std::vector source_list; + std::vector sink_list; + + std::vector status_vector(num_sources, 0); + + // generate IDENTICAL gnodeb list for which we are subscribing + // default ran_function_id is zero + std::vector gNodeBs; + for(int i = 0; i < num_sources; i++){ + gNodeBs.push_back("test-gnodeb-" + std::to_string(0)); + } // start up the sinks is_running = true; for(int i = 0; i < num_sinks; i++){ - sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); + sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1)); + } + + // send out subscriptions + for(int i = 0; i < num_sources; i++){ + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0)); } - // wait for queue to drain out - while(! message_bus.empty()){ - sleep(2); + // exactly ONE subscription should succeed. all others should fail with SUBSCR_ERR_DUPLICATE + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + REQUIRE( (status_vector[i] == -1 || status_vector[i] == SUBSCR_ERR_DUPLICATE)); + } // stop the sinks @@ -864,14 +997,57 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){ for(int i = 0; i < num_sinks; i++){ sink_list[i].join(); } - REQUIRE(subscription_manager.num_complete() == 0); + + REQUIRE(subscription_manager.num_complete() == 1); + } + + + SECTION("Duplicate responses"){ + // this scenario can happen if there was an initial successful + // subscription with request + // followed by another one. The response for the second one should + // result in a duplicate subscription error + + + std::cout <<"+++++++++" << std::endl << "TEST DUPLICATE SUBSCRIPTION RESPONSES" << std::endl; + + subscription_manager.clear(); + + int num_sources = 1; + int num_sinks = 1; + std::vector status_vector (num_sources, 0); + std::vector gNodeBs; + gNodeBs.push_back("test-gnodeb"); + + std::vector sink_list; + // start up the sinks + is_running = true; + for(int i = 0; i < num_sinks; i++){ + sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1)); + } + + // send a subscription : this should succeed + send_request(subscription_manager, status_vector, gNodeBs, 0, mock_tx, 0); + REQUIRE(status_vector[0] == -1); REQUIRE(subscription_manager.num_pending() == 0); + REQUIRE(subscription_manager.num_complete() == 1); - std::cout <<"Message queue size at end = " << message_bus.size() << std::endl; + // now send same subscription again + send_request(subscription_manager, status_vector, gNodeBs, 0, mock_tx, 0); + REQUIRE(status_vector[0] == SUBSCR_ERR_DUPLICATE); + REQUIRE(subscription_manager.num_pending() == 0); + REQUIRE(subscription_manager.num_complete() == 1); + + // stop the sinks + is_running =false; + for(int i = 0; i < num_sinks; i++){ + sink_list[i].join(); + } + } - -}; + +} diff --git a/test/unit_test_xapp.cc b/test/unit_test_xapp.cc index 6801ec2..e1d47f1 100644 --- a/test/unit_test_xapp.cc +++ b/test/unit_test_xapp.cc @@ -27,7 +27,6 @@ #include #include -#define RMR_RT_FILE "/home/asridharan/projects/ric-xapp-dev/ric-app/adm-ctrl-xapp/test/uta_rtg.rt" #define MESSAGE_SIZE 512 int num_recv_pkts = 0; @@ -62,13 +61,20 @@ void dropped_pkts(rmr_mbuf_t *send_msg){ num_dropped_pkts++; } -bool ping_x(rmr_mbuf_t *rcv_msg){ - rcv_msg->mtype = 101; //pong +bool pong_a1(rmr_mbuf_t *rcv_msg){ + rcv_msg->mtype = A1_POLICY_RESP; num_ping_pkts++; return true; } + bool pong_x(rmr_mbuf_t *rcv_msg){ + rcv_msg->mtype = 102; //ping port + num_ping_pkts++; + return true; +} + +bool ping_recv(rmr_mbuf_t * rcv_msg){ num_pong_pkts++; return false; } @@ -79,39 +85,25 @@ TEST_CASE("Test xapp functionality", "[xapp]"){ // Test parameters char app_name[128] = "Test App"; char port[16] = "tcp:4999"; - int num_retries = 4; + init_logger("UNIT_TEST_XAPP", MDCLOG_INFO); - mdclog_attr_t *attr; - mdclog_attr_init(&attr); - mdclog_attr_set_ident(attr, "UNIT TEST XAPP FRAMEWORK"); - mdclog_init(attr); - mdclog_level_set(MDCLOG_INFO); - mdclog_attr_destroy(attr); - - SECTION("RMR illegal options"){ - char illegal_port [] = "udp:-1"; - REQUIRE_THROWS(XaPP(app_name, illegal_port, sizeof(Test_message), 1)); - REQUIRE_THROWS(XaPP(app_name, port, RMR_BUFFER_SIZE + 1, 1)); + SECTION("Illegal buffer size"){ + REQUIRE_THROWS(XaPP(app_name, port, RMR_BUFFER_SIZE + 1)); } - + SECTION("All good"){ - REQUIRE_NOTHROW(XaPP(app_name, port, sizeof(Test_message), 1)); + REQUIRE_NOTHROW(XaPP(app_name, port, sizeof(Test_message))); } SECTION("Simple + memory"){ - XaPP check_xapp = XaPP(app_name, port, sizeof(Test_message), 1); - REQUIRE(check_xapp.getName() == std::string(app_name)); + XaPP check_xapp = XaPP(app_name, port, sizeof(Test_message)); + REQUIRE(check_xapp.get_name() == std::string(app_name)); } SECTION("Configuration test"){ - XaPP test_xapp = XaPP(app_name, port, sizeof(Test_message) , 1); - test_xapp.set_num_retries(num_retries); - - REQUIRE(test_xapp.get_num_retries() == num_retries); - REQUIRE(test_xapp.getStatus() == 1); - REQUIRE(test_xapp.get_num_retries() != 2); - REQUIRE(test_xapp.getStatus() == true); + XaPP test_xapp = XaPP(app_name, port, sizeof(Test_message)); + REQUIRE(test_xapp.get_status() == true); REQUIRE(test_xapp.get_rmr_context() != NULL); } @@ -123,11 +115,10 @@ TEST_CASE("Test xapp functionality", "[xapp]"){ failed_tx = 0; // Instantiate and configure xAPP - XaPP test_xapp = XaPP(app_name, port, sizeof(Test_message) , 1); - test_xapp.set_num_retries(num_retries); + XaPP test_xapp = XaPP(app_name, port, sizeof(Test_message)); // Start receiver for test - test_xapp.Start(&rcvd_pkts); + test_xapp.StartThread(rcvd_pkts); sleep(1); // Test Send normal message @@ -136,7 +127,7 @@ TEST_CASE("Test xapp functionality", "[xapp]"){ for(i = 0; i < NumPkts; i++){ clock_gettime(CLOCK_REALTIME, &(my_message.ts)); snprintf(my_message.payload, MESSAGE_SIZE, "hello world %d", i); - bool res = test_xapp.Send(0, sizeof(Test_message), (void *) (&my_message)); + bool res = test_xapp.Send(102, sizeof(Test_message), (void *) (&my_message)); if (!res){ failed_tx ++; } @@ -144,10 +135,12 @@ TEST_CASE("Test xapp functionality", "[xapp]"){ } sleep(1); - - test_xapp.Stop(); + REQUIRE(test_xapp.get_num_active_threads() == 1); + test_xapp.Stop(); + REQUIRE(test_xapp.get_num_active_threads() == 0); + std::cout <<"Num Packets Sent = " << NumPkts << " Received packets = " << num_recv_pkts << " Dropped packets = " << num_dropped_pkts << " Failed sends = "<< failed_tx << std::endl; - std::cout <<"Num attempts = " << test_xapp.get_Send_attempts() << " Num failed = " << test_xapp.get_Send_fails() << std::endl; + REQUIRE(num_recv_pkts > 0); REQUIRE(num_dropped_pkts == 0); @@ -155,229 +148,222 @@ TEST_CASE("Test xapp functionality", "[xapp]"){ REQUIRE(num_recv_pkts == NumPkts); } -// SECTION("Transmission test with start specific thread"){ -// num_recv_pkts = 0; -// num_dropped_pkts = 0; -// failed_tx = 0; - -// // Instantiate and configure xAPP -// XaPP test_xapp = XaPP(app_name, port, sizeof(Test_message) , 1); -// test_xapp.set_num_retries(num_retries); - -// // Start receiver for test -// test_xapp.StartThread(&rcvd_pkts); -// sleep(1); - -// // Test Send normal message -// Test_message my_message; -// uint32_t i = 0; -// for(i = 0; i < NumPkts; i++){ -// clock_gettime(CLOCK_REALTIME, &(my_message.ts)); -// snprintf(my_message.payload, MESSAGE_SIZE, "hello world %d", i); -// bool res = test_xapp.Send(0, sizeof(Test_message), (void *) (&my_message)); -// if (!res){ -// failed_tx ++; -// } -// usleep(10); - -// } -// sleep(1); - -// test_xapp.Stop(); -// std::cout <<"Num Packets Sent = " << NumPkts << " Received packets = " << num_recv_pkts << " Dropped packets = " << num_dropped_pkts << " Failed sends = "<< failed_tx << std::endl; -// std::cout <<"Num attempts = " << test_xapp.get_Send_attempts() << " Num failed = " << test_xapp.get_Send_fails() << std::endl; - -// REQUIRE(num_recv_pkts > 0); -// REQUIRE(num_dropped_pkts == 0); -// REQUIRE(failed_tx == 0); -// REQUIRE(num_recv_pkts == NumPkts); -// } + SECTION("Transmission test error handler with start"){ -// SECTION("Transmission test error handler with start"){ - -// num_recv_pkts = 0; -// num_dropped_pkts = 0; -// failed_tx = 0; + num_recv_pkts = 0; + num_dropped_pkts = 0; + failed_tx = 0; -// // Instantiate and configure xAPP -// XaPP test_xapp = XaPP(app_name, port, sizeof(Test_message) , 1); -// test_xapp.set_num_retries(num_retries); + // Instantiate and configure xAPP + XaPP test_xapp = XaPP(app_name, port, sizeof(Test_message) ); + -// // Start receiver for test -// test_xapp.Start(&echo_into_space_pkts, &dropped_pkts); -// sleep(1); + // Start receiver for test + test_xapp.StartThread(&echo_into_space_pkts, &dropped_pkts); + sleep(1); -// // Test Send normal message -// Test_message my_message; -// uint32_t i = 0; -// for(i = 0; i < NumPkts; i++){ -// clock_gettime(CLOCK_REALTIME, &(my_message.ts)); -// snprintf(my_message.payload, MESSAGE_SIZE, "hello world %d", i); -// bool res = test_xapp.Send(0, sizeof(Test_message), (void *) (&my_message)); -// if (!res){ -// failed_tx ++; -// } -// usleep(10); + // Test Send normal message + Test_message my_message; + uint32_t i = 0; + for(i = 0; i < NumPkts; i++){ + clock_gettime(CLOCK_REALTIME, &(my_message.ts)); + snprintf(my_message.payload, MESSAGE_SIZE, "hello world %d", i); + bool res = test_xapp.Send(102, sizeof(Test_message), (void *) (&my_message)); + if (!res){ + failed_tx ++; + } + usleep(10); -// } -// sleep(1); + } + sleep(1); -// test_xapp.Stop(); -// std::cout <<"Num Packets Sent = " << NumPkts << " Received packets = " << num_recv_pkts << " Dropped packets = " << num_dropped_pkts << " Failed sends = "<< failed_tx << std::endl; -// std::cout <<"Num attempts = " << test_xapp.get_Send_attempts() << " Num failed = " << test_xapp.get_Send_fails() << std::endl; + test_xapp.Stop(); + std::cout <<"Num Packets Sent = " << NumPkts << " Received packets = " << num_recv_pkts << " Dropped packets = " << num_dropped_pkts << " Failed sends = "<< failed_tx << std::endl; + -// REQUIRE(num_recv_pkts == NumPkts); -// REQUIRE(num_dropped_pkts == NumPkts); -// REQUIRE(failed_tx == 0); -// } + REQUIRE(num_recv_pkts == NumPkts); + REQUIRE(num_dropped_pkts == NumPkts); + REQUIRE(failed_tx == 0); + } -// SECTION("Transmission test error handler with start thread"){ + SECTION("Transmission test error handler with start thread"){ -// num_recv_pkts = 0; -// num_dropped_pkts = 0; -// failed_tx = 0; + num_recv_pkts = 0; + num_dropped_pkts = 0; + failed_tx = 0; -// // Instantiate and configure xAPP -// XaPP test_xapp = XaPP(app_name, port, sizeof(Test_message) , 1); -// test_xapp.set_num_retries(num_retries); + // Instantiate and configure xAPP + XaPP test_xapp = XaPP(app_name, port, sizeof(Test_message) ); + -// // Start receiver for test -// test_xapp.StartThread(&echo_into_space_pkts, &dropped_pkts); -// sleep(1); + // Start receiver for test + test_xapp.StartThread(&echo_into_space_pkts, &dropped_pkts); + sleep(1); -// // Test Send normal message -// Test_message my_message; -// uint32_t i = 0; -// for(i = 0; i < NumPkts; i++){ -// clock_gettime(CLOCK_REALTIME, &(my_message.ts)); -// snprintf(my_message.payload, MESSAGE_SIZE, "hello world %d", i); -// bool res = test_xapp.Send(0, sizeof(Test_message), (void *) (&my_message)); -// if (!res){ -// failed_tx ++; -// } -// usleep(10); + // Test Send normal message + Test_message my_message; + uint32_t i = 0; + for(i = 0; i < NumPkts; i++){ + clock_gettime(CLOCK_REALTIME, &(my_message.ts)); + snprintf(my_message.payload, MESSAGE_SIZE, "hello world %d", i); + bool res = test_xapp.Send(102, sizeof(Test_message), (void *) (&my_message)); + if (!res){ + failed_tx ++; + } + usleep(10); -// } -// sleep(1); + } + sleep(1); -// test_xapp.Stop(); -// std::cout <<"Num Packets Sent = " << NumPkts << " Received packets = " << num_recv_pkts << " Dropped packets = " << num_dropped_pkts << " Failed sends = "<< failed_tx << std::endl; -// std::cout <<"Num attempts = " << test_xapp.get_Send_attempts() << " Num failed = " << test_xapp.get_Send_fails() << std::endl; + test_xapp.Stop(); + std::cout <<"Num Packets Sent = " << NumPkts << " Received packets = " << num_recv_pkts << " Dropped packets = " << num_dropped_pkts << " Failed sends = "<< failed_tx << std::endl; + -// REQUIRE(num_recv_pkts == NumPkts); -// REQUIRE(num_dropped_pkts == NumPkts); -// REQUIRE(failed_tx == 0); + REQUIRE(num_recv_pkts == NumPkts); + REQUIRE(num_dropped_pkts == NumPkts); + REQUIRE(failed_tx == 0); -// } + } -// SECTION("Test ping pong : two xapps send to each other. "){ + SECTION("Test ping pong : two xapps send to each other. "){ -// char ping_name[] = "ping"; -// char pong_name[] = "pong"; -// char ping_port[] = "tcp:4999"; -// char pong_port[] = "tcp:4998"; + char ping_name[] = "ping"; + char pong_name[] = "pong"; + char ping_port[] = "tcp:4999"; + char pong_port[] = "tcp:4998"; -// // Instantiate ping xAPP -// XaPP ping_xapp = XaPP(ping_name, ping_port, sizeof(Test_message) , 1); + // Instantiate ping xAPP + XaPP ping_xapp = XaPP(ping_name, ping_port, sizeof(Test_message) ); -// // Instantiate pong xapp -// XaPP pong_xapp = XaPP(pong_name, pong_port, sizeof(Test_message) , 1); + // Instantiate pong xapp + XaPP pong_xapp = XaPP(pong_name, pong_port, sizeof(Test_message) ); -// // Start receiver on ping -// ping_xapp.StartThread(ping_x); -// sleep(1); + // Start receiver on ping + ping_xapp.StartThread(ping_recv); + sleep(1); + + // Start receiver on pong + pong_xapp.StartThread(pong_x); + + // send messages from ping to pong + Test_message my_message; + uint32_t i = 0; + for(i = 0; i < NumPkts; i++){ + clock_gettime(CLOCK_REALTIME, &(my_message.ts)); + snprintf(my_message.payload, MESSAGE_SIZE, "hello world %d", i); + bool res = ping_xapp.Send(101, sizeof(Test_message), (void *) (&my_message)); + if (!res){ + failed_tx ++; + } + } + + sleep(1); + pong_xapp.Stop(); + + REQUIRE(failed_tx == 0); + REQUIRE(num_ping_pkts == NumPkts); + REQUIRE(num_pong_pkts == NumPkts); + + + // Re-run experiment but now with A1 message type when + // ping responds + pong_xapp.StartThread(pong_a1); + sleep(1); -// // Start receiver on pong -// pong_xapp.StartThread(&pong_x); + failed_tx = 0; + num_ping_pkts = 0; + num_pong_pkts = 0; + + + for(i = 0; i < NumPkts; i++){ + clock_gettime(CLOCK_REALTIME, &(my_message.ts)); + snprintf(my_message.payload, MESSAGE_SIZE, "hello world %d", i); + bool res = ping_xapp.Send(101, sizeof(Test_message), (void *) (&my_message)); + if (!res){ + failed_tx ++; + } + } + + sleep(1); + ping_xapp.Stop(); + pong_xapp.Stop(); + + std::cerr <<"Pong received ping pkts = " << num_ping_pkts << " Ping received a1 packets = " << num_pong_pkts << " failures = " << failed_tx << std::endl; -// // send messages to ping -// Test_message my_message; -// uint32_t i = 0; -// for(i = 0; i < NumPkts; i++){ -// clock_gettime(CLOCK_REALTIME, &(my_message.ts)); -// snprintf(my_message.payload, MESSAGE_SIZE, "hello world %d", i); -// bool res = ping_xapp.Send(0, sizeof(Test_message), (void *) (&my_message)); -// if (!res){ -// failed_tx ++; -// } -// } - -// sleep(1); -// ping_xapp.Stop(); -// pong_xapp.Stop(); - -// REQUIRE(failed_tx == 0); -// REQUIRE(num_ping_pkts == NumPkts); -// REQUIRE(num_pong_pkts == NumPkts); - -// } -// } - - -// TEST_CASE(" Test out various transmission methods ..", "1"){ - -// // Test parameters -// char app_name[128] = "Test App"; -// char port[16] = "tcp:4999"; + REQUIRE(num_ping_pkts >= NumPkts); + REQUIRE(num_pong_pkts >= NumPkts); + REQUIRE(failed_tx == 0); + + + + + } +} + + +TEST_CASE(" Test out various transmission methods ..", "1"){ + + // Test parameters + char app_name[128] = "Test App"; + char port[16] = "tcp:4999"; + init_logger("UNIT_TEST_XAPP", MDCLOG_INFO); -// mdclog_attr_t *attr; -// mdclog_attr_init(&attr); -// mdclog_attr_set_ident(attr, "UNIT TEST XAPP FRAMEWORK"); -// mdclog_init(attr); -// mdclog_level_set(MDCLOG_INFO); -// mdclog_attr_destroy(attr); -// bool res; -// unsigned char my_meid[32] = "ABC123"; -// Test_message my_message; + bool res; + unsigned char my_meid[32] = "ABC123"; + Test_message my_message; -// SECTION("Test if message larger than allowed"){ -// num_recv_pkts = 0; -// num_dropped_pkts = 0; -// failed_tx = 0; + SECTION("Test if message larger than allowed"){ + num_recv_pkts = 0; + num_dropped_pkts = 0; + failed_tx = 0; -// // Instantiate and configure xAPP -// XaPP test_xapp = XaPP(app_name, port, sizeof(Test_message) , 1); + // Instantiate and configure xAPP + XaPP test_xapp = XaPP(app_name, port, sizeof(Test_message) ); -// // Start receiver for test -// test_xapp.StartThread(&rcvd_pkts); -// sleep(1); + // Start receiver for test + test_xapp.StartThread(&rcvd_pkts); + sleep(1); -// // Test sending a message of size larger than allowed -// res = test_xapp.Send(0, RMR_BUFFER_SIZE + 100, (void *)(&my_message)); -// test_xapp.Stop(); + // Test sending a message of size larger than allowed + res = test_xapp.Send(102, RMR_BUFFER_SIZE + 100, (void *)(&my_message)); + REQUIRE(res == false); + + res = test_xapp.Send(102, RMR_BUFFER_SIZE + 100, (void *)(&my_message), "id"); + REQUIRE(res == false); + test_xapp.Stop(); -// REQUIRE(res == false); -// } + + } -// SECTION("Test with tlv"){ -// num_recv_pkts = 0; -// num_dropped_pkts = 0; -// failed_tx = 0; + SECTION("Test with tlv"){ + num_recv_pkts = 0; + num_dropped_pkts = 0; + failed_tx = 0; -// // Instantiate and configure xAPP -// XaPP test_xapp = XaPP(app_name, port, sizeof(Test_message) , 1); + // Instantiate and configure xAPP + XaPP test_xapp = XaPP(app_name, port, sizeof(Test_message) ); -// // Start receiver for test -// test_xapp.StartThread(&rcvd_pkts); -// sleep(1); + // Start receiver for test + test_xapp.StartThread(&rcvd_pkts); + sleep(1); -// // Test Send with tlv -// clock_gettime(CLOCK_REALTIME, &(my_message.ts)); -// snprintf(my_message.payload, MESSAGE_SIZE, "hello world"); -// res = test_xapp.Send(0, sizeof(Test_message), (void *) (&my_message)); -// sleep(1); + // Test Send with tlv + clock_gettime(CLOCK_REALTIME, &(my_message.ts)); + snprintf(my_message.payload, MESSAGE_SIZE, "hello world"); + res = test_xapp.Send(102, sizeof(Test_message), (void *) (&my_message)); + sleep(1); -// // Test send with tlv and meid -// res = test_xapp.Send(0, sizeof(Test_message), (void *) (&my_message), my_meid); -// sleep(1); + // Test send with tlv and meid + res = test_xapp.Send(102, sizeof(Test_message), (void *) (&my_message), my_meid); + sleep(1); -// test_xapp.Stop(); + test_xapp.Stop(); -// REQUIRE(num_recv_pkts == 2); -// REQUIRE(!strcmp((const char *)meid, (const char *)my_meid)); -// } + REQUIRE(num_recv_pkts == 2); + REQUIRE(!strcmp((const char *)meid, (const char *)my_meid)); + } } diff --git a/test/uta_rtg.rt b/test/uta_rtg.rt index ce9b132..db5a34c 100644 --- a/test/uta_rtg.rt +++ b/test/uta_rtg.rt @@ -15,7 +15,6 @@ # limitations under the License. newrt|start -rte|0|localhost:4999 rte|102|localhost:4999 rte|101|localhost:4998 rte|2|localhost:38000 @@ -32,5 +31,6 @@ rte|12040|localhost:38000 rte|12050|localhost:4560 rte|20000|localhost:4560 rte|20002|localhost:4560 +rte|20010|localhost:4560 rte|1111|localhost:38000 newrt|end -- 2.16.6