From: rajdeep11
Date: Sat, 21 Dec 2024 11:19:24 +0000 (+0530)
Subject: Changes done:
X-Git-Tag: 3.0.0~8
X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=fca6dd7827ea0a179a907e9f56282d21bfd75264;p=aiml-fw%2Fawmf%2Ftm.git

Changes done:
1) Enable the rApp notification
2) Handle exceptions
3) Change the status to FAILED and fix the related code
4) Handle the irrelevant error thrown when the model is not registered
5) Add a Location header to the create trainingjob response

Change-Id: I80845fdf907fd3bd59fde1ba454771eaa1290359
Signed-off-by: rajdeep11
---

diff --git a/trainingmgr/common/trainingmgr_operations.py b/trainingmgr/common/trainingmgr_operations.py
index d79e2c1..b9d8311 100644
--- a/trainingmgr/common/trainingmgr_operations.py
+++ b/trainingmgr/common/trainingmgr_operations.py
@@ -23,6 +23,7 @@ Training manager main operations
 
 import json
 import requests
+from trainingmgr.db.trainingjob_db import get_trainingjob
 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
 import validators
 from trainingmgr.common.exceptions_utls import TMException
@@ -193,13 +194,18 @@ def get_model_info(training_config_obj, model_name):
         logger.error(errMsg)
         raise TMException(errMsg)
 
-# def notification_rapp(trainingjob, training_config_obj):
-#     steps_state = get_steps_state_db(trainingjob.trainingjob_name)
-#     response = requests.post(trainingjob.notification_url,
-#                     data=json.dumps(steps_state),
-#                     headers={
-#                         'content-type': MIMETYPE_JSON,
-#                         'Accept-Charset': 'UTF-8'
-#                     })
-#     if response.status_code != 200:
-#         raise TMException("Notification failed: "+response.text)
\ No newline at end of file
+def notification_rapp(trainingjob_id):
+    try:
+        trainingjob = get_trainingjob(trainingjob_id)
+        steps_state = trainingjob.steps_state.states
+        if trainingjob.notification_url != "":
+            response = requests.post(trainingjob.notification_url,
+                            data=json.dumps(steps_state),
+                            headers={
+                                'content-type': MIMETYPE_JSON,
+                                'Accept-Charset': 'UTF-8'
+                            })
+            if response.status_code != 200:
+                raise TMException("Notification failed: "+response.text)
+    except Exception as err:
+        LOGGER.error(f"failed to notify rapp due to {str(err)}")
\ No newline at end of file
diff --git a/trainingmgr/common/trainingmgr_util.py b/trainingmgr/common/trainingmgr_util.py
index acdb4e7..f555df5 100644
--- a/trainingmgr/common/trainingmgr_util.py
+++ b/trainingmgr/common/trainingmgr_util.py
@@ -43,51 +43,6 @@ PATTERN = re.compile(r"\w+")
 featuregroup_schema = FeatureGroupSchema()
 featuregroups_schema = FeatureGroupSchema(many = True)
 
