Unit testing for tm. 89/9889/6
author  smahana123 <s.mahana@samsung.com>
Thu, 1 Dec 2022 07:04:49 +0000 (12:34 +0530)
committer  smahana123 <s.mahana@samsung.com>
Mon, 5 Dec 2022 10:33:03 +0000 (16:03 +0530)
Issue-Id: AIMLFW-6

Signed-off-by: smahana123 <s.mahana@samsung.com>
Change-Id: Ibc302f732c15571f92e7f6317fe94a1b5845f509
tests/README.MD
tests/common/conf_log.yaml
tests/test.env [new file with mode: 0644]
tests/test_common_db_fun.py [new file with mode: 0644]
tests/test_tm_apis.py
tests/test_trainingmgr_config.py [new file with mode: 0644]
tests/test_trainingmgr_operations.py [new file with mode: 0644]
tests/test_trainingmgr_ps_db.py [new file with mode: 0644]
tests/test_trainingmgr_util.py [new file with mode: 0644]

index ca6c657..cc39e6d 100644 (file)
@@ -17,7 +17,6 @@
 # ==================================================================================
 #Change the current working directory to home directory
 tm
-
 # Directory structure
 tm
 |__tests
@@ -42,4 +41,4 @@ pip3 install -r requirements_test.txt
 # Example to run test cases.
 
 # Generate test report
-sudo python3 -m pytest -rA . --capture=tee-sys --cov-report term-missing --cov-report xml:coverage.xml   --cov-report html:htmlcov --junitxml test-reports/junit.xml --cov=./
\ No newline at end of file
+python3 -m pytest -rA . --capture=tee-sys --cov-report term-missing --cov-report xml:coverage.xml   --cov-report html:htmlcov --junitxml test-reports/junit.xml --cov=./trainingmgr/
\ No newline at end of file
index 4ecb1fa..dc8d902 100644 (file)
@@ -29,7 +29,7 @@ handlers:
     class: logging.handlers.RotatingFileHandler
     level: DEBUG
     formatter: simple
-    filename: ./training_manager_test.log
+    filename: training_manager.log
     maxBytes: 10485760
     backupCount: 20
     encoding: utf8
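
Note: the tests below build their logger from this file via TMLogger("tests/common/conf_log.yaml"). The TMLogger implementation is not part of this change; the following is only a minimal sketch of the usual dictConfig pattern it presumably follows (the use of PyYAML and logging.config here is an assumption, not code from this repo):

import logging.config
import yaml

# Hypothetical loader mirroring what TMLogger is assumed to do with conf_log.yaml.
with open("tests/common/conf_log.yaml") as conf_file:
    log_config = yaml.safe_load(conf_file)   # parses the handlers/formatters shown above
logging.config.dictConfig(log_config)        # installs the RotatingFileHandler configuration
logging.getLogger(__name__).debug("logging configured from conf_log.yaml")
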
diff --git a/tests/test.env b/tests/test.env
new file mode 100644 (file)
index 0000000..325004a
--- /dev/null
@@ -0,0 +1,30 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+KF_ADAPTER_PORT=5001
+KF_ADAPTER_IP=localhost
+DATA_EXTRACTION_API_PORT=32000
+DATA_EXTRACTION_API_IP=localhost
+DEPLOYMENT_MANAGER_PORT=9999
+DEPLOYMENT_MANAGER_IP=localhost
+TRAINING_MANAGER_PORT=32002
+TRAINING_MANAGER_IP=localhost
+PS_USER=postgres
+PS_PASSWORD="abcd"
+PS_IP="localhost"
+PS_PORT="30001"
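
These values are consumed through python-dotenv: test_tm_apis.py calls load_dotenv('tests/test.env') before building TrainingMgrConfig, so each test process sees them as ordinary environment variables. A minimal sketch of that pattern (illustration only, not part of the change):

import os
from dotenv import load_dotenv

# Load the test environment file so config code can read the values defined above.
load_dotenv('tests/test.env')

kf_adapter_ip = os.getenv('KF_ADAPTER_IP')      # "localhost"
kf_adapter_port = os.getenv('KF_ADAPTER_PORT')  # "5001" (environment values are strings)
print(kf_adapter_ip, kf_adapter_port)
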
diff --git a/tests/test_common_db_fun.py b/tests/test_common_db_fun.py
new file mode 100644 (file)
index 0000000..3f12089
--- /dev/null
@@ -0,0 +1,441 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+   
+import pytest
+import sys
+from dotenv import load_dotenv
+import json
+from trainingmgr.db.common_db_fun import get_data_extraction_in_progress_trainingjobs, \
+     change_field_of_latest_version, change_field_value_by_version, \
+     get_field_by_latest_version, get_field_of_given_version, \
+     change_in_progress_to_failed_by_latest_version, change_steps_state_of_latest_version, \
+     change_steps_state_by_version, delete_trainingjob_version, get_info_by_version, \
+     get_trainingjob_info_by_name, get_latest_version_trainingjob_name, \
+     get_all_versions_info_by_name, get_all_distinct_trainingjobs, \
+     get_all_version_num_by_trainingjob_name, update_model_download_url, \
+     add_update_trainingjob, get_all_jobs_latest_status_version
+
+mimic_db = {
+            "usecase_name": "Tester",
+            "description": "Current UseCase Is For Testing Only",
+            "feature_list": "*",
+            "pipeline_name": "qoe-pipeline",
+            "experiment_name": "default",
+            "arguments": "{epoches : 1}",
+            "query_filter": "",
+            "creation_time": "29-09-2022",
+            "run_id": 1729,
+            "steps_state": json.dumps({"DATA_EXTRACTION" : "IN_PROGRESS"}),
+            "updation_time": "29-09-2022",
+            "version": 1,
+            "enable_versioning": True,
+            "target_deployment": "Near RT Ric",
+            "pipeline_version": 1,
+            "datalake_source": "InfluxSource",
+            "incremental_training": False,
+            "model": "",
+            "model_version": "",
+            "model_url":"",
+            "notification_url": "",
+            "_measurement": "liveCell",
+            "bucket": "UEdata",
+            "accuracy": 70
+        }
+
+class db_helper:
+    '''Mimics a DB connection and cursor for these tests.'''
+    def __init__(self, req_cols, raise_exception = False, check_success_obj = None):
+        self.cols = req_cols
+        self.raise_exception = raise_exception
+        self.check_success_obj = check_success_obj
+        self.counter = 0
+        
+    def get_new_conn(self):
+        return db_helper(self.cols, self.raise_exception, self.check_success_obj)
+    
+    def cursor(self):
+        return db_helper(self.cols, self.raise_exception, self.check_success_obj)
+
+    def execute(self, query, values = None):
+        if self.raise_exception:
+            raise Exception("DB Error")
+
+    def fetchall(self):
+        out = []
+        if(len(self.cols) > 0):
+            if(self.cols[self.counter][0] == "*"):
+               for (col, value) in mimic_db.items():
+                    out.append(value)
+            elif(self.cols[self.counter][0] == None):
+                self.counter += 1
+                return None
+            else:
+                for col in self.cols[self.counter]:
+                    out.append(mimic_db[col])
+        self.counter += 1
+        return [out]
+
+    def close(self):
+        '''Used to verify success for functions that return nothing: if close() is called, the query executed as expected.'''
+        if self.check_success_obj:
+            self.check_success_obj.setwin()
+        
+    def rollback(self):
+        pass
+
+    def commit(self):
+        pass
+
+class Check:
+    def __init__(self):
+        self.finished = False
+
+    def setwin(self):
+         self.finished = True
+
+class Test_Common_Db_Fun:
+    def setup_method(self):
+        pass
+
+    def test_get_data_extraction_in_progress_trainingjobs(self):
+        db_obj = db_helper([["usecase_name", "steps_state"]])
+        out = get_data_extraction_in_progress_trainingjobs(db_obj)
+        
+        assert out != None, 'Function get_data_extraction_in_progress_trainingjobs has failed'
+    
+    def test_negative_get_data_extraction_in_progress_trainingjobs(self):
+        checker = Check()
+        try:
+            db_obj = db_helper([["usecase_name", "steps_state"]], raise_exception=True, check_success_obj=checker)
+            out = get_data_extraction_in_progress_trainingjobs(db_obj)
+            assert out != None, 'get_data_extraction_in_progress_trainingjobs failed'
+        except Exception as err:
+            assert str(err) == "bad operand type for unary +: 'str'", 'Negative test get_data_extraction_in_progress_trainingjobs FAILED, did not return the required error'
+            assert checker.finished, 'Cursor Not Closed Properly for fxn get_data_extraction_in_progress_trainingjobs | Negative Test'
+
+    def test_change_field_of_latest_version(self):
+        for field in ["notification_url", "run_id"]:
+            checker = Check()
+            db_obj = db_helper([["version", "usecase_name"]], check_success_obj=checker)
+            change_field_of_latest_version("Tester", db_obj, field, "a_dummy_value")
+
+            assert checker.finished, 'change_field_of_latest_version failed for usecase {} and field {}'.format("Tester", field)
+    
+    def test_negative_change_field_of_latest_version(self):
+        for field in ["notification_url", "run_id"]:
+            checker = Check()
+            db_obj = db_helper([["version", "usecase_name"]],raise_exception=True ,check_success_obj=checker)
+            try:    
+                change_field_of_latest_version("Tester", db_obj, field, "a_dummy_value")
+                assert checker.finished, 'change_field_of_latest_version failed for usecase {} and field {}'.format("Tester", field)
+                assert False
+            except Exception:
+                assert True
+
+    def test_change_field_value_by_version_2(self):
+        checker = Check()
+        db_obj = db_helper([[None]], check_success_obj=checker)
+        change_field_value_by_version("Tester", 1, db_obj, "deletion_in_progress", "a_dummy_value")
+        assert checker.finished, 'change_field_value_by_version FAILED'
+    
+    def test_negative_change_field_value_by_version_2(self):
+        checker = Check()
+        db_obj = db_helper([[None]], raise_exception=True, check_success_obj=checker)
+        try:
+            change_field_value_by_version("Tester", 1, db_obj, "deletion_in_progress", "a_dummy_value")
+            assert checker.finished, 'change_field_value_by_version FAILED'
+            assert False
+        except Exception:
+            assert True
+          
+    def test_get_field_by_latest_version(self):
+        for field in ["notification_url", "model_url", "target_deployment"]:
+            db_obj = db_helper([["version"], [field]])
+            out = get_field_by_latest_version("Tester", db_obj, field)
+            assert out != None, 'get_field_by_latest_version FAILED at field = {}'.format(field)
+    
+    def test_negative_get_field_by_latest_version(self):
+        for field in ["notification_url", "model_url", "target_deployment"]:
+            checker = Check()
+            try:
+                db_obj = db_helper([["version"], [field]], raise_exception=True, check_success_obj=checker)
+                out = get_field_by_latest_version("Tester", db_obj, field)
+                assert out != None, 'get_field_by_latest_version FAILED at field = {}'.format(field)
+                assert False
+            except Exception:
+                assert True
+
+    def test_get_field_of_given_version(self):
+        db_obj = db_helper([["steps_state"]])
+        out = get_field_of_given_version("Tester", 1, db_obj, "steps_state")
+        assert out != None, ' test_get_field_of_given_version FAILED'
+    
+    def test_negative_get_field_of_given_version(self):
+        checker = Check()
+        try:
+            db_obj = db_helper([["steps_state"]], raise_exception=True, check_success_obj=checker)
+            out = get_field_of_given_version("Tester", 1, db_obj, "steps_state")
+            assert out != None, ' test_get_field_of_given_version FAILED'
+            assert False
+        except Exception:
+            assert True
+
+    def test_change_in_progress_to_failed_by_latest_version(self):
+        checker = Check()
+        db_obj = db_helper([["version"], ["steps_state"]], check_success_obj=checker)
+        change_in_progress_to_failed_by_latest_version("Tester", db_obj)
+        assert checker.finished, 'change_in_progress_to_failed_by_latest_version FAILED'
+
+    def test_negative_change_in_progress_state_to_failed_of_latest_version(self):
+        checker = Check()
+        try:
+            db_obj = db_helper([["version"], ["steps_state"]], raise_exception=True,check_success_obj=checker)
+            change_in_progress_to_failed_by_latest_version("Tester", db_obj)
+            assert checker.finished, 'change_in_progress_to_failed_by_latest_version FAILED'
+        except Exception as err:
+                fxn_name = "change_in_progress_to_failed_by_latest_version("
+                assert str(err) == "bad operand type for unary +: 'str'", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+                assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+    def test_change_steps_state_of_latest_version(self):
+        checker = Check()
+        db_obj = db_helper([["version"], ["steps_state"], [None]],  check_success_obj=checker)
+        change_steps_state_of_latest_version("Tester", db_obj, 1, 2) # Dummy Key and Values
+        assert checker.finished, 'change_steps_state_of_latest_version FAILED'
+    
+    def test_negative_change_steps_state_of_latest_version(self):
+        checker = Check()
+        try:
+            db_obj = db_helper([["version"], ["steps_state"], [None]], raise_exception=True ,check_success_obj=checker)
+            change_steps_state_of_latest_version("Tester", db_obj, 1, 2) # Dummy Key and Values
+            assert checker.finished, 'change_steps_state_of_latest_version FAILED'
+        except Exception as err:
+                fxn_name = "change_steps_state_of_latest_version"
+                assert str(err) == "Failed to execute query in change_steps_state_of_latest_versionDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+                assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+    def test_change_steps_state_by_version(self):
+        checker = Check()
+        db_obj = db_helper([["steps_state"], [None]]  , check_success_obj=checker)
+        change_steps_state_by_version("Tester", 1, db_obj, 1, 2) # Dummy Key and Values
+        assert checker.finished, 'change_steps_state_by_version FAILED'
+
+    def test_negative_change_steps_state_by_version(self):
+        checker = Check()
+        try:
+            db_obj = db_helper([["steps_state"], [None]]  , raise_exception=True,check_success_obj=checker)
+            change_steps_state_by_version("Tester", 1, db_obj, 1, 2) # Dummy Key and Values
+            assert checker.finished, 'change_steps_state_by_version FAILED'
+        except Exception as err:
+                fxn_name = "change_steps_state_by_version"
+                assert str(err) == "Failed to execute query in change_steps_state_by_versionDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+                assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+    def test_delete_trainingjob_version(self):
+        checker = Check()
+        db_obj = db_helper([[None]], check_success_obj=checker)
+        delete_trainingjob_version("Tester", 1, db_obj)
+        assert checker.finished, 'delete_trainingjob_version FAILED'
+
+    def test_negative_delete_trainingjob_version(self):
+        checker = Check()
+        try:
+            db_obj = db_helper([[None]], raise_exception=True,check_success_obj=checker)
+            delete_trainingjob_version("Tester", 1, db_obj)
+            assert checker.finished, 'delete_trainingjob_version FAILED'
+        except Exception as err:
+            fxn_name = "delete_trainingjob_version"
+            assert str(err) == "Failed to execute query in delete_trainingjob_versionDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+            assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+        
+    def test_get_info_by_version(self):
+        db_obj = db_helper([["*"]])
+        out = get_info_by_version("Tester", 1, db_obj)
+        assert out != None, 'get_info_by_version FAILED'
+    
+    def test_negative_get_info_by_version(self):
+        checker = Check()
+        try:
+            db_obj = db_helper([["*"]], raise_exception=True,check_success_obj=checker)
+            out = get_info_by_version("Tester", 1, db_obj)
+            assert out != None, 'get_info_by_version FAILED'
+        except Exception as err:
+            fxn_name = "get_info_by_version"
+            assert str(err) == "Failed to execute query in get_info_by_versionDB Error", 'Negative test {} FAILED, Doesnt returned required error'.format(fxn_name)
+            assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+    def test_get_trainingjob_info_by_name(self):
+        db_obj = db_helper([["version"], ["*"]])
+        out = get_trainingjob_info_by_name("Tester", db_obj)
+        assert out != None, 'get_trainingjob_info_by_name FAILED'
+
+    def test_negative_get_trainingjob_info_by_name(self):
+        checker = Check()
+        try:
+            db_obj = db_helper([["version"], ["*"]], raise_exception=True,check_success_obj=checker)
+            out = get_trainingjob_info_by_name("Tester", db_obj)
+            assert out != None, 'get_trainingjob_info_by_name FAILED'
+        except Exception as err:
+            fxn_name = "get_trainingjob_info_by_name"
+            assert str(err) == "Failed to execute query in get_trainingjob_info_by_nameDB Error", 'Negative test {} FAILED, did not return the required error'.format(fxn_name)
+            assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+    def test_get_latest_version_trainingjob_name(self):
+        db_obj = db_helper([["version"]])
+        out = get_latest_version_trainingjob_name("Tester", db_obj)
+        assert type(out) == int, 'get_latest_version_trainingjob_name FAILED' 
+
+    def test_negative_get_latest_version_trainingjob_name(self):
+        checker = Check()
+        try:
+            db_obj = db_helper([["version"]], raise_exception=True,check_success_obj=checker)
+            out = get_latest_version_trainingjob_name("Tester", db_obj)
+            assert type(out) == int, 'get_latest_version_trainingjob_name FAILED'
+        except Exception as err:
+            fxn_name = "get_latest_version_trainingjob_name"
+            assert str(err) == "Failed to execute query in get_latest_version_trainingjob_nameDB Error", 'Negative test {} FAILED, did not return the required error'.format(fxn_name)
+            assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+    def test_get_all_distinct_trainingjobs(self):
+        db_obj = db_helper([["usecase_name"]])
+        out = get_all_distinct_trainingjobs(db_obj)
+        assert type(out) == list, 'get_all_distinct_trainingjobs FAILED'
+    
+    def test_negative_get_all_distinct_trainingjobs(self):
+        checker = Check()
+        try:
+            db_obj = db_helper([["usecase_name"]],  raise_exception=True,check_success_obj=checker)
+            out = get_all_distinct_trainingjobs(db_obj)
+            assert type(out) == list, 'get_all_distinct_trainingjobs FAILED'
+        except Exception as err:
+            fxn_name = "get_all_distinct_trainingjobs"
+            assert str(err) == "Failed to execute query in get_all_distinct_trainingjobsDB Error", 'Negative test {} FAILED, did not return the required error'.format(fxn_name)
+            assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+
+    def test_get_all_version_num_by_trainingjob_name(self):
+        db_obj = db_helper([["version"]])
+        out = get_all_version_num_by_trainingjob_name("Tester", db_obj)
+        assert type(out) == list, 'get_all_version_num_by_trainingjob_name FAILED'
+    
+    def test_negative_get_all_version_num_by_trainingjob_name(self):
+        checker = Check()
+        try:
+            db_obj = db_helper([["version"]], raise_exception=True,check_success_obj=checker)
+            out = get_all_version_num_by_trainingjob_name("Tester", db_obj)
+            assert type(out) == list, 'get_all_version_num_by_trainingjob_name FAILED'
+        except Exception as err:
+            fxn_name = "get_all_version_num_by_trainingjob_name"
+            assert str(err) == "Failed to execute query in get_all_version_num_by_trainingjob_nameDB Error", 'Negative test {} FAILED, did not return the required error'.format(fxn_name)
+            assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+    def test_update_model_download_url(self):
+        checker = Check()
+        db_obj = db_helper([[None]], check_success_obj=checker)
+        update_model_download_url("Tester", 1, "http/dummy/url", db_obj)
+        assert checker.finished, 'update_model_download_url FAILED'
+
+    def test_negative_update_model_download_url(self):
+        checker = Check()
+        try:
+            db_obj = db_helper([[None]], raise_exception=True ,check_success_obj=checker)
+            update_model_download_url("Tester", 1, "http/dummy/url", db_obj)
+            assert checker.finished, 'update_model_download_url FAILED'
+        except Exception as err:
+            fxn_name = "update_model_download_url"
+            assert str(err) == "Failed to execute query in update_model_download_urlDB Error", 'Negative test {} FAILED, did not return the required error'.format(fxn_name)
+            assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+
+    def test_add_update_trainingjob(self):
+        checker = Check()
+        db_obj = db_helper([[None]], check_success_obj=checker)
+        add_update_trainingjob('Testing', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', True, True, 1, 'InfluxSource', 'Tester',db_obj)
+        assert checker.finished, 'add_update_trainingjob FAILED, When adding = True'
+    
+    def test_negative_add_update_trainingjob_2(self):
+        checker = Check()
+        db_obj = db_helper([[None]], check_success_obj=checker)
+        add_update_trainingjob('Testing', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', True, False, 1, 'InfluxSource', 'Tester',db_obj)
+        assert checker.finished, 'add_update_trainingjob FAILED, When adding = True'
+    
+    def test_negative_add_update_trainingjob_3(self):
+        checker = Check()
+        db_obj = db_helper([[None]], check_success_obj=checker)
+        try:
+            add_update_trainingjob('Testing', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', False, True, 1, 'InfluxSource', 'Tester',db_obj)
+            assert checker.finished, 'add_update_trainingjob FAILED, When adding = True'
+            assert False
+        except Exception:
+            assert True
+
+    def test_negative_add_update_trainingjob_4(self):
+        checker = Check()
+        db_obj = db_helper([[None]], check_success_obj=checker)
+        try:
+            add_update_trainingjob('Testing', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', False, False, 1, 'InfluxSource', 'Tester',db_obj)
+            assert checker.finished, 'add_update_trainingjob FAILED, When adding = True'
+            assert False
+        except Exception:
+            assert True
+
+    def test_negative_add_update_trainingjob_5(self):
+        checker = Check()
+        try:
+            db_obj = db_helper([[None]], raise_exception=True, check_success_obj=checker)
+            add_update_trainingjob('Testing ', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', True, True, 'Near RT-RIC', 1, 'InfluxSource', 'Tester'
+            ,True, '', '', db_obj, '', 'liveCell', 'UEData')
+            assert checker.finished, 'add_update_trainingjob FAILED, When adding = True'
+        except Exception as err:
+            fxn_name = "add_update_trainingjob"
+            assert str(err) == "add_update_trainingjob() takes from 12 to 15 positional arguments but 19 were given", 'Negative test {} FAILED when adding = True, did not return the required error'.format(fxn_name)
+    
+    def test_get_all_jobs_latest_status_version(self):
+        db_obj = db_helper([["usecase_name"]])
+        out = get_all_jobs_latest_status_version(db_obj)
+        assert out == [['Tester']], 'get_all_jobs_latest_status_version FAILED'
+    
+    def test_negative_get_all_jobs_latest_status_version(self):
+        checker = Check()
+        try:
+            db_obj = db_helper([["usecase_name"]],  raise_exception=True,check_success_obj=checker)
+            out = get_all_jobs_latest_status_version(db_obj)
+            assert type(out) == list, 'get_all_jobs_latest_status_version FAILED'
+        except Exception as err:
+            fxn_name = "get_all_jobs_latest_status_version"
+            assert str(err) == "Failed to execute query in get_all_jobs_latest_status_versionDB Error", 'Negative test {} FAILED, did not return the required error'.format(fxn_name)
+            assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
+    
+    def test_get_all_versions_info_by_name(self):
+        trainingjob_name = "usecase556"
+        db_obj = db_helper([["usecase_name"]])
+        out = get_all_versions_info_by_name(trainingjob_name,db_obj)
+        assert out == [['Tester']], 'get_all_versions_info_by_name FAILED'
+    
+    def test_negative_get_all_versions_info_by_name(self):
+        checker = Check()
+        trainingjob_name = "usecase556"
+        db_obj = db_helper([["usecase_name"]])
+        try:
+            db_obj = db_helper([["usecase_name"]],  raise_exception=True,check_success_obj=checker)
+            out = get_all_versions_info_by_name(trainingjob_name,db_obj)
+            assert type(out) == list, 'get_all_versions_info_by_name FAILED'
+        except Exception as err:
+            fxn_name = "get_all_versions_info_by_name"
+            assert str(err) == "Failed to execute query in get_all_versions_info_by_nameDB Error", 'Negative test {} FAILED, did not return the required error'.format(fxn_name)
+            assert checker.finished, 'Cursor Not Closed Properly for fxn {} | Negative Test'.format(fxn_name)
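
The db_helper fake above stands in for the DB-API-style connection/cursor that common_db_fun expects: each inner list in req_cols names the mimic_db columns returned by one fetchall() call, "*" returns every column, and None simulates an empty result. A small standalone sketch of that convention (illustration only; assumes it is run from the tests/ directory so the module imports cleanly):

# Illustration of the db_helper column convention used in the tests above.
from test_common_db_fun import db_helper

cur = db_helper([["version", "usecase_name"], ["*"], [None]]).cursor()
print(cur.fetchall())   # [[1, 'Tester']] -> values of the named mimic_db columns
print(cur.fetchall())   # a single row containing every mimic_db value (the "*" case)
print(cur.fetchall())   # None -> simulates a query that returns no rows
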
index 1689a47..8c84793 100644 (file)
@@ -16,6 +16,7 @@
 #
 # ==================================================================================
 import json
+import requests
 from unittest import mock
 from mock import patch
 import pytest
@@ -27,18 +28,267 @@ import sys
 import datetime
 from flask_api import status
 from dotenv import load_dotenv
+from threading import Lock
 from trainingmgr import trainingmgr_main 
 from trainingmgr.common.tmgr_logger import TMLogger
-trainingmgr_main.LOGGER = TMLogger("./tests/common/conf_log.yaml").logger
+from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
+trainingmgr_main.LOGGER = pytest.logger
 trainingmgr_main.LOCK = Lock()
 trainingmgr_main.DATAEXTRACTION_JOBS_CACHE = {}
+
+class Test_upload_pipeline:
+    def setup_method(self):
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+
+    mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ")
+    attrs_TRAININGMGR_CONFIG_OBJ = {'kf_adapter_ip.return_value': '123', 'kf_adapter_port.return_value' : '100'}
+    mocked_TRAININGMGR_CONFIG_OBJ.configure_mock(**attrs_TRAININGMGR_CONFIG_OBJ)
+    @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ)
+    def test_upload_pipeline_negative(self, mock1):
+        trainingmgr_main.LOGGER.debug("*******  *******")
+        expected_data = "result"
+        trainingjob_req = {
+                    "pipe_name":"usecase1",
+                    }
+        response = self.client.post("/pipelines/<pipe_name>/upload".format("usecase1"), data=json.dumps(trainingjob_req),
+                                    content_type="application/json")
+
+        trainingmgr_main.LOGGER.debug(response.data)
+        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+        assert expected_data in response.json.keys() 
+
+
+class Test_data_extraction_notification:
+    def setup_method(self):
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+
+    db_result2 = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}',
+    '', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33',
+    '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}',
+    datetime.datetime(2022, 10, 12, 10, 2, 31, 888830), 1, False, '3', '{"datalake_source": {"InfluxSource": {}}}', 'No data available.', '', 'liveCell', 'UEData', False)]
+
+    de_response2 = Response()
+    de_response2.code = "expired"
+    de_response2.error_type = "expired"
+    de_response2.status_code = status.HTTP_200_OK
+    de_response2.headers={"content-type": "application/json"}
+    de_response2._content = b'{"task_status": "Completed", "result": "Data Extraction Completed"}'
+    resp= ({"str1":"rp1","str2":"rp2"} ,status.HTTP_200_OK)
+    
+    @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result2)  
+    @patch('trainingmgr.trainingmgr_main.training_start', return_value = de_response2)
+    @patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')  
+    @patch('trainingmgr.trainingmgr_main.change_field_of_latest_version')        
+    @patch('trainingmgr.trainingmgr_main.change_in_progress_to_failed_by_latest_version', return_value = True)
+    @patch('trainingmgr.trainingmgr_main.response_for_training', return_value = resp) 
+    def test_data_extraction_notification(self, mock1, mock2, mock3, mock4, mock5, mock6):
+        trainingmgr_main.LOGGER.debug("******* Data_Extraction_Notification *******")
+        trainingjob_req = {
+                    "trainingjob_name":"usecase1",
+                    }
+        expected_data = "Data Extraction Completed"
+        response = self.client.post("/trainingjob/dataExtractionNotification".format("usecase1"),
+                                    data=json.dumps(trainingjob_req),
+                                    content_type="application/json")
+        trainingmgr_main.LOGGER.debug(response.data)
+        assert response.status_code == status.HTTP_200_OK
+        
+class Test_trainingjobs_operations:
+    def setup_method(self):
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+
+    db_result2 = [('usecase2', 'version2', '{"overall_status":"status_ok"}')]
+    @patch('trainingmgr.trainingmgr_main.get_all_jobs_latest_status_version', return_value = db_result2)
+    @patch('trainingmgr.trainingmgr_main.get_one_word_status', return_value = "status OK")
+    def test_trainingjobs_operations(self,mock1,mock2):
+        trainingmgr_main.LOGGER.debug("******* test_trainingjobs_operations get *******")
+        expected_data = '{"trainingjobs": [{"trainingjob_name": "usecase2", "version": "version2", "overall_status": "status OK"}]}'
+        response = self.client.get("/trainingjobs/latest",content_type="application/json")
+        trainingmgr_main.LOGGER.debug(response.data)    
+        assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
+        assert expected_data in str(response.data)
+
+    db_result3 = [] 
+    @patch('trainingmgr.trainingmgr_main.get_all_jobs_latest_status_version', return_value = db_result3)
+    @patch('trainingmgr.trainingmgr_main.get_one_word_status', return_value = "status OK")
+    def test_trainingjobs_operations_get_exception(self,mock1,mock2):
+        trainingmgr_main.LOGGER.debug("******* test_trainingjobs_operations get exception*******")
+        expected_data = "Failed to fetch training job info from db"
+        response = self.client.get("/trainingjobs/latest",content_type="application/json")
+        trainingmgr_main.LOGGER.debug(response.data)    
+        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Return status code NOT equal"
+        assert expected_data in str(response.data)
+
+class Test_pipeline_notification:
+    def setup_method(self):
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+        
+    mocked_mm_sdk=mock.Mock(name="MM_SDK")
+    attrs_mm_sdk = {'check_object.return_value': True}
+    mocked_mm_sdk.configure_mock(**attrs_mm_sdk)
+    mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ")
+    attrs_TRAININGMGR_CONFIG_OBJ = {'my_ip.return_value': 123, 'my_port.return_value' : 100}
+    mocked_TRAININGMGR_CONFIG_OBJ.configure_mock(**attrs_TRAININGMGR_CONFIG_OBJ)
+    message1="Pipeline notification success."
+    code1=status.HTTP_200_OK
+    response_tuple1=({"result": message1}, code1)
+    @patch('trainingmgr.trainingmgr_main.MM_SDK', return_value = mocked_mm_sdk)
+    @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ) 
+    @patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
+    @patch('trainingmgr.trainingmgr_main.update_model_download_url')
+    @patch('trainingmgr.trainingmgr_main.get_latest_version_trainingjob_name', return_value = "usecase1")
+    @patch('trainingmgr.trainingmgr_main.response_for_training', return_value = response_tuple1)
+    def test_pipeline_notification(self,mock1, mock2, mock3, mock4, mock5, mock6):
+        trainingmgr_main.LOGGER.debug("******* test_pipeline_notification post *******")
+        trainingjob_req = {
+                    "trainingjob_name":"usecase1",
+                    "run_status":"Succeeded",
+                    }
+        expected_data = "Pipeline notification success."
+        response = self.client.post("/trainingjob/pipelineNotification".format("usecase1"),data=json.dumps(trainingjob_req),
+                                    content_type="application/json")
+        trainingmgr_main.LOGGER.debug(response.data)    
+        assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
+        assert expected_data in str(response.data)
+
+    message2="Pipeline notification -Training failed "
+    code2=status.HTTP_500_INTERNAL_SERVER_ERROR
+    response_tuple2=({"result": message2}, code2)
+    @patch('trainingmgr.trainingmgr_main.MM_SDK', return_value = mocked_mm_sdk)
+    @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ) 
+    @patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
+    @patch('trainingmgr.trainingmgr_main.update_model_download_url')
+    @patch('trainingmgr.trainingmgr_main.get_latest_version_trainingjob_name', return_value = "usecase1")
+    @patch('trainingmgr.trainingmgr_main.response_for_training', return_value = response_tuple2)
+    @patch('trainingmgr.trainingmgr_main.change_in_progress_to_failed_by_latest_version', return_value = True)
+    def test_negative_pipeline_notification(self,mock1, mock2, mock3, mock4, mock5, mock6, mock7):
+        trainingmgr_main.LOGGER.debug("******* test_pipeline_notification post exception*******")
+        trainingjob_req = {
+                    "trainingjob_name":"usecase1",
+                    "run_status":"Not_Succeeded",
+                    }
+        expected_data = "Pipeline notification -Training failed "
+        response = self.client.post("/trainingjob/pipelineNotification".format("usecase1"),
+                                    data=json.dumps(trainingjob_req),
+                                    content_type="application/json")
+        trainingmgr_main.LOGGER.debug(response.data)    
+        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Return status code NOT equal"
+        assert expected_data in str(response.data)
+    
+    db_result4 = [("test_data1","test_data2"),("version1")]
+    @patch('trainingmgr.trainingmgr_main.get_field_of_given_version', return_value = db_result4)
+    def test_get_steps_state_2(self,mock1):
+        trainingmgr_main.LOGGER.debug("******* test_get_steps_state get *******")
+        expected_data = "test_data1"
+        response = self.client.get("/trainingjobs/<trainingjob_name>/<version>/steps_state".format("usecase1"),
+                                    content_type="application/json")
+        trainingmgr_main.LOGGER.debug(response.data)    
+        assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
+        assert expected_data in str(response.data)
+
+    db_result5 = []
+    @patch('trainingmgr.trainingmgr_main.get_field_of_given_version', return_value = db_result5)
+    def test_negative_get_steps_state_2(self,mock1):
+        expected_data = "Exception"
+        response = self.client.get("/trainingjobs/<trainingjob_name>/<version>/steps_state".format("usecase1"),
+                                    content_type="application/json")
+        trainingmgr_main.LOGGER.debug(response.data)    
+        assert response.status_code == status.HTTP_404_NOT_FOUND, "Return status code NOT equal"
+        assert expected_data in str(response.data)
+
+class Test_get_trainingjob_by_name_version:
+    def setup_method(self):
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+
+    @patch('trainingmgr.trainingmgr_main.get_info_by_version',return_value=[('usecase7', 'auto test', '*', 'prediction with model name', 'Default', '{"arguments": {"epochs": "1", "usecase": "usecase7"}}', 'Enb=20 and Cellnum=6', datetime.datetime(2022, 9, 20,11, 40, 30), '7d09c0bf-7575-4475-86ff-5573fb3c4716', '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FINISHED"}', datetime.datetime(2022, 9, 20, 11, 42, 20), 1, True, 'Near RT RIC', '{"datalake_source": {"CassandraSource": {}}}', '{"datalake_source": {"CassandraSource": {}}}','http://10.0.0.47:32002/model/usecase7/1/Model.zip','','','','','')])
+    @patch('trainingmgr.trainingmgr_main.get_metrics',return_value={"metrics": [{"Accuracy": "0.0"}]})
+    @patch('trainingmgr.trainingmgr_main.get_one_key',return_value='cassandra')
+    def test_get_trainingjob_by_name_version(self,mock1,mock2,mock3):
+        usecase_name = "usecase7"
+        version = "1"
+        response = self.client.get("/trainingjobs/{}/{}".format(usecase_name, version))
+        expected_data = b'{"trainingjob": {"trainingjob_name": "usecase7", "description": "auto test", "feature_list": "*", "pipeline_name": "prediction with model name", "experiment_name": "Default", "arguments": {"epochs": "1", "usecase": "usecase7"}, "query_filter": "Enb=20 and Cellnum=6", "creation_time": "2022-09-20 11:40:30", "run_id": "7d09c0bf-7575-4475-86ff-5573fb3c4716", "steps_state": {"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FINISHED"}, "updation_time": "2022-09-20 11:42:20", "version": 1, "enable_versioning": true, "pipeline_version": "Near RT RIC", "datalake_source": "cassandra", "model_url": "{\\"datalake_source\\": {\\"CassandraSource\\": {}}}", "notification_url": "http://10.0.0.47:32002/model/usecase7/1/Model.zip", "_measurement": "", "bucket": "", "accuracy": {"metrics": [{"Accuracy": "0.0"}]}}}'
+
+        assert response.content_type == "application/json", "not equal content type"
+        assert response.status_code == status.HTTP_200_OK, "not equal code"
+        assert response.data == expected_data, "not equal data"
+
+    @patch('trainingmgr.trainingmgr_main.get_info_by_version',return_value=False)
+    @patch('trainingmgr.trainingmgr_main.get_metrics',return_value={"metrics": [{"Accuracy": "0.0"}]})
+    @patch('trainingmgr.trainingmgr_main.get_one_key',return_value='cassandra')
+    def test_negative_get_trainingjob_by_name_version(self,mock1,mock2,mock3):
+        usecase_name = "usecase7"
+        version = "1"
+        response = self.client.get("/trainingjobs/{}/{}".format(usecase_name, version))
+        expected_data = b'{"trainingjob": {"trainingjob_name": "usecase7", "description": "auto test", "feature_list": "*", "pipeline_name": "prediction with model name", "experiment_name": "Default", "arguments": {"epochs": "1", "usecase": "usecase7"}, "query_filter": "Enb=20 and Cellnum=6", "creation_time": "2022-09-20 11:40:30", "run_id": "7d09c0bf-7575-4475-86ff-5573fb3c4716", "steps_state": {"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FINISHED"}, "updation_time": "2022-09-20 11:42:20", "version": 1, "enable_versioning": true, "pipeline_version": "Near RT RIC", "datalake_source": "cassandra", "model_url": "{\\"datalake_source\\": {\\"CassandraSource\\": {}}}", "notification_url": "http://10.0.0.47:32002/model/usecase7/1/Model.zip", "_measurement": "", "bucket": "", "accuracy": {"metrics": [{"Accuracy": "0.0"}]}}}'
+
+        trainingmgr_main.LOGGER.debug(expected_data)
+        trainingmgr_main.LOGGER.debug(response.data)
+        assert response.content_type == "application/json", "not equal content type"
+        assert response.status_code == 404, "not equal code"
+
+class Test_upload_pipeline_negative:
+    def setup_method(self):
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+    
+    def test_negative_upload_pipeline(self):
+        pipeline_name = "qoe"
+        response = self.client.post("/pipelines/{}/upload".format(pipeline_name))
+        expected = "jjjj"
+        assert response.content_type == "application/json", "not equal content type"
+        assert response.status_code == 500, "not equal code"
+
+    @patch('trainingmgr.trainingmgr_main.LOGGER.debug', return_value = True)
+    def test_negative_upload_pipeline_2(self,mock1):
+        pipeline_name = "qoe"
+        response = self.client.post("/pipelines/{}/upload".format(pipeline_name))
+        expected = ValueError("file not found in request.files")
+        assert response.content_type == "application/json", "not equal content type"
+        assert response.status_code == 500, "not equal code"
+
+class Test_get_steps_state:
+      def setup_method(self):
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+      
+      @patch('trainingmgr.trainingmgr_main.get_field_of_given_version',return_value=[['data_extracted','data_pending'], ['data1','data2']])
+      def test_get_steps_state(self,mock1):
+          usecase_name = "usecase7"
+          version = "1" 
+          response = self.client.get("/trainingjobs/{}/{}/steps_state".format(usecase_name, version))
+          expected_data = b'data_extracted'
+          assert response.content_type == "application/json", "not equal content type"
+          assert response.status_code == status.HTTP_200_OK, "not equal code"
+          assert response.data == expected_data, "not equal data"
+
+      @patch('trainingmgr.trainingmgr_main.get_field_of_given_version',return_value=False)
+      def test_negative_get_steps_state(self,mock1):
+          usecase_name = "usecase7"
+          version = "1" 
+          response = self.client.get("/trainingjobs/{}/{}/steps_state".format(usecase_name, version))
+          expected_data = b'data_extracted'
+          assert response.content_type == "application/json", "not equal content type"
+          assert response.status_code == 404, "not equal code"
+    
+      @patch('trainingmgr.trainingmgr_main.get_field_of_given_version',return_value=Exception("Not found given trainingjob with version"))
+      def test_negative_get_steps_state_2(self,mock1):
+          usecase_name = "usecase7"
+          version = "1" 
+          response = self.client.get("/trainingjobs/{}/{}/steps_state".format(usecase_name, version))
+          expected_data = b'data_extracted'
+          assert response.status_code == 500, "not equal code"
+          
 class Test_training_main:
     def setup_method(self):
         self.client = trainingmgr_main.APP.test_client(self)
         self.logger = trainingmgr_main.LOGGER
 
-    ## Postive_1
-    #@pytest.mark.skip()
     @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = False)
     @patch('trainingmgr.trainingmgr_main.add_update_trainingjob')
     def test_trainingjob_operations(self,mock1,mock2):
@@ -68,14 +318,12 @@ class Test_training_main:
         assert response.data == expected_data
         assert response.status_code == status.HTTP_201_CREATED, "Return status code NOT equal" 
 
-    ## Postive_2
     db_result = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}',
      '', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33',
       '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}',
        datetime.datetime(2022, 10, 12, 10, 2, 31, 888830), 1, False, '3', '{"datalake_source": {"InfluxSource": {}}}', 'No data available.', '', 'liveCell', 'UEData', False)]
     
     training_data = ('','','','','','','','','','','')
-    #@pytest.mark.skip()
     @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True)
     @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result)
     @patch('trainingmgr.trainingmgr_main.check_trainingjob_data', return_value = training_data)
@@ -104,12 +352,10 @@ class Test_training_main:
         response = self.client.put("/trainingjobs/<trainingjob_name>".format("usecase1"),
                                     data=json.dumps(trainingjob_req),
                                     content_type="application/json")
-        trainingmgr_main.LOGGER.debug(response.data)    
-        
+        trainingmgr_main.LOGGER.debug(response.data)        
         assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal" 
         assert expected_data in str(response.data)
 
-    #@pytest.mark.skip()
     @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True)
     def test_negative_trainingjob_operations_post_conflit(self,mock1):
         trainingmgr_main.LOGGER.debug("******* test_negative_trainingjob_operations_post_conflit *******")
@@ -134,15 +380,11 @@ class Test_training_main:
         response = self.client.post("/trainingjobs/<trainingjob_name>".format("usecase1"),
                                     data=json.dumps(trainingjob_req),
                                     content_type="application/json")
-        trainingmgr_main.LOGGER.debug(response.data)    
-        
+        trainingmgr_main.LOGGER.debug(response.data)           
         assert response.status_code == status.HTTP_409_CONFLICT, "Return status code NOT equal"
         assert expected_data in str(response.data)
 
 
-    # ***** Start training test ****
-    # Positive_1
-    #@pytest.mark.skip()
     db_result = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}',
     '', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33',
     '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}',
@@ -156,7 +398,6 @@ class Test_training_main:
     de_response.headers={"content-type": "application/json"}
     de_response._content = b'{"task_status": "Completed", "result": "Data Pipeline Execution Completed"}'
 
-    #@pytest.mark.skip()
     @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True)
     @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result)
     @patch('trainingmgr.trainingmgr_main.data_extraction_start', return_value = de_response)
@@ -170,8 +411,6 @@ class Test_training_main:
         assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
         assert expected_data in str(response.data) 
 
-
     db_result1 = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}',
     '', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33',
     '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}',
@@ -195,4 +434,229 @@ class Test_training_main:
                                     content_type="application/json")
         trainingmgr_main.LOGGER.debug(response.data)
         assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Return status code NOT equal" 
-        assert expected_data in str(response.data) 
\ No newline at end of file
+        assert expected_data in str(response.data) 
+
+class Test_get_versions_for_pipeline:
+    @patch('trainingmgr.common.trainingmgr_config.TMLogger', return_value = TMLogger("tests/common/conf_log.yaml"))
+    def setup_method(self,mock1,mock2):
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+        load_dotenv('tests/test.env')
+        self.TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()   
+
+    the_response = Response()
+    the_response.code = "expired"
+    the_response.error_type = "expired"
+    the_response.status_code = 200
+    the_response.headers={"content-type": "application/json"}
+    the_response._content = b'{"versions_list": ["football", "baseball"]}'
+    
+    @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response)
+    def test_get_versions_for_pipeline(self,mock1):
+        
+        response = self.client.get("/pipelines/{}/versions".format("qoe_pipeline"))     
+        trainingmgr_main.LOGGER.debug(response.data)
+        assert response.content_type == "application/json", "not equal content type"
+        assert response.status_code == 500, "Return status code NOT equal"   
+        
+    @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = requests.exceptions.ConnectionError('Mocked error'))
+    def test_negative_get_versions_for_pipeline_1(self,mock1):
+        response = self.client.get("/pipelines/{}/versions".format("qoe_pipeline"))       
+        print(response.data)
+        assert response.content_type == "application/json", "not equal content type"
+        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception "
+        
+    @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = TypeError('Mocked error'))
+    def test_negative_get_versions_for_pipeline_2(self,mock1):
+        response = self.client.get("/pipelines/{}/versions".format("qoe_pipeline"))      
+        print(response.data)
+        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception "
+
+    the_response1 = Response()
+    the_response1.code = "expired"
+    the_response1.error_type = "expired"
+    the_response1.status_code = 200
+    the_response1.headers={"content-type": "application/text"}
+    the_response1._content = b'{"versions_list": ["football", "baseball"]}'
+    @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response1)
+    def test_negative_get_versions_for_pipeline_3(self,mock1):
+        response = self.client.get("/pipelines/{}/versions".format("qoe_pipeline"))       
+        print(response.data)
+        assert response.content_type != "application/text", "not equal content type"
+    
+class Test_get_all_pipeline_names:
+    def setup_method(self):
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+
+    the_response = Response()
+    the_response.code = "expired"
+    the_response.error_type = "expired"
+    the_response.status_code = 200
+    the_response.headers={"content-type": "application/json"}
+    the_response._content = b'{ "exp1":"id1","exp2":"id2"}'
+    @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response)
+    def test_get_all_pipeline_names(self,mock1):
+        response = self.client.get("/pipelines")      
+        assert response.content_type == "application/json", "not equal content type"
+        assert response.status_code == 500, "Return status code NOT equal"   
+        
+    @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = requests.exceptions.ConnectionError('Mocked error'))
+    def test_negative_get_all_pipeline_names_1(self,mock1):
+        response = self.client.get("/pipelines")       
+        print(response.data)
+        assert response.content_type == "application/json", "not equal content type"
+        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception "
+        
+    @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = TypeError('Mocked error'))
+    def test_negative_get_all_pipeline_names_2(self,mock1):
+        response = self.client.get("/pipelines")       
+        print(response.data)
+        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception "
+
+    the_response1 = Response()
+    the_response1.code = "expired"
+    the_response1.error_type = "expired"
+    the_response1.status_code = 200
+    the_response1.headers={"content-type": "application/text"}
+    the_response1._content = b'{ "exp1":"id1","exp2":"id2"}'
+    @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response1)
+    def test_negative_get_all_pipeline_names_3(self,mock1):
+        response = self.client.get("/pipelines")       
+        print(response.data)
+        assert response.content_type != "application/text", "not equal content type"
+
+class Test_get_all_exp_names:
+    def setup_method(self):
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+
+    the_response = Response()
+    the_response.code = "expired"
+    the_response.error_type = "expired"
+    the_response.status_code = 200
+    the_response.headers={"content-type": "application/json"}
+    the_response._content = b'{ "exp1":"id1","exp2":"id2"}'
+    @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response)
+    def test_get_all_experiment_names(self,mock1):
+        response = self.client.get("/experiments")      
+        print(response.data)
+        assert response.content_type == "application/json", "not equal content type"
+        assert response.status_code == 500, "Return status code NOT equal"   
+        
+    @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = requests.exceptions.ConnectionError('Mocked error'))
+    def test_negative_get_all_experiment_names_1(self,mock1):
+        response = self.client.get("/experiments")
+        assert response.content_type == "application/json", "not equal content type"
+        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception "
+
+    @patch('trainingmgr.trainingmgr_main.requests.get', side_effect = TypeError('Mocked error'))
+    def test_negative_get_all_experiment_names_2(self,mock1):
+        response = self.client.get("/experiments")       
+        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception "
+    
+    the_response1 = Response()
+    the_response1.code = "expired"
+    the_response1.error_type = "expired"
+    the_response1.status_code = 200
+    the_response1.headers={"content-type": "application/text"}
+    the_response1._content = b'{ "exp1":"id1","exp2":"id2"}'
+    @patch('trainingmgr.trainingmgr_main.requests.get', return_value = the_response1)
+    def test_negative_get_all_experiment_names_3(self,mock1):
+        response = self.client.get("/experiments")       
+        assert response.content_type != "application/text", "not equal content type"
+
+class Test_get_metadata:
+    def setup_method(self):
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+    
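+    # Sample row data returned by the mocked get_all_versions_info_by_name call.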
+    resulttt = [('usecase7', '1','auto test',
+           '*','prediction with model name',
+           'Default','Enb=20 and Cellnum=6','epochs:1','FINISHED',
+           '{"metrics": "FINISHED"}','Near RT RIC','1',
+           'Cassandra DB','usecase7', '1','auto test','*',
+           'prediction with model name',
+           'Default','Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}',
+            'Default',False,'Cassandra DB','usecase7', '1','auto test','*','prediction with model name',
+           'Default','Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}',
+           'Near RT RIC','3','Cassandra DB','usecase7', '1','auto test','*',
+            'prediction with model name','Default','Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}','Near RT RIC','3','Cassandra DB')
+             ]
+    mock_uc_config_obj = mock.Mock(name='mocked uc_config_obj')
+    @patch('trainingmgr.trainingmgr_main.get_all_versions_info_by_name', return_value = resulttt)
+    @patch('trainingmgr.trainingmgr_main.get_metrics', return_value = 90)
+    @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mock_uc_config_obj)
+    def test_get_metadata(self,mock1,mock2,mock3):
+        usecase_name = "usecase7"
+        response = self.client.get("/trainingjobs/metadata/{}".format(usecase_name))
+        assert response.content_type == "application/json", "not equal content type"
+        assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
+
+    @patch('trainingmgr.trainingmgr_main.get_all_versions_info_by_name', side_effect = Exception('Mocked error'))
+    def test_negative_get_metadata_1(self,mock1):
+        usecase_name = "usecase7"
+        response = self.client.get("/trainingjobs/metadata/{}".format(usecase_name))
+        
+        print(response.data)
+        assert response.content_type == "application/json", "not equal content type"
+        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Should have thrown the exception "
+
+class Test_get_model:
+    def setup_method(self):
+        self.client = trainingmgr_main.APP.test_client(self)
+        trainingmgr_main.LOGGER = TMLogger("tests/common/conf_log.yaml").logger
+        self.logger = trainingmgr_main.LOGGER
+
+    @patch('trainingmgr.trainingmgr_main.send_file', return_value = 'File')
+    def test_negative_get_model(self,mock1):
+        trainingjob_name = "usecase777"
+        version = 2
+        response = trainingmgr_main.get_model(trainingjob_name,version)
+        assert response[1] == 500, "The function get_model Failed"
+
+class Test_get_metadata_1:
+    def setup_method(self):
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+    
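+    # Sample row data (with a slightly different field layout) returned by the mocked get_all_versions_info_by_name call.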
+    resulttt = [('usecase7', '1','auto test',
+           '*','prediction with model name',
+           'Default','Enb=20 and Cellnum=6','epochs:1','FINISHED',
+           '{"metrics": "FINISHED"}','Near RT RIC','1',
+           'Cassandra DB','usecase7', '1','auto test','*',
+           'prediction with model name',
+           'Default',False,'Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}',
+            'Default',False,'Cassandra DB','usecase7', '1','auto test','*','prediction with model name',
+           'Default','Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}',
+           'Near RT RIC','3','Cassandra DB','usecase7', '1','auto test','*',
+            'prediction with model name','Default','Enb=20 and Cellnum=6','epochs:1','{"metrics": [{"Accuracy": "0.0"}]}','Near RT RIC','3','Cassandra DB')
+             ]
+
+    mock_uc_config_obj = mock.Mock(name='mocked uc_config_obj')
+    @patch('trainingmgr.trainingmgr_main.get_all_versions_info_by_name', return_value = resulttt)
+    @patch('trainingmgr.trainingmgr_main.get_metrics', return_value = 90)
+    @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mock_uc_config_obj)
+    def test_get_metadata(self,mock1,mock2,mock3):
+        usecase_name = "usecase7"
+        response = self.client.get("/trainingjobs/metadata/{}".format(usecase_name))  
+        assert response.content_type == "application/json", "not equal content type"
+        assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
+
+    @patch('trainingmgr.trainingmgr_main.get_all_versions_info_by_name', return_value = None)
+    def test_negative_get_metadata_1(self,mock1):
+        usecase_name = "usecase7"
+        response = self.client.get("/trainingjobs/metadata/{}".format(usecase_name)) 
+        print(response.data)
+        assert response.content_type == "application/json", "not equal content type"
+        assert response.status_code == status.HTTP_404_NOT_FOUND, "Should have thrown the exception "
+
+    @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = False)
+    def test_training_negative_de_notfound(self,mock1):
+        trainingmgr_main.LOGGER.debug("******* test_training_404_NotFound *******")
+        expected_data = ''
+        response = self.client.post("/trainingjobs/{}/training".format("usecase1"),
+                                    content_type="application/json")
+        trainingmgr_main.LOGGER.debug(response.data)
+        assert response.status_code == status.HTTP_404_NOT_FOUND, "Return status code NOT equal"
diff --git a/tests/test_trainingmgr_config.py b/tests/test_trainingmgr_config.py
new file mode 100644 (file)
index 0000000..ce944cd
--- /dev/null
@@ -0,0 +1,106 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+import json
+import requests
+from unittest import mock
+from mock import patch
+import pytest
+import flask
+from requests.models import Response
+from threading import Lock
+import os
+import sys
+import datetime
+from flask_api import status
+from dotenv import load_dotenv
+from trainingmgr.common import trainingmgr_config
+from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
+from trainingmgr.common.tmgr_logger import TMLogger
+from trainingmgr import trainingmgr_main
+trainingmgr_main.LOGGER = pytest.logger
+
+class Test_trainingmgr_config:
+    @patch('trainingmgr.common.trainingmgr_config.TMLogger', return_value = TMLogger("tests/common/conf_log.yaml"))
+    def setup_method(self,mock1,mock2):
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+        load_dotenv('tests/test.env')
+        self.TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()   
+   
+    def test_kf_adapter_port(self):
+        expected_data = '5001'
+        result = self.TRAININGMGR_CONFIG_OBJ.kf_adapter_port
+        assert result == expected_data
+
+    def test_kf_adapter_ip(self):
+        expected_data = 'localhost'
+        result = self.TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
+        assert result == expected_data
+
+    def test_data_extraction_port(self):
+        expected_data = '32000'
+        result = self.TRAININGMGR_CONFIG_OBJ.data_extraction_port
+        assert result == expected_data
+
+    def test_data_extraction_ip(self):
+        expected_data = 'localhost'
+        result = self.TRAININGMGR_CONFIG_OBJ.data_extraction_ip
+        assert result == expected_data
+
+    def test_my_port(self):
+        expected_data = '32002'
+        result = self.TRAININGMGR_CONFIG_OBJ.my_port
+        assert result == expected_data
+
+    def test_my_ip(self):
+        expected_data = 'localhost'
+        result = self.TRAININGMGR_CONFIG_OBJ.my_ip
+        assert result == expected_data
+    
+    def test_logger(self):
+        expected_data = TMLogger("tests/common/conf_log.yaml").logger
+        result = self.TRAININGMGR_CONFIG_OBJ.logger
+        assert result == expected_data
+
+    def test_ps_user(self):
+        expected_data = 'postgres'
+        result = self.TRAININGMGR_CONFIG_OBJ.ps_user
+        assert result == expected_data
+
+    def test_ps_password(self):
+        expected_data = "abcd"
+        result = self.TRAININGMGR_CONFIG_OBJ.ps_password
+        assert result == expected_data
+
+    def test_ps_ip(self):
+        expected_data = 'localhost'
+        result = self.TRAININGMGR_CONFIG_OBJ.ps_ip
+        assert result == expected_data
+
+    def test_ps_port(self):
+        expected_data = '30001'
+        result = self.TRAININGMGR_CONFIG_OBJ.ps_port
+        assert result == expected_data
+
+    def test_is_config_loaded_properly(self):
+        expected_data = True
+        result = self.TRAININGMGR_CONFIG_OBJ.is_config_loaded_properly()
+        assert result == expected_data
+
diff --git a/tests/test_trainingmgr_operations.py b/tests/test_trainingmgr_operations.py
new file mode 100644 (file)
index 0000000..54eb4fe
--- /dev/null
@@ -0,0 +1,87 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+import json
+import requests
+from unittest import mock
+from mock import patch
+import pytest
+import flask
+from requests.models import Response
+from threading import Lock
+import os
+import sys
+import datetime
+from flask_api import status
+from dotenv import load_dotenv
+from threading import Lock
+from trainingmgr import trainingmgr_main 
+from trainingmgr.common import trainingmgr_operations
+from trainingmgr.common.tmgr_logger import TMLogger
+from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
+trainingmgr_main.LOGGER = pytest.logger
+trainingmgr_main.LOCK = Lock()
+trainingmgr_main.DATAEXTRACTION_JOBS_CACHE = {}
+
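+# Minimal stand-in for the training manager configuration object consumed by training_start.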
+class DummyVariable:
+        kf_adapter_ip = "localhost"
+        kf_adapter_port = 5001
+        logger = trainingmgr_main.LOGGER
+
+class Test_training_start:
+    def setup_method(self): 
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+    
+    def test_negative_training_start(self):
+        training_config_obj =  DummyVariable()
+        dict_data = {
+                        "brand": "Ford",
+                        "model": "Mustang",
+                        "year": 1964
+                    }
+        trainingjob_name = "usecase12"
+        expected_data = {
+                        "brand": "Ford",
+                        "model": "Mustang",
+                        "year": 1964
+                    }
+        try:
+            response = trainingmgr_operations.training_start(training_config_obj,dict_data,trainingjob_name)
+            assert response == expected_data,"data not equal"
+            assert False
+        except Exception:
+            assert True
+
+class Test_upload_pipeline:
+    def setup_method(self):
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+
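+    # Mocked TRAININGMGR_CONFIG_OBJ exposing stubbed kf_adapter_ip and kf_adapter_port values.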
+    mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ")
+    attrs_TRAININGMGR_CONFIG_OBJ = {'kf_adapter_ip.return_value': '123', 'kf_adapter_port.return_value' : '100'}
+    mocked_TRAININGMGR_CONFIG_OBJ.configure_mock(**attrs_TRAININGMGR_CONFIG_OBJ)
+    @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ)
+    def test_upload_pipeline_negative(self, mock1):
+        expected_data = "result"
+        trainingjob_req = {
+                    "pipe_name":"usecase1",
+                    }
+        response = self.client.post("/pipelines/{}/upload".format("usecase1"), data=json.dumps(trainingjob_req),
+                                    content_type="application/json")
+        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+        assert expected_data in response.json.keys() 
diff --git a/tests/test_trainingmgr_ps_db.py b/tests/test_trainingmgr_ps_db.py
new file mode 100644 (file)
index 0000000..2355829
--- /dev/null
@@ -0,0 +1,87 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+import pytest
+import sys
+import os
+from mock import patch
+from trainingmgr.db.trainingmgr_ps_db import PSDB
+from dotenv import load_dotenv
+
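+# Fake config handle with dummy Postgres credentials, passed to PSDB in place of the real training manager config.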
+class cred_handle:
+    def __init__(self):
+        self.ps_user = "gdgdgd"
+        self.ps_password = "hdhd"
+        self.ps_ip = 12345
+        self.ps_port = 1000
+    
+    def logger(self):
+        return cred_handle()
+    
+    def error(self):
+        raise Exception('Connection Failed To Exist')
+
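+# Minimal fake DB-API cursor mimicking the pg8000 cursor used by PSDB.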
+class Cursor:
+    def __init__(self, db_name):
+        self.db_name = db_name
+
+    def execute(self, query):
+        pass
+
+    def fetchall(self):
+        return [self.db_name]
+    
+    def close(self):
+        pass
+
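+# Minimal fake DB-API connection that hands out the fake cursor above.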
+class connection:
+    def __init__(self, db_name):
+        self.autocommit = False
+        self.db_name = db_name
+    
+    def cursor(self):
+        return Cursor(self.db_name)
+    
+    def rollback(self):
+        pass
+
+    def commit(self):
+        pass
+
+    def close(self):
+        pass
+
+class Test_PSDB:
+    @patch('trainingmgr.db.trainingmgr_ps_db.pg8000.dbapi.connect', return_value = connection('usecase_manager_database'))
+    def setup_method(self,mock1,mock2):
+        self.obj = PSDB(cred_handle())
+
+    def test_init_psdb(self):
+        assert self.obj != None, 'PSDB Object Creation Failed'
+
+    @patch('trainingmgr.db.trainingmgr_ps_db.pg8000.dbapi.connect', return_value = connection('usecase_manager_database'))
+    def test_get_new_conn(self, mock1):
+        out =  self.obj.get_new_conn()
+        assert out != None, 'New Connection Failed'
+
+    def test_negative_get_new_conn(self):
+        try:
+            out =  self.obj.get_new_conn()
+            assert out != None, 'New Connection Failed'
+            assert False
+        except Exception:
+            assert True
\ No newline at end of file
diff --git a/tests/test_trainingmgr_util.py b/tests/test_trainingmgr_util.py
new file mode 100644 (file)
index 0000000..3721fed
--- /dev/null
@@ -0,0 +1,640 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+"""
+Unit tests for the Training Manager utility functions.
+"""
+import sys
+import datetime
+import logging
+from threading import Lock
+from mock import patch
+import pytest
+from dotenv import load_dotenv
+from flask_api import status
+from requests.models import Response
+
+from trainingmgr.db.trainingmgr_ps_db import PSDB
+import trainingmgr.trainingmgr_main
+from trainingmgr import trainingmgr_main
+from trainingmgr.common import trainingmgr_util
+from trainingmgr.common.tmgr_logger import TMLogger
+from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
+from trainingmgr.common.trainingmgr_util import response_for_training, check_key_in_dictionary, check_trainingjob_data, \
+    get_one_key, get_metrics, handle_async_feature_engineering_status_exception_case, get_one_word_status, \
+    validate_trainingjob_name
+trainingmgr_main.LOGGER = pytest.logger
+
+class Test_response_for_training:
+    def setup_method(self):
+        self.client = trainingmgr_main.APP.test_client(self)
+        self.logger = trainingmgr_main.LOGGER
+
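+    # Stubbed requests.Response representing a successful (HTTP 200) JSON reply from the patched requests.post call.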
+    fs_result = Response()
+    fs_result.code = "expired"
+    fs_result.error_type = "expired"
+    fs_result.status_code = status.HTTP_200_OK
+    fs_result.headers={'content-type': 'application/json'}
+    fs_result._content={'Accept-Charset': 'UTF-8'}
+
+    @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+    @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+    @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+    @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)  
+    @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result)
+    @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+    def test_response_for_training(self,mock1,mock2, mock3, mock4, mock5, mock6):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr.trainingmgr_main.LOGGER
+        is_success = True
+        trainingjob_name = "usecase7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        result = response_for_training(code, message, logger, is_success, trainingjob_name, ps_db_obj, mm_sdk)
+        assert result != None
+
+    @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+    @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+    @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+    @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)  
+    @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+    def test_negative_response_for_training(self,mock1,mock2, mock3, mock4, mock5):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr_main.LOGGER
+        is_success = True
+        usecase_name = "usecase7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        try:
+            response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+            assert False
+        except Exception:
+            assert True
+    
+    @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+    @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+    @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+    @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)  
+    @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+    def test_negative_response_for_training_2(self,mock1,mock2, mock3, mock4, mock5):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr_main.LOGGER
+        is_success = True
+        usecase_name = "usecase7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        try:
+            response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+            assert False
+        except Exception:
+            assert True
+
+    def test_negative_response_for_training_3(self):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr_main.LOGGER
+        is_success = True
+        usecase_name = "usecase7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        try:
+            response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+            assert False
+        except Exception:
+            assert True
+    
+    @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+    @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+    def test_negative_response_for_training_4(self,mock1,mock2):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr_main.LOGGER
+        is_success = True
+        usecase_name = "usecase7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        try:
+            response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+            assert False
+        except Exception:
+            assert True
+    
+    @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+    @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+    @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+    def test_negative_response_for_training_5(self,mock1,mock2,mock3):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr_main.LOGGER
+        is_success = True
+        usecase_name = "usecase7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        try:
+            response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+            assert False
+        except Exception:
+            assert True
+    
+    @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+    @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+    @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+    @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)  
+    def test_negative_response_for_training_6(self,mock1,mock2,mock3,mock4):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr_main.LOGGER
+        is_success = True
+        usecase_name = "usecase7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        try:
+            response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+            assert False
+        except Exception:
+            assert True
+    
+    @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+    def test_negative_response_for_training_7(self,mock1):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr_main.LOGGER
+        is_success = True
+        usecase_name = "usecase7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        try:
+            response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+            assert False
+        except Exception:
+            assert True
+    
+    @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+    @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+    @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)
+    def test_negative_response_for_training_8(self,mock1,mock2,mock3):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr_main.LOGGER
+        is_success = True
+        usecase_name = "usecase7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        try:
+            response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+            assert False
+        except Exception:
+            assert True
+     
+    @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+    def test_negative_response_for_training_9(self,mock1):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr_main.LOGGER
+        is_success = True
+        usecase_name = "usecase7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        try:
+            response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+            assert False
+        except Exception:
+            assert True
+    
+    @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+    def test_negative_response_for_training_10(self,mock1):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr_main.LOGGER
+        is_success = True
+        usecase_name = "usecalse7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        try:
+            response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+            assert False
+        except Exception:
+            assert True
+    
+    @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=True)
+    @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+    @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)  
+    @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result)
+    @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+    def test_negative_response_for_training_11(self,mock1,mock2,mock3,mock4,mock5):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr_main.LOGGER
+        is_success = True
+        usecase_name = "usecase7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        try:
+             response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+             assert False
+        except Exception:
+             assert True
+
+    @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+    @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+    @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+    @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)  
+    @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result)
+    @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+    def test_negative_response_for_training_12(self,mock1,mock2,mock3,mock4,mock5,mock6):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr_main.LOGGER
+        is_success = True
+        usecase_name = "usecase7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        try:
+             response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+             assert False
+        except Exception:
+             assert True
+
+class Test_response_for_training_1:
+    fs_result = Response()
+    fs_result.code = "expired"
+    fs_result.error_type = "expired"
+    fs_result.status_code = status.HTTP_200_OK
+    fs_result.headers={'content-type': 'application/jsn'}
+    fs_result._content={'Accept-Charset': 'UTF-8'}
+
+    @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+    @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+    @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+    @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)  
+    @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result)
+    @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+    def test_response_for_training_1(self,mock1,mock2, mock3, mock4, mock5, mock6):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr_main.LOGGER
+        is_success = True
+        usecase_name = "usecase7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        try:
+             response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+             assert False
+        except Exception:
+             assert True
+
+class Test_response_for_training_2:
+    fs_result = Response()
+    fs_result.code = "expired"
+    fs_result.error_type = "expired"
+    fs_result.status_code = status.HTTP_404_NOT_FOUND
+    fs_result.headers={'content-type': 'application/json'}
+    fs_result._content={'Accept-Charset': 'UTF-8'}
+
+    @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+    @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+    @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+    @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)  
+    @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result)
+    @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+    def test_response_for_training_2(self,mock1,mock2, mock3, mock4, mock5, mock6):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr_main.LOGGER
+        is_success = True
+        usecase_name = "usecase7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        try:
+             response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+             assert False
+        except Exception:
+             assert True
+
+class Test_response_for_training_3:
+    fs_result = Response()
+    fs_result.code = "expired"
+    fs_result.error_type = "expired"
+    fs_result.status_code = status.HTTP_200_OK
+    fs_result.headers={'content-type': 'application/jsn'}
+    fs_result._content={'Accept-Charset': 'UTF-8'}
+
+    @patch('trainingmgr.common.trainingmgr_util.get_field_by_latest_version',return_value=[['www.google.com','h1','h2'], ['www.google.com','h1','h2'], ['www.google.com','h1','h2']])
+    @patch('trainingmgr.common.trainingmgr_util.change_field_of_latest_version',return_value=True)
+    @patch('trainingmgr.common.trainingmgr_util.get_metrics',return_value="PRESENT")
+    @patch('trainingmgr.common.trainingmgr_util.get_latest_version_trainingjob_name',return_value=1)  
+    @patch('trainingmgr.common.trainingmgr_util.requests.post',return_value=fs_result)
+    @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version')
+    @patch('requests.post',side_effect=Exception(status.HTTP_500_INTERNAL_SERVER_ERROR))
+    def test_negative_response_for_training_3_1(self,mock1,mock2, mock3, mock4, mock5, mock6, mock7):
+        code = status.HTTP_500_INTERNAL_SERVER_ERROR
+        message = "Run status is not scheduled for "
+        logger = trainingmgr_main.LOGGER
+        is_success = True
+        usecase_name = "usecase7"
+        ps_db_obj = ()
+        mm_sdk = ()
+        try:
+             response_for_training(code, message, logger, is_success, usecase_name, ps_db_obj, mm_sdk)
+             assert False
+        except Exception:
+             assert True
+
+class Test_check_key_in_dictionary:
+    def test_check_key_in_dictionary(self):
+        fields = ["model","brand","year"]
+        dictionary =  {
+                                    "brand": "Ford",
+                                    "model": "Mustang",
+                                    "year": 1964
+                      }
+        assert check_key_in_dictionary(fields, dictionary) == True, "data not equal"
+
+    def test_check_key_in_dictionary_missing_key(self):
+        fields = ["model","brand","type"]
+        dictionary =  {
+                                    "brand": "Ford",
+                                    "model": "Mustang",
+                                    "year": 1964
+                      }
+        assert check_key_in_dictionary(fields, dictionary) == False, "data not equal"
+    
+    def test_negative_check_key_in_dictionary_1(self):
+        fields = ["Ford","Apple","Mosquito"]
+        dictionary =  {
+                                    "brand": "Ford",
+                                    "model": "Mustang",
+                                    "year": 1964
+                      }
+        try:
+            check_key_in_dictionary(fields, dictionary)
+            assert False
+        except Exception:
+            assert True
+
+class Test_check_trainingjob_data:    
+    @patch('trainingmgr.common.trainingmgr_util.check_key_in_dictionary',return_value=True)
+    @patch('trainingmgr.common.trainingmgr_util.isinstance',return_value=True)  
+    def test_check_trainingjob_data(self,mock1,mock2):
+        usecase_name = "usecase8"
+        json_data = { "description":"unittest", "feature_list": ["apple", "banana", "cherry"] , "pipeline_name":"qoe" , "experiment_name":"experiment1" , "arguments":"arguments1" , "query_filter":"query1" , "enable_versioning":True , "target_deployment":"Near RT RIC" , "pipeline_version":1 , "datalake_source":"cassandra db" , "incremental_training":True , "model":"usecase7" , "model_version":1 , "_measurement":2 , "bucket":"bucket1"}
+    
+        expected_data = (['apple', 'banana', 'cherry'], 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db', 2, 'bucket1')
+        assert check_trainingjob_data(usecase_name, json_data) == expected_data,"data not equal"
+    
+    def test_negative_check_trainingjob_data_1(self):
+        usecase_name = "usecase8"
+        json_data = { "description":"unittest", "feature_list": ["apple", "banana", "cherry"] , "pipeline_name":"qoe" , "experiment_name":"experiment1" , "arguments":"arguments1" , "query_filter":"query1" , "enable_versioning":True , "target_deployment":"Near RT RIC" , "pipeline_version":1 , "datalake_source":"cassandra db" , "incremental_training":True , "model":"usecase7" , "model_version":1 , "_measurement":2 , "bucket":"bucket1"}
+    
+        expected_data = (['apple', 'banana', 'cherry'], 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db', 2, 'bucket1')
+        try:
+            assert check_trainingjob_data(usecase_name, json_data) == expected_data,"data not equal"
+            assert False
+        except Exception:
+            assert True
+
+    @patch('trainingmgr.common.trainingmgr_util.check_key_in_dictionary',return_value=True)
+    def test_negative_check_trainingjob_data_2(self,mock1):
+        usecase_name = "usecase8"
+        json_data = { "description":"unittest", "feature_list": ["apple", "banana", "cherry"] , "pipeline_name":"qoe" , "experiment_name":"experiment1" , "arguments":"arguments1" , "query_filter":"query1" , "enable_versioning":True , "target_deployment":"Near RT RIC" , "pipeline_version":1 , "datalake_source":"cassandra db" , "incremental_training":True , "model":"usecase7" , "model_version":1 , "_measurement":2 , "bucket":"bucket1"}
+    
+        expected_data = (['apple', 'banana', 'cherry'], 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db', 2, 'bucket1')
+        try:
+            assert check_trainingjob_data(usecase_name, json_data) == expected_data,"data not equal"
+            assert False
+        except Exception:
+            assert True
+    
+    @patch('trainingmgr.common.trainingmgr_util.isinstance',return_value=True)
+    def test_negative_check_trainingjob_data_3(self,mock1):
+        usecase_name = "usecase8"
+        json_data = None
+        expected_data = (['apple', 'banana', 'cherry'], 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db', 2, 'bucket1')
+        try:
+            assert check_trainingjob_data(usecase_name, json_data) == expected_data,"data not equal"
+            assert False
+        except Exception:
+            assert True
+
+class Test_get_one_key:
+    def test_get_one_key(self):
+        dictionary = {
+                        "brand": "Ford",
+                        "model": "Mustang",
+                        "year": 1964
+                    }
+        only_key = "year"
+        expected_data = only_key
+        assert get_one_key(dictionary) == expected_data,"data not equal"
+    
+    def test_get_one_key_2(self):
+        dictionary = {'name': 'Jack', 'age': 26}
+        only_key = "age"
+        expected_data = only_key
+        assert get_one_key(dictionary) == expected_data,"data not equal"
+    
+    def test_negative_get_one_key_1(self):
+        dictionary = {
+                        "brand": "Ford",
+                        "model": "Mustang",
+                        "year": 1964
+                    }
+        only_key = "model"
+        expected_data = only_key
+        try:
+            assert get_one_key(dictionary) == expected_data,"data not equal"
+            assert False
+        except Exception:
+            assert True
+    
+    def test_negative_get_one_key_2(self):
+        dictionary = {'name': 'Jack', 'age': 26}
+        only_key = "name"
+        expected_data = only_key
+        try:
+            assert get_one_key(dictionary) == expected_data,"data not equal"
+            assert False
+        except Exception:
+            assert True
+
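+# Stub for the mm_sdk dependency: check_object always returns True and get_metrics returns a fixed dict.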
+class dummy_mmsdk:
+    def check_object(self, param1, param2, param3):
+        return True
+    
+    def get_metrics(self, usecase_name, version):
+        thisdict = {
+                     "brand": "Ford",
+                     "model": "Mustang",
+                     "year": 1964
+                    }
+        return thisdict
+    
+class Test_get_metrics:   
+    @patch('trainingmgr.common.trainingmgr_util.json.dumps',return_value='usecase_data')
+    def test_get_metrics_with_version(self,mock1):
+        usecase_name = "usecase7"
+        version = 1
+        mm_sdk = dummy_mmsdk()
+        expected_data = 'usecase_data'
+        assert get_metrics(usecase_name, version, mm_sdk) == expected_data, "data not equal"
+
+    @patch('trainingmgr.common.trainingmgr_util.json.dumps',return_value=None)
+    def test_negative_get_metrics_1(self,mock1):
+        usecase_name = "usecase7"
+        version = 1
+        mm_sdk = dummy_mmsdk()
+        expected_data = 'usecase_data'
+        try:
+            assert get_metrics(usecase_name, version, mm_sdk) == expected_data, "data not equal"
+            assert False
+        except Exception:
+            assert True
+    
+    @patch('trainingmgr.common.trainingmgr_util.json.dumps',side_effect=Exception("Problem while downloading metrics"))
+    def test_negative_get_metrics_2(self,mock1):
+        usecase_name = "usecase7"
+        version = 1
+        mm_sdk = dummy_mmsdk()
+        expected_data = 'usecase_data'
+        try:
+            assert get_metrics(usecase_name, version, mm_sdk) == expected_data, "data not equal"
+            assert False
+        except Exception:
+            assert True
+
+    def test_negative_get_metrics_3(self):
+        usecase_name = "usecase7"
+        version = 1
+        mm_sdk = dummy_mmsdk()
+        expected_data = 'final_data'
+        try:
+            get_metrics(usecase_name, version, dummy_mmsdk())
+            assert get_metrics(usecase_name, version, mm_sdk) == expected_data, "data not equal"
+            assert False
+        except Exception:
+            assert True
+
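+# Variant of the mm_sdk stub above whose check_object returns False, used to drive the negative get_metrics test.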
+class dummy_mmsdk_1:
+    def check_object(self, param1, param2, param3):
+        return False
+    
+    def get_metrics(self, usecase_name, version):
+        thisdict = {
+                     "brand": "Ford",
+                     "model": "Mustang",
+                     "year": 1964
+                    }
+        return thisdict
+
+class Test_get_metrics_2:   
+    @patch('trainingmgr.common.trainingmgr_util.json.dumps',return_value='usecase_data')
+    def test_negative_get_metrics_2_1(self,mock1):
+        usecase_name = "usecase7"
+        version = 1
+        mm_sdk = dummy_mmsdk_1()
+        expected_data = 'usecase_data'
+        try:
+            assert get_metrics(usecase_name, version, mm_sdk) == expected_data, "data not equal"
+            assert False
+        except Exception:
+            assert True
+
+class Test_handle_async_feature_engineering_status_exception_case:
+    @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version',return_value=True)
+    @patch('trainingmgr.common.trainingmgr_util.response_for_training',return_value=True)
+    def test_handle_async_feature_engineering_status_exception_case(self, mock1, mock2):
+           lock = Lock()
+           featurestore_job_cache = {'usecase7': 'Geeks', 2: 'For', 3: 'Geeks'}
+           code = 123
+           message = "Into the field" 
+           logger = "123"
+           is_success = True
+           usecase_name = "usecase7"
+           ps_db_obj = () 
+           mm_sdk = ()       
+           assert handle_async_feature_engineering_status_exception_case(lock, featurestore_job_cache, code,
+                                                           message, logger, is_success,
+                                                           usecase_name, ps_db_obj, mm_sdk) == None,"data not equal"
+    
+    @patch('trainingmgr.common.trainingmgr_util.change_in_progress_to_failed_by_latest_version',return_value=True)
+    @patch('trainingmgr.common.trainingmgr_util.response_for_training',return_value=True)
+    # @patch('trainingmgr.common.trainingmgr_util.dataextraction_job_cache',return_value = Exception("Could not get info from db for "))
+    def test_negative_handle_async_feature_engineering_status_exception_case(self, mock1, mock2):
+           lock = Lock()
+           featurestore_job_cache = {'usecase7': 'Geeks', 2: 'For', 3: 'Geeks'}
+           code = 123
+           message = "Into the field" 
+           logger = "123"
+           is_success = True
+           usecase_name = ""
+           ps_db_obj = () 
+           mm_sdk = ()    
+           try:   
+               handle_async_feature_engineering_status_exception_case(lock, featurestore_job_cache, code,
+                                                           message, logger, is_success,
+                                                           usecase_name, ps_db_obj, mm_sdk)
+               assert handle_async_feature_engineering_status_exception_case(lock, featurestore_job_cache, code,
+                                                           message, logger, is_success,
+                                                           usecase_name, ps_db_obj, mm_sdk) == None,"data not equal"
+               assert False
+           except Exception:
+               assert True
+
+class Test_get_one_word_status:
+    def test_get_one_word_status(self):
+           steps_state = [0,1,2,3]
+           expected_data = "IN_PROGRESS"
+           assert get_one_word_status(steps_state) == expected_data,"data not equal"
+
+class Test_validate_trainingjob_name:
+    @patch('trainingmgr.common.trainingmgr_util.get_all_versions_info_by_name',return_value=True)
+    def test_validate_trainingjob_name_1(self,mock1):
+        trainingjob_name = "usecase8"
+        ps_db_obj = ()
+        expected_data = True
+        assert validate_trainingjob_name(trainingjob_name,ps_db_obj) == expected_data,"data not equal"
+
+    @patch('trainingmgr.common.trainingmgr_util.get_all_versions_info_by_name', side_effect = Exception("Could not get info from db for "))
+    def test_validate_trainingjob_name_2(self,mock1):
+        trainingjob_name = "usecase8"
+        ps_db_obj = ()
+        expected_data = True
+        try:
+            validate_trainingjob_name(trainingjob_name,ps_db_obj)
+            assert validate_trainingjob_name(trainingjob_name,ps_db_obj) == expected_data,"data not equal"
+            assert False
+        except Exception:
+            assert True
+    
+    def test_negative_validate_trainingjob_name(self):
+        trainingjob_name = "usecase8"
+        ps_db_obj = ()
+        expected_data = True
+        try:
+            validate_trainingjob_name(trainingjob_name,ps_db_obj)
+            assert validate_trainingjob_name(trainingjob_name,ps_db_obj) == expected_data,"data not equal"
+            assert False
+        except Exception:
+            assert True