Changes done: 54/13954/3
author rajdeep11 <rajdeep.sin@samsung.com>
Sat, 21 Dec 2024 11:19:24 +0000 (16:49 +0530)
committer Rajdeep Singh <rajdeep.sin@samsung.com>
Sat, 21 Dec 2024 11:31:50 +0000 (11:31 +0000)
1) Enable the notification rApp
2) Handle exceptions
3) Change the status to FAILED and fix its implementation
4) Handle the irrelevant error thrown when the model is not registered
5) Add a Location header to the create trainingjob response
Change-Id: I80845fdf907fd3bd59fde1ba454771eaa1290359
Signed-off-by: rajdeep11 <rajdeep.sin@samsung.com>
trainingmgr/common/trainingmgr_operations.py
trainingmgr/common/trainingmgr_util.py
trainingmgr/controller/trainingjob_controller.py
trainingmgr/db/trainingjob_db.py
trainingmgr/handler/async_handler.py
trainingmgr/service/training_job_service.py
trainingmgr/trainingmgr_main.py

diff --git a/trainingmgr/common/trainingmgr_operations.py b/trainingmgr/common/trainingmgr_operations.py
index d79e2c1..b9d8311 100644
@@ -23,6 +23,7 @@ Training manager main operations
 
 import json
 import requests
+from trainingmgr.db.trainingjob_db import get_trainingjob
 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
 import validators
 from trainingmgr.common.exceptions_utls import TMException
@@ -193,13 +194,18 @@ def get_model_info(training_config_obj, model_name):
         logger.error(errMsg)
         raise TMException(errMsg)
 
-# def notification_rapp(trainingjob, training_config_obj):
-#     steps_state = get_steps_state_db(trainingjob.trainingjob_name)
-#     response = requests.post(trainingjob.notification_url,
-#                             data=json.dumps(steps_state),
-#                             headers={
-#                                 'content-type': MIMETYPE_JSON,
-#                                 'Accept-Charset': 'UTF-8'
-#                             })
-#     if response.status_code != 200:
-#         raise TMException("Notification failed: "+response.text)
\ No newline at end of file
+def notification_rapp(trainingjob_id):
+    try:
+        trainingjob = get_trainingjob(trainingjob_id)
+        steps_state = trainingjob.steps_state.states
+        if trainingjob.notification_url != "":
+            response = requests.post(trainingjob.notification_url,
+                                    data=json.dumps(steps_state),
+                                    headers={
+                                        'content-type': MIMETYPE_JSON,
+                                        'Accept-Charset': 'UTF-8'
+                                    })
+            if response.status_code != 200:
+                raise TMException("Notification failed: "+response.text)
+    except Exception as err:
+        LOGGER.error(f"failed to notify rapp due to {str(err)}")
\ No newline at end of file
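
The new notification_rapp() POSTs the job's serialized steps_state to the subscriber's notification_url and treats any non-200 reply as a failure, which is logged rather than propagated. A minimal sketch of a subscriber-side endpoint that could receive these callbacks, assuming a Flask-based rApp; the route name and example states are illustrative only:

    from flask import Flask, request, jsonify

    subscriber_app = Flask(__name__)

    @subscriber_app.route('/tm-notifications', methods=['POST'])
    def on_training_status():
        # Payload is the steps_state posted by notification_rapp,
        # e.g. {"DATA_EXTRACTION": "FINISHED", "TRAINING": "IN_PROGRESS"}
        steps_state = request.get_json()
        print("training status update:", steps_state)
        # Anything other than HTTP 200 makes notification_rapp log a failure
        return jsonify({"ack": True}), 200
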
diff --git a/trainingmgr/common/trainingmgr_util.py b/trainingmgr/common/trainingmgr_util.py
index acdb4e7..f555df5 100644
@@ -43,51 +43,6 @@ PATTERN = re.compile(r"\w+")
 featuregroup_schema = FeatureGroupSchema()
 featuregroups_schema = FeatureGroupSchema(many = True)
 
-def response_for_training(code, message, logger, is_success, trainingjob_name, mm_sdk):
-    """
-    Post training job completion,this function provides notifications to the subscribers, 
-    who subscribed for the result of training job and provided a notification url during 
-    training job creation.
-    returns tuple containing result dictionary and status code.
-    """
-    logger.debug("Training job result: " + str(code) + " " + message + " " + str(is_success))
-    
-    try :
-        #TODO DB query optimization, all data to fetch in one call
-        notif_url = get_field_by_latest_version(trainingjob_name, "notification_url")
-        if notif_url :
-
-            model_url = get_field_by_latest_version(trainingjob_name, "model_url")
-            version = get_latest_version_trainingjob_name(trainingjob_name)
-            metrics = get_metrics(trainingjob_name, version, mm_sdk)
-
-            req_json = None
-            if is_success:
-                req_json = {
-                    "result": "success", "model_url": model_url,
-                    "trainingjob_name": trainingjob_name, "metrics": metrics
-                }
-            else:
-                req_json = {"result": "failed", "trainingjob_name": trainingjob_name}
-        
-            response = requests.post(notif_url,
-                    data=json.dumps(req_json),
-                    headers={
-                        'content-type': MIMETYPE_JSON,
-                        'Accept-Charset': 'UTF-8'
-                    })
-            if ( response.headers['content-type'] != MIMETYPE_JSON
-                    or response.status_code != status.HTTP_200_OK ):
-                err_msg = "Failed to notify the subscribed url " + trainingjob_name
-                raise TMException(err_msg)
-    except Exception as err:
-        change_in_progress_to_failed_by_latest_version(trainingjob_name)
-        raise APIException(status.HTTP_500_INTERNAL_SERVER_ERROR,
-                            str(err) + "(trainingjob name is " + trainingjob_name + ")") from None
-    if is_success:
-        return {"result": message}, code
-    return {"Exception": message}, code
-
 
 def check_key_in_dictionary(fields, dictionary):
     '''
@@ -336,27 +291,7 @@ def handle_async_feature_engineering_status_exception_case(lock, dataextraction_
                 dataextraction_job_cache.pop(trainingjob_name)
             except KeyError as key_err:
                 logger.error("The training job key doesn't exist in DATAEXTRACTION_JOBS_CACHE: " + str(key_err))
-
-# def validate_trainingjob_name(trainingjob_name):
-#     """
-#     This function returns True if given trainingjob_name exists in db otherwise
-#     it returns False.
-#     """
-#     results = None
-#     isavailable = False
-#     if (not re.fullmatch(PATTERN, trainingjob_name) or
-#         len(trainingjob_name) < 3 or len(trainingjob_name) > 63):
-#         raise TMException("The name of training job is invalid.")
-
-#     try:
-#         results = get_all_versions_info_by_name(trainingjob_name)
-#     except Exception as err:
-#         errmsg = str(err)
-#         raise DBException("Could not get info from db for " + trainingjob_name + "," + errmsg)
-#     if results:
-#         isavailable = True
-#     return isavailable     
-
+    
 
 def check_trainingjob_name_and_version(trainingjob_name, version):
     if (re.fullmatch(PATTERN, trainingjob_name) and version.isnumeric()):
diff --git a/trainingmgr/controller/trainingjob_controller.py b/trainingmgr/controller/trainingjob_controller.py
index e43fcde..99cece0 100644
@@ -24,15 +24,10 @@ from marshmallow import ValidationError
 from trainingmgr.common.exceptions_utls import TMException
 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
 from trainingmgr.schemas.trainingjob_schema import TrainingJobSchema
-from trainingmgr.service.training_job_service import delete_training_job, create_training_job, get_training_job, get_trainining_jobs, \
-get_steps_state, change_status_tj, update_trainingPipeline
+from trainingmgr.service.training_job_service import delete_training_job, create_training_job, get_training_job, get_trainingjob_by_modelId, get_trainining_jobs, \
+get_steps_state, update_trainingPipeline
 from trainingmgr.common.trainingmgr_util import check_key_in_dictionary
-from trainingmgr.common.trainingmgr_operations import data_extraction_start
-from trainingmgr.common.trainingConfig_parser import validateTrainingConfig, getField
-from trainingmgr.service.featuregroup_service import get_featuregroup_by_name
-from trainingmgr.constants.steps import Steps
-from trainingmgr.constants.states import States
-from trainingmgr.handler.async_handler import DATAEXTRACTION_JOBS_CACHE
+from trainingmgr.common.trainingConfig_parser import validateTrainingConfig
 from trainingmgr.service.mme_service import get_modelinfo_by_modelId_service
 training_job_controller = Blueprint('training_job_controller', __name__)
 LOGGER = TrainingMgrConfig().logger
@@ -66,7 +61,7 @@ def delete_trainingjob(training_job_id):
 def create_trainingjob():
 
     try:
-
+        LOGGER.debug(f"request for training with json {request.get_json()}")
         request_json = request.get_json()
 
         if check_key_in_dictionary(["training_config"], request_json):
@@ -81,11 +76,12 @@ def create_trainingjob():
             return jsonify({'Exception': 'The TrainingConfig is not correct'}), status.HTTP_400_BAD_REQUEST
         
         model_id = trainingjob.modelId
-        registered_model_dict = get_modelinfo_by_modelId_service(model_id.modelname, model_id.modelversion)[0]
+        registered_model_list = get_modelinfo_by_modelId_service(model_id.modelname, model_id.modelversion)
         # Verify if the modelId is registered over mme or not
-        if registered_model_dict is None:
+        if registered_model_list is None:
             return jsonify({"Exception":f"modelId {model_id.modelname} and {model_id.modelversion} is not registered at MME, Please first register at MME and then continue"}), status.HTTP_400_BAD_REQUEST
 
+        registered_model_dict = registered_model_list[0]
         if registered_model_dict["modelLocation"] != trainingjob.model_location:
             return jsonify({"Exception":f"modelId {model_id.modelname} and {model_id.modelversion} and trainingjob created does not have same modelLocation, Please first register at MME properly and then continue"}), status.HTTP_400_BAD_REQUEST
         
@@ -145,88 +141,3 @@ def get_trainingjob_status(training_job_id):
         return jsonify({
             'message': str(err)
         }), 500
-
-@training_job_controller.route('/training-jobs/<int:training_job_id>/training', methods=['POST'])
-def training(training_job_id):
-    """
-    Rest end point to start training job.
-    It calls data extraction module for data extraction and other training steps
-
-    Args in function:
-        training_job_id: str
-            id of trainingjob.
-
-    Args in json:
-        not required json
-
-    Returns:
-        json:
-            training_job_id: str
-                name of trainingjob
-            result: str
-                route of data extraction module for getting data extraction status of
-                given training_job_id .
-        status code:
-            HTTP status code 200
-
-    Exceptions:
-        all exception are provided with exception message and HTTP status code.
-    """
-
-    LOGGER.debug("Request for training trainingjob  %s ", training_job_id)
-    try:
-        trainingjob = get_training_job(training_job_id)
-        featuregroup= get_featuregroup_by_name(getField(trainingjob.training_config, "feature_group_name"))
-        feature_list_string = featuregroup.feature_list
-        influxdb_info_dic={}
-        influxdb_info_dic["host"]=featuregroup.host
-        influxdb_info_dic["port"]=featuregroup.port
-        influxdb_info_dic["bucket"]=featuregroup.bucket
-        influxdb_info_dic["token"]=featuregroup.token
-        influxdb_info_dic["db_org"] = featuregroup.db_org
-        influxdb_info_dic["source_name"]= featuregroup.source_name
-        _measurement = featuregroup.measurement
-        query_filter = getField(trainingjob.training_config, "query_filter")
-        datalake_source = {featuregroup.datalake_source: {}} # Datalake source should be taken from FeatureGroup (not TrainingJob)
-        LOGGER.debug('Starting Data Extraction...')
-        de_response = data_extraction_start(TRAININGMGR_CONFIG_OBJ, training_job_id,
-                                        feature_list_string, query_filter, datalake_source,
-                                        _measurement, influxdb_info_dic, featuregroup.featuregroup_name)
-        if (de_response.status_code == status.HTTP_200_OK ):
-            LOGGER.debug("Response from data extraction for " + \
-                    str(trainingjob.id) + " : " + json.dumps(de_response.json()))
-            change_status_tj(trainingjob.id,
-                                Steps.DATA_EXTRACTION.name,
-                                States.IN_PROGRESS.name)
-            with LOCK:
-                DATAEXTRACTION_JOBS_CACHE[trainingjob.id] = "Scheduled"
-        elif( de_response.headers['content-type'] == MIMETYPE_JSON ) :
-            errMsg = "Data extraction responded with error code."
-            LOGGER.error(errMsg)
-            json_data = de_response.json()
-            LOGGER.debug(str(json_data))
-            if check_key_in_dictionary(["result"], json_data):
-                return jsonify({
-                    "message": json.dumps({"Failed":errMsg + json_data["result"]})
-                }), 500
-            else:
-                return jsonify({
-                    "message": errMsg
-                }), 500
-        else:
-                return jsonify({
-                    "message": "failed data extraction"
-                }), 500
-    except TMException as err:
-        if "No row was found when one was required" in str(err):
-            return jsonify({
-                    'message': str(err)
-                }), 404 
-    except Exception as e:
-        # print(traceback.format_exc())
-        # response_data =  {"Exception": str(err)}
-        LOGGER.debug("Error is training, job id: " + str(training_job_id)+" " + str(e))   
-        return jsonify({
-            'message': str(e)
-        }), 500      
-    return jsonify({"result": "training started"}), 200
\ No newline at end of file
diff --git a/trainingmgr/db/trainingjob_db.py b/trainingmgr/db/trainingjob_db.py
index 7bd774a..7271de0 100644
@@ -105,7 +105,6 @@ def get_trainingjob(id: int=None):
     else:
         tjs = TrainingJob.query.all()
         return tjs
-    return tj
 
 def change_field_value_by_version(trainingjob_name, version, field, field_value):
     """
@@ -151,11 +150,14 @@ def change_steps_state(trainingjob_id, step: Steps, state:States):
         raise DBException(f'{DB_QUERY_EXEC_ERROR} the change_steps_state : {str(e)}')
 
 
-def change_state_to_failed(trainingjob):
+def change_state_to_failed(trainingjob_id):
 
     try:
+        trainingjob = TrainingJob.query.filter(TrainingJob.id==trainingjob_id).one()
         steps_state = json.loads(trainingjob.steps_state.states)
-        steps_state = {step: States.FAILED.name for step in steps_state if steps_state[step] == States.IN_PROGRESS.name}
+        for step in steps_state:
+            if steps_state[step] == States.IN_PROGRESS.name:
+                steps_state[step] = States.FAILED.name
         trainingjob.steps_state.states=json.dumps(steps_state)
         db.session.add(trainingjob)
         db.session.commit()
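
The rewritten change_state_to_failed appears to be the fix referred to in point 3 of the commit message: the old comprehension rebuilt steps_state from only the IN_PROGRESS entries, so steps that had already FINISHED (or FAILED) were silently dropped from the stored state. The explicit loop keeps every step and only flips IN_PROGRESS to FAILED. A standalone sketch of the difference, using illustrative step names:

    import json

    states_json = json.dumps({"DATA_EXTRACTION": "FINISHED", "TRAINING": "IN_PROGRESS"})
    steps_state = json.loads(states_json)

    # Old behaviour: only IN_PROGRESS keys survive -> {'TRAINING': 'FAILED'}
    old = {step: "FAILED" for step in steps_state if steps_state[step] == "IN_PROGRESS"}

    # New behaviour: all keys kept -> {'DATA_EXTRACTION': 'FINISHED', 'TRAINING': 'FAILED'}
    new = {step: ("FAILED" if state == "IN_PROGRESS" else state)
           for step, state in steps_state.items()}

    print(old, new)
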
diff --git a/trainingmgr/handler/async_handler.py b/trainingmgr/handler/async_handler.py
index 91cf105..c2c2674 100644
@@ -5,7 +5,7 @@ import time
 import requests
 from trainingmgr.common.trainingConfig_parser import getField
 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
-from trainingmgr.common.trainingmgr_operations import data_extraction_status
+from trainingmgr.common.trainingmgr_operations import data_extraction_status, notification_rapp
 # from trainingmgr.common.trainingmgr_util import handle_async_feature_engineering_status_exception_case
 from trainingmgr.common.exceptions_utls import DBException, TMException
 from trainingmgr.constants import Steps, States
@@ -13,8 +13,6 @@ from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk
 from trainingmgr.db.trainingjob_db import change_state_to_failed, get_trainingjob, change_steps_state
 
 
-
-
 # Global variables
 LOCK = Lock()
 DATAEXTRACTION_JOBS_CACHE = {}
@@ -84,8 +82,8 @@ def check_and_notify_feature_engineering_status(APP,db):
             except Exception as err:
                 LOGGER.error(f"Error processing DATAEXTRACTION_JOBS_CACHE: {str(err)}")
                 with APP.app_context():
-                    change_state_to_failed(trainingjob)
-                    notification_rapp(trainingjob.id)
+                    change_state_to_failed(trainingjob.id)
+                    notification_rapp(trainingjob.id)
                     with LOCK:
                         try:
                             DATAEXTRACTION_JOBS_CACHE.pop(trainingjob.id)
diff --git a/trainingmgr/service/training_job_service.py b/trainingmgr/service/training_job_service.py
index cc9b0ba..24187d2 100644
@@ -19,17 +19,16 @@ import json
 from threading import Lock
 from flask_api import status
 from flask import jsonify
-from trainingmgr.common.trainingmgr_operations import data_extraction_start
+from trainingmgr.common.trainingmgr_operations import data_extraction_start, notification_rapp
 from trainingmgr.db.model_db import get_model_by_modelId
-from trainingmgr.db.trainingjob_db import delete_trainingjob_by_id, create_trainingjob, get_trainingjob, get_trainingjob_by_modelId_db, \
+from trainingmgr.db.trainingjob_db import change_state_to_failed, delete_trainingjob_by_id, create_trainingjob, get_trainingjob, get_trainingjob_by_modelId_db, \
 change_steps_state, change_field_value, change_field_value, change_steps_state_df, changeartifact
 from trainingmgr.common.exceptions_utls import DBException, TMException
 from trainingmgr.common.trainingConfig_parser import getField, setField
 from trainingmgr.handler.async_handler import DATAEXTRACTION_JOBS_CACHE
 from trainingmgr.schemas import TrainingJobSchema
 from trainingmgr.common.trainingmgr_util import check_key_in_dictionary, get_one_word_status, get_step_in_progress_state
-from trainingmgr.constants.steps import Steps
-from trainingmgr.constants.states import States
+from trainingmgr.constants import Steps, States
 from trainingmgr.service.pipeline_service import terminate_training_service
 from trainingmgr.service.featuregroup_service import  get_featuregroup_by_name, get_featuregroup_from_inputDataType
 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
@@ -205,7 +204,7 @@ def training(trainingjob):
         all exception are provided with exception message and HTTP status code.
     """
 
-    LOGGER.debug("Request for training trainingjob  %s ", trainingjob.id)
+    LOGGER.debug("Request for training trainingjob id %s ", trainingjob.id)
     try:
         # trainingjob = get_training_job(trainingjob_id)
         # print(trainingjob)
@@ -230,10 +229,11 @@ def training(trainingjob):
                                         _measurement, influxdb_info_dic, featuregroup.featuregroup_name)
         if (de_response.status_code == status.HTTP_200_OK ):
             LOGGER.debug("Response from data extraction for " + \
-                    training_job_id + " : " + json.dumps(de_response.json()))
+                    str(training_job_id) + " : " + json.dumps(de_response.json()))
             change_status_tj(trainingjob.id,
                                 Steps.DATA_EXTRACTION.name,
                                 States.IN_PROGRESS.name)
+            notification_rapp(trainingjob.id)
             with LOCK:
                 DATAEXTRACTION_JOBS_CACHE[trainingjob.id] = "Scheduled"
         elif( de_response.headers['content-type'] == MIMETYPE_JSON ) :
@@ -241,6 +241,7 @@ def training(trainingjob):
             LOGGER.error(errMsg)
             json_data = de_response.json()
             LOGGER.debug(str(json_data))
+            change_state_to_failed(training_job_id)
             if check_key_in_dictionary(["result"], json_data):
                 return jsonify({
                     "message": json.dumps({"Failed":errMsg + json_data["result"]})
@@ -254,6 +255,7 @@ def training(trainingjob):
                     "message": "failed data extraction"
                 }), 500
     except TMException as err:
+        change_state_to_failed(training_job_id)
         if "No row was found when one was required" in str(err):
             return jsonify({
                     'message': str(err)
@@ -261,11 +263,15 @@ def training(trainingjob):
     except Exception as e:
         # print(traceback.format_exc())
         # response_data =  {"Exception": str(err)}
-        LOGGER.debug("Error is training, job id: " + str(training_job_id)+" " + str(e))   
+        LOGGER.debug("Error is training, job id: " + str(training_job_id)+" " + str(e)) 
+        change_state_to_failed(training_job_id)
         return jsonify({
             'message': str(e)
-        }), 500      
-    return jsonify({"Trainingjob": trainingJobSchema.dump(trainingjob)}), 201
+        }), 500  
+        
+    response =  jsonify(trainingJobSchema.dump(trainingjob))
+    response.headers['Location'] = "training-jobs/" + str(training_job_id)
+    return response, 201
 
 def update_trainingPipeline(trainingjob):
     try:
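
training() now returns the serialized training job with HTTP 201 and a Location header of the form "training-jobs/<id>", so callers can follow the header instead of rebuilding the URL themselves. A rough client-side sketch; the base URL, create route, and payload are assumptions rather than part of this change:

    import requests

    BASE = "http://trainingmgr.example:32002"                      # illustrative host/port
    job = {"modelId": {"modelname": "demo", "modelversion": "1"}}  # incomplete, for illustration

    resp = requests.post(f"{BASE}/training-jobs", json=job)        # assumed create endpoint
    if resp.status_code == 201:
        location = resp.headers["Location"]                        # e.g. "training-jobs/7"
        print("created:", resp.json())
        print("poll status at:", f"{BASE}/{location}")
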
diff --git a/trainingmgr/trainingmgr_main.py b/trainingmgr/trainingmgr_main.py
index e6ba523..627d107 100644
 """"
 This file contains all rest endpoints exposed by Training manager.
 """
-import ast
 import json
-import re
 from logging import Logger
-import os
-import traceback
-import threading
-from threading import Lock
-import time
 from flask import Flask, request, send_file, jsonify
 from flask_api import status
 from flask_migrate import Migrate
 from marshmallow import ValidationError
 import requests
 from flask_cors import CORS
-from werkzeug.utils import secure_filename
+from trainingmgr.db.trainingjob_db import change_state_to_failed
 from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk
-from trainingmgr.common.trainingmgr_operations import data_extraction_start, training_start, data_extraction_status, create_dme_filtered_data_job, delete_dme_filtered_data_job, \
-    get_model_info
+from trainingmgr.common.trainingmgr_operations import  notification_rapp, training_start, delete_dme_filtered_data_job
 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
-from trainingmgr.common.trainingmgr_util import get_one_word_status, check_trainingjob_data, \
-    check_key_in_dictionary, get_one_key, \
-    response_for_training, get_metrics, \
-    handle_async_feature_engineering_status_exception_case, check_feature_group_data, check_trainingjob_name_and_version, check_trainingjob_name_or_featuregroup_name, \
+from trainingmgr.common.trainingmgr_util import check_key_in_dictionary, \
     get_feature_group_by_name, edit_feature_group_by_name
 from trainingmgr.common.exceptions_utls import APIException,TMException
 from trainingmgr.constants.steps import Steps
 from trainingmgr.constants.states import States
 from trainingmgr.db.trainingmgr_ps_db import PSDB
-from trainingmgr.common.exceptions_utls import DBException
-from trainingmgr.db.common_db_fun import get_data_extraction_in_progress_trainingjobs
-from trainingmgr.models import db, TrainingJob, FeatureGroup
+from trainingmgr.models import db
 from trainingmgr.schemas import ma, TrainingJobSchema , FeatureGroupSchema
-from trainingmgr.db.featuregroup_db import add_featuregroup, edit_featuregroup, get_feature_groups_db, \
+from trainingmgr.db.featuregroup_db import  get_feature_groups_db, \
     get_feature_group_by_name_db, delete_feature_group_by_name
 from trainingmgr.controller import featuregroup_controller, training_job_controller
 from trainingmgr.controller.pipeline_controller import pipeline_controller
@@ -60,7 +47,6 @@ from trainingmgr.common.trainingConfig_parser import validateTrainingConfig, get
 from trainingmgr.handler.async_handler import start_async_handler
 from trainingmgr.service.mme_service import get_modelinfo_by_modelId_service
 from trainingmgr.service.training_job_service import change_status_tj, change_update_field_value, get_training_job, update_artifact_version
-from trainingmgr.service.pipeline_service import start_training_service
 
 APP = Flask(__name__)
 TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()
@@ -221,24 +207,24 @@ def data_extraction_notification():
         else:
             raise TMException("KF Adapter- run_status in not scheduled")
     except requests.exceptions.ConnectionError as err:
-        # err_msg = "Failed to connect KF adapter."
-        # LOGGER.error(err_msg)
-        # if not change_in_progress_to_failed_by_latest_version(trainingjob_name) :
-        #     LOGGER.error(ERROR_TYPE_DB_STATUS)
-        # return response_for_training(err_response_code,
-        #                                 err_msg + str(err) + "(trainingjob name is " + trainingjob_name + ")",
-        #                                 LOGGER, False, trainingjob_name, MM_SDK)
-        pass
+        LOGGER.error(f"DataExtraction Notification failed due to {str(err)}")
+        try:
+            notification_rapp(trainingjob.id)
+            change_state_to_failed(trainingjob.id)
+        except Exception as e:
+            if "the change_steps_state to failed" in str(e):
+                LOGGER.error(f"failed to update the status of trainingjob to FAILED due to {str(err)}")
+        return jsonify({"Error":"Failed to connect to KF adapter"}) , 504
 
     except Exception as err:
         LOGGER.error("error is : "+ str(err))
+        try:
+            notification_rapp(trainingjob.id)
+            change_state_to_failed(trainingjob.id)
+        except Exception as e:
+            if "the change_steps_state to failed" in str(e):
+                LOGGER.error(f"failed to update the status of trainingjob to FAILED due to {str(err)}")
         return jsonify({"failed":"error"}), 500
-        # LOGGER.error("Failed to handle dataExtractionNotification. " + str(err))
-        # if not change_in_progress_to_failed_by_latest_version(trainingjob_name) :
-        #     LOGGER.error(ERROR_TYPE_DB_STATUS)
-        # return response_for_training(err_response_code,
-        #                                 str(err) + "(trainingjob name is " + trainingjob_name + ")",
-        #                                 LOGGER, False, trainingjob_name, MM_SDK)
 
     return APP.response_class(response=json.dumps({"result": "pipeline is scheduled"}),
                                     status=status.HTTP_200_OK,
@@ -290,9 +276,7 @@ def pipeline_notification():
                             Steps.TRAINING_AND_TRAINED_MODEL.name,
                             States.IN_PROGRESS.name)
             
-            # notification_rapp(trainingjob_info, TRAININGMGR_CONFIG_OBJ)
-
-            # version = get_latest_version_trainingjob_name(trainingjob_name)
+            notification_rapp(trainingjob.id)
 
             change_status_tj(trainingjob_id,
                             Steps.TRAINING_AND_TRAINED_MODEL.name,
@@ -301,7 +285,8 @@ def pipeline_notification():
                             Steps.TRAINED_MODEL.name,
                             States.IN_PROGRESS.name)
             
-            # notification_rapp(trainingjob_info, TRAININGMGR_CONFIG_OBJ)
+            notification_rapp(trainingjob.id)
+
             model_name= trainingjob.modelId.modelname
             model_version= trainingjob.modelId.modelversion
 
@@ -317,31 +302,36 @@ def pipeline_notification():
                 change_status_tj(trainingjob_id,
                                 Steps.TRAINED_MODEL.name,
                                 States.FINISHED.name)
-                # notification_rapp(trainingjob_info, TRAININGMGR_CONFIG_OBJ)
+                notification_rapp(trainingjob.id)
             else:
                 errMsg = "Trained model is not available  "
                 LOGGER.error(errMsg + str(trainingjob_id))
+                change_status_tj(trainingjob_id,
+                                Steps.TRAINED_MODEL.name,
+                                States.FAILED.name)
+                notification_rapp(trainingjob.id)
                 raise TMException(errMsg + str(trainingjob_id))
         else:
             LOGGER.error("Pipeline notification -Training failed " + str(trainingjob_id)) 
+            change_status_tj(trainingjob_id,
+                            Steps.TRAINING.name,
+                            States.FAILED.name)
+            notification_rapp(trainingjob.id)
             raise TMException("Pipeline not successful for " + \
                                         str(trainingjob_id) + \
                                         ",request json from kf adapter is: " + json.dumps(request.json))      
     except Exception as err:
         #Training failure response
         LOGGER.error("Pipeline notification failed" + str(err))
-        # if not change_in_progress_to_failed_by_latest_version(trainingjob_id) :
-        #     LOGGER.error(ERROR_TYPE_DB_STATUS)
-        
-        # return response_for_training(status.HTTP_500_INTERNAL_SERVER_ERROR,
-        #                     str(err) + " (trainingjob " + trainingjob_id + ")",
-        #                     LOGGER, False, trainingjob_id, MM_SDK)
-        return "", 500
-    #Training success response
-    # return response_for_training(status.HTTP_200_OK,
-    #                                         "Pipeline notification success.",
-    #                                         LOGGER, True, trainingjob_id, MM_SDK)
-    return "", 200
+        try:
+            notification_rapp(trainingjob.id)
+            change_state_to_failed(trainingjob.id)
+        except Exception as e:
+            if "the change_steps_state to failed" in str(e):
+                LOGGER.error(f"failed to update the status of trainingjob to FAILED due to {str(err)}")
+        return jsonify({"Error":"Training Failed"}), 500
+
+    return jsonify({"Message":"Training successful"}), 200
 
 
 
@@ -395,183 +385,6 @@ def get_all_experiment_names():
                                   mimetype=MIMETYPE_JSON)
 
 
-
-# @APP.route('/trainingjobs/retraining', methods=['POST'])
-# def retraining():
-#     """
-#     Function handling rest endpoint to retrain trainingjobs in request json. trainingjob's
-#     overall_status should be failed or finished and its deletion_in_progress should be False
-#     otherwise retraining of that trainingjob is counted in failure.
-#     Args in function: none
-#     Required Args in json:
-#         trainingjobs_list: list
-#                        list containing dictionaries
-#                            dictionary contains
-#                                usecase_name: str
-#                                    name of trainingjob
-#                                notification_url(optional): str
-#                                    url for notification
-#                                feature_filter(optional): str
-#                                    feature filter
-#     Returns:
-#         json:
-#             success count: int
-#                 successful retraining count
-#             failure count: int
-#                 failure retraining count
-#         status: HTTP status code 200
-#     Exceptions:
-#         all exception are provided with exception message and HTTP status code.
-#     """
-#     LOGGER.debug('request comes for retraining, ' + json.dumps(request.json))
-#     try:
-#         check_key_in_dictionary(["trainingjobs_list"], request.json)
-#     except Exception as err:
-#         raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
-
-#     trainingjobs_list = request.json['trainingjobs_list']
-#     if not isinstance(trainingjobs_list, list):
-#         raise APIException(status.HTTP_400_BAD_REQUEST, NOT_LIST)
-
-#     for obj in trainingjobs_list:
-#         try:
-#             check_key_in_dictionary(["trainingjob_name"], obj)
-#         except Exception as err:
-#             raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
-
-#     not_possible_to_retrain = []
-#     possible_to_retrain = []
-
-#     for obj in trainingjobs_list:
-#         trainingjob_name = obj['trainingjob_name']
-#         results = None
-#         try:
-#             trainingjob = get_info_of_latest_version(trainingjob_name)
-#         except Exception as err:
-#             not_possible_to_retrain.append(trainingjob_name)
-#             LOGGER.debug(str(err) + "(trainingjob_name is " + trainingjob_name + ")")
-#             continue
-        
-#         if trainingjob:
-#             if trainingjob.deletion_in_progress:
-#                 not_possible_to_retrain.append(trainingjob_name)
-#                 LOGGER.debug("Failed to retrain because deletion in progress" + \
-#                              "(trainingjob_name is " + trainingjob_name + ")")
-#                 continue
-
-#             if (get_one_word_status(json.loads(trainingjob.steps_state))
-#                     not in [States.FINISHED.name, States.FAILED.name]):
-#                 not_possible_to_retrain.append(trainingjob_name)
-#                 LOGGER.debug("Not finished or not failed status" + \
-#                              "(trainingjob_name is " + trainingjob_name + ")")
-#                 continue
-
-#             try:
-#                 add_update_trainingjob(trainingjob, False)
-#             except Exception as err:
-#                 not_possible_to_retrain.append(trainingjob_name)
-#                 LOGGER.debug(str(err) + "(training job is " + trainingjob_name + ")")
-#                 continue
-
-#             url = 'http://' + str(TRAININGMGR_CONFIG_OBJ.my_ip) + \
-#                   ':' + str(TRAININGMGR_CONFIG_OBJ.my_port) + \
-#                   '/trainingjobs/' +trainingjob_name + '/training'
-#             response = requests.post(url)
-
-#             if response.status_code == status.HTTP_200_OK:
-#                 possible_to_retrain.append(trainingjob_name)
-#             else:
-#                 LOGGER.debug("not 200 response" + "(trainingjob_name is " + trainingjob_name + ")")
-#                 not_possible_to_retrain.append(trainingjob_name)
-
-#         else:
-#             LOGGER.debug("not present in postgres db" + "(trainingjob_name is " + trainingjob_name + ")")
-#             not_possible_to_retrain.append(trainingjob_name)
-
-#         LOGGER.debug('success list: ' + str(possible_to_retrain))
-#         LOGGER.debug('failure list: ' + str(not_possible_to_retrain))
-
-#     return APP.response_class(response=json.dumps( \
-#         {
-#             "success count": len(possible_to_retrain),
-#             "failure count": len(not_possible_to_retrain)
-#         }),
-#         status=status.HTTP_200_OK,
-#         mimetype='application/json')
-
-
-
-
-# @APP.route('/trainingjobs/metadata/<trainingjob_name>')
-# def get_metadata(trainingjob_name):
-#     """
-#     Function handling rest endpoint to get accuracy, version and model download url for all
-#     versions of given trainingjob_name which has overall_state FINISHED and
-#     deletion_in_progress is False
-
-#     Args in function:
-#         trainingjob_name: str
-#             name of trainingjob.
-
-#     Args in json:
-#         No json required
-
-#     Returns:
-#         json:
-#             Successed metadata : list
-#                                  list containes dictionaries.
-#                                      dictionary containts
-#                                          accuracy: dict
-#                                              metrics of model
-#                                          version: int
-#                                              version of trainingjob
-#                                          url: str
-#                                              url for downloading model
-#         status:
-#             HTTP status code 200
-
-#     Exceptions:
-#         all exception are provided with exception message and HTTP status code.
-#     """
-#     response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
-#     api_response = {}
-#     if not check_trainingjob_name_or_featuregroup_name(trainingjob_name):
-#         return {"Exception":"The trainingjob_name is not correct"}, status.HTTP_400_BAD_REQUEST
-
-#     LOGGER.debug("Request metadata for trainingjob(name of trainingjob is %s) ", trainingjob_name)
-#     try:
-#         results = get_all_versions_info_by_name(trainingjob_name)
-#         if results:
-#             info_list = []
-#             for trainingjob_info in results:
-#                 if (get_one_word_status(json.loads(trainingjob_info.steps_state)) == States.FINISHED.name and
-#                         not trainingjob_info.deletion_in_progress):                   
-#                     LOGGER.debug("Downloading metric for " +trainingjob_name )
-#                     data = get_metrics(trainingjob_name, trainingjob_info[11], MM_SDK)
-#                     url = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + ":" + \
-#                         str(TRAININGMGR_CONFIG_OBJ.my_port) + "/model/" + \
-#                         trainingjob_name + "/" + str(trainingjob_info[11]) + "/Model.zip"
-#                     dict_data = {
-#                         "accuracy": data,
-#                         "version": trainingjob_info.version,
-#                         "url": url
-#                     }
-#                     info_list.append(dict_data)
-#             #info_list built        
-#             api_response = {"Successed metadata": info_list}
-#             response_code = status.HTTP_200_OK
-#         else :
-#             err_msg = "Not found given trainingjob name-" + trainingjob_name
-#             LOGGER.error(err_msg)
-#             response_code = status.HTTP_404_NOT_FOUND
-#             api_response = {"Exception":err_msg}
-#     except Exception as err:
-#         LOGGER.error(str(err))
-#         api_response = {"Exception":str(err)}
-#     return APP.response_class(response=json.dumps(api_response),
-#                                         status=response_code,
-#                                         mimetype=MIMETYPE_JSON)
-
 @APP.route('/featureGroup/<featuregroup_name>', methods=['GET', 'PUT'])
 def feature_group_by_name(featuregroup_name):
     """
@@ -782,66 +595,6 @@ def delete_list_of_feature_group():
         mimetype='application/json')
 
 
-# def async_feature_engineering_status():
-#     """
-#     This function takes trainingjobs from DATAEXTRACTION_JOBS_CACHE and checks data extraction status
-#     (using data extraction api) for those trainingjobs, if status is Completed then it calls
-#     /trainingjob/dataExtractionNotification route for those trainingjobs.
-#     """
-#     url_pipeline_run = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + \
-#                        ":" + str(TRAININGMGR_CONFIG_OBJ.my_port) + \
-#                        "/trainingjob/dataExtractionNotification"
-#     while True:
-#         with LOCK:
-#             fjc = list(DATAEXTRACTION_JOBS_CACHE)
-#         for trainingjob_name in fjc:
-#             LOGGER.debug("Current DATAEXTRACTION_JOBS_CACHE :" + str(DATAEXTRACTION_JOBS_CACHE))
-#             try:
-#                 response = data_extraction_status(trainingjob_name, TRAININGMGR_CONFIG_OBJ)
-#                 if (response.headers['content-type'] != MIMETYPE_JSON or 
-#                         response.status_code != status.HTTP_200_OK ):
-#                     raise TMException("Data extraction responsed with error status code or invalid content type" + \
-#                                          "doesn't send json type response (trainingjob " + trainingjob_name + ")")
-#                 response = response.json()
-#                 LOGGER.debug("Data extraction status response for " + \
-#                             trainingjob_name + " " + json.dumps(response))
-
-#                 if response["task_status"] == "Completed":
-#                     with APP.app_context():
-#                         change_steps_state_of_latest_version(trainingjob_name,
-#                                                                 Steps.DATA_EXTRACTION.name,
-#                                                                 States.FINISHED.name)
-#                         change_steps_state_of_latest_version(trainingjob_name,
-#                                                                 Steps.DATA_EXTRACTION_AND_TRAINING.name,
-#                                                                 States.IN_PROGRESS.name)
-#                     kf_response = requests.post(url_pipeline_run,
-#                                                 data=json.dumps({"trainingjob_name": trainingjob_name}),
-#                                                 headers={
-#                                                     'content-type': MIMETYPE_JSON,
-#                                                     'Accept-Charset': 'UTF-8'
-#                                                 })
-#                     if (kf_response.headers['content-type'] != MIMETYPE_JSON or 
-#                             kf_response.status_code != status.HTTP_200_OK ):
-#                         raise TMException("KF adapter responsed with error status code or invalid content type" + \
-#                                          "doesn't send json type response (trainingjob " + trainingjob_name + ")")
-#                     with LOCK:
-#                         DATAEXTRACTION_JOBS_CACHE.pop(trainingjob_name)        
-#                 elif response["task_status"] == "Error":
-#                     raise TMException("Data extraction has failed for " + trainingjob_name)
-#             except Exception as err:
-#                 LOGGER.error("Failure during procesing of DATAEXTRACTION_JOBS_CACHE," + str(err))
-#                 """ Job will be removed from DATAEXTRACTION_JOBS_CACHE in  handle_async
-#                     There might be some further error during handling of exception
-#                 """
-#                 handle_async_feature_engineering_status_exception_case(LOCK,
-#                                                     DATAEXTRACTION_JOBS_CACHE,
-#                                                     status.HTTP_500_INTERNAL_SERVER_ERROR,
-#                                                     str(err) + "(trainingjob name is " + trainingjob_name + ")",
-#                                                     LOGGER, False, trainingjob_name, MM_SDK)
-
-#         #Wait and fetch latest list of trainingjobs
-#         time.sleep(10)
-
 if __name__ == "__main__":
     try:
         if TRAININGMGR_CONFIG_OBJ.is_config_loaded_properly() is False: