changes for the retraining and delete trainingjob 18/13718/4
authorrajdeep11 <rajdeep.sin@samsung.com>
Wed, 30 Oct 2024 07:53:48 +0000 (13:23 +0530)
committersubhash kumar singh <subh.singh@samsung.com>
Wed, 30 Oct 2024 10:18:18 +0000 (10:18 +0000)
Change-Id: I0862a78fbf817b1d70c90e4f0987bcc3e4a9f3cd
Signed-off-by: rajdeep11 <rajdeep.sin@samsung.com>
trainingmgr/db/trainingjob_db.py
trainingmgr/trainingmgr_main.py

index 9e13186..34b7915 100644 (file)
@@ -252,7 +252,36 @@ def update_model_download_url(trainingjob_name, version, url):
 
         trainingjob_max_version = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).filter(TrainingJob.version == version).first()
         trainingjob_max_version.model_url = url
+        db.session.commit()
     except Exception as err:
         raise DBException(DB_QUERY_EXEC_ERROR + \
             "update_model_download_url"  + str(err))
-    db.session.commit()
+
+def change_field_value_by_version(trainingjob_name, version, field, field_value):
+    """
+    This function updates field's value to field_value of <trainingjob_name, version> trainingjob.
+    """
+    conn = None
+    try:
+        if field == "deletion_in_progress":
+            trainingjob = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).filter(TrainingJob.version == version).first()
+            trainingjob.deletion_in_progress = field_value
+            trainingjob.updation_time = datetime.datetime.utcnow()
+            db.session.commit()
+    except Exception as err:
+        raise DBException("Failed to execute query in change_field_value_by_version," + str(err))
+
+def delete_trainingjob_version(trainingjob_name, version):
+    """
+    This function deletes the trainingjob entry by <trainingjob_name, version> .
+    """
+
+    try:
+        trainingjob = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).filter(TrainingJob.version == version).first()
+        if trainingjob:
+            db.session.delete(trainingjob)
+        db.session.commit()
+    except Exception as err:
+        raise DBException(DB_QUERY_EXEC_ERROR + \
+            "delete_trainingjob_version" + str(err))
index 54cf6d1..8c5a067 100644 (file)
@@ -51,17 +51,17 @@ from trainingmgr.db.trainingmgr_ps_db import PSDB
 from trainingmgr.common.exceptions_utls import DBException
 from trainingmgr.db.common_db_fun import get_data_extraction_in_progress_trainingjobs, \
     change_in_progress_to_failed_by_latest_version, \
-    get_info_by_version, get_all_versions_info_by_name, \
+    get_all_versions_info_by_name, \
     update_model_download_url, \
-    get_field_of_given_version, get_info_of_latest_version, \
-    delete_trainingjob_version, change_field_value_by_version
+    get_field_of_given_version
 from trainingmgr.models import db, TrainingJob, FeatureGroup
 from trainingmgr.schemas import ma, TrainingJobSchema , FeatureGroupSchema
 from trainingmgr.db.featuregroup_db import add_featuregroup, edit_featuregroup, get_feature_groups_db, \
     get_feature_group_by_name_db, delete_feature_group_by_name
 from trainingmgr.db.trainingjob_db import add_update_trainingjob, get_trainingjob_info_by_name, \
     get_all_jobs_latest_status_version, change_steps_state_of_latest_version, get_info_by_version, \
-    get_steps_state_db, change_field_of_latest_version, get_latest_version_trainingjob_name
+    get_steps_state_db, change_field_of_latest_version, get_latest_version_trainingjob_name, get_info_of_latest_version, \
+    change_field_value_by_version, delete_trainingjob_version
 
 APP = Flask(__name__)
 
@@ -1084,51 +1084,28 @@ def retraining():
         trainingjob_name = obj['trainingjob_name']
         results = None
         try:
-            results = get_info_of_latest_version(trainingjob_name, PS_DB_OBJ)
+            trainingjob = get_info_of_latest_version(trainingjob_name)
         except Exception as err:
             not_possible_to_retrain.append(trainingjob_name)
             LOGGER.debug(str(err) + "(trainingjob_name is " + trainingjob_name + ")")
             continue
         
