# ==================================================================================
#Change the current working directory to home directory
tm
-
# Directory struture
tm
|__tests
# Example to run test cases.
# Generate test report
-sudo python3 -m pytest -rA . --capture=tee-sys --cov-report term-missing --cov-report xml:coverage.xml --cov-report html:htmlcov --junitxml test-reports/junit.xml --cov=./
\ No newline at end of file
+python3 -m pytest -rA . --capture=tee-sys --cov-report term-missing --cov-report xml:coverage.xml --cov-report html:htmlcov --junitxml test-reports/junit.xml --cov=./trainingmgr/
\ No newline at end of file
class: logging.handlers.RotatingFileHandler
level: DEBUG
formatter: simple
- filename: ./training_manager_test.log
+ filename: training_manager.log
maxBytes: 10485760
backupCount: 20
encoding: utf8
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+KF_ADAPTER_PORT=5001
+KF_ADAPTER_IP=localhost
+DATA_EXTRACTION_API_PORT=32000
+DATA_EXTRACTION_API_IP=localhost
+DEPLOYMENT_MANAGER_PORT=9999
+DEPLOYMENT_MANAGER_IP=localhost
+TRAINING_MANAGER_PORT=32002
+TRAINING_MANAGER_IP=localhost
+PS_USER=postgres
+PS_PASSWORD="abcd"
+PS_IP="localhost"
+PS_PORT="30001"
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import pytest
+import sys
+from dotenv import load_dotenv
+import json
+from trainingmgr.db.common_db_fun import get_data_extraction_in_progress_trainingjobs, \
+ change_field_of_latest_version, change_field_value_by_version, \
+ get_field_by_latest_version, get_field_of_given_version, \
+ change_in_progress_to_failed_by_latest_version, change_steps_state_of_latest_version, \
+ change_steps_state_by_version, delete_trainingjob_version, get_info_by_version, \
+ get_trainingjob_info_by_name, get_latest_version_trainingjob_name, \
+ get_all_versions_info_by_name, get_all_distinct_trainingjobs, \
+ get_all_version_num_by_trainingjob_name, update_model_download_url, \
+ add_update_trainingjob, get_all_jobs_latest_status_version
+
+mimic_db = {
+ "usecase_name": "Tester",
+ "description": "Current UseCase Is For Testing Only",
+ "feature_list": "*",
+ "pipeline_name": "qoe-pipeline",
+ "experiment_name": "default",
+ "arguments": "{epoches : 1}",
+ "query_filter": "",
+ "creation_time": "29-09-2022",
+ "run_id": 1729,
+ "steps_state": json.dumps({"DATA_EXTRACTION" : "IN_PROGRESS"}),
+ "updation_time": "29-09-2022",
+ "version": 1,
+ "enable_versioning": True,
+ "target_deployment": "Near RT Ric",
+ "pipeline_version": 1,
+ "datalake_source": "InfluxSource",
+ "incremental_training": False,
+ "model": "",
+ "model_version": "",
+ "model_url":"",
+ "notification_url": "",
+ "_measurement": "liveCell",
+ "bucket": "UEdata",
+ "accuracy": 70
+ }
+
+class db_helper:
+ '''Mimics as a Db'''
+ def __init__(self, req_cols, raise_exception = False, check_success_obj = None):
+ self.cols = req_cols
+ self.raise_exception = raise_exception
+ self.check_success_obj = check_success_obj
+ self.counter = 0
+
+ def get_new_conn(self):
+ return db_helper(self.cols, self.raise_exception, self.check_success_obj)
+
+ def cursor(self):
+ return db_helper(self.cols, self.raise_exception, self.check_success_obj)
+
+ def execute(self, query, values = None):
+ if self.raise_exception:
+ raise Exception("DB Error")
+
+ def fetchall(self):
+ out = []
+ if(len(self.cols) > 0):
+ if(self.cols[self.counter][0] == "*"):
+ for (col, value) in mimic_db.items():
+ out.append(value)
+ elif(self.cols[self.counter][0] == None):
+ self.counter += 1
+ return None
+ else:
+ for col in self.cols[self.counter]:
+ out.append(mimic_db[col])
+ self.counter += 1
+ return [out]
+
+ def close(self):
+ ''' For checking success in fxn not returning anything, If you call close, then It means query as exceuted as expected '''
+ if self.check_success_obj:
+ self.check_success_obj.setwin()
+
+ def rollback(self):
+ pass
+
+ def commit(self):
+ pass
+
+class Check:
+ def __init__(self):
+ self.finished = False
+
+ def setwin(self):
+ self.finished = True
+
+class Test_Common_Db_Fun:
+ def setup_method(self):
+ pass
+
+ def test_get_data_extraction_in_progress_trainingjobs(self):
+ db_obj = db_helper([["usecase_name", "steps_state"]])
+ out = get_data_extraction_in_progress_trainingjobs(db_obj)
+
+ assert out != None, 'Function get_data_extraction_in_progress_trainingjobs has failed'
+
+ def test_get_data_extraction_in_progress_trainingjobs(self):
+ checker = Check()
+ try:
+ db_obj = db_helper([["usecase_name", "steps_state"]], raise_exception=True, check_success_obj=checker)
+ out = get_data_extraction_in_progress_trainingjobs(db_obj)
+ assert out != None, 'Fxn get_usecases_which_has_data_extraction_in_progress Failed'
+ except Exception as err:
+ assert str(err) == "bad operand type for unary +: 'str'", 'Negative test get_usecases_which_has_data_extraction_in_progress FAILED, Doesnt returned required error'
+ assert checker.finished, 'Cursor Not Closed Properly for fxn test_negative_get_usecases_which_has_data_extraction_in_progress'
+
+ def test_change_field_of_latest_version(self):
+ for field in ["notification_url", "run_id"]:
+ checker = Check()
+ db_obj = db_helper([["version", "usecase_name"]], check_success_obj=checker)
+ change_field_of_latest_version("Tester", db_obj, field, "a_dummy_value")
+
+ assert checker.finished, 'change_field_of_latest_version Failed For Usecame {} and Field {} '.format("Tester", field)
+
+ def test_negative_change_field_of_latest_version(self):
+ for field in ["notification_url", "run_id"]:
+ checker = Check()
+ db_obj = db_helper([["version", "usecase_name"]],raise_exception=True ,check_success_obj=checker)
+ try:
+ change_field_of_latest_version("Tester", db_obj, field, "a_dummy_value")
+ assert checker.finished, 'change_field_of_latest_version Failed For Usecame {} and Field {} '.format("Tester", field)
+ assert False
+ except Exception:
+ assert True
+
+ def test_change_field_value_by_version_2(self):
+ checker = Check()
+ db_obj = db_helper([[None]], check_success_obj=checker)
+ change_field_value_by_version("Tester", 1, db_obj, "deletion_in_progress", "a_dummy_value")
+ assert checker.finished, 'change_field_of_given_version FAILED'
+
+ def test_negative_change_field_value_by_version_2(self):
+ checker = Check()
+ db_obj = db_helper([[None]], raise_exception=True, check_success_obj=checker)
+ try:
+ change_field_value_by_version("Tester", 1, db_obj, "deletion_in_progress", "a_dummy_value")
+ assert checker.finished, 'change_field_value_by_version FAILED'
+ assert False
+ except Exception:
+ assert True
+
+ def test_get_field_by_latest_version(self):
+ for field in ["notification_url", "model_url", "target_deployment"]:
+ db_obj = db_helper([["version"], [field]])
+ out = get_field_by_latest_version("Tester", db_obj, field)
+ assert out != None, 'get_field_by_latest_version FAILED at field = {}'.format(field)
+
+ def test_negative_get_field_by_latest_version(self):
+ for field in ["notification_url", "model_url", "target_deployment"]:
+ checker = Check()
+ try:
+ db_obj = db_helper([["version"], [field]], raise_exception=True, check_success_obj=checker)
+ out = get_field_by_latest_version("Tester", db_obj, field)
+ assert out != None, 'get_field_by_latest_version FAILED at field = {}'.format(field)
+ assert False
+ except Exception:
+ assert True
+
+ def test_get_field_of_given_version(self):
+ db_obj = db_helper([["steps_state"]])
+ out = get_field_of_given_version("Tester", 1, db_obj, "steps_state")
+ assert out != None, ' test_get_field_of_given_version FAILED'
+
+ def test_negative_get_field_of_given_version(self):
+ checker = Check()
+ try:
+ db_obj = db_helper([["steps_state"]], raise_exception=True, check_success_obj=checker)
+ out = get_field_of_given_version("Tester", 1, db_obj, "steps_state")
+ assert out != None, ' test_get_field_of_given_version FAILED'
+ assert False
+ except Exception:
+ assert True
+
+ def test_change_in_progress_to_failed_by_latest_version(self):
+ checker = Check()
+ db_obj = db_helper([["version"], ["steps_state"]], check_success_obj=checker)
+ change_in_progress_to_failed_by_latest_version("Tester", db_obj)
+ assert checker.finished, 'change_in_progress_to_failed_by_latest_version FAILED'
+
+ def test_negative_change_in_progress_state_to_failed_of_latest_version(self):
+ checker = Check()
+ try:
+ db_obj = db_helper([["version"], ["steps_state"]], raise_exception=True,check_success_obj=checker)
+ change_in_progress_to_failed_by_latest_version("Tester", db_obj)
+ assert checker.finished, 'change_in_progress_to_failed_by_latest_version FAILED'
+ except Exception as err:
+ fxn_name = "change_in_progress_to_failed_by_latest_version("
+ assert str(err) == "bad operand type for unary +: 'str'", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+ assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+ def test_change_steps_state_of_latest_version(self):
+ checker = Check()
+ db_obj = db_helper([["version"], ["steps_state"], [None]], check_success_obj=checker)
+ change_steps_state_of_latest_version("Tester", db_obj, 1, 2) # Dummy Key and Values
+ assert checker.finished, 'change_steps_state_of_latest_version FAILED'
+
+ def test_negative_change_steps_state_of_latest_version(self):
+ checker = Check()
+ try:
+ db_obj = db_helper([["version"], ["steps_state"], [None]], raise_exception=True ,check_success_obj=checker)
+ change_steps_state_of_latest_version("Tester", db_obj, 1, 2) # Dummy Key and Values
+ assert checker.finished, 'change_steps_state_of_latest_version FAILED'
+ except Exception as err:
+ fxn_name = "change_steps_state_of_latest_version"
+ assert str(err) == "Failed to execute query in change_steps_state_of_latest_versionDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+ assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+ def test_change_steps_state_by_version(self):
+ checker = Check()
+ db_obj = db_helper([["steps_state"], [None]] , check_success_obj=checker)
+ change_steps_state_by_version("Tester", 1, db_obj, 1, 2) # Dummy Key and Values
+ assert checker.finished, 'change_steps_state_by_version FAILED'
+
+ def test_negative_change_steps_state_by_version(self):
+ checker = Check()
+ try:
+ db_obj = db_helper([["steps_state"], [None]] , raise_exception=True,check_success_obj=checker)
+ change_steps_state_by_version("Tester", 1, db_obj, 1, 2) # Dummy Key and Values
+ assert checker.finished, 'change_steps_state_by_version FAILED'
+ except Exception as err:
+ fxn_name = "change_steps_state_by_version"
+ assert str(err) == "Failed to execute query in change_steps_state_by_versionDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+ assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+ def test_delete_trainingjob_version(self):
+ checker = Check()
+ db_obj = db_helper([[None]], check_success_obj=checker)
+ delete_trainingjob_version("Tester", 1, db_obj)
+ assert checker.finished, 'delete_trainingjob_version FAILED'
+
+ def test_negative_delete_trainingjob_version(self):
+ checker = Check()
+ try:
+ db_obj = db_helper([[None]], raise_exception=True,check_success_obj=checker)
+ delete_trainingjob_version("Tester", 1, db_obj)
+ assert checker.finished, 'delete_trainingjob_version FAILED'
+ except Exception as err:
+ fxn_name = "delete_trainingjob_version"
+ assert str(err) == "Failed to execute query in delete_trainingjob_versionDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+ assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+ def test_get_info_by_version(self):
+ db_obj = db_helper([["*"]])
+ out = get_info_by_version("Tester", 1, db_obj)
+ assert out != None, 'get_info_by_version FAILED'
+
+ def test_negative_get_info_by_version(self):
+ checker = Check()
+ try:
+ db_obj = db_helper([["*"]], raise_exception=True,check_success_obj=checker)
+ out = get_info_by_version("Tester", 1, db_obj)
+ assert out != None, 'get_info_by_version FAILED'
+ except Exception as err:
+ fxn_name = "get_info_by_version"
+ assert str(err) == "Failed to execute query in get_info_by_versionDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+ assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+ def test_get_trainingjob_info_by_name(self):
+ db_obj = db_helper([["version"], ["*"]])
+ out = get_trainingjob_info_by_name("Tester", db_obj)
+ assert out != None, 'get_trainingjob_info_by_name FAILED'
+
+ def test_negative_get_trainingjob_info_by_name(self):
+ checker = Check()
+ try:
+ db_obj = db_helper([["version"], ["*"]], raise_exception=True,check_success_obj=checker)
+ out = get_trainingjob_info_by_name("Tester", db_obj)
+ assert out != None, 'get_trainingjob_info_by_name FAILED'
+ except Exception as err:
+ fxn_name = "get_trainingjob_info_by_name"
+ assert str(err) == "Failed to execute query in get_trainingjob_info_by_nameDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+ assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+ def test_get_latest_version_trainingjob_name(self):
+ db_obj = db_helper([["version"]])
+ out = get_latest_version_trainingjob_name("Tester", db_obj)
+ assert type(out) == int, 'get_latest_version_trainingjob_name FAILED'
+
+ def test_negative_get_latest_version_trainingjob_name(self):
+ checker = Check()
+ try:
+ db_obj = db_helper([["version"]], raise_exception=True,check_success_obj=checker)
+ out = get_latest_version_trainingjob_name("Tester", db_obj)
+ assert type(out) == int, 'get_latest_version FAILED'
+ except Exception as err:
+ fxn_name = "get_latest_version"
+ assert str(err) == "Failed to execute query in get_latest_version_trainingjob_nameDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+ assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+ def test_get_all_distinct_trainingjobs(self):
+ db_obj = db_helper([["usecase_name"]])
+ out = get_all_distinct_trainingjobs(db_obj)
+ assert type(out) == list, 'get_all_distinct_trainingjobs FAILED'
+
+ def test_negative_get_all_distinct_trainingjobs(self):
+ checker = Check()
+ try:
+ db_obj = db_helper([["usecase_name"]], raise_exception=True,check_success_obj=checker)
+ out = get_all_distinct_trainingjobs(db_obj)
+ assert type(out) == list, 'get_all_distinct_trainingjobs FAILED'
+ except Exception as err:
+ fxn_name = "get_all_distinct_trainingjobs"
+ assert str(err) == "Failed to execute query in get_all_distinct_trainingjobsDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+ assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+
+ def test_get_all_version_num_by_trainingjob_name(self):
+ db_obj = db_helper([["version"]])
+ out = get_all_version_num_by_trainingjob_name("Tester", db_obj)
+ assert type(out) == list, 'get_all_version_num_by_trainingjob_name FAILED'
+
+ def test_negative_get_all_version_num_by_trainingjob_name(self):
+ checker = Check()
+ try:
+ db_obj = db_helper([["version"]], raise_exception=True,check_success_obj=checker)
+ out = get_all_version_num_by_trainingjob_name("Tester", db_obj)
+ assert type(out) == list, 'get_all_version_num_by_trainingjob_name FAILED'
+ except Exception as err:
+ fxn_name = "get_all_version_num_by_trainingjob_name"
+ assert str(err) == "Failed to execute query in get_all_version_num_by_trainingjob_nameDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+ assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+ def test_update_model_download_url(self):
+ checker = Check()
+ db_obj = db_helper([[None]], check_success_obj=checker)
+ update_model_download_url("Tester", 1, "http/dummy/url", db_obj)
+ assert checker.finished, 'update_model_download_url FAILED'
+
+ def test_negative_update_model_download_url(self):
+ checker = Check()
+ try:
+ db_obj = db_helper([[None]], raise_exception=True ,check_success_obj=checker)
+ update_model_download_url("Tester", 1, "http/dummy/url", db_obj)
+ assert checker.finished, 'update_model_download_url FAILED'
+ except Exception as err:
+ fxn_name = "update_model_download_url"
+ assert str(err) == "Failed to execute query in update_model_download_urlDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+ assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+ def test_add_update_trainingjob(self):
+ checker = Check()
+ db_obj = db_helper([[None]], check_success_obj=checker)
+ add_update_trainingjob('Testing', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', True, True, 1, 'InfluxSource', 'Tester',db_obj)
+ assert checker.finished, 'add_update_trainingjob FAILED, When adding = True'
+
+ def test_negative_add_update_trainingjob_2(self):
+ checker = Check()
+ db_obj = db_helper([[None]], check_success_obj=checker)
+ add_update_trainingjob('Testing', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', True, False, 1, 'InfluxSource', 'Tester',db_obj)
+ assert checker.finished, 'add_update_trainingjob FAILED, When adding = True'
+
+ def test_negative_add_update_trainingjob_3(self):
+ checker = Check()
+ db_obj = db_helper([[None]], check_success_obj=checker)
+ try:
+ add_update_trainingjob('Testing', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', False, True, 1, 'InfluxSource', 'Tester',db_obj)
+ assert checker.finished, 'add_update_trainingjob FAILED, When adding = True'
+ assert False
+ except Exception:
+ assert True
+
+ def test_negative_add_update_trainingjob_4(self):
+ checker = Check()
+ db_obj = db_helper([[None]], check_success_obj=checker)
+ try:
+ add_update_trainingjob('Testing', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', False, False, 1, 'InfluxSource', 'Tester',db_obj)
+ assert checker.finished, 'add_update_trainingjob FAILED, When adding = True'
+ assert False
+ except Exception:
+ assert True
+
+ def test_negative_add_update_trainingjob_5(self):
+ checker = Check()
+ try:
+ db_obj = db_helper([[None]], raise_exception=True, check_success_obj=checker)
+ add_update_trainingjob('Testing ', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', True, True, 'Near RT-RIC', 1, 'InfluxSource', 'Tester'
+ ,True, '', '', db_obj, '', 'liveCell', 'UEData')
+ assert checker.finished, 'add_update_trainingjob FAILED, When adding = True'
+ except Exception as err:
+ fxn_name = "add_update_trainingjob"
+ assert str(err) == "add_update_trainingjob() takes from 12 to 15 positional arguments but 19 were given", 'Negative test {} FAILED when adding = True , Doesnt returned required error'.format(fxn_name)
+
+ def test_get_all_jobs_latest_status_version(self):
+ db_obj = db_helper([["usecase_name"]])
+ out = get_all_jobs_latest_status_version(db_obj)
+ assert out == [['Tester']], 'get_all_distinct_trainingjobs FAILED'
+
+ def test_negative_get_all_jobs_latest_status_version(self):
+ checker = Check()
+ try:
+ db_obj = db_helper([["usecase_name"]], raise_exception=True,check_success_obj=checker)
+ out = get_all_jobs_latest_status_version(db_obj)
+ assert type(out) == list, 'get_all_jobs_latest_status_version FAILED'
+ except Exception as err:
+ fxn_name = "get_all_jobs_latest_status_version"
+ assert str(err) == "Failed to execute query in get_all_jobs_latest_status_versionDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+ assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+ def test_get_all_versions_info_by_name(self):
+ trainingjob_name = "usecase556"
+ db_obj = db_helper([["usecase_name"]])
+ out = get_all_versions_info_by_name(trainingjob_name,db_obj)
+ assert out == [['Tester']], 'get_all_versions_info_by_name FAILED'
+
+ def test_negative_get_all_versions_info_by_name(self):
+ checker = Check()
+ trainingjob_name = "usecase556"
+ my_dict = dict([(1,'apple'), (2,'ball')])
+ db_obj = db_helper([["usecase_name"]])
+ try:
+ db_obj = db_helper([["usecase_name"]], raise_exception=True,check_success_obj=checker)
+ out = get_all_versions_info_by_name(trainingjob_name,db_obj)
+ assert type(out) == list, 'get_all_jobs_latest_status_version FAILED'
+ except Exception as err:
+ fxn_name = "get_all_versions_info_by_name"
+ assert str(err) == "Failed to execute query in get_all_versions_info_by_nameDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+ assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
#
# ==================================================================================
import json
+import requests
from unittest import mock
from mock import patch
import pytest
import datetime
from flask_api import status
from dotenv import load_dotenv
+from threading import Lock
from trainingmgr import trainingmgr_main
from trainingmgr.common.tmgr_logger import TMLogger
-trainingmgr_main.LOGGER = TMLogger("./tests/common/conf_log.yaml").logger
+from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
+trainingmgr_main.LOGGER = pytest.logger
trainingmgr_main.LOCK = Lock()
trainingmgr_main.DATAEXTRACTION_JOBS_CACHE = {}
+
+class Test_upload_pipeline:
+ def setup_method(self):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+
+ mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ")
+ attrs_TRAININGMGR_CONFIG_OBJ = {'kf_adapter_ip.return_value': '123', 'kf_adapter_port.return_value' : '100'}
+ mocked_TRAININGMGR_CONFIG_OBJ.configure_mock(**attrs_TRAININGMGR_CONFIG_OBJ)
+ @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ)
+ def test_upload_pipeline_negative(self, mock1):
+ trainingmgr_main.LOGGER.debug("******* *******")
+ expected_data = "result"
+ trainingjob_req = {
+ "pipe_name":"usecase1",
+ }
+ response = self.client.post("/pipelines/<pipe_name>/upload".format("usecase1"), data=json.dumps(trainingjob_req),
+ content_type="application/json")
+
+ trainingmgr_main.LOGGER.debug(response.data)
+ assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+ assert expected_data in response.json.keys()
+
+
+class Test_data_extraction_notification:
+ def setup_method(self):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+
+ db_result2 = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}',
+ '', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33',
+ '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}',
+ datetime.datetime(2022, 10, 12, 10, 2, 31, 888830), 1, False, '3', '{"datalake_source": {"InfluxSource": {}}}', 'No data available.', '', 'liveCell', 'UEData', False)]
+
+ de_response2 = Response()
+ de_response2.code = "expired"
+ de_response2.error_type = "expired"
+ de_response2.status_code = status.HTTP_200_OK
+ de_response2.headers={"content-type": "application/json"}
+ de_response2._content = b'{"task_status": "Completed", "result": "Data Extraction Completed"}'
+ resp= ({"str1":"rp1","str2":"rp2"} ,status.HTTP_200_OK)
+
+ @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result2)
+ @patch('trainingmgr.trainingmgr_main.training_start', return_value = de_response2)
+ @patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
+ @patch('trainingmgr.trainingmgr_main.change_field_of_latest_version')
+ @patch('trainingmgr.trainingmgr_main.change_in_progress_to_failed_by_latest_version', return_value = True)
+ @patch('trainingmgr.trainingmgr_main.response_for_training', return_value = resp)
+ def test_data_extraction_notification(self, mock1, mock2, mock3, mock4, mock5, mock6):
+ trainingmgr_main.LOGGER.debug("******* Data_Extraction_Notification *******")
+ trainingjob_req = {
+ "trainingjob_name":"usecase1",
+ }
+ expected_data = "Data Extraction Completed"
+ response = self.client.post("/trainingjob/dataExtractionNotification".format("usecase1"),
+ data=json.dumps(trainingjob_req),
+ content_type="application/json")
+ trainingmgr_main.LOGGER.debug(response.data)
+ assert response.status_code == status.HTTP_200_OK
+
+class Test_trainingjobs_operations:
+ def setup_method(self):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+
+ db_result2 = [('usecase2', 'version2', '{"overall_status":"status_ok"}')]
+ @patch('trainingmgr.trainingmgr_main.get_all_jobs_latest_status_version', return_value = db_result2)
+ @patch('trainingmgr.trainingmgr_main.get_one_word_status', return_value = "status OK")
+ def test_trainingjobs_operations(self,mock1,mock2):
+ trainingmgr_main.LOGGER.debug("******* test_trainingjobs_operations get *******")
+ expected_data = '{"trainingjobs": [{"trainingjob_name": "usecase2", "version": "version2", "overall_status": "status OK"}]}'
+ response = self.client.get("/trainingjobs/latest",content_type="application/json")
+ trainingmgr_main.LOGGER.debug(response.data)
+ assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
+ assert expected_data in str(response.data)
+
+ db_result3 = []
+ @patch('trainingmgr.trainingmgr_main.get_all_jobs_latest_status_version', return_value = db_result3)
+ @patch('trainingmgr.trainingmgr_main.get_one_word_status', return_value = "status OK")
+ def test_trainingjobs_operations_get_exception(self,mock1,mock2):
+ trainingmgr_main.LOGGER.debug("******* test_trainingjobs_operations get exception*******")
+ expected_data = "Failed to fetch training job info from db"
+ response = self.client.get("/trainingjobs/latest",content_type="application/json")
+ trainingmgr_main.LOGGER.debug(response.data)
+ assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Return status code NOT equal"
+ assert expected_data in str(response.data)
+
+class Test_pipeline_notification:
+ def setup_method(self):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+
+ mocked_mm_sdk=mock.Mock(name="MM_SDK")
+ attrs_mm_sdk = {'check_object.return_value': True}
+ mocked_mm_sdk.configure_mock(**attrs_mm_sdk)
+ mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ")
+ attrs_TRAININGMGR_CONFIG_OBJ = {'my_ip.return_value': 123, 'my_port.return_value' : 100}
+ mocked_TRAININGMGR_CONFIG_OBJ.configure_mock(**attrs_TRAININGMGR_CONFIG_OBJ)
+ message1="Pipeline notification success."
+ code1=status.HTTP_200_OK
+ response_tuple1=({"result": message1}, code1)
+ @patch('trainingmgr.trainingmgr_main.MM_SDK', return_value = mocked_mm_sdk)
+ @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ)
+ @patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
+ @patch('trainingmgr.trainingmgr_main.update_model_download_url')
+ @patch('trainingmgr.trainingmgr_main.get_latest_version_trainingjob_name', return_value = "usecase1")
+ @patch('trainingmgr.trainingmgr_main.response_for_training', return_value = response_tuple1)
+ def test_pipeline_notification(self,mock1, mock2, mock3, mock4, mock5, mock6):
+ trainingmgr_main.LOGGER.debug("******* test_pipeline_notification post *******")
+ trainingjob_req = {
+ "trainingjob_name":"usecase1",
+ "run_status":"Succeeded",
+ }
+ expected_data = "Pipeline notification success."
+ response = self.client.post("/trainingjob/pipelineNotification".format("usecase1"),data=json.dumps(trainingjob_req),
+ content_type="application/json")
+ trainingmgr_main.LOGGER.debug(response.data)
+ assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
+ assert expected_data in str(response.data)
+
+ message2="Pipeline notification -Training failed "
+ code2=status.HTTP_500_INTERNAL_SERVER_ERROR
+ response_tuple2=({"result": message2}, code2)
+ @patch('trainingmgr.trainingmgr_main.MM_SDK', return_value = mocked_mm_sdk)
+ @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ)
+ @patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
+ @patch('trainingmgr.trainingmgr_main.update_model_download_url')
+ @patch('trainingmgr.trainingmgr_main.get_latest_version_trainingjob_name', return_value = "usecase1")
+ @patch('trainingmgr.trainingmgr_main.response_for_training', return_value = response_tuple2)
+ @patch('trainingmgr.trainingmgr_main.change_in_progress_to_failed_by_latest_version', return_value = True)
+ def test_negative_pipeline_notification(self,mock1, mock2, mock3, mock4, mock5, mock6, mock7):
+ trainingmgr_main.LOGGER.debug("******* test_pipeline_notification post exception*******")
+ trainingjob_req = {
+ "trainingjob_name":"usecase1",
+ "run_status":"Not_Succeeded",
+ }
+ expected_data = "Pipeline notification -Training failed "
+ response = self.client.post("/trainingjob/pipelineNotification".format("usecase1"),
+ data=json.dumps(trainingjob_req),
+ content_type="application/json")
+ trainingmgr_main.LOGGER.debug(response.data)
+ assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Return status code NOT equal"
+ assert expected_data in str(response.data)
+
+ db_result4 = [("test_data1","test_data2"),("version1")]
+ @patch('trainingmgr.trainingmgr_main.get_field_of_given_version', return_value = db_result4)
+ def test_get_steps_state_2(self,mock1):
+ trainingmgr_main.LOGGER.debug("******* test_get_steps_state get *******")
+ expected_data = "test_data1"
+ response = self.client.get("/trainingjobs/<trainingjob_name>/<version>/steps_state".format("usecase1"),
+ content_type="application/json")
+ trainingmgr_main.LOGGER.debug(response.data)
+ assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
+ assert expected_data in str(response.data)
+
+ db_result5 = []
+ @patch('trainingmgr.trainingmgr_main.get_field_of_given_version', return_value = db_result5)
+ def test_negative_get_steps_state_2(self,mock1):
+ expected_data = "Exception"
+ response = self.client.get("/trainingjobs/<trainingjob_name>/<version>/steps_state".format("usecase1"),
+ content_type="application/json")
+ trainingmgr_main.LOGGER.debug(response.data)
+ assert response.status_code == status.HTTP_404_NOT_FOUND, "Return status code NOT equal"
+ assert expected_data in str(response.data)
+
+class Test_get_trainingjob_by_name_version:
+ def setup_method(self):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+
+ @patch('trainingmgr.trainingmgr_main.get_info_by_version',return_value=[('usecase7', 'auto test', '*', 'prediction with model name', 'Default', '{"arguments": {"epochs": "1", "usecase": "usecase7"}}', 'Enb=20 and Cellnum=6', datetime.datetime(2022, 9, 20,11, 40, 30), '7d09c0bf-7575-4475-86ff-5573fb3c4716', '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FINISHED"}', datetime.datetime(2022, 9, 20, 11, 42, 20), 1, True, 'Near RT RIC', '{"datalake_source": {"CassandraSource": {}}}', '{"datalake_source": {"CassandraSource": {}}}','http://10.0.0.47:32002/model/usecase7/1/Model.zip','','','','','')])
+ @patch('trainingmgr.trainingmgr_main.get_metrics',return_value={"metrics": [{"Accuracy": "0.0"}]})
+ @patch('trainingmgr.trainingmgr_main.get_one_key',return_value='cassandra')
+ def test_get_trainingjob_by_name_version(self,mock1,mock2,mock3):
+ usecase_name = "usecase7"
+ version = "1"
+ response = self.client.get("/trainingjobs/{}/{}".format(usecase_name, version))
+ expected_data = b'{"trainingjob": {"trainingjob_name": "usecase7", "description": "auto test", "feature_list": "*", "pipeline_name": "prediction with model name", "experiment_name": "Default", "arguments": {"epochs": "1", "usecase": "usecase7"}, "query_filter": "Enb=20 and Cellnum=6", "creation_time": "2022-09-20 11:40:30", "run_id": "7d09c0bf-7575-4475-86ff-5573fb3c4716", "steps_state": {"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FINISHED"}, "updation_time": "2022-09-20 11:42:20", "version": 1, "enable_versioning": true, "pipeline_version": "Near RT RIC", "datalake_source": "cassandra", "model_url": "{\\"datalake_source\\": {\\"CassandraSource\\": {}}}", "notification_url": "http://10.0.0.47:32002/model/usecase7/1/Model.zip", "_measurement": "", "bucket": "", "accuracy": {"metrics": [{"Accuracy": "0.0"}]}}}'
+
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == status.HTTP_200_OK, "not equal code"
+ assert response.data == expected_data, "not equal data"
+
+ @patch('trainingmgr.trainingmgr_main.get_info_by_version',return_value=False)
+ @patch('trainingmgr.trainingmgr_main.get_metrics',return_value={"metrics": [{"Accuracy": "0.0"}]})
+ @patch('trainingmgr.trainingmgr_main.get_one_key',return_value='cassandra')
+ def test_negative_get_trainingjob_by_name_version(self,mock1,mock2,mock3):
+ usecase_name = "usecase7"
+ version = "1"
+ response = self.client.get("/trainingjobs/{}/{}".format(usecase_name, version))
+ expected_data = b'{"trainingjob": {"trainingjob_name": "usecase7", "description": "auto test", "feature_list": "*", "pipeline_name": "prediction with model name", "experiment_name": "Default", "arguments": {"epochs": "1", "usecase": "usecase7"}, "query_filter": "Enb=20 and Cellnum=6", "creation_time": "2022-09-20 11:40:30", "run_id": "7d09c0bf-7575-4475-86ff-5573fb3c4716", "steps_state": {"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FINISHED"}, "updation_time": "2022-09-20 11:42:20", "version": 1, "enable_versioning": true, "pipeline_version": "Near RT RIC", "datalake_source": "cassandra", "model_url": "{\\"datalake_source\\": {\\"CassandraSource\\": {}}}", "notification_url": "http://10.0.0.47:32002/model/usecase7/1/Model.zip", "_measurement": "", "bucket": "", "accuracy": {"metrics": [{"Accuracy": "0.0"}]}}}'
+
+ trainingmgr_main.LOGGER.debug(expected_data)
+ trainingmgr_main.LOGGER.debug(response.data)
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == 404, "not equal code"
+
+class Test_unpload_pipeline:
+ def setup_method(self):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+
+ def test_negative_upload_pipeline(self):
+ pipeline_name = "qoe"
+ response = self.client.post("/pipelines/{}/upload".format(pipeline_name))
+ expected = "jjjj"
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == 500, "not equal code"
+
+ @patch('trainingmgr.trainingmgr_main.LOGGER.debug', return_value = True)
+ def test_negative_upload_pipeline_2(self,mock1):
+ pipeline_name = "qoe"
+ response = self.client.post("/pipelines/{}/upload".format(pipeline_name))
+ expected = ValueError("file not found in request.files")
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == 500, "not equal code"
+
+class Test_get_steps_state:
+ def setup_method(self):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+
+ @patch('trainingmgr.trainingmgr_main.get_field_of_given_version',return_value=[['data_extracted','data_pending'], ['data1','data2']])
+ def test_get_steps_state(self,mock1):
+ usecase_name = "usecase7"
+ version = "1"
+ response = self.client.get("/trainingjobs/{}/{}/steps_state".format(usecase_name, version))
+ expected_data = b'data_extracted'
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == status.HTTP_200_OK, "not equal code"
+ assert response.data == expected_data, "not equal data"
+
+ @patch('trainingmgr.trainingmgr_main.get_field_of_given_version',return_value=False)
+ def test_negative_get_steps_state(self,mock1):
+ usecase_name = "usecase7"
+ version = "1"
+ response = self.client.get("/trainingjobs/{}/{}/steps_state".format(usecase_name, version))
+ expected_data = b'data_extracted'
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == 404, "not equal code"
+
+ @patch('trainingmgr.trainingmgr_main.get_field_of_given_version',return_value=Exception("Not found given trainingjob with version"))
+ def test_negative_get_steps_state_2(self,mock1):
+ usecase_name = "usecase7"
+ version = "1"
+ response = self.client.get("/trainingjobs/{}/{}/steps_state".format(usecase_name, version))
+ expected_data = b'data_extracted'
+ assert response.status_code == 500, "not equal code"
+
class Test_training_main:
def setup_method(self):
self.client = trainingmgr_main.APP.test_client(self)
self.logger = trainingmgr_main.LOGGER
- ## Postive_1
- #@pytest.mark.skip()
@patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = False)
@patch('trainingmgr.trainingmgr_main.add_update_trainingjob')
def test_trainingjob_operations(self,mock1,mock2):
assert response.data == expected_data
assert response.status_code == status.HTTP_201_CREATED, "Return status code NOT equal"
- ## Postive_2
db_result = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}',
'', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33',
'{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}',
datetime.datetime(2022, 10, 12, 10, 2, 31, 888830), 1, False, '3', '{"datalake_source": {"InfluxSource": {}}}', 'No data available.', '', 'liveCell', 'UEData', False)]
training_data = ('','','','','','','','','','','')
- #@pytest.mark.skip()
@patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True)
@patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result)
@patch('trainingmgr.trainingmgr_main.check_trainingjob_data', return_value = training_data)
response = self.client.put("/trainingjobs/<trainingjob_name>".format("usecase1"),
data=json.dumps(trainingjob_req),
content_type="application/json")
- trainingmgr_main.LOGGER.debug(response.data)
-
+ trainingmgr_main.LOGGER.debug(response.data)
assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
assert expected_data in str(response.data)
- #@pytest.mark.skip()
@patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True)
def test_negative_trainingjob_operations_post_conflit(self,mock1):
trainingmgr_main.LOGGER.debug("******* test_negative_trainingjob_operations_post_conflit *******")
response = self.client.post("/trainingjobs/<trainingjob_name>".format("usecase1"),
data=json.dumps(trainingjob_req),
content_type="application/json")
- trainingmgr_main.LOGGER.debug(response.data)
-
+ trainingmgr_main.LOGGER.debug(response.data)
assert response.status_code == status.HTTP_409_CONFLICT, "Return status code NOT equal"
assert expected_data in str(response.data)
- # ***** Start training test ****
- # Positive_1
- #@pytest.mark.skip()
db_result = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}',
'', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33',
'{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}',
de_response.headers={"content-type": "application/json"}
de_response._content = b'{"task_status": "Completed", "result": "Data Pipeline Execution Completed"}'
- #@pytest.mark.skip()
@patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True)
@patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result)
@patch('trainingmgr.trainingmgr_main.data_extraction_start', return_value = de_response)
assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
assert expected_data in str(response.data)
-
-
db_result1 = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}',
'', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33',
'{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}',
content_type="application/json")
trainingmgr_main.LOGGER.debug(response.data)
assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Return status code NOT equal"
- assert expected_data in str(response.data)
\ No newline at end of file
+ assert expected_data in str(response.data)
+
+class Test_get_versions_for_pipeline:
+ @patch('trainingmgr.common.trainingmgr_config.TMLogger', return_value = TMLogger("tests/common/conf_log.yaml"))
+ def setup_method(self,mock1,mock2):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+ load_dotenv('tests/test.env')
+ self.TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()
+
+ the_response = Response()
+ the_response.code = "expired"
+ the_response.error_type = "expired"
+ the_response.status_code = 200
+ the_response.headers={"content-type": "application/json"}
+ the_response._content = b'{"versions_list": ["football", "baseball"]}'
+
+ @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response)
+ def test_get_versions_for_pipeline(self,mock1):
+
+ response = self.client.get("/pipelines/{}/versions".format("qoe_pipeline"))
+ trainingmgr_main.LOGGER.debug(response.data)
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == 500, "Return status code NOT equal"
+
+ @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = requests.exceptions.ConnectionError('Mocked error'))
+ def test_negative_get_versions_for_pipeline_1(self,mock1):
+ response = self.client.get("/pipelines/{}/versions".format("qoe_pipeline"))
+ print(response.data)
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception "
+
+ @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = TypeError('Mocked error'))
+ def test_negative_get_versions_for_pipeline_2(self,mock1):
+ response = self.client.get("/pipelines/{}/versions".format("qoe_pipeline"))
+ print(response.data)
+ assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception "
+
+ the_response1 = Response()
+ the_response1.code = "expired"
+ the_response1.error_type = "expired"
+ the_response1.status_code = 200
+ the_response1.headers={"content-type": "application/text"}
+ the_response._content = b'{"versions_list": ["football", "baseball"]}'
+ @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response1)
+ def test_negative_get_versions_for_pipeline_3(self,mock1):
+ response = self.client.get("/pipelines/{}/versions".format("qoe_pipeline"))
+ print(response.data)
+ assert response.content_type != "application/text", "not equal content type"
+
+class Test_get_all_pipeline_names:
+ def setup_method(self):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+
+ the_response = Response()
+ the_response.code = "expired"
+ the_response.error_type = "expired"
+ the_response.status_code = 200
+ the_response.headers={"content-type": "application/json"}
+ the_response._content = b'{ "exp1":"id1","exp2":"id2"}'
+ @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response)
+ def test_get_all_pipeline_names(self,mock1):
+ response = self.client.get("/pipelines")
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == 500, "Return status code NOT equal"
+
+ @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = requests.exceptions.ConnectionError('Mocked error'))
+ def test_negative_get_all_pipeline_names_1(self,mock1):
+ response = self.client.get("/pipelines")
+ print(response.data)
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception "
+
+ @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = TypeError('Mocked error'))
+ def test_negative_get_all_pipeline_names_2(self,mock1):
+ response = self.client.get("/pipelines")
+ print(response.data)
+ assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception "
+
+ the_response1 = Response()
+ the_response1.code = "expired"
+ the_response1.error_type = "expired"
+ the_response1.status_code = 200
+ the_response1.headers={"content-type": "application/text"}
+ the_response1._content = b'{ "exp1":"id1","exp2":"id2"}'
+ @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response1)
+ def test_negative_get_all_pipeline_names_3(self,mock1):
+ response = self.client.get("/pipelines")
+ print(response.data)
+ assert response.content_type != "application/text", "not equal content type"
+
+class Test_get_all_exp_names:
+ def setup_method(self):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+
+ the_response = Response()
+ the_response.code = "expired"
+ the_response.error_type = "expired"
+ the_response.status_code = 200
+ the_response.headers={"content-type": "application/json"}
+ the_response._content = b'{ "exp1":"id1","exp2":"id2"}'
+ @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response)
+ def test_get_all_experiment_names(self,mock1):
+ response = self.client.get("/experiments")
+ print(response.data)
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == 500, "Return status code NOT equal"
+
+ @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = requests.exceptions.ConnectionError('Mocked error'))
+ def test_negative_get_all_experiment_names_1(self,mock1):
+ response = self.client.get("/experiments")
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception "
+
+ @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = TypeError('Mocked error'))
+ def test_negative_get_all_experiment_names_2(self,mock1):
+ response = self.client.get("/experiments")
+ assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception "
+
+ the_response1 = Response()
+ the_response1.code = "expired"
+ the_response1.error_type = "expired"
+ the_response1.status_code = 200
+ the_response1.headers={"content-type": "application/text"}
+ the_response1._content = b'{ "exp1":"id1","exp2":"id2"}'
+ @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response1)
+ def test_negative_get_all_experiment_names_3(self,mock1):
+ response = self.client.get("/experiments")
+ assert response.content_type != "application/text", "not equal content type"
+
+class Test_get_metadata:
+ def setup_method(self):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+
+ resulttt = [('usecase7', '1','auto test',
+ '*','prediction with model name',
+ 'Default','Enb=20 and Cellnum=6','epochs:1','FINISHED',
+ '{"metrics": "FINISHED"}','Near RT RIC','1',
+ 'Cassandra DB','usecase7', '1','auto test','*',
+ 'prediction with model name',
+ 'Default','Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}',
+ 'Default',False,'Cassandra DB','usecase7', '1','auto test','*','prediction with model name',
+ 'Default','Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}',
+ 'Near RT RIC','3','Cassandra DB','usecase7', '1','auto test','*',
+ 'prediction with model name','Default','Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}','Near RT RIC','3','Cassandra DB')
+ ]
+ mock_uc_config_obj = mock.Mock(name='mocked uc_config_obj')
+ @patch('trainingmgr.trainingmgr_main.get_all_versions_info_by_name', return_value = resulttt)
+ @patch('trainingmgr.trainingmgr_main.get_metrics', return_value = 90)
+ @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mock_uc_config_obj)
+ def test_get_metadata(self,mock1,mock2,mock3):
+ usecase_name = "usecase7"
+ response = self.client.get("/trainingjobs/metadata/{}".format(usecase_name))
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
+
+ @patch('trainingmgr.trainingmgr_main.get_all_versions_info_by_name', side_effect = Exception('Mocked error'))
+ def test_negative_get_metadata_1(self,mock1):
+ usecase_name = "usecase7"
+ response = self.client.get("/trainingjobs/metadata/{}".format(usecase_name))
+
+ print(response.data)
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception "
+
+ class Test_get_model:
+ def setup_method(self):
+ self.client = trainingmgr_main.APP.test_client(self)
+ trainingmgr_main.LOGGER = TMLogger("tests/common/conf_log.yaml").logger
+ self.logger = trainingmgr_main.LOGGER
+
+ @patch('trainingmgr.trainingmgr_main.send_file', return_value = 'File')
+ def test_negative_get_model(self,mock1):
+ trainingjob_name = "usecase777"
+ version = 2
+ result = 'File'
+ response = trainingmgr_main.get_model(trainingjob_name,version)
+ assert response[1] == 500, "The function get_model Failed"
+
+class Test_get_metadata_1:
+ def setup_method(self):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+
+ resulttt = [('usecase7', '1','auto test',
+ '*','prediction with model name',
+ 'Default','Enb=20 and Cellnum=6','epochs:1','FINISHED',
+ '{"metrics": "FINISHED"}','Near RT RIC','1',
+ 'Cassandra DB','usecase7', '1','auto test','*',
+ 'prediction with model name',
+ 'Default',False,'Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}',
+ 'Default',False,'Cassandra DB','usecase7', '1','auto test','*','prediction with model name',
+ 'Default','Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}',
+ 'Near RT RIC','3','Cassandra DB','usecase7', '1','auto test','*',
+ 'prediction with model name','Default','Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}','Near RT RIC','3','Cassandra DB')
+ ]
+
+ mock_uc_config_obj = mock.Mock(name='mocked uc_config_obj')
+ @patch('trainingmgr.trainingmgr_main.get_all_versions_info_by_name', return_value = resulttt)
+ @patch('trainingmgr.trainingmgr_main.get_metrics', return_value = 90)
+ @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mock_uc_config_obj)
+ def test_get_metadata(self,mock1,mock2,mock3):
+ usecase_name = "usecase7"
+ response = self.client.get("/trainingjobs/metadata/{}".format(usecase_name))
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
+
+ @patch('trainingmgr.trainingmgr_main.get_all_versions_info_by_name', return_value = None)
+ def test_negative_get_metadata_1(self,mock1):
+ usecase_name = "usecase7"
+ response = self.client.get("/trainingjobs/metadata/{}".format(usecase_name))
+ print(response.data)
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == status.HTTP_404_NOT_FOUND, "Should have thrown the exception "
+
+ @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = False)
+ def test_training_negative_de_notfound(self,mock1):
+ trainingmgr_main.LOGGER.debug("******* test_training_404_NotFound *******")
+ expected_data = ''
+ response = self.client.post("/trainingjobs/<trainingjob_name>/training".format("usecase1"),
+ content_type="application/json")
+ trainingmgr_main.LOGGER.debug(response.data)
+ assert response.status_code == status.HTTP_404_NOT_FOUND, "Return status code NOT equal"
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+import json
+import requests
+from unittest import mock
+from mock import patch
+import pytest
+import flask
+from requests.models import Response
+from threading import Lock
+import os
+import sys
+import datetime
+from flask_api import status
+from dotenv import load_dotenv
+from trainingmgr.common import trainingmgr_config
+from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
+from trainingmgr.common.tmgr_logger import TMLogger
+from trainingmgr import trainingmgr_main
+trainingmgr_main.LOGGER = pytest.logger
+
+class Test_trainingmgr_config:
+ @patch('trainingmgr.common.trainingmgr_config.TMLogger', return_value = TMLogger("tests/common/conf_log.yaml"))
+ def setup_method(self,mock1,mock2):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+ load_dotenv('tests/test.env')
+ self.TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()
+
+ def test_kf_adapter_port(self):
+ expected_data = '5001'
+ result = self.TRAININGMGR_CONFIG_OBJ.kf_adapter_port
+ assert result == expected_data
+
+ def test_kf_adapter_ip(self):
+ expected_data = 'localhost'
+ result = self.TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
+ assert result == expected_data
+
+ def test_data_extraction_port(self):
+ expected_data = '32000'
+ result = self.TRAININGMGR_CONFIG_OBJ.data_extraction_port
+ assert result == expected_data
+
+ def test_data_extraction_ip(self):
+ expected_data = 'localhost'
+ result = self.TRAININGMGR_CONFIG_OBJ.data_extraction_ip
+ assert result == expected_data
+
+ def test_my_port(self):
+ expected_data = '32002'
+ x = TrainingMgrConfig
+ result = self.TRAININGMGR_CONFIG_OBJ.my_port
+ assert result == expected_data
+
+ def test_my_ip(self):
+ expected_data = 'localhost'
+ result = self.TRAININGMGR_CONFIG_OBJ.my_ip
+ assert result == expected_data
+
+ def test_logger(self):
+ expected_data = TMLogger("tests/common/conf_log.yaml").logger
+ result = self.TRAININGMGR_CONFIG_OBJ.logger
+ assert result == expected_data
+
+ def test_ps_user(self):
+ expected_data = 'postgres'
+ result = self.TRAININGMGR_CONFIG_OBJ.ps_user
+ assert result == expected_data
+
+ def test_ps_password(self):
+ expected_data = "abcd"
+ result = self.TRAININGMGR_CONFIG_OBJ.ps_password
+ assert result == expected_data
+
+ def test_ps_ip(self):
+ expected_data = 'localhost'
+ result = self.TRAININGMGR_CONFIG_OBJ.ps_ip
+ assert result == expected_data
+
+ def test_ps_port(self):
+ expected_data = '30001'
+ x = TrainingMgrConfig
+ result = self.TRAININGMGR_CONFIG_OBJ.ps_port
+ assert result == expected_data
+
+ def test_is_config_loaded_properly(self):
+ expected_data = True
+ result = TrainingMgrConfig.is_config_loaded_properly(self.TRAININGMGR_CONFIG_OBJ)
+ assert result == expected_data
+
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+import json
+import requests
+from unittest import mock
+from mock import patch
+import pytest
+import flask
+from requests.models import Response
+from threading import Lock
+import os
+import sys
+import datetime
+from flask_api import status
+from dotenv import load_dotenv
+from threading import Lock
+from trainingmgr import trainingmgr_main
+from trainingmgr.common import trainingmgr_operations
+from trainingmgr.common.tmgr_logger import TMLogger
+from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
+trainingmgr_main.LOGGER = pytest.logger
+trainingmgr_main.LOCK = Lock()
+trainingmgr_main.DATAEXTRACTION_JOBS_CACHE = {}
+
+class DummyVariable:
+ kf_adapter_ip = "localhost"
+ kf_adapter_port = 5001
+ logger = trainingmgr_main.LOGGER
+
+class Test_training_start:
+ def setup_method(self):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+
+ def test_negative_training_start(self):
+ training_config_obj = DummyVariable()
+ dict_data = {
+ "brand": "Ford",
+ "model": "Mustang",
+ "year": 1964
+ }
+ trainingjob_name = "usecase12"
+ expected_data = {
+ "brand": "Ford",
+ "model": "Mustang",
+ "year": 1964
+ }
+ try:
+ response = trainingmgr_operations.training_start(training_config_obj,dict_data,trainingjob_name)
+ assert response == expected_data,"data not equal"
+ assert False
+ except Exception:
+ assert True
+
+class Test_upload_pipeline:
+ def setup_method(self):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+
+ mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ")
+ attrs_TRAININGMGR_CONFIG_OBJ = {'kf_adapter_ip.return_value': '123', 'kf_adapter_port.return_value' : '100'}
+ mocked_TRAININGMGR_CONFIG_OBJ.configure_mock(**attrs_TRAININGMGR_CONFIG_OBJ)
+ @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ)
+ def test_upload_pipeline_negative(self, mock1):
+ expected_data = "result"
+ trainingjob_req = {
+ "pipe_name":"usecase1",
+ }
+ response = self.client.post("/pipelines/<pipe_name>/upload".format("usecase1"), data=json.dumps(trainingjob_req),
+ content_type="application/json")
+ assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+ assert expected_data in response.json.keys()
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+import pytest
+import sys
+import os
+from mock import patch
+from trainingmgr.db.trainingmgr_ps_db import PSDB
+from dotenv import load_dotenv
+
+class cred_handle:
+ def __init__(self):
+ self.ps_user = "gdgdgd"
+ self.ps_password = "hdhd"
+ self.ps_ip = 12345
+ self.ps_port = 1000
+
+ def logger(self):
+ return cred_handle()
+
+ def error(self):
+ raise Exception('Connection Failed To Exist')
+
+class Cursor:
+ def __init__(self, db_name):
+ self.db_name = db_name
+
+ def execute(self, query):
+ pass
+
+ def fetchall(self):
+ return [self.db_name]
+
+ def close(self):
+ pass
+
+class connection:
+ def __init__(self, db_name):
+ self.autocommit = False
+ self.db_name = db_name
+
+ def cursor(self):
+ return Cursor(self.db_name)
+
+ def rollback(self):
+ pass
+
+ def commit(self):
+ pass
+
+ def close(self):
+ pass
+
+class Test_PSDB:
+ @patch('trainingmgr.db.trainingmgr_ps_db.pg8000.dbapi.connect', return_value = connection('usecase_manager_database'))
+ def setup_method(self,mock1,mock2):
+ self.obj = PSDB(cred_handle())
+
+ def test_init_psdb(self):
+ assert self.obj != None, 'PSDB Object Creation Failed'
+
+ @patch('trainingmgr.db.trainingmgr_ps_db.pg8000.dbapi.connect', return_value = connection('usecase_manager_database'))
+ def test_get_new_conn(self, mock1):
+ out = self.obj.get_new_conn()
+ assert out != None, 'New Connection Failed'
+
+ def test_negative_get_new_conn(self):
+ try:
+ out = self.obj.get_new_conn()
+ assert out != None, 'New Connection Failed'
+ assert False
+ except Exception:
+ assert True
\ No newline at end of file
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+""""
+This file contains the unittesting for Training management utility functions
+"""
+from pickle import FALSE
+import sys
+from mock import patch
+from threading import Lock
+import pytest
+import datetime
+from dotenv import load_dotenv
+from flask_api import status
+import logging
+
+from trainingmgr.db.trainingmgr_ps_db import PSDB
+import trainingmgr.trainingmgr_main
+from trainingmgr.common import trainingmgr_util
+from trainingmgr.common.tmgr_logger import TMLogger
+from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
+from trainingmgr.common.trainingmgr_util import response_for_training, check_key_in_dictionary,check_trainingjob_data, \
+ get_one_key, get_metrics, handle_async_feature_engineering_status_exception_case, get_one_word_status, check_trainingjob_data, \
+ validate_trainingjob_name
+from requests.models import Response
+from trainingmgr import trainingmgr_main
+from trainingmgr.common.tmgr_logger import TMLogger
+trainingmgr_main.LOGGER = pytest.logger
+
+class Test_response_for_training:
+ def setup_method(self):
+ self.client = trainingmgr_main.APP.test_client(self)
+ self.logger = trainingmgr_main.LOGGER
+
+ fs_result = Response()
+ fs_result.code = "expired"
+ fs_result.error_type = "expired"
+ fs_result.status_code = status.HTTP_200_OK
+ fs_result.headers={'content-type': 'application/json'}
+ fs_result._content={'Accept-Charset': 'UTF-8'}
+
+ @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+ @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+ @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+ @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)
+ @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result)
+ @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+ def test_response_for_training(self,mock1,mock2, mock3, mock4, mock5, mock6):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr.trainingmgr_main.LOGGER
+ is_success = True
+ trainingjob_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ result = response_for_training(code, message, logger, is_success, trainingjob_name, ps_db_obj, mm_sdk)
+ assert result != None
+
+ @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+ @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+ @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+ @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)
+ @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+ def test_negative_response_for_training(self,mock1,mock2, mock3, mock4, mock5):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr_main.LOGGER
+ is_success = True
+ usecase_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+ assert False
+ except Exception:
+ assert True
+
+ @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+ @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+ @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+ @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)
+ @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+ def test_negative_response_for_training_2(self,mock1,mock2, mock3, mock4, mock5):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr_main.LOGGER
+ is_success = True
+ usecase_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+ assert False
+ except Exception:
+ assert True
+
+ def test_negative_response_for_training_3(self):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr_main.LOGGER
+ is_success = True
+ usecase_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+ assert False
+ except Exception:
+ assert True
+
+ @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+ @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+ def test_negative_response_for_training_4(self,mock1,mock2):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr_main.LOGGER
+ is_success = True
+ usecase_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+ assert False
+ except Exception:
+ assert True
+
+ @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+ @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+ @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+ def test_negative_response_for_training_5(self,mock1,mock2,mock3):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr_main.LOGGER
+ is_success = True
+ usecase_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+ assert False
+ except Exception:
+ assert True
+
+ @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+ @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+ @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+ @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)
+ def test_negative_response_for_training_6(self,mock1,mock2,mock3,mock4):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr_main.LOGGER
+ is_success = True
+ usecase_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+ assert False
+ except Exception:
+ assert True
+
+ @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+ def test_negative_response_for_training_7(self,mock1):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr_main.LOGGER
+ is_success = True
+ usecase_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+ assert False
+ except Exception:
+ assert True
+
+ @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+ @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+ @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)
+ def test_negative_response_for_training_8(self,mock1,mock2,mock3):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr_main.LOGGER
+ is_success = True
+ usecase_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+ assert False
+ except Exception:
+ assert True
+
+ @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+ def test_negative_response_for_training_9(self,mock1):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr_main.LOGGER
+ is_success = True
+ usecase_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+ assert False
+ except Exception:
+ assert True
+
+ @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+ def test_negative_response_for_training_10(self,mock1):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr_main.LOGGER
+ is_success = True
+ usecase_name = "usecalse7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+ assert False
+ except Exception:
+ assert True
+
+ @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=True)
+ @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+ @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)
+ @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result)
+ @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+ def test_negative_response_for_training_11(self,mock1,mock2,mock3,mock4,mock5):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr_main.LOGGER
+ is_success = True
+ usecase_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+ assert False
+ except Exception:
+ assert True
+
+ @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+ @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+ @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+ @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)
+ @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result)
+ @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+ def test_negative_response_for_training_12(self,mock1,mock2,mock3,mock4,mock5,mock6):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr_main.LOGGER
+ is_success = True
+ usecase_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ assert False
+ except Exception:
+ assert True
+
+class Test_response_for_training_1:
+ fs_result = Response()
+ fs_result.code = "expired"
+ fs_result.error_type = "expired"
+ fs_result.status_code = status.HTTP_200_OK
+ fs_result.headers={'content-type': 'application/jsn'}
+ fs_result._content={'Accept-Charset': 'UTF-8'}
+
+ @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+ @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+ @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+ @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)
+ @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result)
+ @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+ def test_response_for_training_1(self,mock1,mock2, mock3, mock4, mock5, mock6):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr_main.LOGGER
+ is_success = True
+ usecase_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+ assert False
+ except Exception:
+ assert True
+
+class Test_response_for_training_2:
+ fs_result = Response()
+ fs_result.code = "expired"
+ fs_result.error_type = "expired"
+ fs_result.status_code = status.HTTP_404_NOT_FOUND
+ fs_result.headers={'content-type': 'application/json'}
+ fs_result._content={'Accept-Charset': 'UTF-8'}
+
+ @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+ @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+ @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+ @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)
+ @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result)
+ @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+ def test_response_for_training_2(self,mock1,mock2, mock3, mock4, mock5, mock6):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr_main.LOGGER
+ is_success = True
+ usecase_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+ assert False
+ except Exception:
+ assert True
+
+class Test_response_for_training_3:
+ fs_result = Response()
+ fs_result.code = "expired"
+ fs_result.error_type = "expired"
+ fs_result.status_code = status.HTTP_200_OK
+ fs_result.headers={'content-type': 'application/jsn'}
+ fs_result._content={'Accept-Charset': 'UTF-8'}
+
+ @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+ @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+ @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+ @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)
+ @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result)
+ @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+ @patch('requests.post',return_result=Exception(status.HTTP_500_INTERNAL_SERVER_ERROR))
+ def test_negative_response_for_training_3_1(self,mock1,mock2, mock3, mock4, mock5, mock6, mock7):
+ code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ message = "Run status is not scheduled for "
+ logger = trainingmgr_main.LOGGER
+ is_success = True
+ usecase_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+ assert False
+ except Exception:
+ assert True
+
+class Test_check_key_in_dictionary:
+ def test_check_key_in_dictionary(self):
+ fields = ["model","brand","year"]
+ dictionary = {
+ "brand": "Ford",
+ "model": "Mustang",
+ "year": 1964
+ }
+ assert check_key_in_dictionary(fields, dictionary) == True, "data not equal"
+
+ def test_check_key_in_dictionary(self):
+ fields = ["model","brand","type"]
+ dictionary = {
+ "brand": "Ford",
+ "model": "Mustang",
+ "year": 1964
+ }
+ assert check_key_in_dictionary(fields, dictionary) == False, "data not equal"
+
+ def test_negative_check_key_in_dictionary_1(self):
+ fields = ["Ford","Apple","Mosquito"]
+ dictionary = {
+ "brand": "Ford",
+ "model": "Mustang",
+ "year": 1964
+ }
+ try:
+ check_key_in_dictionary(fields, dictionary)
+ assert False
+ except Exception:
+ assert True
+
+class Test_check_trainingjob_data:
+ @patch('trainingmgr.common.trainingmgr_util.check_key_in_dictionary',return_value=True)
+ @patch('trainingmgr.common.trainingmgr_util.isinstance',return_value=True)
+ def test_check_trainingjob_data(self,mock1,mock2):
+ usecase_name = "usecase8"
+ json_data = { "description":"unittest", "feature_list": ["apple", "banana", "cherry"] , "pipeline_name":"qoe" , "experiment_name":"experiment1" , "arguments":"arguments1" , "query_filter":"query1" , "enable_versioning":True , "target_deployment":"Near RT RIC" , "pipeline_version":1 , "datalake_source":"cassandra db" , "incremental_training":True , "model":"usecase7" , "model_version":1 , "_measurement":2 , "bucket":"bucket1"}
+
+ expected_data = (['apple', 'banana', 'cherry'], 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db', 2, 'bucket1')
+ assert check_trainingjob_data(usecase_name, json_data) == expected_data,"data not equal"
+
+ def test_negative_check_trainingjob_data_1(self):
+ usecase_name = "usecase8"
+ json_data = { "description":"unittest", "feature_list": ["apple", "banana", "cherry"] , "pipeline_name":"qoe" , "experiment_name":"experiment1" , "arguments":"arguments1" , "query_filter":"query1" , "enable_versioning":True , "target_deployment":"Near RT RIC" , "pipeline_version":1 , "datalake_source":"cassandra db" , "incremental_training":True , "model":"usecase7" , "model_version":1 , "_measurement":2 , "bucket":"bucket1"}
+
+ expected_data = (['apple', 'banana', 'cherry'], 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db', 2, 'bucket1')
+ try:
+ assert check_trainingjob_data(usecase_name, json_data) == expected_data,"data not equal"
+ assert False
+ except Exception:
+ assert True
+
+ @patch('trainingmgr.common.trainingmgr_util.check_key_in_dictionary',return_value=True)
+ def test_negative_check_trainingjob_data_2(self,mock1):
+ usecase_name = "usecase8"
+ json_data = { "description":"unittest", "feature_list": ["apple", "banana", "cherry"] , "pipeline_name":"qoe" , "experiment_name":"experiment1" , "arguments":"arguments1" , "query_filter":"query1" , "enable_versioning":True , "target_deployment":"Near RT RIC" , "pipeline_version":1 , "datalake_source":"cassandra db" , "incremental_training":True , "model":"usecase7" , "model_version":1 , "_measurement":2 , "bucket":"bucket1"}
+
+ expected_data = (['apple', 'banana', 'cherry'], 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db', 2, 'bucket1')
+ try:
+ assert check_trainingjob_data(usecase_name, json_data) == expected_data,"data not equal"
+ assert False
+ except Exception:
+ assert True
+
+ @patch('trainingmgr.common.trainingmgr_util.isinstance',return_value=True)
+ def test_negative_check_trainingjob_data_3(self,mock1):
+ usecase_name = "usecase8"
+ json_data = None
+ expected_data = (['apple', 'banana', 'cherry'], 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db', 2, 'bucket1')
+ try:
+ assert check_trainingjob_data(usecase_name, json_data) == expected_data,"data not equal"
+ assert False
+ except Exception:
+ assert True
+
+class Test_get_one_key:
+ def test_get_one_key(self):
+ dictionary = {
+ "brand": "Ford",
+ "model": "Mustang",
+ "year": 1964
+ }
+ only_key = "year"
+ expected_data = only_key
+ assert get_one_key(dictionary) == expected_data,"data not equal"
+
+ def test_get_one_key_2(self):
+ dictionary = {'name': 'Jack', 'age': 26}
+ only_key = "age"
+ expected_data = only_key
+ assert get_one_key(dictionary) == expected_data,"data not equal"
+
+ def test_negative_get_one_key_1(self):
+ dictionary = {
+ "brand": "Ford",
+ "model": "Mustang",
+ "year": 1964
+ }
+ only_key = "model"
+ expected_data = only_key
+ try:
+ assert get_one_key(dictionary) == expected_data,"data not equal"
+ assert False
+ except Exception:
+ assert True
+
+ def test_negative_get_one_key_2(self):
+ dictionary = {'name': 'Jack', 'age': 26}
+ only_key = "name"
+ expected_data = only_key
+ try:
+ assert get_one_key(dictionary) == expected_data,"data not equal"
+ assert False
+ except Exception:
+ assert True
+
+class dummy_mmsdk:
+ def check_object(self, param1, param2, param3):
+ return True
+
+ def get_metrics(self, usecase_name, version):
+ thisdict = {
+ "brand": "Ford",
+ "model": "Mustang",
+ "year": 1964
+ }
+ return thisdict
+
+class Test_get_metrics:
+ @patch('trainingmgr.common.trainingmgr_util.json.dumps',return_value='usecase_data')
+ def test_get_metrics_with_version(self,mock1):
+ usecase_name = "usecase7"
+ version = 1
+ mm_sdk = dummy_mmsdk()
+ expected_data = 'usecase_data'
+ get_metrics(usecase_name, version, dummy_mmsdk())
+ assert get_metrics(usecase_name, version, mm_sdk) == expected_data, "data not equal"
+
+ @patch('trainingmgr.common.trainingmgr_util.json.dumps',return_value=None)
+ def test_negative_get_metrics_1(self,mock1):
+ usecase_name = "usecase7"
+ version = 1
+ mm_sdk = dummy_mmsdk()
+ expected_data = 'usecase_data'
+ try:
+ assert get_metrics(usecase_name, version, mm_sdk) == expected_data, "data not equal"
+ assert False
+ except Exception:
+ assert True
+
+ @patch('trainingmgr.common.trainingmgr_util.json.dumps',return_value=Exception("Problem while downloading metrics"))
+ def test_negative_get_metrics_2(self,mock1):
+ usecase_name = "usecase7"
+ version = 1
+ mm_sdk = dummy_mmsdk()
+ expected_data = 'usecase_data'
+ try:
+ assert get_metrics(usecase_name, version, mm_sdk) == expected_data, "data not equal"
+ assert False
+ except Exception:
+ assert True
+
+ def test_negative_get_metrics_3(self):
+ usecase_name = "usecase7"
+ version = 1
+ mm_sdk = dummy_mmsdk()
+ expected_data = 'final_data'
+ try:
+ get_metrics(usecase_name, version, dummy_mmsdk())
+ assert get_metrics(usecase_name, version, mm_sdk) == expected_data, "data not equal"
+ assert False
+ except Exception:
+ assert True
+
+class dummy_mmsdk_1:
+ def check_object(self, param1, param2, param3):
+ return False
+
+ def get_metrics(self, usecase_name, version):
+ thisdict = {
+ "brand": "Ford",
+ "model": "Mustang",
+ "year": 1964
+ }
+ return thisdict
+
+class Test_get_metrics_2:
+ @patch('trainingmgr.common.trainingmgr_util.json.dumps',return_value='usecase_data')
+ def test_negative_get_metrics_2_1(self,mock1):
+ usecase_name = "usecase7"
+ version = 1
+ mm_sdk = dummy_mmsdk_1()
+ expected_data = 'usecase_data'
+ get_metrics(usecase_name, version, dummy_mmsdk())
+ try:
+ get_metrics(usecase_name, version, dummy_mmsdk())
+ assert get_metrics(usecase_name, version, mm_sdk) == expected_data, "data not equal"
+ assert False
+ except Exception:
+ assert True
+
+class Test_handle_async_feature_engineering_status_exception_case:
+ @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version',return_value=True)
+ @patch('trainingmgr.common.trainingmgr_util.response_for_training',return_value=True)
+ def test_handle_async_feature_engineering_status_exception_case(self, mock1, mock2):
+ lock = Lock()
+ featurestore_job_cache = {'usecase7': 'Geeks', 2: 'For', 3: 'Geeks'}
+ code = 123
+ message = "Into the field"
+ logger = "123"
+ is_success = True
+ usecase_name = "usecase7"
+ ps_db_obj = ()
+ mm_sdk = ()
+ assert handle_async_feature_engineering_status_exception_case(lock, featurestore_job_cache, code,
+ message, logger, is_success,
+ usecase_name, ps_db_obj, mm_sdk) == None,"data not equal"
+
+ @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version',return_value=True)
+ @patch('trainingmgr.common.trainingmgr_util.response_for_training',return_value=True)
+ # @patch('trainingmgr.common.trainingmgr_util.dataextraction_job_cache',return_value = Exception("Could not get info from db for "))
+ def test_negative_handle_async_feature_engineering_status_exception_case(self, mock1, mock2):
+ lock = Lock()
+ featurestore_job_cache = {'usecase7': 'Geeks', 2: 'For', 3: 'Geeks'}
+ code = 123
+ message = "Into the field"
+ logger = "123"
+ is_success = True
+ usecase_name = ""
+ ps_db_obj = ()
+ mm_sdk = ()
+ try:
+ handle_async_feature_engineering_status_exception_case(lock, featurestore_job_cache, code,
+ message, logger, is_success,
+ usecase_name, ps_db_obj, mm_sdk)
+ assert handle_async_feature_engineering_status_exception_case(lock, featurestore_job_cache, code,
+ message, logger, is_success,
+ usecase_name, ps_db_obj, mm_sdk) == None,"data not equal"
+ assert False
+ except Exception:
+ assert True
+
+class Test_get_one_word_status:
+ def test_get_one_word_status(self):
+ steps_state = [0,1,2,3]
+ expected_data = "IN_PROGRESS"
+ assert get_one_word_status(steps_state) == expected_data,"data not equal"
+
+class Test_validate_trainingjob_name:
+ @patch('trainingmgr.common.trainingmgr_util.get_all_versions_info_by_name',return_value=True)
+ def test_validate_trainingjob_name_1(self,mock1):
+ trainingjob_name = "usecase8"
+ ps_db_obj = ()
+ expected_data = True
+ assert validate_trainingjob_name(trainingjob_name,ps_db_obj) == expected_data,"data not equal"
+
+ @patch('trainingmgr.common.trainingmgr_util.get_all_versions_info_by_name', return_value = Exception("Could not get info from db for "))
+ def test_validate_trainingjob_name_2(self,mock1):
+ trainingjob_name = "usecase8"
+ ps_db_obj = ()
+ expected_data = True
+ try:
+ validate_trainingjob_name(trainingjob_name,ps_db_obj)
+ assert validate_trainingjob_name(trainingjob_name,ps_db_obj) == expected_data,"data not equal"
+ assert False
+ except Exception:
+ assert True
+
+ def test_negative_validate_trainingjob_name(self):
+ trainingjob_name = "usecase8"
+ ps_db_obj = ()
+ expected_data = True
+ try:
+ validate_trainingjob_name(trainingjob_name,ps_db_obj)
+ assert validate_trainingjob_name(trainingjob_name,ps_db_obj) == expected_data,"data not equal"
+ assert False
+ except Exception:
+ assert True