-def response_for_training(code, message, logger, is_success, trainingjob_name, mm_sdk):
-    """
-    Post training job completion,this function provides notifications to the subscribers,
-    who subscribed for the result of training job and provided a notification url during
-    training job creation.
-    returns tuple containing result dictionary and status code.
- """ - logger.debug("Training job result: " + str(code) + " " + message + " " + str(is_success)) - - try : - #TODO DB query optimization, all data to fetch in one call - notif_url = get_field_by_latest_version(trainingjob_name, "notification_url") - if notif_url : - - model_url = get_field_by_latest_version(trainingjob_name, "model_url") - version = get_latest_version_trainingjob_name(trainingjob_name) - metrics = get_metrics(trainingjob_name, version, mm_sdk) - - req_json = None - if is_success: - req_json = { - "result": "success", "model_url": model_url, - "trainingjob_name": trainingjob_name, "metrics": metrics - } - else: - req_json = {"result": "failed", "trainingjob_name": trainingjob_name} - - response = requests.post(notif_url, - data=json.dumps(req_json), - headers={ - 'content-type': MIMETYPE_JSON, - 'Accept-Charset': 'UTF-8' - }) - if ( response.headers['content-type'] != MIMETYPE_JSON - or response.status_code != status.HTTP_200_OK ): - err_msg = "Failed to notify the subscribed url " + trainingjob_name - raise TMException(err_msg) - except Exception as err: - change_in_progress_to_failed_by_latest_version(trainingjob_name) - raise APIException(status.HTTP_500_INTERNAL_SERVER_ERROR, - str(err) + "(trainingjob name is " + trainingjob_name + ")") from None - if is_success: - return {"result": message}, code - return {"Exception": message}, code - def check_key_in_dictionary(fields, dictionary): ''' @@ -336,27 +291,7 @@ def handle_async_feature_engineering_status_exception_case(lock, dataextraction_ dataextraction_job_cache.pop(trainingjob_name) except KeyError as key_err: logger.error("The training job key doesn't exist in DATAEXTRACTION_JOBS_CACHE: " + str(key_err)) - -# def validate_trainingjob_name(trainingjob_name): -# """ -# This function returns True if given trainingjob_name exists in db otherwise -# it returns False. 
-# """ -# results = None -# isavailable = False -# if (not re.fullmatch(PATTERN, trainingjob_name) or -# len(trainingjob_name) < 3 or len(trainingjob_name) > 63): -# raise TMException("The name of training job is invalid.") - -# try: -# results = get_all_versions_info_by_name(trainingjob_name) -# except Exception as err: -# errmsg = str(err) -# raise DBException("Could not get info from db for " + trainingjob_name + "," + errmsg) -# if results: -# isavailable = True -# return isavailable - + def check_trainingjob_name_and_version(trainingjob_name, version): if (re.fullmatch(PATTERN, trainingjob_name) and version.isnumeric()): diff --git a/trainingmgr/controller/trainingjob_controller.py b/trainingmgr/controller/trainingjob_controller.py index e43fcde..99cece0 100644 --- a/trainingmgr/controller/trainingjob_controller.py +++ b/trainingmgr/controller/trainingjob_controller.py @@ -24,15 +24,10 @@ from marshmallow import ValidationError from trainingmgr.common.exceptions_utls import TMException from trainingmgr.common.trainingmgr_config import TrainingMgrConfig from trainingmgr.schemas.trainingjob_schema import TrainingJobSchema -from trainingmgr.service.training_job_service import delete_training_job, create_training_job, get_training_job, get_trainining_jobs, \ -get_steps_state, change_status_tj, update_trainingPipeline +from trainingmgr.service.training_job_service import delete_training_job, create_training_job, get_training_job, get_trainingjob_by_modelId, get_trainining_jobs, \ +get_steps_state, update_trainingPipeline from trainingmgr.common.trainingmgr_util import check_key_in_dictionary -from trainingmgr.common.trainingmgr_operations import data_extraction_start -from trainingmgr.common.trainingConfig_parser import validateTrainingConfig, getField -from trainingmgr.service.featuregroup_service import get_featuregroup_by_name -from trainingmgr.constants.steps import Steps -from trainingmgr.constants.states import States -from trainingmgr.handler.async_handler import DATAEXTRACTION_JOBS_CACHE +from trainingmgr.common.trainingConfig_parser import validateTrainingConfig from trainingmgr.service.mme_service import get_modelinfo_by_modelId_service training_job_controller = Blueprint('training_job_controller', __name__) LOGGER = TrainingMgrConfig().logger @@ -66,7 +61,7 @@ def delete_trainingjob(training_job_id): def create_trainingjob(): try: - + LOGGER.debug(f"request for training with json {request.get_json()}") request_json = request.get_json() if check_key_in_dictionary(["training_config"], request_json): @@ -81,11 +76,12 @@ def create_trainingjob(): return jsonify({'Exception': 'The TrainingConfig is not correct'}), status.HTTP_400_BAD_REQUEST model_id = trainingjob.modelId - registered_model_dict = get_modelinfo_by_modelId_service(model_id.modelname, model_id.modelversion)[0] + registered_model_list = get_modelinfo_by_modelId_service(model_id.modelname, model_id.modelversion) # Verify if the modelId is registered over mme or not - if registered_model_dict is None: + if registered_model_list is None: return jsonify({"Exception":f"modelId {model_id.modelname} and {model_id.modelversion} is not registered at MME, Please first register at MME and then continue"}), status.HTTP_400_BAD_REQUEST + registered_model_dict = registered_model_list[0] if registered_model_dict["modelLocation"] != trainingjob.model_location: return jsonify({"Exception":f"modelId {model_id.modelname} and {model_id.modelversion} and trainingjob created does not have same modelLocation, Please first register at MME 
@@ -145,88 +141,3 @@ def get_trainingjob_status(training_job_id):
         return jsonify({
             'message': str(err)
         }), 500
