creation_time = datetime.datetime.now()
updation_time = datetime.datetime.now()
training_config = {
- "is_mme" : True,
"description": "Test description",
"dataPipeline": {
"feature_group_name": "test_feature_group",
version=1,
model_url="http://test.model.url",
notification_url="http://test.notification.url",
- deletion_in_progress=False,
- model_name="test_model",
- model_info="test_model_info"
+ deletion_in_progress=False
)
@pytest.fixture
assert job_data['training_config']['description'] == "Test description"
assert job_data['training_config']['dataPipeline']['feature_group_name'] == "test_feature_group"
assert job_data['training_config']['trainingPipeline']['pipeline_name'] == "test_pipeline"
- assert job_data['training_config']['is_mme'] is True
- assert job_data['model_name'] == "test_model"
- assert job_data['model_info'] == "test_model_info"
assert job_data['accuracy'] == mock_metrics
# Positive-path test for check_trainingjob_data: a request body containing
# every required key must come back as a tuple of the extracted values.
# Two names are patched for the duration of the test:
#   - trainingmgr_main.check_trainingjob_name_and_version -> returns False
#   - trainingmgr_util.isinstance -> returns True
#     (presumably to bypass type checks inside the util module -- TODO confirm)
@patch('trainingmgr.trainingmgr_main.check_trainingjob_name_and_version', return_value=False)
@patch('trainingmgr.common.trainingmgr_util.isinstance',return_value=True)
# mock1/mock2 receive the mocks injected by the two @patch decorators;
# neither is referenced in the body.
def test_check_trainingjob_data(self,mock1,mock2):
usecase_name = "usecase8"
# Patch markers: the removed ("-") payload carried the "is_mme" and
# "model_name" keys; the added ("+") payload omits them.
# Keys such as "target_deployment", "incremental_training", "model" and
# "model_version" are present in the payload but not in the expected tuple.
- json_data = { "description":"unittest", "featureGroup_name": "group1" , "pipeline_name":"qoe" , "experiment_name":"experiment1" , "arguments":"arguments1" , "query_filter":"query1" , "enable_versioning":True , "target_deployment":"Near RT RIC" , "pipeline_version":1 , "datalake_source":"cassandra db" , "incremental_training":True , "model":"usecase7" , "model_version":1 , "is_mme":False, "model_name":""}
+ json_data = { "description":"unittest", "featureGroup_name": "group1" , "pipeline_name":"qoe" , "experiment_name":"experiment1" , "arguments":"arguments1" , "query_filter":"query1" , "enable_versioning":True , "target_deployment":"Near RT RIC" , "pipeline_version":1 , "datalake_source":"cassandra db" , "incremental_training":True , "model":"usecase7" , "model_version":1 }
# Expected tuple order: feature group name, description, pipeline_name,
# experiment_name, arguments, query_filter, enable_versioning,
# pipeline_version, datalake_source. The removed variant additionally ended
# with the is_mme and model_name values (False, "").
- expected_data = ("group1", 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db',False, "")
+ expected_data = ("group1", 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db')
# The returned tuple must match the expected one exactly (values and order).
assert check_trainingjob_data(usecase_name, json_data) == expected_data,"data not equal"
def test_negative_check_trainingjob_data_1(self):
It returns paths possible to retrieve data
Based on TrainingConfig Schema:
{
- "is_mme" : false,
"description": "This is something3",
"dataPipeline": {
"feature_group_name": "base2",
}
'''
paths = {
- "is_mme": ["is_mme"],
"description": ["description"],
"feature_group_name": ["dataPipeline", "feature_group_name"],
"query_filter" : ["dataPipeline", "query_filter"],
"pipeline_name", "experiment_name",
"arguments", "enable_versioning",
"datalake_source", "description",
- "query_filter",
- "is_mme", "model_name"], json_data):
+ "query_filter"], json_data):
description = json_data["description"]
feature_list = json_data["featureGroup_name"]
enable_versioning = json_data["enable_versioning"]
pipeline_version = json_data["pipeline_version"]
datalake_source = json_data["datalake_source"]
- is_mme=json_data["is_mme"]
- model_name=json_data["model_name"]
else :
raise TMException("check_trainingjob_data- supplied data doesn't have" + \
"all the required fields ")
str(err)) from None
return (feature_list, description, pipeline_name, experiment_name,
arguments, query_filter, enable_versioning, pipeline_version,
- datalake_source, is_mme, model_name)
+ datalake_source)
def check_feature_group_data(json_data):
"""
training_config = Column(String(5000), nullable=False)
model_url = Column(String(1000), nullable=True)
notification_url = Column(String(1000), nullable=True)
- model_name = db.Column(db.String(128), nullable=True)
model_id = Column(Integer, nullable=False)
- model_info = Column(String(1000), nullable=True)
# defining relationships
steps_state = relationship("TrainingJobStatus", back_populates="trainingjobs")
url for downloading model
notification_url: str
url of notification server
- is_mme: boolean
- whether the mme is enabled
model_name: str
model name
model_info: str
# "datalake_source": get_one_key(json.loads(trainingjob.datalake_source)['datalake_source']),
"model_url": trainingjob.model_url,
"notification_url": trainingjob.notification_url,
- # "is_mme": trainingjob.is_mme,
- "model_name": trainingjob.model_name,
- "model_info": trainingjob.model_info,
"accuracy": data
}
response_data = {"trainingjob": dict_data}
States.FINISHED.name)
notification_rapp(trainingjob_info, TRAININGMGR_CONFIG_OBJ)
# upload to the mme
- is_mme = getField(trainingjob_info.training_config, "is_mme")
- if is_mme:
- model_name=trainingjob_info.model_name #model_name
- file=MM_SDK.get_model_zip(trainingjob_name, str(version))
- url ="http://"+str(TRAININGMGR_CONFIG_OBJ.model_management_service_ip)+":"+str(TRAININGMGR_CONFIG_OBJ.model_management_service_port)+"/uploadModel/{}".format(model_name)
- LOGGER.debug("url for upload is: ", url)
- resp2=requests.post(url=url, files={"file":('Model.zip', file, 'application/zip')})
- if resp2.status_code != status.HTTP_200_OK :
- errMsg= "Upload to mme failed"
- LOGGER.error(errMsg + trainingjob_name)
- raise TMException(errMsg + trainingjob_name)
-
- LOGGER.debug("Model uploaded to the MME")
else:
errMsg = "Trained model is not available "
LOGGER.error(errMsg + trainingjob_name)
Name of model
trainingConfig: dict
Training-Configurations, parameter as follows
- is_mme: boolean
- whether mme is enabled
description: str
description
dataPipeline: dict
processed_json_data = request.get_json()
processed_json_data['training_config'] = json.dumps(request.get_json()["training_config"])
trainingjob = trainingjob_schema.load(processed_json_data)
- model_info=""
- if getField(trainingjob.training_config, "is_mme"):
- pipeline_dict =json.loads(TRAININGMGR_CONFIG_OBJ.pipeline)
- model_info=get_model_info(TRAININGMGR_CONFIG_OBJ, trainingjob.model_name)
- s=model_info["meta-info"]["feature-list"]
- model_type=model_info["meta-info"]["model-type"]
- try:
- pipeline_name=pipeline_dict[str(model_type)]
- except Exception as err:
- err="Doesn't support the model type"
- raise TMException(err)
- pipeline_version=pipeline_name
- feature_list=','.join(s)
- result= get_feature_groups_db(PS_DB_OBJ)
- for res in result:
- if feature_list==res[1]:
- featuregroup_name=res[0]
- break
- if featuregroup_name =="":
- return {"Exception":"The no feature group with mentioned feature list, create a feature group"}, status.HTTP_406_NOT_ACCEPTABLE
add_update_trainingjob(trainingjob, True)
api_response = {"result": "Information stored in database."}
response_code = status.HTTP_201_CREATED