1 # ==================================================================================
3 # Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # ==================================================================================
20 This file contains all rest endpoints exposed by Training manager.
24 from logging import Logger
28 from threading import Lock
30 from flask import Flask, request, send_file
31 from flask_api import status
33 from flask_cors import cross_origin
34 from werkzeug.utils import secure_filename
35 from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk
36 from trainingmgr.common.trainingmgr_operations import data_extraction_start, training_start, data_extraction_status, create_dme_filtered_data_job, delete_dme_filtered_data_job
37 from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
38 from trainingmgr.common.trainingmgr_util import get_one_word_status, check_trainingjob_data, \
39 check_key_in_dictionary, get_one_key, \
40 response_for_training, get_metrics, \
41 handle_async_feature_engineering_status_exception_case, \
42 validate_trainingjob_name, get_all_pipeline_names_svc, check_featureGroup_data
43 from trainingmgr.common.exceptions_utls import APIException,TMException
44 from trainingmgr.constants.steps import Steps
45 from trainingmgr.constants.states import States
46 from trainingmgr.db.trainingmgr_ps_db import PSDB
47 from trainingmgr.db.common_db_fun import get_data_extraction_in_progress_trainingjobs, \
48 change_field_of_latest_version, \
49 change_in_progress_to_failed_by_latest_version, change_steps_state_of_latest_version, \
50 get_info_by_version, \
51 get_trainingjob_info_by_name, get_latest_version_trainingjob_name, get_all_versions_info_by_name, \
52 update_model_download_url, add_update_trainingjob, add_featuregroup, \
53 get_field_of_given_version,get_all_jobs_latest_status_version, get_info_of_latest_version, \
54 get_feature_groups_db, get_feature_group_by_name_db, delete_feature_group_by_name, delete_trainingjob_version, change_field_value_by_version
# Module-level singletons; presumably initialized during application startup
# elsewhere in this module — TODO confirm against the full source.
TRAININGMGR_CONFIG_OBJ = None
DATAEXTRACTION_JOBS_CACHE = None
# Reusable error/message constants shared by the REST handlers below.
ERROR_TYPE_KF_ADAPTER_JSON = "Kf adapter doesn't sends json type response"
ERROR_TYPE_DB_STATUS = "Couldn't update the status as failed in db access"
MIMETYPE_JSON = "application/json"
# Global Flask error handler for APIException raised by any endpoint:
# logs the exception message and returns it to the client as a JSON body.
# NOTE(review): the `def` line and the explicit status-code argument to
# response_class are not visible in this excerpt — confirm against the
# full source.
@APP.errorhandler(APIException)
    """
    Return response with error message and error status code.
    """
    LOGGER.error(err.message)
    return APP.response_class(response=json.dumps({"Exception": err.message}),
                              mimetype=MIMETYPE_JSON)
@APP.route('/trainingjobs/<trainingjob_name>/<version>', methods=['GET'])
def get_trainingjob_by_name_version(trainingjob_name, version):
    """
    Rest endpoint to fetch training job details by name and version
    <trainingjob_name, version>.

    Args (route):
        trainingjob_name: name of training job.
        version of trainingjob.

    Returned json fields include:
        arguments: key-value pairs related to hyper parameters and
            "trainingjob":<trainingjob_name> key-value pair
        query_filter: string indication sql where clause for filtering out features
        creation_time: time at which <trainingjob_name, version> trainingjob is created
        run_id: run id from KF adapter for <trainingjob_name, version> trainingjob
        steps_state: <trainingjob_name, version> trainingjob's each steps and corresponding state
        enable_versioning: bool
            flag for trainingjob versioning
        updation_time: time at which <trainingjob_name, version> trainingjob is updated.
        version: trainingjob's version
        pipeline_version: str
        datalake_source: string indicating datalake source
        model_url: url for downloading model
        notification_url: str
            url of notification server
        _measurement: _measurement of influx db datalake
        bucket: bucket name of influx db datalake

    Exceptions:
        all exception are provided with exception message and HTTP status code.
    """
    LOGGER.debug("Request to fetch trainingjob by name and version(trainingjob:" + \
        trainingjob_name + " ,version:" + version + ")")
    # Pessimistic default; overwritten with 200/404 below.
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    # NOTE(review): the enclosing try/"if results:" structure is not visible
    # in this excerpt — confirm control flow against the full source.
    results = get_info_by_version(trainingjob_name, version, PS_DB_OBJ)
    data = get_metrics(trainingjob_name, version, MM_SDK)
    trainingjob_info = results[0]
    # Map the positional DB row columns to named response fields.
    # NOTE(review): the `dict_data = {` opener and closing `}` are not
    # visible in this excerpt.
        "trainingjob_name": trainingjob_info[0],
        "description": trainingjob_info[1],
        "feature_list": trainingjob_info[2],
        "pipeline_name": trainingjob_info[3],
        "experiment_name": trainingjob_info[4],
        "arguments": json.loads(trainingjob_info[5])['arguments'],
        "query_filter": trainingjob_info[6],
        "creation_time": str(trainingjob_info[7]),
        "run_id": trainingjob_info[8],
        "steps_state": json.loads(trainingjob_info[9]),
        "updation_time": str(trainingjob_info[10]),
        "version": trainingjob_info[11],
        "enable_versioning": bool(trainingjob_info[12]),
        "pipeline_version": trainingjob_info[13],
        "datalake_source": get_one_key(json.loads(trainingjob_info[14])['datalake_source']),
        "model_url": trainingjob_info[15],
        "notification_url": trainingjob_info[16],
        "_measurement": trainingjob_info[17],
        "bucket": trainingjob_info[18],
    response_data = {"trainingjob": dict_data}
    response_code = status.HTTP_200_OK
    # no need to change status here because given trainingjob_name,version not found in postgres db.
    response_code = status.HTTP_404_NOT_FOUND
    raise TMException("Not found given trainingjob with version(trainingjob:" + \
        trainingjob_name + " version: " + version + ") in database")
    except Exception as err:
        # Any failure (db access, json parsing, missing row) is reported
        # back to the client with the response_code set above.
        LOGGER.error(str(err))
        response_data = {"Exception": str(err)}
    return APP.response_class(response=json.dumps(response_data),
                              status=response_code,
                              mimetype=MIMETYPE_JSON)