-
-@training_job_controller.route('/training-jobs/<training_job_id>/training', methods=['POST'])
-def training(training_job_id):
-    """
-    Rest end point to start training job.
-    It calls data extraction module for data extraction and other training steps
-
-    Args in function:
-        training_job_id: str
-            id of trainingjob.
-
-    Args in json:
-        not required json
-
-    Returns:
-        json:
-            training_job_id: str
-                name of trainingjob
-            result: str
-                route of data extraction module for getting data extraction status of
-                given training_job_id .
-        status code:
-            HTTP status code 200
-
-    Exceptions:
-        all exception are provided with exception message and HTTP status code.
-    """
-
-    LOGGER.debug("Request for training trainingjob %s ", training_job_id)
-    try:
-        trainingjob = get_training_job(training_job_id)
-        featuregroup= get_featuregroup_by_name(getField(trainingjob.training_config, "feature_group_name"))
-        feature_list_string = featuregroup.feature_list
-        influxdb_info_dic={}
-        influxdb_info_dic["host"]=featuregroup.host
-        influxdb_info_dic["port"]=featuregroup.port
-        influxdb_info_dic["bucket"]=featuregroup.bucket
-        influxdb_info_dic["token"]=featuregroup.token
-        influxdb_info_dic["db_org"] = featuregroup.db_org
-        influxdb_info_dic["source_name"]= featuregroup.source_name
-        _measurement = featuregroup.measurement
-        query_filter = getField(trainingjob.training_config, "query_filter")
-        datalake_source = {featuregroup.datalake_source: {}}  # Datalake source should be taken from FeatureGroup (not TrainingJob)
-        LOGGER.debug('Starting Data Extraction...')
-        de_response = data_extraction_start(TRAININGMGR_CONFIG_OBJ, training_job_id,
-                                            feature_list_string, query_filter, datalake_source,
-                                            _measurement, influxdb_info_dic, featuregroup.featuregroup_name)
-        if (de_response.status_code == status.HTTP_200_OK ):
-            LOGGER.debug("Response from data extraction for " + \
-                str(trainingjob.id) + " : " + json.dumps(de_response.json()))
-            change_status_tj(trainingjob.id,
-                             Steps.DATA_EXTRACTION.name,
-                             States.IN_PROGRESS.name)
-            with LOCK:
-                DATAEXTRACTION_JOBS_CACHE[trainingjob.id] = "Scheduled"
-        elif( de_response.headers['content-type'] == MIMETYPE_JSON ) :
-            errMsg = "Data extraction responded with error code."
-            LOGGER.error(errMsg)
-            json_data = de_response.json()
-            LOGGER.debug(str(json_data))
-            if check_key_in_dictionary(["result"], json_data):
-                return jsonify({
-                    "message": json.dumps({"Failed":errMsg + json_data["result"]})
-                }), 500
-            else:
-                return jsonify({
-                    "message": errMsg
-                }), 500
-        else:
-            return jsonify({
-                "message": "failed data extraction"
-            }), 500
-    except TMException as err:
-        if "No row was found when one was required" in str(err):
-            return jsonify({
-                'message': str(err)
-            }), 404
-    except Exception as e:
-        # print(traceback.format_exc())
-        # response_data = {"Exception": str(err)}
-        LOGGER.debug("Error is training, job id: " + str(training_job_id)+" " + str(e))
-        return jsonify({
-            'message': str(e)
-        }), 500
-    return jsonify({"result": "training started"}), 200
\ No newline at end of file
diff --git a/trainingmgr/db/trainingjob_db.py b/trainingmgr/db/trainingjob_db.py
index 7bd774a..7271de0 100644
--- a/trainingmgr/db/trainingjob_db.py
+++ b/trainingmgr/db/trainingjob_db.py
@@ -105,7 +105,6 @@ def get_trainingjob(id: int=None):
     else:
         tjs = TrainingJob.query.all()
         return tjs
