From: rajdeep11
Date: Tue, 17 Dec 2024 06:15:21 +0000 (+0530)
Subject: changes for retraining
X-Git-Tag: 3.0.0~10
X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=b7a8788dc66dd962278113dcce65df39f9b63b9c;p=aiml-fw%2Fawmf%2Ftm.git

changes for retraining

Change-Id: I3c35fdb7ede087e9d24cef6ce6dcf2929019d11c
Signed-off-by: rajdeep11
---

diff --git a/trainingmgr/common/trainingConfig_parser.py b/trainingmgr/common/trainingConfig_parser.py
index 844e836..4b38e96 100644
--- a/trainingmgr/common/trainingConfig_parser.py
+++ b/trainingmgr/common/trainingConfig_parser.py
@@ -48,8 +48,7 @@ def __getLeafPaths():
         },
         "trainingPipeline": {
             "pipeline_name": "qoe_Pipeline",
-            "pipeline_version": "2",
-            "enable_versioning": true
+            "pipeline_version": "2"
         }
     '''
     paths = {
@@ -59,7 +58,6 @@ def __getLeafPaths():
         "arguments" : ["dataPipeline", "arguments"],
        "pipeline_name": ["trainingPipeline", "pipeline_name"],
        "pipeline_version": ["trainingPipeline", "pipeline_version"],
-       "enable_versioning": ["trainingPipeline", "enable_versioning"]
     }
     return paths
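
The parser resolves training-config fields through the leaf-path table above, so dropping "enable_versioning" from __getLeafPaths() is what retires the field. For reference, a self-contained sketch of how such a leaf-path table can drive field access on the JSON config shown in the docstring; get_field and set_field are illustrative stand-ins for the module's getField and setField, whose bodies are not part of this diff:

    # Minimal sketch of leaf-path based config access (illustrative only).
    import json

    PATHS = {
        "arguments": ["dataPipeline", "arguments"],
        "pipeline_name": ["trainingPipeline", "pipeline_name"],
        "pipeline_version": ["trainingPipeline", "pipeline_version"],
    }

    def get_field(config_json: str, field: str):
        """Walk the nested dict along the field's leaf path and return the value."""
        node = json.loads(config_json)
        for key in PATHS[field]:
            node = node[key]
        return node

    def set_field(config_json, field: str, value):
        """Set a leaf value along its path and return the updated dict."""
        config = json.loads(config_json) if isinstance(config_json, str) else config_json
        node = config
        for key in PATHS[field][:-1]:
            node = node.setdefault(key, {})
        node[PATHS[field][-1]] = value
        return config

    if __name__ == "__main__":
        cfg = json.dumps({"trainingPipeline": {"pipeline_name": "qoe_Pipeline",
                                               "pipeline_version": "2"}})
        cfg = json.dumps(set_field(cfg, "pipeline_version", "3"))
        print(get_field(cfg, "pipeline_version"))  # -> 3
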
diff --git a/trainingmgr/controller/trainingjob_controller.py b/trainingmgr/controller/trainingjob_controller.py
index aab12db..264997d 100644
--- a/trainingmgr/controller/trainingjob_controller.py
+++ b/trainingmgr/controller/trainingjob_controller.py
@@ -25,7 +25,7 @@ from trainingmgr.common.exceptions_utls import TMException
 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
 from trainingmgr.schemas.trainingjob_schema import TrainingJobSchema
 from trainingmgr.service.training_job_service import delete_training_job, create_training_job, get_training_job, get_trainingjob_by_modelId, get_trainining_jobs, \
-get_steps_state, change_status_tj, get_data_extraction_in_progress_trainingjobs
+get_steps_state, change_status_tj, get_data_extraction_in_progress_trainingjobs, update_trainingPipeline
 from trainingmgr.common.trainingmgr_util import check_key_in_dictionary
 from trainingmgr.common.trainingmgr_operations import data_extraction_start
 from trainingmgr.common.trainingConfig_parser import validateTrainingConfig, getField
@@ -76,30 +76,28 @@ def create_trainingjob():
 
         trainingjob = trainingjob_schema.load(request_json)
 
-        model_id = trainingjob.modelId
-
-        # the artifact version will be "0.0.0" for now, it will be updated once we have the model is trained.
-        model_id.artifactversion="0.0.0"
-
         trainingConfig = trainingjob.training_config
         if(not validateTrainingConfig(trainingConfig)):
             return jsonify({'Exception': 'The TrainingConfig is not correct'}), status.HTTP_400_BAD_REQUEST
 
-        # check if trainingjob is already present with name
-        trainingjob_db = get_trainingjob_by_modelId(model_id)
-
-        if trainingjob_db != None:
-            return jsonify({"Exception":f"modelId {model_id.modelname} and {model_id.modelversion} is already present in database"}), status.HTTP_409_CONFLICT
-
+        model_id = trainingjob.modelId
+        registered_model_dict = get_modelinfo_by_modelId_service(model_id.modelname, model_id.modelversion)[0]
         # Verify if the modelId is registered over mme or not
-
-        registered_model_list = get_modelinfo_by_modelId_service(model_id.modelname, model_id.modelversion)
-        if registered_model_list is None:
+        if registered_model_dict is None:
             return jsonify({"Exception":f"modelId {model_id.modelname} and {model_id.modelversion} is not registered at MME, Please first register at MME and then continue"}), status.HTTP_400_BAD_REQUEST
 
-        registered_model_dict = registered_model_list[0]
-        create_training_job(trainingjob, registered_model_dict)
-        return jsonify({"Trainingjob": trainingjob_schema.dump(trainingjob)}), 201
+        if registered_model_dict["modelLocation"] != trainingjob.model_location:
+            return jsonify({"Exception":f"modelId {model_id.modelname} and {model_id.modelversion} and the created trainingjob do not have the same modelLocation. Please register the model correctly at MME and then continue"}), status.HTTP_400_BAD_REQUEST
+
+        if registered_model_dict["modelId"]["artifactVersion"] == "0.0.0":
+            if registered_model_dict["modelLocation"] == "":
+                return create_training_job(trainingjob=trainingjob, registered_model_dict=registered_model_dict)
+            else:
+                trainingjob = update_trainingPipeline(trainingjob)
+                return create_training_job(trainingjob=trainingjob, registered_model_dict=registered_model_dict)
+        else:
+            trainingjob = update_trainingPipeline(trainingjob)
+            return create_training_job(trainingjob=trainingjob, registered_model_dict=registered_model_dict)
 
     except ValidationError as error:
         return jsonify(error.messages), status.HTTP_400_BAD_REQUEST
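
The controller now decides between the initial training pipeline and the retraining pipeline from what MME reports for the model: only a freshly registered model (artifactVersion "0.0.0" with no stored modelLocation) keeps the original pipeline. A condensed, illustrative sketch of that decision; needs_retraining_pipeline is a made-up helper, not part of the commit, and the dict shape mirrors the MME response used above:

    # Illustrative condensation of the create-vs-retrain branch above.
    def needs_retraining_pipeline(registered_model: dict) -> bool:
        """True when the registered model already has a trained artifact or location."""
        first_artifact = registered_model["modelId"]["artifactVersion"] == "0.0.0"
        has_location = registered_model["modelLocation"] != ""
        # Only a freshly registered model with no stored artifact keeps the
        # original training pipeline; everything else becomes a retraining run.
        return (not first_artifact) or has_location

    if __name__ == "__main__":
        fresh = {"modelId": {"artifactVersion": "0.0.0"}, "modelLocation": ""}
        trained = {"modelId": {"artifactVersion": "1.0.0"},
                   "modelLocation": "s3://models/qoe/1.0.0"}
        print(needs_retraining_pipeline(fresh))    # False -> normal training
        print(needs_retraining_pipeline(trained))  # True  -> update_trainingPipeline()
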
diff --git a/trainingmgr/db/model_db.py b/trainingmgr/db/model_db.py
new file mode 100644
index 0000000..bc8a21f
--- /dev/null
+++ b/trainingmgr/db/model_db.py
@@ -0,0 +1,29 @@
+# ==================================================================================
+#
+#   Copyright (c) 2024 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+from trainingmgr.common.exceptions_utls import DBException
+from trainingmgr.models import db, ModelID
+
+def get_model_by_modelId(modelname, modelversion):
+    try:
+        model_info = ModelID.query.filter(ModelID.modelname == modelname, ModelID.modelversion == modelversion).one()
+        return model_info
+    except Exception as err:
+        if "No row was found when one was required" in str(err):
+            return None
+        raise DBException(f"Failed to fetch modelInfo due to {str(err)}")
\ No newline at end of file
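
get_model_by_modelId recognises the empty-result case by matching on the exception message produced by Query.one(). For comparison, a sketch of the same lookup using SQLAlchemy's typed NoResultFound exception instead of string matching; the trainingmgr imports are assumed to resolve exactly as in the new file above, so this is an alternative sketch rather than repository code:

    # Sketch: same lookup, but catching SQLAlchemy's typed exceptions.
    from sqlalchemy.orm.exc import NoResultFound
    from sqlalchemy.exc import SQLAlchemyError

    from trainingmgr.common.exceptions_utls import DBException
    from trainingmgr.models import ModelID

    def get_model_by_modelId(modelname, modelversion):
        try:
            return ModelID.query.filter(
                ModelID.modelname == modelname,
                ModelID.modelversion == modelversion,
            ).one()
        except NoResultFound:
            # No matching row: callers treat this as "model not in the DB yet".
            return None
        except SQLAlchemyError as err:
            raise DBException(f"Failed to fetch modelInfo due to {str(err)}")
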
diff --git a/trainingmgr/handler/async_handler.py b/trainingmgr/handler/async_handler.py
index 63e3015..91cf105 100644
--- a/trainingmgr/handler/async_handler.py
+++ b/trainingmgr/handler/async_handler.py
@@ -6,12 +6,11 @@ import requests
 from trainingmgr.common.trainingConfig_parser import getField
 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
 from trainingmgr.common.trainingmgr_operations import data_extraction_status
-from trainingmgr.service.training_job_service import get_data_extraction_in_progress_trainingjobs, get_training_job, change_status_tj
 # from trainingmgr.common.trainingmgr_util import handle_async_feature_engineering_status_exception_case
-from trainingmgr.common.exceptions_utls import TMException
+from trainingmgr.common.exceptions_utls import DBException, TMException
 from trainingmgr.constants import Steps, States
 from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk
-from trainingmgr.db.trainingjob_db import change_state_to_failed
+from trainingmgr.db.trainingjob_db import change_state_to_failed, get_trainingjob, change_steps_state
 
 
@@ -23,7 +22,17 @@ LOGGER = TrainingMgrConfig().logger
 TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()
 Model_Metrics_Sdk = ModelMetricsSdk()
 
-
+def get_data_extraction_in_progress_trainingjobs():
+    result = {}
+    try:
+        trainingjobs = get_trainingjob()
+        for trainingjob in trainingjobs:
+            status = json.loads(trainingjob.steps_state.states)
+            if status[Steps.DATA_EXTRACTION.name] == States.IN_PROGRESS.name:
+                result[trainingjob.id] = "Scheduled"
+    except Exception as err:
+        raise DBException("get_data_extraction_in_progress_trainingjobs," + str(err))
+    return result
 
 def check_and_notify_feature_engineering_status(APP,db):
     """Asynchronous function to check and notify feature engineering status."""
@@ -32,6 +41,7 @@ def check_and_notify_feature_engineering_status(APP,db):
         f"http://{TRAININGMGR_CONFIG_OBJ.my_ip}:"
         f"{TRAININGMGR_CONFIG_OBJ.my_port}/trainingjob/dataExtractionNotification"
     )
+
     while True:
         with LOCK:
             training_job_ids = list(DATAEXTRACTION_JOBS_CACHE)
@@ -40,7 +50,7 @@ def check_and_notify_feature_engineering_status(APP,db):
             try:
                 # trainingjob_name = trainingjob.trainingjob_name
                 with APP.app_context():
-                    trainingjob = get_training_job(trainingjob_id)
+                    trainingjob = get_trainingjob(trainingjob_id)
                     featuregroup_name = getField(trainingjob.training_config, "feature_group_name")
                 response = data_extraction_status(featuregroup_name, trainingjob_id, TRAININGMGR_CONFIG_OBJ)
                 if (response.headers.get('content-type') != "application/json" or
@@ -53,8 +63,11 @@ def check_and_notify_feature_engineering_status(APP,db):
 
                 if response_data["task_status"] == "Completed":
                     with APP.app_context():
-                        change_status_tj(trainingjob.id, Steps.DATA_EXTRACTION.name, States.FINISHED.name)
-                        change_status_tj(trainingjob.id, Steps.DATA_EXTRACTION_AND_TRAINING.name, States.IN_PROGRESS.name)
+                        change_steps_state(trainingjob.id, Steps.DATA_EXTRACTION.name,
+                                           States.FINISHED.name)
+                        change_steps_state(trainingjob.id, Steps.DATA_EXTRACTION_AND_TRAINING.name, States.IN_PROGRESS.name)
+
+                    LOGGER.info("url_pipeline_run is : "+ str(url_pipeline_run))
+
                     kf_response = requests.post(
                         url_pipeline_run,
                         data=json.dumps({"trainingjob_id": trainingjob.id}),
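
get_data_extraction_in_progress_trainingjobs now lives next to the cache it reads, presumably so that training_job_service can import DATAEXTRACTION_JOBS_CACHE without creating a circular import. For reference, a self-contained sketch of the polling pattern the handler follows: sweep the cached job ids, check extraction status, and notify the training manager when a job completes. poll_status and notify are injected stand-ins, not this module's functions:

    # Self-contained sketch of the poll-and-notify loop (illustrative only).
    import json
    from threading import Lock

    JOBS_CACHE = {}          # trainingjob_id -> "Scheduled"
    LOCK = Lock()

    def poll_once(poll_status, notify, notify_url):
        """One sweep over the cached jobs; callables are injected for testing."""
        with LOCK:
            job_ids = list(JOBS_CACHE)
        for job_id in job_ids:
            state = poll_status(job_id)            # e.g. {"task_status": "Completed"}
            if state.get("task_status") != "Completed":
                continue
            notify(notify_url, json.dumps({"trainingjob_id": job_id}))
            with LOCK:
                JOBS_CACHE.pop(job_id, None)       # sketch: drop finished jobs

    if __name__ == "__main__":
        sent = []
        poll_status = lambda _id: {"task_status": "Completed"}
        notify = lambda url, body: sent.append((url, body))
        with LOCK:
            JOBS_CACHE[7] = "Scheduled"
        poll_once(poll_status, notify, "http://tm/trainingjob/dataExtractionNotification")
        print(sent)
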
diff --git a/trainingmgr/service/training_job_service.py b/trainingmgr/service/training_job_service.py
index 5e76934..cc9b0ba 100644
--- a/trainingmgr/service/training_job_service.py
+++ b/trainingmgr/service/training_job_service.py
@@ -16,23 +16,30 @@
 #
 # ==================================================================================
 import json
+from threading import Lock
+from flask_api import status
+from flask import jsonify
+from trainingmgr.common.trainingmgr_operations import data_extraction_start
+from trainingmgr.db.model_db import get_model_by_modelId
 from trainingmgr.db.trainingjob_db import delete_trainingjob_by_id, create_trainingjob, get_trainingjob, get_trainingjob_by_modelId_db, \
     change_steps_state, change_field_value, change_field_value, change_steps_state_df, changeartifact
 from trainingmgr.common.exceptions_utls import DBException, TMException
 from trainingmgr.common.trainingConfig_parser import getField, setField
+from trainingmgr.handler.async_handler import DATAEXTRACTION_JOBS_CACHE
 from trainingmgr.schemas import TrainingJobSchema
-from trainingmgr.common.trainingmgr_util import get_one_word_status, get_step_in_progress_state
+from trainingmgr.common.trainingmgr_util import check_key_in_dictionary, get_one_word_status, get_step_in_progress_state
 from trainingmgr.constants.steps import Steps
 from trainingmgr.constants.states import States
 from trainingmgr.service.pipeline_service import terminate_training_service
-from trainingmgr.service.featuregroup_service import get_featuregroup_from_inputDataType
+from trainingmgr.service.featuregroup_service import get_featuregroup_by_name, get_featuregroup_from_inputDataType
 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
-from trainingmgr.constants.steps import Steps
-from trainingmgr.constants.states import States
+from trainingmgr.constants import Steps, States
 
 trainingJobSchema = TrainingJobSchema()
 trainingJobsSchema = TrainingJobSchema(many=True)
-
+TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()
+LOCK = Lock()
+MIMETYPE_JSON = "application/json"
 LOGGER = TrainingMgrConfig().logger
 
 def get_training_job(training_job_id: int):
@@ -59,8 +66,19 @@ def create_training_job(trainingjob, registered_model_dict):
             feature_group_name = get_featuregroup_from_inputDataType(registered_model_dict['modelInformation']['inputDataType'])
             trainingjob.training_config = json.dumps(setField(training_config, "feature_group_name", feature_group_name))
             LOGGER.debug("Training Config after FeatureGroup deduction --> " + trainingjob.training_config)
-
+
+        modelId = trainingjob.modelId
+        modelinfo = get_model_by_modelId(modelId.modelname, modelId.modelversion)
+
+        if modelinfo != None:
+            trainingjob.model_id = modelinfo.id
+            trainingjob.modelId = modelinfo
+
         create_trainingjob(trainingjob)
+
+        LOGGER.debug("trainingjob id is: "+str(trainingjob.id))
+
+        return training(trainingjob)
     except DBException as err:
         raise TMException(f"create_training_job failed with exception : {str(err)}")
@@ -111,8 +129,9 @@ def get_trainingjob_by_modelId(model_id):
     try:
         trainingjob = get_trainingjob_by_modelId_db(model_id)
         return trainingjob
-
     except Exception as err:
+        if "No row was found when one was required" in str(err):
+            return None
         raise \
             DBException(f"get_trainingjob_by_name failed with exception : {str(err)}")
 
 def get_steps_state(trainingjob_id):
@@ -134,17 +153,6 @@ def change_status_tj_dif(trainingjob_id, step:str, state:str):
     except DBException as err:
         raise TMException(f"change status of tj dif failed with exception : {str(err)}")
 
-def get_data_extraction_in_progress_trainingjobs():
-    result = {}
-    try:
-        trainingjobs = get_trainingjob()
-        for trainingjob in trainingjobs:
-            status = json.loads(trainingjob.steps_state.states)
-            if status[Steps.DATA_EXTRACTION.name] == States.IN_PROGRESS.name:
-                result[trainingjob.id] = "Scheduled"
-    except Exception as err:
-        raise DBException("get_data_extraction_in_progress_trainingjobs," + str(err))
-    return result
 
 def change_update_field_value(trainingjob_id, field, value):
     try:
@@ -170,3 +178,102 @@ def update_artifact_version(trainingjob_id, artifact_version : str, level : str):
         return f'{major}.{minor}.{patch}'
     except Exception as err:
         raise TMException(f"failed to update_artifact_version with exception : {str(err)}")
+
+def training(trainingjob):
+    """
+    Starts the training flow for a training job: it calls the data extraction
+    module and schedules the remaining training steps.
+
+    Args in function:
+        trainingjob: TrainingJob
+            the training job to be trained.
+
+    Args in json:
+        no json required
+
+    Returns:
+        json:
+            training_job_id: str
+                name of trainingjob
+            result: str
+                route of data extraction module for getting data extraction status of
+                given training_job_id.
+        status code:
+            HTTP status code 200
+
+    Exceptions:
+        all exceptions are returned with an exception message and HTTP status code.
+    """
+
+    LOGGER.debug("Request for training trainingjob %s ", trainingjob.id)
+    try:
+        # trainingjob = get_training_job(trainingjob_id)
+        # print(trainingjob)
+        # trainingjob_name = trainingjob.trainingjob_name
+        training_job_id = trainingjob.id
+        featuregroup = get_featuregroup_by_name(getField(trainingjob.training_config, "feature_group_name"))
+        LOGGER.debug("featuregroup name is: "+featuregroup.featuregroup_name)
+        feature_list_string = featuregroup.feature_list
+        influxdb_info_dic = {}
+        influxdb_info_dic["host"] = featuregroup.host
+        influxdb_info_dic["port"] = featuregroup.port
+        influxdb_info_dic["bucket"] = featuregroup.bucket
+        influxdb_info_dic["token"] = featuregroup.token
+        influxdb_info_dic["db_org"] = featuregroup.db_org
+        influxdb_info_dic["source_name"] = featuregroup.source_name
+        _measurement = featuregroup.measurement
+        query_filter = getField(trainingjob.training_config, "query_filter")
+        datalake_source = {featuregroup.datalake_source: {}}  # Datalake source should be taken from FeatureGroup (not TrainingJob)
+        LOGGER.debug('Starting Data Extraction...')
+        de_response = data_extraction_start(TRAININGMGR_CONFIG_OBJ, training_job_id,
+                                            feature_list_string, query_filter, datalake_source,
+                                            _measurement, influxdb_info_dic, featuregroup.featuregroup_name)
+        if (de_response.status_code == status.HTTP_200_OK):
+            LOGGER.debug("Response from data extraction for " + \
+                         str(training_job_id) + " : " + json.dumps(de_response.json()))
+            change_status_tj(trainingjob.id,
+                             Steps.DATA_EXTRACTION.name,
+                             States.IN_PROGRESS.name)
+            with LOCK:
+                DATAEXTRACTION_JOBS_CACHE[trainingjob.id] = "Scheduled"
+        elif (de_response.headers['content-type'] == MIMETYPE_JSON):
+            errMsg = "Data extraction responded with error code."
+            LOGGER.error(errMsg)
+            json_data = de_response.json()
+            LOGGER.debug(str(json_data))
+            if check_key_in_dictionary(["result"], json_data):
+                return jsonify({
+                    "message": json.dumps({"Failed": errMsg + json_data["result"]})
+                }), 500
+            else:
+                return jsonify({
+                    "message": errMsg
+                }), 500
+        else:
+            return jsonify({
+                "message": "failed data extraction"
+            }), 500
+    except TMException as err:
+        if "No row was found when one was required" in str(err):
+            return jsonify({
+                'message': str(err)
+            }), 404
+    except Exception as e:
+        # print(traceback.format_exc())
+        # response_data = {"Exception": str(err)}
+        LOGGER.debug("Error in training, job id: " + str(training_job_id) + " " + str(e))
+        return jsonify({
+            'message': str(e)
+        }), 500
+    return jsonify({"Trainingjob": trainingJobSchema.dump(trainingjob)}), 201
+
+def update_trainingPipeline(trainingjob):
+    try:
+        training_config = trainingjob.training_config
+        training_config = json.dumps(setField(training_config, "pipeline_name", "qoe_retraining_pipeline"))
+        training_config = json.dumps(setField(training_config, "pipeline_version", "qoe_retraining_pipeline"))
+        trainingjob.training_config = training_config
+        return trainingjob
+    except Exception as err:
+        LOGGER.error(f"error in updating the trainingPipeline due to {str(err)}")
+        raise TMException("failed to update the trainingPipeline")
\ No newline at end of file
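
update_trainingPipeline is what turns an incoming job into a retraining run: it rewrites the trainingPipeline leaves of the training config before the job is created and started. A minimal sketch of that rewrite using plain dict access in place of setField; the pipeline name and version values are copied from the function above as committed:

    # Sketch of switching a training config to the retraining pipeline.
    import json

    def switch_to_retraining(training_config: str) -> str:
        config = json.loads(training_config)
        pipeline = config.setdefault("trainingPipeline", {})
        pipeline["pipeline_name"] = "qoe_retraining_pipeline"
        pipeline["pipeline_version"] = "qoe_retraining_pipeline"
        return json.dumps(config)

    if __name__ == "__main__":
        before = json.dumps({
            "trainingPipeline": {"pipeline_name": "qoe_Pipeline", "pipeline_version": "2"}
        })
        print(switch_to_retraining(before))
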
diff --git a/trainingmgr/trainingmgr_main.py b/trainingmgr/trainingmgr_main.py
index 3483a34..e6ba523 100644
--- a/trainingmgr/trainingmgr_main.py
+++ b/trainingmgr/trainingmgr_main.py
@@ -28,7 +28,7 @@ import traceback
 import threading
 from threading import Lock
 import time
-from flask import Flask, request, send_file
+from flask import Flask, request, send_file, jsonify
 from flask_api import status
 from flask_migrate import Migrate
 from marshmallow import ValidationError
@@ -58,6 +58,7 @@ from trainingmgr.controller import featuregroup_controller, training_job_control
 from trainingmgr.controller.pipeline_controller import pipeline_controller
 from trainingmgr.common.trainingConfig_parser import validateTrainingConfig, getField
 from trainingmgr.handler.async_handler import start_async_handler
+from trainingmgr.service.mme_service import get_modelinfo_by_modelId_service
 from trainingmgr.service.training_job_service import change_status_tj, change_update_field_value, get_training_job, update_artifact_version
 from trainingmgr.service.pipeline_service import start_training_service
 
@@ -167,15 +168,15 @@ def data_extraction_notification():
         trainingjob_id = request.json["trainingjob_id"]
         trainingjob = get_training_job(trainingjob_id)
         featuregroup_name = getField(trainingjob.training_config, "feature_group_name")
-        arguments = getField(trainingjob.training_config, "arguments")
+        argument_dict = getField(trainingjob.training_config, "arguments")
 
-        argument_dict = ast.literal_eval(arguments)
+        # argument_dict = ast.literal_eval(arguments)
         argument_dict["trainingjob_id"] = trainingjob_id
         argument_dict["featuregroup_name"] = featuregroup_name
 
         argument_dict["modelName"] = trainingjob.modelId.modelname
         argument_dict["modelVersion"] = trainingjob.modelId.modelversion
-        argument_dict["artifactVersion"] = trainingjob.modelId.artifactversion
+        argument_dict["modellocation"] = trainingjob.model_location
 
         # Arguments values must be of type string
         for key, val in argument_dict.items():
@@ -185,8 +186,9 @@ def data_extraction_notification():
 
         # Experiment name is harded to be Default
         training_details = {
             "pipeline_name": getField(trainingjob.training_config, "pipeline_name"), "experiment_name": 'Default',
-            "arguments": argument_dict, "pipeline_version": getField(trainingjob.training_config, "pipeline_name")
+            "arguments": argument_dict, "pipeline_version": getField(trainingjob.training_config, "pipeline_version")
         }
+        LOGGER.debug("training detail for kf adapter is: "+ str(training_details))
         response = training_start(TRAININGMGR_CONFIG_OBJ, training_details, trainingjob_id)
         if ( response.headers['content-type'] != MIMETYPE_JSON or response.status_code != status.HTTP_200_OK ):
@@ -207,11 +209,11 @@ def data_extraction_notification():
 
             change_status_tj(trainingjob.id,
                              Steps.DATA_EXTRACTION_AND_TRAINING.name,
                              States.FINISHED.name)
-            LOGGER.debug("DATA_EXTRACTION_AND_TRAINING step set to FINISHED for training job " + trainingjob.id)
+            LOGGER.debug("DATA_EXTRACTION_AND_TRAINING step set to FINISHED for training job " + str(trainingjob.id))
 
             change_status_tj(trainingjob.id, Steps.TRAINING.name, States.IN_PROGRESS.name)
-            LOGGER.debug("TRAINING step set to IN_PROGRESS for training job " + trainingjob.id)
+            LOGGER.debug("TRAINING step set to IN_PROGRESS for training job " + str(trainingjob.id))
 
             change_update_field_value(trainingjob.id, "run_id", json_data["run_id"])
@@ -229,13 +231,14 @@ def data_extraction_notification():
             pass
 
     except Exception as err:
+        LOGGER.error("error is : "+ str(err))
+        return jsonify({"failed":"error"}), 500
         # LOGGER.error("Failed to handle dataExtractionNotification. " + str(err))
         # if not change_in_progress_to_failed_by_latest_version(trainingjob_name) :
         #     LOGGER.error(ERROR_TYPE_DB_STATUS)
         # return response_for_training(err_response_code,
         #                              str(err) + "(trainingjob name is " + trainingjob_name + ")",
         #                              LOGGER, False, trainingjob_name, MM_SDK)
-        pass
 
     return APP.response_class(response=json.dumps({"result": "pipeline is scheduled"}),
                               status=status.HTTP_200_OK,
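
data_extraction_notification assembles the payload sent to the KF adapter: pipeline name and version from the training config, the per-job arguments (including model name, version and location) with every value stringified, and the hardcoded experiment name. A small illustrative sketch of that payload; build_training_details is a made-up helper and the sample values are not taken from any deployment:

    # Sketch of the KF-adapter payload assembled in the handler above.
    import json

    def build_training_details(training_config: str, arguments: dict) -> dict:
        config = json.loads(training_config)
        pipeline = config["trainingPipeline"]
        return {
            "pipeline_name": pipeline["pipeline_name"],
            "experiment_name": "Default",          # hardcoded, as in the handler
            "arguments": {k: str(v) for k, v in arguments.items()},
            "pipeline_version": pipeline["pipeline_version"],
        }

    if __name__ == "__main__":
        cfg = json.dumps({"trainingPipeline": {"pipeline_name": "qoe_retraining_pipeline",
                                               "pipeline_version": "qoe_retraining_pipeline"}})
        args = {"trainingjob_id": 12, "featuregroup_name": "qoe_fg",
                "modelName": "qoe_model", "modelVersion": "1",
                "modellocation": "s3://models/qoe_model/1"}
        print(json.dumps(build_training_details(cfg, args), indent=2))
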
@@ -301,13 +304,13 @@ def pipeline_notification():
             # notification_rapp(trainingjob_info, TRAININGMGR_CONFIG_OBJ)
             model_name= trainingjob.modelId.modelname
             model_version= trainingjob.modelId.modelversion
-            artifact_version= trainingjob.modelId.artifactversion
-            artifact_version= update_artifact_version(trainingjob_id , artifact_version, "major")
-            if MM_SDK.check_object(model_name, model_version, artifact_version, "Model.zip"):
+            modelinfo = get_modelinfo_by_modelId_service(model_name, model_version)[0]
+            artifactversion = modelinfo["modelId"]["artifactVersion"]
+            if MM_SDK.check_object(model_name, model_version, artifactversion, "Model.zip"):
                 model_url = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + ":" + \
                             str(TRAININGMGR_CONFIG_OBJ.my_port) + "/model/" + \
-                            model_name + "/" + str(model_version) + "/" + str(artifact_version) + "/Model.zip"
+                            model_name + "/" + str(model_version) + "/" + str(artifactversion) + "/Model.zip"
 
                 change_update_field_value(trainingjob_id, "model_url" , model_url)
 
@@ -317,12 +320,12 @@ def pipeline_notification():
                 # notification_rapp(trainingjob_info, TRAININGMGR_CONFIG_OBJ)
             else:
                 errMsg = "Trained model is not available "
-                LOGGER.error(errMsg + trainingjob_id)
-                raise TMException(errMsg + trainingjob_id)
+                LOGGER.error(errMsg + str(trainingjob_id))
+                raise TMException(errMsg + str(trainingjob_id))
         else:
-            LOGGER.error("Pipeline notification -Training failed " + trainingjob_id)
+            LOGGER.error("Pipeline notification -Training failed " + str(trainingjob_id))
             raise TMException("Pipeline not successful for " + \
-                              trainingjob_id + \
+                              str(trainingjob_id) + \
                               ",request json from kf adapter is: " +
                               json.dumps(request.json))
     except Exception as err:
         #Training failure response
@@ -342,7 +345,6 @@ def pipeline_notification():
 
 
 
-
 # Moved to pipelineMgr (to be deleted in future)
 @APP.route('/experiments', methods=['GET'])
 def get_all_experiment_names():
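
pipeline_notification now asks MME for the current artifactVersion instead of bumping a locally stored one, and then publishes the download URL for the trained artifact. A sketch of that URL construction; the host, port and version values below are placeholders, not deployment values:

    # Sketch of the model URL built in pipeline_notification() once the
    # trained artifact is confirmed in the model store.
    def build_model_url(host: str, port: int, model_name: str,
                        model_version: str, artifact_version: str) -> str:
        return (f"http://{host}:{port}/model/"
                f"{model_name}/{model_version}/{artifact_version}/Model.zip")

    if __name__ == "__main__":
        print(build_model_url("trainingmgr", 32002, "qoe_model", "1", "1.0.0"))
        # -> http://trainingmgr:32002/model/qoe_model/1/1.0.0/Model.zip
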