From be342182c7a95cc195f488af1b9fd4e334ec5f40 Mon Sep 17 00:00:00 2001 From: josephthaliath Date: Fri, 20 Jan 2023 13:29:15 +0530 Subject: [PATCH] minor fixes for issues reported in sonar Issue-Id: AIMLFW-22 Signed-off-by: josephthaliath Change-Id: I4f8b4542c506f5b4de407b1e892669c7e88b6381 --- trainingmgr/common/tmgr_logger.py | 6 +-- trainingmgr/common/trainingmgr_util.py | 22 ++++---- trainingmgr/db/common_db_fun.py | 72 +++++++++++++------------- trainingmgr/db/trainingmgr_ps_db.py | 8 +-- trainingmgr/trainingmgr_main.py | 92 ++++++++++++++++++---------------- 5 files changed, 105 insertions(+), 95 deletions(-) diff --git a/trainingmgr/common/tmgr_logger.py b/trainingmgr/common/tmgr_logger.py index 82f6c27..d06a226 100644 --- a/trainingmgr/common/tmgr_logger.py +++ b/trainingmgr/common/tmgr_logger.py @@ -43,7 +43,7 @@ class TMLogger(object):# pylint: disable=too-few-public-methods with open(conf_file, 'r') as file: log_config = yaml.safe_load(file.read()) logging.config.dictConfig(log_config) - self.LogLevel = log_config["root"]["level"] + self.loglevel = log_config["root"]["level"] self.logger = logging.getLogger(__name__) except FileNotFoundError as err: print("error opening yaml config file") @@ -61,5 +61,5 @@ class TMLogger(object):# pylint: disable=too-few-public-methods @property def get_logLevel(self): - return self.LogLevel - \ No newline at end of file + return self.loglevel + diff --git a/trainingmgr/common/trainingmgr_util.py b/trainingmgr/common/trainingmgr_util.py index f758f56..38474b0 100644 --- a/trainingmgr/common/trainingmgr_util.py +++ b/trainingmgr/common/trainingmgr_util.py @@ -83,13 +83,13 @@ def check_key_in_dictionary(fields, dictionary): This function raises exception if any string from fields list does not present in a dictionary as a key ''' - isKeyAvailable = True + iskeyavailable = True for field_name in fields: if field_name not in dictionary: - isKeyAvailable = False + iskeyavailable = False break #Log (field_name + " not provide") - return isKeyAvailable + return iskeyavailable def get_one_word_status(steps_state): """ @@ -177,12 +177,12 @@ def get_metrics(trainingjob_name, version, mm_sdk): if present: data = json.dumps(mm_sdk.get_metrics(trainingjob_name, version)) if data is None: - raise Exception("Problem while downloading metrics") + raise TMException("Problem while downloading metrics") else: data = "No data available" except Exception as err: - errMsg = str(err) - raise TMException ( "Problem while downloading metric" + errMsg) + errmsg = str(err) + raise TMException ( "Problem while downloading metric" + errmsg) return data @@ -212,12 +212,12 @@ def validate_trainingjob_name(trainingjob_name, ps_db_obj): it returns False. """ results = None - isAvailable = False + isavailable = False try: results = get_all_versions_info_by_name(trainingjob_name, ps_db_obj) except Exception as err: - errMsg = str(err) - raise DBException("Could not get info from db for " + trainingjob_name + "," + errMsg) + errmsg = str(err) + raise DBException("Could not get info from db for " + trainingjob_name + "," + errmsg) if results: - isAvailable = True - return isAvailable \ No newline at end of file + isavailable = True + return isavailable diff --git a/trainingmgr/db/common_db_fun.py b/trainingmgr/db/common_db_fun.py index bf3926f..1389d81 100644 --- a/trainingmgr/db/common_db_fun.py +++ b/trainingmgr/db/common_db_fun.py @@ -26,6 +26,7 @@ from trainingmgr.constants.states import States from trainingmgr.common.exceptions_utls import DBException tm_table_name = "trainingjob_info" # Table used by 'Training Manager' for training jobs +DB_QUERY_EXEC_ERROR = "Failed to execute query in " def get_data_extraction_in_progress_trainingjobs(ps_db_obj): """ @@ -48,7 +49,7 @@ def get_data_extraction_in_progress_trainingjobs(ps_db_obj): except Exception as err: if conn is not None: conn.rollback() - raise DBException("Failed to execute query in " + \ + raise DBException(DB_QUERY_EXEC_ERROR + \ "get_data_extraction_in_progress_trainingjobs," + str(err)) finally: if conn is not None: @@ -175,7 +176,7 @@ def change_in_progress_to_failed_by_latest_version(trainingjob_name, ps_db_obj): This function changes steps_state's key's value to FAILED which is currently IN_PROGRESS of trainingjob. """ - status_Changed = False + status_changed = False conn = None try: conn = ps_db_obj.get_new_conn() @@ -191,19 +192,19 @@ def change_in_progress_to_failed_by_latest_version(trainingjob_name, ps_db_obj): if steps_state[step] == States.IN_PROGRESS.name: steps_state[step] = States.FAILED.name cursor.execute("update {} set steps_state = %s where trainingjob_name = %s and ".format(tm_table_name) + \ - "version = %s", (json.dumps(steps_state), trainingjob_name, version)) - status_Changed = True + "version= %s", (json.dumps(steps_state), trainingjob_name, version)) + status_changed = True conn.commit() cursor.close() except Exception as err: if conn is not None: conn.rollback() - raise DBException("Failed to execute query in " + \ + raise DBException(DB_QUERY_EXEC_ERROR + \ "change_in_progress_to_failed_by_latest_version" + str(err)) finally: if conn is not None: conn.close() - return status_Changed + return status_changed def change_steps_state_of_latest_version(trainingjob_name, ps_db_obj, key, value): @@ -223,13 +224,13 @@ def change_steps_state_of_latest_version(trainingjob_name, ps_db_obj, key, value steps_state = json.loads(cursor.fetchall()[0][0]) steps_state[key] = value cursor.execute("update {} set steps_state = %s where trainingjob_name = %s and ".format(tm_table_name) + \ - "version = %s", (json.dumps(steps_state), trainingjob_name, version)) + "version= %s", (json.dumps(steps_state), trainingjob_name, version)) conn.commit() cursor.close() except Exception as err: if conn is not None: conn.rollback() - raise DBException("Failed to execute query in " + \ + raise DBException(DB_QUERY_EXEC_ERROR + \ "change_steps_state_of_latest_version" + str(err)) finally: if conn is not None: @@ -249,13 +250,13 @@ def change_steps_state_by_version(trainingjob_name, version, ps_db_obj, key, val steps_state = json.loads(cursor.fetchall()[0][0]) steps_state[key] = value cursor.execute("update {} set steps_state = %s where trainingjob_name = %s ".format(tm_table_name) + \ - "and version = %s", (json.dumps(steps_state), trainingjob_name, version)) + "and version= %s", (json.dumps(steps_state), trainingjob_name, version)) conn.commit() cursor.close() except Exception as err: if conn is not None: conn.rollback() - raise DBException("Failed to execute query in " + \ + raise DBException(DB_QUERY_EXEC_ERROR + \ "change_steps_state_by_version" + str(err)) finally: if conn is not None: @@ -277,7 +278,7 @@ def delete_trainingjob_version(trainingjob_name, version, ps_db_obj): except Exception as err: if conn is not None: conn.rollback() - raise DBException("Failed to execute query in " + \ + raise DBException(DB_QUERY_EXEC_ERROR + \ "delete_trainingjob_version" + str(err)) finally: if conn is not None: @@ -301,7 +302,7 @@ def get_info_by_version(trainingjob_name, version, ps_db_obj): except Exception as err: if conn is not None: conn.rollback() - raise DBException("Failed to execute query in " + \ + raise DBException(DB_QUERY_EXEC_ERROR + \ "get_info_by_version" + str(err)) finally: if conn is not None: @@ -331,7 +332,7 @@ def get_trainingjob_info_by_name(trainingjob_name, ps_db_obj): except Exception as err: if conn is not None: conn.rollback() - raise DBException("Failed to execute query in " + \ + raise DBException(DB_QUERY_EXEC_ERROR + \ "get_trainingjob_info_by_name" + str(err)) finally: if conn is not None: @@ -357,7 +358,7 @@ def get_latest_version_trainingjob_name(trainingjob_name, ps_db_obj): except Exception as err: if conn is not None: conn.rollback() - raise DBException("Failed to execute query in " + \ + raise DBException(DB_QUERY_EXEC_ERROR + \ "get_latest_version_trainingjob_name" + str(err)) finally: if conn is not None: @@ -382,7 +383,7 @@ def get_all_versions_info_by_name(trainingjob_name, ps_db_obj): except Exception as err: if conn is not None: conn.rollback() - raise DBException("Failed to execute query in " + \ + raise DBException(DB_QUERY_EXEC_ERROR + \ "get_all_versions_info_by_name" + str(err)) finally: if conn is not None: @@ -408,7 +409,7 @@ def get_all_distinct_trainingjobs(ps_db_obj): except Exception as err: if conn is not None: conn.rollback() - raise DBException("Failed to execute query in " + \ + raise DBException(DB_QUERY_EXEC_ERROR + \ "get_all_distinct_trainingjobs" + str(err)) finally: if conn is not None: @@ -436,7 +437,7 @@ def get_all_version_num_by_trainingjob_name(trainingjob_name, ps_db_obj): except Exception as err: if conn is not None: conn.rollback() - raise DBException("Failed to execute query in " + \ + raise DBException(DB_QUERY_EXEC_ERROR + \ "get_all_version_num_by_trainingjob_name" + str(err)) finally: if conn is not None: @@ -462,7 +463,7 @@ def update_model_download_url(trainingjob_name, version, url, ps_db_obj): except Exception as err: if conn is not None: conn.rollback() - raise DBException("Failed to execute query in " + \ + raise DBException(DB_QUERY_EXEC_ERROR + \ "update_model_download_url" + str(err)) finally: if conn is not None: @@ -473,13 +474,13 @@ def add_update_trainingjob(description, pipeline_name, experiment_name, feature_ query_filter, adding, enable_versioning, pipeline_version, datalake_source, trainingjob_name, ps_db_obj, notification_url="", _measurement="", bucket=""): - """ - This function add the new row or update existing row with given information - """ + """ + This function add the new row or update existing row with given information + """ - conn = None - try: + conn = None + try: arguments_string = json.dumps({"arguments": arguments}) datalake_source_dic = {} datalake_source_dic[datalake_source] = {} @@ -502,10 +503,10 @@ def add_update_trainingjob(description, pipeline_name, experiment_name, feature_ cursor = conn.cursor() if not adding: cursor.execute('''select nt.mv from (select max(version) mv,trainingjob_name from ''' + \ - '''{} group by trainingjob_name) nt where nt.trainingjob_name = %s'''.format(tm_table_name), - (trainingjob_name)) + '''{} group by trainingjob_name) nt where nt.trainingjob_name = %s'''.format(tm_table_name), + (trainingjob_name,)) version = int(cursor.fetchall()[0][0]) - + if enable_versioning: version = version + 1 cursor.execute('''INSERT INTO {} VALUES '''.format(tm_table_name) + \ @@ -548,15 +549,14 @@ def add_update_trainingjob(description, pipeline_name, experiment_name, feature_ deletion_in_progress)) conn.commit() cursor.close() - except Exception as err: - if conn is not None: - conn.rollback() - raise DBException("Failed to execute query in " + \ - "add_update_trainingjob" + str(err)) - finally: - if conn is not None: - conn.close() - + except Exception as err: + if conn is not None: + conn.rollback() + raise DBException(DB_QUERY_EXEC_ERROR + \ + "add_update_trainingjob" + str(err)) + finally: + if conn is not None: + conn.close() def get_all_jobs_latest_status_version(ps_db_obj): """ @@ -581,7 +581,7 @@ def get_all_jobs_latest_status_version(ps_db_obj): except Exception as err: if conn is not None: conn.rollback() - raise DBException("Failed to execute query in " + \ + raise DBException(DB_QUERY_EXEC_ERROR + \ "get_all_jobs_latest_status_version" + str(err)) finally: if conn is not None: diff --git a/trainingmgr/db/trainingmgr_ps_db.py b/trainingmgr/db/trainingmgr_ps_db.py index 37fe99f..ed6ae58 100644 --- a/trainingmgr/db/trainingmgr_ps_db.py +++ b/trainingmgr/db/trainingmgr_ps_db.py @@ -23,6 +23,8 @@ and sending connection of postgres db. import pg8000.dbapi from trainingmgr.common.exceptions_utls import DBException +PG_DB_ACCESS_ERROR = "Problem of connection with postgres db" + class PSDB(): """ Database interface for training manager @@ -43,7 +45,7 @@ class PSDB(): port=int(config_hdl.ps_port)) except pg8000.dbapi.Error: self.__config_hdl.logger.error("Problem of connection with postgres db") - raise DBException("Problem of connection with postgres db") from None + raise DBException(PG_DB_ACCESS_ERROR) from None conn1.autocommit = True cur1 = conn1.cursor() present = False @@ -59,7 +61,7 @@ class PSDB(): except pg8000.dbapi.Error: conn1.rollback() self.__config_hdl.logger.error("Can't create database.") - raise DBException("Can't create database.") from None + raise DBException(PG_DB_ACCESS_ERROR) from None finally: if conn1 is not None: conn1.close() @@ -74,7 +76,7 @@ class PSDB(): database="training_manager_database") except pg8000.dbapi.Error: self.__config_hdl.logger.error("Problem of connection with postgres db") - raise DBException("Problem of connection with postgres db") from None + raise DBException(PG_DB_ACCESS_ERROR) from None cur2 = conn2.cursor() try: cur2.execute("create table if not exists trainingjob_info(" + \ diff --git a/trainingmgr/trainingmgr_main.py b/trainingmgr/trainingmgr_main.py index 8c45003..96a1925 100644 --- a/trainingmgr/trainingmgr_main.py +++ b/trainingmgr/trainingmgr_main.py @@ -59,6 +59,10 @@ MM_SDK = None LOCK = None DATAEXTRACTION_JOBS_CACHE = None +ERROR_TYPE_KF_ADAPTER_JSON = "Kf adapter doesn't sends json type response" +ERROR_TYPE_DB_STATUS = "Couldn't update the status as failed in db access" +MIMETYPE_JSON = "application/json" + @APP.errorhandler(APIException) def error(err): """ @@ -67,7 +71,7 @@ def error(err): LOGGER.error(err.message) return APP.response_class(response=json.dumps({"Exception": err.message}), status=err.code, - mimetype='application/json') + mimetype=MIMETYPE_JSON) @APP.route('/trainingjobs//', methods=['GET']) @@ -179,7 +183,7 @@ def get_trainingjob_by_name_version(trainingjob_name, version): return APP.response_class(response=json.dumps(response_data), status=response_code, - mimetype='application/json') + mimetype=MIMETYPE_JSON) @APP.route('/trainingjobs///steps_state', methods=['GET']) # Handled in GUI @cross_origin() @@ -246,7 +250,7 @@ def get_steps_state(trainingjob_name, version): return APP.response_class(response=reponse_data, status=response_code, - mimetype='application/json') + mimetype=MIMETYPE_JSON) @APP.route('/model///Model.zip', methods=['GET']) def get_model(trainingjob_name, version): @@ -334,7 +338,7 @@ def training(trainingjob_name): DATAEXTRACTION_JOBS_CACHE[trainingjob_name] = "Scheduled" response_data = de_response.json() response_code = status.HTTP_200_OK - elif( de_response.headers['content-type'] == "application/json" ) : + elif( de_response.headers['content-type'] == MIMETYPE_JSON ) : errMsg = "Data extraction responded with error code." LOGGER.error(errMsg) json_data = de_response.json() @@ -350,7 +354,7 @@ def training(trainingjob_name): response_data = {"Exception": str(err)} LOGGER.debug("Error is training, job name:" + trainingjob_name + str(err)) return APP.response_class(response=json.dumps(response_data),status=response_code, - mimetype='application/json') + mimetype=MIMETYPE_JSON) @APP.route('/trainingjob/dataExtractionNotification', methods=['POST']) def data_extraction_notification(): @@ -398,7 +402,7 @@ def data_extraction_notification(): } response = training_start(TRAININGMGR_CONFIG_OBJ, dict_data, trainingjob_name) - if ( response.headers['content-type'] != "application/json" + if ( response.headers['content-type'] != MIMETYPE_JSON or response.status_code != status.HTTP_200_OK ): err_msg = "Kf adapter invalid content-type or status_code for " + trainingjob_name raise TMException(err_msg) @@ -428,21 +432,21 @@ def data_extraction_notification(): err_msg = "Failed to connect KF adapter." LOGGER.error(err_msg) if not change_in_progress_to_failed_by_latest_version(trainingjob_name, PS_DB_OBJ) : - LOGGER.error("Couldn't update the status as failed in db") + LOGGER.error(ERROR_TYPE_DB_STATUS) return response_for_training(err_response_code, err_msg + str(err) + "(trainingjob name is " + trainingjob_name + ")", LOGGER, False, trainingjob_name, PS_DB_OBJ, MM_SDK) except Exception as err: LOGGER.error("Failed to handle dataExtractionNotification. " + str(err)) if not change_in_progress_to_failed_by_latest_version(trainingjob_name, PS_DB_OBJ) : - LOGGER.error("Couldn't update the status as failed in db") + LOGGER.error(ERROR_TYPE_DB_STATUS) return response_for_training(err_response_code, str(err) + "(trainingjob name is " + trainingjob_name + ")", LOGGER, False, trainingjob_name, PS_DB_OBJ, MM_SDK) return APP.response_class(response=json.dumps({"result": "pipeline is scheduled"}), status=status.HTTP_200_OK, - mimetype='application/json') + mimetype=MIMETYPE_JSON) @APP.route('/trainingjob/pipelineNotification', methods=['POST']) @@ -492,7 +496,7 @@ def pipeline_notification(): change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ, Steps.TRAINED_MODEL.name, States.IN_PROGRESS.name) - + if MM_SDK.check_object(trainingjob_name, version, "Model.zip"): model_url = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + ":" + \ str(TRAININGMGR_CONFIG_OBJ.my_port) + "/model/" + \ @@ -517,7 +521,7 @@ def pipeline_notification(): #Training failure response LOGGER.error("Pipeline notification failed" + str(err)) if not change_in_progress_to_failed_by_latest_version(trainingjob_name, PS_DB_OBJ) : - LOGGER.error("Couldn't update the status as failed in db") + LOGGER.error(ERROR_TYPE_DB_STATUS) return response_for_training(status.HTTP_500_INTERNAL_SERVER_ERROR, str(err) + " (trainingjob " + trainingjob_name + ")", @@ -578,7 +582,7 @@ def trainingjobs_operations(): LOGGER.error(str(err)) return APP.response_class(response=json.dumps(api_response), status=response_code, - mimetype='application/json') + mimetype=MIMETYPE_JSON) @APP.route("/pipelines//upload", methods=['POST']) @cross_origin() @@ -623,17 +627,18 @@ def upload_pipeline(pipe_name): uploaded_file_path = "/tmp/" + uploaded_file.filename uploaded_file.save(uploaded_file_path) LOGGER.debug("File uploaded :%s", uploaded_file_path) - kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port - url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + \ + if kf_adapter_ip!=None and kf_adapter_port!=None: + url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + \ '/pipelineIds/' + pipe_name description = '' if 'description' in request.form: description = request.form['description'] - with open(uploaded_file_path, 'rb') as file: - files = {'file': file.read()} + if uploaded_file_path != None: + with open(uploaded_file_path, 'rb') as file: + files = {'file': file.read()} resp = requests.post(url, files=files, data={"description": description}) LOGGER.debug(resp.text) @@ -661,12 +666,13 @@ def upload_pipeline(pipe_name): if uploaded_file_path and os.path.isfile(uploaded_file_path): LOGGER.debug("Deleting %s", uploaded_file_path) - os.remove(uploaded_file_path) + if uploaded_file_path != None: + os.remove(uploaded_file_path) LOGGER.debug("Responding to Client with %d %s", result_code, result_string) return APP.response_class(response=json.dumps({'result': result_string}), status=result_code, - mimetype='application/json') + mimetype=MIMETYPE_JSON) @APP.route("/pipelines//versions", methods=['GET']) @@ -698,14 +704,14 @@ def get_versions_for_pipeline(pipeline_name): try: kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port - - url = 'http://' + str(kf_adapter_ip) + ':' + str( + if kf_adapter_ip!=None and kf_adapter_port!=None : + url = 'http://' + str(kf_adapter_ip) + ':' + str( kf_adapter_port) + '/pipelines/' + pipeline_name + \ '/versions' LOGGER.debug("URL:" + url) response = requests.get(url) - if response.headers['content-type'] != "application/json": - raise TMException("Kf adapter doesn't sends json type response") + if response.headers['content-type'] != MIMETYPE_JSON: + raise TMException(ERROR_TYPE_KF_ADAPTER_JSON) api_response = {"versions_list": response.json()['versions_list']} response_code = status.HTTP_200_OK except Exception as err: @@ -713,7 +719,7 @@ def get_versions_for_pipeline(pipeline_name): LOGGER.error(str(err)) return APP.response_class(response=json.dumps(api_response), status=response_code, - mimetype='application/json') + mimetype=MIMETYPE_JSON) @APP.route('/pipelines', methods=['GET']) @@ -745,11 +751,12 @@ def get_all_pipeline_names(): try: kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port - url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + '/pipelines' + if kf_adapter_ip!=None and kf_adapter_port!=None: + url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + '/pipelines' LOGGER.debug(url) response = requests.get(url) - if response.headers['content-type'] != "application/json": - err_smg = "Kf adapter doesn't sends json type response" + if response.headers['content-type'] != MIMETYPE_JSON: + err_smg = ERROR_TYPE_KF_ADAPTER_JSON LOGGER.error(err_smg) raise TMException(err_smg) pipeline_names = [] @@ -763,7 +770,7 @@ def get_all_pipeline_names(): api_response = {"Exception": str(err)} return APP.response_class(response=json.dumps(api_response), status=response_code, - mimetype='application/json') + mimetype=MIMETYPE_JSON) @APP.route('/experiments', methods=['GET']) @@ -795,11 +802,12 @@ def get_all_experiment_names(): try: kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port - url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + '/experiments' + if kf_adapter_ip!=None and kf_adapter_port!=None: + url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + '/experiments' LOGGER.debug("url is :" + url) response = requests.get(url) - if response.headers['content-type'] != "application/json": - err_smg = "Kf adapter doesn't sends json type response" + if response.headers['content-type'] != MIMETYPE_JSON: + err_smg = ERROR_TYPE_KF_ADAPTER_JSON raise TMException(err_smg) experiment_names = [] @@ -812,7 +820,7 @@ def get_all_experiment_names(): LOGGER.error(str(err)) return APP.response_class(response=json.dumps(api_response), status=reponse_code, - mimetype='application/json') + mimetype=MIMETYPE_JSON) @APP.route('/trainingjobs/', methods=['POST', 'PUT']) # Handled in GUI @@ -878,8 +886,8 @@ def trainingjob_operations(trainingjob_name): json_data = request.json if (request.method == 'POST'): LOGGER.debug("Create request json : " + json.dumps(json_data)) - isDataAvailable = validate_trainingjob_name(trainingjob_name, PS_DB_OBJ) - if isDataAvailable: + is_data_available = validate_trainingjob_name(trainingjob_name, PS_DB_OBJ) + if is_data_available: response_code = status.HTTP_409_CONFLICT raise TMException("trainingjob name(" + trainingjob_name + ") is already present in database") else: @@ -896,8 +904,8 @@ def trainingjob_operations(trainingjob_name): response_code = status.HTTP_201_CREATED elif(request.method == 'PUT'): LOGGER.debug("Update request json : " + json.dumps(json_data)) - isDataAvailable = validate_trainingjob_name(trainingjob_name, PS_DB_OBJ) - if not isDataAvailable: + is_data_available = validate_trainingjob_name(trainingjob_name, PS_DB_OBJ) + if not is_data_available: response_code = status.HTTP_404_NOT_FOUND raise TMException("Trainingjob name(" + trainingjob_name + ") is not present in database") else: @@ -927,7 +935,7 @@ def trainingjob_operations(trainingjob_name): return APP.response_class(response= json.dumps(api_response), status= response_code, - mimetype='application/json') + mimetype=MIMETYPE_JSON) @APP.route('/trainingjobs/metadata/') def get_metadata(trainingjob_name): @@ -996,7 +1004,7 @@ def get_metadata(trainingjob_name): api_response = {"Exception":str(err)} return APP.response_class(response=json.dumps(api_response), status=response_code, - mimetype='application/json') + mimetype=MIMETYPE_JSON) def async_feature_engineering_status(): """ @@ -1014,7 +1022,7 @@ def async_feature_engineering_status(): LOGGER.debug("Current DATAEXTRACTION_JOBS_CACHE :" + str(DATAEXTRACTION_JOBS_CACHE)) try: response = data_extraction_status(trainingjob_name, TRAININGMGR_CONFIG_OBJ) - if (response.headers['content-type'] != "application/json" or + if (response.headers['content-type'] != MIMETYPE_JSON or response.status_code != status.HTTP_200_OK ): raise TMException("Data extraction responsed with error status code or invalid content type" + \ "doesn't send json type response (trainingjob " + trainingjob_name + ")") @@ -1032,10 +1040,10 @@ def async_feature_engineering_status(): kf_response = requests.post(url_pipeline_run, data=json.dumps({"trainingjob_name": trainingjob_name}), headers={ - 'content-type': 'application/json', + 'content-type': MIMETYPE_JSON, 'Accept-Charset': 'UTF-8' }) - if (kf_response.headers['content-type'] != "application/json" or + if (kf_response.headers['content-type'] != MIMETYPE_JSON or kf_response.status_code != status.HTTP_200_OK ): raise TMException("KF adapter responsed with error status code or invalid content type" + \ "doesn't send json type response (trainingjob " + trainingjob_name + ")") @@ -1062,7 +1070,7 @@ if __name__ == "__main__": TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig() try: if TRAININGMGR_CONFIG_OBJ.is_config_loaded_properly() is False: - raise Exception("Not all configuration loaded.") + raise TMException("Not all configuration loaded.") LOGGER = TRAININGMGR_CONFIG_OBJ.logger PS_DB_OBJ = PSDB(TRAININGMGR_CONFIG_OBJ) LOCK = Lock() @@ -1071,5 +1079,5 @@ if __name__ == "__main__": MM_SDK = ModelMetricsSdk() LOGGER.debug("Starting AIML-WF training manager .....") APP.run(debug=True, port=int(TRAININGMGR_CONFIG_OBJ.my_port), host='0.0.0.0') - except Exception as err: + except TMException as err: print("Startup failure" + str(err)) -- 2.16.6