from trainingmgr.common.exceptions_utls import TMException
from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
from trainingmgr.schemas.trainingjob_schema import TrainingJobSchema
-from trainingmgr.service.training_job_service import delete_training_job, create_training_job, get_training_job, get_trainining_jobs, \
-get_steps_state, change_status_tj, update_trainingPipeline
+from trainingmgr.service.training_job_service import delete_training_job, create_training_job, get_training_job, get_trainingjob_by_modelId, get_trainining_jobs, \
+get_steps_state, update_trainingPipeline
from trainingmgr.common.trainingmgr_util import check_key_in_dictionary
-from trainingmgr.common.trainingmgr_operations import data_extraction_start
-from trainingmgr.common.trainingConfig_parser import validateTrainingConfig, getField
-from trainingmgr.service.featuregroup_service import get_featuregroup_by_name
-from trainingmgr.constants.steps import Steps
-from trainingmgr.constants.states import States
-from trainingmgr.handler.async_handler import DATAEXTRACTION_JOBS_CACHE
+from trainingmgr.common.trainingConfig_parser import validateTrainingConfig
from trainingmgr.service.mme_service import get_modelinfo_by_modelId_service
training_job_controller = Blueprint('training_job_controller', __name__)
LOGGER = TrainingMgrConfig().logger
def create_trainingjob():
try:
-
+ LOGGER.debug(f"request for training with json {request.get_json()}")
request_json = request.get_json()
if check_key_in_dictionary(["training_config"], request_json):
return jsonify({'Exception': 'The TrainingConfig is not correct'}), status.HTTP_400_BAD_REQUEST
model_id = trainingjob.modelId
- registered_model_dict = get_modelinfo_by_modelId_service(model_id.modelname, model_id.modelversion)[0]
+ registered_model_list = get_modelinfo_by_modelId_service(model_id.modelname, model_id.modelversion)
# Verify if the modelId is registered over mme or not
- if registered_model_dict is None:
+ if registered_model_list is None:
return jsonify({"Exception":f"modelId {model_id.modelname} and {model_id.modelversion} is not registered at MME, Please first register at MME and then continue"}), status.HTTP_400_BAD_REQUEST
+ registered_model_dict = registered_model_list[0]
if registered_model_dict["modelLocation"] != trainingjob.model_location:
return jsonify({"Exception":f"modelId {model_id.modelname} and {model_id.modelversion} and trainingjob created does not have same modelLocation, Please first register at MME properly and then continue"}), status.HTTP_400_BAD_REQUEST
return jsonify({
'message': str(err)
}), 500
-
-@training_job_controller.route('/training-jobs/<int:training_job_id>/training', methods=['POST'])
-def training(training_job_id):
- """
- Rest end point to start training job.
- It calls data extraction module for data extraction and other training steps
-
- Args in function:
- training_job_id: str
- id of trainingjob.
-
- Args in json:
- not required json
-
- Returns:
- json:
- training_job_id: str
- name of trainingjob
- result: str
- route of data extraction module for getting data extraction status of
- given training_job_id .
- status code:
- HTTP status code 200
-
- Exceptions:
- all exception are provided with exception message and HTTP status code.
- """
-
- LOGGER.debug("Request for training trainingjob %s ", training_job_id)
- try:
- trainingjob = get_training_job(training_job_id)
- featuregroup= get_featuregroup_by_name(getField(trainingjob.training_config, "feature_group_name"))
- feature_list_string = featuregroup.feature_list
- influxdb_info_dic={}
- influxdb_info_dic["host"]=featuregroup.host
- influxdb_info_dic["port"]=featuregroup.port
- influxdb_info_dic["bucket"]=featuregroup.bucket
- influxdb_info_dic["token"]=featuregroup.token
- influxdb_info_dic["db_org"] = featuregroup.db_org
- influxdb_info_dic["source_name"]= featuregroup.source_name
- _measurement = featuregroup.measurement
- query_filter = getField(trainingjob.training_config, "query_filter")
- datalake_source = {featuregroup.datalake_source: {}} # Datalake source should be taken from FeatureGroup (not TrainingJob)
- LOGGER.debug('Starting Data Extraction...')
- de_response = data_extraction_start(TRAININGMGR_CONFIG_OBJ, training_job_id,
- feature_list_string, query_filter, datalake_source,
- _measurement, influxdb_info_dic, featuregroup.featuregroup_name)
- if (de_response.status_code == status.HTTP_200_OK ):
- LOGGER.debug("Response from data extraction for " + \
- str(trainingjob.id) + " : " + json.dumps(de_response.json()))
- change_status_tj(trainingjob.id,
- Steps.DATA_EXTRACTION.name,
- States.IN_PROGRESS.name)
- with LOCK:
- DATAEXTRACTION_JOBS_CACHE[trainingjob.id] = "Scheduled"
- elif( de_response.headers['content-type'] == MIMETYPE_JSON ) :
- errMsg = "Data extraction responded with error code."
- LOGGER.error(errMsg)
- json_data = de_response.json()
- LOGGER.debug(str(json_data))
- if check_key_in_dictionary(["result"], json_data):
- return jsonify({
- "message": json.dumps({"Failed":errMsg + json_data["result"]})
- }), 500
- else:
- return jsonify({
- "message": errMsg
- }), 500
- else:
- return jsonify({
- "message": "failed data extraction"
- }), 500
- except TMException as err:
- if "No row was found when one was required" in str(err):
- return jsonify({
- 'message': str(err)
- }), 404
- except Exception as e:
- # print(traceback.format_exc())
- # response_data = {"Exception": str(err)}
- LOGGER.debug("Error is training, job id: " + str(training_job_id)+" " + str(e))
- return jsonify({
- 'message': str(e)
- }), 500
- return jsonify({"result": "training started"}), 200
\ No newline at end of file
from threading import Lock
from flask_api import status
from flask import jsonify
-from trainingmgr.common.trainingmgr_operations import data_extraction_start
+from trainingmgr.common.trainingmgr_operations import data_extraction_start, notification_rapp
from trainingmgr.db.model_db import get_model_by_modelId
-from trainingmgr.db.trainingjob_db import delete_trainingjob_by_id, create_trainingjob, get_trainingjob, get_trainingjob_by_modelId_db, \
+from trainingmgr.db.trainingjob_db import change_state_to_failed, delete_trainingjob_by_id, create_trainingjob, get_trainingjob, get_trainingjob_by_modelId_db, \
change_steps_state, change_field_value, change_field_value, change_steps_state_df, changeartifact
from trainingmgr.common.exceptions_utls import DBException, TMException
from trainingmgr.common.trainingConfig_parser import getField, setField
from trainingmgr.handler.async_handler import DATAEXTRACTION_JOBS_CACHE
from trainingmgr.schemas import TrainingJobSchema
from trainingmgr.common.trainingmgr_util import check_key_in_dictionary, get_one_word_status, get_step_in_progress_state
-from trainingmgr.constants.steps import Steps
-from trainingmgr.constants.states import States
+from trainingmgr.constants import Steps, States
from trainingmgr.service.pipeline_service import terminate_training_service
from trainingmgr.service.featuregroup_service import get_featuregroup_by_name, get_featuregroup_from_inputDataType
from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
all exception are provided with exception message and HTTP status code.
"""
- LOGGER.debug("Request for training trainingjob %s ", trainingjob.id)
+ LOGGER.debug("Request for training trainingjob id %s ", trainingjob.id)
try:
# trainingjob = get_training_job(trainingjob_id)
# print(trainingjob)
_measurement, influxdb_info_dic, featuregroup.featuregroup_name)
if (de_response.status_code == status.HTTP_200_OK ):
LOGGER.debug("Response from data extraction for " + \
- training_job_id + " : " + json.dumps(de_response.json()))
+ str(training_job_id) + " : " + json.dumps(de_response.json()))
change_status_tj(trainingjob.id,
Steps.DATA_EXTRACTION.name,
States.IN_PROGRESS.name)
+ notification_rapp(trainingjob.id)
with LOCK:
DATAEXTRACTION_JOBS_CACHE[trainingjob.id] = "Scheduled"
elif( de_response.headers['content-type'] == MIMETYPE_JSON ) :
LOGGER.error(errMsg)
json_data = de_response.json()
LOGGER.debug(str(json_data))
+ change_state_to_failed(training_job_id)
if check_key_in_dictionary(["result"], json_data):
return jsonify({
"message": json.dumps({"Failed":errMsg + json_data["result"]})
"message": "failed data extraction"
}), 500
except TMException as err:
+ change_state_to_failed(training_job_id)
if "No row was found when one was required" in str(err):
return jsonify({
'message': str(err)
except Exception as e:
# print(traceback.format_exc())
# response_data = {"Exception": str(err)}
- LOGGER.debug("Error is training, job id: " + str(training_job_id)+" " + str(e))
+ LOGGER.debug("Error is training, job id: " + str(training_job_id)+" " + str(e))
+ change_state_to_failed(training_job_id)
return jsonify({
'message': str(e)
- }), 500
- return jsonify({"Trainingjob": trainingJobSchema.dump(trainingjob)}), 201
+ }), 500
+
+ response = jsonify(trainingJobSchema.dump(trainingjob))
+ response.headers['Location'] = "training-jobs/" + str(training_job_id)
+ return response, 201
def update_trainingPipeline(trainingjob):
try:
""""
This file contains all rest endpoints exposed by Training manager.
"""
-import ast
import json
-import re
from logging import Logger
-import os
-import traceback
-import threading
-from threading import Lock
-import time
from flask import Flask, request, send_file, jsonify
from flask_api import status
from flask_migrate import Migrate
from marshmallow import ValidationError
import requests
from flask_cors import CORS
-from werkzeug.utils import secure_filename
+from trainingmgr.db.trainingjob_db import change_state_to_failed
from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk
-from trainingmgr.common.trainingmgr_operations import data_extraction_start, training_start, data_extraction_status, create_dme_filtered_data_job, delete_dme_filtered_data_job, \
- get_model_info
+from trainingmgr.common.trainingmgr_operations import notification_rapp, training_start, delete_dme_filtered_data_job
from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
-from trainingmgr.common.trainingmgr_util import get_one_word_status, check_trainingjob_data, \
- check_key_in_dictionary, get_one_key, \
- response_for_training, get_metrics, \
- handle_async_feature_engineering_status_exception_case, check_feature_group_data, check_trainingjob_name_and_version, check_trainingjob_name_or_featuregroup_name, \
+from trainingmgr.common.trainingmgr_util import check_key_in_dictionary, \
get_feature_group_by_name, edit_feature_group_by_name
from trainingmgr.common.exceptions_utls import APIException,TMException
from trainingmgr.constants.steps import Steps
from trainingmgr.constants.states import States
from trainingmgr.db.trainingmgr_ps_db import PSDB
-from trainingmgr.common.exceptions_utls import DBException
-from trainingmgr.db.common_db_fun import get_data_extraction_in_progress_trainingjobs
-from trainingmgr.models import db, TrainingJob, FeatureGroup
+from trainingmgr.models import db
from trainingmgr.schemas import ma, TrainingJobSchema , FeatureGroupSchema
-from trainingmgr.db.featuregroup_db import add_featuregroup, edit_featuregroup, get_feature_groups_db, \
+from trainingmgr.db.featuregroup_db import get_feature_groups_db, \
get_feature_group_by_name_db, delete_feature_group_by_name
from trainingmgr.controller import featuregroup_controller, training_job_controller
from trainingmgr.controller.pipeline_controller import pipeline_controller
from trainingmgr.handler.async_handler import start_async_handler
from trainingmgr.service.mme_service import get_modelinfo_by_modelId_service
from trainingmgr.service.training_job_service import change_status_tj, change_update_field_value, get_training_job, update_artifact_version
-from trainingmgr.service.pipeline_service import start_training_service
APP = Flask(__name__)
TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()
else:
raise TMException("KF Adapter- run_status in not scheduled")
except requests.exceptions.ConnectionError as err:
- # err_msg = "Failed to connect KF adapter."
- # LOGGER.error(err_msg)
- # if not change_in_progress_to_failed_by_latest_version(trainingjob_name) :
- # LOGGER.error(ERROR_TYPE_DB_STATUS)
- # return response_for_training(err_response_code,
- # err_msg + str(err) + "(trainingjob name is " + trainingjob_name + ")",
- # LOGGER, False, trainingjob_name, MM_SDK)
- pass
+ LOGGER.error(f"DataExtraction Notification failed due to {str(err)}")
+ try:
+ notification_rapp(trainingjob.id)
+ change_state_to_failed(trainingjob.id)
+ except Exception as e:
+ if "the change_steps_state to failed" in str(e):
+ LOGGER.error(f"failed to update the status of trainingjob to FAILED due to {str(err)}")
+ return jsonify({"Error":"Failed to connect to KF adapter"}) , 504
except Exception as err:
LOGGER.error("error is : "+ str(err))
+ try:
+ notification_rapp(trainingjob.id)
+ change_state_to_failed(trainingjob.id)
+ except Exception as e:
+ if "the change_steps_state to failed" in str(e):
+ LOGGER.error(f"failed to update the status of trainingjob to FAILED due to {str(err)}")
return jsonify({"failed":"error"}), 500
- # LOGGER.error("Failed to handle dataExtractionNotification. " + str(err))
- # if not change_in_progress_to_failed_by_latest_version(trainingjob_name) :
- # LOGGER.error(ERROR_TYPE_DB_STATUS)
- # return response_for_training(err_response_code,
- # str(err) + "(trainingjob name is " + trainingjob_name + ")",
- # LOGGER, False, trainingjob_name, MM_SDK)
return APP.response_class(response=json.dumps({"result": "pipeline is scheduled"}),
status=status.HTTP_200_OK,
Steps.TRAINING_AND_TRAINED_MODEL.name,
States.IN_PROGRESS.name)
- # notification_rapp(trainingjob_info, TRAININGMGR_CONFIG_OBJ)
-
- # version = get_latest_version_trainingjob_name(trainingjob_name)
+ notification_rapp(trainingjob.id)
change_status_tj(trainingjob_id,
Steps.TRAINING_AND_TRAINED_MODEL.name,
Steps.TRAINED_MODEL.name,
States.IN_PROGRESS.name)
- # notification_rapp(trainingjob_info, TRAININGMGR_CONFIG_OBJ)
+ notification_rapp(trainingjob.id)
+
model_name= trainingjob.modelId.modelname
model_version= trainingjob.modelId.modelversion
change_status_tj(trainingjob_id,
Steps.TRAINED_MODEL.name,
States.FINISHED.name)
- # notification_rapp(trainingjob_info, TRAININGMGR_CONFIG_OBJ)
+ notification_rapp(trainingjob.id)
else:
errMsg = "Trained model is not available "
LOGGER.error(errMsg + str(trainingjob_id))
+ change_status_tj(trainingjob_id,
+ Steps.TRAINED_MODEL.name,
+ States.FAILED.name)
+ notification_rapp(trainingjob.id)
raise TMException(errMsg + str(trainingjob_id))
else:
LOGGER.error("Pipeline notification -Training failed " + str(trainingjob_id))
+ change_status_tj(trainingjob_id,
+ Steps.TRAINING.name,
+ States.FAILED.name)
+ notification_rapp(trainingjob.id)
raise TMException("Pipeline not successful for " + \
str(trainingjob_id) + \
",request json from kf adapter is: " + json.dumps(request.json))
except Exception as err:
#Training failure response
LOGGER.error("Pipeline notification failed" + str(err))
- # if not change_in_progress_to_failed_by_latest_version(trainingjob_id) :
- # LOGGER.error(ERROR_TYPE_DB_STATUS)
-
- # return response_for_training(status.HTTP_500_INTERNAL_SERVER_ERROR,
- # str(err) + " (trainingjob " + trainingjob_id + ")",
- # LOGGER, False, trainingjob_id, MM_SDK)
- return "", 500
- #Training success response
- # return response_for_training(status.HTTP_200_OK,
- # "Pipeline notification success.",
- # LOGGER, True, trainingjob_id, MM_SDK)
- return "", 200
+ try:
+ notification_rapp(trainingjob.id)
+ change_state_to_failed(trainingjob.id)
+ except Exception as e:
+ if "the change_steps_state to failed" in str(e):
+ LOGGER.error(f"failed to update the status of trainingjob to FAILED due to {str(err)}")
+ return jsonify({"Error":"Training Failed"}), 500
+
+ return jsonify({"Message":"Training successful"}), 200
mimetype=MIMETYPE_JSON)
-
-# @APP.route('/trainingjobs/retraining', methods=['POST'])
-# def retraining():
-# """
-# Function handling rest endpoint to retrain trainingjobs in request json. trainingjob's
-# overall_status should be failed or finished and its deletion_in_progress should be False
-# otherwise retraining of that trainingjob is counted in failure.
-# Args in function: none
-# Required Args in json:
-# trainingjobs_list: list
-# list containing dictionaries
-# dictionary contains
-# usecase_name: str
-# name of trainingjob
-# notification_url(optional): str
-# url for notification
-# feature_filter(optional): str
-# feature filter
-# Returns:
-# json:
-# success count: int
-# successful retraining count
-# failure count: int
-# failure retraining count
-# status: HTTP status code 200
-# Exceptions:
-# all exception are provided with exception message and HTTP status code.
-# """
-# LOGGER.debug('request comes for retraining, ' + json.dumps(request.json))
-# try:
-# check_key_in_dictionary(["trainingjobs_list"], request.json)
-# except Exception as err:
-# raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
-
-# trainingjobs_list = request.json['trainingjobs_list']
-# if not isinstance(trainingjobs_list, list):
-# raise APIException(status.HTTP_400_BAD_REQUEST, NOT_LIST)
-
-# for obj in trainingjobs_list:
-# try:
-# check_key_in_dictionary(["trainingjob_name"], obj)
-# except Exception as err:
-# raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
-
-# not_possible_to_retrain = []
-# possible_to_retrain = []
-
-# for obj in trainingjobs_list:
-# trainingjob_name = obj['trainingjob_name']
-# results = None
-# try:
-# trainingjob = get_info_of_latest_version(trainingjob_name)
-# except Exception as err:
-# not_possible_to_retrain.append(trainingjob_name)
-# LOGGER.debug(str(err) + "(trainingjob_name is " + trainingjob_name + ")")
-# continue
-
-# if trainingjob:
-# if trainingjob.deletion_in_progress:
-# not_possible_to_retrain.append(trainingjob_name)
-# LOGGER.debug("Failed to retrain because deletion in progress" + \
-# "(trainingjob_name is " + trainingjob_name + ")")
-# continue
-
-# if (get_one_word_status(json.loads(trainingjob.steps_state))
-# not in [States.FINISHED.name, States.FAILED.name]):
-# not_possible_to_retrain.append(trainingjob_name)
-# LOGGER.debug("Not finished or not failed status" + \
-# "(trainingjob_name is " + trainingjob_name + ")")
-# continue
-
-# try:
-# add_update_trainingjob(trainingjob, False)
-# except Exception as err:
-# not_possible_to_retrain.append(trainingjob_name)
-# LOGGER.debug(str(err) + "(training job is " + trainingjob_name + ")")
-# continue
-
-# url = 'http://' + str(TRAININGMGR_CONFIG_OBJ.my_ip) + \
-# ':' + str(TRAININGMGR_CONFIG_OBJ.my_port) + \
-# '/trainingjobs/' +trainingjob_name + '/training'
-# response = requests.post(url)
-
-# if response.status_code == status.HTTP_200_OK:
-# possible_to_retrain.append(trainingjob_name)
-# else:
-# LOGGER.debug("not 200 response" + "(trainingjob_name is " + trainingjob_name + ")")
-# not_possible_to_retrain.append(trainingjob_name)
-
-# else:
-# LOGGER.debug("not present in postgres db" + "(trainingjob_name is " + trainingjob_name + ")")
-# not_possible_to_retrain.append(trainingjob_name)
-
-# LOGGER.debug('success list: ' + str(possible_to_retrain))
-# LOGGER.debug('failure list: ' + str(not_possible_to_retrain))
-
-# return APP.response_class(response=json.dumps( \
-# {
-# "success count": len(possible_to_retrain),
-# "failure count": len(not_possible_to_retrain)
-# }),
-# status=status.HTTP_200_OK,
-# mimetype='application/json')
-
-
-
-
-# @APP.route('/trainingjobs/metadata/<trainingjob_name>')
-# def get_metadata(trainingjob_name):
-# """
-# Function handling rest endpoint to get accuracy, version and model download url for all
-# versions of given trainingjob_name which has overall_state FINISHED and
-# deletion_in_progress is False
-
-# Args in function:
-# trainingjob_name: str
-# name of trainingjob.
-
-# Args in json:
-# No json required
-
-# Returns:
-# json:
-# Successed metadata : list
-# list containes dictionaries.
-# dictionary containts
-# accuracy: dict
-# metrics of model
-# version: int
-# version of trainingjob
-# url: str
-# url for downloading model
-# status:
-# HTTP status code 200
-
-# Exceptions:
-# all exception are provided with exception message and HTTP status code.
-# """
-# response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
-# api_response = {}
-# if not check_trainingjob_name_or_featuregroup_name(trainingjob_name):
-# return {"Exception":"The trainingjob_name is not correct"}, status.HTTP_400_BAD_REQUEST
-
-# LOGGER.debug("Request metadata for trainingjob(name of trainingjob is %s) ", trainingjob_name)
-# try:
-# results = get_all_versions_info_by_name(trainingjob_name)
-# if results:
-# info_list = []
-# for trainingjob_info in results:
-# if (get_one_word_status(json.loads(trainingjob_info.steps_state)) == States.FINISHED.name and
-# not trainingjob_info.deletion_in_progress):
-# LOGGER.debug("Downloading metric for " +trainingjob_name )
-# data = get_metrics(trainingjob_name, trainingjob_info[11], MM_SDK)
-# url = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + ":" + \
-# str(TRAININGMGR_CONFIG_OBJ.my_port) + "/model/" + \
-# trainingjob_name + "/" + str(trainingjob_info[11]) + "/Model.zip"
-# dict_data = {
-# "accuracy": data,
-# "version": trainingjob_info.version,
-# "url": url
-# }
-# info_list.append(dict_data)
-# #info_list built
-# api_response = {"Successed metadata": info_list}
-# response_code = status.HTTP_200_OK
-# else :
-# err_msg = "Not found given trainingjob name-" + trainingjob_name
-# LOGGER.error(err_msg)
-# response_code = status.HTTP_404_NOT_FOUND
-# api_response = {"Exception":err_msg}
-# except Exception as err:
-# LOGGER.error(str(err))
-# api_response = {"Exception":str(err)}
-# return APP.response_class(response=json.dumps(api_response),
-# status=response_code,
-# mimetype=MIMETYPE_JSON)
-
@APP.route('/featureGroup/<featuregroup_name>', methods=['GET', 'PUT'])
def feature_group_by_name(featuregroup_name):
"""
mimetype='application/json')
-# def async_feature_engineering_status():
-# """
-# This function takes trainingjobs from DATAEXTRACTION_JOBS_CACHE and checks data extraction status
-# (using data extraction api) for those trainingjobs, if status is Completed then it calls
-# /trainingjob/dataExtractionNotification route for those trainingjobs.
-# """
-# url_pipeline_run = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + \
-# ":" + str(TRAININGMGR_CONFIG_OBJ.my_port) + \
-# "/trainingjob/dataExtractionNotification"
-# while True:
-# with LOCK:
-# fjc = list(DATAEXTRACTION_JOBS_CACHE)
-# for trainingjob_name in fjc:
-# LOGGER.debug("Current DATAEXTRACTION_JOBS_CACHE :" + str(DATAEXTRACTION_JOBS_CACHE))
-# try:
-# response = data_extraction_status(trainingjob_name, TRAININGMGR_CONFIG_OBJ)
-# if (response.headers['content-type'] != MIMETYPE_JSON or
-# response.status_code != status.HTTP_200_OK ):
-# raise TMException("Data extraction responsed with error status code or invalid content type" + \
-# "doesn't send json type response (trainingjob " + trainingjob_name + ")")
-# response = response.json()
-# LOGGER.debug("Data extraction status response for " + \
-# trainingjob_name + " " + json.dumps(response))
-
-# if response["task_status"] == "Completed":
-# with APP.app_context():
-# change_steps_state_of_latest_version(trainingjob_name,
-# Steps.DATA_EXTRACTION.name,
-# States.FINISHED.name)
-# change_steps_state_of_latest_version(trainingjob_name,
-# Steps.DATA_EXTRACTION_AND_TRAINING.name,
-# States.IN_PROGRESS.name)
-# kf_response = requests.post(url_pipeline_run,
-# data=json.dumps({"trainingjob_name": trainingjob_name}),
-# headers={
-# 'content-type': MIMETYPE_JSON,
-# 'Accept-Charset': 'UTF-8'
-# })
-# if (kf_response.headers['content-type'] != MIMETYPE_JSON or
-# kf_response.status_code != status.HTTP_200_OK ):
-# raise TMException("KF adapter responsed with error status code or invalid content type" + \
-# "doesn't send json type response (trainingjob " + trainingjob_name + ")")
-# with LOCK:
-# DATAEXTRACTION_JOBS_CACHE.pop(trainingjob_name)
-# elif response["task_status"] == "Error":
-# raise TMException("Data extraction has failed for " + trainingjob_name)
-# except Exception as err:
-# LOGGER.error("Failure during procesing of DATAEXTRACTION_JOBS_CACHE," + str(err))
-# """ Job will be removed from DATAEXTRACTION_JOBS_CACHE in handle_async
-# There might be some further error during handling of exception
-# """
-# handle_async_feature_engineering_status_exception_case(LOCK,
-# DATAEXTRACTION_JOBS_CACHE,
-# status.HTTP_500_INTERNAL_SERVER_ERROR,
-# str(err) + "(trainingjob name is " + trainingjob_name + ")",
-# LOGGER, False, trainingjob_name, MM_SDK)
-
-# #Wait and fetch latest list of trainingjobs
-# time.sleep(10)
-
if __name__ == "__main__":
try:
if TRAININGMGR_CONFIG_OBJ.is_config_loaded_properly() is False: