from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
from trainingmgr.common.exceptions_utls import DBException, TMException
from trainingmgr.models import TrainingJob
+from trainingmgr.models import FeatureGroup
from trainingmgr.common.trainingConfig_parser import getField
trainingmgr_main.LOGGER = pytest.logger
trainingmgr_main.LOCK = Lock()
assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
assert expected_data in response.data
-@pytest.mark.skip("")
class Test_pipeline_notification:
def setup_method(self):
self.client = trainingmgr_main.APP.test_client(self)
self.logger = trainingmgr_main.LOGGER
+ @pytest.fixture
+ def mock_training_job(self):
+ """Create a mock TrainingJob object."""
+ creation_time = datetime.datetime.now()
+ updation_time = datetime.datetime.now()
+ return TrainingJob(
+ trainingjob_name="test_job",
+ )
+
mocked_mm_sdk=mock.Mock(name="MM_SDK")
attrs_mm_sdk = {'check_object.return_value': True, 'get_model_zip.return_value':""}
mocked_mm_sdk.configure_mock(**attrs_mm_sdk)
message1="Pipeline notification success."
code1=status.HTTP_200_OK
response_tuple1=({"result": message1}, code1)
- db_result = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}',
- '', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33',
- '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}',
- datetime.datetime(2022, 10, 12, 10, 2, 31, 888830), 1, False, '3', '{"datalake_source": {"InfluxSource": {}}}', 'No data available.', '', 'liveCell', 'UEData', False, False, "","")]
+ @patch('trainingmgr.trainingmgr_main.notification_rapp')
@patch('trainingmgr.trainingmgr_main.MM_SDK', return_value = mocked_mm_sdk)
@patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ)
@patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
@patch('trainingmgr.trainingmgr_main.update_model_download_url')
- @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value=db_result)
+ @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name')
@patch('trainingmgr.trainingmgr_main.get_latest_version_trainingjob_name', return_value = "usecase1")
@patch('trainingmgr.trainingmgr_main.response_for_training', return_value = response_tuple1)
- def test_pipeline_notification(self,mock1, mock2, mock3, mock4, mock5, mock6, mock7):
+ def test_pipeline_notification(self,mock1, mock2, mock_get_ver_and_name, mock4, mock5, mock6, mock7, mock8, mock_training_job):
trainingmgr_main.LOGGER.debug("******* test_pipeline_notification post *******")
+ mock_get_ver_and_name.return_value = mock_training_job
trainingjob_req = {
"trainingjob_name":"usecase1",
"run_status":"SUCCEEDED",
assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
assert expected_data in str(response.data)
- db_result = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}',
- '', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33',
- '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}',
- datetime.datetime(2022, 10, 12, 10, 2, 31, 888830), 1, False, '3', '{"datalake_source": {"InfluxSource": {}}}', 'No data available.', '', 'liveCell', 'UEData', False, True, "","")]
the_response_upload=Response()
the_response_upload.status_code=200
+ @patch('trainingmgr.trainingmgr_main.notification_rapp')
@patch('trainingmgr.trainingmgr_main.MM_SDK', return_value = mocked_mm_sdk)
@patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ)
@patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
@patch('trainingmgr.trainingmgr_main.update_model_download_url')
- @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value=db_result)
+ @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name')
@patch('trainingmgr.trainingmgr_main.requests.post', return_value=the_response_upload)
@patch('trainingmgr.trainingmgr_main.get_latest_version_trainingjob_name', return_value = "usecase1")
@patch('trainingmgr.trainingmgr_main.response_for_training', return_value = response_tuple1)
- def test_pipeline_notification_mme(self,mock1, mock2, mock3, mock4, mock5, mock6, mock7, mock8):
+ def test_pipeline_notification_mme(self,mock1, mock2, mock3, mock_get_ver_and_name, mock5, mock6, mock7, mock8, mock9, mock_training_job):
trainingmgr_main.LOGGER.debug("******* test_pipeline_notification post *******")
+ mock_get_ver_and_name.return_value = mock_training_job
trainingjob_req = {
"trainingjob_name":"usecase1",
"run_status":"SUCCEEDED",
assert expected_data in str(response.data)
db_result4 = [("test_data1","test_data2"),("version1")]
- @patch('trainingmgr.trainingmgr_main.get_field_of_given_version', return_value = db_result4)
+ @patch('trainingmgr.trainingmgr_main.get_steps_state_db', return_value = db_result4)
def test_get_steps_state_2(self,mock1):
trainingmgr_main.LOGGER.debug("******* test_get_steps_state get *******")
expected_data = "test_data1"
assert expected_data in str(response.data)
db_result5 = []
- @patch('trainingmgr.trainingmgr_main.get_field_of_given_version', return_value = db_result5)
+ @patch('trainingmgr.trainingmgr_main.get_steps_state_db', return_value = db_result5)
def test_negative_get_steps_state_2(self,mock1):
expected_data = "Exception"
response = self.client.get("/trainingjobs/{trainingjobname}/{version}/steps_state".format(trainingjobname="usecase1", version="1"),
assert "Exception" in data
assert "Metrics error" in data["Exception"]
-@pytest.mark.skip("")
class Test_unpload_pipeline:
def setup_method(self):
self.client = trainingmgr_main.APP.test_client(self)
assert "Exception" in data
assert "Database error" in data["Exception"]
-@pytest.mark.skip("")
class Test_training_main:
def setup_method(self):
self.client = trainingmgr_main.APP.test_client(self)
self.logger = trainingmgr_main.LOGGER
+ @pytest.fixture
+ def mock_trainingjob(self):
+ """Create a mock TrainingJob object."""
+ mock_steps_state = MagicMock()
+ mock_steps_state.states = json.dumps({'step1':'completed'})
+ return TrainingJob(
+ trainingjob_name="test_job",
+ deletion_in_progress=False,
+ steps_state=mock_steps_state,
+ )
+
+ @patch('trainingmgr.trainingmgr_main.trainingjob_schema.load')
@patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = False)
@patch('trainingmgr.trainingmgr_main.add_update_trainingjob')
- def test_trainingjob_operations(self,mock1,mock2):
+ @patch('trainingmgr.trainingmgr_main.get_model_info')
+ def test_trainingjob_operations(self,mock1,mock2, mock3, mock_trainingjob_schema_load, mock_trainingjob):
trainingmgr_main.LOGGER.debug("******* test_trainingjob_operations post *******")
+ mock_trainingjob_schema_load.return_value = mock_trainingjob
trainingjob_req = {
- "trainingjob_name":"usecase1",
- "pipeline_name":"qoe Pipeline lat v2",
- "experiment_name":"Default",
- "featureGroup_name":"group",
- "query_filter":"",
- "arguments":{
- "epochs":"1",
- "trainingjob_name":"usecase1"
- },
- "enable_versioning":False,
- "description":"uc1",
- "pipeline_version":"3",
- "datalake_source":"InfluxSource",
- "_measurement":"liveCell",
- "bucket":"UEData",
- "is_mme":False,
- "model_name": ""
+ "modelName":"usecase1",
+ "training_config":{
+ "description":"description",
+ "dataPipeline":{
+ "feature_group_name":"group",
+ "query_filter":"",
+ "arguments":{
+ "epochs":"1",
+ "trainingjob_name":"usecase1"
+ }
+ },
+ "trainingPipeline":{
+ "pipeline_name":"qoe Pipeline lat v2",
+ "pipeline_version":"",
+ "enable_versioning":False
+ }
}
+ }
expected_data = b'{"result": "Information stored in database."}'
response = self.client.post("/trainingjobs/{}".format("usecase1"),
data=json.dumps(trainingjob_req),
trainingmgr_main.LOGGER.debug(response.data)
assert response.data == expected_data
assert response.status_code == status.HTTP_201_CREATED, "Return status code NOT equal"
-
- model_info_json={'model-name': 'qoe_93', 'rapp-id': 'rapp_1', 'meta-info': {'accuracy': '90', 'feature-list': ['*'], 'model-type': 'timeseries'}}
- db_result_fg=[('group','*','')]
- mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ")
- attrs_TRAININGMGR_CONFIG_OBJ = {'pipeline.return_value':''}
- mocked_TRAININGMGR_CONFIG_OBJ.configure_mock(**attrs_TRAININGMGR_CONFIG_OBJ)
+
+ @patch('trainingmgr.trainingmgr_main.trainingjob_schema.load')
@patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = False)
@patch('trainingmgr.trainingmgr_main.check_trainingjob_data', return_value = ("group1", 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db',True, ""))
- @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ)
- @patch('trainingmgr.trainingmgr_main.get_model_info', return_value=model_info_json)
- @patch('trainingmgr.trainingmgr_main.json.loads',return_value={'timeseries':''})
- @patch('trainingmgr.trainingmgr_main.get_feature_groups_db', return_value=db_result_fg)
@patch('trainingmgr.trainingmgr_main.add_update_trainingjob')
- def test_trainingjob_operations2(self,mock1,mock2, mock3, mock4, mock5, mock6, mock7):
+ def test_trainingjob_operations2(self,mock1,mock2, mock3, mock_trainingjob_schema_load, mock_trainingjob):
trainingmgr_main.LOGGER.debug("******* test_trainingjob_operations post *******")
+ mock_trainingjob_schema_load, mock_trainingjob
trainingjob_req = {
- "trainingjob_name":"usecase1",
- "pipeline_name":"qoe Pipeline lat v2",
- "experiment_name":"Default",
- "featureGroup_name":"group",
- "query_filter":"",
- "arguments":{
- "epochs":"1",
- "trainingjob_name":"usecase1"
- },
- "enable_versioning":False,
- "description":"uc1",
- "pipeline_version":"3",
- "datalake_source":"InfluxSource",
- "is_mme":True,
- "model_name": ""
+ "modelName":"usecase1",
+ "training_config":{
+ "description":"description",
+ "dataPipeline":{
+ "feature_group_name":"group",
+ "query_filter":"",
+ "arguments":{
+ "epochs":"1",
+ "trainingjob_name":"usecase1"
+ }
+ },
+ "trainingPipeline":{
+ "pipeline_name":"qoe Pipeline lat v2",
+ "pipeline_version":"",
+ "enable_versioning":False
+ }
}
+ }
expected_data = b'{"result": "Information stored in database."}'
response = self.client.post("/trainingjobs/{}".format("usecase1"),
data=json.dumps(trainingjob_req),
content_type="application/json")
- trainingmgr_main.LOGGER.debug(response.data)
assert response.data == expected_data
- assert response.status_code == status.HTTP_201_CREATED, "Return status code NOT equal"
+ assert response.status_code == status.HTTP_201_CREATED, "Return status code NOT equal"
- model_info_json={'model-name': 'qoe_93', 'rapp-id': 'rapp_1', 'meta-info': {'accuracy': '90', 'feature-list': ['*'], 'model-type': 'timeseries'}}
- db_result_fg=[('group','*','')]
- mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ")
- attrs_TRAININGMGR_CONFIG_OBJ = {'pipeline.return_value':''}
- mocked_TRAININGMGR_CONFIG_OBJ.configure_mock(**attrs_TRAININGMGR_CONFIG_OBJ)
- @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = False)
- @patch('trainingmgr.trainingmgr_main.check_trainingjob_data', return_value = ("group1", 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db',True, ""))
- @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ)
- @patch('trainingmgr.trainingmgr_main.get_model_info', return_value=model_info_json)
- @patch('trainingmgr.trainingmgr_main.json.loads',return_value='')
- @patch('trainingmgr.trainingmgr_main.get_feature_groups_db', return_value=db_result_fg)
- @patch('trainingmgr.trainingmgr_main.add_update_trainingjob')
- def test_negative_trainingjob_operations2(self,mock1,mock2, mock3, mock4, mock5, mock6, mock7):
- trainingmgr_main.LOGGER.debug("******* test_trainingjob_operations post *******")
- trainingjob_req = {
- "trainingjob_name":"usecase1",
- "pipeline_name":"qoe Pipeline lat v2",
- "experiment_name":"Default",
- "featureGroup_name":"group",
- "query_filter":"",
- "arguments":{
- "epochs":"1",
- "trainingjob_name":"usecase1"
- },
- "enable_versioning":False,
- "description":"uc1",
- "pipeline_version":"3",
- "datalake_source":"InfluxSource",
- "_measurement":"liveCell",
- "bucket":"UEData",
- "is_mme":True,
- "model_name": ""
- }
- expected_data = b'{"Exception": "Doesn\'t support the model type"}'
- response = self.client.post("/trainingjobs/{}".format("usecase1"),
- data=json.dumps(trainingjob_req),
- content_type="application/json")
- trainingmgr_main.LOGGER.debug(response.data)
- assert response.data == expected_data
- assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR, "Return status code NOT equal"
- model_info_json={'model-name': 'qoe_93', 'rapp-id': 'rapp_1', 'meta-info': {'accuracy': '90', 'feature-list': ['*'], 'model-type': 'timeseries'}}
- db_result_fg=[('group','abc','')]
- mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ")
- attrs_TRAININGMGR_CONFIG_OBJ = {'pipeline.return_value':''}
- mocked_TRAININGMGR_CONFIG_OBJ.configure_mock(**attrs_TRAININGMGR_CONFIG_OBJ)
- @patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = False)
- @patch('trainingmgr.trainingmgr_main.check_trainingjob_data', return_value = ("", 'unittest', '', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db',True, ""))
- @patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ)
- @patch('trainingmgr.trainingmgr_main.get_model_info', return_value=model_info_json)
- @patch('trainingmgr.trainingmgr_main.json.loads',return_value={"timeseries": "qoe_pipeline_h_release"})
- @patch('trainingmgr.trainingmgr_main.get_feature_groups_db', return_value=db_result_fg)
- @patch('trainingmgr.trainingmgr_main.add_update_trainingjob')
- def test_negative_trainingjob_operations3(self,mock1,mock2, mock3, mock4, mock5, mock6, mock7):
- trainingmgr_main.LOGGER.debug("******* test_trainingjob_operations post *******")
- trainingjob_req = {
- "trainingjob_name":"usecase1",
- "pipeline_name":"",
- "experiment_name":"Default",
- "featureGroup_name":"",
- "query_filter":"",
- "arguments":{
- "epochs":"1",
- "trainingjob_name":"usecase1"
- },
- "enable_versioning":False,
- "description":"uc1",
- "pipeline_version":"",
- "datalake_source":"InfluxSource",
- "_measurement":"liveCell",
- "bucket":"UEData",
- "is_mme":True,
- "model_name": "qoe_121"
- }
- expected_data = b'{"Exception":"The no feature group with mentioned feature list, create a feature group"}\n'
- response = self.client.post("/trainingjobs/{}".format("usecase1"),
- data=json.dumps(trainingjob_req),
- content_type="application/json")
- trainingmgr_main.LOGGER.debug(response.data)
- print(response.data)
- assert response.data == expected_data
- assert response.status_code == status.HTTP_406_NOT_ACCEPTABLE, "Return status code NOT equal"
-
- db_result = [('my_testing_new_7', 'testing', 'testing_influxdb', 'pipeline_kfp2.2.0_5', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "my_testing_new_7"}}', '', datetime.datetime(2024, 6, 21, 8, 57, 48, 408725), '432516c9-29d2-4f90-9074-407fe8f77e4f', '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FINISHED"}', datetime.datetime(2024, 6, 21, 9, 1, 54, 388278), 1, False, 'pipeline_kfp2.2.0_5', '{"datalake_source": {"InfluxSource": {}}}', 'http://10.0.0.10:32002/model/my_testing_new_7/1/Model.zip', '', False, False, '', '')]
-
-
training_data = ('','','','','','','','','',False,'')
+ @patch('trainingmgr.trainingmgr_main.get_one_word_status', return_value = States.FINISHED.name)
+ @patch('trainingmgr.trainingmgr_main.trainingjob_schema.load')
@patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True)
- @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result)
+ @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name')
@patch('trainingmgr.trainingmgr_main.check_trainingjob_data', return_value = training_data)
@patch('trainingmgr.trainingmgr_main.add_update_trainingjob')
- def test_trainingjob_operations_put(self,mock1,mock2,mock3,mock4):
+ def test_trainingjob_operations_put(self, mock1, mock2, mock_info_by_name, mock4, mock_trainingjob_schema_load, mock5, mock_trainingjob):
trainingmgr_main.LOGGER.debug("******* test_trainingjob_operations_put *******")
+ mock_trainingjob_schema_load.return_value = mock_trainingjob
+ mock_info_by_name.return_value = mock_trainingjob
trainingjob_req = {
- "trainingjob_name": "my_testing_new_7",
- "is_mme": False,
- "model_name": False,
- "pipeline_name": "pipeline",
- "experiment_name": "Default",
- "featureGroup_name": "testing",
- "query_filter": "",
- "arguments": {
- "epochs": "1",
- "trainingjob_name": "my_testing"
- },
- "enable_versioning": False,
- "description": "testing",
- "pipeline_version": "pipeline",
- "datalake_source": "InfluxSource"
- }
-
+ "modelName":"qoe_121",
+ "training_config":{
+ "description":"uc1",
+ "dataPipeline":{
+ "feature_group_name":"group",
+ "query_filter":"",
+ "arguments":{
+ "epochs":"1",
+ "trainingjob_name":"my_testing_new_7"
+ }
+ },
+ "trainingPipeline":{
+ "pipeline_name":"qoe Pipeline lat v2",
+ "pipeline_version":"3",
+ "enable_versioning":False
+ }
+ }
+ }
expected_data = 'Information updated in database'
response = self.client.put("/trainingjobs/{}".format("my_testing_new_7"),
data=json.dumps(trainingjob_req),
content_type="application/json")
- trainingmgr_main.LOGGER.debug(response.data)
+ print(response.data)
assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
assert expected_data in str(response.data)
def test_negative_trainingjob_operations_post_conflit(self,mock1):
trainingmgr_main.LOGGER.debug("******* test_negative_trainingjob_operations_post_conflit *******")
trainingjob_req = {
- "trainingjob_name":"usecase1",
- "pipeline_name":"qoe Pipeline lat v2",
- "experiment_name":"Default",
- "featureGroup_name":"group",
- "query_filter":"",
- "arguments":{
- "epochs":"1",
- "trainingjob_name":"usecase1"
- },
- "enable_versioning":False,
- "description":"uc1",
- "pipeline_version":"3",
- "datalake_source":"InfluxSource",
- "_measurement":"liveCell",
- "bucket":"UEData"
+ "modelName":"usecase1",
+ "training_config":{
+ "description":"description",
+ "dataPipeline":{
+ "feature_group_name":"group",
+ "query_filter":"",
+ "arguments":{
+ "epochs":"1",
+ "trainingjob_name":"usecase1"
+ }
+ },
+ "trainingPipeline":{
+ "pipeline_name":"qoe Pipeline lat v2",
+ "pipeline_version":"",
+ "enable_versioning":False
+ }
}
+ }
expected_data = 'is already present in database'
response = self.client.post("/trainingjobs/{}".format("usecase1"),
data=json.dumps(trainingjob_req),
assert expected_data in str(response.data)
- db_result = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}',
- '', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33',
- '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}',
- datetime.datetime(2022, 10, 12, 10, 2, 31, 888830), 1, False, '3', '{"datalake_source": {"InfluxSource": {}}}', 'No data available.', '', 'liveCell', 'UEData', False)]
+ @pytest.fixture
+ def mock_test_training_training_job(self):
+ """Create a mock TrainingJob object."""
+ creation_time = datetime.datetime.now()
+ updation_time = datetime.datetime.now()
+ training_config = {
+ "description":"uc1",
+ "dataPipeline":{
+ "feature_group_name":"*",
+ "query_filter":"",
+ "arguments":{
+ "epochs":"1",
+ "trainingjob_name":"usecase1"
+ }
+ },
+ "trainingPipeline":{
+ "pipeline_name":"qoe Pipeline lat v2",
+ "pipeline_version":"3",
+ "enable_versioning":False
+ }
+ }
+ mock_steps_state = MagicMock()
+ mock_steps_state.states = json.dumps('{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}')
+ return TrainingJob(
+ trainingjob_name="usecase1",
+ training_config = json.dumps(training_config),
+ creation_time=creation_time,
+ run_id="51948a12-aee9-42e5-93a0-b8f4a15bca33",
+ steps_state = mock_steps_state,
+ updation_time=updation_time,
+ version=1,
+ model_url="http://test.model.url",
+ notification_url="http://test.notification.url",
+ deletion_in_progress=False,
+ )
+ @pytest.fixture
+ def mock_test_training_feature_group(self):
+ """Create a mock FeatureGroup object."""
+ return FeatureGroup(
+ featuregroup_name="testing_hash",
+ feature_list = "",
+ datalake_source="InfluxSource",
+ host="127.0.0.21",
+ port = "8080",
+ bucket="",
+ token="",
+ db_org="",
+ measurement="",
+ enable_dme=False,
+ measured_obj_class="",
+ dme_port="",
+ source_name=""
+ )
+
de_response = Response()
de_response = Response()
de_response.code = "expired"
de_response.status_code = status.HTTP_200_OK
de_response.headers={"content-type": "application/json"}
de_response._content = b'{"task_status": "Completed", "result": "Data Pipeline Execution Completed"}'
- db_result2=[('testing_hash', '', 'InfluxSource', '127.0.0.21', '8080', '', '', '', False, '', '', '')]
-
@patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True)
- @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result)
- @patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value = db_result2)
+ @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name')
+ @patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db')
@patch('trainingmgr.trainingmgr_main.data_extraction_start', return_value = de_response)
@patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
- def test_training(self,mock1,mock2,mock3,mock4, mock5):
+ def test_training(self, mock1, mock2, mock_feature_group_by_name_db, mock_get_info_by_name, mock5, mock_test_training_feature_group, mock_test_training_training_job):
trainingmgr_main.LOGGER.debug("******* test_trainingjob_operations post *******")
+ mock_get_info_by_name.return_value = mock_test_training_training_job
+ mock_feature_group_by_name_db.return_value = mock_test_training_feature_group
expected_data = 'Data Pipeline Execution Completed"'
response = self.client.post("/trainingjobs/{}/training".format("usecase1"),
content_type="application/json")
- trainingmgr_main.LOGGER.debug(response.data)
assert response.status_code == status.HTTP_200_OK, "Return status code NOT equal"
assert expected_data in str(response.data)
- db_result1 = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}',
- '', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33',
- '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}',
- datetime.datetime(2022, 10, 12, 10, 2, 31, 888830), 1, False, '3', '{"datalake_source": {"InfluxSource": {}}}', 'No data available.', '', 'liveCell', 'UEData', False)]
-
de_response1 = Response()
de_response1.code = "expired"
de_response1.error_type = "expired"
de_response1._content = b'{"task_status": "Failed", "result": "Data Pipeline Execution Failed"}'
@patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = True)
- @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value = db_result1)
- @patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value = db_result2)
+ @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name')
+ @patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db')
@patch('trainingmgr.trainingmgr_main.data_extraction_start', return_value = de_response1)
@patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
- def test_training_negative_de_failed(self,mock1,mock2,mock3,mock4, mock5):
+ def test_training_negative_de_failed(self, mock1, mock2, mock_feature_group_by_name_db, mock_get_info_by_name, mock5, mock_test_training_feature_group, mock_test_training_training_job):
trainingmgr_main.LOGGER.debug("******* test_trainingjob_operations post *******")
+ mock_get_info_by_name.return_value = mock_test_training_training_job
+ mock_feature_group_by_name_db.return_value = mock_test_training_feature_group
expected_data = 'Data Pipeline Execution Failed'
response = self.client.post("/trainingjobs/{}/training".format("usecase1"),
content_type="application/json")
assert response.data == b'{"Exception":"The trainingjob_name is not correct"}\n'
-@pytest.mark.skip("")
class Test_get_versions_for_pipeline:
@patch('trainingmgr.common.trainingmgr_config.TMLogger', return_value = TMLogger("tests/common/conf_log.yaml"))
def setup_method(self,mock1,mock2):
print(response.data)
assert response.content_type != "application/text", "not equal content type"
-@pytest.mark.skip("")
+
class Test_get_pipelines_details:
def setup_method(self):
self.client = trainingmgr_main.APP.test_client(self)
def test_get_pipelines_details(self,mock1):
response = self.client.get("/pipelines")
assert response.content_type == "application/json", "not equal content type"
- assert response.status_code == 500, "Return status code NOT equal"
+ assert response.status_code == 200, "Return status code NOT equal"
@patch('trainingmgr.trainingmgr_main.requests.get', side_effect = requests.exceptions.ConnectionError('Mocked error'))
def test_negative_get_pipelines_details_1(self,mock1):