From 0fa9a8469f9b58fb16706ee0645f5d1f6b47a908 Mon Sep 17 00:00:00 2001 From: smahana123 Date: Thu, 1 Dec 2022 12:34:49 +0530 Subject: [PATCH] Unittesting for tm. Issue-Id: AIMLFW-6 Signed-off-by: smahana123 Change-Id: Ibc302f732c15571f92e7f6317fe94a1b5845f509 Signed-off-by: smahana123 --- tests/README.MD | 3 +- tests/common/conf_log.yaml | 2 +- tests/test.env | 30 ++ tests/test_common_db_fun.py | 441 ++++++++++++++++++++++++ tests/test_tm_apis.py | 498 ++++++++++++++++++++++++++- tests/test_trainingmgr_config.py | 106 ++++++ tests/test_trainingmgr_operations.py | 87 +++++ tests/test_trainingmgr_ps_db.py | 87 +++++ tests/test_trainingmgr_util.py | 640 +++++++++++++++++++++++++++++++++++ 9 files changed, 1874 insertions(+), 20 deletions(-) create mode 100644 tests/test.env create mode 100644 tests/test_common_db_fun.py create mode 100644 tests/test_trainingmgr_config.py create mode 100644 tests/test_trainingmgr_operations.py create mode 100644 tests/test_trainingmgr_ps_db.py create mode 100644 tests/test_trainingmgr_util.py diff --git a/tests/README.MD b/tests/README.MD index ca6c657..cc39e6d 100644 --- a/tests/README.MD +++ b/tests/README.MD @@ -17,7 +17,6 @@ # ================================================================================== #Change the current working directory to home directory tm - # Directory struture tm |__tests @@ -42,4 +41,4 @@ pip3 install -r requirements_test.txt # Example to run test cases. # Generate test report -sudo python3 -m pytest -rA . --capture=tee-sys --cov-report term-missing --cov-report xml:coverage.xml --cov-report html:htmlcov --junitxml test-reports/junit.xml --cov=./ \ No newline at end of file +python3 -m pytest -rA . --capture=tee-sys --cov-report term-missing --cov-report xml:coverage.xml --cov-report html:htmlcov --junitxml test-reports/junit.xml --cov=./trainingmgr/ \ No newline at end of file diff --git a/tests/common/conf_log.yaml b/tests/common/conf_log.yaml index 4ecb1fa..dc8d902 100644 --- a/tests/common/conf_log.yaml +++ b/tests/common/conf_log.yaml @@ -29,7 +29,7 @@ handlers: class: logging.handlers.RotatingFileHandler level: DEBUG formatter: simple - filename: ./training_manager_test.log + filename: training_manager.log maxBytes: 10485760 backupCount: 20 encoding: utf8 diff --git a/tests/test.env b/tests/test.env new file mode 100644 index 0000000..325004a --- /dev/null +++ b/tests/test.env @@ -0,0 +1,30 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== + +KF_ADAPTER_PORT=5001 +KF_ADAPTER_IP=localhost +DATA_EXTRACTION_API_PORT=32000 +DATA_EXTRACTION_API_IP=localhost +DEPLOYMENT_MANAGER_PORT=9999 +DEPLOYMENT_MANAGER_IP=localhost +TRAINING_MANAGER_PORT=32002 +TRAINING_MANAGER_IP=localhost +PS_USER=postgres +PS_PASSWORD="abcd" +PS_IP="localhost" +PS_PORT="30001" diff --git a/tests/test_common_db_fun.py b/tests/test_common_db_fun.py new file mode 100644 index 0000000..3f12089 --- /dev/null +++ b/tests/test_common_db_fun.py @@ -0,0 +1,441 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== + +import pytest +import sys +from dotenv import load_dotenv +import json +from trainingmgr.db.common_db_fun import get_data_extraction_in_progress_trainingjobs, \ + change_field_of_latest_version, change_field_value_by_version, \ + get_field_by_latest_version, get_field_of_given_version, \ + change_in_progress_to_failed_by_latest_version, change_steps_state_of_latest_version, \ + change_steps_state_by_version, delete_trainingjob_version, get_info_by_version, \ + get_trainingjob_info_by_name, get_latest_version_trainingjob_name, \ + get_all_versions_info_by_name, get_all_distinct_trainingjobs, \ + get_all_version_num_by_trainingjob_name, update_model_download_url, \ + add_update_trainingjob, get_all_jobs_latest_status_version + +mimic_db = { + "usecase_name": "Tester", + "description": "Current UseCase Is For Testing Only", + "feature_list": "*", + "pipeline_name": "qoe-pipeline", + "experiment_name": "default", + "arguments": "{epoches : 1}", + "query_filter": "", + "creation_time": "29-09-2022", + "run_id": 1729, + "steps_state": json.dumps({"DATA_EXTRACTION" : "IN_PROGRESS"}), + "updation_time": "29-09-2022", + "version": 1, + "enable_versioning": True, + "target_deployment": "Near RT Ric", + "pipeline_version": 1, + "datalake_source": "InfluxSource", + "incremental_training": False, + "model": "", + "model_version": "", + "model_url":"", + "notification_url": "", + "_measurement": "liveCell", + "bucket": "UEdata", + "accuracy": 70 + } + +class db_helper: + '''Mimics as a Db''' + def __init__(self, req_cols, raise_exception = False, check_success_obj = None): + self.cols = req_cols + self.raise_exception = raise_exception + self.check_success_obj = check_success_obj + self.counter = 0 + + def get_new_conn(self): + return db_helper(self.cols, self.raise_exception, self.check_success_obj) + + def cursor(self): + return db_helper(self.cols, self.raise_exception, self.check_success_obj) + + def execute(self, query, values = None): + if self.raise_exception: + raise Exception("DB Error") + + def fetchall(self): + out = [] + if(len(self.cols) > 0): + if(self.cols[self.counter][0] == "*"): + for (col, value) in mimic_db.items(): + out.append(value) + elif(self.cols[self.counter][0] == None): + self.counter += 1 + return None + else: + for col in self.cols[self.counter]: + out.append(mimic_db[col]) + self.counter += 1 + return [out] + + def close(self): + ''' For checking success in fxn not returning anything, If you call close, then It means query as exceuted as expected ''' + if self.check_success_obj: + self.check_success_obj.setwin() + + def rollback(self): + pass + + def commit(self): + pass + +class Check: + def __init__(self): + self.finished = False + + def setwin(self): + self.finished = True + +class Test_Common_Db_Fun: + def setup_method(self): + pass + + def test_get_data_extraction_in_progress_trainingjobs(self): + db_obj = db_helper([["usecase_name", "steps_state"]]) + out = get_data_extraction_in_progress_trainingjobs(db_obj) + + assert out != None, 'Function get_data_extraction_in_progress_trainingjobs has failed' + + def test_get_data_extraction_in_progress_trainingjobs(self): + checker = Check() + try: + db_obj = db_helper([["usecase_name", "steps_state"]], raise_exception=True, check_success_obj=checker) + out = get_data_extraction_in_progress_trainingjobs(db_obj) + assert out != None, 'Fxn get_usecases_which_has_data_extraction_in_progress Failed' + except Exception as err: + assert str(err) == "bad operand type for unary +: 'str'", 'Negative test get_usecases_which_has_data_extraction_in_progress FAILED, Doesnt returned required error' + assert checker.finished, 'Cursor Not Closed Properly for fxn test_negative_get_usecases_which_has_data_extraction_in_progress' + + def test_change_field_of_latest_version(self): + for field in ["notification_url", "run_id"]: + checker = Check() + db_obj = db_helper([["version", "usecase_name"]], check_success_obj=checker) + change_field_of_latest_version("Tester", db_obj, field, "a_dummy_value") + + assert checker.finished, 'change_field_of_latest_version Failed For Usecame {} and Field {} '.format("Tester", field) + + def test_negative_change_field_of_latest_version(self): + for field in ["notification_url", "run_id"]: + checker = Check() + db_obj = db_helper([["version", "usecase_name"]],raise_exception=True ,check_success_obj=checker) + try: + change_field_of_latest_version("Tester", db_obj, field, "a_dummy_value") + assert checker.finished, 'change_field_of_latest_version Failed For Usecame {} and Field {} '.format("Tester", field) + assert False + except Exception: + assert True + + def test_change_field_value_by_version_2(self): + checker = Check() + db_obj = db_helper([[None]], check_success_obj=checker) + change_field_value_by_version("Tester", 1, db_obj, "deletion_in_progress", "a_dummy_value") + assert checker.finished, 'change_field_of_given_version FAILED' + + def test_negative_change_field_value_by_version_2(self): + checker = Check() + db_obj = db_helper([[None]], raise_exception=True, check_success_obj=checker) + try: + change_field_value_by_version("Tester", 1, db_obj, "deletion_in_progress", "a_dummy_value") + assert checker.finished, 'change_field_value_by_version FAILED' + assert False + except Exception: + assert True + + def test_get_field_by_latest_version(self): + for field in ["notification_url", "model_url", "target_deployment"]: + db_obj = db_helper([["version"], [field]]) + out = get_field_by_latest_version("Tester", db_obj, field) + assert out != None, 'get_field_by_latest_version FAILED at field = {}'.format(field) + + def test_negative_get_field_by_latest_version(self): + for field in ["notification_url", "model_url", "target_deployment"]: + checker = Check() + try: + db_obj = db_helper([["version"], [field]], raise_exception=True, check_success_obj=checker) + out = get_field_by_latest_version("Tester", db_obj, field) + assert out != None, 'get_field_by_latest_version FAILED at field = {}'.format(field) + assert False + except Exception: + assert True + + def test_get_field_of_given_version(self): + db_obj = db_helper([["steps_state"]]) + out = get_field_of_given_version("Tester", 1, db_obj, "steps_state") + assert out != None, ' test_get_field_of_given_version FAILED' + + def test_negative_get_field_of_given_version(self): + checker = Check() + try: + db_obj = db_helper([["steps_state"]], raise_exception=True, check_success_obj=checker) + out = get_field_of_given_version("Tester", 1, db_obj, "steps_state") + assert out != None, ' test_get_field_of_given_version FAILED' + assert False + except Exception: + assert True + + def test_change_in_progress_to_failed_by_latest_version(self): + checker = Check() + db_obj = db_helper([["version"], ["steps_state"]], check_success_obj=checker) + change_in_progress_to_failed_by_latest_version("Tester", db_obj) + assert checker.finished, 'change_in_progress_to_failed_by_latest_version FAILED' + + def test_negative_change_in_progress_state_to_failed_of_latest_version(self): + checker = Check() + try: + db_obj = db_helper([["version"], ["steps_state"]], raise_exception=True,check_success_obj=checker) + change_in_progress_to_failed_by_latest_version("Tester", db_obj) + assert checker.finished, 'change_in_progress_to_failed_by_latest_version FAILED' + except Exception as err: + fxn_name = "change_in_progress_to_failed_by_latest_version(" + assert str(err) == "bad operand type for unary +: 'str'", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name) + assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name) + + def test_change_steps_state_of_latest_version(self): + checker = Check() + db_obj = db_helper([["version"], ["steps_state"], [None]], check_success_obj=checker) + change_steps_state_of_latest_version("Tester", db_obj, 1, 2) # Dummy Key and Values + assert checker.finished, 'change_steps_state_of_latest_version FAILED' + + def test_negative_change_steps_state_of_latest_version(self): + checker = Check() + try: + db_obj = db_helper([["version"], ["steps_state"], [None]], raise_exception=True ,check_success_obj=checker) + change_steps_state_of_latest_version("Tester", db_obj, 1, 2) # Dummy Key and Values + assert checker.finished, 'change_steps_state_of_latest_version FAILED' + except Exception as err: + fxn_name = "change_steps_state_of_latest_version" + assert str(err) == "Failed to execute query in change_steps_state_of_latest_versionDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name) + assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name) + + def test_change_steps_state_by_version(self): + checker = Check() + db_obj = db_helper([["steps_state"], [None]] , check_success_obj=checker) + change_steps_state_by_version("Tester", 1, db_obj, 1, 2) # Dummy Key and Values + assert checker.finished, 'change_steps_state_by_version FAILED' + + def test_negative_change_steps_state_by_version(self): + checker = Check() + try: + db_obj = db_helper([["steps_state"], [None]] , raise_exception=True,check_success_obj=checker) + change_steps_state_by_version("Tester", 1, db_obj, 1, 2) # Dummy Key and Values + assert checker.finished, 'change_steps_state_by_version FAILED' + except Exception as err: + fxn_name = "change_steps_state_by_version" + assert str(err) == "Failed to execute query in change_steps_state_by_versionDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name) + assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name) + + def test_delete_trainingjob_version(self): + checker = Check() + db_obj = db_helper([[None]], check_success_obj=checker) + delete_trainingjob_version("Tester", 1, db_obj) + assert checker.finished, 'delete_trainingjob_version FAILED' + + def test_negative_delete_trainingjob_version(self): + checker = Check() + try: + db_obj = db_helper([[None]], raise_exception=True,check_success_obj=checker) + delete_trainingjob_version("Tester", 1, db_obj) + assert checker.finished, 'delete_trainingjob_version FAILED' + except Exception as err: + fxn_name = "delete_trainingjob_version" + assert str(err) == "Failed to execute query in delete_trainingjob_versionDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name) + assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name) + + def test_get_info_by_version(self): + db_obj = db_helper([["*"]]) + out = get_info_by_version("Tester", 1, db_obj) + assert out != None, 'get_info_by_version FAILED' + + def test_negative_get_info_by_version(self): + checker = Check() + try: + db_obj = db_helper([["*"]], raise_exception=True,check_success_obj=checker) + out = get_info_by_version("Tester", 1, db_obj) + assert out != None, 'get_info_by_version FAILED' + except Exception as err: + fxn_name = "get_info_by_version" + assert str(err) == "Failed to execute query in get_info_by_versionDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name) + assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name) + + def test_get_trainingjob_info_by_name(self): + db_obj = db_helper([["version"], ["*"]]) + out = get_trainingjob_info_by_name("Tester", db_obj) + assert out != None, 'get_trainingjob_info_by_name FAILED' + + def test_negative_get_trainingjob_info_by_name(self): + checker = Check() + try: + db_obj = db_helper([["version"], ["*"]], raise_exception=True,check_success_obj=checker) + out = get_trainingjob_info_by_name("Tester", db_obj) + assert out != None, 'get_trainingjob_info_by_name FAILED' + except Exception as err: + fxn_name = "get_trainingjob_info_by_name" + assert str(err) == "Failed to execute query in get_trainingjob_info_by_nameDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name) + assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name) + + def test_get_latest_version_trainingjob_name(self): + db_obj = db_helper([["version"]]) + out = get_latest_version_trainingjob_name("Tester", db_obj) + assert type(out) == int, 'get_latest_version_trainingjob_name FAILED' + + def test_negative_get_latest_version_trainingjob_name(self): + checker = Check() + try: + db_obj = db_helper([["version"]], raise_exception=True,check_success_obj=checker) + out = get_latest_version_trainingjob_name("Tester", db_obj) + assert type(out) == int, 'get_latest_version FAILED' + except Exception as err: + fxn_name = "get_latest_version" + assert str(err) == "Failed to execute query in get_latest_version_trainingjob_nameDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name) + assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name) + + def test_get_all_distinct_trainingjobs(self): + db_obj = db_helper([["usecase_name"]]) + out = get_all_distinct_trainingjobs(db_obj) + assert type(out) == list, 'get_all_distinct_trainingjobs FAILED' + + def test_negative_get_all_distinct_trainingjobs(self): + checker = Check() + try: + db_obj = db_helper([["usecase_name"]], raise_exception=True,check_success_obj=checker) + out = get_all_distinct_trainingjobs(db_obj) + assert type(out) == list, 'get_all_distinct_trainingjobs FAILED' + except Exception as err: + fxn_name = "get_all_distinct_trainingjobs" + assert str(err) == "Failed to execute query in get_all_distinct_trainingjobsDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name) + assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name) + + + def test_get_all_version_num_by_trainingjob_name(self): + db_obj = db_helper([["version"]]) + out = get_all_version_num_by_trainingjob_name("Tester", db_obj) + assert type(out) == list, 'get_all_version_num_by_trainingjob_name FAILED' + + def test_negative_get_all_version_num_by_trainingjob_name(self): + checker = Check() + try: + db_obj = db_helper([["version"]], raise_exception=True,check_success_obj=checker) + out = get_all_version_num_by_trainingjob_name("Tester", db_obj) + assert type(out) == list, 'get_all_version_num_by_trainingjob_name FAILED' + except Exception as err: + fxn_name = "get_all_version_num_by_trainingjob_name" + assert str(err) == "Failed to execute query in get_all_version_num_by_trainingjob_nameDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name) + assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name) + + def test_update_model_download_url(self): + checker = Check() + db_obj = db_helper([[None]], check_success_obj=checker) + update_model_download_url("Tester", 1, "http/dummy/url", db_obj) + assert checker.finished, 'update_model_download_url FAILED' + + def test_negative_update_model_download_url(self): + checker = Check() + try: + db_obj = db_helper([[None]], raise_exception=True ,check_success_obj=checker) + update_model_download_url("Tester", 1, "http/dummy/url", db_obj) + assert checker.finished, 'update_model_download_url FAILED' + except Exception as err: + fxn_name = "update_model_download_url" + assert str(err) == "Failed to execute query in update_model_download_urlDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name) + assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name) + + def test_add_update_trainingjob(self): + checker = Check() + db_obj = db_helper([[None]], check_success_obj=checker) + add_update_trainingjob('Testing', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', True, True, 1, 'InfluxSource', 'Tester',db_obj) + assert checker.finished, 'add_update_trainingjob FAILED, When adding = True' + + def test_negative_add_update_trainingjob_2(self): + checker = Check() + db_obj = db_helper([[None]], check_success_obj=checker) + add_update_trainingjob('Testing', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', True, False, 1, 'InfluxSource', 'Tester',db_obj) + assert checker.finished, 'add_update_trainingjob FAILED, When adding = True' + + def test_negative_add_update_trainingjob_3(self): + checker = Check() + db_obj = db_helper([[None]], check_success_obj=checker) + try: + add_update_trainingjob('Testing', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', False, True, 1, 'InfluxSource', 'Tester',db_obj) + assert checker.finished, 'add_update_trainingjob FAILED, When adding = True' + assert False + except Exception: + assert True + + def test_negative_add_update_trainingjob_4(self): + checker = Check() + db_obj = db_helper([[None]], check_success_obj=checker) + try: + add_update_trainingjob('Testing', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', False, False, 1, 'InfluxSource', 'Tester',db_obj) + assert checker.finished, 'add_update_trainingjob FAILED, When adding = True' + assert False + except Exception: + assert True + + def test_negative_add_update_trainingjob_5(self): + checker = Check() + try: + db_obj = db_helper([[None]], raise_exception=True, check_success_obj=checker) + add_update_trainingjob('Testing ', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', True, True, 'Near RT-RIC', 1, 'InfluxSource', 'Tester' + ,True, '', '', db_obj, '', 'liveCell', 'UEData') + assert checker.finished, 'add_update_trainingjob FAILED, When adding = True' + except Exception as err: + fxn_name = "add_update_trainingjob" + assert str(err) == "add_update_trainingjob() takes from 12 to 15 positional arguments but 19 were given", 'Negative test {} FAILED when adding = True , Doesnt returned required error'.format(fxn_name) + + def test_get_all_jobs_latest_status_version(self): + db_obj = db_helper([["usecase_name"]]) + out = get_all_jobs_latest_status_version(db_obj) + assert out == [['Tester']], 'get_all_distinct_trainingjobs FAILED' + + def test_negative_get_all_jobs_latest_status_version(self): + checker = Check() + try: + db_obj = db_helper([["usecase_name"]], raise_exception=True,check_success_obj=checker) + out = get_all_jobs_latest_status_version(db_obj) + assert type(out) == list, 'get_all_jobs_latest_status_version FAILED' + except Exception as err: + fxn_name = "get_all_jobs_latest_status_version" + assert str(err) == "Failed to execute query in get_all_jobs_latest_status_versionDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name) + assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name) + + def test_get_all_versions_info_by_name(self): + trainingjob_name = "usecase556" + db_obj = db_helper([["usecase_name"]]) + out = get_all_versions_info_by_name(trainingjob_name,db_obj) + assert out == [['Tester']], 'get_all_versions_info_by_name FAILED' + + def test_negative_get_all_versions_info_by_name(self): + checker = Check() + trainingjob_name = "usecase556" + my_dict = dict([(1,'apple'), (2,'ball')]) + db_obj = db_helper([["usecase_name"]]) + try: + db_obj = db_helper([["usecase_name"]], raise_exception=True,check_success_obj=checker) + out = get_all_versions_info_by_name(trainingjob_name,db_obj) + assert type(out) == list, 'get_all_jobs_latest_status_version FAILED' + except Exception as err: + fxn_name = "get_all_versions_info_by_name" + assert str(err) == "Failed to execute query in get_all_versions_info_by_nameDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name) + assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name) diff --git a/tests/test_tm_apis.py b/tests/test_tm_apis.py index 1689a47..8c84793 100644 --- a/tests/test_tm_apis.py +++ b/tests/test_tm_apis.py @@ -16,6 +16,7 @@ # # ================================================================================== import json +import requests from unittest import mock from mock import patch import pytest @@ -27,18 +28,267 @@ import sys import datetime from flask_api import status from dotenv import load_dotenv +from threading import Lock from trainingmgr import trainingmgr_main from trainingmgr.common.tmgr_logger import TMLogger -trainingmgr_main.LOGGER = TMLogger("./tests/common/conf_log.yaml").logger +from trainingmgr.common.trainingmgr_config import TrainingMgrConfig +trainingmgr_main.LOGGER = pytest.logger trainingmgr_main.LOCK = Lock() trainingmgr_main.DATAEXTRACTION_JOBS_CACHE = {} + +class Test_upload_pipeline: + def setup_method(self): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + + mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ") + attrs_TRAININGMGR_CONFIG_OBJ = {'kf_adapter_ip.return_value': '123', 'kf_adapter_port.return_value' : '100'} + mocked_TRAININGMGR_CONFIG_OBJ.configure_mock(**attrs_TRAININGMGR_CONFIG_OBJ) + @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ) + def test_upload_pipeline_negative(self, mock1): + trainingmgr_main.LOGGER.debug("******* *******") + expected_data = "result" + trainingjob_req = { + "pipe_name":"usecase1", + } + response = self.client.post("/pipelines//upload".format("usecase1"), data=json.dumps(trainingjob_req), + content_type="application/json") + + trainingmgr_main.LOGGER.debug(response.data) + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert expected_data in response.json.keys() + + +class Test_data_extraction_notification: + def setup_method(self): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + + db_result2 = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}', + '', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33', + '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}', + datetime.datetime(2022, 10, 12, 10, 2, 31, 888830), 1, False, '3', '{"datalake_source": {"InfluxSource": {}}}', 'No data available.', '', 'liveCell', 'UEData', False)] + + de_response2 = Response() + de_response2.code = "expired" + de_response2.error_type = "expired" + de_response2.status_code = status.HTTP_200_OK + de_response2.headers={"content-type": "application/json"} + de_response2._content = b'{"task_status": "Completed", "result": "Data Extraction Completed"}' + resp= ({"str1":"rp1","str2":"rp2"} ,status.HTTP_200_OK) + + @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result2) + @patch('trainingmgr.trainingmgr_main.training_start', return_value = de_response2) + @patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version') + @patch('trainingmgr.trainingmgr_main.change_field_of_latest_version') + @patch('trainingmgr.trainingmgr_main.change_in_progress_to_failed_by_latest_version', return_value = True) + @patch('trainingmgr.trainingmgr_main.response_for_training', return_value = resp) + def test_data_extraction_notification(self, mock1, mock2, mock3, mock4, mock5, mock6): + trainingmgr_main.LOGGER.debug("******* Data_Extraction_Notification *******") + trainingjob_req = { + "trainingjob_name":"usecase1", + } + expected_data = "Data Extraction Completed" + response = self.client.post("/trainingjob/dataExtractionNotification".format("usecase1"), + data=json.dumps(trainingjob_req), + content_type="application/json") + trainingmgr_main.LOGGER.debug(response.data) + assert response.status_code == status.HTTP_200_OK + +class Test_trainingjobs_operations: + def setup_method(self): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + + db_result2 = [('usecase2', 'version2', '{"overall_status":"status_ok"}')] + @patch('trainingmgr.trainingmgr_main.get_all_jobs_latest_status_version', return_value = db_result2) + @patch('trainingmgr.trainingmgr_main.get_one_word_status', return_value = "status OK") + def test_trainingjobs_operations(self,mock1,mock2): + trainingmgr_main.LOGGER.debug("******* test_trainingjobs_operations get *******") + expected_data = '{"trainingjobs": [{"trainingjob_name": "usecase2", "version": "version2", "overall_status": "status OK"}]}' + response = self.client.get("/trainingjobs/latest",content_type="application/json") + trainingmgr_main.LOGGER.debug(response.data) + assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal" + assert expected_data in str(response.data) + + db_result3 = [] + @patch('trainingmgr.trainingmgr_main.get_all_jobs_latest_status_version', return_value = db_result3) + @patch('trainingmgr.trainingmgr_main.get_one_word_status', return_value = "status OK") + def test_trainingjobs_operations_get_exception(self,mock1,mock2): + trainingmgr_main.LOGGER.debug("******* test_trainingjobs_operations get exception*******") + expected_data = "Failed to fetch training job info from db" + response = self.client.get("/trainingjobs/latest",content_type="application/json") + trainingmgr_main.LOGGER.debug(response.data) + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Return status code NOT equal" + assert expected_data in str(response.data) + +class Test_pipeline_notification: + def setup_method(self): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + + mocked_mm_sdk=mock.Mock(name="MM_SDK") + attrs_mm_sdk = {'check_object.return_value': True} + mocked_mm_sdk.configure_mock(**attrs_mm_sdk) + mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ") + attrs_TRAININGMGR_CONFIG_OBJ = {'my_ip.return_value': 123, 'my_port.return_value' : 100} + mocked_TRAININGMGR_CONFIG_OBJ.configure_mock(**attrs_TRAININGMGR_CONFIG_OBJ) + message1="Pipeline notification success." + code1=status.HTTP_200_OK + response_tuple1=({"result": message1}, code1) + @patch('trainingmgr.trainingmgr_main.MM_SDK', return_value = mocked_mm_sdk) + @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ) + @patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version') + @patch('trainingmgr.trainingmgr_main.update_model_download_url') + @patch('trainingmgr.trainingmgr_main.get_latest_version_trainingjob_name', return_value = "usecase1") + @patch('trainingmgr.trainingmgr_main.response_for_training', return_value = response_tuple1) + def test_pipeline_notification(self,mock1, mock2, mock3, mock4, mock5, mock6): + trainingmgr_main.LOGGER.debug("******* test_pipeline_notification post *******") + trainingjob_req = { + "trainingjob_name":"usecase1", + "run_status":"Succeeded", + } + expected_data = "Pipeline notification success." + response = self.client.post("/trainingjob/pipelineNotification".format("usecase1"),data=json.dumps(trainingjob_req), + content_type="application/json") + trainingmgr_main.LOGGER.debug(response.data) + assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal" + assert expected_data in str(response.data) + + message2="Pipeline notification -Training failed " + code2=status.HTTP_500_INTERNAL_SERVER_ERROR + response_tuple2=({"result": message2}, code2) + @patch('trainingmgr.trainingmgr_main.MM_SDK', return_value = mocked_mm_sdk) + @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ) + @patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version') + @patch('trainingmgr.trainingmgr_main.update_model_download_url') + @patch('trainingmgr.trainingmgr_main.get_latest_version_trainingjob_name', return_value = "usecase1") + @patch('trainingmgr.trainingmgr_main.response_for_training', return_value = response_tuple2) + @patch('trainingmgr.trainingmgr_main.change_in_progress_to_failed_by_latest_version', return_value = True) + def test_negative_pipeline_notification(self,mock1, mock2, mock3, mock4, mock5, mock6, mock7): + trainingmgr_main.LOGGER.debug("******* test_pipeline_notification post exception*******") + trainingjob_req = { + "trainingjob_name":"usecase1", + "run_status":"Not_Succeeded", + } + expected_data = "Pipeline notification -Training failed " + response = self.client.post("/trainingjob/pipelineNotification".format("usecase1"), + data=json.dumps(trainingjob_req), + content_type="application/json") + trainingmgr_main.LOGGER.debug(response.data) + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Return status code NOT equal" + assert expected_data in str(response.data) + + db_result4 = [("test_data1","test_data2"),("version1")] + @patch('trainingmgr.trainingmgr_main.get_field_of_given_version', return_value = db_result4) + def test_get_steps_state_2(self,mock1): + trainingmgr_main.LOGGER.debug("******* test_get_steps_state get *******") + expected_data = "test_data1" + response = self.client.get("/trainingjobs///steps_state".format("usecase1"), + content_type="application/json") + trainingmgr_main.LOGGER.debug(response.data) + assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal" + assert expected_data in str(response.data) + + db_result5 = [] + @patch('trainingmgr.trainingmgr_main.get_field_of_given_version', return_value = db_result5) + def test_negative_get_steps_state_2(self,mock1): + expected_data = "Exception" + response = self.client.get("/trainingjobs///steps_state".format("usecase1"), + content_type="application/json") + trainingmgr_main.LOGGER.debug(response.data) + assert response.status_code == status.HTTP_404_NOT_FOUND, "Return status code NOT equal" + assert expected_data in str(response.data) + +class Test_get_trainingjob_by_name_version: + def setup_method(self): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + + @patch('trainingmgr.trainingmgr_main.get_info_by_version',return_value=[('usecase7', 'auto test', '*', 'prediction with model name', 'Default', '{"arguments": {"epochs": "1", "usecase": "usecase7"}}', 'Enb=20 and Cellnum=6', datetime.datetime(2022, 9, 20,11, 40, 30), '7d09c0bf-7575-4475-86ff-5573fb3c4716', '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FINISHED"}', datetime.datetime(2022, 9, 20, 11, 42, 20), 1, True, 'Near RT RIC', '{"datalake_source": {"CassandraSource": {}}}', '{"datalake_source": {"CassandraSource": {}}}','http://10.0.0.47:32002/model/usecase7/1/Model.zip','','','','','')]) + @patch('trainingmgr.trainingmgr_main.get_metrics',return_value={"metrics": [{"Accuracy": "0.0"}]}) + @patch('trainingmgr.trainingmgr_main.get_one_key',return_value='cassandra') + def test_get_trainingjob_by_name_version(self,mock1,mock2,mock3): + usecase_name = "usecase7" + version = "1" + response = self.client.get("/trainingjobs/{}/{}".format(usecase_name, version)) + expected_data = b'{"trainingjob": {"trainingjob_name": "usecase7", "description": "auto test", "feature_list": "*", "pipeline_name": "prediction with model name", "experiment_name": "Default", "arguments": {"epochs": "1", "usecase": "usecase7"}, "query_filter": "Enb=20 and Cellnum=6", "creation_time": "2022-09-20 11:40:30", "run_id": "7d09c0bf-7575-4475-86ff-5573fb3c4716", "steps_state": {"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FINISHED"}, "updation_time": "2022-09-20 11:42:20", "version": 1, "enable_versioning": true, "pipeline_version": "Near RT RIC", "datalake_source": "cassandra", "model_url": "{\\"datalake_source\\": {\\"CassandraSource\\": {}}}", "notification_url": "http://10.0.0.47:32002/model/usecase7/1/Model.zip", "_measurement": "", "bucket": "", "accuracy": {"metrics": [{"Accuracy": "0.0"}]}}}' + + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == status.HTTP_200_OK, "not equal code" + assert response.data == expected_data, "not equal data" + + @patch('trainingmgr.trainingmgr_main.get_info_by_version',return_value=False) + @patch('trainingmgr.trainingmgr_main.get_metrics',return_value={"metrics": [{"Accuracy": "0.0"}]}) + @patch('trainingmgr.trainingmgr_main.get_one_key',return_value='cassandra') + def test_negative_get_trainingjob_by_name_version(self,mock1,mock2,mock3): + usecase_name = "usecase7" + version = "1" + response = self.client.get("/trainingjobs/{}/{}".format(usecase_name, version)) + expected_data = b'{"trainingjob": {"trainingjob_name": "usecase7", "description": "auto test", "feature_list": "*", "pipeline_name": "prediction with model name", "experiment_name": "Default", "arguments": {"epochs": "1", "usecase": "usecase7"}, "query_filter": "Enb=20 and Cellnum=6", "creation_time": "2022-09-20 11:40:30", "run_id": "7d09c0bf-7575-4475-86ff-5573fb3c4716", "steps_state": {"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FINISHED"}, "updation_time": "2022-09-20 11:42:20", "version": 1, "enable_versioning": true, "pipeline_version": "Near RT RIC", "datalake_source": "cassandra", "model_url": "{\\"datalake_source\\": {\\"CassandraSource\\": {}}}", "notification_url": "http://10.0.0.47:32002/model/usecase7/1/Model.zip", "_measurement": "", "bucket": "", "accuracy": {"metrics": [{"Accuracy": "0.0"}]}}}' + + trainingmgr_main.LOGGER.debug(expected_data) + trainingmgr_main.LOGGER.debug(response.data) + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == 404, "not equal code" + +class Test_unpload_pipeline: + def setup_method(self): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + + def test_negative_upload_pipeline(self): + pipeline_name = "qoe" + response = self.client.post("/pipelines/{}/upload".format(pipeline_name)) + expected = "jjjj" + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == 500, "not equal code" + + @patch('trainingmgr.trainingmgr_main.LOGGER.debug', return_value = True) + def test_negative_upload_pipeline_2(self,mock1): + pipeline_name = "qoe" + response = self.client.post("/pipelines/{}/upload".format(pipeline_name)) + expected = ValueError("file not found in request.files") + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == 500, "not equal code" + +class Test_get_steps_state: + def setup_method(self): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + + @patch('trainingmgr.trainingmgr_main.get_field_of_given_version',return_value=[['data_extracted','data_pending'], ['data1','data2']]) + def test_get_steps_state(self,mock1): + usecase_name = "usecase7" + version = "1" + response = self.client.get("/trainingjobs/{}/{}/steps_state".format(usecase_name, version)) + expected_data = b'data_extracted' + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == status.HTTP_200_OK, "not equal code" + assert response.data == expected_data, "not equal data" + + @patch('trainingmgr.trainingmgr_main.get_field_of_given_version',return_value=False) + def test_negative_get_steps_state(self,mock1): + usecase_name = "usecase7" + version = "1" + response = self.client.get("/trainingjobs/{}/{}/steps_state".format(usecase_name, version)) + expected_data = b'data_extracted' + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == 404, "not equal code" + + @patch('trainingmgr.trainingmgr_main.get_field_of_given_version',return_value=Exception("Not found given trainingjob with version")) + def test_negative_get_steps_state_2(self,mock1): + usecase_name = "usecase7" + version = "1" + response = self.client.get("/trainingjobs/{}/{}/steps_state".format(usecase_name, version)) + expected_data = b'data_extracted' + assert response.status_code == 500, "not equal code" + class Test_training_main: def setup_method(self): self.client = trainingmgr_main.APP.test_client(self) self.logger = trainingmgr_main.LOGGER - ## Postive_1 - #@pytest.mark.skip() @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = False) @patch('trainingmgr.trainingmgr_main.add_update_trainingjob') def test_trainingjob_operations(self,mock1,mock2): @@ -68,14 +318,12 @@ class Test_training_main: assert response.data == expected_data assert response.status_code == status.HTTP_201_CREATED, "Return status code NOT equal" - ## Postive_2 db_result = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}', '', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33', '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}', datetime.datetime(2022, 10, 12, 10, 2, 31, 888830), 1, False, '3', '{"datalake_source": {"InfluxSource": {}}}', 'No data available.', '', 'liveCell', 'UEData', False)] training_data = ('','','','','','','','','','','') - #@pytest.mark.skip() @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True) @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result) @patch('trainingmgr.trainingmgr_main.check_trainingjob_data', return_value = training_data) @@ -104,12 +352,10 @@ class Test_training_main: response = self.client.put("/trainingjobs/".format("usecase1"), data=json.dumps(trainingjob_req), content_type="application/json") - trainingmgr_main.LOGGER.debug(response.data) - + trainingmgr_main.LOGGER.debug(response.data) assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal" assert expected_data in str(response.data) - #@pytest.mark.skip() @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True) def test_negative_trainingjob_operations_post_conflit(self,mock1): trainingmgr_main.LOGGER.debug("******* test_negative_trainingjob_operations_post_conflit *******") @@ -134,15 +380,11 @@ class Test_training_main: response = self.client.post("/trainingjobs/".format("usecase1"), data=json.dumps(trainingjob_req), content_type="application/json") - trainingmgr_main.LOGGER.debug(response.data) - + trainingmgr_main.LOGGER.debug(response.data) assert response.status_code == status.HTTP_409_CONFLICT, "Return status code NOT equal" assert expected_data in str(response.data) - # ***** Start training test **** - # Positive_1 - #@pytest.mark.skip() db_result = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}', '', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33', '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}', @@ -156,7 +398,6 @@ class Test_training_main: de_response.headers={"content-type": "application/json"} de_response._content = b'{"task_status": "Completed", "result": "Data Pipeline Execution Completed"}' - #@pytest.mark.skip() @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True) @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result) @patch('trainingmgr.trainingmgr_main.data_extraction_start', return_value = de_response) @@ -170,8 +411,6 @@ class Test_training_main: assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal" assert expected_data in str(response.data) - - db_result1 = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}', '', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33', '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}', @@ -195,4 +434,229 @@ class Test_training_main: content_type="application/json") trainingmgr_main.LOGGER.debug(response.data) assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Return status code NOT equal" - assert expected_data in str(response.data) \ No newline at end of file + assert expected_data in str(response.data) + +class Test_get_versions_for_pipeline: + @patch('trainingmgr.common.trainingmgr_config.TMLogger', return_value = TMLogger("tests/common/conf_log.yaml")) + def setup_method(self,mock1,mock2): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + load_dotenv('tests/test.env') + self.TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig() + + the_response = Response() + the_response.code = "expired" + the_response.error_type = "expired" + the_response.status_code = 200 + the_response.headers={"content-type": "application/json"} + the_response._content = b'{"versions_list": ["football", "baseball"]}' + + @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response) + def test_get_versions_for_pipeline(self,mock1): + + response = self.client.get("/pipelines/{}/versions".format("qoe_pipeline")) + trainingmgr_main.LOGGER.debug(response.data) + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == 500, "Return status code NOT equal" + + @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = requests.exceptions.ConnectionError('Mocked error')) + def test_negative_get_versions_for_pipeline_1(self,mock1): + response = self.client.get("/pipelines/{}/versions".format("qoe_pipeline")) + print(response.data) + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception " + + @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = TypeError('Mocked error')) + def test_negative_get_versions_for_pipeline_2(self,mock1): + response = self.client.get("/pipelines/{}/versions".format("qoe_pipeline")) + print(response.data) + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception " + + the_response1 = Response() + the_response1.code = "expired" + the_response1.error_type = "expired" + the_response1.status_code = 200 + the_response1.headers={"content-type": "application/text"} + the_response._content = b'{"versions_list": ["football", "baseball"]}' + @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response1) + def test_negative_get_versions_for_pipeline_3(self,mock1): + response = self.client.get("/pipelines/{}/versions".format("qoe_pipeline")) + print(response.data) + assert response.content_type != "application/text", "not equal content type" + +class Test_get_all_pipeline_names: + def setup_method(self): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + + the_response = Response() + the_response.code = "expired" + the_response.error_type = "expired" + the_response.status_code = 200 + the_response.headers={"content-type": "application/json"} + the_response._content = b'{ "exp1":"id1","exp2":"id2"}' + @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response) + def test_get_all_pipeline_names(self,mock1): + response = self.client.get("/pipelines") + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == 500, "Return status code NOT equal" + + @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = requests.exceptions.ConnectionError('Mocked error')) + def test_negative_get_all_pipeline_names_1(self,mock1): + response = self.client.get("/pipelines") + print(response.data) + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception " + + @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = TypeError('Mocked error')) + def test_negative_get_all_pipeline_names_2(self,mock1): + response = self.client.get("/pipelines") + print(response.data) + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception " + + the_response1 = Response() + the_response1.code = "expired" + the_response1.error_type = "expired" + the_response1.status_code = 200 + the_response1.headers={"content-type": "application/text"} + the_response1._content = b'{ "exp1":"id1","exp2":"id2"}' + @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response1) + def test_negative_get_all_pipeline_names_3(self,mock1): + response = self.client.get("/pipelines") + print(response.data) + assert response.content_type != "application/text", "not equal content type" + +class Test_get_all_exp_names: + def setup_method(self): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + + the_response = Response() + the_response.code = "expired" + the_response.error_type = "expired" + the_response.status_code = 200 + the_response.headers={"content-type": "application/json"} + the_response._content = b'{ "exp1":"id1","exp2":"id2"}' + @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response) + def test_get_all_experiment_names(self,mock1): + response = self.client.get("/experiments") + print(response.data) + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == 500, "Return status code NOT equal" + + @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = requests.exceptions.ConnectionError('Mocked error')) + def test_negative_get_all_experiment_names_1(self,mock1): + response = self.client.get("/experiments") + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception " + + @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = TypeError('Mocked error')) + def test_negative_get_all_experiment_names_2(self,mock1): + response = self.client.get("/experiments") + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception " + + the_response1 = Response() + the_response1.code = "expired" + the_response1.error_type = "expired" + the_response1.status_code = 200 + the_response1.headers={"content-type": "application/text"} + the_response1._content = b'{ "exp1":"id1","exp2":"id2"}' + @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response1) + def test_negative_get_all_experiment_names_3(self,mock1): + response = self.client.get("/experiments") + assert response.content_type != "application/text", "not equal content type" + +class Test_get_metadata: + def setup_method(self): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + + resulttt = [('usecase7', '1','auto test', + '*','prediction with model name', + 'Default','Enb=20 and Cellnum=6','epochs:1','FINISHED', + '{"metrics": "FINISHED"}','Near RT RIC','1', + 'Cassandra DB','usecase7', '1','auto test','*', + 'prediction with model name', + 'Default','Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}', + 'Default',False,'Cassandra DB','usecase7', '1','auto test','*','prediction with model name', + 'Default','Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}', + 'Near RT RIC','3','Cassandra DB','usecase7', '1','auto test','*', + 'prediction with model name','Default','Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}','Near RT RIC','3','Cassandra DB') + ] + mock_uc_config_obj = mock.Mock(name='mocked uc_config_obj') + @patch('trainingmgr.trainingmgr_main.get_all_versions_info_by_name', return_value = resulttt) + @patch('trainingmgr.trainingmgr_main.get_metrics', return_value = 90) + @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mock_uc_config_obj) + def test_get_metadata(self,mock1,mock2,mock3): + usecase_name = "usecase7" + response = self.client.get("/trainingjobs/metadata/{}".format(usecase_name)) + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal" + + @patch('trainingmgr.trainingmgr_main.get_all_versions_info_by_name', side_effect = Exception('Mocked error')) + def test_negative_get_metadata_1(self,mock1): + usecase_name = "usecase7" + response = self.client.get("/trainingjobs/metadata/{}".format(usecase_name)) + + print(response.data) + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception " + + class Test_get_model: + def setup_method(self): + self.client = trainingmgr_main.APP.test_client(self) + trainingmgr_main.LOGGER = TMLogger("tests/common/conf_log.yaml").logger + self.logger = trainingmgr_main.LOGGER + + @patch('trainingmgr.trainingmgr_main.send_file', return_value = 'File') + def test_negative_get_model(self,mock1): + trainingjob_name = "usecase777" + version = 2 + result = 'File' + response = trainingmgr_main.get_model(trainingjob_name,version) + assert response[1] == 500, "The function get_model Failed" + +class Test_get_metadata_1: + def setup_method(self): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + + resulttt = [('usecase7', '1','auto test', + '*','prediction with model name', + 'Default','Enb=20 and Cellnum=6','epochs:1','FINISHED', + '{"metrics": "FINISHED"}','Near RT RIC','1', + 'Cassandra DB','usecase7', '1','auto test','*', + 'prediction with model name', + 'Default',False,'Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}', + 'Default',False,'Cassandra DB','usecase7', '1','auto test','*','prediction with model name', + 'Default','Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}', + 'Near RT RIC','3','Cassandra DB','usecase7', '1','auto test','*', + 'prediction with model name','Default','Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}','Near RT RIC','3','Cassandra DB') + ] + + mock_uc_config_obj = mock.Mock(name='mocked uc_config_obj') + @patch('trainingmgr.trainingmgr_main.get_all_versions_info_by_name', return_value = resulttt) + @patch('trainingmgr.trainingmgr_main.get_metrics', return_value = 90) + @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mock_uc_config_obj) + def test_get_metadata(self,mock1,mock2,mock3): + usecase_name = "usecase7" + response = self.client.get("/trainingjobs/metadata/{}".format(usecase_name)) + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal" + + @patch('trainingmgr.trainingmgr_main.get_all_versions_info_by_name', return_value = None) + def test_negative_get_metadata_1(self,mock1): + usecase_name = "usecase7" + response = self.client.get("/trainingjobs/metadata/{}".format(usecase_name)) + print(response.data) + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == status.HTTP_404_NOT_FOUND, "Should have thrown the exception " + + @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = False) + def test_training_negative_de_notfound(self,mock1): + trainingmgr_main.LOGGER.debug("******* test_training_404_NotFound *******") + expected_data = '' + response = self.client.post("/trainingjobs//training".format("usecase1"), + content_type="application/json") + trainingmgr_main.LOGGER.debug(response.data) + assert response.status_code == status.HTTP_404_NOT_FOUND, "Return status code NOT equal" diff --git a/tests/test_trainingmgr_config.py b/tests/test_trainingmgr_config.py new file mode 100644 index 0000000..ce944cd --- /dev/null +++ b/tests/test_trainingmgr_config.py @@ -0,0 +1,106 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== +import json +import requests +from unittest import mock +from mock import patch +import pytest +import flask +from requests.models import Response +from threading import Lock +import os +import sys +import datetime +from flask_api import status +from dotenv import load_dotenv +from trainingmgr.common import trainingmgr_config +from trainingmgr.common.trainingmgr_config import TrainingMgrConfig +from trainingmgr.common.tmgr_logger import TMLogger +from trainingmgr import trainingmgr_main +trainingmgr_main.LOGGER = pytest.logger + +class Test_trainingmgr_config: + @patch('trainingmgr.common.trainingmgr_config.TMLogger', return_value = TMLogger("tests/common/conf_log.yaml")) + def setup_method(self,mock1,mock2): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + load_dotenv('tests/test.env') + self.TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig() + + def test_kf_adapter_port(self): + expected_data = '5001' + result = self.TRAININGMGR_CONFIG_OBJ.kf_adapter_port + assert result == expected_data + + def test_kf_adapter_ip(self): + expected_data = 'localhost' + result = self.TRAININGMGR_CONFIG_OBJ.kf_adapter_ip + assert result == expected_data + + def test_data_extraction_port(self): + expected_data = '32000' + result = self.TRAININGMGR_CONFIG_OBJ.data_extraction_port + assert result == expected_data + + def test_data_extraction_ip(self): + expected_data = 'localhost' + result = self.TRAININGMGR_CONFIG_OBJ.data_extraction_ip + assert result == expected_data + + def test_my_port(self): + expected_data = '32002' + x = TrainingMgrConfig + result = self.TRAININGMGR_CONFIG_OBJ.my_port + assert result == expected_data + + def test_my_ip(self): + expected_data = 'localhost' + result = self.TRAININGMGR_CONFIG_OBJ.my_ip + assert result == expected_data + + def test_logger(self): + expected_data = TMLogger("tests/common/conf_log.yaml").logger + result = self.TRAININGMGR_CONFIG_OBJ.logger + assert result == expected_data + + def test_ps_user(self): + expected_data = 'postgres' + result = self.TRAININGMGR_CONFIG_OBJ.ps_user + assert result == expected_data + + def test_ps_password(self): + expected_data = "abcd" + result = self.TRAININGMGR_CONFIG_OBJ.ps_password + assert result == expected_data + + def test_ps_ip(self): + expected_data = 'localhost' + result = self.TRAININGMGR_CONFIG_OBJ.ps_ip + assert result == expected_data + + def test_ps_port(self): + expected_data = '30001' + x = TrainingMgrConfig + result = self.TRAININGMGR_CONFIG_OBJ.ps_port + assert result == expected_data + + def test_is_config_loaded_properly(self): + expected_data = True + result = TrainingMgrConfig.is_config_loaded_properly(self.TRAININGMGR_CONFIG_OBJ) + assert result == expected_data + diff --git a/tests/test_trainingmgr_operations.py b/tests/test_trainingmgr_operations.py new file mode 100644 index 0000000..54eb4fe --- /dev/null +++ b/tests/test_trainingmgr_operations.py @@ -0,0 +1,87 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== +import json +import requests +from unittest import mock +from mock import patch +import pytest +import flask +from requests.models import Response +from threading import Lock +import os +import sys +import datetime +from flask_api import status +from dotenv import load_dotenv +from threading import Lock +from trainingmgr import trainingmgr_main +from trainingmgr.common import trainingmgr_operations +from trainingmgr.common.tmgr_logger import TMLogger +from trainingmgr.common.trainingmgr_config import TrainingMgrConfig +trainingmgr_main.LOGGER = pytest.logger +trainingmgr_main.LOCK = Lock() +trainingmgr_main.DATAEXTRACTION_JOBS_CACHE = {} + +class DummyVariable: + kf_adapter_ip = "localhost" + kf_adapter_port = 5001 + logger = trainingmgr_main.LOGGER + +class Test_training_start: + def setup_method(self): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + + def test_negative_training_start(self): + training_config_obj = DummyVariable() + dict_data = { + "brand": "Ford", + "model": "Mustang", + "year": 1964 + } + trainingjob_name = "usecase12" + expected_data = { + "brand": "Ford", + "model": "Mustang", + "year": 1964 + } + try: + response = trainingmgr_operations.training_start(training_config_obj,dict_data,trainingjob_name) + assert response == expected_data,"data not equal" + assert False + except Exception: + assert True + +class Test_upload_pipeline: + def setup_method(self): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + + mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ") + attrs_TRAININGMGR_CONFIG_OBJ = {'kf_adapter_ip.return_value': '123', 'kf_adapter_port.return_value' : '100'} + mocked_TRAININGMGR_CONFIG_OBJ.configure_mock(**attrs_TRAININGMGR_CONFIG_OBJ) + @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ) + def test_upload_pipeline_negative(self, mock1): + expected_data = "result" + trainingjob_req = { + "pipe_name":"usecase1", + } + response = self.client.post("/pipelines//upload".format("usecase1"), data=json.dumps(trainingjob_req), + content_type="application/json") + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert expected_data in response.json.keys() diff --git a/tests/test_trainingmgr_ps_db.py b/tests/test_trainingmgr_ps_db.py new file mode 100644 index 0000000..2355829 --- /dev/null +++ b/tests/test_trainingmgr_ps_db.py @@ -0,0 +1,87 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== +import pytest +import sys +import os +from mock import patch +from trainingmgr.db.trainingmgr_ps_db import PSDB +from dotenv import load_dotenv + +class cred_handle: + def __init__(self): + self.ps_user = "gdgdgd" + self.ps_password = "hdhd" + self.ps_ip = 12345 + self.ps_port = 1000 + + def logger(self): + return cred_handle() + + def error(self): + raise Exception('Connection Failed To Exist') + +class Cursor: + def __init__(self, db_name): + self.db_name = db_name + + def execute(self, query): + pass + + def fetchall(self): + return [self.db_name] + + def close(self): + pass + +class connection: + def __init__(self, db_name): + self.autocommit = False + self.db_name = db_name + + def cursor(self): + return Cursor(self.db_name) + + def rollback(self): + pass + + def commit(self): + pass + + def close(self): + pass + +class Test_PSDB: + @patch('trainingmgr.db.trainingmgr_ps_db.pg8000.dbapi.connect', return_value = connection('usecase_manager_database')) + def setup_method(self,mock1,mock2): + self.obj = PSDB(cred_handle()) + + def test_init_psdb(self): + assert self.obj != None, 'PSDB Object Creation Failed' + + @patch('trainingmgr.db.trainingmgr_ps_db.pg8000.dbapi.connect', return_value = connection('usecase_manager_database')) + def test_get_new_conn(self, mock1): + out = self.obj.get_new_conn() + assert out != None, 'New Connection Failed' + + def test_negative_get_new_conn(self): + try: + out = self.obj.get_new_conn() + assert out != None, 'New Connection Failed' + assert False + except Exception: + assert True \ No newline at end of file diff --git a/tests/test_trainingmgr_util.py b/tests/test_trainingmgr_util.py new file mode 100644 index 0000000..3721fed --- /dev/null +++ b/tests/test_trainingmgr_util.py @@ -0,0 +1,640 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== + +"""" +This file contains the unittesting for Training management utility functions +""" +from pickle import FALSE +import sys +from mock import patch +from threading import Lock +import pytest +import datetime +from dotenv import load_dotenv +from flask_api import status +import logging + +from trainingmgr.db.trainingmgr_ps_db import PSDB +import trainingmgr.trainingmgr_main +from trainingmgr.common import trainingmgr_util +from trainingmgr.common.tmgr_logger import TMLogger +from trainingmgr.common.trainingmgr_config import TrainingMgrConfig +from trainingmgr.common.trainingmgr_util import response_for_training, check_key_in_dictionary,check_trainingjob_data, \ + get_one_key, get_metrics, handle_async_feature_engineering_status_exception_case, get_one_word_status, check_trainingjob_data, \ + validate_trainingjob_name +from requests.models import Response +from trainingmgr import trainingmgr_main +from trainingmgr.common.tmgr_logger import TMLogger +trainingmgr_main.LOGGER = pytest.logger + +class Test_response_for_training: + def setup_method(self): + self.client = trainingmgr_main.APP.test_client(self) + self.logger = trainingmgr_main.LOGGER + + fs_result = Response() + fs_result.code = "expired" + fs_result.error_type = "expired" + fs_result.status_code = status.HTTP_200_OK + fs_result.headers={'content-type': 'application/json'} + fs_result._content={'Accept-Charset': 'UTF-8'} + + @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']]) + @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True) + @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT") + @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1) + @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result) + @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version') + def test_response_for_training(self,mock1,mock2, mock3, mock4, mock5, mock6): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr.trainingmgr_main.LOGGER + is_success = True + trainingjob_name = "usecase7" + ps_db_obj = () + mm_sdk = () + result = response_for_training(code, message, logger, is_success, trainingjob_name, ps_db_obj, mm_sdk) + assert result != None + + @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']]) + @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True) + @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT") + @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1) + @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version') + def test_negative_response_for_training(self,mock1,mock2, mock3, mock4, mock5): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr_main.LOGGER + is_success = True + usecase_name = "usecase7" + ps_db_obj = () + mm_sdk = () + try: + response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk) + assert False + except Exception: + assert True + + @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']]) + @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True) + @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT") + @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1) + @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version') + def test_negative_response_for_training_2(self,mock1,mock2, mock3, mock4, mock5): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr_main.LOGGER + is_success = True + usecase_name = "usecase7" + ps_db_obj = () + mm_sdk = () + try: + response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk) + assert False + except Exception: + assert True + + def test_negative_response_for_training_3(self): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr_main.LOGGER + is_success = True + usecase_name = "usecase7" + ps_db_obj = () + mm_sdk = () + try: + response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk) + assert False + except Exception: + assert True + + @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']]) + @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True) + def test_negative_response_for_training_4(self,mock1,mock2): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr_main.LOGGER + is_success = True + usecase_name = "usecase7" + ps_db_obj = () + mm_sdk = () + try: + response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk) + assert False + except Exception: + assert True + + @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']]) + @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True) + @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT") + def test_negative_response_for_training_5(self,mock1,mock2,mock3): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr_main.LOGGER + is_success = True + usecase_name = "usecase7" + ps_db_obj = () + mm_sdk = () + try: + response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk) + assert False + except Exception: + assert True + + @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']]) + @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True) + @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT") + @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1) + def test_negative_response_for_training_6(self,mock1,mock2,mock3,mock4): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr_main.LOGGER + is_success = True + usecase_name = "usecase7" + ps_db_obj = () + mm_sdk = () + try: + response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk) + assert False + except Exception: + assert True + + @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']]) + def test_negative_response_for_training_7(self,mock1): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr_main.LOGGER + is_success = True + usecase_name = "usecase7" + ps_db_obj = () + mm_sdk = () + try: + response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk) + assert False + except Exception: + assert True + + @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']]) + @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True) + @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1) + def test_negative_response_for_training_8(self,mock1,mock2,mock3): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr_main.LOGGER + is_success = True + usecase_name = "usecase7" + ps_db_obj = () + mm_sdk = () + try: + response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk) + assert False + except Exception: + assert True + + @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version') + def test_negative_response_for_training_9(self,mock1): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr_main.LOGGER + is_success = True + usecase_name = "usecase7" + ps_db_obj = () + mm_sdk = () + try: + response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk) + assert False + except Exception: + assert True + + @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True) + def test_negative_response_for_training_10(self,mock1): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr_main.LOGGER + is_success = True + usecase_name = "usecalse7" + ps_db_obj = () + mm_sdk = () + try: + response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk) + assert False + except Exception: + assert True + + @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=True) + @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT") + @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1) + @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result) + @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version') + def test_negative_response_for_training_11(self,mock1,mock2,mock3,mock4,mock5): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr_main.LOGGER + is_success = True + usecase_name = "usecase7" + ps_db_obj = () + mm_sdk = () + try: + response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk) + assert False + except Exception: + assert True + + @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']]) + @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True) + @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT") + @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1) + @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result) + @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version') + def test_negative_response_for_training_12(self,mock1,mock2,mock3,mock4,mock5,mock6): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr_main.LOGGER + is_success = True + usecase_name = "usecase7" + ps_db_obj = () + mm_sdk = () + try: + assert False + except Exception: + assert True + +class Test_response_for_training_1: + fs_result = Response() + fs_result.code = "expired" + fs_result.error_type = "expired" + fs_result.status_code = status.HTTP_200_OK + fs_result.headers={'content-type': 'application/jsn'} + fs_result._content={'Accept-Charset': 'UTF-8'} + + @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']]) + @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True) + @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT") + @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1) + @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result) + @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version') + def test_response_for_training_1(self,mock1,mock2, mock3, mock4, mock5, mock6): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr_main.LOGGER + is_success = True + usecase_name = "usecase7" + ps_db_obj = () + mm_sdk = () + try: + response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk) + assert False + except Exception: + assert True + +class Test_response_for_training_2: + fs_result = Response() + fs_result.code = "expired" + fs_result.error_type = "expired" + fs_result.status_code = status.HTTP_404_NOT_FOUND + fs_result.headers={'content-type': 'application/json'} + fs_result._content={'Accept-Charset': 'UTF-8'} + + @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']]) + @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True) + @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT") + @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1) + @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result) + @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version') + def test_response_for_training_2(self,mock1,mock2, mock3, mock4, mock5, mock6): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr_main.LOGGER + is_success = True + usecase_name = "usecase7" + ps_db_obj = () + mm_sdk = () + try: + response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk) + assert False + except Exception: + assert True + +class Test_response_for_training_3: + fs_result = Response() + fs_result.code = "expired" + fs_result.error_type = "expired" + fs_result.status_code = status.HTTP_200_OK + fs_result.headers={'content-type': 'application/jsn'} + fs_result._content={'Accept-Charset': 'UTF-8'} + + @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']]) + @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True) + @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT") + @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1) + @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result) + @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version') + @patch('requests.post',return_result=Exception(status.HTTP_500_INTERNAL_SERVER_ERROR)) + def test_negative_response_for_training_3_1(self,mock1,mock2, mock3, mock4, mock5, mock6, mock7): + code = status.HTTP_500_INTERNAL_SERVER_ERROR + message = "Run status is not scheduled for " + logger = trainingmgr_main.LOGGER + is_success = True + usecase_name = "usecase7" + ps_db_obj = () + mm_sdk = () + try: + response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk) + assert False + except Exception: + assert True + +class Test_check_key_in_dictionary: + def test_check_key_in_dictionary(self): + fields = ["model","brand","year"] + dictionary = { + "brand": "Ford", + "model": "Mustang", + "year": 1964 + } + assert check_key_in_dictionary(fields, dictionary) == True, "data not equal" + + def test_check_key_in_dictionary(self): + fields = ["model","brand","type"] + dictionary = { + "brand": "Ford", + "model": "Mustang", + "year": 1964 + } + assert check_key_in_dictionary(fields, dictionary) == False, "data not equal" + + def test_negative_check_key_in_dictionary_1(self): + fields = ["Ford","Apple","Mosquito"] + dictionary = { + "brand": "Ford", + "model": "Mustang", + "year": 1964 + } + try: + check_key_in_dictionary(fields, dictionary) + assert False + except Exception: + assert True + +class Test_check_trainingjob_data: + @patch('trainingmgr.common.trainingmgr_util.check_key_in_dictionary',return_value=True) + @patch('trainingmgr.common.trainingmgr_util.isinstance',return_value=True) + def test_check_trainingjob_data(self,mock1,mock2): + usecase_name = "usecase8" + json_data = { "description":"unittest", "feature_list": ["apple", "banana", "cherry"] , "pipeline_name":"qoe" , "experiment_name":"experiment1" , "arguments":"arguments1" , "query_filter":"query1" , "enable_versioning":True , "target_deployment":"Near RT RIC" , "pipeline_version":1 , "datalake_source":"cassandra db" , "incremental_training":True , "model":"usecase7" , "model_version":1 , "_measurement":2 , "bucket":"bucket1"} + + expected_data = (['apple', 'banana', 'cherry'], 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db', 2, 'bucket1') + assert check_trainingjob_data(usecase_name, json_data) == expected_data,"data not equal" + + def test_negative_check_trainingjob_data_1(self): + usecase_name = "usecase8" + json_data = { "description":"unittest", "feature_list": ["apple", "banana", "cherry"] , "pipeline_name":"qoe" , "experiment_name":"experiment1" , "arguments":"arguments1" , "query_filter":"query1" , "enable_versioning":True , "target_deployment":"Near RT RIC" , "pipeline_version":1 , "datalake_source":"cassandra db" , "incremental_training":True , "model":"usecase7" , "model_version":1 , "_measurement":2 , "bucket":"bucket1"} + + expected_data = (['apple', 'banana', 'cherry'], 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db', 2, 'bucket1') + try: + assert check_trainingjob_data(usecase_name, json_data) == expected_data,"data not equal" + assert False + except Exception: + assert True + + @patch('trainingmgr.common.trainingmgr_util.check_key_in_dictionary',return_value=True) + def test_negative_check_trainingjob_data_2(self,mock1): + usecase_name = "usecase8" + json_data = { "description":"unittest", "feature_list": ["apple", "banana", "cherry"] , "pipeline_name":"qoe" , "experiment_name":"experiment1" , "arguments":"arguments1" , "query_filter":"query1" , "enable_versioning":True , "target_deployment":"Near RT RIC" , "pipeline_version":1 , "datalake_source":"cassandra db" , "incremental_training":True , "model":"usecase7" , "model_version":1 , "_measurement":2 , "bucket":"bucket1"} + + expected_data = (['apple', 'banana', 'cherry'], 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db', 2, 'bucket1') + try: + assert check_trainingjob_data(usecase_name, json_data) == expected_data,"data not equal" + assert False + except Exception: + assert True + + @patch('trainingmgr.common.trainingmgr_util.isinstance',return_value=True) + def test_negative_check_trainingjob_data_3(self,mock1): + usecase_name = "usecase8" + json_data = None + expected_data = (['apple', 'banana', 'cherry'], 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db', 2, 'bucket1') + try: + assert check_trainingjob_data(usecase_name, json_data) == expected_data,"data not equal" + assert False + except Exception: + assert True + +class Test_get_one_key: + def test_get_one_key(self): + dictionary = { + "brand": "Ford", + "model": "Mustang", + "year": 1964 + } + only_key = "year" + expected_data = only_key + assert get_one_key(dictionary) == expected_data,"data not equal" + + def test_get_one_key_2(self): + dictionary = {'name': 'Jack', 'age': 26} + only_key = "age" + expected_data = only_key + assert get_one_key(dictionary) == expected_data,"data not equal" + + def test_negative_get_one_key_1(self): + dictionary = { + "brand": "Ford", + "model": "Mustang", + "year": 1964 + } + only_key = "model" + expected_data = only_key + try: + assert get_one_key(dictionary) == expected_data,"data not equal" + assert False + except Exception: + assert True + + def test_negative_get_one_key_2(self): + dictionary = {'name': 'Jack', 'age': 26} + only_key = "name" + expected_data = only_key + try: + assert get_one_key(dictionary) == expected_data,"data not equal" + assert False + except Exception: + assert True + +class dummy_mmsdk: + def check_object(self, param1, param2, param3): + return True + + def get_metrics(self, usecase_name, version): + thisdict = { + "brand": "Ford", + "model": "Mustang", + "year": 1964 + } + return thisdict + +class Test_get_metrics: + @patch('trainingmgr.common.trainingmgr_util.json.dumps',return_value='usecase_data') + def test_get_metrics_with_version(self,mock1): + usecase_name = "usecase7" + version = 1 + mm_sdk = dummy_mmsdk() + expected_data = 'usecase_data' + get_metrics(usecase_name, version, dummy_mmsdk()) + assert get_metrics(usecase_name, version, mm_sdk) == expected_data, "data not equal" + + @patch('trainingmgr.common.trainingmgr_util.json.dumps',return_value=None) + def test_negative_get_metrics_1(self,mock1): + usecase_name = "usecase7" + version = 1 + mm_sdk = dummy_mmsdk() + expected_data = 'usecase_data' + try: + assert get_metrics(usecase_name, version, mm_sdk) == expected_data, "data not equal" + assert False + except Exception: + assert True + + @patch('trainingmgr.common.trainingmgr_util.json.dumps',return_value=Exception("Problem while downloading metrics")) + def test_negative_get_metrics_2(self,mock1): + usecase_name = "usecase7" + version = 1 + mm_sdk = dummy_mmsdk() + expected_data = 'usecase_data' + try: + assert get_metrics(usecase_name, version, mm_sdk) == expected_data, "data not equal" + assert False + except Exception: + assert True + + def test_negative_get_metrics_3(self): + usecase_name = "usecase7" + version = 1 + mm_sdk = dummy_mmsdk() + expected_data = 'final_data' + try: + get_metrics(usecase_name, version, dummy_mmsdk()) + assert get_metrics(usecase_name, version, mm_sdk) == expected_data, "data not equal" + assert False + except Exception: + assert True + +class dummy_mmsdk_1: + def check_object(self, param1, param2, param3): + return False + + def get_metrics(self, usecase_name, version): + thisdict = { + "brand": "Ford", + "model": "Mustang", + "year": 1964 + } + return thisdict + +class Test_get_metrics_2: + @patch('trainingmgr.common.trainingmgr_util.json.dumps',return_value='usecase_data') + def test_negative_get_metrics_2_1(self,mock1): + usecase_name = "usecase7" + version = 1 + mm_sdk = dummy_mmsdk_1() + expected_data = 'usecase_data' + get_metrics(usecase_name, version, dummy_mmsdk()) + try: + get_metrics(usecase_name, version, dummy_mmsdk()) + assert get_metrics(usecase_name, version, mm_sdk) == expected_data, "data not equal" + assert False + except Exception: + assert True + +class Test_handle_async_feature_engineering_status_exception_case: + @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version',return_value=True) + @patch('trainingmgr.common.trainingmgr_util.response_for_training',return_value=True) + def test_handle_async_feature_engineering_status_exception_case(self, mock1, mock2): + lock = Lock() + featurestore_job_cache = {'usecase7': 'Geeks', 2: 'For', 3: 'Geeks'} + code = 123 + message = "Into the field" + logger = "123" + is_success = True + usecase_name = "usecase7" + ps_db_obj = () + mm_sdk = () + assert handle_async_feature_engineering_status_exception_case(lock, featurestore_job_cache, code, + message, logger, is_success, + usecase_name, ps_db_obj, mm_sdk) == None,"data not equal" + + @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version',return_value=True) + @patch('trainingmgr.common.trainingmgr_util.response_for_training',return_value=True) + # @patch('trainingmgr.common.trainingmgr_util.dataextraction_job_cache',return_value = Exception("Could not get info from db for ")) + def test_negative_handle_async_feature_engineering_status_exception_case(self, mock1, mock2): + lock = Lock() + featurestore_job_cache = {'usecase7': 'Geeks', 2: 'For', 3: 'Geeks'} + code = 123 + message = "Into the field" + logger = "123" + is_success = True + usecase_name = "" + ps_db_obj = () + mm_sdk = () + try: + handle_async_feature_engineering_status_exception_case(lock, featurestore_job_cache, code, + message, logger, is_success, + usecase_name, ps_db_obj, mm_sdk) + assert handle_async_feature_engineering_status_exception_case(lock, featurestore_job_cache, code, + message, logger, is_success, + usecase_name, ps_db_obj, mm_sdk) == None,"data not equal" + assert False + except Exception: + assert True + +class Test_get_one_word_status: + def test_get_one_word_status(self): + steps_state = [0,1,2,3] + expected_data = "IN_PROGRESS" + assert get_one_word_status(steps_state) == expected_data,"data not equal" + +class Test_validate_trainingjob_name: + @patch('trainingmgr.common.trainingmgr_util.get_all_versions_info_by_name',return_value=True) + def test_validate_trainingjob_name_1(self,mock1): + trainingjob_name = "usecase8" + ps_db_obj = () + expected_data = True + assert validate_trainingjob_name(trainingjob_name,ps_db_obj) == expected_data,"data not equal" + + @patch('trainingmgr.common.trainingmgr_util.get_all_versions_info_by_name', return_value = Exception("Could not get info from db for ")) + def test_validate_trainingjob_name_2(self,mock1): + trainingjob_name = "usecase8" + ps_db_obj = () + expected_data = True + try: + validate_trainingjob_name(trainingjob_name,ps_db_obj) + assert validate_trainingjob_name(trainingjob_name,ps_db_obj) == expected_data,"data not equal" + assert False + except Exception: + assert True + + def test_negative_validate_trainingjob_name(self): + trainingjob_name = "usecase8" + ps_db_obj = () + expected_data = True + try: + validate_trainingjob_name(trainingjob_name,ps_db_obj) + assert validate_trainingjob_name(trainingjob_name,ps_db_obj) == expected_data,"data not equal" + assert False + except Exception: + assert True -- 2.16.6