Resolving Linter Issues including: 82/13982/2
authorashishj1729 <jain.ashish@samsung.com>
Thu, 2 Jan 2025 12:40:08 +0000 (18:10 +0530)
committerashishj1729 <jain.ashish@samsung.com>
Fri, 3 Jan 2025 06:08:22 +0000 (11:38 +0530)
1. Removing Unused Imports
2. Removing Unused Variables
3. Minor Bugfix
4. Handling deletion cases during data extraction in the async thread

Change-Id: Iccf5f5819b296497f6e01c943526b71a1242fb5e
Signed-off-by: ashishj1729 <jain.ashish@samsung.com>
14 files changed:
trainingmgr/common/trainingmgr_config.py
trainingmgr/common/trainingmgr_operations.py
trainingmgr/common/trainingmgr_util.py
trainingmgr/controller/featuregroup_controller.py
trainingmgr/controller/trainingjob_controller.py
trainingmgr/db/featuregroup_db.py
trainingmgr/db/model_db.py
trainingmgr/db/trainingjob_db.py
trainingmgr/handler/async_handler.py
trainingmgr/models/trainingjob.py
trainingmgr/pipeline/mme_mgr.py
trainingmgr/schemas/trainingjob_schema.py
trainingmgr/service/training_job_service.py
trainingmgr/trainingmgr_main.py

index 36304d6..6815b3e 100644 (file)
@@ -19,7 +19,6 @@
 """
 This module is for loading training manager configuration.
 """
-import os
 from os import getenv
 from trainingmgr.common.tmgr_logger import TMLogger
 