-    return tj
 
 def change_field_value_by_version(trainingjob_name, version, field, field_value):
     """
@@ -151,11 +150,14 @@ def change_steps_state(trainingjob_id, step: Steps, state:States):
         raise DBException(f'{DB_QUERY_EXEC_ERROR} the change_steps_state : {str(e)}')
 
 
-def change_state_to_failed(trainingjob):
+def change_state_to_failed(trainingjob_id):
     try:
+        trainingjob = TrainingJob.query.filter(TrainingJob.id==trainingjob_id).one()
         steps_state = json.loads(trainingjob.steps_state.states)
-        steps_state = {step: States.FAILED.name for step in steps_state if steps_state[step] == States.IN_PROGRESS.name}
+        for step in steps_state:
+            if steps_state[step] == States.IN_PROGRESS.name:
+                steps_state[step] = States.FAILED.name
         trainingjob.steps_state.states=json.dumps(steps_state)
         db.session.add(trainingjob)
         db.session.commit()
diff --git a/trainingmgr/handler/async_handler.py b/trainingmgr/handler/async_handler.py
index 91cf105..c2c2674 100644
--- a/trainingmgr/handler/async_handler.py
+++ b/trainingmgr/handler/async_handler.py
@@ -5,7 +5,7 @@ import time
 import requests
 from trainingmgr.common.trainingConfig_parser import getField
 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
-from trainingmgr.common.trainingmgr_operations import data_extraction_status
+from trainingmgr.common.trainingmgr_operations import data_extraction_status, notification_rapp
 # from trainingmgr.common.trainingmgr_util import handle_async_feature_engineering_status_exception_case
 from trainingmgr.common.exceptions_utls import DBException, TMException
 from trainingmgr.constants import Steps, States
@@ -13,8 +13,6 @@ from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk
 from trainingmgr.db.trainingjob_db import change_state_to_failed, get_trainingjob, change_steps_state
 
 
-
-
 # Global variables
 LOCK = Lock()
 DATAEXTRACTION_JOBS_CACHE = {}
@@ -84,8 +82,8 @@ def check_and_notify_feature_engineering_status(APP,db):
         except Exception as err:
             LOGGER.error(f"Error processing DATAEXTRACTION_JOBS_CACHE: {str(err)}")
             with APP.app_context():
-                change_state_to_failed(trainingjob)
-                # notification_rapp(trainingjob.id)
+                change_state_to_failed(trainingjob.id)
+                notification_rapp(trainingjob.id)
             with LOCK:
                 try:
                     DATAEXTRACTION_JOBS_CACHE.pop(trainingjob.id)
diff --git a/trainingmgr/service/training_job_service.py b/trainingmgr/service/training_job_service.py
index cc9b0ba..24187d2 100644
--- a/trainingmgr/service/training_job_service.py
+++ b/trainingmgr/service/training_job_service.py
@@ -19,17 +19,16 @@ import json
 from threading import Lock
 from flask_api import status
 from flask import jsonify
-from trainingmgr.common.trainingmgr_operations import data_extraction_start
+from trainingmgr.common.trainingmgr_operations import data_extraction_start, notification_rapp
 from trainingmgr.db.model_db import get_model_by_modelId
-from trainingmgr.db.trainingjob_db import delete_trainingjob_by_id, create_trainingjob, get_trainingjob, get_trainingjob_by_modelId_db, \
+from trainingmgr.db.trainingjob_db import change_state_to_failed, delete_trainingjob_by_id, create_trainingjob, get_trainingjob, get_trainingjob_by_modelId_db, \
 change_steps_state, change_field_value, change_field_value, change_steps_state_df, changeartifact
 from trainingmgr.common.exceptions_utls import DBException, TMException
 from trainingmgr.common.trainingConfig_parser import getField, setField
 from trainingmgr.handler.async_handler import DATAEXTRACTION_JOBS_CACHE
 from trainingmgr.schemas import TrainingJobSchema
 from trainingmgr.common.trainingmgr_util import check_key_in_dictionary, get_one_word_status, get_step_in_progress_state
-from trainingmgr.constants.steps import Steps
-from trainingmgr.constants.states import States
+from trainingmgr.constants import Steps, States
 from trainingmgr.service.pipeline_service import terminate_training_service
 from trainingmgr.service.featuregroup_service import get_featuregroup_by_name, get_featuregroup_from_inputDataType
 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
@@ -205,7 +204,7 @@ def training(trainingjob):
         all exception are provided with exception message and HTTP status code.
     """
 
-    LOGGER.debug("Request for training trainingjob %s ", trainingjob.id)
+    LOGGER.debug("Request for training trainingjob id %s ", trainingjob.id)
     try:
         # trainingjob = get_training_job(trainingjob_id)
         # print(trainingjob)
@@ -230,10 +229,11 @@ def training(trainingjob):
                                             _measurement, influxdb_info_dic, featuregroup.featuregroup_name)
         if (de_response.status_code == status.HTTP_200_OK ):
             LOGGER.debug("Response from data extraction for " + \
-                training_job_id + " : " + json.dumps(de_response.json()))
+                str(training_job_id) + " : " + json.dumps(de_response.json()))
             change_status_tj(trainingjob.id,
                              Steps.DATA_EXTRACTION.name,
                              States.IN_PROGRESS.name)
+            notification_rapp(trainingjob.id)
             with LOCK:
                 DATAEXTRACTION_JOBS_CACHE[trainingjob.id] = "Scheduled"
         elif( de_response.headers['content-type'] == MIMETYPE_JSON ) :
@@ -241,6 +241,7 @@ def training(trainingjob):
             LOGGER.error(errMsg)
             json_data = de_response.json()
             LOGGER.debug(str(json_data))
+            change_state_to_failed(training_job_id)
             if check_key_in_dictionary(["result"], json_data):
                 return jsonify({
                     "message": json.dumps({"Failed":errMsg + json_data["result"]})
@@ -254,6 +255,7 @@ def training(trainingjob):
                 "message": "failed data extraction"
             }), 500
     except TMException as err:
