from trainingmgr.common.exceptions_utls import DBException
tm_table_name = "trainingjob_info" # Table used by 'Training Manager' for training jobs
+DB_QUERY_EXEC_ERROR = "Failed to execute query in "
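+# Each DB helper below appends its own name when raising, e.g.
+#   DB_QUERY_EXEC_ERROR + "get_info_by_version" + str(err)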
def get_data_extraction_in_progress_trainingjobs(ps_db_obj):
"""
except Exception as err:
if conn is not None:
conn.rollback()
- raise DBException("Failed to execute query in " + \
+ raise DBException(DB_QUERY_EXEC_ERROR + \
"get_data_extraction_in_progress_trainingjobs," + str(err))
finally:
if conn is not None:
This function changes every steps_state key that is currently IN_PROGRESS to FAILED
for the latest version of the given trainingjob (<trainingjob_name, latest version>).
"""
- status_Changed = False
+ status_changed = False
conn = None
try:
conn = ps_db_obj.get_new_conn()
if steps_state[step] == States.IN_PROGRESS.name:
steps_state[step] = States.FAILED.name
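+ # steps_state is persisted as a JSON string: json.dumps here, json.loads
+ # wherever it is read back in this module.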
cursor.execute("update {} set steps_state = %s where trainingjob_name = %s and ".format(tm_table_name) + \
- "version = %s", (json.dumps(steps_state), trainingjob_name, version))
- status_Changed = True
+ "version= %s", (json.dumps(steps_state), trainingjob_name, version))
+ status_changed = True
conn.commit()
cursor.close()
except Exception as err:
if conn is not None:
conn.rollback()
- raise DBException("Failed to execute query in " + \
+ raise DBException(DB_QUERY_EXEC_ERROR + \
"change_in_progress_to_failed_by_latest_version" + str(err))
finally:
if conn is not None:
conn.close()
- return status_Changed
+ return status_changed
def change_steps_state_of_latest_version(trainingjob_name, ps_db_obj, key, value):
steps_state = json.loads(cursor.fetchall()[0][0])
steps_state[key] = value
cursor.execute("update {} set steps_state = %s where trainingjob_name = %s and ".format(tm_table_name) + \
- "version = %s", (json.dumps(steps_state), trainingjob_name, version))
+ "version= %s", (json.dumps(steps_state), trainingjob_name, version))
conn.commit()
cursor.close()
except Exception as err:
if conn is not None:
conn.rollback()
- raise DBException("Failed to execute query in " + \
+ raise DBException(DB_QUERY_EXEC_ERROR + \
"change_steps_state_of_latest_version" + str(err))
finally:
if conn is not None:
steps_state = json.loads(cursor.fetchall()[0][0])
steps_state[key] = value
cursor.execute("update {} set steps_state = %s where trainingjob_name = %s ".format(tm_table_name) + \
- "and version = %s", (json.dumps(steps_state), trainingjob_name, version))
+ "and version= %s", (json.dumps(steps_state), trainingjob_name, version))
conn.commit()
cursor.close()
except Exception as err:
if conn is not None:
conn.rollback()
- raise DBException("Failed to execute query in " + \
+ raise DBException(DB_QUERY_EXEC_ERROR + \
"change_steps_state_by_version" + str(err))
finally:
if conn is not None:
except Exception as err:
if conn is not None:
conn.rollback()
- raise DBException("Failed to execute query in " + \
+ raise DBException(DB_QUERY_EXEC_ERROR + \
"delete_trainingjob_version" + str(err))
finally:
if conn is not None:
except Exception as err:
if conn is not None:
conn.rollback()
- raise DBException("Failed to execute query in " + \
+ raise DBException(DB_QUERY_EXEC_ERROR + \
"get_info_by_version" + str(err))
finally:
if conn is not None:
except Exception as err:
if conn is not None:
conn.rollback()
- raise DBException("Failed to execute query in " + \
+ raise DBException(DB_QUERY_EXEC_ERROR + \
"get_trainingjob_info_by_name" + str(err))
finally:
if conn is not None:
except Exception as err:
if conn is not None:
conn.rollback()
- raise DBException("Failed to execute query in " + \
+ raise DBException(DB_QUERY_EXEC_ERROR + \
"get_latest_version_trainingjob_name" + str(err))
finally:
if conn is not None:
except Exception as err:
if conn is not None:
conn.rollback()
- raise DBException("Failed to execute query in " + \
+ raise DBException(DB_QUERY_EXEC_ERROR + \
"get_all_versions_info_by_name" + str(err))
finally:
if conn is not None:
except Exception as err:
if conn is not None:
conn.rollback()
- raise DBException("Failed to execute query in " + \
+ raise DBException(DB_QUERY_EXEC_ERROR + \
"get_all_distinct_trainingjobs" + str(err))
finally:
if conn is not None:
except Exception as err:
if conn is not None:
conn.rollback()
- raise DBException("Failed to execute query in " + \
+ raise DBException(DB_QUERY_EXEC_ERROR + \
"get_all_version_num_by_trainingjob_name" + str(err))
finally:
if conn is not None:
except Exception as err:
if conn is not None:
conn.rollback()
- raise DBException("Failed to execute query in " + \
+ raise DBException(DB_QUERY_EXEC_ERROR + \
"update_model_download_url" + str(err))
finally:
if conn is not None:
query_filter, adding, enable_versioning,
pipeline_version, datalake_source, trainingjob_name, ps_db_obj, notification_url="",
_measurement="", bucket=""):
- """
- This function add the new row or update existing row with given information
- """
+ """
+ This function adds a new row or updates an existing row with the given information
+ """
- conn = None
- try:
+ conn = None
+ try:
arguments_string = json.dumps({"arguments": arguments})
datalake_source_dic = {}
datalake_source_dic[datalake_source] = {}
cursor = conn.cursor()
if not adding:
cursor.execute('''select nt.mv from (select max(version) mv,trainingjob_name from ''' + \
- '''{} group by trainingjob_name) nt where nt.trainingjob_name = %s'''.format(tm_table_name),
- (trainingjob_name))
+ '''{} group by trainingjob_name) nt where nt.trainingjob_name = %s'''.format(tm_table_name),
+ (trainingjob_name,))
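+ # The trailing comma makes (trainingjob_name,) a one-element tuple; without it,
+ # (trainingjob_name) is just a parenthesised string rather than a parameter sequence.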
version = int(cursor.fetchall()[0][0])
-
+
if enable_versioning:
version = version + 1
cursor.execute('''INSERT INTO {} VALUES '''.format(tm_table_name) + \
deletion_in_progress))
conn.commit()
cursor.close()
- except Exception as err:
- if conn is not None:
- conn.rollback()
- raise DBException("Failed to execute query in " + \
- "add_update_trainingjob" + str(err))
- finally:
- if conn is not None:
- conn.close()
-
+ except Exception as err:
+ if conn is not None:
+ conn.rollback()
+ raise DBException(DB_QUERY_EXEC_ERROR + \
+ "add_update_trainingjob" + str(err))
+ finally:
+ if conn is not None:
+ conn.close()
def get_all_jobs_latest_status_version(ps_db_obj):
"""
except Exception as err:
if conn is not None:
conn.rollback()
- raise DBException("Failed to execute query in " + \
+ raise DBException(DB_QUERY_EXEC_ERROR + \
"get_all_jobs_latest_status_version" + str(err))
finally:
if conn is not None:
LOCK = None
DATAEXTRACTION_JOBS_CACHE = None
+ERROR_TYPE_KF_ADAPTER_JSON = "KF adapter doesn't send a JSON response"
+ERROR_TYPE_DB_STATUS = "Couldn't update the status as failed in the db"
+MIMETYPE_JSON = "application/json"
+
@APP.errorhandler(APIException)
def error(err):
"""
LOGGER.error(err.message)
return APP.response_class(response=json.dumps({"Exception": err.message}),
status=err.code,
- mimetype='application/json')
+ mimetype=MIMETYPE_JSON)
@APP.route('/trainingjobs/<trainingjob_name>/<version>', methods=['GET'])
return APP.response_class(response=json.dumps(response_data),
status=response_code,
- mimetype='application/json')
+ mimetype=MIMETYPE_JSON)
@APP.route('/trainingjobs/<trainingjob_name>/<version>/steps_state', methods=['GET']) # Handled in GUI
@cross_origin()
return APP.response_class(response=reponse_data,
status=response_code,
- mimetype='application/json')
+ mimetype=MIMETYPE_JSON)
@APP.route('/model/<trainingjob_name>/<version>/Model.zip', methods=['GET'])
def get_model(trainingjob_name, version):
DATAEXTRACTION_JOBS_CACHE[trainingjob_name] = "Scheduled"
response_data = de_response.json()
response_code = status.HTTP_200_OK
- elif( de_response.headers['content-type'] == "application/json" ) :
+ elif de_response.headers['content-type'] == MIMETYPE_JSON:
errMsg = "Data extraction responded with error code."
LOGGER.error(errMsg)
json_data = de_response.json()
response_data = {"Exception": str(err)}
LOGGER.debug("Error is training, job name:" + trainingjob_name + str(err))
return APP.response_class(response=json.dumps(response_data),status=response_code,
- mimetype='application/json')
+ mimetype=MIMETYPE_JSON)
@APP.route('/trainingjob/dataExtractionNotification', methods=['POST'])
def data_extraction_notification():
}
response = training_start(TRAININGMGR_CONFIG_OBJ, dict_data, trainingjob_name)
- if ( response.headers['content-type'] != "application/json"
+ if ( response.headers['content-type'] != MIMETYPE_JSON
or response.status_code != status.HTTP_200_OK ):
err_msg = "Kf adapter invalid content-type or status_code for " + trainingjob_name
raise TMException(err_msg)
err_msg = "Failed to connect KF adapter."
LOGGER.error(err_msg)
if not change_in_progress_to_failed_by_latest_version(trainingjob_name, PS_DB_OBJ) :
- LOGGER.error("Couldn't update the status as failed in db")
+ LOGGER.error(ERROR_TYPE_DB_STATUS)
return response_for_training(err_response_code,
err_msg + str(err) + "(trainingjob name is " + trainingjob_name + ")",
LOGGER, False, trainingjob_name, PS_DB_OBJ, MM_SDK)
except Exception as err:
LOGGER.error("Failed to handle dataExtractionNotification. " + str(err))
if not change_in_progress_to_failed_by_latest_version(trainingjob_name, PS_DB_OBJ) :
- LOGGER.error("Couldn't update the status as failed in db")
+ LOGGER.error(ERROR_TYPE_DB_STATUS)
return response_for_training(err_response_code,
str(err) + "(trainingjob name is " + trainingjob_name + ")",
LOGGER, False, trainingjob_name, PS_DB_OBJ, MM_SDK)
return APP.response_class(response=json.dumps({"result": "pipeline is scheduled"}),
status=status.HTTP_200_OK,
- mimetype='application/json')
+ mimetype=MIMETYPE_JSON)
@APP.route('/trainingjob/pipelineNotification', methods=['POST'])
change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
Steps.TRAINED_MODEL.name,
States.IN_PROGRESS.name)
-
+
if MM_SDK.check_object(trainingjob_name, version, "Model.zip"):
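+ # The URL built below points clients back at this manager's own
+ # /model/<trainingjob_name>/<version>/Model.zip route defined above.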
model_url = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + ":" + \
str(TRAININGMGR_CONFIG_OBJ.my_port) + "/model/" + \
#Training failure response
LOGGER.error("Pipeline notification failed" + str(err))
if not change_in_progress_to_failed_by_latest_version(trainingjob_name, PS_DB_OBJ) :
- LOGGER.error("Couldn't update the status as failed in db")
+ LOGGER.error(ERROR_TYPE_DB_STATUS)
return response_for_training(status.HTTP_500_INTERNAL_SERVER_ERROR,
str(err) + " (trainingjob " + trainingjob_name + ")",
LOGGER.error(str(err))
return APP.response_class(response=json.dumps(api_response),
status=response_code,
- mimetype='application/json')
+ mimetype=MIMETYPE_JSON)
@APP.route("/pipelines/<pipe_name>/upload", methods=['POST'])
@cross_origin()
uploaded_file_path = "/tmp/" + uploaded_file.filename
uploaded_file.save(uploaded_file_path)
LOGGER.debug("File uploaded :%s", uploaded_file_path)
-
kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port
- url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + \
+ if kf_adapter_ip is not None and kf_adapter_port is not None:
+ url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + \
'/pipelineIds/' + pipe_name
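+ # Note: the guard above assumes both values are configured; if either were None,
+ # url would stay unassigned when requests.post(url, ...) runs further down.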
description = ''
if 'description' in request.form:
description = request.form['description']
- with open(uploaded_file_path, 'rb') as file:
- files = {'file': file.read()}
+ if uploaded_file_path is not None:
+ with open(uploaded_file_path, 'rb') as file:
+ files = {'file': file.read()}
resp = requests.post(url, files=files, data={"description": description})
LOGGER.debug(resp.text)
if uploaded_file_path and os.path.isfile(uploaded_file_path):
LOGGER.debug("Deleting %s", uploaded_file_path)
- os.remove(uploaded_file_path)
+ if uploaded_file_path is not None:
+ os.remove(uploaded_file_path)
LOGGER.debug("Responding to Client with %d %s", result_code, result_string)
return APP.response_class(response=json.dumps({'result': result_string}),
status=result_code,
- mimetype='application/json')
+ mimetype=MIMETYPE_JSON)
@APP.route("/pipelines/<pipeline_name>/versions", methods=['GET'])
try:
kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port
-
- url = 'http://' + str(kf_adapter_ip) + ':' + str(
+ if kf_adapter_ip is not None and kf_adapter_port is not None:
+ url = 'http://' + str(kf_adapter_ip) + ':' + str(
kf_adapter_port) + '/pipelines/' + pipeline_name + \
'/versions'
LOGGER.debug("URL:" + url)
response = requests.get(url)
- if response.headers['content-type'] != "application/json":
- raise TMException("Kf adapter doesn't sends json type response")
+ if response.headers['content-type'] != MIMETYPE_JSON:
+ raise TMException(ERROR_TYPE_KF_ADAPTER_JSON)
api_response = {"versions_list": response.json()['versions_list']}
response_code = status.HTTP_200_OK
except Exception as err:
LOGGER.error(str(err))
return APP.response_class(response=json.dumps(api_response),
status=response_code,
- mimetype='application/json')
+ mimetype=MIMETYPE_JSON)
@APP.route('/pipelines', methods=['GET'])
try:
kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port
- url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + '/pipelines'
+ if kf_adapter_ip is not None and kf_adapter_port is not None:
+ url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + '/pipelines'
LOGGER.debug(url)
response = requests.get(url)
- if response.headers['content-type'] != "application/json":
- err_smg = "Kf adapter doesn't sends json type response"
+ if response.headers['content-type'] != MIMETYPE_JSON:
+ err_smg = ERROR_TYPE_KF_ADAPTER_JSON
LOGGER.error(err_smg)
raise TMException(err_smg)
pipeline_names = []
api_response = {"Exception": str(err)}
return APP.response_class(response=json.dumps(api_response),
status=response_code,
- mimetype='application/json')
+ mimetype=MIMETYPE_JSON)
@APP.route('/experiments', methods=['GET'])
try:
kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port
- url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + '/experiments'
+ if kf_adapter_ip is not None and kf_adapter_port is not None:
+ url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + '/experiments'
LOGGER.debug("url is :" + url)
response = requests.get(url)
- if response.headers['content-type'] != "application/json":
- err_smg = "Kf adapter doesn't sends json type response"
+ if response.headers['content-type'] != MIMETYPE_JSON:
+ err_smg = ERROR_TYPE_KF_ADAPTER_JSON
raise TMException(err_smg)
experiment_names = []
LOGGER.error(str(err))
return APP.response_class(response=json.dumps(api_response),
status=reponse_code,
- mimetype='application/json')
+ mimetype=MIMETYPE_JSON)
@APP.route('/trainingjobs/<trainingjob_name>', methods=['POST', 'PUT']) # Handled in GUI
json_data = request.json
if (request.method == 'POST'):
LOGGER.debug("Create request json : " + json.dumps(json_data))
- isDataAvailable = validate_trainingjob_name(trainingjob_name, PS_DB_OBJ)
- if isDataAvailable:
+ is_data_available = validate_trainingjob_name(trainingjob_name, PS_DB_OBJ)
+ if is_data_available:
response_code = status.HTTP_409_CONFLICT
raise TMException("trainingjob name(" + trainingjob_name + ") is already present in database")
else:
response_code = status.HTTP_201_CREATED
elif(request.method == 'PUT'):
LOGGER.debug("Update request json : " + json.dumps(json_data))
- isDataAvailable = validate_trainingjob_name(trainingjob_name, PS_DB_OBJ)
- if not isDataAvailable:
+ is_data_available = validate_trainingjob_name(trainingjob_name, PS_DB_OBJ)
+ if not is_data_available:
response_code = status.HTTP_404_NOT_FOUND
raise TMException("Trainingjob name(" + trainingjob_name + ") is not present in database")
else:
return APP.response_class(response= json.dumps(api_response),
status= response_code,
- mimetype='application/json')
+ mimetype=MIMETYPE_JSON)
@APP.route('/trainingjobs/metadata/<trainingjob_name>')
def get_metadata(trainingjob_name):
api_response = {"Exception":str(err)}
return APP.response_class(response=json.dumps(api_response),
status=response_code,
- mimetype='application/json')
+ mimetype=MIMETYPE_JSON)
def async_feature_engineering_status():
"""
LOGGER.debug("Current DATAEXTRACTION_JOBS_CACHE :" + str(DATAEXTRACTION_JOBS_CACHE))
try:
response = data_extraction_status(trainingjob_name, TRAININGMGR_CONFIG_OBJ)
- if (response.headers['content-type'] != "application/json" or
+ if (response.headers['content-type'] != MIMETYPE_JSON or
response.status_code != status.HTTP_200_OK ):
raise TMException("Data extraction responsed with error status code or invalid content type" + \
"doesn't send json type response (trainingjob " + trainingjob_name + ")")
kf_response = requests.post(url_pipeline_run,
data=json.dumps({"trainingjob_name": trainingjob_name}),
headers={
- 'content-type': 'application/json',
+ 'content-type': MIMETYPE_JSON,
'Accept-Charset': 'UTF-8'
})
- if (kf_response.headers['content-type'] != "application/json" or
+ if (kf_response.headers['content-type'] != MIMETYPE_JSON or
kf_response.status_code != status.HTTP_200_OK ):
raise TMException("KF adapter responsed with error status code or invalid content type" + \
"doesn't send json type response (trainingjob " + trainingjob_name + ")")
TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()
try:
if TRAININGMGR_CONFIG_OBJ.is_config_loaded_properly() is False:
- raise Exception("Not all configuration loaded.")
+ raise TMException("Not all configuration loaded.")
LOGGER = TRAININGMGR_CONFIG_OBJ.logger
PS_DB_OBJ = PSDB(TRAININGMGR_CONFIG_OBJ)
LOCK = Lock()
MM_SDK = ModelMetricsSdk()
LOGGER.debug("Starting AIML-WF training manager .....")
APP.run(debug=True, port=int(TRAININGMGR_CONFIG_OBJ.my_port), host='0.0.0.0')
except Exception as err:
print("Startup failure" + str(err))