index 6aedbfe..4ab1123 100644 (file)
@@ -165,7 +165,6 @@ def create_dme_filtered_data_job(training_config_obj, source_name, features, fea
 
 
     headers = {'Content-type': MIMETYPE_JSON}
-
     url = create_url_host_port('http', host, port, 'data-consumer/v1/info-jobs/{}'.format(feature_group_name))
     logger.debug(url)
     logger.debug(json.dumps(job_json))
@@ -201,7 +200,7 @@ def notification_rapp(trainingjob_id):
     try:
         trainingjob = get_trainingjob(trainingjob_id)
         steps_state = trainingjob.steps_state.states
-        if trainingjob.notification_url != "":
+        if trainingjob.notification_url != "" and trainingjob.notification_url is not None:
             response = requests.post(trainingjob.notification_url,
                                     data=json.dumps(steps_state),
                                     headers={
index d7a8c43..d67b3b2 100644 (file)
@@ -23,17 +23,14 @@ from flask import jsonify
 import json
 import re
 from flask_api import status
-import requests
 from marshmallow import ValidationError
-from trainingmgr.db.common_db_fun import change_in_progress_to_failed_by_latest_version, \
-    get_field_by_latest_version, change_field_of_latest_version, \
-    get_latest_version_trainingjob_name
-from trainingmgr.db.featuregroup_db import add_featuregroup, edit_featuregroup, get_feature_groups_db, \
+from trainingmgr.db.common_db_fun import change_in_progress_to_failed_by_latest_version
+from trainingmgr.db.featuregroup_db import edit_featuregroup, \
 get_feature_group_by_name_db, delete_feature_group_by_name
 from trainingmgr.constants.states import States
 from trainingmgr.common.exceptions_utls import APIException,TMException,DBException
 from trainingmgr.common.trainingmgr_operations import create_dme_filtered_data_job
-from trainingmgr.schemas import ma, TrainingJobSchema , FeatureGroupSchema
+from trainingmgr.schemas import FeatureGroupSchema
 from trainingmgr.constants.steps import Steps
 
 ERROR_TYPE_KF_ADAPTER_JSON = "Kf adapter doesn't sends json type response"
@@ -224,7 +221,7 @@ def edit_feature_group_by_name(featuregroup_name: str,
         response_code =status.HTTP_200_OK
         # TODO: Implement the process where DME edits from the dashboard are applied to the endpoint
         if featuregroup.enable_dme == True :
-            response= create_dme_filtered_data_job(tm_conf_obj, featuregroup.source_name, featuregroup.feature_list, 
+            response= create_dme_filtered_data_job(tm_conf_obj, featuregroup.source_name, featuregroup.feature_list, featuregroup.featuregroup_name,
                                                    featuregroup.host, featuregroup.port, 
                                                    featuregroup.measured_obj_class)
             if response.status_code != 201:
@@ -236,7 +233,6 @@ def edit_feature_group_by_name(featuregroup_name: str,
     except DBException as err:
         return {"Exception": str(err)}, 400
     except Exception as e:
-        err_msg = "Failed to create the feature Group "
         api_response = {"Exception":str(e)}
         logger.error(str(e))
     
@@ -272,6 +268,7 @@ def get_metrics(trainingjob_name, version, mm_sdk):
     return data
 
 
+# TODO: Remove the fxn since it is not called anywhere
 def handle_async_feature_engineering_status_exception_case(lock, dataextraction_job_cache, code,
                                                            message, logger, is_success,
                                                            trainingjob_name, mm_sdk):
index 419b91b..42dfa1d 100644 (file)
@@ -111,8 +111,8 @@ def create_feature_group():
             if featuregroup.enable_dme == True :
                 response= create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, featuregroup.source_name, featuregroup.feature_list, featuregroup.featuregroup_name, featuregroup.host, featuregroup.dme_port, featuregroup.measured_obj_class)
                 if response.status_code != 201:
-                    delete_feature_group_by_name(featuregroup)
-                    return jsonify({"Exception": "Cannot create dme job"}), status.HTTP_400_BAD_REQUEST
+                    delete_feature_group_by_name(featuregroup.featuregroup_name)
+                    return jsonify({"Exception": "Cannot create dme job | DME Error : " + str(response.json()["detail"])}), status.HTTP_400_BAD_REQUEST
     except ValidationError as err:
         LOGGER.error(f"Failed to create the feature Group {str(err)}")
         return {"Exception": str(err)}, 400
index 40e0436..5c9dfd7 100644 (file)
@@ -24,7 +24,7 @@ from marshmallow import ValidationError
 from trainingmgr.common.exceptions_utls import TMException
 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
 from trainingmgr.schemas.trainingjob_schema import TrainingJobSchema
-from trainingmgr.service.training_job_service import delete_training_job, create_training_job, get_training_job, get_trainingjob_by_modelId, get_trainining_jobs, \
+from trainingmgr.service.training_job_service import delete_training_job, create_training_job, get_training_job, get_trainining_jobs, \
 get_steps_state
 from trainingmgr.common.trainingmgr_util import check_key_in_dictionary
 from trainingmgr.common.trainingConfig_parser import validateTrainingConfig
@@ -42,7 +42,7 @@ MIMETYPE_JSON = "application/json"
 def delete_trainingjob(training_job_id):
     LOGGER.debug(f'delete training job : {training_job_id}')
     try:
-        if delete_training_job(str(training_job_id)):
+        if delete_training_job(int(training_job_id)):
             LOGGER.debug(f'training job with {training_job_id} is deleted successfully.')
             return '', 204
         else:
index 6db5398..b1688c9 100644 (file)
@@ -19,7 +19,6 @@
 from trainingmgr.common.exceptions_utls import DBException
 from sqlalchemy.exc import IntegrityError
 from psycopg2.errors import UniqueViolation
-from psycopg2 import errors
 from trainingmgr.models import db, FeatureGroup
 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
 
@@ -82,7 +81,7 @@ def get_feature_groups_from_inputDataType_db(inputDataType):
     try:
         return FeatureGroup.query.with_entities(FeatureGroup.featuregroup_name).filter_by(feature_list=inputDataType).all()
     except Exception as err:
-        raise DBException("Unable to query in get_feature_groups_from_inputDataType_db with error : ", err)
+        raise DBException(DB_QUERY_EXEC_ERROR + "in get_feature_groups_from_inputDataType_db with error : " + str(err))
 
 def delete_feature_group_by_name(featuregroup_name):
     """
index bc8a21f..57617aa 100644 (file)
@@ -17,7 +17,7 @@
 # ==================================================================================
 
 from trainingmgr.common.exceptions_utls import DBException
-from trainingmgr.models import db, ModelID
+from trainingmgr.models import ModelID
 
 def get_model_by_modelId(modelname, modelversion):
     try:
index 24f0f17..0b24939 100644 (file)
@@ -23,9 +23,7 @@ from trainingmgr.common.exceptions_utls import DBException
 from trainingmgr.models import db, TrainingJob, TrainingJobStatus, ModelID
 from trainingmgr.constants.steps import Steps
 from trainingmgr.constants.states import States
-from sqlalchemy.sql import func
 from sqlalchemy.exc import NoResultFound
-from trainingmgr.common.trainingConfig_parser import getField
 
 
 
@@ -96,17 +94,23 @@ def delete_trainingjob_by_id(id: int):
         raise DBException(f'{DB_QUERY_EXEC_ERROR} : {str(e)}' )
 
 def get_trainingjob(id: int=None):
-    if id is not None:
-        return TrainingJob.query.filter(TrainingJob.id==id).one()
-    else:
-        tjs = TrainingJob.query.all()
-        return tjs
+    try:
+        if id is not None:
+            return TrainingJob.query.filter(TrainingJob.id==id).one()
+        else:
+            tjs = TrainingJob.query.all()
+            return tjs
+    except NoResultFound:
+        # id is not present
+        return None
+    except Exception as e:
+        raise DBException(f'{DB_QUERY_EXEC_ERROR} : {str(e)}' )
+        
 
 def change_field_value_by_version(trainingjob_name, version, field, field_value):
     """
     This function updates field's value to field_value of <trainingjob_name, version> trainingjob.
     """
-    conn = None
     try:
         if field == "deletion_in_progress":
             trainingjob = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).filter(TrainingJob.version == version).first()
index c2c2674..509abef 100644 (file)
@@ -49,6 +49,14 @@ def check_and_notify_feature_engineering_status(APP,db):
                 # trainingjob_name = trainingjob.trainingjob_name
                 with APP.app_context():
                     trainingjob = get_trainingjob(trainingjob_id)
+                
+                if trainingjob is None:
+                    # trainingjob_id not present in db, A possible case of deletion
+                    LOGGER.debug(f"Training-Job Id {trainingjob_id} is Found to be deleted| Removing from DATAEXTRACTION_JOBS_CACHE")
+                    with LOCK:
+                        DATAEXTRACTION_JOBS_CACHE.pop(trainingjob_id)
+                    continue
+                    
                 featuregroup_name = getField(trainingjob.training_config, "feature_group_name")
                 response = data_extraction_status(featuregroup_name, trainingjob_id, TRAININGMGR_CONFIG_OBJ)
                 if (response.headers.get('content-type') != "application/json" or
@@ -73,22 +81,31 @@ def check_and_notify_feature_engineering_status(APP,db):
                     )
                     if (kf_response.headers.get('content-type') != "application/json" or
                             kf_response.status_code != 200):
-                        raise TMException(f"KF adapter returned an error for {featuregroup_name}.")
+                        LOGGER.error(f"KF adapter returned an error for {featuregroup_name}. | Response : {kf_response.json()}")
+                        raise TMException(f"KF adapter returned an error for {featuregroup_name}. ")
 
                     with LOCK:
                         DATAEXTRACTION_JOBS_CACHE.pop(trainingjob.id)
                 elif response_data["task_status"] == "Error":
                     raise TMException(f"Data extraction failed for {featuregroup_name}.")
+            except DBException as err:
+                # If there is any communication error with db, the thread must not fail
+                LOGGER.error("Recieved Db Failure in async-handler| Error : " + str(err))
+                continue
             except Exception as err:
                 LOGGER.error(f"Error processing DATAEXTRACTION_JOBS_CACHE: {str(err)}")
-                with APP.app_context():
-                    change_state_to_failed(trainingjob.id)
-                    notification_rapp(trainingjob.id)
-                    with LOCK:
-                        try:
-                            DATAEXTRACTION_JOBS_CACHE.pop(trainingjob.id)
-                        except KeyError as key_err:
-                            LOGGER.error("The training job key doesn't exist in DATAEXTRACTION_JOBS_CACHE: " + str(key_err))
+                #The following try-block will prevent thread-failure when  'change_state_to_failed' fails
+                try:
+                    with APP.app_context():
+                        change_state_to_failed(trainingjob.id)
+                        notification_rapp(trainingjob.id)
+                        with LOCK:
+                                DATAEXTRACTION_JOBS_CACHE.pop(trainingjob.id)
+                except KeyError as key_err:
+                    LOGGER.error("The training job key doesn't exist in DATAEXTRACTION_JOBS_CACHE: " + str(key_err))
+                except Exception as err:
+                    LOGGER.error(f"Error processing DATAEXTRACTION_JOBS_CACHE-Exception: {str(err)}")
+                    
 
         time.sleep(10)  # Sleep before checking again
 
index 84600d4..b439475 100644 (file)
@@ -16,7 +16,6 @@
 #
 # ==============================================================================
 from trainingmgr.models import db
-from datetime import datetime
 from sqlalchemy.sql import func
 from sqlalchemy import Integer, ForeignKey, String, DateTime, Column, Boolean
 from sqlalchemy.orm import relationship
index 5dd117d..b82a6a9 100644 (file)
@@ -19,9 +19,7 @@
 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
 import requests
 from trainingmgr.common.exceptions_utls import TMException
-from flask_api import status
 import requests
-import json
 
 LOGGER = TrainingMgrConfig().logger
 
index f24efb0..a4322c5 100644 (file)
@@ -21,7 +21,7 @@ from trainingmgr.schemas import ma
 from trainingmgr.models import TrainingJob
 from trainingmgr.models.trainingjob import ModelID
 import json
-from marshmallow import pre_load, post_dump, fields, validates, ValidationError
+from marshmallow import pre_load, post_dump
 
 PATTERN = re.compile(r"\w+")
 
index 91caf2f..bccfc28 100644 (file)
@@ -100,6 +100,8 @@ def delete_training_job(training_job_id : int):
     try:
         # Signal Deletion in Progress
         tj = get_trainingjob(training_job_id)
+        if tj is None:
+            return False
         change_field_value(training_job_id, "deletion_in_progress", True)
         steps_state =  json.loads(tj.steps_state.states)
         overall_status = get_one_word_status(steps_state)
@@ -114,7 +116,7 @@ def delete_training_job(training_job_id : int):
             elif (step_in_progress_state == Steps.TRAINING or (step_in_progress_state == Steps.DATA_EXTRACTION_AND_TRAINING and tj.run_id is not None)):
                 # Signal the Kf-Adapter to terminate the 
                 response = terminate_training_service(tj.run_id)
-                LOGGER.debug("Deletion-Response : " + response)
+                LOGGER.debug("Deletion-Response : " + response.text)
 
         isDeleted = delete_trainingjob_by_id(id=training_job_id)
         if isDeleted:
@@ -281,4 +283,4 @@ def fetch_pipelinename_and_version(type, training_config):
         else :
             return getField(training_config, "retraining_pipeline_name"), getField(training_config, "retraining_pipeline_version")
     except Exception as err:
-        raise TMException(f"cant fetch training or retraining pipeline name or version from trainingconfig {training_config}")
\ No newline at end of file
+        raise TMException(f"cant fetch training or retraining pipeline name or version from trainingconfig {training_config}| Error: " + str(err))
\ No newline at end of file
index 80f7bfd..7f96e4b 100644 (file)
@@ -24,7 +24,6 @@ from logging import Logger
 from flask import Flask, request, send_file, jsonify
 from flask_api import status
 from flask_migrate import Migrate
-from marshmallow import ValidationError
 import requests
 from flask_cors import CORS
 from trainingmgr.db.trainingjob_db import change_state_to_failed
@@ -38,15 +37,14 @@ from trainingmgr.constants.steps import Steps
 from trainingmgr.constants.states import States
 from trainingmgr.db.trainingmgr_ps_db import PSDB
 from trainingmgr.models import db
-from trainingmgr.schemas import ma, TrainingJobSchema , FeatureGroupSchema
-from trainingmgr.db.featuregroup_db import  get_feature_groups_db, \
-    get_feature_group_by_name_db, delete_feature_group_by_name
+from trainingmgr.schemas import TrainingJobSchema , FeatureGroupSchema
+from trainingmgr.db.featuregroup_db import get_feature_group_by_name_db, delete_feature_group_by_name
 from trainingmgr.controller import featuregroup_controller, training_job_controller
 from trainingmgr.controller.pipeline_controller import pipeline_controller
-from trainingmgr.common.trainingConfig_parser import validateTrainingConfig, getField
+from trainingmgr.common.trainingConfig_parser import getField
 from trainingmgr.handler.async_handler import start_async_handler
 from trainingmgr.service.mme_service import get_modelinfo_by_modelId_service
-from trainingmgr.service.training_job_service import change_status_tj, change_update_field_value, fetch_pipelinename_and_version, get_training_job, update_artifact_version
+from trainingmgr.service.training_job_service import change_status_tj, change_update_field_value, fetch_pipelinename_and_version, get_training_job
 
 APP = Flask(__name__)
 TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()
@@ -116,8 +114,6 @@ def get_model(modelname, modelversion, artifactversion):
         return {"Exception": "error while downloading model"}, status.HTTP_500_INTERNAL_SERVER_ERROR
 
 
-
-# Training-Config Handled
 @APP.route('/trainingjob/dataExtractionNotification', methods=['POST'])
 def data_extraction_notification():
     """
@@ -143,8 +139,6 @@ def data_extraction_notification():
         all exception are provided with exception message and HTTP status code.
     """
     LOGGER.debug("Data extraction notification...")
-    err_response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
-    results = None
     try:
         if not check_key_in_dictionary(["trainingjob_id"], request.json) :
             err_msg = "featuregroup_name or trainingjob_id key not available in request"
@@ -211,7 +205,6 @@ def data_extraction_notification():
         if not check_key_in_dictionary(["run_status", "run_id"], json_data):
             err_msg = "Kf adapter invalid response from , key not present ,run_status or  run_id for " + str(trainingjob_id)
             Logger.error(err_msg)
-            err_response_code = status.HTTP_400_BAD_REQUEST
             raise TMException(err_msg)
 
         if json_data["run_status"] == 'scheduled':
@@ -356,57 +349,6 @@ def pipeline_notification():
     return jsonify({"Message":"Training successful"}), 200
 
 
-
-# Moved to pipelineMgr (to be deleted in future)
-@APP.route('/experiments', methods=['GET'])
-def get_all_experiment_names():
-    """
-    Function handling rest endpoint to get all experiment names.
-
-    Args in function:
-        none
-
-    Args in json:
-        no json required
-
-    Returns:
-        json:
-            experiment_names : list
-                               list containing all experiment names(as str).
-        status code:
-            HTTP status code 200
-
-    Exceptions:
-        all exception are provided with exception message and HTTP status code.
-    """
-
-    LOGGER.debug("request for getting all experiment names is come.")
-    api_response = {}
-    reponse_code = status.HTTP_500_INTERNAL_SERVER_ERROR
-    try:
-        kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
-        kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port
-        if kf_adapter_ip!=None and kf_adapter_port!=None: 
-            url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + '/experiments'
-        LOGGER.debug("url is :" + url)
-        response = requests.get(url)
-        if response.headers['content-type'] != MIMETYPE_JSON:
-            err_smg = ERROR_TYPE_KF_ADAPTER_JSON
-            raise TMException(err_smg)
-
-        experiment_names = []
-        for experiment in response.json().keys():
-            experiment_names.append(experiment)
-        api_response = {"experiment_names": experiment_names}
-        reponse_code = status.HTTP_200_OK
-    except Exception as err:
-        api_response =  {"Exception": str(err)}
-        LOGGER.error(str(err))
-    return APP.response_class(response=json.dumps(api_response),
-                                  status=reponse_code,
-                                  mimetype=MIMETYPE_JSON)
-
-
 @APP.route('/featureGroup/<featuregroup_name>', methods=['GET', 'PUT'])
 def feature_group_by_name(featuregroup_name):
     """