adding the trainingjob db layer 09/13709/9
author rajdeep11 <rajdeep.sin@samsung.com>
Tue, 29 Oct 2024 05:56:48 +0000 (11:26 +0530)
committer subhash kumar singh <subh.singh@samsung.com>
Wed, 30 Oct 2024 10:02:56 +0000 (10:02 +0000)
Change-Id: Ica0d81792848cfd3299509536dd507e2c35749d3
Signed-off-by: rajdeep11 <rajdeep.sin@samsung.com>
trainingmgr/db/trainingjob_db.py [new file with mode: 0644]

diff --git a/trainingmgr/db/trainingjob_db.py b/trainingmgr/db/trainingjob_db.py
new file mode 100644 (file)
index 0000000..3e525d1
--- /dev/null
@@ -0,0 +1,214 @@
+# ==================================================================================
+#
+#       Copyright (c) 2024 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+import datetime
+import re
+import json
+from trainingmgr.common.exceptions_utls import DBException
+from trainingmgr.models import db, TrainingJob, FeatureGroup
+from trainingmgr.constants.steps import Steps
+from trainingmgr.constants.states import States
+from sqlalchemy.sql import func
+
+
+
+# Common prefix of the DBException messages raised in this module; the name of
+# the failing function is appended to it.
+DB_QUERY_EXEC_ERROR = "Failed to execute query in "
+# Matches a run of one or more word characters. Not referenced by any function
+# visible in this part of the module.
+PATTERN = re.compile(r"\w+")
+
+
+def get_all_versions_info_by_name(trainingjob_name):
+    """
+    Return every stored version of the trainingjob called trainingjob_name.
+
+    The result is what the name-filtered query yields from ``all()``: one
+    TrainingJob row per stored version.
+    """
+    name_matches = TrainingJob.trainingjob_name == trainingjob_name
+    return TrainingJob.query.filter(name_matches).all()
+
+def add_update_trainingjob(trainingjob, adding):
+    """
+    Add a new trainingjob row or update the existing one.
+
+    Before anything is written, the given object is reset to its initial
+    state: datalake_source is wrapped into a JSON document, creation_time and
+    updation_time are set to the current UTC time, every step is marked
+    NOT_STARTED, model_url gets a placeholder, deletion_in_progress is cleared
+    and version is set to 1.
+
+    Args:
+        trainingjob: TrainingJob object carrying the values to store.
+        adding: True to insert trainingjob as a new row. False to update the
+            trainingjob with the same name: when the latest stored version has
+            enable_versioning set, trainingjob is inserted as the next
+            version; otherwise its values are copied onto the latest row.
+
+    Raises:
+        DBException: if the query or the commit fails, or if adding is False
+            and no trainingjob with that name is stored. The session is
+            rolled back before the exception is raised.
+    """
+
+    try:
+        datalake_source_dic = {trainingjob.datalake_source: {}}
+        trainingjob.datalake_source = json.dumps({"datalake_source": datalake_source_dic})
+        trainingjob.creation_time = datetime.datetime.utcnow()
+        trainingjob.updation_time = trainingjob.creation_time
+        steps_state = {
+            Steps.DATA_EXTRACTION.name: States.NOT_STARTED.name,
+            Steps.DATA_EXTRACTION_AND_TRAINING.name: States.NOT_STARTED.name,
+            Steps.TRAINING.name: States.NOT_STARTED.name,
+            Steps.TRAINING_AND_TRAINED_MODEL.name: States.NOT_STARTED.name,
+            Steps.TRAINED_MODEL.name: States.NOT_STARTED.name
+        }
+        trainingjob.steps_state = json.dumps(steps_state)
+        trainingjob.model_url = "No data available."
+        trainingjob.deletion_in_progress = False
+        trainingjob.version = 1
+
+        if adding:
+            db.session.add(trainingjob)
+        else:
+            trainingjob_max_version = (
+                db.session.query(TrainingJob)
+                .filter(TrainingJob.trainingjob_name == trainingjob.trainingjob_name)
+                .order_by(TrainingJob.version.desc())
+                .first()
+            )
+            # An update needs an existing row; fail with a readable reason
+            # instead of an AttributeError on None.
+            if trainingjob_max_version is None:
+                raise ValueError(
+                    "no trainingjob named " + str(trainingjob.trainingjob_name) + " to update")
+
+            if trainingjob_max_version.enable_versioning:
+                trainingjob.version = trainingjob_max_version.version + 1
+                db.session.add(trainingjob)
+            else:
+                # NOTE(review): relies on TrainingJob providing items();
+                # confirm against the model definition.
+                for key, value in trainingjob.items():
+                    # Keep the primary key of the row that is being overwritten.
+                    if key == 'id':
+                        continue
+                    setattr(trainingjob_max_version, key, value)
+
+        db.session.commit()
+
+    except Exception as err:
+        # Leave the session usable for the next caller after a failed write.
+        db.session.rollback()
+        raise DBException(DB_QUERY_EXEC_ERROR + \
+            "add_update_trainingjob"  + str(err)) from err
+
+def get_trainingjob_info_by_name(trainingjob_name):
+    """
+    Return the highest-version row stored for trainingjob_name.
+
+    The result is what ``first()`` yields for the name-filtered query ordered
+    by version, descending. Any exception raised while querying is re-raised
+    as DBException.
+    """
+
+    try:
+        newest_first = (
+            TrainingJob.query
+            .filter(TrainingJob.trainingjob_name == trainingjob_name)
+            .order_by(TrainingJob.version.desc())
+        )
+        return newest_first.first()
+    except Exception as err:
+        raise DBException(DB_QUERY_EXEC_ERROR + \
+            "get_trainingjob_info_by_name"  + str(err))
+
+def get_info_by_version(trainingjob_name, version):
+    """
+    Return the row stored for the <trainingjob_name, version> pair.
+
+    The result is what ``first()`` yields for the query filtered on both
+    values. Any exception raised while querying is re-raised as DBException.
+    """
+
+    try:
+        same_name = TrainingJob.trainingjob_name == trainingjob_name
+        same_version = TrainingJob.version == version
+        return TrainingJob.query.filter(same_name, same_version).first()
+    except Exception as err:
+        raise DBException(DB_QUERY_EXEC_ERROR + \
+            "get_info_by_version"  + str(err))
+
+def get_steps_state_db(trainingjob_name, version):
+    """
+    Return the steps_state value stored for <trainingjob_name, version>.
+
+    The value is returned exactly as stored on the row; other functions in
+    this module write it as a JSON string.
+
+    Raises:
+        DBException: if the query fails or no row exists for the given
+            name and version.
+    """
+
+    try:
+        trainingjob = (
+            TrainingJob.query
+            .filter(TrainingJob.trainingjob_name == trainingjob_name)
+            .filter(TrainingJob.version == version)
+            .first()
+        )
+    except Exception as err:
+        raise DBException(DB_QUERY_EXEC_ERROR + \
+            "get_steps_state_db"  + str(err)) from err
+
+    # Report a missing row explicitly instead of failing on None.steps_state.
+    if trainingjob is None:
+        raise DBException(DB_QUERY_EXEC_ERROR + \
+            "get_steps_state_db: no trainingjob " + str(trainingjob_name) + \
+            " with version " + str(version))
+
+    return trainingjob.steps_state
+
+def get_info_of_latest_version(trainingjob_name):
+    """
+    Return the row with the highest version stored for trainingjob_name.
+
+    Any exception raised while querying is re-raised as DBException.
+    """
+
+    try:
+        by_name = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name)
+        latest = by_name.order_by(TrainingJob.version.desc()).first()
+    except Exception as err:
+        raise DBException("Failed to execute query in get_info_of_latest_version " + str(err))
+    else:
+        return latest
+
+def get_all_jobs_latest_status_version():
+    """
+    Return, for every trainingjob name, the row holding its highest version.
+
+    Any exception raised while querying is re-raised as DBException.
+    """
+
+    try:
+        # Highest version per name; joined back below to select the full rows.
+        latest_per_name = (
+            db.session.query(
+                TrainingJob.trainingjob_name,
+                func.max(TrainingJob.version).label('max_version'),
+            )
+            .group_by(TrainingJob.trainingjob_name)
+            .subquery()
+        )
+        same_name = TrainingJob.trainingjob_name == latest_per_name.c.trainingjob_name
+        same_version = TrainingJob.version == latest_per_name.c.max_version
+        return (
+            db.session.query(TrainingJob)
+            .join(latest_per_name, same_name & same_version)
+            .all()
+        )
+    except Exception as err:
+        raise DBException(DB_QUERY_EXEC_ERROR + \
+            "get_all_jobs_latest_status_version"  + str(err))
+
+def change_steps_state_of_latest_version(trainingjob_name, key, value):
+    """
+    Set one entry of steps_state on the latest version of a trainingjob.
+
+    Args:
+        trainingjob_name: name of the trainingjob to change.
+        key: steps_state entry to set.
+        value: new value for that entry.
+
+    Raises:
+        DBException: if no trainingjob with that name is stored, or if the
+            query, the JSON handling or the commit fails. The session is
+            rolled back before the exception is raised.
+    """
+    try:
+        trainingjob_max_version = (
+            TrainingJob.query
+            .filter(TrainingJob.trainingjob_name == trainingjob_name)
+            .order_by(TrainingJob.version.desc())
+            .first()
+        )
+        # Fail with a readable reason instead of an AttributeError on None.
+        if trainingjob_max_version is None:
+            raise ValueError("no trainingjob named " + str(trainingjob_name))
+
+        # steps_state is stored as a JSON string: decode, change, encode.
+        steps_state = json.loads(trainingjob_max_version.steps_state)
+        steps_state[key] = value
+        trainingjob_max_version.steps_state = json.dumps(steps_state)
+        db.session.commit()
+    except Exception as err:
+        # Leave the session usable for the next caller after a failed write.
+        db.session.rollback()
+        raise DBException(DB_QUERY_EXEC_ERROR + \
+            "change_steps_state_of_latest_version"  + str(err)) from err
+
+def change_in_progress_to_failed_by_latest_version(trainingjob_name):
+    """
+    Mark every IN_PROGRESS step of a trainingjob's latest version as FAILED.
+
+    Args:
+        trainingjob_name: name of the trainingjob to change.
+
+    Returns:
+        True once the updated steps_state has been committed, whether or not
+        any step was actually IN_PROGRESS.
+
+    Raises:
+        DBException: if no trainingjob with that name is stored, or if the
+            query, the JSON handling or the commit fails. The session is
+            rolled back before the exception is raised.
+    """
+    try:
+        trainingjob_max_version = (
+            TrainingJob.query
+            .filter(TrainingJob.trainingjob_name == trainingjob_name)
+            .order_by(TrainingJob.version.desc())
+            .first()
+        )
+        # Fail with a readable reason instead of an AttributeError on None.
+        if trainingjob_max_version is None:
+            raise ValueError("no trainingjob named " + str(trainingjob_name))
+
+        # steps_state is stored as a JSON string: decode, change, encode.
+        steps_state = json.loads(trainingjob_max_version.steps_state)
+        for step in steps_state:
+            if steps_state[step] == States.IN_PROGRESS.name:
+                steps_state[step] = States.FAILED.name
+        trainingjob_max_version.steps_state = json.dumps(steps_state)
+        db.session.commit()
+    except Exception as err:
+        # Leave the session usable for the next caller after a failed write.
+        db.session.rollback()
+        raise DBException(DB_QUERY_EXEC_ERROR + \
+             "change_in_progress_to_failed_by_latest_version" + str(err)) from err
+    return True
+
+def get_field_by_latest_version(trainingjob_name, field):
+    """
+    Return one field of the latest version of trainingjob_name.
+
+    Only "notification_url" and "model_url" are looked up; any other field
+    name yields None. Any exception raised on the way is re-raised as
+    DBException.
+    """
+
+    try:
+        latest = (
+            TrainingJob.query
+            .filter(TrainingJob.trainingjob_name == trainingjob_name)
+            .order_by(TrainingJob.version.desc())
+            .first()
+        )
+        if field in ("notification_url", "model_url"):
+            return getattr(latest, field)
+        return None
+    except Exception as err:
+        raise DBException("Failed to execute query in get_field_by_latest_version,"  + str(err))
\ No newline at end of file