"""
This module is for loading training manager configuration.
"""
-import os
from os import getenv
from trainingmgr.common.tmgr_logger import TMLogger
headers = {'Content-type': MIMETYPE_JSON}
-
url = create_url_host_port('http', host, port, 'data-consumer/v1/info-jobs/{}'.format(feature_group_name))
logger.debug(url)
logger.debug(json.dumps(job_json))
try:
trainingjob = get_trainingjob(trainingjob_id)
steps_state = trainingjob.steps_state.states
- if trainingjob.notification_url != "":
+ if trainingjob.notification_url != "" and trainingjob.notification_url is not None:
response = requests.post(trainingjob.notification_url,
data=json.dumps(steps_state),
headers={
import json
import re
from flask_api import status
-import requests
from marshmallow import ValidationError
-from trainingmgr.db.common_db_fun import change_in_progress_to_failed_by_latest_version, \
- get_field_by_latest_version, change_field_of_latest_version, \
- get_latest_version_trainingjob_name
-from trainingmgr.db.featuregroup_db import add_featuregroup, edit_featuregroup, get_feature_groups_db, \
+from trainingmgr.db.common_db_fun import change_in_progress_to_failed_by_latest_version
+from trainingmgr.db.featuregroup_db import edit_featuregroup, \
get_feature_group_by_name_db, delete_feature_group_by_name
from trainingmgr.constants.states import States
from trainingmgr.common.exceptions_utls import APIException,TMException,DBException
from trainingmgr.common.trainingmgr_operations import create_dme_filtered_data_job
-from trainingmgr.schemas import ma, TrainingJobSchema , FeatureGroupSchema
+from trainingmgr.schemas import FeatureGroupSchema
from trainingmgr.constants.steps import Steps
ERROR_TYPE_KF_ADAPTER_JSON = "Kf adapter doesn't sends json type response"
response_code =status.HTTP_200_OK
# TODO: Implement the process where DME edits from the dashboard are applied to the endpoint
if featuregroup.enable_dme == True :
- response= create_dme_filtered_data_job(tm_conf_obj, featuregroup.source_name, featuregroup.feature_list,
+ response= create_dme_filtered_data_job(tm_conf_obj, featuregroup.source_name, featuregroup.feature_list, featuregroup.featuregroup_name,
featuregroup.host, featuregroup.port,
featuregroup.measured_obj_class)
if response.status_code != 201:
except DBException as err:
return {"Exception": str(err)}, 400
except Exception as e:
- err_msg = "Failed to create the feature Group "
api_response = {"Exception":str(e)}
logger.error(str(e))
return data
+# TODO: Remove the fxn since it is not called anywhere
def handle_async_feature_engineering_status_exception_case(lock, dataextraction_job_cache, code,
message, logger, is_success,
trainingjob_name, mm_sdk):
if featuregroup.enable_dme == True :
response= create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, featuregroup.source_name, featuregroup.feature_list, featuregroup.featuregroup_name, featuregroup.host, featuregroup.dme_port, featuregroup.measured_obj_class)
if response.status_code != 201:
- delete_feature_group_by_name(featuregroup)
- return jsonify({"Exception": "Cannot create dme job"}), status.HTTP_400_BAD_REQUEST
+ delete_feature_group_by_name(featuregroup.featuregroup_name)
+ return jsonify({"Exception": "Cannot create dme job | DME Error : " + str(response.json()["detail"])}), status.HTTP_400_BAD_REQUEST
except ValidationError as err:
LOGGER.error(f"Failed to create the feature Group {str(err)}")
return {"Exception": str(err)}, 400
from trainingmgr.common.exceptions_utls import TMException
from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
from trainingmgr.schemas.trainingjob_schema import TrainingJobSchema
-from trainingmgr.service.training_job_service import delete_training_job, create_training_job, get_training_job, get_trainingjob_by_modelId, get_trainining_jobs, \
+from trainingmgr.service.training_job_service import delete_training_job, create_training_job, get_training_job, get_trainining_jobs, \
get_steps_state
from trainingmgr.common.trainingmgr_util import check_key_in_dictionary
from trainingmgr.common.trainingConfig_parser import validateTrainingConfig
def delete_trainingjob(training_job_id):
LOGGER.debug(f'delete training job : {training_job_id}')
try:
- if delete_training_job(str(training_job_id)):
+ if delete_training_job(int(training_job_id)):
LOGGER.debug(f'training job with {training_job_id} is deleted successfully.')
return '', 204
else:
from trainingmgr.common.exceptions_utls import DBException
from sqlalchemy.exc import IntegrityError
from psycopg2.errors import UniqueViolation
-from psycopg2 import errors
from trainingmgr.models import db, FeatureGroup
from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
try:
return FeatureGroup.query.with_entities(FeatureGroup.featuregroup_name).filter_by(feature_list=inputDataType).all()
except Exception as err:
- raise DBException("Unable to query in get_feature_groups_from_inputDataType_db with error : ", err)
+ raise DBException(DB_QUERY_EXEC_ERROR + "in get_feature_groups_from_inputDataType_db with error : " + str(err))
def delete_feature_group_by_name(featuregroup_name):
"""
# ==================================================================================
from trainingmgr.common.exceptions_utls import DBException
-from trainingmgr.models import db, ModelID
+from trainingmgr.models import ModelID
def get_model_by_modelId(modelname, modelversion):
try:
from trainingmgr.models import db, TrainingJob, TrainingJobStatus, ModelID
from trainingmgr.constants.steps import Steps
from trainingmgr.constants.states import States
-from sqlalchemy.sql import func
from sqlalchemy.exc import NoResultFound
-from trainingmgr.common.trainingConfig_parser import getField
raise DBException(f'{DB_QUERY_EXEC_ERROR} : {str(e)}' )
def get_trainingjob(id: int=None):
- if id is not None:
- return TrainingJob.query.filter(TrainingJob.id==id).one()
- else:
- tjs = TrainingJob.query.all()
- return tjs
+ try:
+ if id is not None:
+ return TrainingJob.query.filter(TrainingJob.id==id).one()
+ else:
+ tjs = TrainingJob.query.all()
+ return tjs
+ except NoResultFound:
+ # id is not present
+ return None
+ except Exception as e:
+ raise DBException(f'{DB_QUERY_EXEC_ERROR} : {str(e)}' )
+
def change_field_value_by_version(trainingjob_name, version, field, field_value):
"""
This function updates field's value to field_value of <trainingjob_name, version> trainingjob.
"""
- conn = None
try:
if field == "deletion_in_progress":
trainingjob = TrainingJob.query.filter(TrainingJob.trainingjob_name == trainingjob_name).filter(TrainingJob.version == version).first()
# trainingjob_name = trainingjob.trainingjob_name
with APP.app_context():
trainingjob = get_trainingjob(trainingjob_id)
+
+ if trainingjob is None:
+ # trainingjob_id not present in db, A possible case of deletion
+ LOGGER.debug(f"Training-Job Id {trainingjob_id} is Found to be deleted| Removing from DATAEXTRACTION_JOBS_CACHE")
+ with LOCK:
+ DATAEXTRACTION_JOBS_CACHE.pop(trainingjob_id)
+ continue
+
featuregroup_name = getField(trainingjob.training_config, "feature_group_name")
response = data_extraction_status(featuregroup_name, trainingjob_id, TRAININGMGR_CONFIG_OBJ)
if (response.headers.get('content-type') != "application/json" or
)
if (kf_response.headers.get('content-type') != "application/json" or
kf_response.status_code != 200):
- raise TMException(f"KF adapter returned an error for {featuregroup_name}.")
+ LOGGER.error(f"KF adapter returned an error for {featuregroup_name}. | Response : {kf_response.json()}")
+ raise TMException(f"KF adapter returned an error for {featuregroup_name}. ")
with LOCK:
DATAEXTRACTION_JOBS_CACHE.pop(trainingjob.id)
elif response_data["task_status"] == "Error":
raise TMException(f"Data extraction failed for {featuregroup_name}.")
+ except DBException as err:
+ # If there is any communication error with db, the thread must not fail
+ LOGGER.error("Recieved Db Failure in async-handler| Error : " + str(err))
+ continue
except Exception as err:
LOGGER.error(f"Error processing DATAEXTRACTION_JOBS_CACHE: {str(err)}")
- with APP.app_context():
- change_state_to_failed(trainingjob.id)
- notification_rapp(trainingjob.id)
- with LOCK:
- try:
- DATAEXTRACTION_JOBS_CACHE.pop(trainingjob.id)
- except KeyError as key_err:
- LOGGER.error("The training job key doesn't exist in DATAEXTRACTION_JOBS_CACHE: " + str(key_err))
+ #The following try-block will prevent thread-failure when 'change_state_to_failed' fails
+ try:
+ with APP.app_context():
+ change_state_to_failed(trainingjob.id)
+ notification_rapp(trainingjob.id)
+ with LOCK:
+ DATAEXTRACTION_JOBS_CACHE.pop(trainingjob.id)
+ except KeyError as key_err:
+ LOGGER.error("The training job key doesn't exist in DATAEXTRACTION_JOBS_CACHE: " + str(key_err))
+ except Exception as err:
+ LOGGER.error(f"Error processing DATAEXTRACTION_JOBS_CACHE-Exception: {str(err)}")
+
time.sleep(10) # Sleep before checking again
#
# ==============================================================================
from trainingmgr.models import db
-from datetime import datetime
from sqlalchemy.sql import func
from sqlalchemy import Integer, ForeignKey, String, DateTime, Column, Boolean
from sqlalchemy.orm import relationship
from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
import requests
from trainingmgr.common.exceptions_utls import TMException
-from flask_api import status
import requests
-import json
LOGGER = TrainingMgrConfig().logger
from trainingmgr.models import TrainingJob
from trainingmgr.models.trainingjob import ModelID
import json
-from marshmallow import pre_load, post_dump, fields, validates, ValidationError
+from marshmallow import pre_load, post_dump
PATTERN = re.compile(r"\w+")
try:
# Signal Deletion in Progress
tj = get_trainingjob(training_job_id)
+ if tj is None:
+ return False
change_field_value(training_job_id, "deletion_in_progress", True)
steps_state = json.loads(tj.steps_state.states)
overall_status = get_one_word_status(steps_state)
elif (step_in_progress_state == Steps.TRAINING or (step_in_progress_state == Steps.DATA_EXTRACTION_AND_TRAINING and tj.run_id is not None)):
# Signal the Kf-Adapter to terminate the
response = terminate_training_service(tj.run_id)
- LOGGER.debug("Deletion-Response : " + response)
+ LOGGER.debug("Deletion-Response : " + response.text)
isDeleted = delete_trainingjob_by_id(id=training_job_id)
if isDeleted:
else :
return getField(training_config, "retraining_pipeline_name"), getField(training_config, "retraining_pipeline_version")
except Exception as err:
- raise TMException(f"cant fetch training or retraining pipeline name or version from trainingconfig {training_config}")
\ No newline at end of file
+ raise TMException(f"cant fetch training or retraining pipeline name or version from trainingconfig {training_config}| Error: " + str(err))
\ No newline at end of file
from flask import Flask, request, send_file, jsonify
from flask_api import status
from flask_migrate import Migrate
-from marshmallow import ValidationError
import requests
from flask_cors import CORS
from trainingmgr.db.trainingjob_db import change_state_to_failed
from trainingmgr.constants.states import States
from trainingmgr.db.trainingmgr_ps_db import PSDB
from trainingmgr.models import db
-from trainingmgr.schemas import ma, TrainingJobSchema , FeatureGroupSchema
-from trainingmgr.db.featuregroup_db import get_feature_groups_db, \
- get_feature_group_by_name_db, delete_feature_group_by_name
+from trainingmgr.schemas import TrainingJobSchema , FeatureGroupSchema
+from trainingmgr.db.featuregroup_db import get_feature_group_by_name_db, delete_feature_group_by_name
from trainingmgr.controller import featuregroup_controller, training_job_controller
from trainingmgr.controller.pipeline_controller import pipeline_controller
-from trainingmgr.common.trainingConfig_parser import validateTrainingConfig, getField
+from trainingmgr.common.trainingConfig_parser import getField
from trainingmgr.handler.async_handler import start_async_handler
from trainingmgr.service.mme_service import get_modelinfo_by_modelId_service
-from trainingmgr.service.training_job_service import change_status_tj, change_update_field_value, fetch_pipelinename_and_version, get_training_job, update_artifact_version
+from trainingmgr.service.training_job_service import change_status_tj, change_update_field_value, fetch_pipelinename_and_version, get_training_job
APP = Flask(__name__)
TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()
return {"Exception": "error while downloading model"}, status.HTTP_500_INTERNAL_SERVER_ERROR
-
-# Training-Config Handled
@APP.route('/trainingjob/dataExtractionNotification', methods=['POST'])
def data_extraction_notification():
"""
all exception are provided with exception message and HTTP status code.
"""
LOGGER.debug("Data extraction notification...")
- err_response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
- results = None
try:
if not check_key_in_dictionary(["trainingjob_id"], request.json) :
err_msg = "featuregroup_name or trainingjob_id key not available in request"
if not check_key_in_dictionary(["run_status", "run_id"], json_data):
err_msg = "Kf adapter invalid response from , key not present ,run_status or run_id for " + str(trainingjob_id)
Logger.error(err_msg)
- err_response_code = status.HTTP_400_BAD_REQUEST
raise TMException(err_msg)
if json_data["run_status"] == 'scheduled':
return jsonify({"Message":"Training successful"}), 200
-
-# Moved to pipelineMgr (to be deleted in future)
-@APP.route('/experiments', methods=['GET'])
-def get_all_experiment_names():
- """
- Function handling rest endpoint to get all experiment names.
-
- Args in function:
- none
-
- Args in json:
- no json required
-
- Returns:
- json:
- experiment_names : list
- list containing all experiment names(as str).
- status code:
- HTTP status code 200
-
- Exceptions:
- all exception are provided with exception message and HTTP status code.
- """
-
- LOGGER.debug("request for getting all experiment names is come.")
- api_response = {}
- reponse_code = status.HTTP_500_INTERNAL_SERVER_ERROR
- try:
- kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
- kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port
- if kf_adapter_ip!=None and kf_adapter_port!=None:
- url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + '/experiments'
- LOGGER.debug("url is :" + url)
- response = requests.get(url)
- if response.headers['content-type'] != MIMETYPE_JSON:
- err_smg = ERROR_TYPE_KF_ADAPTER_JSON
- raise TMException(err_smg)
-
- experiment_names = []
- for experiment in response.json().keys():
- experiment_names.append(experiment)
- api_response = {"experiment_names": experiment_names}
- reponse_code = status.HTTP_200_OK
- except Exception as err:
- api_response = {"Exception": str(err)}
- LOGGER.error(str(err))
- return APP.response_class(response=json.dumps(api_response),
- status=reponse_code,
- mimetype=MIMETYPE_JSON)
-
-
@APP.route('/featureGroup/<featuregroup_name>', methods=['GET', 'PUT'])
def feature_group_by_name(featuregroup_name):
"""