+        change_state_to_failed(training_job_id)
         if "No row was found when one was required" in str(err):
             return jsonify({
                 'message': str(err)
@@ -261,11 +263,15 @@ def training(trainingjob):
     except Exception as e:
         # print(traceback.format_exc())
         # response_data = {"Exception": str(err)}
-        LOGGER.debug("Error is training, job id: " + str(training_job_id)+" " + str(e))
+        LOGGER.debug("Error is training, job id: " + str(training_job_id)+" " + str(e))
+        change_state_to_failed(training_job_id)
         return jsonify({
             'message': str(e)
-        }), 500
-    return jsonify({"Trainingjob": trainingJobSchema.dump(trainingjob)}), 201
jsonify({"Trainingjob": trainingJobSchema.dump(trainingjob)}), 201 + }), 500 + + response = jsonify(trainingJobSchema.dump(trainingjob)) + response.headers['Location'] = "training-jobs/" + str(training_job_id) + return response, 201 def update_trainingPipeline(trainingjob): try: diff --git a/trainingmgr/trainingmgr_main.py b/trainingmgr/trainingmgr_main.py index e6ba523..627d107 100644 --- a/trainingmgr/trainingmgr_main.py +++ b/trainingmgr/trainingmgr_main.py @@ -19,40 +19,27 @@ """" This file contains all rest endpoints exposed by Training manager. """ -import ast import json -import re from logging import Logger -import os -import traceback -import threading -from threading import Lock -import time from flask import Flask, request, send_file, jsonify from flask_api import status from flask_migrate import Migrate from marshmallow import ValidationError import requests from flask_cors import CORS -from werkzeug.utils import secure_filename +from trainingmgr.db.trainingjob_db import change_state_to_failed from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk -from trainingmgr.common.trainingmgr_operations import data_extraction_start, training_start, data_extraction_status, create_dme_filtered_data_job, delete_dme_filtered_data_job, \ - get_model_info +from trainingmgr.common.trainingmgr_operations import notification_rapp, training_start, delete_dme_filtered_data_job from trainingmgr.common.trainingmgr_config import TrainingMgrConfig -from trainingmgr.common.trainingmgr_util import get_one_word_status, check_trainingjob_data, \ - check_key_in_dictionary, get_one_key, \ - response_for_training, get_metrics, \ - handle_async_feature_engineering_status_exception_case, check_feature_group_data, check_trainingjob_name_and_version, check_trainingjob_name_or_featuregroup_name, \ +from trainingmgr.common.trainingmgr_util import check_key_in_dictionary, \ get_feature_group_by_name, edit_feature_group_by_name from trainingmgr.common.exceptions_utls import APIException,TMException from trainingmgr.constants.steps import Steps from trainingmgr.constants.states import States from trainingmgr.db.trainingmgr_ps_db import PSDB -from trainingmgr.common.exceptions_utls import DBException -from trainingmgr.db.common_db_fun import get_data_extraction_in_progress_trainingjobs -from trainingmgr.models import db, TrainingJob, FeatureGroup +from trainingmgr.models import db from trainingmgr.schemas import ma, TrainingJobSchema , FeatureGroupSchema -from trainingmgr.db.featuregroup_db import add_featuregroup, edit_featuregroup, get_feature_groups_db, \ +from trainingmgr.db.featuregroup_db import get_feature_groups_db, \ get_feature_group_by_name_db, delete_feature_group_by_name from trainingmgr.controller import featuregroup_controller, training_job_controller from trainingmgr.controller.pipeline_controller import pipeline_controller @@ -60,7 +47,6 @@ from trainingmgr.common.trainingConfig_parser import validateTrainingConfig, get from trainingmgr.handler.async_handler import start_async_handler from trainingmgr.service.mme_service import get_modelinfo_by_modelId_service from trainingmgr.service.training_job_service import change_status_tj, change_update_field_value, get_training_job, update_artifact_version -from trainingmgr.service.pipeline_service import start_training_service APP = Flask(__name__) TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig() @@ -221,24 +207,24 @@ def data_extraction_notification(): else: raise TMException("KF Adapter- run_status in not scheduled") except 
-        # err_msg = "Failed to connect KF adapter."
-        # LOGGER.error(err_msg)
-        # if not change_in_progress_to_failed_by_latest_version(trainingjob_name) :
-        #     LOGGER.error(ERROR_TYPE_DB_STATUS)
-        # return response_for_training(err_response_code,
-        #                              err_msg + str(err) + "(trainingjob name is " + trainingjob_name + ")",
-        #                              LOGGER, False, trainingjob_name, MM_SDK)
-        pass
+        LOGGER.error(f"DataExtraction Notification failed due to {str(err)}")
+        try:
+            notification_rapp(trainingjob.id)
+            change_state_to_failed(trainingjob.id)
+        except Exception as e:
+            if "the change_steps_state to failed" in str(e):
+                LOGGER.error(f"failed to update the status of trainingjob to FAILED due to {str(err)}")
+        return jsonify({"Error":"Failed to connect to KF adapter"}) , 504
     except Exception as err:
         LOGGER.error("error is : "+ str(err))
