From: rajdeep11 Date: Tue, 29 Oct 2024 05:56:48 +0000 (+0530) Subject: adding the trainingjob db layer X-Git-Tag: 3.0.0~56 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=0a681ec0af091a8d78abc0a4c21db91a3cdf87de;p=aiml-fw%2Fawmf%2Ftm.git adding the trainingjob db layer Change-Id: Ica0d81792848cfd3299509536dd507e2c35749d3 Signed-off-by: rajdeep11 --- diff --git a/trainingmgr/db/trainingjob_db.py b/trainingmgr/db/trainingjob_db.py new file mode 100644 index 0000000..3e525d1 --- /dev/null +++ b/trainingmgr/db/trainingjob_db.py @@ -0,0 +1,214 @@ +# ================================================================================== +# +# Copyright (c) 2024 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== + +import datetime +import re +import json +from trainingmgr.common.exceptions_utls import DBException +from trainingmgr.models import db, TrainingJob, FeatureGroup +from trainingmgr.constants.steps import Steps +from trainingmgr.constants.states import States +from sqlalchemy.sql import func + + + +DB_QUERY_EXEC_ERROR = "Failed to execute query in " +PATTERN = re.compile(r"\w+") + + +def get_all_versions_info_by_name(trainingjob_name): + """ + This function returns information of given trainingjob_name for all version. + """ + return TrainingJob.query.filter_by(trainingjob_name=trainingjob_name).all() + +def add_update_trainingjob(trainingjob, adding): + """ + This function add the new row or update existing row with given information + """ + + try: + # arguments_string = json.dumps({"arguments": trainingjob.arguments}) + datalake_source_dic = {} + datalake_source_dic[trainingjob.datalake_source] = {} + trainingjob.datalake_source = json.dumps({"datalake_source": datalake_source_dic}) + trainingjob.creation_time = datetime.datetime.utcnow() + trainingjob.updation_time = trainingjob.creation_time + run_id = "No data available" + steps_state = { + Steps.DATA_EXTRACTION.name: States.NOT_STARTED.name, + Steps.DATA_EXTRACTION_AND_TRAINING.name: States.NOT_STARTED.name, + Steps.TRAINING.name: States.NOT_STARTED.name, + Steps.TRAINING_AND_TRAINED_MODEL.name: States.NOT_STARTED.name, + Steps.TRAINED_MODEL.name: States.NOT_STARTED.name + } + trainingjob.steps_state=json.dumps(steps_state) + trainingjob.model_url = "No data available." + trainingjob.deletion_in_progress = False + trainingjob.version = 1 + if not adding: + + trainingjob_max_version = db.session.query(TrainingJob).filter(TrainingJob.trainingjob_name == trainingjob.trainingjob_name).order_by(TrainingJob.version.desc()).first() + + if trainingjob_max_version.enable_versioning: + trainingjob.version = trainingjob_max_version.version + 1 + db.session.add(trainingjob) + else: + + for key, value in trainingjob.items(): + if(key == 'id'): + continue + setattr(trainingjob_max_version, key, value) + + else: + db.session.add(trainingjob) + db.session.commit() + + except Exception as err: + raise DBException(DB_QUERY_EXEC_ERROR + \ + "add_update_trainingjob" + str(err)) + +def get_trainingjob_info_by_name(trainingjob_name): + """ + This function returns information of training job by name and + by default latest version + """ + + try: + trainingjob_max_version = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).order_by(TrainingJob.version.desc()).first() + except Exception as err: + raise DBException(DB_QUERY_EXEC_ERROR + \ + "get_trainingjob_info_by_name" + str(err)) + return trainingjob_max_version + +def get_info_by_version(trainingjob_name, version): + """ + This function returns information for given trainingjob. + """ + + try: + trainingjob = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).filter(TrainingJob.version == version).first() + except Exception as err: + raise DBException(DB_QUERY_EXEC_ERROR + \ + "get_info_by_version" + str(err)) + return trainingjob + +def get_steps_state_db(trainingjob_name, version): + """ + This function returns steps_state value of trainingjob as tuple of list. + """ + + try: + steps_state = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).filter(TrainingJob.version == version).first().steps_state + except Exception as err: + raise DBException("Failed to execute query in get_field_of_given_version" + str(err)) + + return steps_state + +def get_info_of_latest_version(trainingjob_name): + """ + This function returns information of + usecase. + """ + + try: + trainingjob_max_version = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).order_by(TrainingJob.version.desc()).first() + except Exception as err: + raise DBException("Failed to execute query in get_info_of_latest_version " + str(err)) + + return trainingjob_max_version + +def get_all_jobs_latest_status_version(): + """ + This function returns all trainingjobs latest version. + """ + + try: + subquery = ( + db.session.query( + TrainingJob.trainingjob_name, + func.max(TrainingJob.version).label('max_version') + ).group_by(TrainingJob.trainingjob_name) + .subquery() + ) + results = ( + db.session.query(TrainingJob) + .join(subquery, (TrainingJob.trainingjob_name == subquery.c.trainingjob_name) & + (TrainingJob.version == subquery.c.max_version)) + .all() + ) + + except Exception as err: + + raise DBException(DB_QUERY_EXEC_ERROR + \ + "get_all_jobs_latest_status_version" + str(err)) + + return results + +def change_steps_state_of_latest_version(trainingjob_name, key, value): + """ + This function changes steps_state of trainingjob latest version + """ + try: + trainingjob_max_version = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).order_by(TrainingJob.version.desc()).first() + steps_state = json.loads(trainingjob_max_version.steps_state) + steps_state[key] = value + trainingjob_max_version.steps_state = json.dumps(steps_state) + db.session.commit() + except Exception as err: + raise DBException(DB_QUERY_EXEC_ERROR + \ + "change_steps_state_of_latest_version" + str(err)) + +def change_in_progress_to_failed_by_latest_version(trainingjob_name): + """ + This function changes steps_state's key's value to FAILED which is currently + IN_PROGRESS of trainingjob. + """ + status_changed = False + try: + trainingjob_max_version = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).order_by(TrainingJob.version.desc()).first() + steps_state = json.loads(trainingjob_max_version.steps_state) + for step in steps_state: + if steps_state[step] == States.IN_PROGRESS.name: + steps_state[step] = States.FAILED.name + trainingjob_max_version.steps_state = json.dumps(steps_state) + status_changed = True + db.session.commit() + except Exception as err: + raise DBException(DB_QUERY_EXEC_ERROR + \ + "change_in_progress_to_failed_by_latest_version" + str(err)) + return status_changed + +def get_field_by_latest_version(trainingjob_name, field): + """ + This function returns field's value of + trainingjob as tuple of list. + """ + + try: + trainingjob_max_version = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).order_by(TrainingJob.version.desc()).first() + result = None + if field == "notification_url": + result = trainingjob_max_version.notification_url + elif field == "model_url": + result = trainingjob_max_version.model_url + + except Exception as err: + raise DBException("Failed to execute query in get_field_by_latest_version," + str(err)) + + return result \ No newline at end of file