message1="Pipeline notification success."
code1=status.HTTP_200_OK
response_tuple1=({"result": message1}, code1)
+ db_result = [('usecase1', 'uc1', '*', 'qoe Pipeline lat v2', 'Default', '{"arguments": {"epochs": "1", "trainingjob_name": "usecase1"}}',
+ '', datetime.datetime(2022, 10, 12, 10, 0, 59, 923588), '51948a12-aee9-42e5-93a0-b8f4a15bca33',
+ '{"DATA_EXTRACTION": "FINISHED", "DATA_EXTRACTION_AND_TRAINING": "FINISHED", "TRAINING": "FINISHED", "TRAINING_AND_TRAINED_MODEL": "FINISHED", "TRAINED_MODEL": "FAILED"}',
+ datetime.datetime(2022, 10, 12, 10, 2, 31, 888830), 1, False, '3', '{"datalake_source": {"InfluxSource": {}}}', 'No data available.', '', 'liveCell', 'UEData', False, False, "","")]
@patch('trainingmgr.trainingmgr_main.MM_SDK', return_value = mocked_mm_sdk)
@patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ)
@patch('trainingmgr.trainingmgr_main.change_steps_state_of_latest_version')
@patch('trainingmgr.trainingmgr_main.update_model_download_url')
+ @patch('trainingmgr.trainingmgr_main.get_trainingjob_info_by_name', return_value=db_result)
@patch('trainingmgr.trainingmgr_main.get_latest_version_trainingjob_name', return_value = "usecase1")
@patch('trainingmgr.trainingmgr_main.response_for_training', return_value = response_tuple1)
- def test_pipeline_notification(self,mock1, mock2, mock3, mock4, mock5, mock6):
+ def test_pipeline_notification(self,mock1, mock2, mock3, mock4, mock5, mock6, mock7):
trainingmgr_main.LOGGER.debug("******* test_pipeline_notification post *******")
trainingjob_req = {
"trainingjob_name":"usecase1",
change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
Steps.TRAINED_MODEL.name,
States.FINISHED.name)
+ # upload to the mme
+ trainingjob_info=get_trainingjob_info_by_name(trainingjob_name, PS_DB_OBJ)
+
+ is_mme= trainingjob_info[0][20]
+ if is_mme:
+ model_name=trainingjob_info[0][21] #model_name
+ file=MM_SDK.get_model_zip(trainingjob_name, str(version))
+ url ="http://"+str(TRAININGMGR_CONFIG_OBJ.model_management_service_ip)+":"+str(TRAININGMGR_CONFIG_OBJ.model_management_service_port)+"/uploadModel/{}".format(model_name)
+ LOGGER.debug("url for upload is: ", url)
+ resp2=requests.post(url=url, files={"file":('Model.zip', file, 'application/zip')})
+ if resp2.status_code != status.HTTP_200_OK :
+ errMsg= "Upload to mme failed"
+ LOGGER.error(errMsg + trainingjob_name)
+ raise TMException(errMsg + trainingjob_name)
+
+ LOGGER.debug("Model uploaded to the MME")
else:
errMsg = "Trained model is not available "
LOGGER.error(errMsg + trainingjob_name)