@APP.route('/trainingjobs/<trainingjob_name>/<version>/steps_state', methods=['GET']) # Handled in GUI
def get_steps_state(trainingjob_name, version):
    """
    Function handling rest end points to get steps_state information for
    given <trainingjob_name, version>.

    Args:
        trainingjob_name: str
        version of trainingjob.

    Steps reported (per the descriptions visible in source):
        DATA_EXTRACTION : str
            this step captures part
            starting: immediately after quick success response by data extraction module
            till: ending of data extraction.
        DATA_EXTRACTION_AND_TRAINING : str
            this step captures part
            starting: immediately after DATA_EXTRACTION is FINISHED
            till: getting 'scheduled' run status from kf connector
        TRAINING:
            this step captures part
            starting: immediately after DATA_EXTRACTION_AND_TRAINING is FINISHED
            till: getting 'Succeeded' run status from kf connector
        TRAINING_AND_TRAINED_MODEL : str
            this step captures part
            starting: immediately after TRAINING is FINISHED
            till: getting version for trainingjob_name trainingjob.
        TRAINED_MODEL:
            this step captures part
            starting: immediately after TRAINING_AND_TRAINED_MODEL is FINISHED
            till: model download url is updated in db.

    Exceptions:
        all exception are provided with exception message and HTTP status code.
    """
    LOGGER.debug("Request to get steps_state for (trainingjob:" + \
        trainingjob_name + " and version: " + version + ")")
    # Default to 500; overwritten with 200/404 below.
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    # NOTE(review): the enclosing try/"if results:" structure is not visible
    # in this excerpt — confirm against the full source.
    results = get_field_of_given_version(trainingjob_name, version, PS_DB_OBJ, "steps_state")
    LOGGER.debug("get_field_of_given_version:" + str(results))
    # NOTE(review): "reponse_data" is misspelled but used consistently
    # below, so behavior is unaffected.
    reponse_data = results[0][0]
    response_code = status.HTTP_200_OK
    response_code = status.HTTP_404_NOT_FOUND
    raise TMException("Not found given trainingjob in database")
    except Exception as err:
        LOGGER.error(str(err))
        # NOTE(review): this dict is passed to response_class without
        # json.dumps(), unlike the other handlers — verify the produced
        # body is valid JSON.
        reponse_data = {"Exception": str(err)}
    return APP.response_class(response=reponse_data,
                              status=response_code,
                              mimetype=MIMETYPE_JSON)
@APP.route('/model/<trainingjob_name>/<version>/Model.zip', methods=['GET'])
def get_model(trainingjob_name, version):
    """
    Function handling rest endpoint to download model zip file of <trainingjob_name, version> trainingjob.

    Args (route):
        trainingjob_name: str
        version of trainingjob.

    Returns:
        zip file of model of <trainingjob_name, version> trainingjob.

    Exceptions:
        all exception are provided with exception message and HTTP status code.
    """
    # Stream the model archive straight from the model-metrics SDK.
    # NOTE(review): the try/except lines wrapping these two returns are not
    # visible in this excerpt — confirm against the full source.
    return send_file(MM_SDK.get_model_zip(trainingjob_name, version), mimetype='application/zip')
    return {"Exception": "error while downloading model"}, status.HTTP_500_INTERNAL_SERVER_ERROR
@APP.route('/trainingjobs/<trainingjob_name>/training', methods=['POST']) # Handled in GUI
def training(trainingjob_name):
    """
    Rest end point to start training job.
    It calls data extraction module for data extraction and other training steps

    Args (route):
        trainingjob_name: str

    Returns json echoing:
        trainingjob_name: str
        route of data extraction module for getting data extraction status of
        given trainingjob_name .

    Exceptions:
        all exception are provided with exception message and HTTP status code.
    """
    LOGGER.debug("Request for training trainingjob %s ", trainingjob_name)
    # Default to 500; overwritten on success below.
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    # NOTE(review): the enclosing try: line is not visible in this excerpt.
    isDataAvaible = validate_trainingjob_name(trainingjob_name, PS_DB_OBJ)
    if not isDataAvaible:
        response_code = status.HTTP_404_NOT_FOUND
        raise TMException("Given trainingjob name is not present in database" + \
            "(trainingjob: " + trainingjob_name + ")") from None
    # Pull the job's extraction parameters from its DB row (positional columns).
    db_results = get_trainingjob_info_by_name(trainingjob_name, PS_DB_OBJ)
    feature_list = db_results[0][2]
    query_filter = db_results[0][6]
    datalake_source = json.loads(db_results[0][14])['datalake_source']
    _measurement = db_results[0][17]
    bucket = db_results[0][18]
    LOGGER.debug('Starting Data Extraction...')
    de_response = data_extraction_start(TRAININGMGR_CONFIG_OBJ, trainingjob_name,
                                        feature_list, query_filter, datalake_source,
                                        _measurement, bucket)
    if (de_response.status_code == status.HTTP_200_OK ):
        LOGGER.debug("Response from data extraction for " + \
            trainingjob_name + " : " + json.dumps(de_response.json()))
        # Mark DATA_EXTRACTION step as running and remember the async job.
        change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
                                             Steps.DATA_EXTRACTION.name,
                                             States.IN_PROGRESS.name)
        DATAEXTRACTION_JOBS_CACHE[trainingjob_name] = "Scheduled"
        response_data = de_response.json()
        response_code = status.HTTP_200_OK
    elif( de_response.headers['content-type'] == MIMETYPE_JSON ) :
        # Data extraction refused the request but gave a JSON error body.
        errMsg = "Data extraction responded with error code."
        json_data = de_response.json()
        LOGGER.debug(str(json_data))
        if check_key_in_dictionary(["result"], json_data):
            response_data = {"Failed":errMsg + json_data["result"]}
            raise TMException(errMsg)
    raise TMException("Data extraction doesn't send json type response" + \
        "(trainingjob name is " + trainingjob_name + ")") from None
    except Exception as err:
        response_data = {"Exception": str(err)}
        LOGGER.debug("Error is training, job name:" + trainingjob_name + str(err))
    return APP.response_class(response=json.dumps(response_data),status=response_code,
                              mimetype=MIMETYPE_JSON)