+        try:
+            notification_rapp(trainingjob.id)
+            change_state_to_failed(trainingjob.id)
+        except Exception as e:
+            if "the change_steps_state to failed" in str(e):
+                LOGGER.error(f"failed to update the status of trainingjob to FAILED due to {str(err)}")
         return jsonify({"failed":"error"}), 500
-        # LOGGER.error("Failed to handle dataExtractionNotification. " + str(err))
-        # if not change_in_progress_to_failed_by_latest_version(trainingjob_name) :
-        #     LOGGER.error(ERROR_TYPE_DB_STATUS)
-        # return response_for_training(err_response_code,
-        #                              str(err) + "(trainingjob name is " + trainingjob_name + ")",
-        #                              LOGGER, False, trainingjob_name, MM_SDK)
 
 
     return APP.response_class(response=json.dumps({"result": "pipeline is scheduled"}),
                               status=status.HTTP_200_OK,
@@ -290,9 +276,7 @@ def pipeline_notification():
                              Steps.TRAINING_AND_TRAINED_MODEL.name,
                              States.IN_PROGRESS.name)
 
-            # notification_rapp(trainingjob_info, TRAININGMGR_CONFIG_OBJ)
-
-            # version = get_latest_version_trainingjob_name(trainingjob_name)
+            notification_rapp(trainingjob.id)
 
             change_status_tj(trainingjob_id,
                              Steps.TRAINING_AND_TRAINED_MODEL.name,
@@ -301,7 +285,8 @@ def pipeline_notification():
                              Steps.TRAINED_MODEL.name,
                              States.IN_PROGRESS.name)
 
-            # notification_rapp(trainingjob_info, TRAININGMGR_CONFIG_OBJ)
+            notification_rapp(trainingjob.id)
+
             model_name= trainingjob.modelId.modelname
             model_version= trainingjob.modelId.modelversion
 
@@ -317,31 +302,36 @@ def pipeline_notification():
                 change_status_tj(trainingjob_id,
                                  Steps.TRAINED_MODEL.name,
                                  States.FINISHED.name)
-                # notification_rapp(trainingjob_info, TRAININGMGR_CONFIG_OBJ)
+                notification_rapp(trainingjob.id)
             else:
                 errMsg = "Trained model is not available "
                 LOGGER.error(errMsg + str(trainingjob_id))
+                change_status_tj(trainingjob_id,
+                                 Steps.TRAINED_MODEL.name,
+                                 States.FAILED.name)
+                notification_rapp(trainingjob.id)
                 raise TMException(errMsg + str(trainingjob_id))
         else:
            LOGGER.error("Pipeline notification -Training failed " + str(trainingjob_id))
+           change_status_tj(trainingjob_id,
+                            Steps.TRAINING.name,
+                            States.FAILED.name)
+           notification_rapp(trainingjob.id)
           raise TMException("Pipeline not successful for " + \
                              str(trainingjob_id) + \
                              ",request json from kf adapter is: " + json.dumps(request.json))
    except Exception as err:
        #Training failure response
        LOGGER.error("Pipeline notification failed" + str(err))
-        # if not change_in_progress_to_failed_by_latest_version(trainingjob_id) :
-        #     LOGGER.error(ERROR_TYPE_DB_STATUS)
-
-        # return response_for_training(status.HTTP_500_INTERNAL_SERVER_ERROR,
-        #                              str(err) + " (trainingjob " + trainingjob_id + ")",
-        #                              LOGGER, False, trainingjob_id, MM_SDK)
-        return "", 500
-    #Training success response
-    # return response_for_training(status.HTTP_200_OK,
-    #                              "Pipeline notification success.",
-    #                              LOGGER, True, trainingjob_id, MM_SDK)
-    return "", 200
+        try:
+            notification_rapp(trainingjob.id)
+            change_state_to_failed(trainingjob.id)
+        except Exception as e:
+            if "the change_steps_state to failed" in str(e):
+                LOGGER.error(f"failed to update the status of trainingjob to FAILED due to {str(err)}")
+        return jsonify({"Error":"Training Failed"}), 500
+
+    return jsonify({"Message":"Training successful"}), 200
 
 
 
@@ -395,183 +385,6 @@ def get_all_experiment_names():
                               mimetype=MIMETYPE_JSON)
 
 