-        if results:
-            if results[0][17]:
+        if trainingjob:
+            if trainingjob.deletion_in_progress:
                 not_possible_to_retrain.append(trainingjob_name)
                 LOGGER.debug("Failed to retrain because deletion in progress" + \
                              "(trainingjob_name is " + trainingjob_name + ")")
                 continue
 
-            if (get_one_word_status(json.loads(results[0][9]))
+            if (get_one_word_status(json.loads(trainingjob.steps_state))
                     not in [States.FINISHED.name, States.FAILED.name]):
                 not_possible_to_retrain.append(trainingjob_name)
                 LOGGER.debug("Not finished or not failed status" + \
                              "(trainingjob_name is " + trainingjob_name + ")")
                 continue
 
-            enable_versioning = results[0][12]
-            pipeline_version = results[0][13]
-            description = results[0][1]
-            pipeline_name = results[0][3]
-            experiment_name = results[0][4]
-            featuregroup_name = results[0][2]
-            arguments = json.loads(results[0][5])['arguments']
-            query_filter = results[0][6]
-            datalake_source = get_one_key(json.loads(results[0][14])["datalake_source"])
-            is_mme=results[0][18]
-            model_name=results[0][19]
-            model_info=results[0][20]
-
-            notification_url = ""
-            if "notification_url" in obj:
-                notification_url = obj['notification_url']
-
-            if "feature_filter" in obj:
-                query_filter = obj['feature_filter']
-
             try:
-                add_update_trainingjob(description, pipeline_name, experiment_name, featuregroup_name,
-                                    arguments, query_filter, False, enable_versioning,
-                                    pipeline_version, datalake_source, trainingjob_name, 
-                                    PS_DB_OBJ,is_mme=is_mme, model_name=model_name, model_info=model_info)
+                add_update_trainingjob(trainingjob, False)
             except Exception as err:
                 not_possible_to_retrain.append(trainingjob_name)
                 LOGGER.debug(str(err) + "(training job is " + trainingjob_name + ")")
@@ -1212,18 +1189,17 @@ def delete_list_of_trainingjob_version():
         trainingjob_name = my_dict['trainingjob_name']
         version = my_dict['version']
 
-        results = None
         try:
-            results = get_info_by_version(trainingjob_name, version, PS_DB_OBJ)
+            trainingjob = get_info_by_version(trainingjob_name, version)
         except Exception as err:
             not_possible_to_delete.append(my_dict)
             LOGGER.debug(str(err) + "(trainingjob_name is " + trainingjob_name + ", version is " + str(
                 version) + ")")
             continue
 
-        if results:
+        if trainingjob:
 
-            if results[0][17]:
+            if trainingjob.deletion_in_progress:
                 not_possible_to_delete.append(my_dict)
                 LOGGER.debug("Failed to process deletion request because deletion is " + \
                              "already in progress" + \
@@ -1231,7 +1207,7 @@ def delete_list_of_trainingjob_version():
                     version) + ")")
                 continue
 
-            if (get_one_word_status(json.loads(results[0][9]))
+            if (get_one_word_status(json.loads(trainingjob.steps_state))
                     not in [States.FINISHED.name, States.FAILED.name]):
                 not_possible_to_delete.append(my_dict)
                 LOGGER.debug("Not finished or not failed status" + \
@@ -1240,7 +1216,7 @@ def delete_list_of_trainingjob_version():
                 continue
 
             try:
-                change_field_value_by_version(trainingjob_name, version, PS_DB_OBJ,
+                change_field_value_by_version(trainingjob_name, version,
                                               "deletion_in_progress", True)
             except Exception as err:
                 not_possible_to_delete.append(my_dict)
@@ -1263,7 +1239,7 @@ def delete_list_of_trainingjob_version():
                 continue
 
             try:
-                delete_trainingjob_version(trainingjob_name, version, PS_DB_OBJ)
+                delete_trainingjob_version(trainingjob_name, version)
             except Exception as err:
                 not_possible_to_delete.append(my_dict)
                 LOGGER.debug(str(err) + "(trainingjob_name is " + \