361 @APP.route('/trainingjob/dataExtractionNotification', methods=['POST'])
362 def data_extraction_notification():
364 This rest endpoint will be invoked when data extraction is finished.
365 It will further invoke kf-adapter for training, if the response from kf-adapter run_status is "scheduled",
366 that means request is accepted by kf-adapter for futher processing.
372 trainingjob_name: str
383 all exception are provided with exception message and HTTP status code.
385 LOGGER.debug("Data extraction notification...")
386 err_response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
389 if not check_key_in_dictionary(["trainingjob_name"], request.json) :
390 err_msg = "Trainingjob_name key not available in request"
391 Logger.error(err_msg)
392 err_response_code = status.HTTP_400_BAD_REQUEST
393 raise TMException(err_msg)
395 trainingjob_name = request.json["trainingjob_name"]
396 results = get_trainingjob_info_by_name(trainingjob_name, PS_DB_OBJ)
397 arguments = json.loads(results[0][5])['arguments']
398 arguments["version"] = results[0][11]
399 LOGGER.debug(arguments)
402 "pipeline_name": results[0][3], "experiment_name": results[0][4],
403 "arguments": arguments, "pipeline_version": results[0][13]
406 response = training_start(TRAININGMGR_CONFIG_OBJ, dict_data, trainingjob_name)
407 if ( response.headers['content-type'] != MIMETYPE_JSON
408 or response.status_code != status.HTTP_200_OK ):
409 err_msg = "Kf adapter invalid content-type or status_code for " + trainingjob_name
410 raise TMException(err_msg)
412 LOGGER.debug("response from kf_adapter for " + \
413 trainingjob_name + " : " + json.dumps(response.json()))
414 json_data = response.json()
416 if not check_key_in_dictionary(["run_status", "run_id"], json_data):
417 err_msg = "Kf adapter invalid response from , key not present ,run_status or run_id for " + trainingjob_name
418 Logger.error(err_msg)
419 err_response_code = status.HTTP_400_BAD_REQUEST
420 raise TMException(err_msg)
422 if json_data["run_status"] == 'scheduled':
423 change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
424 Steps.DATA_EXTRACTION_AND_TRAINING.name,
425 States.FINISHED.name)
426 change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
428 States.IN_PROGRESS.name)
429 change_field_of_latest_version(trainingjob_name, PS_DB_OBJ,
430 "run_id", json_data["run_id"])
432 raise TMException("KF Adapter- run_status in not scheduled")
433 except requests.exceptions.ConnectionError as err:
434 err_msg = "Failed to connect KF adapter."
435 LOGGER.error(err_msg)
436 if not change_in_progress_to_failed_by_latest_version(trainingjob_name, PS_DB_OBJ) :
437 LOGGER.error(ERROR_TYPE_DB_STATUS)
438 return response_for_training(err_response_code,
439 err_msg + str(err) + "(trainingjob name is " + trainingjob_name + ")",
440 LOGGER, False, trainingjob_name, PS_DB_OBJ, MM_SDK)
441 except Exception as err:
442 LOGGER.error("Failed to handle dataExtractionNotification. " + str(err))
443 if not change_in_progress_to_failed_by_latest_version(trainingjob_name, PS_DB_OBJ) :
444 LOGGER.error(ERROR_TYPE_DB_STATUS)
445 return response_for_training(err_response_code,
446 str(err) + "(trainingjob name is " + trainingjob_name + ")",
447 LOGGER, False, trainingjob_name, PS_DB_OBJ, MM_SDK)
449 return APP.response_class(response=json.dumps({"result": "pipeline is scheduled"}),
450 status=status.HTTP_200_OK,
451 mimetype=MIMETYPE_JSON)
@APP.route('/trainingjob/pipelineNotification', methods=['POST'])
def pipeline_notification():
    """
    Function handling rest endpoint to get notification from kf_adapter and set model download
    url in database(if it presents in model db).

    Args in function: none
    Required Args in json:
        trainingjob_name: str

    Exceptions:
        all exception are provided with exception message and HTTP status code.
    """
    LOGGER.debug("Pipeline Notification response from kf_adapter: %s", json.dumps(request.json))
    # NOTE(review): the enclosing try: line is not visible in this excerpt.
    check_key_in_dictionary(["trainingjob_name", "run_status"], request.json)
    trainingjob_name = request.json["trainingjob_name"]
    run_status = request.json["run_status"]
    if run_status == 'Succeeded':
        # Pipeline run finished: advance step states and publish model URL.
        change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
                                             States.FINISHED.name)
        change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
                                             Steps.TRAINING_AND_TRAINED_MODEL.name,
                                             States.IN_PROGRESS.name)
        version = get_latest_version_trainingjob_name(trainingjob_name, PS_DB_OBJ)
        change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
                                             Steps.TRAINING_AND_TRAINED_MODEL.name,
                                             States.FINISHED.name)
        change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
                                             Steps.TRAINED_MODEL.name,
                                             States.IN_PROGRESS.name)
        if MM_SDK.check_object(trainingjob_name, version, "Model.zip"):
            # Model archive exists in the model store: expose this service's
            # own /model/... download route.
            model_url = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + ":" + \
                str(TRAININGMGR_CONFIG_OBJ.my_port) + "/model/" + \
                trainingjob_name + "/" + str(version) + "/Model.zip"
            update_model_download_url(trainingjob_name, version, model_url, PS_DB_OBJ)
            change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
                                                 Steps.TRAINED_MODEL.name,
                                                 States.FINISHED.name)
            errMsg = "Trained model is not available "
            LOGGER.error(errMsg + trainingjob_name)
            raise TMException(errMsg + trainingjob_name)
        LOGGER.error("Pipeline notification -Training failed " + trainingjob_name)
        raise TMException("Pipeline not successful for " + \
            ",request json from kf adapter is: " + json.dumps(request.json))
    except Exception as err:
        #Training failure response
        LOGGER.error("Pipeline notification failed" + str(err))
        if not change_in_progress_to_failed_by_latest_version(trainingjob_name, PS_DB_OBJ) :
            LOGGER.error(ERROR_TYPE_DB_STATUS)
        return response_for_training(status.HTTP_500_INTERNAL_SERVER_ERROR,
                                     str(err) + " (trainingjob " + trainingjob_name + ")",
                                     LOGGER, False, trainingjob_name, PS_DB_OBJ, MM_SDK)
    #Training success response
    return response_for_training(status.HTTP_200_OK,
                                 "Pipeline notification success.",
                                 LOGGER, True, trainingjob_name, PS_DB_OBJ, MM_SDK)
@APP.route('/trainingjobs/latest', methods=['GET'])
def trainingjobs_operations():
    """
    Rest endpoint to fetch overall status, latest version of all existing training jobs

    Args in function: none
    Required Args in json:
        none

    Returns:
        list of dictionaries, each with:
            trainingjob_name: str
            overall status of end to end flow

    Exceptions:
        all exception are provided with exception message and HTTP status code.
    """
    LOGGER.debug("Request for getting all trainingjobs with latest version and status.")
    # Default to 500; overwritten on success below.
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    # NOTE(review): the enclosing try:, loop header, list initialization and
    # dict opener are not visible in this excerpt.
    results = get_all_jobs_latest_status_version(PS_DB_OBJ)
        "trainingjob_name": res[0],
        # Collapse the per-step state json into one overall status word.
        "overall_status": get_one_word_status(json.loads(res[2]))
    trainingjobs.append(dict_data)
    api_response = {"trainingjobs": trainingjobs}
    response_code = status.HTTP_200_OK
    except Exception as err:
        api_response = {"Exception": str(err)}
        LOGGER.error(str(err))
    return APP.response_class(response=json.dumps(api_response),
                              status=response_code,
                              mimetype=MIMETYPE_JSON)