-
-
-# @APP.route('/trainingjobs/retraining', methods=['POST'])
-# def retraining():
-#     """
-#     Function handling rest endpoint to retrain trainingjobs in request json. trainingjob's
-#     overall_status should be failed or finished and its deletion_in_progress should be False
-#     otherwise retraining of that trainingjob is counted in failure.
-#     Args in function: none
-#     Required Args in json:
-#         trainingjobs_list: list
-#             list containing dictionaries
-#             dictionary contains
-#                 usecase_name: str
-#                     name of trainingjob
-#                 notification_url(optional): str
-#                     url for notification
-#                 feature_filter(optional): str
-#                     feature filter
-#     Returns:
-#         json:
-#             success count: int
-#                 successful retraining count
-#             failure count: int
-#                 failure retraining count
-#         status: HTTP status code 200
-#     Exceptions:
-#         all exception are provided with exception message and HTTP status code.
-#     """
-#     LOGGER.debug('request comes for retraining, ' + json.dumps(request.json))
-#     try:
-#         check_key_in_dictionary(["trainingjobs_list"], request.json)
-#     except Exception as err:
-#         raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
-
-#     trainingjobs_list = request.json['trainingjobs_list']
-#     if not isinstance(trainingjobs_list, list):
-#         raise APIException(status.HTTP_400_BAD_REQUEST, NOT_LIST)
-
-#     for obj in trainingjobs_list:
-#         try:
-#             check_key_in_dictionary(["trainingjob_name"], obj)
-#         except Exception as err:
-#             raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
-
-#     not_possible_to_retrain = []
-#     possible_to_retrain = []
-
-#     for obj in trainingjobs_list:
-#         trainingjob_name = obj['trainingjob_name']
-#         results = None
-#         try:
-#             trainingjob = get_info_of_latest_version(trainingjob_name)
-#         except Exception as err:
-#             not_possible_to_retrain.append(trainingjob_name)
-#             LOGGER.debug(str(err) + "(trainingjob_name is " + trainingjob_name + ")")
-#             continue
-
-#         if trainingjob:
-#             if trainingjob.deletion_in_progress:
-#                 not_possible_to_retrain.append(trainingjob_name)
-#                 LOGGER.debug("Failed to retrain because deletion in progress" + \
-#                              "(trainingjob_name is " + trainingjob_name + ")")
-#                 continue
-
-#             if (get_one_word_status(json.loads(trainingjob.steps_state))
-#                     not in [States.FINISHED.name, States.FAILED.name]):
-#                 not_possible_to_retrain.append(trainingjob_name)
-#                 LOGGER.debug("Not finished or not failed status" + \
-#                              "(trainingjob_name is " + trainingjob_name + ")")
-#                 continue
-
-#             try:
-#                 add_update_trainingjob(trainingjob, False)
-#             except Exception as err:
-#                 not_possible_to_retrain.append(trainingjob_name)
-#                 LOGGER.debug(str(err) + "(training job is " + trainingjob_name + ")")
-#                 continue
-
-#             url = 'http://' + str(TRAININGMGR_CONFIG_OBJ.my_ip) + \
-#                   ':' + str(TRAININGMGR_CONFIG_OBJ.my_port) + \
-#                   '/trainingjobs/' +trainingjob_name + '/training'
-#             response = requests.post(url)
-
-#             if response.status_code == status.HTTP_200_OK:
-#                 possible_to_retrain.append(trainingjob_name)
-#             else:
-#                 LOGGER.debug("not 200 response" + "(trainingjob_name is " + trainingjob_name + ")")
LOGGER.debug("not 200 response" + "(trainingjob_name is " + trainingjob_name + ")") -# not_possible_to_retrain.append(trainingjob_name) - -# else: -# LOGGER.debug("not present in postgres db" + "(trainingjob_name is " + trainingjob_name + ")") -# not_possible_to_retrain.append(trainingjob_name) - -# LOGGER.debug('success list: ' + str(possible_to_retrain)) -# LOGGER.debug('failure list: ' + str(not_possible_to_retrain)) - -# return APP.response_class(response=json.dumps( \ -# { -# "success count": len(possible_to_retrain), -# "failure count": len(not_possible_to_retrain) -# }), -# status=status.HTTP_200_OK, -# mimetype='application/json') - - - - -# @APP.route('/trainingjobs/metadata/') -# def get_metadata(trainingjob_name): -# """ -# Function handling rest endpoint to get accuracy, version and model download url for all -# versions of given trainingjob_name which has overall_state FINISHED and -# deletion_in_progress is False - -# Args in function: -# trainingjob_name: str -# name of trainingjob. - -# Args in json: -# No json required - -# Returns: -# json: -# Successed metadata : list -# list containes dictionaries. -# dictionary containts -# accuracy: dict -# metrics of model -# version: int -# version of trainingjob -# url: str -# url for downloading model -# status: -# HTTP status code 200 - -# Exceptions: -# all exception are provided with exception message and HTTP status code. -# """ -# response_code = status.HTTP_500_INTERNAL_SERVER_ERROR -# api_response = {} -# if not check_trainingjob_name_or_featuregroup_name(trainingjob_name): -# return {"Exception":"The trainingjob_name is not correct"}, status.HTTP_400_BAD_REQUEST - -# LOGGER.debug("Request metadata for trainingjob(name of trainingjob is %s) ", trainingjob_name) -# try: -# results = get_all_versions_info_by_name(trainingjob_name) -# if results: -# info_list = [] -# for trainingjob_info in results: -# if (get_one_word_status(json.loads(trainingjob_info.steps_state)) == States.FINISHED.name and -# not trainingjob_info.deletion_in_progress): -# LOGGER.debug("Downloading metric for " +trainingjob_name ) -# data = get_metrics(trainingjob_name, trainingjob_info[11], MM_SDK) -# url = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + ":" + \ -# str(TRAININGMGR_CONFIG_OBJ.my_port) + "/model/" + \ -# trainingjob_name + "/" + str(trainingjob_info[11]) + "/Model.zip" -# dict_data = { -# "accuracy": data, -# "version": trainingjob_info.version, -# "url": url -# } -# info_list.append(dict_data) -# #info_list built -# api_response = {"Successed metadata": info_list} -# response_code = status.HTTP_200_OK -# else : -# err_msg = "Not found given trainingjob name-" + trainingjob_name -# LOGGER.error(err_msg) -# response_code = status.HTTP_404_NOT_FOUND -# api_response = {"Exception":err_msg} -# except Exception as err: -# LOGGER.error(str(err)) -# api_response = {"Exception":str(err)} -# return APP.response_class(response=json.dumps(api_response), -# status=response_code, -# mimetype=MIMETYPE_JSON) - @APP.route('/featureGroup/', methods=['GET', 'PUT']) def feature_group_by_name(featuregroup_name): """ @@ -782,66 +595,6 @@ def delete_list_of_feature_group(): mimetype='application/json') -# def async_feature_engineering_status(): -# """ -# This function takes trainingjobs from DATAEXTRACTION_JOBS_CACHE and checks data extraction status -# (using data extraction api) for those trainingjobs, if status is Completed then it calls -# /trainingjob/dataExtractionNotification route for those trainingjobs. 
-# """ -# url_pipeline_run = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + \ -# ":" + str(TRAININGMGR_CONFIG_OBJ.my_port) + \ -# "/trainingjob/dataExtractionNotification" -# while True: -# with LOCK: -# fjc = list(DATAEXTRACTION_JOBS_CACHE) -# for trainingjob_name in fjc: -# LOGGER.debug("Current DATAEXTRACTION_JOBS_CACHE :" + str(DATAEXTRACTION_JOBS_CACHE)) -# try: -# response = data_extraction_status(trainingjob_name, TRAININGMGR_CONFIG_OBJ) -# if (response.headers['content-type'] != MIMETYPE_JSON or -# response.status_code != status.HTTP_200_OK ): -# raise TMException("Data extraction responsed with error status code or invalid content type" + \ -# "doesn't send json type response (trainingjob " + trainingjob_name + ")") -# response = response.json() -# LOGGER.debug("Data extraction status response for " + \ -# trainingjob_name + " " + json.dumps(response)) - -# if response["task_status"] == "Completed": -# with APP.app_context(): -# change_steps_state_of_latest_version(trainingjob_name, -# Steps.DATA_EXTRACTION.name, -# States.FINISHED.name) -# change_steps_state_of_latest_version(trainingjob_name, -# Steps.DATA_EXTRACTION_AND_TRAINING.name, -# States.IN_PROGRESS.name) -# kf_response = requests.post(url_pipeline_run, -# data=json.dumps({"trainingjob_name": trainingjob_name}), -# headers={ -# 'content-type': MIMETYPE_JSON, -# 'Accept-Charset': 'UTF-8' -# }) -# if (kf_response.headers['content-type'] != MIMETYPE_JSON or -# kf_response.status_code != status.HTTP_200_OK ): -# raise TMException("KF adapter responsed with error status code or invalid content type" + \ -# "doesn't send json type response (trainingjob " + trainingjob_name + ")") -# with LOCK: -# DATAEXTRACTION_JOBS_CACHE.pop(trainingjob_name) -# elif response["task_status"] == "Error": -# raise TMException("Data extraction has failed for " + trainingjob_name) -# except Exception as err: -# LOGGER.error("Failure during procesing of DATAEXTRACTION_JOBS_CACHE," + str(err)) -# """ Job will be removed from DATAEXTRACTION_JOBS_CACHE in handle_async -# There might be some further error during handling of exception -# """ -# handle_async_feature_engineering_status_exception_case(LOCK, -# DATAEXTRACTION_JOBS_CACHE, -# status.HTTP_500_INTERNAL_SERVER_ERROR, -# str(err) + "(trainingjob name is " + trainingjob_name + ")", -# LOGGER, False, trainingjob_name, MM_SDK) - -# #Wait and fetch latest list of trainingjobs -# time.sleep(10) - if __name__ == "__main__": try: if TRAININGMGR_CONFIG_OBJ.is_config_loaded_properly() is False: