Check the feature group name length before adding to db
trainingmgr/trainingmgr_main.py (aiml-fw/awmf/tm.git)
# ==================================================================================
#
#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#          http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
# ==================================================================================

"""
This file contains all REST endpoints exposed by the Training manager.
"""
import json
import re
from logging import Logger
import os
import traceback
import threading
from threading import Lock
import time
from flask import Flask, request, send_file
from flask_api import status
import requests
from flask_cors import cross_origin
from werkzeug.utils import secure_filename
from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk
from trainingmgr.common.trainingmgr_operations import data_extraction_start, training_start, data_extraction_status, create_dme_filtered_data_job, delete_dme_filtered_data_job
from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
from trainingmgr.common.trainingmgr_util import get_one_word_status, check_trainingjob_data, \
    check_key_in_dictionary, get_one_key, \
    response_for_training, get_metrics, \
    handle_async_feature_engineering_status_exception_case, \
    validate_trainingjob_name, get_all_pipeline_names_svc, check_featureGroup_data
from trainingmgr.common.exceptions_utls import APIException, TMException
from trainingmgr.constants.steps import Steps
from trainingmgr.constants.states import States
from trainingmgr.db.trainingmgr_ps_db import PSDB
from trainingmgr.db.common_db_fun import get_data_extraction_in_progress_trainingjobs, \
    change_field_of_latest_version, \
    change_in_progress_to_failed_by_latest_version, change_steps_state_of_latest_version, \
    get_info_by_version, \
    get_trainingjob_info_by_name, get_latest_version_trainingjob_name, get_all_versions_info_by_name, \
    update_model_download_url, add_update_trainingjob, add_featuregroup, \
    get_field_of_given_version, get_all_jobs_latest_status_version, get_info_of_latest_version, \
    get_feature_groups_db, get_feature_group_by_name_db, delete_feature_group_by_name, delete_trainingjob_version, change_field_value_by_version

APP = Flask(__name__)
TRAININGMGR_CONFIG_OBJ = None
PS_DB_OBJ = None
LOGGER = None
MM_SDK = None
LOCK = None
DATAEXTRACTION_JOBS_CACHE = None

ERROR_TYPE_KF_ADAPTER_JSON = "Kf adapter didn't send a JSON response"
ERROR_TYPE_DB_STATUS = "Couldn't update the status to failed in db"
MIMETYPE_JSON = "application/json"
@APP.errorhandler(APIException)
def error(err):
    """
    Return response with error message and error status code.
    """
    LOGGER.error(err.message)
    return APP.response_class(response=json.dumps({"Exception": err.message}),
                              status=err.code,
                              mimetype=MIMETYPE_JSON)


@APP.route('/trainingjobs/<trainingjob_name>/<version>', methods=['GET'])
@cross_origin()
def get_trainingjob_by_name_version(trainingjob_name, version):
    """
    Rest endpoint to fetch training job details by name and version
    <trainingjob_name, version>.

    Args in function:
        trainingjob_name: str
            name of trainingjob.
        version: int
            version of trainingjob.

    Returns:
        json:
            trainingjob: dict
                dictionary contains
                    trainingjob_name: str
                        name of trainingjob
                    description: str
                        description
                    feature_list: str
                        feature names
                    pipeline_name: str
                        name of pipeline
                    experiment_name: str
                        name of experiment
                    arguments: dict
                        key-value pairs related to hyper parameters and
                        "trainingjob":<trainingjob_name> key-value pair
                    query_filter: str
                        string indicating the SQL WHERE clause for filtering out features
                    creation_time: str
                        time at which <trainingjob_name, version> trainingjob was created
                    run_id: str
                        run id from KF adapter for <trainingjob_name, version> trainingjob
                    steps_state: dict
                        each step of the <trainingjob_name, version> trainingjob and its state
                    accuracy: str
                        metrics of model
                    enable_versioning: bool
                        flag for trainingjob versioning
                    updation_time: str
                        time at which <trainingjob_name, version> trainingjob was updated
                    version: int
                        trainingjob's version
                    pipeline_version: str
                        pipeline version
                    datalake_source: str
                        string indicating datalake source
                    model_url: str
                        url for downloading model
                    notification_url: str
                        url of notification server
                    _measurement: str
                        _measurement of influx db datalake
                    bucket: str
                        bucket name of influx db datalake
        status code:
            HTTP status code 200

    Exceptions:
        all exceptions are provided with exception message and HTTP status code.

    """
    LOGGER.debug("Request to fetch trainingjob by name and version(trainingjob:" + \
                 trainingjob_name + " ,version:" + version + ")")
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    response_data = {}
    try:
        results = get_info_by_version(trainingjob_name, version, PS_DB_OBJ)
        data = get_metrics(trainingjob_name, version, MM_SDK)
        if results:
            trainingjob_info = results[0]
            dict_data = {
                "trainingjob_name": trainingjob_info[0],
                "description": trainingjob_info[1],
                "feature_list": trainingjob_info[2],
                "pipeline_name": trainingjob_info[3],
                "experiment_name": trainingjob_info[4],
                "arguments": json.loads(trainingjob_info[5])['arguments'],
                "query_filter": trainingjob_info[6],
                "creation_time": str(trainingjob_info[7]),
                "run_id": trainingjob_info[8],
                "steps_state": json.loads(trainingjob_info[9]),
                "updation_time": str(trainingjob_info[10]),
                "version": trainingjob_info[11],
                "enable_versioning": bool(trainingjob_info[12]),
                "pipeline_version": trainingjob_info[13],
                "datalake_source": get_one_key(json.loads(trainingjob_info[14])['datalake_source']),
                "model_url": trainingjob_info[15],
                "notification_url": trainingjob_info[16],
                "_measurement": trainingjob_info[17],
                "bucket": trainingjob_info[18],
                "accuracy": data
            }
            response_data = {"trainingjob": dict_data}
            response_code = status.HTTP_200_OK
        else:
            # no need to change status here because the given trainingjob_name, version was not found in the postgres db.
            response_code = status.HTTP_404_NOT_FOUND
            raise TMException("Given trainingjob and version (trainingjob:" + \
                            trainingjob_name + " version: " + version + ") not found in database")
    except Exception as err:
        LOGGER.error(str(err))
        response_data = {"Exception": str(err)}

    return APP.response_class(response=json.dumps(response_data),
                              status=response_code,
                              mimetype=MIMETYPE_JSON)

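# Example (illustrative, not part of the module): fetching a training job over
# the endpoint above with the `requests` client; host, port, job name, and
# version are assumptions for a local deployment.
#
#     import requests
#     resp = requests.get("http://localhost:32002/trainingjobs/qoe_job/1")
#     if resp.status_code == 200:
#         print(resp.json()["trainingjob"]["steps_state"])
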
@APP.route('/trainingjobs/<trainingjob_name>/<version>/steps_state', methods=['GET']) # Handled in GUI
@cross_origin()
def get_steps_state(trainingjob_name, version):
    """
    Function handling rest endpoint to get steps_state information for
    given <trainingjob_name, version>.

    Args in function:
        trainingjob_name: str
            name of trainingjob.
        version: int
            version of trainingjob.

    Args in json:
        no json required

    Returns:
        json:
            DATA_EXTRACTION : str
                this step captures the part
                    starting: immediately after the quick success response by the data extraction module
                    till: end of data extraction.
            DATA_EXTRACTION_AND_TRAINING : str
                this step captures the part
                    starting: immediately after DATA_EXTRACTION is FINISHED
                    till: getting 'scheduled' run status from kf connector
            TRAINING : str
                this step captures the part
                    starting: immediately after DATA_EXTRACTION_AND_TRAINING is FINISHED
                    till: getting 'Succeeded' run status from kf connector
            TRAINING_AND_TRAINED_MODEL : str
                this step captures the part
                    starting: immediately after TRAINING is FINISHED
                    till: getting version for trainingjob_name trainingjob.
            TRAINED_MODEL : str
                this step captures the part
                    starting: immediately after TRAINING_AND_TRAINED_MODEL is FINISHED
                    till: model download url is updated in db.
        status code:
            HTTP status code 200

    Exceptions:
        all exceptions are provided with exception message and HTTP status code.
    """
    LOGGER.debug("Request to get steps_state for (trainingjob:" + \
                 trainingjob_name + " and version: " + version + ")")
    response_data = {}
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR

    try:
        results = get_field_of_given_version(trainingjob_name, version, PS_DB_OBJ, "steps_state")
        LOGGER.debug("get_field_of_given_version:" + str(results))
        if results:
            response_data = results[0][0]
            response_code = status.HTTP_200_OK
        else:
            response_code = status.HTTP_404_NOT_FOUND
            raise TMException("Given trainingjob not found in database")
    except Exception as err:
        LOGGER.error(str(err))
        response_data = json.dumps({"Exception": str(err)})

    return APP.response_class(response=response_data,
                              status=response_code,
                              mimetype=MIMETYPE_JSON)

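# Example (illustrative): the steps_state payload returned by the endpoint above
# is a JSON object mapping each step to a state, along the lines of:
#
#     {
#         "DATA_EXTRACTION": "FINISHED",
#         "DATA_EXTRACTION_AND_TRAINING": "FINISHED",
#         "TRAINING": "IN_PROGRESS",
#         "TRAINING_AND_TRAINED_MODEL": "NOT_STARTED",
#         "TRAINED_MODEL": "NOT_STARTED"
#     }
#
# The step names come from Steps and the states from States; "NOT_STARTED" here
# is an assumed placeholder for a step that has not begun.
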
@APP.route('/model/<trainingjob_name>/<version>/Model.zip', methods=['GET'])
def get_model(trainingjob_name, version):
    """
    Function handling rest endpoint to download the model zip file of the <trainingjob_name, version> trainingjob.

    Args in function:
        trainingjob_name: str
            name of trainingjob.
        version: int
            version of trainingjob.

    Args in json:
        no json required

    Returns:
        zip file of the model of the <trainingjob_name, version> trainingjob.

    Exceptions:
        all exceptions are provided with exception message and HTTP status code.
    """
    try:
        return send_file(MM_SDK.get_model_zip(trainingjob_name, version), mimetype='application/zip')
    except Exception:
        return {"Exception": "error while downloading model"}, status.HTTP_500_INTERNAL_SERVER_ERROR

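# Example (illustrative): downloading the trained model to a local file, with
# streaming to avoid holding the whole archive in memory; host, port, job name,
# and version are assumptions.
#
#     import requests
#     with requests.get("http://localhost:32002/model/qoe_job/1/Model.zip",
#                       stream=True) as resp:
#         resp.raise_for_status()
#         with open("Model.zip", "wb") as fh:
#             for chunk in resp.iter_content(chunk_size=8192):
#                 fh.write(chunk)
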
@APP.route('/trainingjobs/<trainingjob_name>/training', methods=['POST']) # Handled in GUI
@cross_origin()
def training(trainingjob_name):
    """
    Rest endpoint to start a training job.
    It calls the data extraction module for data extraction and the other training steps.

    Args in function:
        trainingjob_name: str
            name of trainingjob.

    Args in json:
        no json required

    Returns:
        json:
            trainingjob_name: str
                name of trainingjob
            result: str
                route of the data extraction module for getting the data extraction status of
                the given trainingjob_name.
        status code:
            HTTP status code 200

    Exceptions:
        all exceptions are provided with exception message and HTTP status code.
    """

    LOGGER.debug("Request for training trainingjob %s ", trainingjob_name)
    response_data = {}
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    try:
        is_data_available = validate_trainingjob_name(trainingjob_name, PS_DB_OBJ)
        if not is_data_available:
            response_code = status.HTTP_404_NOT_FOUND
            raise TMException("Given trainingjob name is not present in database" + \
                           "(trainingjob: " + trainingjob_name + ")") from None
        else:
            db_results = get_trainingjob_info_by_name(trainingjob_name, PS_DB_OBJ)
            feature_list = db_results[0][2]
            query_filter = db_results[0][6]
            datalake_source = json.loads(db_results[0][14])['datalake_source']
            _measurement = db_results[0][17]
            bucket = db_results[0][18]

            LOGGER.debug('Starting Data Extraction...')
            de_response = data_extraction_start(TRAININGMGR_CONFIG_OBJ, trainingjob_name,
                                         feature_list, query_filter, datalake_source,
                                         _measurement, bucket)
            if de_response.status_code == status.HTTP_200_OK:
                LOGGER.debug("Response from data extraction for " + \
                        trainingjob_name + " : " + json.dumps(de_response.json()))
                change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
                                                    Steps.DATA_EXTRACTION.name,
                                                    States.IN_PROGRESS.name)
                with LOCK:
                    DATAEXTRACTION_JOBS_CACHE[trainingjob_name] = "Scheduled"
                response_data = de_response.json()
                response_code = status.HTTP_200_OK
            elif de_response.headers['content-type'] == MIMETYPE_JSON:
                errMsg = "Data extraction responded with error code."
                LOGGER.error(errMsg)
                json_data = de_response.json()
                LOGGER.debug(str(json_data))
                if check_key_in_dictionary(["result"], json_data):
                    response_data = {"Failed": errMsg + json_data["result"]}
                else:
                    raise TMException(errMsg)
            else:
                raise TMException("Data extraction didn't send a JSON response" + \
                           "(trainingjob name is " + trainingjob_name + ")") from None
    except Exception as err:
        response_data = {"Exception": str(err)}
        LOGGER.debug("Error in training, job name:" + trainingjob_name + str(err))
    return APP.response_class(response=json.dumps(response_data), status=response_code,
                            mimetype=MIMETYPE_JSON)

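# Example (illustrative): kicking off training for an existing job; host, port,
# and the job name are assumptions.
#
#     import requests
#     resp = requests.post("http://localhost:32002/trainingjobs/qoe_job/training")
#     print(resp.status_code, resp.json())
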
@APP.route('/trainingjob/dataExtractionNotification', methods=['POST'])
def data_extraction_notification():
    """
    This rest endpoint is invoked when data extraction is finished.
    It then invokes the kf-adapter for training; if the run_status in the response
    from the kf-adapter is "scheduled", the request has been accepted by the
    kf-adapter for further processing.

    Args in function:
        None

    Args in json:
        trainingjob_name: str
            name of trainingjob.

    Returns:
        json:
            result: str
                result message
        status code:
            HTTP status code 200

    Exceptions:
        all exceptions are provided with exception message and HTTP status code.
    """
    LOGGER.debug("Data extraction notification...")
    err_response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    results = None
    try:
        if not check_key_in_dictionary(["trainingjob_name"], request.json):
            err_msg = "Trainingjob_name key not available in request"
            LOGGER.error(err_msg)
            err_response_code = status.HTTP_400_BAD_REQUEST
            raise TMException(err_msg)

        trainingjob_name = request.json["trainingjob_name"]
        results = get_trainingjob_info_by_name(trainingjob_name, PS_DB_OBJ)
        arguments = json.loads(results[0][5])['arguments']
        arguments["version"] = results[0][11]
        LOGGER.debug(arguments)

        dict_data = {
            "pipeline_name": results[0][3], "experiment_name": results[0][4],
            "arguments": arguments, "pipeline_version": results[0][13]
        }

        response = training_start(TRAININGMGR_CONFIG_OBJ, dict_data, trainingjob_name)
        if (response.headers['content-type'] != MIMETYPE_JSON
                or response.status_code != status.HTTP_200_OK):
            err_msg = "Kf adapter returned invalid content-type or status_code for " + trainingjob_name
            raise TMException(err_msg)

        LOGGER.debug("response from kf_adapter for " + \
                    trainingjob_name + " : " + json.dumps(response.json()))
        json_data = response.json()

        if not check_key_in_dictionary(["run_status", "run_id"], json_data):
            err_msg = "Invalid response from Kf adapter, key run_status or run_id not present for " + trainingjob_name
            LOGGER.error(err_msg)
            err_response_code = status.HTTP_400_BAD_REQUEST
            raise TMException(err_msg)

        if json_data["run_status"] == 'scheduled':
            change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
                                                Steps.DATA_EXTRACTION_AND_TRAINING.name,
                                                States.FINISHED.name)
            change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
                                                Steps.TRAINING.name,
                                                States.IN_PROGRESS.name)
            change_field_of_latest_version(trainingjob_name, PS_DB_OBJ,
                                        "run_id", json_data["run_id"])
        else:
            raise TMException("KF Adapter - run_status is not scheduled")
    except requests.exceptions.ConnectionError as err:
        err_msg = "Failed to connect to KF adapter."
        LOGGER.error(err_msg)
        if not change_in_progress_to_failed_by_latest_version(trainingjob_name, PS_DB_OBJ):
            LOGGER.error(ERROR_TYPE_DB_STATUS)
        return response_for_training(err_response_code,
                                        err_msg + str(err) + "(trainingjob name is " + trainingjob_name + ")",
                                        LOGGER, False, trainingjob_name, PS_DB_OBJ, MM_SDK)
    except Exception as err:
        LOGGER.error("Failed to handle dataExtractionNotification. " + str(err))
        if not change_in_progress_to_failed_by_latest_version(trainingjob_name, PS_DB_OBJ):
            LOGGER.error(ERROR_TYPE_DB_STATUS)
        return response_for_training(err_response_code,
                                        str(err) + "(trainingjob name is " + trainingjob_name + ")",
                                        LOGGER, False, trainingjob_name, PS_DB_OBJ, MM_SDK)

    return APP.response_class(response=json.dumps({"result": "pipeline is scheduled"}),
                                    status=status.HTTP_200_OK,
                                    mimetype=MIMETYPE_JSON)


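# Example (illustrative): the JSON body the data extraction module is expected
# to POST to this endpoint once extraction completes; the job name is a placeholder.
#
#     {"trainingjob_name": "qoe_job"}
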
@APP.route('/trainingjob/pipelineNotification', methods=['POST'])
def pipeline_notification():
    """
    Function handling rest endpoint to get a notification from the kf_adapter and set the
    model download url in the database (if the model is present in the model db).

    Args in function: none

    Required Args in json:
        trainingjob_name: str
            name of trainingjob.

        run_status: str
            status of run.

    Returns:
        json:
            result: str
                result message
        status:
            HTTP status code 200

    Exceptions:
        all exceptions are provided with exception message and HTTP status code.
    """

    LOGGER.debug("Pipeline Notification response from kf_adapter: %s", json.dumps(request.json))
    try:
        check_key_in_dictionary(["trainingjob_name", "run_status"], request.json)
        trainingjob_name = request.json["trainingjob_name"]
        run_status = request.json["run_status"]

        if run_status == 'Succeeded':
            change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
                                                    Steps.TRAINING.name,
                                                    States.FINISHED.name)
            change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
                                                    Steps.TRAINING_AND_TRAINED_MODEL.name,
                                                    States.IN_PROGRESS.name)

            version = get_latest_version_trainingjob_name(trainingjob_name, PS_DB_OBJ)
            change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
                                                    Steps.TRAINING_AND_TRAINED_MODEL.name,
                                                    States.FINISHED.name)
            change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
                                                    Steps.TRAINED_MODEL.name,
                                                    States.IN_PROGRESS.name)

            if MM_SDK.check_object(trainingjob_name, version, "Model.zip"):
                model_url = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + ":" + \
                            str(TRAININGMGR_CONFIG_OBJ.my_port) + "/model/" + \
                            trainingjob_name + "/" + str(version) + "/Model.zip"

                update_model_download_url(trainingjob_name, version, model_url, PS_DB_OBJ)

                change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
                                                        Steps.TRAINED_MODEL.name,
                                                        States.FINISHED.name)
            else:
                errMsg = "Trained model is not available "
                LOGGER.error(errMsg + trainingjob_name)
                raise TMException(errMsg + trainingjob_name)
        else:
            LOGGER.error("Pipeline notification - Training failed " + trainingjob_name)
            raise TMException("Pipeline not successful for " + \
                                        trainingjob_name + \
                                        ", request json from kf adapter is: " + json.dumps(request.json))
    except Exception as err:
        # Training failure response
        LOGGER.error("Pipeline notification failed " + str(err))
        if not change_in_progress_to_failed_by_latest_version(trainingjob_name, PS_DB_OBJ):
            LOGGER.error(ERROR_TYPE_DB_STATUS)

        return response_for_training(status.HTTP_500_INTERNAL_SERVER_ERROR,
                            str(err) + " (trainingjob " + trainingjob_name + ")",
                            LOGGER, False, trainingjob_name, PS_DB_OBJ, MM_SDK)
    # Training success response
    return response_for_training(status.HTTP_200_OK,
                                            "Pipeline notification success.",
                                            LOGGER, True, trainingjob_name, PS_DB_OBJ, MM_SDK)


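# Example (illustrative): the JSON body the kf_adapter is expected to POST to
# this endpoint when a pipeline run finishes; values are placeholders.
#
#     {"trainingjob_name": "qoe_job", "run_status": "Succeeded"}
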
@APP.route('/trainingjobs/latest', methods=['GET'])
@cross_origin()
def trainingjobs_operations():
    """
    Rest endpoint to fetch the overall status and latest version of all existing training jobs.

    Args in function: none
    Required Args in json:
        no json required

    Returns:
        json:
            trainingjobs : list
                list of dictionaries.
                    dictionary contains
                        trainingjob_name: str
                            name of trainingjob
                        version: int
                            trainingjob version
                        overall_status: str
                            overall status of the end to end flow
        status:
            HTTP status code 200

    Exceptions:
        all exceptions are provided with exception message and HTTP status code.
    """
    LOGGER.debug("Request for getting all trainingjobs with latest version and status.")
    api_response = {}
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    try:
        results = get_all_jobs_latest_status_version(PS_DB_OBJ)
        trainingjobs = []
        for res in results:
            dict_data = {
                "trainingjob_name": res[0],
                "version": res[1],
                "overall_status": get_one_word_status(json.loads(res[2]))
            }
            trainingjobs.append(dict_data)
        api_response = {"trainingjobs": trainingjobs}
        response_code = status.HTTP_200_OK
    except Exception as err:
        api_response = {"Exception": str(err)}
        LOGGER.error(str(err))
    return APP.response_class(response=json.dumps(api_response),
                        status=response_code,
                        mimetype=MIMETYPE_JSON)

@APP.route("/pipelines/<pipe_name>/upload", methods=['POST'])
@cross_origin()
def upload_pipeline(pipe_name):
    """
    Function handling rest endpoint to upload a pipeline.

    Args in function:
        pipe_name: str
            name of pipeline

    Args in json:
        no json required, but a file is required

    Returns:
        json:
            result: str
                result message
        status code:
            HTTP status code 200

    Exceptions:
        all exceptions are provided with exception message and HTTP status code.
    """
    LOGGER.debug("Request to upload pipeline.")
    result_string = None
    result_code = None
    uploaded_file_path = None
    try:
        LOGGER.debug(str(request))
        LOGGER.debug(str(request.files))
        if 'file' in request.files:
            uploaded_file = request.files['file']
        else:
            result_string = "Didn't get file"
            raise ValueError("file not found in request.files")
        pattern = re.compile(r"[a-zA-Z0-9_]+")
        if not re.fullmatch(pattern, pipe_name):
            err_msg = "the pipeline name is not valid"
            raise TMException(err_msg)
        LOGGER.debug("Upload received for %s", uploaded_file.filename)
        if uploaded_file.filename != '':
            uploaded_file_path = "/tmp/" + secure_filename(uploaded_file.filename)
            uploaded_file.save(uploaded_file_path)
            LOGGER.debug("File uploaded :%s", uploaded_file_path)
            kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
            kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port
            if kf_adapter_ip is None or kf_adapter_port is None:
                raise ValueError("kf adapter ip or port is not configured")
            url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + \
                  '/pipelineIds/' + pipe_name

            description = ''
            if 'description' in request.form:
                description = request.form['description']
            with open(uploaded_file_path, 'rb') as file:
                files = {'file': file.read()}

            resp = requests.post(url, files=files, data={"description": description})
            LOGGER.debug(resp.text)
            if resp.status_code == status.HTTP_200_OK:
                LOGGER.debug("Pipeline uploaded :%s", pipe_name)
                result_string = "Pipeline uploaded " + pipe_name
                result_code = status.HTTP_200_OK
            else:
                LOGGER.error(resp.json()["message"])
                result_string = resp.json()["message"]
                result_code = status.HTTP_500_INTERNAL_SERVER_ERROR
        else:
            result_string = "File name not found"
            raise ValueError("filename is not found in request.files")
    except ValueError:
        tbk = traceback.format_exc()
        LOGGER.error(tbk)
        result_code = status.HTTP_500_INTERNAL_SERVER_ERROR
        result_string = "Error while uploading pipeline"
    except TMException:
        tbk = traceback.format_exc()
        LOGGER.error(tbk)
        result_code = status.HTTP_500_INTERNAL_SERVER_ERROR
        result_string = "Pipeline name is not of valid format"
    except Exception:
        tbk = traceback.format_exc()
        LOGGER.error(tbk)
        result_code = status.HTTP_500_INTERNAL_SERVER_ERROR
        result_string = "Error while uploading pipeline"

    if uploaded_file_path and os.path.isfile(uploaded_file_path):
        LOGGER.debug("Deleting %s", uploaded_file_path)
        os.remove(uploaded_file_path)

    LOGGER.debug("Responding to client with %d %s", result_code, result_string)
    return APP.response_class(response=json.dumps({'result': result_string}),
                                  status=result_code,
                                  mimetype=MIMETYPE_JSON)


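# Example (illustrative): uploading a compiled pipeline package with requests;
# host, port, file name, and pipeline name are placeholders.
#
#     import requests
#     with open("qoe_pipeline.zip", "rb") as fh:
#         resp = requests.post("http://localhost:32002/pipelines/qoe_pipeline/upload",
#                              files={"file": fh},
#                              data={"description": "QoE training pipeline"})
#     print(resp.json()["result"])
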
@APP.route("/pipelines/<pipeline_name>/versions", methods=['GET'])
@cross_origin()
def get_versions_for_pipeline(pipeline_name):
    """
    Function handling rest endpoint to get the versions of the given pipeline name.

    Args in function:
        pipeline_name : str
            name of pipeline.

    Args in json:
        no json required

    Returns:
        json:
            versions_list : list
                list containing all versions (as str)
        status code:
            HTTP status code 200

    Exceptions:
        all exceptions are provided with exception message and HTTP status code.
    """
    valid_pipeline = ""
    api_response = {}
    LOGGER.debug("Request to get all versions for given pipeline(" + pipeline_name + ").")
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    try:
        pipeline_names = get_all_pipeline_names_svc(TRAININGMGR_CONFIG_OBJ)
        LOGGER.debug(str(pipeline_names) + " " + pipeline_name)
        for pipeline in pipeline_names:
            if pipeline == pipeline_name:
                valid_pipeline = pipeline
                break
        if valid_pipeline == "":
            raise TMException("Pipeline name not present")
        kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
        kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port
        if kf_adapter_ip is None or kf_adapter_port is None:
            raise TMException("Kf adapter ip or port is not configured")
        url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + \
              '/pipelines/' + valid_pipeline + '/versions'
        LOGGER.debug("URL:" + url)
        response = requests.get(url)
        if response.headers['content-type'] != MIMETYPE_JSON:
            raise TMException(ERROR_TYPE_KF_ADAPTER_JSON)
        api_response = {"versions_list": response.json()['versions_list']}
        response_code = status.HTTP_200_OK
    except Exception as err:
        api_response = {"Exception": str(err)}
        LOGGER.error(str(err))
    return APP.response_class(response=json.dumps(api_response),
            status=response_code,
            mimetype=MIMETYPE_JSON)

@APP.route('/pipelines', methods=['GET'])
@cross_origin()
def get_all_pipeline_names():
    """
    Function handling rest endpoint to get all pipeline names.

    Args in function:
        none

    Args in json:
        no json required

    Returns:
        json:
            pipeline_names : list
                list containing all pipeline names (as str).
        status code:
            HTTP status code 200

    Exceptions:
        all exceptions are provided with exception message and HTTP status code.
    """
    LOGGER.debug("Request to get all pipeline names.")
    api_response = {}
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    try:
        pipeline_names = get_all_pipeline_names_svc(TRAININGMGR_CONFIG_OBJ)
        api_response = {"pipeline_names": pipeline_names}
        response_code = status.HTTP_200_OK
    except Exception as err:
        LOGGER.error(str(err))
        api_response = {"Exception": str(err)}
    return APP.response_class(response=json.dumps(api_response), status=response_code, mimetype=MIMETYPE_JSON)

@APP.route('/experiments', methods=['GET'])
@cross_origin()
def get_all_experiment_names():
    """
    Function handling rest endpoint to get all experiment names.

    Args in function:
        none

    Args in json:
        no json required

    Returns:
        json:
            experiment_names : list
                list containing all experiment names (as str).
        status code:
            HTTP status code 200

    Exceptions:
        all exceptions are provided with exception message and HTTP status code.
    """

    LOGGER.debug("Request to get all experiment names.")
    api_response = {}
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    try:
        kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
        kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port
        if kf_adapter_ip is None or kf_adapter_port is None:
            raise TMException("Kf adapter ip or port is not configured")
        url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + '/experiments'
        LOGGER.debug("url is :" + url)
        response = requests.get(url)
        if response.headers['content-type'] != MIMETYPE_JSON:
            raise TMException(ERROR_TYPE_KF_ADAPTER_JSON)

        experiment_names = []
        for experiment in response.json().keys():
            experiment_names.append(experiment)
        api_response = {"experiment_names": experiment_names}
        response_code = status.HTTP_200_OK
    except Exception as err:
        api_response = {"Exception": str(err)}
        LOGGER.error(str(err))
    return APP.response_class(response=json.dumps(api_response),
                                  status=response_code,
                                  mimetype=MIMETYPE_JSON)


@APP.route('/trainingjobs/<trainingjob_name>', methods=['POST', 'PUT']) # Handled in GUI
@cross_origin()
def trainingjob_operations(trainingjob_name):
    """
    Rest endpoint to create or update a trainingjob.
    Precondition for update: the trainingjob's overall_status should be failed
    or finished and a deletion process should not be in progress.

    Args in function:
        trainingjob_name: str
            name of trainingjob.

    Args in json:
        if a post/put request is called,
            json with the below fields is given:
                description: str
                    description
                feature_list: str
                    feature names
                pipeline_name: str
                    name of pipeline
                experiment_name: str
                    name of experiment
                arguments: dict
                    key-value pairs related to hyper parameters and
                    "trainingjob":<trainingjob_name> key-value pair
                query_filter: str
                    string indicating the SQL WHERE clause for filtering out features
                enable_versioning: bool
                    flag for trainingjob versioning
                pipeline_version: str
                    pipeline version
                datalake_source: str
                    string indicating datalake source
                _measurement: str
                    _measurement for influx db datalake
                bucket: str
                    bucket name for influx db datalake

    Returns:
        1. For post request
            json:
                result : str
                    result message
                status code:
                    HTTP status code 201
        2. For put request
            json:
                result : str
                    result message
                status code:
                    HTTP status code 200

    Exceptions:
        all exceptions are provided with exception message and HTTP status code.
    """
    api_response = {}
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    LOGGER.debug("Training job create/update request(trainingjob name %s) ", trainingjob_name)
    try:
        json_data = request.json
        if request.method == 'POST':
            LOGGER.debug("Create request json : " + json.dumps(json_data))
            is_data_available = validate_trainingjob_name(trainingjob_name, PS_DB_OBJ)
            if is_data_available:
                response_code = status.HTTP_409_CONFLICT
                raise TMException("trainingjob name(" + trainingjob_name + ") is already present in database")
            else:
                (feature_list, description, pipeline_name, experiment_name,
                arguments, query_filter, enable_versioning, pipeline_version,
                datalake_source, _measurement, bucket) = \
                check_trainingjob_data(trainingjob_name, json_data)
                add_update_trainingjob(description, pipeline_name, experiment_name, feature_list,
                                    arguments, query_filter, True, enable_versioning,
                                    pipeline_version, datalake_source, trainingjob_name,
                                    PS_DB_OBJ, _measurement=_measurement,
                                    bucket=bucket)
                api_response = {"result": "Information stored in database."}
                response_code = status.HTTP_201_CREATED
        elif request.method == 'PUT':
            LOGGER.debug("Update request json : " + json.dumps(json_data))
            is_data_available = validate_trainingjob_name(trainingjob_name, PS_DB_OBJ)
            if not is_data_available:
                response_code = status.HTTP_404_NOT_FOUND
                raise TMException("Trainingjob name(" + trainingjob_name + ") is not present in database")
            else:
                results = get_trainingjob_info_by_name(trainingjob_name, PS_DB_OBJ)
                if results:
                    if results[0][19]:
                        raise TMException("Failed to process request for trainingjob(" + trainingjob_name + ")" + \
                                        " deletion in progress")
                    if (get_one_word_status(json.loads(results[0][9]))
                            not in [States.FAILED.name, States.FINISHED.name]):
                        raise TMException("Trainingjob(" + trainingjob_name + ") is not in finished or failed status")

                (feature_list, description, pipeline_name, experiment_name,
                arguments, query_filter, enable_versioning, pipeline_version,
                datalake_source, _measurement, bucket) = check_trainingjob_data(trainingjob_name, json_data)

                add_update_trainingjob(description, pipeline_name, experiment_name, feature_list,
                                        arguments, query_filter, False, enable_versioning,
                                        pipeline_version, datalake_source, trainingjob_name, PS_DB_OBJ, _measurement=_measurement,
                                        bucket=bucket)
                api_response = {"result": "Information updated in database."}
                response_code = status.HTTP_200_OK
    except Exception as err:
        LOGGER.error("Failed to create/update training job, " + str(err))
        api_response = {"Exception": str(err)}

    return APP.response_class(response=json.dumps(api_response),
                    status=response_code,
                    mimetype=MIMETYPE_JSON)

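# Example (illustrative): a minimal creation request for the endpoint above;
# every field value is a placeholder and an InfluxDB datalake is assumed.
#
#     import requests
#     payload = {
#         "description": "QoE model training",
#         "feature_list": "pdcpBytesDl,pdcpBytesUl",
#         "pipeline_name": "qoe_pipeline",
#         "experiment_name": "Default",
#         "arguments": {"epochs": "10", "trainingjob": "qoe_job"},
#         "query_filter": "",
#         "enable_versioning": False,
#         "pipeline_version": "1",
#         "datalake_source": "influxdb",
#         "_measurement": "liveCell",
#         "bucket": "UEData"
#     }
#     resp = requests.post("http://localhost:32002/trainingjobs/qoe_job", json=payload)
#     print(resp.status_code, resp.json())
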
@APP.route('/trainingjobs/retraining', methods=['POST'])
@cross_origin()
def retraining():
    """
    Function handling rest endpoint to retrain the trainingjobs given in the request json.
    A trainingjob's overall_status should be failed or finished and its deletion_in_progress
    should be False, otherwise retraining of that trainingjob is counted as a failure.
    Args in function: none
    Required Args in json:
        trainingjobs_list: list
            list containing dictionaries.
                dictionary contains
                    trainingjob_name: str
                        name of trainingjob
                    notification_url(optional): str
                        url for notification
                    feature_filter(optional): str
                        feature filter
    Returns:
        json:
            success count: int
                successful retraining count
            failure count: int
                failed retraining count
        status: HTTP status code 200
    Exceptions:
        all exceptions are provided with exception message and HTTP status code.
    """
    LOGGER.debug('request for retraining, ' + json.dumps(request.json))
    try:
        check_key_in_dictionary(["trainingjobs_list"], request.json)
    except Exception as err:
        raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None

    trainingjobs_list = request.json['trainingjobs_list']
    if not isinstance(trainingjobs_list, list):
        raise APIException(status.HTTP_400_BAD_REQUEST, "not given as list")

    for obj in trainingjobs_list:
        try:
            check_key_in_dictionary(["trainingjob_name"], obj)
        except Exception as err:
            raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None

    not_possible_to_retrain = []
    possible_to_retrain = []

    for obj in trainingjobs_list:
        trainingjob_name = obj['trainingjob_name']
        results = None
        try:
            results = get_info_of_latest_version(trainingjob_name, PS_DB_OBJ)
        except Exception as err:
            not_possible_to_retrain.append(trainingjob_name)
            LOGGER.debug(str(err) + "(trainingjob_name is " + trainingjob_name + ")")
            continue

        if results:

            if results[0][19]:
                not_possible_to_retrain.append(trainingjob_name)
                LOGGER.debug("Failed to retrain because deletion is in progress" + \
                             "(trainingjob_name is " + trainingjob_name + ")")
                continue

            if (get_one_word_status(json.loads(results[0][9]))
                    not in [States.FINISHED.name, States.FAILED.name]):
                not_possible_to_retrain.append(trainingjob_name)
                LOGGER.debug("Not in finished or failed status" + \
                             "(trainingjob_name is " + trainingjob_name + ")")
                continue

            enable_versioning = results[0][12]
            pipeline_version = results[0][13]
            description = results[0][1]
            pipeline_name = results[0][3]
            experiment_name = results[0][4]
            feature_list = results[0][2]
            arguments = json.loads(results[0][5])['arguments']
            query_filter = results[0][6]
            datalake_source = get_one_key(json.loads(results[0][14])["datalake_source"])
            _measurement = results[0][17]
            bucket = results[0][18]

            notification_url = ""
            if "notification_url" in obj:
                notification_url = obj['notification_url']

            if "feature_filter" in obj:
                query_filter = obj['feature_filter']

            try:
                add_update_trainingjob(description, pipeline_name, experiment_name,
                                      feature_list, arguments, query_filter, False,
                                      enable_versioning, pipeline_version,
                                      datalake_source, trainingjob_name, PS_DB_OBJ,
                                      notification_url, _measurement, bucket)
            except Exception as err:
                not_possible_to_retrain.append(trainingjob_name)
                LOGGER.debug(str(err) + "(training job is " + trainingjob_name + ")")
                continue

            url = 'http://' + str(TRAININGMGR_CONFIG_OBJ.my_ip) + \
                  ':' + str(TRAININGMGR_CONFIG_OBJ.my_port) + \
                  '/trainingjobs/' + trainingjob_name + '/training'
            response = requests.post(url)

            if response.status_code == status.HTTP_200_OK:
                possible_to_retrain.append(trainingjob_name)
            else:
                LOGGER.debug("not 200 response" + "(trainingjob_name is " + trainingjob_name + ")")
                not_possible_to_retrain.append(trainingjob_name)

        else:
            LOGGER.debug("not present in postgres db" + "(trainingjob_name is " + trainingjob_name + ")")
            not_possible_to_retrain.append(trainingjob_name)

    LOGGER.debug('success list: ' + str(possible_to_retrain))
    LOGGER.debug('failure list: ' + str(not_possible_to_retrain))

    return APP.response_class(response=json.dumps( \
        {
            "success count": len(possible_to_retrain),
            "failure count": len(not_possible_to_retrain)
        }),
        status=status.HTTP_200_OK,
        mimetype=MIMETYPE_JSON)

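# Example (illustrative): a retraining request for two jobs, one with an
# optional feature_filter override; names and the filter are placeholders.
#
#     import requests
#     payload = {
#         "trainingjobs_list": [
#             {"trainingjob_name": "qoe_job"},
#             {"trainingjob_name": "anomaly_job", "feature_filter": "cellId='c1'"}
#         ]
#     }
#     resp = requests.post("http://localhost:32002/trainingjobs/retraining", json=payload)
#     print(resp.json())  # {"success count": ..., "failure count": ...}
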
@APP.route('/trainingjobs', methods=['DELETE'])
@cross_origin()
def delete_list_of_trainingjob_version():
    """
    Function handling rest endpoint to delete the given versions of the trainingjobs
    listed in the request json. A trainingjob's overall_status should be failed or finished
    and its deletion_in_progress should be False, otherwise deletion of that trainingjob
    is counted as a failure.
    Args in function: none
    Required Args in json:
        list: list
            list containing dictionaries.
                dictionary contains
                    trainingjob_name: str
                        trainingjob name
                    version: int
                        version of trainingjob
    Returns:
        json:
            success count: int
                successful deletion count
            failure count: int
                failed deletion count
        status:
            HTTP status code 200
    Exceptions:
        all exceptions are provided with exception message and HTTP status code.
    """
    LOGGER.debug('request for deleting:' + json.dumps(request.json))
    try:
        check_key_in_dictionary(["list"], request.json)
    except Exception as err:
        raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None

    list_of_trainingjob_version = request.json['list']
    if not isinstance(list_of_trainingjob_version, list):
        raise APIException(status.HTTP_400_BAD_REQUEST, "not given as list")

    not_possible_to_delete = []
    possible_to_delete = []

    for my_dict in list_of_trainingjob_version:

        if not isinstance(my_dict, dict):
            not_possible_to_delete.append(my_dict)
            LOGGER.debug(str(my_dict) + " is not a dictionary")
            continue

        try:
            check_key_in_dictionary(["trainingjob_name", "version"], my_dict)
        except Exception as err:
            not_possible_to_delete.append(my_dict)
            LOGGER.debug(str(err))
            continue

        trainingjob_name = my_dict['trainingjob_name']
        version = my_dict['version']

        results = None
        try:
            results = get_info_by_version(trainingjob_name, version, PS_DB_OBJ)
        except Exception as err:
            not_possible_to_delete.append(my_dict)
            LOGGER.debug(str(err) + "(trainingjob_name is " + trainingjob_name + ", version is " + str(
                version) + ")")
            continue

        if results:

            if results[0][19]:
                not_possible_to_delete.append(my_dict)
                LOGGER.debug("Failed to process deletion request because deletion is " + \
                             "already in progress" + \
                             "(trainingjob_name is " + trainingjob_name + ", version is " + str(
                    version) + ")")
                continue

            if (get_one_word_status(json.loads(results[0][9]))
                    not in [States.FINISHED.name, States.FAILED.name]):
                not_possible_to_delete.append(my_dict)
                LOGGER.debug("Not in finished or failed status" + \
                             "(trainingjob_name is " + trainingjob_name + ", version is " + str(
                    version) + ")")
                continue

            try:
                change_field_value_by_version(trainingjob_name, version, PS_DB_OBJ,
                                              "deletion_in_progress", True)
            except Exception as err:
                not_possible_to_delete.append(my_dict)
                LOGGER.debug(str(err) + "(trainingjob_name is " + trainingjob_name + \
                             ", version is " + str(version) + ")")
                continue

            try:
                deleted = True
                if MM_SDK.is_bucket_present(trainingjob_name):
                    deleted = MM_SDK.delete_model_metric(trainingjob_name, version)
            except Exception as err:
                not_possible_to_delete.append(my_dict)
                LOGGER.debug(str(err) + "(trainingjob_name is " + trainingjob_name + \
                             ", version is " + str(version) + ")")
                continue

            if not deleted:
                not_possible_to_delete.append(my_dict)
                continue

            try:
                delete_trainingjob_version(trainingjob_name, version, PS_DB_OBJ)
            except Exception as err:
                not_possible_to_delete.append(my_dict)
                LOGGER.debug(str(err) + "(trainingjob_name is " + \
                             trainingjob_name + ", version is " + str(version) + ")")
                continue

            possible_to_delete.append(my_dict)

        else:
            not_possible_to_delete.append(my_dict)
            LOGGER.debug("not found in postgres db" + "(trainingjob_name is " + \
                         trainingjob_name + ", version is " + str(version) + ")")

    LOGGER.debug('success list: ' + str(possible_to_delete))
    LOGGER.debug('failure list: ' + str(not_possible_to_delete))

    return APP.response_class(response=json.dumps( \
        {
            "success count": len(possible_to_delete),
            "failure count": len(not_possible_to_delete)
        }),
        status=status.HTTP_200_OK,
        mimetype=MIMETYPE_JSON)

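# Example (illustrative): deleting specific trainingjob versions; the names and
# versions are placeholders.
#
#     import requests
#     payload = {"list": [{"trainingjob_name": "qoe_job", "version": 1},
#                         {"trainingjob_name": "anomaly_job", "version": 3}]}
#     resp = requests.delete("http://localhost:32002/trainingjobs", json=payload)
#     print(resp.json())  # {"success count": ..., "failure count": ...}
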
1199 @APP.route('/trainingjobs/metadata/<trainingjob_name>')
1200 def get_metadata(trainingjob_name):
1201     """
1202     Function handling rest endpoint to get accuracy, version and model download url for all
1203     versions of given trainingjob_name which has overall_state FINISHED and
1204     deletion_in_progress is False
1205
1206     Args in function:
1207         trainingjob_name: str
1208             name of trainingjob.
1209
1210     Args in json:
1211         No json required
1212
1213     Returns:
1214         json:
1215             Successed metadata : list
                                 list contains dictionaries.
                                     each dictionary contains
1218                                          accuracy: dict
1219                                              metrics of model
1220                                          version: int
1221                                              version of trainingjob
1222                                          url: str
1223                                              url for downloading model
1224         status:
1225             HTTP status code 200
1226
1227     Exceptions:
        all exceptions are provided with an exception message and HTTP status code.
1229     """
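    # Illustrative only: a hedged sketch of a successful response body. The
    # metric key, version, and host placeholders below are assumptions for
    # demonstration, not values taken from this repo; the url shape mirrors the
    # construction in the handler below.
    #
    #   {
    #       "Successed metadata": [
    #           {
    #               "accuracy": {"rmse": 0.12},
    #               "version": 2,
    #               "url": "http://<my_ip>:<my_port>/model/<trainingjob_name>/2/Model.zip"
    #           }
    #       ]
    #   }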
1230
    LOGGER.debug("Request metadata for trainingjob (name is %s)", trainingjob_name)
1232     api_response = {}
1233     response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
1234     try:
1235         results = get_all_versions_info_by_name(trainingjob_name, PS_DB_OBJ)
1236         if results:
1237             info_list = []
1238             for trainingjob_info in results:
1239                 if (get_one_word_status(json.loads(trainingjob_info[9])) == States.FINISHED.name and
1240                         not trainingjob_info[19]):
                    LOGGER.debug("Downloading metrics for " + trainingjob_name)
1243                     data = get_metrics(trainingjob_name, trainingjob_info[11], MM_SDK)
1244                     url = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + ":" + \
1245                         str(TRAININGMGR_CONFIG_OBJ.my_port) + "/model/" + \
1246                         trainingjob_name + "/" + str(trainingjob_info[11]) + "/Model.zip"
1247                     dict_data = {
1248                         "accuracy": data,
1249                         "version": trainingjob_info[11],
1250                         "url": url
1251                     }
1252                     info_list.append(dict_data)
            # info_list built
1254             api_response = {"Successed metadata": info_list}
1255             response_code = status.HTTP_200_OK
        else:
            err_msg = "No trainingjob found with name " + trainingjob_name
            LOGGER.error(err_msg)
            response_code = status.HTTP_404_NOT_FOUND
            api_response = {"Exception": err_msg}
1261     except Exception as err:
1262         LOGGER.error(str(err))
        api_response = {"Exception": str(err)}
1264     return APP.response_class(response=json.dumps(api_response),
1265                                         status=response_code,
1266                                         mimetype=MIMETYPE_JSON)
1267
1268 @APP.route('/featureGroup', methods=['POST'])
1269 @cross_origin()
1270 def create_feature_group():
1271     """
1272     Rest endpoint to create feature group
1273
1274     Args in function:
1275                 NONE
1276
1277     Args in json:
1278             json with below fields are given:
                featureGroupName: str
                    name of the feature group
                feature_list: str
                    comma-separated feature names
1283                 enable_Dme: boolean
1284                     whether to enable dme
1285                 source_name: str
1286                     name of source
1287                 dbOrg: str
1288                     name of db org
1289                 bucket: str
1290                     bucket name
1291                 DmePort: str
1292                     DME port
1293                 DmeHost: str
1294                     DME Host
1295                 datalake_source: str
1296                     string indicating datalake source
1297                 token: str
1298                     token for the bucket
1299
    Returns:
        json:
            result : str
                result message
        status code:
            HTTP status code 200 on success, 400 on a validation or DME failure

    Exceptions:
        All exceptions are provided with an exception message and HTTP status code.
    """
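    # Illustrative only: a hedged example request body. The field names are the
    # ones documented above; every value is invented for demonstration.
    #
    #   POST /featureGroup
    #   {
    #       "featureGroupName": "cell-metrics",
    #       "feature_list": "pdcpBytesDl,pdcpBytesUl",
    #       "enable_Dme": false,
    #       "source_name": "src1",
    #       "dbOrg": "org1",
    #       "bucket": "pm-bucket",
    #       "DmePort": "31823",
    #       "DmeHost": "dme-host",
    #       "datalake_source": "influxdb",
    #       "token": "<token>"
    #   }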
1316     
1317     api_response = {}
1318     response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    LOGGER.debug('Feature group create request: ' + json.dumps(request.json))

    featureGroup_name = None
    try:
        json_data = request.json
        (featureGroup_name, features, datalake_source, enable_Dme, dme_host, dme_port,
         bucket, token, source_name, db_org) = check_featureGroup_data(json_data)
1324         # check the data conformance
1325         if len(featureGroup_name) < 3 or len(featureGroup_name) > 63:
1326             api_response = {"Exception": "Failed to create the feature group since feature group name must be between 3 and 63 characters long."}
1327             response_code = status.HTTP_400_BAD_REQUEST
1328         else:
            # The features are stored as a comma-separated string in the db but
            # must be passed to the DME as a list of features, hence the conversion.
1330             features_list = features.split(",")
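            # e.g. (illustrative) "pdcpBytesDl,pdcpBytesUl" -> ["pdcpBytesDl", "pdcpBytesUl"]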
            add_featuregroup(featureGroup_name, features, datalake_source, enable_Dme,
                             PS_DB_OBJ, dme_host, dme_port, bucket, token, source_name, db_org)
            if enable_Dme:
                response = create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, source_name,
                                                        db_org, bucket, token, features_list,
                                                        featureGroup_name, dme_host, dme_port)
                if response.status_code != 201:
                    api_response = {"Exception": "Cannot create dme job"}
                    delete_feature_group_by_name(PS_DB_OBJ, featureGroup_name)
                    response_code = status.HTTP_400_BAD_REQUEST
                else:
                    api_response = {"result": "Feature Group Created"}
                    response_code = status.HTTP_200_OK
            else:
                api_response = {"result": "Feature Group Created"}
                response_code = status.HTTP_200_OK
    except Exception as err:
        # Roll back the feature group row only if the name was parsed before the
        # failure; otherwise featureGroup_name would be unbound here.
        if featureGroup_name:
            delete_feature_group_by_name(PS_DB_OBJ, featureGroup_name)
        err_msg = "Failed to create the feature group"
        api_response = {"Exception": err_msg}
        LOGGER.error(str(err))

1350     return APP.response_class(response=json.dumps(api_response),
1351                                         status=response_code,
1352                                         mimetype=MIMETYPE_JSON)
1353
1354 @APP.route('/featureGroup', methods=['GET'])
1355 @cross_origin()
1356 def get_feature_group():
1357     """
1358     Rest endpoint to fetch all the feature groups
1359
1360     Args in function: none
1361     Required Args in json:
1362         no json required 
1363     
1364     Returns:
1365         json:
1366             FeatureGroups: list
1367                 list of dictionaries.
1368                     dictionaries contains:
1369                         featuregroup_name: str
1370                             name of feature group
1371                         features: str
1372                             name of features
1373                         datalake: str
1374                             datalake
                        dme: boolean
                            whether dme is enabled

1378     """
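    # Illustrative only: a hedged sketch of the response shape; the group name
    # and feature values are assumptions, not taken from this repo.
    #
    #   {
    #       "featuregroups": [
    #           {
    #               "featuregroup_name": "cell-metrics",
    #               "features": "pdcpBytesDl,pdcpBytesUl",
    #               "datalake": "influxdb",
    #               "dme": false
    #           }
    #       ]
    #   }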
1379     LOGGER.debug("Request for getting all feature groups")
    api_response = {}
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    try:
        result = get_feature_groups_db(PS_DB_OBJ)
        feature_groups = []
        for res in result:
            dict_data = {
                "featuregroup_name": res[0],
                "features": res[1],
                "datalake": res[2],
                "dme": res[3]
            }
            feature_groups.append(dict_data)
        api_response = {"featuregroups": feature_groups}
        response_code = status.HTTP_200_OK
1395
1396     except Exception as err:
        api_response = {"Exception": str(err)}
1398         LOGGER.error(str(err))
1399     return APP.response_class(response=json.dumps(api_response),
1400                         status=response_code,
1401                         mimetype=MIMETYPE_JSON)
1402
1403 @APP.route('/featureGroup/<featuregroup_name>', methods=['GET'])
1404 @cross_origin()
1405 def get_feature_group_by_name(featuregroup_name):
1406     """
1407     Rest endpoint to fetch a feature group
1408
1409     Args in function:
1410         featuregroup_name: str
1411             name of featuregroup_name.
1412
1413     Returns:
1414         json:
1415             trainingjob: dict
            featuregroup: list
                list of dictionaries; each dictionary contains
1418                              name of featuregroup
1419                          features: str
1420                              features
1421                          datalake: str
1422                              name of datalake
                         dme: boolean
                             whether dme is enabled or not
1425                          dme_host: str
1426                              dme host
1427                          dme_port: str
1428                              dme_port
1429                          bucket: str
1430                              bucket name
1431                          token: str
1432                              token for the bucket
                         source_name: str
                             source name
1435                          db_org: str
1436                              db org
1437         status code:
1438             HTTP status code 200
1439
1440     Exceptions:
        all exceptions are provided with an exception message and HTTP status code.
1442
1443     """
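    # Illustrative only: a hedged sketch of the response shape; all values are
    # assumptions for demonstration. Note that "features" comes back as a list
    # here because the handler splits the stored comma-separated string.
    #
    #   {
    #       "featuregroup": [
    #           {
    #               "featuregroup_name": "cell-metrics",
    #               "features": ["pdcpBytesDl", "pdcpBytesUl"],
    #               "datalake": "influxdb",
    #               "dme": true,
    #               "dme_host": "dme-host",
    #               "dme_port": "31823",
    #               "bucket": "pm-bucket",
    #               "token": "<token>",
    #               "source_name": "src1",
    #               "db_org": "org1"
    #           }
    #       ]
    #   }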
    LOGGER.debug("Request for getting a feature group with name = " + featuregroup_name)
    api_response = {}
    response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    try:
        result = get_feature_group_by_name_db(PS_DB_OBJ, featuregroup_name)
        feature_group = []
        if result:
            for res in result:
                features = res[1].split(",")
                dict_data = {
                    "featuregroup_name": res[0],
                    "features": features,
                    "datalake": res[2],
                    "dme": res[3],
                    "dme_host": res[4],
                    "dme_port": res[5],
                    "bucket": res[6],
                    "token": res[7],
                    "source_name": res[8],
                    "db_org": res[9]
                }
                feature_group.append(dict_data)
            api_response = {"featuregroup": feature_group}
            response_code = status.HTTP_200_OK
        else:
            response_code = status.HTTP_404_NOT_FOUND
            raise TMException("Feature group " + featuregroup_name + " not found in db")
1471     except Exception as err:
        api_response = {"Exception": str(err)}
1473         LOGGER.error(str(err))
1474     return APP.response_class(response=json.dumps(api_response),
1475                         status=response_code,
1476                         mimetype=MIMETYPE_JSON) 
1477
1478 @APP.route('/featureGroup', methods=['DELETE'])
1479 @cross_origin()
1480 def delete_list_of_feature_group():
1481     """
1482     Function handling rest endpoint to delete featureGroup which is
1483     given in request json. 
1484
1485     Args in function: none
1486     Required Args in json:
1487         list: list
1488               list containing dictionaries.
1489                   dictionary contains
1490                       featuregroup_name: str
1491                           featuregroup name
1492
1493     Returns:
1494         json:
1495             success count: int
1496                 successful deletion count
1497             failure count: int
1498                 failure deletion count
1499         status:
1500             HTTP status code 200
1501     Exceptions:
        all exceptions are provided with an exception message and HTTP status code.
1503     """
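    # Illustrative only: a hedged example request body (the group name is an
    # assumption). The top-level key and per-item key are the ones this handler
    # checks below:
    #
    #   {"featuregroups_list": [{"featureGroup_name": "cell-metrics"}]}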
    LOGGER.debug('Request for deleting feature groups: ' + json.dumps(request.json))
1505     try:
1506         check_key_in_dictionary(["featuregroups_list"], request.json)
1507     except Exception as err:
        LOGGER.debug("featuregroups_list key missing in request json: " + str(err))
1509         raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
1510
1511     list_of_feature_groups = request.json['featuregroups_list']
    if not isinstance(list_of_feature_groups, list):
        LOGGER.debug("featuregroups_list is not a list")
        raise APIException(status.HTTP_400_BAD_REQUEST, "featuregroups_list is not given as a list")
1515
1516     not_possible_to_delete = []
1517     possible_to_delete = []
1518
1519     for my_dict in list_of_feature_groups:
1520         if not isinstance(my_dict, dict):
1521             not_possible_to_delete.append(my_dict)
            LOGGER.debug(str(my_dict) + " is not a dictionary")
1523             continue
1524         
1525         try:
1526             check_key_in_dictionary(["featureGroup_name"], my_dict)
1527         except Exception as err:
1528             not_possible_to_delete.append(my_dict)
1529             LOGGER.debug(str(err))
1530             continue
1531
1532         featuregroup_name = my_dict['featureGroup_name']
1533         results = None
1534         try:
1535             results = get_feature_group_by_name_db(PS_DB_OBJ, featuregroup_name)
1536         except Exception as err:
1537             not_possible_to_delete.append(my_dict)
            LOGGER.debug(str(err) + " (featureGroup_name is " + featuregroup_name + ")")
1539             continue
1540
1541         if results:
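            # Note: the db row is removed before the DME job below; if the DME
            # job removal then fails, this group lands in the failure list even
            # though its db row is already gone.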
            dme = results[0][3]
            try:
                delete_feature_group_by_name(PS_DB_OBJ, featuregroup_name)
                if dme:
                    dme_host = results[0][4]
                    dme_port = results[0][5]
                    delete_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, featuregroup_name,
                                                 dme_host, dme_port)
1549                 possible_to_delete.append(my_dict)
1550             except Exception as err:
1551                 not_possible_to_delete.append(my_dict)
                LOGGER.debug(str(err) + " (featuregroup_name is " + featuregroup_name + ")")
1553                 continue
1554         else:
            not_possible_to_delete.append(my_dict)
            LOGGER.debug("Not found in postgres db (featuregroup_name is " + \
                         featuregroup_name + ")")
1558
1559     LOGGER.debug('success list: ' + str(possible_to_delete))
1560     LOGGER.debug('failure list: ' + str(not_possible_to_delete))
1561
1562     return APP.response_class(response=json.dumps( \
1563         {
1564             "success count": len(possible_to_delete),
1565             "failure count": len(not_possible_to_delete)
1566         }),
1567         status=status.HTTP_200_OK,
1568         mimetype='application/json')
1569
1570 def async_feature_engineering_status():
1571     """
    This function takes trainingjobs from DATAEXTRACTION_JOBS_CACHE and checks their
    data extraction status (using the data extraction api); when the status is
    Completed, it calls the /trainingjob/dataExtractionNotification route for them.
1575     """
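    # Flow per cached trainingjob (a summary of the loop below):
    #   1. poll data_extraction_status()
    #   2. on "Completed": mark DATA_EXTRACTION as FINISHED and
    #      DATA_EXTRACTION_AND_TRAINING as IN_PROGRESS, then POST to
    #      /trainingjob/dataExtractionNotification and drop the job from the cache
    #   3. on "Error" or a bad response: delegate to
    #      handle_async_feature_engineering_status_exception_case, which also
    #      removes the job from the cache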
1576     url_pipeline_run = "http://" + str(TRAININGMGR_CONFIG_OBJ.my_ip) + \
1577                        ":" + str(TRAININGMGR_CONFIG_OBJ.my_port) + \
1578                        "/trainingjob/dataExtractionNotification"
1579     while True:
1580         with LOCK:
1581             fjc = list(DATAEXTRACTION_JOBS_CACHE)
1582         for trainingjob_name in fjc:
1583             LOGGER.debug("Current DATAEXTRACTION_JOBS_CACHE :" + str(DATAEXTRACTION_JOBS_CACHE))
1584             try:
1585                 response = data_extraction_status(trainingjob_name, TRAININGMGR_CONFIG_OBJ)
                if (response.headers['content-type'] != MIMETYPE_JSON or
                        response.status_code != status.HTTP_200_OK):
                    raise TMException("Data extraction responded with an error status code " + \
                                      "or a non-json content type (trainingjob " + trainingjob_name + ")")
1590                 response = response.json()
1591                 LOGGER.debug("Data extraction status response for " + \
1592                             trainingjob_name + " " + json.dumps(response))
1593
1594                 if response["task_status"] == "Completed":
1595                     change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
1596                                                             Steps.DATA_EXTRACTION.name,
1597                                                             States.FINISHED.name)
1598                     change_steps_state_of_latest_version(trainingjob_name, PS_DB_OBJ,
1599                                                             Steps.DATA_EXTRACTION_AND_TRAINING.name,
1600                                                             States.IN_PROGRESS.name)
1601                     kf_response = requests.post(url_pipeline_run,
1602                                                 data=json.dumps({"trainingjob_name": trainingjob_name}),
1603                                                 headers={
1604                                                     'content-type': MIMETYPE_JSON,
1605                                                     'Accept-Charset': 'UTF-8'
1606                                                 })
                    if (kf_response.headers['content-type'] != MIMETYPE_JSON or
                            kf_response.status_code != status.HTTP_200_OK):
                        raise TMException("KF adapter responded with an error status code " + \
                                          "or a non-json content type (trainingjob " + trainingjob_name + ")")
1611                     with LOCK:
1612                         DATAEXTRACTION_JOBS_CACHE.pop(trainingjob_name)        
1613                 elif response["task_status"] == "Error":
1614                     raise TMException("Data extraction has failed for " + trainingjob_name)
1615             except Exception as err:
                LOGGER.error("Failure during processing of DATAEXTRACTION_JOBS_CACHE, " + str(err))
                # Job will be removed from DATAEXTRACTION_JOBS_CACHE in
                # handle_async_feature_engineering_status_exception_case; there
                # might be further errors during handling of this exception.
1620                 handle_async_feature_engineering_status_exception_case(LOCK,
1621                                                     DATAEXTRACTION_JOBS_CACHE,
1622                                                     status.HTTP_500_INTERNAL_SERVER_ERROR,
1623                                                     str(err) + "(trainingjob name is " + trainingjob_name + ")",
1624                                                     LOGGER, False, trainingjob_name,
1625                                                     PS_DB_OBJ, MM_SDK)
1626
        # Wait and fetch the latest list of trainingjobs
1628         time.sleep(10)
1629
1630 if __name__ == "__main__":
1631     TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()
1632     try:
        if not TRAININGMGR_CONFIG_OBJ.is_config_loaded_properly():
1634             raise TMException("Not all configuration loaded.")
1635         LOGGER = TRAININGMGR_CONFIG_OBJ.logger
1636         PS_DB_OBJ = PSDB(TRAININGMGR_CONFIG_OBJ)
1637         LOCK = Lock()
1638         DATAEXTRACTION_JOBS_CACHE = get_data_extraction_in_progress_trainingjobs(PS_DB_OBJ)
1639         threading.Thread(target=async_feature_engineering_status, daemon=True).start()
1640         MM_SDK = ModelMetricsSdk()
1641         LOGGER.debug("Starting AIML-WF training manager .....")
1642         APP.run(debug=True, port=int(TRAININGMGR_CONFIG_OBJ.my_port), host='0.0.0.0')
1643     except TMException as err:
        print("Startup failure: " + str(err))