@APP.route("/pipelines/<pipe_name>/upload", methods=['POST'])
def upload_pipeline(pipe_name):
    """
    Function handling rest endpoint to upload pipeline.

    Exceptions:
        all exception are provided with exception message and HTTP status code.
    """
    LOGGER.debug("Request to upload pipeline.")
    # Track the temp file so it can be removed in cleanup below.
    uploaded_file_path = None
    LOGGER.debug(str(request))
    LOGGER.debug(str(request.files))
    if 'file' in request.files:
        uploaded_file = request.files['file']
        result_string = "Didn't get file"
        raise ValueError("file not found in request.files")
    # Pipeline name must be alphanumeric/underscore only.
    pattern = re.compile(r"[a-zA-Z0-9_]+")
    if not re.fullmatch(pattern, pipe_name):
        err_msg="the pipeline name is not valid"
        raise TMException(err_msg)
    LOGGER.debug("Uploading received for %s", uploaded_file.filename)
    if uploaded_file.filename != '':
        # Stage the upload under /tmp with a sanitized file name.
        uploaded_file_path = "/tmp/" + secure_filename(uploaded_file.filename)
        uploaded_file.save(uploaded_file_path)
        LOGGER.debug("File uploaded :%s", uploaded_file_path)
        kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
        kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port
        if kf_adapter_ip!=None and kf_adapter_port!=None:
            # Forward the staged file to the KF adapter.
            url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + \
                '/pipelineIds/' + pipe_name
            if 'description' in request.form:
                description = request.form['description']
            if uploaded_file_path != None:
                with open(uploaded_file_path, 'rb') as file:
                    files = {'file': file.read()}
            resp = requests.post(url, files=files, data={"description": description})
            LOGGER.debug(resp.text)
            if resp.status_code == status.HTTP_200_OK:
                LOGGER.debug("Pipeline uploaded :%s", pipe_name)
                result_string = "Pipeline uploaded " + pipe_name
                result_code = status.HTTP_200_OK
                LOGGER.error(resp.json()["message"])
                result_string = resp.json()["message"]
                result_code = status.HTTP_500_INTERNAL_SERVER_ERROR
        result_string = "File name not found"
        raise ValueError("filename is not found in request.files")
    # NOTE(review): the `except ...:` headers for the handlers below are not
    # visible in this excerpt — confirm which exception types each catches.
    tbk = traceback.format_exc()
    result_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    result_string = "Error while uploading pipeline"
    tbk = traceback.format_exc()
    result_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    result_string = "Pipeline name is not of valid format"
    tbk = traceback.format_exc()
    result_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    result_string = "Error while uploading pipeline cause"
    # Cleanup: always remove the staged temp file if it was created.
    if uploaded_file_path and os.path.isfile(uploaded_file_path):
        LOGGER.debug("Deleting %s", uploaded_file_path)
        if uploaded_file_path != None:
            os.remove(uploaded_file_path)
    LOGGER.debug("Responding to Client with %d %s", result_code, result_string)
    return APP.response_class(response=json.dumps({'result': result_string}),
                              mimetype=MIMETYPE_JSON)
685 @APP.route("/pipelines/<pipeline_name>/versions", methods=['GET'])
687 def get_versions_for_pipeline(pipeline_name):
689 Function handling rest endpoint to get versions of given pipeline name.
701 list containing all versions(as str)
706 all exception are provided with exception message and HTTP status code.
710 LOGGER.debug("Request to get all version for given pipeline(" + pipeline_name + ").")
711 response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
713 pipeline_names=get_all_pipeline_names_svc(TRAININGMGR_CONFIG_OBJ)
714 print(pipeline_names, pipeline_name)
715 for pipeline in pipeline_names:
716 if pipeline == pipeline_name:
717 valid_pipeline=pipeline
719 if valid_pipeline == "":
720 raise TMException("Pipeline name not present")
721 kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
722 kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port
723 if kf_adapter_ip!=None and kf_adapter_port!=None :
724 url = 'http://' + str(kf_adapter_ip) + ':' + str(
725 kf_adapter_port) + '/pipelines/' + valid_pipeline + \
727 LOGGER.debug("URL:" + url)
728 response = requests.get(url)
729 if response.headers['content-type'] != MIMETYPE_JSON:
730 raise TMException(ERROR_TYPE_KF_ADAPTER_JSON)
731 api_response = {"versions_list": response.json()['versions_list']}
732 response_code = status.HTTP_200_OK
733 except Exception as err:
734 api_response = {"Exception": str(err)}
735 LOGGER.error(str(err))
736 return APP.response_class(response=json.dumps(api_response),
737 status=response_code,
738 mimetype=MIMETYPE_JSON)
@APP.route('/pipelines', methods=['GET'])
def get_all_pipeline_names():
    """
    Function handling rest endpoint to get all pipeline names.

    Returns:
        pipeline_names : list
            list containing all pipeline names(as str).

    Exceptions:
        all exception are provided with exception message and HTTP status code.
    """
    LOGGER.debug("Request to get all getting all pipeline names.")
    # Default to 500; overwritten on success below.
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    # NOTE(review): the enclosing try: line is not visible in this excerpt.
    pipeline_names=get_all_pipeline_names_svc(TRAININGMGR_CONFIG_OBJ)
    api_response = {"pipeline_names": pipeline_names}
    response_code = status.HTTP_200_OK
    except Exception as err:
        LOGGER.error(str(err))
        api_response = {"Exception": str(err)}
    return APP.response_class(response=json.dumps(api_response),status=response_code,mimetype=MIMETYPE_JSON)
@APP.route('/experiments', methods=['GET'])
def get_all_experiment_names():
    """
    Function handling rest endpoint to get all experiment names.

    Returns:
        experiment_names : list
            list containing all experiment names(as str).

    Exceptions:
        all exception are provided with exception message and HTTP status code.
    """
    LOGGER.debug("request for getting all experiment names is come.")
    # NOTE(review): "reponse_code" and "err_smg" are misspelled but used
    # consistently within this function, so behavior is unaffected.
    reponse_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    # NOTE(review): the enclosing try: line is not visible in this excerpt.
    kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
    kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port
    if kf_adapter_ip!=None and kf_adapter_port!=None:
        # Query the KF adapter for the experiment map; keys are the names.
        url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + '/experiments'
        LOGGER.debug("url is :" + url)
        response = requests.get(url)
        if response.headers['content-type'] != MIMETYPE_JSON:
            err_smg = ERROR_TYPE_KF_ADAPTER_JSON
            raise TMException(err_smg)
        experiment_names = []
        for experiment in response.json().keys():
            experiment_names.append(experiment)
        api_response = {"experiment_names": experiment_names}
        reponse_code = status.HTTP_200_OK
    except Exception as err:
        api_response = {"Exception": str(err)}
        LOGGER.error(str(err))
    return APP.response_class(response=json.dumps(api_response),
                              mimetype=MIMETYPE_JSON)
