From 3ea2bc558caf8b31e951edeff25e20f02481bf00 Mon Sep 17 00:00:00 2001 From: ashishj1729 Date: Thu, 2 Jan 2025 18:10:08 +0530 Subject: [PATCH] Resolving Linter Issues including: 1. Removing Unused Imports 2. Removing Unused Variables 3. Minor Bugfix 4. Handling Deletion Cases during DataExtraction in the async-thread Change-Id: Iccf5f5819b296497f6e01c943526b71a1242fb5e Signed-off-by: ashishj1729 --- trainingmgr/common/trainingmgr_config.py | 1 - trainingmgr/common/trainingmgr_operations.py | 3 +- trainingmgr/common/trainingmgr_util.py | 13 ++--- trainingmgr/controller/featuregroup_controller.py | 4 +- trainingmgr/controller/trainingjob_controller.py | 4 +- trainingmgr/db/featuregroup_db.py | 3 +- trainingmgr/db/model_db.py | 2 +- trainingmgr/db/trainingjob_db.py | 20 ++++--- trainingmgr/handler/async_handler.py | 35 ++++++++---- trainingmgr/models/trainingjob.py | 1 - trainingmgr/pipeline/mme_mgr.py | 2 - trainingmgr/schemas/trainingjob_schema.py | 2 +- trainingmgr/service/training_job_service.py | 6 ++- trainingmgr/trainingmgr_main.py | 66 ++--------------------- 14 files changed, 59 insertions(+), 103 deletions(-) diff --git a/trainingmgr/common/trainingmgr_config.py b/trainingmgr/common/trainingmgr_config.py index 36304d6..6815b3e 100644 --- a/trainingmgr/common/trainingmgr_config.py +++ b/trainingmgr/common/trainingmgr_config.py @@ -19,7 +19,6 @@ """ This module is for loading training manager configuration. """ -import os from os import getenv from trainingmgr.common.tmgr_logger import TMLogger diff --git a/trainingmgr/common/trainingmgr_operations.py b/trainingmgr/common/trainingmgr_operations.py index 6aedbfe..4ab1123 100644 --- a/trainingmgr/common/trainingmgr_operations.py +++ b/trainingmgr/common/trainingmgr_operations.py @@ -165,7 +165,6 @@ def create_dme_filtered_data_job(training_config_obj, source_name, features, fea headers = {'Content-type': MIMETYPE_JSON} - url = create_url_host_port('http', host, port, 'data-consumer/v1/info-jobs/{}'.format(feature_group_name)) logger.debug(url) logger.debug(json.dumps(job_json)) @@ -201,7 +200,7 @@ def notification_rapp(trainingjob_id): try: trainingjob = get_trainingjob(trainingjob_id) steps_state = trainingjob.steps_state.states - if trainingjob.notification_url != "": + if trainingjob.notification_url != "" and trainingjob.notification_url is not None: response = requests.post(trainingjob.notification_url, data=json.dumps(steps_state), headers={ diff --git a/trainingmgr/common/trainingmgr_util.py b/trainingmgr/common/trainingmgr_util.py index d7a8c43..d67b3b2 100644 --- a/trainingmgr/common/trainingmgr_util.py +++ b/trainingmgr/common/trainingmgr_util.py @@ -23,17 +23,14 @@ from flask import jsonify import json import re from flask_api import status -import requests from marshmallow import ValidationError -from trainingmgr.db.common_db_fun import change_in_progress_to_failed_by_latest_version, \ - get_field_by_latest_version, change_field_of_latest_version, \ - get_latest_version_trainingjob_name -from trainingmgr.db.featuregroup_db import add_featuregroup, edit_featuregroup, get_feature_groups_db, \ +from trainingmgr.db.common_db_fun import change_in_progress_to_failed_by_latest_version +from trainingmgr.db.featuregroup_db import edit_featuregroup, \ get_feature_group_by_name_db, delete_feature_group_by_name from trainingmgr.constants.states import States from trainingmgr.common.exceptions_utls import APIException,TMException,DBException from trainingmgr.common.trainingmgr_operations import create_dme_filtered_data_job -from trainingmgr.schemas import ma, TrainingJobSchema , FeatureGroupSchema +from trainingmgr.schemas import FeatureGroupSchema from trainingmgr.constants.steps import Steps ERROR_TYPE_KF_ADAPTER_JSON = "Kf adapter doesn't sends json type response" @@ -224,7 +221,7 @@ def edit_feature_group_by_name(featuregroup_name: str, response_code =status.HTTP_200_OK # TODO: Implement the process where DME edits from the dashboard are applied to the endpoint if featuregroup.enable_dme == True : - response= create_dme_filtered_data_job(tm_conf_obj, featuregroup.source_name, featuregroup.feature_list, + response= create_dme_filtered_data_job(tm_conf_obj, featuregroup.source_name, featuregroup.feature_list, featuregroup.featuregroup_name, featuregroup.host, featuregroup.port, featuregroup.measured_obj_class) if response.status_code != 201: @@ -236,7 +233,6 @@ def edit_feature_group_by_name(featuregroup_name: str, except DBException as err: return {"Exception": str(err)}, 400 except Exception as e: - err_msg = "Failed to create the feature Group " api_response = {"Exception":str(e)} logger.error(str(e)) @@ -272,6 +268,7 @@ def get_metrics(trainingjob_name, version, mm_sdk): return data +# TODO: Remove the fxn since it is not called anywhere def handle_async_feature_engineering_status_exception_case(lock, dataextraction_job_cache, code, message, logger, is_success, trainingjob_name, mm_sdk): diff --git a/trainingmgr/controller/featuregroup_controller.py b/trainingmgr/controller/featuregroup_controller.py index 419b91b..42dfa1d 100644 --- a/trainingmgr/controller/featuregroup_controller.py +++ b/trainingmgr/controller/featuregroup_controller.py @@ -111,8 +111,8 @@ def create_feature_group(): if featuregroup.enable_dme == True : response= create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, featuregroup.source_name, featuregroup.feature_list, featuregroup.featuregroup_name, featuregroup.host, featuregroup.dme_port, featuregroup.measured_obj_class) if response.status_code != 201: - delete_feature_group_by_name(featuregroup) - return jsonify({"Exception": "Cannot create dme job"}), status.HTTP_400_BAD_REQUEST + delete_feature_group_by_name(featuregroup.featuregroup_name) + return jsonify({"Exception": "Cannot create dme job | DME Error : " + str(response.json()["detail"])}), status.HTTP_400_BAD_REQUEST except ValidationError as err: LOGGER.error(f"Failed to create the feature Group {str(err)}") return {"Exception": str(err)}, 400 diff --git a/trainingmgr/controller/trainingjob_controller.py b/trainingmgr/controller/trainingjob_controller.py index 40e0436..5c9dfd7 100644 --- a/trainingmgr/controller/trainingjob_controller.py +++ b/trainingmgr/controller/trainingjob_controller.py @@ -24,7 +24,7 @@ from marshmallow import ValidationError from trainingmgr.common.exceptions_utls import TMException from trainingmgr.common.trainingmgr_config import TrainingMgrConfig from trainingmgr.schemas.trainingjob_schema import TrainingJobSchema -from trainingmgr.service.training_job_service import delete_training_job, create_training_job, get_training_job, get_trainingjob_by_modelId, get_trainining_jobs, \ +from trainingmgr.service.training_job_service import delete_training_job, create_training_job, get_training_job, get_trainining_jobs, \ get_steps_state from trainingmgr.common.trainingmgr_util import check_key_in_dictionary from trainingmgr.common.trainingConfig_parser import validateTrainingConfig @@ -42,7 +42,7 @@ MIMETYPE_JSON = "application/json" def delete_trainingjob(training_job_id): LOGGER.debug(f'delete training job : {training_job_id}') try: - if delete_training_job(str(training_job_id)): + if delete_training_job(int(training_job_id)): LOGGER.debug(f'training job with {training_job_id} is deleted successfully.') return '', 204 else: diff --git a/trainingmgr/db/featuregroup_db.py b/trainingmgr/db/featuregroup_db.py index 6db5398..b1688c9 100644 --- a/trainingmgr/db/featuregroup_db.py +++ b/trainingmgr/db/featuregroup_db.py @@ -19,7 +19,6 @@ from trainingmgr.common.exceptions_utls import DBException from sqlalchemy.exc import IntegrityError from psycopg2.errors import UniqueViolation -from psycopg2 import errors from trainingmgr.models import db, FeatureGroup from trainingmgr.common.trainingmgr_config import TrainingMgrConfig @@ -82,7 +81,7 @@ def get_feature_groups_from_inputDataType_db(inputDataType): try: return FeatureGroup.query.with_entities(FeatureGroup.featuregroup_name).filter_by(feature_list=inputDataType).all() except Exception as err: - raise DBException("Unable to query in get_feature_groups_from_inputDataType_db with error : ", err) + raise DBException(DB_QUERY_EXEC_ERROR + "in get_feature_groups_from_inputDataType_db with error : " + str(err)) def delete_feature_group_by_name(featuregroup_name): """ diff --git a/trainingmgr/db/model_db.py b/trainingmgr/db/model_db.py index bc8a21f..57617aa 100644 --- a/trainingmgr/db/model_db.py +++ b/trainingmgr/db/model_db.py @@ -17,7 +17,7 @@ # ================================================================================== from trainingmgr.common.exceptions_utls import DBException -from trainingmgr.models import db, ModelID +from trainingmgr.models import ModelID def get_model_by_modelId(modelname, modelversion): try: diff --git a/trainingmgr/db/trainingjob_db.py b/trainingmgr/db/trainingjob_db.py index 24f0f17..0b24939 100644 --- a/trainingmgr/db/trainingjob_db.py +++ b/trainingmgr/db/trainingjob_db.py @@ -23,9 +23,7 @@ from trainingmgr.common.exceptions_utls import DBException from trainingmgr.models import db, TrainingJob, TrainingJobStatus, ModelID from trainingmgr.constants.steps import Steps from trainingmgr.constants.states import States -from sqlalchemy.sql import func from sqlalchemy.exc import NoResultFound -from trainingmgr.common.trainingConfig_parser import getField @@ -96,17 +94,23 @@ def delete_trainingjob_by_id(id: int): raise DBException(f'{DB_QUERY_EXEC_ERROR} : {str(e)}' ) def get_trainingjob(id: int=None): - if id is not None: - return TrainingJob.query.filter(TrainingJob.id==id).one() - else: - tjs = TrainingJob.query.all() - return tjs + try: + if id is not None: + return TrainingJob.query.filter(TrainingJob.id==id).one() + else: + tjs = TrainingJob.query.all() + return tjs + except NoResultFound: + # id is not present + return None + except Exception as e: + raise DBException(f'{DB_QUERY_EXEC_ERROR} : {str(e)}' ) + def change_field_value_by_version(trainingjob_name, version, field, field_value): """ This function updates field's value to field_value of trainingjob. """ - conn = None try: if field == "deletion_in_progress": trainingjob = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).filter(TrainingJob.version == version).first() diff --git a/trainingmgr/handler/async_handler.py b/trainingmgr/handler/async_handler.py index c2c2674..509abef 100644 --- a/trainingmgr/handler/async_handler.py +++ b/trainingmgr/handler/async_handler.py @@ -49,6 +49,14 @@ def check_and_notify_feature_engineering_status(APP,db): # trainingjob_name = trainingjob.trainingjob_name with APP.app_context(): trainingjob = get_trainingjob(trainingjob_id) + + if trainingjob is None: + # trainingjob_id not present in db, A possible case of deletion + LOGGER.debug(f"Training-Job Id {trainingjob_id} is Found to be deleted| Removing from DATAEXTRACTION_JOBS_CACHE") + with LOCK: + DATAEXTRACTION_JOBS_CACHE.pop(trainingjob_id) + continue + featuregroup_name = getField(trainingjob.training_config, "feature_group_name") response = data_extraction_status(featuregroup_name, trainingjob_id, TRAININGMGR_CONFIG_OBJ) if (response.headers.get('content-type') != "application/json" or @@ -73,22 +81,31 @@ def check_and_notify_feature_engineering_status(APP,db): ) if (kf_response.headers.get('content-type') != "application/json" or kf_response.status_code != 200): - raise TMException(f"KF adapter returned an error for {featuregroup_name}.") + LOGGER.error(f"KF adapter returned an error for {featuregroup_name}. | Response : {kf_response.json()}") + raise TMException(f"KF adapter returned an error for {featuregroup_name}. ") with LOCK: DATAEXTRACTION_JOBS_CACHE.pop(trainingjob.id) elif response_data["task_status"] == "Error": raise TMException(f"Data extraction failed for {featuregroup_name}.") + except DBException as err: + # If there is any communication error with db, the thread must not fail + LOGGER.error("Recieved Db Failure in async-handler| Error : " + str(err)) + continue except Exception as err: LOGGER.error(f"Error processing DATAEXTRACTION_JOBS_CACHE: {str(err)}") - with APP.app_context(): - change_state_to_failed(trainingjob.id) - notification_rapp(trainingjob.id) - with LOCK: - try: - DATAEXTRACTION_JOBS_CACHE.pop(trainingjob.id) - except KeyError as key_err: - LOGGER.error("The training job key doesn't exist in DATAEXTRACTION_JOBS_CACHE: " + str(key_err)) + #The following try-block will prevent thread-failure when 'change_state_to_failed' fails + try: + with APP.app_context(): + change_state_to_failed(trainingjob.id) + notification_rapp(trainingjob.id) + with LOCK: + DATAEXTRACTION_JOBS_CACHE.pop(trainingjob.id) + except KeyError as key_err: + LOGGER.error("The training job key doesn't exist in DATAEXTRACTION_JOBS_CACHE: " + str(key_err)) + except Exception as err: + LOGGER.error(f"Error processing DATAEXTRACTION_JOBS_CACHE-Exception: {str(err)}") + time.sleep(10) # Sleep before checking again diff --git a/trainingmgr/models/trainingjob.py b/trainingmgr/models/trainingjob.py index 84600d4..b439475 100644 --- a/trainingmgr/models/trainingjob.py +++ b/trainingmgr/models/trainingjob.py @@ -16,7 +16,6 @@ # # ============================================================================== from trainingmgr.models import db -from datetime import datetime from sqlalchemy.sql import func from sqlalchemy import Integer, ForeignKey, String, DateTime, Column, Boolean from sqlalchemy.orm import relationship diff --git a/trainingmgr/pipeline/mme_mgr.py b/trainingmgr/pipeline/mme_mgr.py index 5dd117d..b82a6a9 100644 --- a/trainingmgr/pipeline/mme_mgr.py +++ b/trainingmgr/pipeline/mme_mgr.py @@ -19,9 +19,7 @@ from trainingmgr.common.trainingmgr_config import TrainingMgrConfig import requests from trainingmgr.common.exceptions_utls import TMException -from flask_api import status import requests -import json LOGGER = TrainingMgrConfig().logger diff --git a/trainingmgr/schemas/trainingjob_schema.py b/trainingmgr/schemas/trainingjob_schema.py index f24efb0..a4322c5 100644 --- a/trainingmgr/schemas/trainingjob_schema.py +++ b/trainingmgr/schemas/trainingjob_schema.py @@ -21,7 +21,7 @@ from trainingmgr.schemas import ma from trainingmgr.models import TrainingJob from trainingmgr.models.trainingjob import ModelID import json -from marshmallow import pre_load, post_dump, fields, validates, ValidationError +from marshmallow import pre_load, post_dump PATTERN = re.compile(r"\w+") diff --git a/trainingmgr/service/training_job_service.py b/trainingmgr/service/training_job_service.py index 91caf2f..bccfc28 100644 --- a/trainingmgr/service/training_job_service.py +++ b/trainingmgr/service/training_job_service.py @@ -100,6 +100,8 @@ def delete_training_job(training_job_id : int): try: # Signal Deletion in Progress tj = get_trainingjob(training_job_id) + if tj is None: + return False change_field_value(training_job_id, "deletion_in_progress", True) steps_state = json.loads(tj.steps_state.states) overall_status = get_one_word_status(steps_state) @@ -114,7 +116,7 @@ def delete_training_job(training_job_id : int): elif (step_in_progress_state == Steps.TRAINING or (step_in_progress_state == Steps.DATA_EXTRACTION_AND_TRAINING and tj.run_id is not None)): # Signal the Kf-Adapter to terminate the response = terminate_training_service(tj.run_id) - LOGGER.debug("Deletion-Response : " + response) + LOGGER.debug("Deletion-Response : " + response.text) isDeleted = delete_trainingjob_by_id(id=training_job_id) if isDeleted: @@ -281,4 +283,4 @@ def fetch_pipelinename_and_version(type, training_config): else : return getField(training_config, "retraining_pipeline_name"), getField(training_config, "retraining_pipeline_version") except Exception as err: - raise TMException(f"cant fetch training or retraining pipeline name or version from trainingconfig {training_config}") \ No newline at end of file + raise TMException(f"cant fetch training or retraining pipeline name or version from trainingconfig {training_config}| Error: " + str(err)) \ No newline at end of file diff --git a/trainingmgr/trainingmgr_main.py b/trainingmgr/trainingmgr_main.py index 80f7bfd..7f96e4b 100644 --- a/trainingmgr/trainingmgr_main.py +++ b/trainingmgr/trainingmgr_main.py @@ -24,7 +24,6 @@ from logging import Logger from flask import Flask, request, send_file, jsonify from flask_api import status from flask_migrate import Migrate -from marshmallow import ValidationError import requests from flask_cors import CORS from trainingmgr.db.trainingjob_db import change_state_to_failed @@ -38,15 +37,14 @@ from trainingmgr.constants.steps import Steps from trainingmgr.constants.states import States from trainingmgr.db.trainingmgr_ps_db import PSDB from trainingmgr.models import db -from trainingmgr.schemas import ma, TrainingJobSchema , FeatureGroupSchema -from trainingmgr.db.featuregroup_db import get_feature_groups_db, \ - get_feature_group_by_name_db, delete_feature_group_by_name +from trainingmgr.schemas import TrainingJobSchema , FeatureGroupSchema +from trainingmgr.db.featuregroup_db import get_feature_group_by_name_db, delete_feature_group_by_name from trainingmgr.controller import featuregroup_controller, training_job_controller from trainingmgr.controller.pipeline_controller import pipeline_controller -from trainingmgr.common.trainingConfig_parser import validateTrainingConfig, getField +from trainingmgr.common.trainingConfig_parser import getField from trainingmgr.handler.async_handler import start_async_handler from trainingmgr.service.mme_service import get_modelinfo_by_modelId_service -from trainingmgr.service.training_job_service import change_status_tj, change_update_field_value, fetch_pipelinename_and_version, get_training_job, update_artifact_version +from trainingmgr.service.training_job_service import change_status_tj, change_update_field_value, fetch_pipelinename_and_version, get_training_job APP = Flask(__name__) TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig() @@ -116,8 +114,6 @@ def get_model(modelname, modelversion, artifactversion): return {"Exception": "error while downloading model"}, status.HTTP_500_INTERNAL_SERVER_ERROR - -# Training-Config Handled @APP.route('/trainingjob/dataExtractionNotification', methods=['POST']) def data_extraction_notification(): """ @@ -143,8 +139,6 @@ def data_extraction_notification(): all exception are provided with exception message and HTTP status code. """ LOGGER.debug("Data extraction notification...") - err_response_code = status.HTTP_500_INTERNAL_SERVER_ERROR - results = None try: if not check_key_in_dictionary(["trainingjob_id"], request.json) : err_msg = "featuregroup_name or trainingjob_id key not available in request" @@ -211,7 +205,6 @@ def data_extraction_notification(): if not check_key_in_dictionary(["run_status", "run_id"], json_data): err_msg = "Kf adapter invalid response from , key not present ,run_status or run_id for " + str(trainingjob_id) Logger.error(err_msg) - err_response_code = status.HTTP_400_BAD_REQUEST raise TMException(err_msg) if json_data["run_status"] == 'scheduled': @@ -356,57 +349,6 @@ def pipeline_notification(): return jsonify({"Message":"Training successful"}), 200 - -# Moved to pipelineMgr (to be deleted in future) -@APP.route('/experiments', methods=['GET']) -def get_all_experiment_names(): - """ - Function handling rest endpoint to get all experiment names. - - Args in function: - none - - Args in json: - no json required - - Returns: - json: - experiment_names : list - list containing all experiment names(as str). - status code: - HTTP status code 200 - - Exceptions: - all exception are provided with exception message and HTTP status code. - """ - - LOGGER.debug("request for getting all experiment names is come.") - api_response = {} - reponse_code = status.HTTP_500_INTERNAL_SERVER_ERROR - try: - kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip - kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port - if kf_adapter_ip!=None and kf_adapter_port!=None: - url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + '/experiments' - LOGGER.debug("url is :" + url) - response = requests.get(url) - if response.headers['content-type'] != MIMETYPE_JSON: - err_smg = ERROR_TYPE_KF_ADAPTER_JSON - raise TMException(err_smg) - - experiment_names = [] - for experiment in response.json().keys(): - experiment_names.append(experiment) - api_response = {"experiment_names": experiment_names} - reponse_code = status.HTTP_200_OK - except Exception as err: - api_response = {"Exception": str(err)} - LOGGER.error(str(err)) - return APP.response_class(response=json.dumps(api_response), - status=reponse_code, - mimetype=MIMETYPE_JSON) - - @APP.route('/featureGroup/', methods=['GET', 'PUT']) def feature_group_by_name(featuregroup_name): """ -- 2.16.6