From: rajdeep11 Date: Wed, 30 Oct 2024 07:53:48 +0000 (+0530) Subject: changes for the retraining and delete trainingjob X-Git-Tag: 3.0.0~50 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=d50b2e7e3284ed3cba26d3039c74c5e25660d8c8;p=aiml-fw%2Fawmf%2Ftm.git changes for the retraining and delete trainingjob Change-Id: I0862a78fbf817b1d70c90e4f0987bcc3e4a9f3cd Signed-off-by: rajdeep11 --- diff --git a/trainingmgr/db/trainingjob_db.py b/trainingmgr/db/trainingjob_db.py index 9e13186..34b7915 100644 --- a/trainingmgr/db/trainingjob_db.py +++ b/trainingmgr/db/trainingjob_db.py @@ -252,7 +252,36 @@ def update_model_download_url(trainingjob_name, version, url): trainingjob_max_version = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).filter(TrainingJob.version == version).first() trainingjob_max_version.model_url = url + db.session.commit() except Exception as err: raise DBException(DB_QUERY_EXEC_ERROR + \ "update_model_download_url" + str(err)) - db.session.commit() + +def change_field_value_by_version(trainingjob_name, version, field, field_value): + """ + This function updates field's value to field_value of trainingjob. + """ + conn = None + try: + if field == "deletion_in_progress": + trainingjob = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).filter(TrainingJob.version == version).first() + trainingjob.deletion_in_progress = field_value + trainingjob.updation_time = datetime.datetime.utcnow() + db.session.commit() + except Exception as err: + raise DBException("Failed to execute query in change_field_value_by_version," + str(err)) + +def delete_trainingjob_version(trainingjob_name, version): + """ + This function deletes the trainingjob entry by trainingjob_name and version. 
+ """ + + try: + trainingjob = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).filter(TrainingJob.version == version).first() + if trainingjob: + db.session.delete(trainingjob) + db.session.commit() + + except Exception as err: + raise DBException(DB_QUERY_EXEC_ERROR + \ + "delete_trainingjob_version" + str(err)) diff --git a/trainingmgr/trainingmgr_main.py b/trainingmgr/trainingmgr_main.py index 54cf6d1..8c5a067 100644 --- a/trainingmgr/trainingmgr_main.py +++ b/trainingmgr/trainingmgr_main.py @@ -51,17 +51,17 @@ from trainingmgr.db.trainingmgr_ps_db import PSDB from trainingmgr.common.exceptions_utls import DBException from trainingmgr.db.common_db_fun import get_data_extraction_in_progress_trainingjobs, \ change_in_progress_to_failed_by_latest_version, \ - get_info_by_version, get_all_versions_info_by_name, \ + get_all_versions_info_by_name, \ update_model_download_url, \ - get_field_of_given_version, get_info_of_latest_version, \ - delete_trainingjob_version, change_field_value_by_version + get_field_of_given_version from trainingmgr.models import db, TrainingJob, FeatureGroup from trainingmgr.schemas import ma, TrainingJobSchema , FeatureGroupSchema from trainingmgr.db.featuregroup_db import add_featuregroup, edit_featuregroup, get_feature_groups_db, \ get_feature_group_by_name_db, delete_feature_group_by_name from trainingmgr.db.trainingjob_db import add_update_trainingjob, get_trainingjob_info_by_name, \ get_all_jobs_latest_status_version, change_steps_state_of_latest_version, get_info_by_version, \ - get_steps_state_db, change_field_of_latest_version, get_latest_version_trainingjob_name + get_steps_state_db, change_field_of_latest_version, get_latest_version_trainingjob_name, get_info_of_latest_version, \ + change_field_value_by_version, delete_trainingjob_version APP = Flask(__name__) @@ -1084,51 +1084,28 @@ def retraining(): trainingjob_name = obj['trainingjob_name'] results = None try: - results = 
get_info_of_latest_version(trainingjob_name, PS_DB_OBJ) + trainingjob = get_info_of_latest_version(trainingjob_name) except Exception as err: not_possible_to_retrain.append(trainingjob_name) LOGGER.debug(str(err) + "(trainingjob_name is " + trainingjob_name + ")") continue - if results: - if results[0][17]: + if trainingjob: + if trainingjob.deletion_in_progress: not_possible_to_retrain.append(trainingjob_name) LOGGER.debug("Failed to retrain because deletion in progress" + \ "(trainingjob_name is " + trainingjob_name + ")") continue - if (get_one_word_status(json.loads(results[0][9])) + if (get_one_word_status(json.loads(trainingjob.steps_state)) not in [States.FINISHED.name, States.FAILED.name]): not_possible_to_retrain.append(trainingjob_name) LOGGER.debug("Not finished or not failed status" + \ "(trainingjob_name is " + trainingjob_name + ")") continue - enable_versioning = results[0][12] - pipeline_version = results[0][13] - description = results[0][1] - pipeline_name = results[0][3] - experiment_name = results[0][4] - featuregroup_name = results[0][2] - arguments = json.loads(results[0][5])['arguments'] - query_filter = results[0][6] - datalake_source = get_one_key(json.loads(results[0][14])["datalake_source"]) - is_mme=results[0][18] - model_name=results[0][19] - model_info=results[0][20] - - notification_url = "" - if "notification_url" in obj: - notification_url = obj['notification_url'] - - if "feature_filter" in obj: - query_filter = obj['feature_filter'] - try: - add_update_trainingjob(description, pipeline_name, experiment_name, featuregroup_name, - arguments, query_filter, False, enable_versioning, - pipeline_version, datalake_source, trainingjob_name, - PS_DB_OBJ,is_mme=is_mme, model_name=model_name, model_info=model_info) + add_update_trainingjob(trainingjob, False) except Exception as err: not_possible_to_retrain.append(trainingjob_name) LOGGER.debug(str(err) + "(training job is " + trainingjob_name + ")") @@ -1212,18 +1189,17 @@ def 
delete_list_of_trainingjob_version(): trainingjob_name = my_dict['trainingjob_name'] version = my_dict['version'] - results = None try: - results = get_info_by_version(trainingjob_name, version, PS_DB_OBJ) + trainingjob = get_info_by_version(trainingjob_name, version) except Exception as err: not_possible_to_delete.append(my_dict) LOGGER.debug(str(err) + "(trainingjob_name is " + trainingjob_name + ", version is " + str( version) + ")") continue - if results: + if trainingjob: - if results[0][17]: + if trainingjob.deletion_in_progress: not_possible_to_delete.append(my_dict) LOGGER.debug("Failed to process deletion request because deletion is " + \ "already in progress" + \ @@ -1231,7 +1207,7 @@ def delete_list_of_trainingjob_version(): version) + ")") continue - if (get_one_word_status(json.loads(results[0][9])) + if (get_one_word_status(json.loads(trainingjob.steps_state)) not in [States.FINISHED.name, States.FAILED.name]): not_possible_to_delete.append(my_dict) LOGGER.debug("Not finished or not failed status" + \ @@ -1240,7 +1216,7 @@ def delete_list_of_trainingjob_version(): continue try: - change_field_value_by_version(trainingjob_name, version, PS_DB_OBJ, + change_field_value_by_version(trainingjob_name, version, "deletion_in_progress", True) except Exception as err: not_possible_to_delete.append(my_dict) @@ -1263,7 +1239,7 @@ def delete_list_of_trainingjob_version(): continue try: - delete_trainingjob_version(trainingjob_name, version, PS_DB_OBJ) + delete_trainingjob_version(trainingjob_name, version) except Exception as err: not_possible_to_delete.append(my_dict) LOGGER.debug(str(err) + "(trainingjob_name is " + \