@APP.route('/trainingjobs/<trainingjob_name>', methods=['POST', 'PUT']) # Handled in GUI
def trainingjob_operations(trainingjob_name):
    """
    Rest endpoind to create or update trainingjob
    Precondtion for update : trainingjob's overall_status should be failed
    or finished and deletion processs should not be in progress

    Args (route):
        trainingjob_name: str

    if post/put request is called
    json with below fields are given:
        arguments: key-value pairs related to hyper parameters and
            "trainingjob":<trainingjob_name> key-value pair
        query_filter: string indication sql where clause for filtering out features
        enable_versioning: bool
            flag for trainingjob versioning
        pipeline_version: str
        datalake_source: string indicating datalake source
        _measurement: _measurement for influx db datalake
        bucket: bucket name for influx db datalake

    Exceptions:
        All exception are provided with exception message and HTTP status code.
    """
    # Default to 500; overwritten with 201/200/409/404 below.
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    LOGGER.debug("Training job create/update request(trainingjob name %s) ", trainingjob_name )
    # NOTE(review): the enclosing try: line is not visible in this excerpt.
    json_data = request.json
    if (request.method == 'POST'):
        LOGGER.debug("Create request json : " + json.dumps(json_data))
        # Creation requires the name to be unused.
        is_data_available = validate_trainingjob_name(trainingjob_name, PS_DB_OBJ)
        if is_data_available:
            response_code = status.HTTP_409_CONFLICT
            raise TMException("trainingjob name(" + trainingjob_name + ") is already present in database")
        # Validate and unpack the request payload in one step.
        (feature_list, description, pipeline_name, experiment_name,
         arguments, query_filter, enable_versioning, pipeline_version,
         datalake_source, _measurement, bucket) = \
            check_trainingjob_data(trainingjob_name, json_data)
        add_update_trainingjob(description, pipeline_name, experiment_name, feature_list,
                               arguments, query_filter, True, enable_versioning,
                               pipeline_version, datalake_source, trainingjob_name,
                               PS_DB_OBJ, _measurement=_measurement,
        api_response = {"result": "Information stored in database."}
        response_code = status.HTTP_201_CREATED
    elif(request.method == 'PUT'):
        LOGGER.debug("Update request json : " + json.dumps(json_data))
        # Update requires the job to already exist...
        is_data_available = validate_trainingjob_name(trainingjob_name, PS_DB_OBJ)
        if not is_data_available:
            response_code = status.HTTP_404_NOT_FOUND
            raise TMException("Trainingjob name(" + trainingjob_name + ") is not present in database")
        results = get_trainingjob_info_by_name(trainingjob_name, PS_DB_OBJ)
        # ...not be mid-deletion...
        raise TMException("Failed to process request for trainingjob(" + trainingjob_name + ") " + \
            " deletion in progress")
        # ...and be in a terminal (failed/finished) state.
        if (get_one_word_status(json.loads(results[0][9]))
                not in [States.FAILED.name, States.FINISHED.name]):
            raise TMException("Trainingjob(" + trainingjob_name + ") is not in finished or failed status")
        (feature_list, description, pipeline_name, experiment_name,
         arguments, query_filter, enable_versioning, pipeline_version,
         datalake_source, _measurement, bucket) = check_trainingjob_data(trainingjob_name, json_data)
        add_update_trainingjob(description, pipeline_name, experiment_name, feature_list,
                               arguments, query_filter, False, enable_versioning,
                               pipeline_version, datalake_source, trainingjob_name, PS_DB_OBJ, _measurement=_measurement,
        api_response = {"result": "Information updated in database."}
        response_code = status.HTTP_200_OK
    except Exception as err:
        LOGGER.error("Failed to create/update training job, " + str(err) )
        api_response = {"Exception": str(err)}
    return APP.response_class(response= json.dumps(api_response),
                              status= response_code,
                              mimetype=MIMETYPE_JSON)
# NOTE(review): the `def` line for this endpoint is not visible in this
# excerpt — confirm the function name against the full source.
@APP.route('/trainingjobs/retraining', methods=['POST'])
    """
    Function handling rest endpoint to retrain trainingjobs in request json. trainingjob's
    overall_status should be failed or finished and its deletion_in_progress should be False
    otherwise retraining of that trainingjob is counted in failure.
    Args in function: none
    Required Args in json:
        trainingjobs_list: list
            list containing dictionaries
            notification_url(optional): str
            feature_filter(optional): str

    Returns:
        successful retraining count
        failure retraining count
        status: HTTP status code 200

    Exceptions:
        all exception are provided with exception message and HTTP status code.
    """
    LOGGER.debug('request comes for retraining, ' + json.dumps(request.json))
    # Validate the request envelope up front; malformed json is a 400.
    check_key_in_dictionary(["trainingjobs_list"], request.json)
    except Exception as err:
        raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
    trainingjobs_list = request.json['trainingjobs_list']
    if not isinstance(trainingjobs_list, list):
        raise APIException(status.HTTP_400_BAD_REQUEST, "not given as list")
    # Each entry must at least name the job to retrain.
    for obj in trainingjobs_list:
        check_key_in_dictionary(["trainingjob_name"], obj)
        except Exception as err:
            raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
    # Partition the requested jobs into retrainable / not-retrainable.
    not_possible_to_retrain = []
    possible_to_retrain = []
    for obj in trainingjobs_list:
        trainingjob_name = obj['trainingjob_name']
        results = get_info_of_latest_version(trainingjob_name, PS_DB_OBJ)
        except Exception as err:
            not_possible_to_retrain.append(trainingjob_name)
            LOGGER.debug(str(err) + "(trainingjob_name is " + trainingjob_name + ")")
        # Skip jobs currently being deleted.
        not_possible_to_retrain.append(trainingjob_name)
        LOGGER.debug("Failed to retrain because deletion in progress" + \
            "(trainingjob_name is " + trainingjob_name + ")")
        # Skip jobs that are not in a terminal (finished/failed) state.
        if (get_one_word_status(json.loads(results[0][9]))
                not in [States.FINISHED.name, States.FAILED.name]):
            not_possible_to_retrain.append(trainingjob_name)
            LOGGER.debug("Not finished or not failed status" + \
                "(trainingjob_name is " + trainingjob_name + ")")
        # Re-use the job's stored configuration (positional DB columns).
        enable_versioning = results[0][12]
        pipeline_version = results[0][13]
        description = results[0][1]
        pipeline_name = results[0][3]
        experiment_name = results[0][4]
        feature_list = results[0][2]
        arguments = json.loads(results[0][5])['arguments']
        query_filter = results[0][6]
        datalake_source = get_one_key(json.loads(results[0][14])["datalake_source"])
        _measurement = results[0][17]
        bucket = results[0][18]
        # Optional per-request overrides.
        notification_url = ""
        if "notification_url" in obj:
            notification_url = obj['notification_url']
        if "feature_filter" in obj:
            query_filter = obj['feature_filter']
        add_update_trainingjob(description, pipeline_name, experiment_name,
                               feature_list, arguments, query_filter, False,
                               enable_versioning, pipeline_version,
                               datalake_source, trainingjob_name, PS_DB_OBJ,
                               notification_url, _measurement, bucket)
        except Exception as err:
            not_possible_to_retrain.append(trainingjob_name)
            LOGGER.debug(str(err) + "(training job is " + trainingjob_name + ")")
        # Kick the training off via this service's own /training endpoint.
        url = 'http://' + str(TRAININGMGR_CONFIG_OBJ.my_ip) + \
            ':' + str(TRAININGMGR_CONFIG_OBJ.my_port) + \
            '/trainingjobs/' +trainingjob_name + '/training'
        response = requests.post(url)
        if response.status_code == status.HTTP_200_OK:
            possible_to_retrain.append(trainingjob_name)
            LOGGER.debug("not 200 response" + "(trainingjob_name is " + trainingjob_name + ")")
            not_possible_to_retrain.append(trainingjob_name)
        LOGGER.debug("not present in postgres db" + "(trainingjob_name is " + trainingjob_name + ")")
        not_possible_to_retrain.append(trainingjob_name)
    LOGGER.debug('success list: ' + str(possible_to_retrain))
    LOGGER.debug('failure list: ' + str(not_possible_to_retrain))
    return APP.response_class(response=json.dumps( \
        "success count": len(possible_to_retrain),
        "failure count": len(not_possible_to_retrain)
        status=status.HTTP_200_OK,
        mimetype='application/json')
# NOTE(review): the embedded line numbers jump in this listing — several
# statements (try:/else:/continue/dict braces) are elided from view; the
# comments below describe only what the visible lines establish.
1066 @APP.route('/trainingjobs', methods=['DELETE'])
1068 def delete_list_of_trainingjob_version():
1070 Function handling rest endpoint to delete latest version of trainingjob_name trainingjobs which is
1071 given in request json. trainingjob's overall_status should be failed or finished and its
1072 deletion_in_progress should be False otherwise deletion of that trainingjobs is counted in failure.
1073 Args in function: none
1074 Required Args in json:
1076 list containing dictionaries.
1078 trainingjob_name: str
1081 version of trainingjob
1085 successful deletion count
1087 failure deletion count
1089 HTTP status code 200
1091 all exception are provided with exception message and HTTP status code.
1093 LOGGER.debug('request comes for deleting:' + json.dumps(request.json))
# Request validation: the json body must carry a "list" key, otherwise 400.
1095 check_key_in_dictionary(["list"], request.json)
1096 except Exception as err:
1097 raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
1099 list_of_trainingjob_version = request.json['list']
# The "list" value itself must be a JSON array, otherwise 400.
1100 if not isinstance(list_of_trainingjob_version, list):
1101 raise APIException(status.HTTP_400_BAD_REQUEST, "not given as list")
# Accumulators for the per-entry success/failure counts returned at the end.
1103 not_possible_to_delete = []
1104 possible_to_delete = []
# Each entry is processed independently; a failing entry is logged and
# counted, and processing continues with the next one.
1106 for my_dict in list_of_trainingjob_version:
1108 if not isinstance(my_dict, dict):
1109 not_possible_to_delete.append(my_dict)
1110 LOGGER.debug(str(my_dict) + "did not pass dictionary")
# Each dictionary entry must provide both trainingjob_name and version.
1114 check_key_in_dictionary(["trainingjob_name", "version"], my_dict)
1115 except Exception as err:
1116 not_possible_to_delete.append(my_dict)
1117 LOGGER.debug(str(err))
1120 trainingjob_name = my_dict['trainingjob_name']
1121 version = my_dict['version']
# Look up the (name, version) row in postgres; a lookup failure counts
# the entry as not deletable.
1125 results = get_info_by_version(trainingjob_name, version, PS_DB_OBJ)
1126 except Exception as err:
1127 not_possible_to_delete.append(my_dict)
1128 LOGGER.debug(str(err) + "(trainingjob_name is " + trainingjob_name + ", version is " + str(
# Guard: refuse when a deletion is already in progress for this version.
1135 not_possible_to_delete.append(my_dict)
1136 LOGGER.debug("Failed to process deletion request because deletion is " + \
1137 "already in progress" + \
1138 "(trainingjob_name is " + trainingjob_name + ", version is " + str(
# Guard: only jobs whose overall status (column 9, json) is FINISHED or
# FAILED may be deleted.
1142 if (get_one_word_status(json.loads(results[0][9]))
1143 not in [States.FINISHED.name, States.FAILED.name]):
1144 not_possible_to_delete.append(my_dict)
1145 LOGGER.debug("Not finished or not failed status" + \
1146 "(usecase_name is " + trainingjob_name + ", version is " + str(
# Mark the row as deletion_in_progress before touching model artifacts.
1151 change_field_value_by_version(trainingjob_name, version, PS_DB_OBJ,
1152 "deletion_in_progress", True)
1153 except Exception as err:
1154 not_possible_to_delete.append(my_dict)
1155 LOGGER.debug(str(err) + "(usecase_name is " + trainingjob_name + \
1156 ", version is " + str(version) + ")")
# Remove the stored model/metrics for this version, if its bucket exists.
1161 if MM_SDK.is_bucket_present(trainingjob_name):
1162 deleted = MM_SDK.delete_model_metric(trainingjob_name, version)
1163 except Exception as err:
1164 not_possible_to_delete.append(my_dict)
1165 LOGGER.debug(str(err) + "(trainingjob_name is " + trainingjob_name + \
1166 ", version is " + str(version) + ")")
1170 not_possible_to_delete.append(my_dict)
# Finally remove the version row from postgres.
1174 delete_trainingjob_version(trainingjob_name, version, PS_DB_OBJ)
1175 except Exception as err:
1176 not_possible_to_delete.append(my_dict)
1177 LOGGER.debug(str(err) + "(trainingjob_name is " + \
1178 trainingjob_name + ", version is " + str(version) + ")")
1181 possible_to_delete.append(my_dict)
1184 not_possible_to_delete.append(my_dict)
1185 LOGGER.debug("not find in postgres db" + "(trainingjob_name is " + \
1186 trainingjob_name + ", version is " + str(version) + ")")
1188 LOGGER.debug('success list: ' + str(possible_to_delete))
1189 LOGGER.debug('failure list: ' + str(not_possible_to_delete))
# Always responds 200 with the aggregate success/failure counts, regardless
# of how many individual entries failed.
1191 return APP.response_class(response=json.dumps( \
1193 "success count": len(possible_to_delete),
1194 "failure count": len(not_possible_to_delete)
1196 status=status.HTTP_200_OK,
1197 mimetype='application/json')
# NOTE(review): embedded line numbers jump here — some statements (e.g. the
# try:, the dict literal around "version", the else branch) are elided.
1199 @APP.route('/trainingjobs/metadata/<trainingjob_name>')
1200 def get_metadata(trainingjob_name):
1202 Function handling rest endpoint to get accuracy, version and model download url for all
1203 versions of given trainingjob_name which has overall_state FINISHED and
1204 deletion_in_progress is False
1207 trainingjob_name: str
1208 name of trainingjob.
1215 Successed metadata : list
1216 list containes dictionaries.
1217 dictionary containts
1221 version of trainingjob
1223 url for downloading model
1225 HTTP status code 200
1228 all exception are provided with exception message and HTTP status code.
1231 LOGGER.debug("Request metadata for trainingjob(name of trainingjob is %s) ", trainingjob_name)
# Default to 500 so any unexpected exception falls through to an error response.
1233 response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
1235 results = get_all_versions_info_by_name(trainingjob_name, PS_DB_OBJ)
# Collect metadata for every version whose overall status (column 9) is
# FINISHED and whose deletion flag (column 19) is not set.
1238 for trainingjob_info in results:
1239 if (get_one_word_status(json.loads(trainingjob_info[9])) == States.FINISHED.name and
1240 not trainingjob_info[19]):
1242 LOGGER.debug("Downloading metric for " +trainingjob_name )
# Column 11 is the version number — assumed from its use in the URL below;
# TODO confirm against the db schema.
1243 data = get_metrics(trainingjob_name, trainingjob_info[11], MM_SDK)
# Build the model download URL served by this manager's /model/... route.
1244 url = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + ":" + \
1245 str(TRAININGMGR_CONFIG_OBJ.my_port) + "/model/" + \
1246 trainingjob_name + "/" + str(trainingjob_info[11]) + "/Model.zip"
1249 "version": trainingjob_info[11],
1252 info_list.append(dict_data)
1254 api_response = {"Successed metadata": info_list}
1255 response_code = status.HTTP_200_OK
# No versions found for this name: respond 404 with an explanatory message.
1257 err_msg = "Not found given trainingjob name-" + trainingjob_name
1258 LOGGER.error(err_msg)
1259 response_code = status.HTTP_404_NOT_FOUND
1260 api_response = {"Exception":err_msg}
1261 except Exception as err:
1262 LOGGER.error(str(err))
1263 api_response = {"Exception":str(err)}
1264 return APP.response_class(response=json.dumps(api_response),
1265 status=response_code,
1266 mimetype=MIMETYPE_JSON)
# NOTE(review): embedded line numbers jump here — the try:/else: scaffolding
# around the visible statements is elided from this listing.
1268 @APP.route('/featureGroup', methods=['POST'])
1270 def create_feature_group():
1272 Rest endpoint to create feature group
1278 json with below fields are given:
1279 featureGroupName: str
1284 whether to enable dme
1295 datalake_source: str
1296 string indicating datalake source
1298 token for the bucket
1306 HTTP status code 201
1312 HTTP status code 200
1315 All exception are provided with exception message and HTTP status code."""
# Default to 500 so unexpected failures surface as an error response.
1318 response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
1319 LOGGER.debug('feature Group Create request, ' + json.dumps(request.json))
1322 json_data=request.json
# check_featureGroup_data validates the payload and unpacks its fields.
1323 (featureGroup_name, features, datalake_source, enable_Dme, dme_host, dme_port, bucket, token, source_name,db_org)=check_featureGroup_data(json_data)
1324 # check the data conformance
# Name length constraint (3-63 chars); presumably mirrors the backing
# bucket-naming rules — TODO confirm.
1325 if len(featureGroup_name) < 3 or len(featureGroup_name) > 63:
1326 api_response = {"Exception": "Failed to create the feature group since feature group name must be between 3 and 63 characters long."}
1327 response_code = status.HTTP_400_BAD_REQUEST
1329 # the features are stored in string format in the db, and has to be passed as list of feature to the dme. Hence the conversion.
1330 features_list = features.split(",")
# Persist the feature group first, then (if DME is enabled) register the
# filtered-data job with the DME host.
1331 add_featuregroup(featureGroup_name, features, datalake_source, enable_Dme, PS_DB_OBJ,dme_host, dme_port, bucket, token, source_name,db_org )
1332 if enable_Dme == True :
1333 response= create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, source_name, db_org, bucket, token, features_list, featureGroup_name, dme_host, dme_port)
# DME job creation failed: roll back the db row just inserted and report 400.
1334 if response.status_code != 201:
1335 api_response={"Exception": "Cannot create dme job"}
1336 delete_feature_group_by_name(PS_DB_OBJ, featureGroup_name)
1337 response_code=status.HTTP_400_BAD_REQUEST
1339 api_response={"result": "Feature Group Created"}
1340 response_code =status.HTTP_200_OK
1342 api_response={"result": "Feature Group Created"}
1343 response_code =status.HTTP_200_OK
# Best-effort rollback on any failure; NOTE(review): if the exception fired
# before featureGroup_name was assigned, this cleanup itself would raise —
# the elided lines may guard against that; verify.
1344 except Exception as err:
1345 delete_feature_group_by_name(PS_DB_OBJ, featureGroup_name)
1346 err_msg = "Failed to create the feature Group "
1347 api_response = {"Exception":err_msg}
1348 LOGGER.error(str(err))
1350 return APP.response_class(response=json.dumps(api_response),
1351 status=response_code,
1352 mimetype=MIMETYPE_JSON)
# NOTE(review): embedded line numbers jump here — the try:, the loop header
# and parts of the dict literal are elided from this listing.
1354 @APP.route('/featureGroup', methods=['GET'])
1356 def get_feature_group():
1358 Rest endpoint to fetch all the feature groups
1360 Args in function: none
1361 Required Args in json:
1367 list of dictionaries.
1368 dictionaries contains:
1369 featuregroup_name: str
1370 name of feature group
1376 whether to enable dme
1379 LOGGER.debug("Request for getting all feature groups")
# Default to 500 so db failures surface as an error response.
1381 response_code=status.HTTP_500_INTERNAL_SERVER_ERROR
1383 result= get_feature_groups_db(PS_DB_OBJ)
# Each db row is mapped to a dict; column 0 is the feature group name.
1387 "featuregroup_name": res[0],
1392 feature_groups.append(dict_data)
1393 api_response={"featuregroups":feature_groups}
1394 response_code=status.HTTP_200_OK
1396 except Exception as err:
1397 api_response = {"Exception": str(err)}
1398 LOGGER.error(str(err))
1399 return APP.response_class(response=json.dumps(api_response),
1400 status=response_code,
1401 mimetype=MIMETYPE_JSON)
# NOTE(review): embedded line numbers jump here — the try:, loop header,
# several dict fields and the else: branch are elided from this listing.
1403 @APP.route('/featureGroup/<featuregroup_name>', methods=['GET'])
1405 def get_feature_group_by_name(featuregroup_name):
1407 Rest endpoint to fetch a feature group
1410 featuregroup_name: str
1411 name of featuregroup_name.
1417 featuregroup_name: str
1418 name of featuregroup
1424 whether dme enabled or not
1432 token for the bucket
1438 HTTP status code 200
1441 all exception are provided with exception message and HTTP status code.
1444 LOGGER.debug("Request for getting a feature group with name = "+ featuregroup_name)
# Default to 500 so db failures surface as an error response.
1446 response_code=status.HTTP_500_INTERNAL_SERVER_ERROR
1448 result= get_feature_group_by_name_db(PS_DB_OBJ, featuregroup_name)
# Column 1 stores the features as a comma-separated string; split it back
# into a list for the response.
1452 features=res[1].split(",")
1454 "featuregroup_name": res[0],
1455 "features": features,
1462 "source_name":res[8],
1465 feature_group.append(dict_data)
1466 api_response={"featuregroup":feature_group}
1467 response_code=status.HTTP_200_OK
# Not found in db: set 404, then raise so the except below builds the
# error payload from the TMException message.
1469 response_code=status.HTTP_404_NOT_FOUND
1470 raise TMException("Failed to fetch feature group info from db")
1471 except Exception as err:
1472 api_response = {"Exception": str(err)}
1473 LOGGER.error(str(err))
1474 return APP.response_class(response=json.dumps(api_response),
1475 status=response_code,
1476 mimetype=MIMETYPE_JSON)
# NOTE(review): embedded line numbers jump here — try:/else:/continue
# statements are elided from this listing.
1478 @APP.route('/featureGroup', methods=['DELETE'])
1480 def delete_list_of_feature_group():
1482 Function handling rest endpoint to delete featureGroup which is
1483 given in request json.
1485 Args in function: none
1486 Required Args in json:
1488 list containing dictionaries.
1490 featuregroup_name: str
1496 successful deletion count
1498 failure deletion count
1500 HTTP status code 200
1502 all exception are provided with exception message and HTTP status code.
1504 LOGGER.debug('request comes for deleting:' + json.dumps(request.json))
# Request validation: body must carry "featuregroups_list", otherwise 400.
1506 check_key_in_dictionary(["featuregroups_list"], request.json)
1507 except Exception as err:
1508 LOGGER.debug("exception in check_key_in_dictionary")
1509 raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
1511 list_of_feature_groups = request.json['featuregroups_list']
1512 if not isinstance(list_of_feature_groups, list):
1513 LOGGER.debug("exception in not instance")
1514 raise APIException(status.HTTP_400_BAD_REQUEST, "not given as list")
# Accumulators for the per-entry success/failure counts returned at the end.
1516 not_possible_to_delete = []
1517 possible_to_delete = []
# Each entry is handled independently; failures are logged and counted.
1519 for my_dict in list_of_feature_groups:
1520 if not isinstance(my_dict, dict):
1521 not_possible_to_delete.append(my_dict)
1522 LOGGER.debug(str(my_dict) + "did not pass dictionary")
# Each entry must name the feature group via "featureGroup_name".
1526 check_key_in_dictionary(["featureGroup_name"], my_dict)
1527 except Exception as err:
1528 not_possible_to_delete.append(my_dict)
1529 LOGGER.debug(str(err))
1532 featuregroup_name = my_dict['featureGroup_name']
# Fetch the row first so the DME host/port (columns 4 and 5) are available
# for the DME job deletion below.
1535 results = get_feature_group_by_name_db(PS_DB_OBJ, featuregroup_name)
1536 except Exception as err:
1537 not_possible_to_delete.append(my_dict)
1538 LOGGER.debug(str(err) + "(featureGroup_name is " + featuregroup_name)
# Delete from postgres, then remove the corresponding DME filtered-data job.
1544 delete_feature_group_by_name(PS_DB_OBJ, featuregroup_name)
1546 dme_host=results[0][4]
1547 dme_port=results[0][5]
1548 delete_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, featuregroup_name, dme_host, dme_port)
1549 possible_to_delete.append(my_dict)
1550 except Exception as err:
1551 not_possible_to_delete.append(my_dict)
1552 LOGGER.debug(str(err) + "(featuregroup_name is "+ featuregroup_name + ")")
1555 not_possible_to_delete.append(my_dict)
1556 LOGGER.debug("cannot find in postgres db" + "(featuregroup_name is " + \
1557 featuregroup_name + ")")
1559 LOGGER.debug('success list: ' + str(possible_to_delete))
1560 LOGGER.debug('failure list: ' + str(not_possible_to_delete))
# Always responds 200 with aggregate counts, even if some entries failed.
1562 return APP.response_class(response=json.dumps( \
1564 "success count": len(possible_to_delete),
1565 "failure count": len(not_possible_to_delete)
1567 status=status.HTTP_200_OK,
1568 mimetype='application/json')
# NOTE(review): embedded line numbers jump here — the outer loop/try
# scaffolding and the sleep between polls are elided from this listing.
1570 def async_feature_engineering_status():
1572 This function takes trainingjobs from DATAEXTRACTION_JOBS_CACHE and checks data extraction status
1573 (using data extraction api) for those trainingjobs, if status is Completed then it calls
1574 /trainingjob/dataExtractionNotification route for those trainingjobs.
# Notification endpoint served by this same manager process.
1576 url_pipeline_run = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + \
1577 ":" + str(TRAININGMGR_CONFIG_OBJ.my_port) + \
1578 "/trainingjob/dataExtractionNotification"
# Snapshot the cache before iterating, since entries are popped below.
1581 fjc = list(DATAEXTRACTION_JOBS_CACHE)
1582 for trainingjob_name in fjc:
1583 LOGGER.debug("Current DATAEXTRACTION_JOBS_CACHE :" + str(DATAEXTRACTION_JOBS_CACHE))
# Poll the data-extraction service; a non-200 or non-json reply is an error.
1585 response = data_extraction_status(trainingjob_name, TRAININGMGR_CONFIG_OBJ)
1586 if (response.headers['content-type'] != MIMETYPE_JSON or
1587 response.status_code != status.HTTP_200_OK ):
1588 raise TMException("Data extraction responsed with error status code or invalid content type" + \
1589 "doesn't send json type response (trainingjob " + trainingjob_name + ")")
1590 response = response.json()
1591 LOGGER.debug("Data extraction status response for " + \
1592 trainingjob_name + " " + json.dumps(response))
# Extraction finished: advance the step states in the db, then notify the
# pipeline-run route to kick off training.
1594 if response["task_status"] == "Completed":
1595 change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
1596 Steps.DATA_EXTRACTION.name,
1597 States.FINISHED.name)
1598 change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
1599 Steps.DATA_EXTRACTION_AND_TRAINING.name,
1600 States.IN_PROGRESS.name)
1601 kf_response = requests.post(url_pipeline_run,
1602 data=json.dumps({"trainingjob_name": trainingjob_name}),
1604 'content-type': MIMETYPE_JSON,
1605 'Accept-Charset': 'UTF-8'
1607 if (kf_response.headers['content-type'] != MIMETYPE_JSON or
1608 kf_response.status_code != status.HTTP_200_OK ):
1609 raise TMException("KF adapter responsed with error status code or invalid content type" + \
1610 "doesn't send json type response (trainingjob " + trainingjob_name + ")")
# Only drop the job from the cache after a successful hand-off.
1612 DATAEXTRACTION_JOBS_CACHE.pop(trainingjob_name)
1613 elif response["task_status"] == "Error":
1614 raise TMException("Data extraction has failed for " + trainingjob_name)
1615 except Exception as err:
1616 LOGGER.error("Failure during procesing of DATAEXTRACTION_JOBS_CACHE," + str(err))
1617 """ Job will be removed from DATAEXTRACTION_JOBS_CACHE in handle_async
1618 There might be some further error during handling of exception
# Error path: the helper removes the job from the cache under LOCK and
# records the failure against the training job.
1620 handle_async_feature_engineering_status_exception_case(LOCK,
1621 DATAEXTRACTION_JOBS_CACHE,
1622 status.HTTP_500_INTERNAL_SERVER_ERROR,
1623 str(err) + "(trainingjob name is " + trainingjob_name + ")",
1624 LOGGER, False, trainingjob_name,
1627 #Wait and fetch latest list of trainingjobs
# Startup: load config, connect to postgres, resume polling for any data
# extractions that were in progress, then serve the REST API.
# NOTE(review): the try: matching the except below is elided from this
# listing (embedded line numbers jump 1631 -> 1633).
1630 if __name__ == "__main__":
1631 TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()
# Fail fast if any expected configuration value is missing.
1633 if TRAININGMGR_CONFIG_OBJ.is_config_loaded_properly() is False:
1634 raise TMException("Not all configuration loaded.")
1635 LOGGER = TRAININGMGR_CONFIG_OBJ.logger
1636 PS_DB_OBJ = PSDB(TRAININGMGR_CONFIG_OBJ)
# Re-seed the cache from the db so extractions survive a process restart.
1638 DATAEXTRACTION_JOBS_CACHE = get_data_extraction_in_progress_trainingjobs(PS_DB_OBJ)
# Daemon thread: exits with the process, no explicit shutdown needed.
1639 threading.Thread(target=async_feature_engineering_status, daemon=True).start()
1640 MM_SDK = ModelMetricsSdk()
1641 LOGGER.debug("Starting AIML-WF training manager .....")
# NOTE(review): debug=True with host '0.0.0.0' exposes the Werkzeug
# debugger on all interfaces — confirm this is intended outside development.
1642 APP.run(debug=True, port=int(TRAININGMGR_CONFIG_OBJ.my_port), host='0.0.0.0')
1643 except TMException as err:
1644 print("Startup failure" + str(err))