},
"trainingPipeline": {
"pipeline_name": "qoe_Pipeline",
- "pipeline_version": "2",
- "enable_versioning": true
+ "pipeline_version": "2"
}
'''
paths = {
"arguments" : ["dataPipeline", "arguments"],
"pipeline_name": ["trainingPipeline", "pipeline_name"],
"pipeline_version": ["trainingPipeline", "pipeline_version"],
- "enable_versioning": ["trainingPipeline", "enable_versioning"]
}
return paths
from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
from trainingmgr.schemas.trainingjob_schema import TrainingJobSchema
from trainingmgr.service.training_job_service import delete_training_job, create_training_job, get_training_job, get_trainingjob_by_modelId, get_trainining_jobs, \
-get_steps_state, change_status_tj, get_data_extraction_in_progress_trainingjobs
+get_steps_state, change_status_tj, get_data_extraction_in_progress_trainingjobs, update_trainingPipeline
from trainingmgr.common.trainingmgr_util import check_key_in_dictionary
from trainingmgr.common.trainingmgr_operations import data_extraction_start
from trainingmgr.common.trainingConfig_parser import validateTrainingConfig, getField
trainingjob = trainingjob_schema.load(request_json)
- model_id = trainingjob.modelId
-
- # the artifact version will be "0.0.0" for now, it will be updated once we have the model is trained.
- model_id.artifactversion="0.0.0"
-
trainingConfig = trainingjob.training_config
if(not validateTrainingConfig(trainingConfig)):
return jsonify({'Exception': 'The TrainingConfig is not correct'}), status.HTTP_400_BAD_REQUEST
- # check if trainingjob is already present with name
- trainingjob_db = get_trainingjob_by_modelId(model_id)
-
- if trainingjob_db != None:
- return jsonify({"Exception":f"modelId {model_id.modelname} and {model_id.modelversion} is already present in database"}), status.HTTP_409_CONFLICT
-
+ model_id = trainingjob.modelId
+ registered_model_dict = get_modelinfo_by_modelId_service(model_id.modelname, model_id.modelversion)[0]
# Verify if the modelId is registered over mme or not
-
- registered_model_list = get_modelinfo_by_modelId_service(model_id.modelname, model_id.modelversion)
- if registered_model_list is None:
+ if registered_model_dict is None:
return jsonify({"Exception":f"modelId {model_id.modelname} and {model_id.modelversion} is not registered at MME, Please first register at MME and then continue"}), status.HTTP_400_BAD_REQUEST
- registered_model_dict = registered_model_list[0]
- create_training_job(trainingjob, registered_model_dict)
- return jsonify({"Trainingjob": trainingjob_schema.dump(trainingjob)}), 201
+ if registered_model_dict["modelLocation"] != trainingjob.model_location:
+ return jsonify({"Exception":f"modelId {model_id.modelname} and {model_id.modelversion} and trainingjob created does not have same modelLocation, Please first register at MME properly and then continue"}), status.HTTP_400_BAD_REQUEST
+
+ if registered_model_dict["modelId"]["artifactVersion"] == "0.0.0":
+ if registered_model_dict["modelLocation"] == "":
+ return create_training_job(trainingjob=trainingjob, registered_model_dict=registered_model_dict)
+ else:
+ trainingjob = update_trainingPipeline(trainingjob)
+ return create_training_job(trainingjob=trainingjob, registered_model_dict=registered_model_dict)
+ else:
+ trainingjob = update_trainingPipeline(trainingjob)
+ return create_training_job(trainingjob=trainingjob, registered_model_dict=registered_model_dict)
except ValidationError as error:
return jsonify(error.messages), status.HTTP_400_BAD_REQUEST
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2024 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+from trainingmgr.common.exceptions_utls import DBException
+from trainingmgr.models import db, ModelID
+
+def get_model_by_modelId(modelname, modelversion):
+    """Fetch the ModelID row matching (modelname, modelversion).
+
+    Returns:
+        The matching ModelID instance, or None when no row exists.
+
+    Raises:
+        DBException: on any other database error (including multiple
+            matching rows, since ``.one()`` requires exactly one).
+    """
+    try:
+        # .one() raises when zero or more than one row matches the filter.
+        model_info = ModelID.query.filter(ModelID.modelname == modelname, ModelID.modelversion == modelversion).one()
+        return model_info
+    except Exception as err:
+        # NOTE(review): matching on the exception message text is fragile —
+        # catching sqlalchemy NoResultFound directly would be safer; confirm.
+        if "No row was found when one was required" in str(err):
+            return None
+        raise DBException(f"Failed to fetch modelInfo due to {str(err)}")
\ No newline at end of file
from trainingmgr.common.trainingConfig_parser import getField
from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
from trainingmgr.common.trainingmgr_operations import data_extraction_status
-from trainingmgr.service.training_job_service import get_data_extraction_in_progress_trainingjobs, get_training_job, change_status_tj
# from trainingmgr.common.trainingmgr_util import handle_async_feature_engineering_status_exception_case
-from trainingmgr.common.exceptions_utls import TMException
+from trainingmgr.common.exceptions_utls import DBException, TMException
from trainingmgr.constants import Steps, States
from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk
-from trainingmgr.db.trainingjob_db import change_state_to_failed
+from trainingmgr.db.trainingjob_db import change_state_to_failed, get_trainingjob, change_steps_state
TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()
Model_Metrics_Sdk = ModelMetricsSdk()
-
+def get_data_extraction_in_progress_trainingjobs():
+ result = {}
+ try:
+ trainingjobs = get_trainingjob()
+ for trainingjob in trainingjobs:
+ status = json.loads(trainingjob.steps_state.states)
+ if status[Steps.DATA_EXTRACTION.name] == States.IN_PROGRESS.name:
+ result[trainingjob.id] = "Scheduled"
+ except Exception as err:
+ raise DBException("get_data_extraction_in_progress_trainingjobs," + str(err))
+ return result
def check_and_notify_feature_engineering_status(APP,db):
"""Asynchronous function to check and notify feature engineering status."""
f"http://{TRAININGMGR_CONFIG_OBJ.my_ip}:"
f"{TRAININGMGR_CONFIG_OBJ.my_port}/trainingjob/dataExtractionNotification"
)
+
while True:
with LOCK:
training_job_ids = list(DATAEXTRACTION_JOBS_CACHE)
try:
# trainingjob_name = trainingjob.trainingjob_name
with APP.app_context():
- trainingjob = get_training_job(trainingjob_id)
+ trainingjob = get_trainingjob(trainingjob_id)
featuregroup_name = getField(trainingjob.training_config, "feature_group_name")
response = data_extraction_status(featuregroup_name, trainingjob_id, TRAININGMGR_CONFIG_OBJ)
if (response.headers.get('content-type') != "application/json" or
if response_data["task_status"] == "Completed":
with APP.app_context():
- change_status_tj(trainingjob.id, Steps.DATA_EXTRACTION.name, States.FINISHED.name)
- change_status_tj(trainingjob.id, Steps.DATA_EXTRACTION_AND_TRAINING.name, States.IN_PROGRESS.name)
+ change_steps_state(trainingjob.id, Steps.DATA_EXTRACTION.name, States.FINISHED.name)
+ change_steps_state(trainingjob.id, Steps.DATA_EXTRACTION_AND_TRAINING.name, States.IN_PROGRESS.name)
+
+ LOGGER.info("url_pipeline_run is : "+ str(url_pipeline_run))
+
kf_response = requests.post(
url_pipeline_run,
data=json.dumps({"trainingjob_id": trainingjob.id}),
#
# ==================================================================================
import json
+from threading import Lock
+from flask_api import status
+from flask import jsonify
+from trainingmgr.common.trainingmgr_operations import data_extraction_start
+from trainingmgr.db.model_db import get_model_by_modelId
from trainingmgr.db.trainingjob_db import delete_trainingjob_by_id, create_trainingjob, get_trainingjob, get_trainingjob_by_modelId_db, \
change_steps_state, change_field_value, change_field_value, change_steps_state_df, changeartifact
from trainingmgr.common.exceptions_utls import DBException, TMException
from trainingmgr.common.trainingConfig_parser import getField, setField
+from trainingmgr.handler.async_handler import DATAEXTRACTION_JOBS_CACHE
from trainingmgr.schemas import TrainingJobSchema
-from trainingmgr.common.trainingmgr_util import get_one_word_status, get_step_in_progress_state
+from trainingmgr.common.trainingmgr_util import check_key_in_dictionary, get_one_word_status, get_step_in_progress_state
from trainingmgr.constants.steps import Steps
from trainingmgr.constants.states import States
from trainingmgr.service.pipeline_service import terminate_training_service
-from trainingmgr.service.featuregroup_service import get_featuregroup_from_inputDataType
+from trainingmgr.service.featuregroup_service import get_featuregroup_by_name, get_featuregroup_from_inputDataType
from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
-from trainingmgr.constants.steps import Steps
-from trainingmgr.constants.states import States
+from trainingmgr.constants import Steps, States
trainingJobSchema = TrainingJobSchema()
trainingJobsSchema = TrainingJobSchema(many=True)
-
+TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()
+LOCK = Lock()
+MIMETYPE_JSON = "application/json"
LOGGER = TrainingMgrConfig().logger
def get_training_job(training_job_id: int):
feature_group_name = get_featuregroup_from_inputDataType(registered_model_dict['modelInformation']['inputDataType'])
trainingjob.training_config = json.dumps(setField(training_config, "feature_group_name", feature_group_name))
LOGGER.debug("Training Config after FeatureGroup deduction --> " + trainingjob.training_config)
-
+
+ modelId = trainingjob.modelId
+ modelinfo = get_model_by_modelId(modelId.modelname, modelId.modelversion)
+
+ if modelinfo != None:
+ trainingjob.model_id = modelinfo.id
+ trainingjob.modelId = modelinfo
+
create_trainingjob(trainingjob)
+
+ LOGGER.debug("trainingjob id is: "+str(trainingjob.id))
+
+ return training(trainingjob)
except DBException as err:
raise TMException(f"create_training_job failed with exception : {str(err)}")
try:
trainingjob = get_trainingjob_by_modelId_db(model_id)
return trainingjob
-
except Exception as err:
+ if "No row was found when one was required" in str(err):
+ return None
raise DBException(f"get_trainingjob_by_name failed with exception : {str(err)}")
def get_steps_state(trainingjob_id):
except DBException as err:
raise TMException(f"change status of tj dif failed with exception : {str(err)}")
-def get_data_extraction_in_progress_trainingjobs():
- result = {}
- try:
- trainingjobs = get_trainingjob()
- for trainingjob in trainingjobs:
- status = json.loads(trainingjob.steps_state.states)
- if status[Steps.DATA_EXTRACTION.name] == States.IN_PROGRESS.name:
- result[trainingjob.id] = "Scheduled"
- except Exception as err:
- raise DBException("get_data_extraction_in_progress_trainingjobs," + str(err))
- return result
def change_update_field_value(trainingjob_id, field, value):
try:
return f'{major}.{minor}.{patch}'
except Exception as err:
raise TMException(f"failed to update_artifact_version with exception : {str(err)}")
+
+def training(trainingjob):
+ """
+ Rest end point to start training job.
+ It calls data extraction module for data extraction and other training steps
+
+ Args in function:
+ training_job_id: str
+ id of trainingjob.
+
+ Args in json:
+ not required json
+
+ Returns:
+ json:
+ training_job_id: str
+ name of trainingjob
+ result: str
+ route of data extraction module for getting data extraction status of
+ given training_job_id .
+ status code:
+ HTTP status code 200
+
+ Exceptions:
+ all exception are provided with exception message and HTTP status code.
+ """
+
+ LOGGER.debug("Request for training trainingjob %s ", trainingjob.id)
+ try:
+ # trainingjob = get_training_job(trainingjob_id)
+ # print(trainingjob)
+ # trainingjob_name = trainingjob.trainingjob_name
+ training_job_id = trainingjob.id
+ featuregroup= get_featuregroup_by_name(getField(trainingjob.training_config, "feature_group_name"))
+ LOGGER.debug("featuregroup name is: "+featuregroup.featuregroup_name)
+ feature_list_string = featuregroup.feature_list
+ influxdb_info_dic={}
+ influxdb_info_dic["host"]=featuregroup.host
+ influxdb_info_dic["port"]=featuregroup.port
+ influxdb_info_dic["bucket"]=featuregroup.bucket
+ influxdb_info_dic["token"]=featuregroup.token
+ influxdb_info_dic["db_org"] = featuregroup.db_org
+ influxdb_info_dic["source_name"]= featuregroup.source_name
+ _measurement = featuregroup.measurement
+ query_filter = getField(trainingjob.training_config, "query_filter")
+ datalake_source = {featuregroup.datalake_source: {}} # Datalake source should be taken from FeatureGroup (not TrainingJob)
+ LOGGER.debug('Starting Data Extraction...')
+ de_response = data_extraction_start(TRAININGMGR_CONFIG_OBJ, training_job_id,
+ feature_list_string, query_filter, datalake_source,
+ _measurement, influxdb_info_dic, featuregroup.featuregroup_name)
+ if (de_response.status_code == status.HTTP_200_OK ):
+ LOGGER.debug("Response from data extraction for " + \
+ training_job_id + " : " + json.dumps(de_response.json()))
+ change_status_tj(trainingjob.id,
+ Steps.DATA_EXTRACTION.name,
+ States.IN_PROGRESS.name)
+ with LOCK:
+ DATAEXTRACTION_JOBS_CACHE[trainingjob.id] = "Scheduled"
+ elif( de_response.headers['content-type'] == MIMETYPE_JSON ) :
+ errMsg = "Data extraction responded with error code."
+ LOGGER.error(errMsg)
+ json_data = de_response.json()
+ LOGGER.debug(str(json_data))
+ if check_key_in_dictionary(["result"], json_data):
+ return jsonify({
+ "message": json.dumps({"Failed":errMsg + json_data["result"]})
+ }), 500
+ else:
+ return jsonify({
+ "message": errMsg
+ }), 500
+ else:
+ return jsonify({
+ "message": "failed data extraction"
+ }), 500
+ except TMException as err:
+ if "No row was found when one was required" in str(err):
+ return jsonify({
+ 'message': str(err)
+ }), 404
+ except Exception as e:
+ # print(traceback.format_exc())
+ # response_data = {"Exception": str(err)}
+ LOGGER.debug("Error is training, job id: " + str(training_job_id)+" " + str(e))
+ return jsonify({
+ 'message': str(e)
+ }), 500
+ return jsonify({"Trainingjob": trainingJobSchema.dump(trainingjob)}), 201
+
+def update_trainingPipeline(trainingjob):
+    """Switch the job's training_config to the retraining pipeline.
+
+    Overwrites the "pipeline_name" and "pipeline_version" fields of the
+    job's training_config (a JSON string) and returns the mutated job.
+
+    Raises:
+        TMException: when the training_config cannot be updated.
+    """
+    try:
+        training_config = trainingjob.training_config
+        training_config = json.dumps(setField(training_config, "pipeline_name", "qoe_retraining_pipeline"))
+        # NOTE(review): pipeline_version is set to the same literal as the
+        # pipeline name — looks like a copy/paste; confirm the intended
+        # version value.
+        training_config = json.dumps(setField(training_config, "pipeline_version", "qoe_retraining_pipeline"))
+        trainingjob.training_config = training_config
+        return trainingjob
+    except Exception as err:
+        LOGGER.error(f"error in updating the trainingPipeline due to {str(err)}")
+        raise TMException("failed to update the trainingPipeline")
\ No newline at end of file
import threading
from threading import Lock
import time
-from flask import Flask, request, send_file
+from flask import Flask, request, send_file, jsonify
from flask_api import status
from flask_migrate import Migrate
from marshmallow import ValidationError
from trainingmgr.controller.pipeline_controller import pipeline_controller
from trainingmgr.common.trainingConfig_parser import validateTrainingConfig, getField
from trainingmgr.handler.async_handler import start_async_handler
+from trainingmgr.service.mme_service import get_modelinfo_by_modelId_service
from trainingmgr.service.training_job_service import change_status_tj, change_update_field_value, get_training_job, update_artifact_version
from trainingmgr.service.pipeline_service import start_training_service
trainingjob_id = request.json["trainingjob_id"]
trainingjob = get_training_job(trainingjob_id)
featuregroup_name = getField(trainingjob.training_config, "feature_group_name")
- arguments = getField(trainingjob.training_config, "arguments")
+ argument_dict = getField(trainingjob.training_config, "arguments")
- argument_dict = ast.literal_eval(arguments)
+ # argument_dict = ast.literal_eval(arguments)
argument_dict["trainingjob_id"] = trainingjob_id
argument_dict["featuregroup_name"] = featuregroup_name
argument_dict["modelName"] = trainingjob.modelId.modelname
argument_dict["modelVersion"] = trainingjob.modelId.modelversion
- argument_dict["artifactVersion"] = trainingjob.modelId.artifactversion
+ argument_dict["modellocation"] = trainingjob.model_location
# Arguments values must be of type string
for key, val in argument_dict.items():
# Experiment name is harded to be Default
training_details = {
"pipeline_name": getField(trainingjob.training_config, "pipeline_name"), "experiment_name": 'Default',
- "arguments": argument_dict, "pipeline_version": getField(trainingjob.training_config, "pipeline_name")
+ "arguments": argument_dict, "pipeline_version": getField(trainingjob.training_config, "pipeline_version")
}
+ LOGGER.debug("training detail for kf adapter is: "+ str(training_details))
response = training_start(TRAININGMGR_CONFIG_OBJ, training_details, trainingjob_id)
if ( response.headers['content-type'] != MIMETYPE_JSON
or response.status_code != status.HTTP_200_OK ):
change_status_tj(trainingjob.id,
Steps.DATA_EXTRACTION_AND_TRAINING.name,
States.FINISHED.name)
- LOGGER.debug("DATA_EXTRACTION_AND_TRAINING step set to FINISHED for training job " + trainingjob.id)
+ LOGGER.debug("DATA_EXTRACTION_AND_TRAINING step set to FINISHED for training job " + str(trainingjob.id))
change_status_tj(trainingjob.id,
Steps.TRAINING.name,
States.IN_PROGRESS.name)
- LOGGER.debug("TRAINING step set to IN_PROGRESS for training job " + trainingjob.id)
+ LOGGER.debug("TRAINING step set to IN_PROGRESS for training job " + str(trainingjob.id))
change_update_field_value(trainingjob.id,
"run_id",
json_data["run_id"])
pass
except Exception as err:
+ LOGGER.error("error is : "+ str(err))
+ return jsonify({"failed":"error"}), 500
# LOGGER.error("Failed to handle dataExtractionNotification. " + str(err))
# if not change_in_progress_to_failed_by_latest_version(trainingjob_name) :
# LOGGER.error(ERROR_TYPE_DB_STATUS)
# return response_for_training(err_response_code,
# str(err) + "(trainingjob name is " + trainingjob_name + ")",
# LOGGER, False, trainingjob_name, MM_SDK)
- pass
return APP.response_class(response=json.dumps({"result": "pipeline is scheduled"}),
status=status.HTTP_200_OK,
# notification_rapp(trainingjob_info, TRAININGMGR_CONFIG_OBJ)
model_name= trainingjob.modelId.modelname
model_version= trainingjob.modelId.modelversion
- artifact_version= trainingjob.modelId.artifactversion
- artifact_version= update_artifact_version(trainingjob_id , artifact_version, "major")
- if MM_SDK.check_object(model_name, model_version, artifact_version, "Model.zip"):
+ modelinfo = get_modelinfo_by_modelId_service(model_name, model_version)[0]
+ artifactversion = modelinfo["modelId"]["artifactVersion"]
+ if MM_SDK.check_object(model_name, model_version, artifactversion, "Model.zip"):
model_url = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + ":" + \
str(TRAININGMGR_CONFIG_OBJ.my_port) + "/model/" + \
- model_name + "/" + str(model_version) + "/" + str(artifact_version) + "/Model.zip"
+ model_name + "/" + str(model_version) + "/" + str(artifactversion) + "/Model.zip"
change_update_field_value(trainingjob_id, "model_url" , model_url)
# notification_rapp(trainingjob_info, TRAININGMGR_CONFIG_OBJ)
else:
errMsg = "Trained model is not available "
- LOGGER.error(errMsg + trainingjob_id)
- raise TMException(errMsg + trainingjob_id)
+ LOGGER.error(errMsg + str(trainingjob_id))
+ raise TMException(errMsg + str(trainingjob_id))
else:
- LOGGER.error("Pipeline notification -Training failed " + trainingjob_id)
+ LOGGER.error("Pipeline notification -Training failed " + str(trainingjob_id))
raise TMException("Pipeline not successful for " + \
- trainingjob_id + \
+ str(trainingjob_id) + \
",request json from kf adapter is: " + json.dumps(request.json))
except Exception as err:
#Training failure response
-
# Moved to pipelineMgr (to be deleted in future)
@APP.route('/experiments', methods=['GET'])
def get_all_experiment_names():