#!/usr/bin/python3
# ==================================================================================
#
# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ==================================================================================

"""kfadapter_main.py.

This is the main file and it exposes various rest endpoints for
HTTP GET - for getting experiments, runs, pipelines, experiment details,
           pipeline description, run description (TBD)
HTTP POST - for execution of pipeline
"""

import os
import traceback
import json
from threading import Thread
from flask import Flask, request, jsonify
from flask_api import status
import kfp_server_api
from kfadapter_kfconnect import KfConnect
import kfadapter_conf
from kfadapter_util import BadRequest, wait_status_thread, keys_match, check_map

# Handles to Config and Kubeflow; populated in the __main__ block below,
# so every request handler assumes the app was started via this module.
KFCONNECT_CONFIG_OBJ = None
KFCONNECT_KF_OBJ = None
LOGGER = None


APP = Flask(__name__)


@APP.errorhandler(BadRequest)
def handle_bad_request(error):
    """Handle a BadRequest exception globally.

    Serializes the exception into JSON and responds with its HTTP status.

    Args:
        error (BadRequest): exception carrying message, status and an
            optional payload mapping.

    Returns:
        tuple: (JSON body, HTTP status code) taken from the exception.
    """
    print(error.payload)
    payload = dict(error.payload or ())
    payload['status'] = error.status
    payload['message'] = error.message

    return jsonify(payload), error.status


@APP.route("/experiments/<expname>")
def get_experiment(expname):
    """Handle HTTP GET to fetch experiment details from Kubeflow.

    Args:
        expname (str): Experiment name.

    Returns:
        tuple: JSON dict with 'name' and 'id' of the experiment,
        and HTTP status 200.

    Raises:
        BadRequest: 400 when the experiment name does not exist,
            500 on any other Kubeflow error.
    """
    exp_dict = {}
    try:
        exp = KFCONNECT_KF_OBJ.get_kf_experiment_details(
            expname, KFCONNECT_CONFIG_OBJ.kf_dict['kfdefaultns'])

        if exp is None:
            raise ValueError("Experiment name is not correct " + expname)

        LOGGER.debug("Experiment name is present")
        LOGGER.debug(exp)
        exp_dict['name'] = exp.name
        exp_dict['id'] = exp.id
    except ValueError as err:
        LOGGER.error(err)
        raise BadRequest('Experiment name does not exist', status.HTTP_400_BAD_REQUEST,
                         {'payload': {'exp.name': expname}}) from None
    except Exception:  # pylint: disable=broad-except
        # Anything else (connection errors, unexpected API payloads) is
        # reported as a generic Kubeflow failure.
        LOGGER.error(traceback.format_exc())
        raise BadRequest('Unsupported error from Kubeflow',
                         status.HTTP_500_INTERNAL_SERVER_ERROR, {'ext': 1}) from None

    return jsonify(exp_dict), status.HTTP_200_OK


@APP.route("/pipelineIds/<pipe_name>", methods=['GET', 'POST'])
def get_pipeline_id(pipe_name):
    """Handle HTTP GET/POST for a named pipeline.

    GET returns the pipeline id for pipe_name; POST uploads a pipeline
    package (multipart 'file' + form field 'description') under pipe_name.

    Args:
        pipe_name (str): Pipeline name.

    Returns:
        tuple: JSON dict with 'name' and 'id' of the pipeline,
        and HTTP status 200.

    Raises:
        BadRequest: 400 when the pipeline name does not exist,
            500 on Kubeflow API errors or any other failure.
    """
    pipe_dict = {}
    # Defined up-front so the ApiException handler below can safely test it;
    # previously a GET that raised ApiException hit an unbound name here.
    uploaded_file_path = None
    try:
        if request.method == 'GET':
            pipe_id = KFCONNECT_KF_OBJ.get_kf_pipeline_id(pipe_name)
            if pipe_id is None:
                raise ValueError("No pipeline is found with name " + pipe_name)

            LOGGER.debug(pipe_id)
            pipe_dict['name'] = pipe_name
            pipe_dict['id'] = pipe_id
        else:
            uploaded_file = request.files['file']
            LOGGER.debug("Uploading received for %s", uploaded_file.filename)
            if uploaded_file.filename != '':
                uploaded_file_path = "/tmp/" + uploaded_file.filename
                uploaded_file.save(uploaded_file_path)
                LOGGER.debug("File uploaded :%s", uploaded_file_path)
                description = request.form['description']
                pipe_info = KFCONNECT_KF_OBJ.upload_pipeline_with_versions(
                    pipe_name,
                    uploaded_file_path,
                    description)
                LOGGER.debug("Pipeline uploaded :%s", pipe_name)
                pipe_dict['name'] = pipe_name
                pipe_dict['id'] = pipe_info.id
                os.remove(uploaded_file_path)
            else:
                raise RuntimeError("Error saving file from POST")
    except ValueError as err:
        LOGGER.error(err)
        raise BadRequest('PipeLine Name does not exist', status.HTTP_400_BAD_REQUEST,
                         {'payload': {'pipe_name': pipe_name, 'error': str(err)}}) from None
    except kfp_server_api.exceptions.ApiException as err:
        LOGGER.error("Exception from KubeFlow")
        LOGGER.error(err)
        # Clean up the temporary upload only if one was actually created.
        if uploaded_file_path is not None and os.path.exists(uploaded_file_path):
            os.remove(uploaded_file_path)
        raise BadRequest(check_map(json.loads(err.body), "error_details"),
                         status.HTTP_500_INTERNAL_SERVER_ERROR) from None
    except Exception:  # pylint: disable=broad-except
        LOGGER.error(traceback.format_exc())
        raise BadRequest('Unsupported error from Kubeflow',
                         status.HTTP_500_INTERNAL_SERVER_ERROR, {'ext': 1}) from None

    return jsonify(pipe_dict), status.HTTP_200_OK


@APP.route("/pipelines/<pipeline_name>/versions", methods=['GET'])
def get_versions_for_pipeline(pipeline_name):
    """Handle HTTP GET to list the versions of a named pipeline.

    Args:
        pipeline_name (str): Pipeline name.

    Returns:
        tuple: JSON dict with key 'versions_list' holding the versions for
        the given pipeline name, and HTTP status 200.

    Raises:
        BadRequest: 500 on Kubeflow API errors or any other failure.
    """
    result_dict = {}
    try:
        versions_list = KFCONNECT_KF_OBJ.get_pl_versions_by_pl_name(
            pipeline_name)
    except kfp_server_api.exceptions.ApiException as err:
        LOGGER.error("Exception from KubeFlow")
        LOGGER.error(err)
        raise BadRequest(check_map(json.loads(err.body), "error_details"),
                         status.HTTP_500_INTERNAL_SERVER_ERROR) from None
    except Exception:  # pylint: disable=broad-except
        LOGGER.error(traceback.format_exc())
        raise BadRequest('Unsupported error from Kubeflow',
                         status.HTTP_500_INTERNAL_SERVER_ERROR, {'ext': 1}) from None
    result_dict['versions_list'] = versions_list
    return jsonify(result_dict), status.HTTP_200_OK


@APP.route("/pipelines/<pipe_id>", methods=['GET', 'DELETE'])
def pipelinei(pipe_id):
    """Handle HTTP GET/DELETE for a pipeline id.

    GET returns the pipeline description (arguments, description, id, name);
    DELETE removes the pipeline and reports its id with status "Deleted".

    Args:
        pipe_id (str): Pipeline id.

    Returns:
        tuple: JSON dict describing the pipeline (GET) or the deletion
        outcome (DELETE), and HTTP status 200.

    Raises:
        BadRequest: 500 on Kubeflow API errors.
    """
    pipe_dict = {}
    pipe_arg = {}
    try:
        if request.method == 'DELETE':
            # The delete result was always discarded by the original code
            # (overwritten with {} before being tested), so success is
            # reported unconditionally; failures surface as ApiException.
            KFCONNECT_KF_OBJ.delete_kf_pipeline(pipe_id)
            pipe_dict['id'] = pipe_id
            pipe_dict['status'] = "Deleted"
        else:
            pipeline_info = KFCONNECT_KF_OBJ.get_kf_pipeline_desc(pipe_id)
            LOGGER.debug(pipeline_info)
            for parameter in pipeline_info.parameters:
                pipe_arg[parameter.name] = parameter.value
            pipe_dict['arguments'] = pipe_arg
            pipe_dict['description'] = pipeline_info.description
            pipe_dict['id'] = pipeline_info.id
            pipe_dict['name'] = pipeline_info.name
    except kfp_server_api.exceptions.ApiException as err:
        LOGGER.error("Exception from KubeFlow")
        LOGGER.error(err)
        raise BadRequest('Unsupported error from Kubeflow',
                         status.HTTP_500_INTERNAL_SERVER_ERROR,
                         {'payload': {'pipe_id': pipe_id}}) from None

    return jsonify(pipe_dict), status.HTTP_200_OK


@APP.route("/liveness")
def kf_liveness():
    """Handle the liveness probe.

    Returns:
        tuple: "Okay" and HTTP status 200.
    """
    return "Okay", status.HTTP_200_OK


@APP.route("/experiments")
def list_experiments():
    """Handle HTTP GET to list all experiments from Kubeflow.

    Returns:
        tuple: JSON dict mapping experiment name to experiment id,
        and HTTP status 200.

    Raises:
        BadRequest: 500 on any Kubeflow failure.
    """
    exp_dict = {}
    try:
        exp = KFCONNECT_KF_OBJ.get_kf_list_experiments(
            KFCONNECT_CONFIG_OBJ.kf_dict['kfdefaultns'])
        for experiment in exp.experiments:
            exp_dict[experiment.name] = experiment.id
    except Exception:  # pylint: disable=broad-except
        LOGGER.error(traceback.format_exc())
        raise BadRequest('Unsupported error from Kubeflow',
                         status.HTTP_500_INTERNAL_SERVER_ERROR, {'ext': 1}) from None

    return jsonify(exp_dict), status.HTTP_200_OK


@APP.route("/pipelines")
def list_pipelines():
    """Handle HTTP GET to list all pipelines from Kubeflow.

    Returns:
        tuple: JSON dict mapping pipeline name to a dict with 'id',
        'description' and 'parameters', and HTTP status 200.

    Raises:
        BadRequest: 500 on any Kubeflow failure.
    """
    pipe_dict = {}
    try:
        pipeline_list = KFCONNECT_KF_OBJ.get_kf_list_pipelines()

        for pipeline in pipeline_list.pipelines:
            pipe_super_dict = {}
            pipe_param_dict = {}
            pipe_super_dict['id'] = pipeline.id
            pipe_super_dict['description'] = pipeline.description
            # parameters may be absent for a pipeline; keep an empty map then.
            if pipeline.parameters is not None:
                for parameter in pipeline.parameters:
                    pipe_param_dict[parameter.name] = parameter.value

            pipe_super_dict['parameters'] = pipe_param_dict
            pipe_dict[pipeline.name] = pipe_super_dict
    except Exception:  # pylint: disable=broad-except
        LOGGER.error(traceback.format_exc())
        raise BadRequest('Unsupported error from Kubeflow',
                         status.HTTP_500_INTERNAL_SERVER_ERROR, {'ext': 1}) from None

    return jsonify(pipe_dict), status.HTTP_200_OK


@APP.route('/trainingjobs/<trainingjob_name>/execution', methods=['POST'])
def run_pipeline(trainingjob_name):
    """Handle HTTP POST to execute a pipeline for a trainingjob.

    Expects a JSON body with:
        arguments (dict): arguments required for the pipeline run; the key
            set must match the pipeline's default-version parameters.
        pipeline_name (str): name of a pipeline registered in Kubeflow.
        experiment_name (str): experiment under which the run happens.
        pipeline_version (str): pipeline version to execute.

    Args:
        trainingjob_name (str): Unique trainingjob name.

    Returns:
        tuple: JSON dict describing the started run (ids, names,
        experiment/pipeline references, status), and HTTP status 200.

    Raises:
        BadRequest: 400 for missing/invalid request fields,
            500 for Kubeflow-side failures.
    """
    errcode = None
    err_string = None
    LOGGER.debug("run_pipeline for %s", trainingjob_name)
    run_dict = {}
    try:
        # Defaults for failures while the request itself is being validated.
        errcode = status.HTTP_400_BAD_REQUEST
        err_string = "Internal Error"
        req = request.json
        LOGGER.debug(req)
        # "pipeline_version" is part of the check: previously its absence
        # raised a KeyError that surfaced as a generic "Internal Error".
        if ("arguments" in req.keys() and "pipeline_name" in req.keys()
                and "experiment_name" in req.keys()
                and "pipeline_version" in req.keys()):
            arguments = req["arguments"]
            pipe_name = req["pipeline_name"]
            exp_name = req["experiment_name"]
            pipeline_version_name = req["pipeline_version"]

            # From here on, failures are attributed to Kubeflow.
            errcode = status.HTTP_500_INTERNAL_SERVER_ERROR
            err_string = "Unsupported error from Kubeflow"
            exp = KFCONNECT_KF_OBJ.get_kf_experiment_details(
                exp_name, KFCONNECT_CONFIG_OBJ.kf_dict['kfdefaultns'])
            if exp is None:
                raise ValueError("Experiment name is not correct " + exp_name)

            LOGGER.debug(exp)
            pipe_id = KFCONNECT_KF_OBJ.get_kf_pipeline_id(pipe_name)
            if pipe_id is None:
                raise ValueError("Pipeline name is not correct " + pipe_name)

            LOGGER.debug("Pipeline ID = " + pipe_id)

            pipe_arg = {}
            LOGGER.debug("Getting pipeline desc")

            pipeline_info = KFCONNECT_KF_OBJ.get_kf_pipeline_desc(pipe_id)
            LOGGER.debug(pipeline_info)
            for parameter in pipeline_info.default_version.parameters:
                pipe_arg[parameter.name] = parameter.value
            LOGGER.debug("Arguments provided " + str(arguments.keys()))
            LOGGER.debug("Arguments in pipeline " + str(pipe_arg.keys()))
            args_match = keys_match(arguments, pipe_arg)
            if args_match is False:
                LOGGER.error("arguments: " + str(arguments))
                LOGGER.error("pipe_arg: " + str(pipe_arg))
                raise ValueError("Arguments does not match with pipeline arguments")

            version_id = KFCONNECT_KF_OBJ.get_kf_pipeline_version_id(
                pipe_id, pipeline_version_name)
            LOGGER.debug("Running pipeline")
            run = KFCONNECT_KF_OBJ.run_kf_pipeline(exp.id, pipe_id, arguments, version_id)
            LOGGER.debug("Run ID = %s", run.id)
            run_dict['trainingjob_name'] = trainingjob_name
            run_dict['run_id'] = run.id
            run_dict['run_name'] = run.name
            run_dict['experiment_name'] = run.resource_references[0].name
            run_dict['experiment_id'] = run.resource_references[0].key.id

            if len(run.resource_references) > 1:
                run_dict['pipeline_name'] = run.resource_references[1].name
                run_dict['pipeline_id'] = run.resource_references[1].key.id

            if run.status is None:
                run_dict['run_status'] = "scheduled"
            # Register the run for the status-watcher thread.
            # NOTE(review): placement relative to the "if run.status is None"
            # block is ambiguous in the original source — confirm whether
            # registration should be conditional on a scheduled status.
            with kfadapter_conf.LOCK:
                kfadapter_conf.TRAINING_DICT[run.id] = trainingjob_name
        else:
            errcode = status.HTTP_400_BAD_REQUEST
            err_string = 'Less arguments'
            raise BadRequest('Less arguments', errcode, {'ext': 1})
    except ValueError as err:
        LOGGER.error(err)
        payload = {'payload': request.json}
        raise BadRequest(err_string, errcode, payload) from None
    except Exception:  # pylint: disable=broad-except
        tbk = traceback.format_exc()
        if err_string == 'Internal Error':
            LOGGER.error(tbk)
            payload = {'ext': 1}
        else:
            payload = {'payload': request.json}

        raise BadRequest(err_string, errcode, payload) from None

    return jsonify(run_dict), status.HTTP_200_OK


@APP.route("/runs")
def list_runs():
    """Handle HTTP GET to list all runs from Kubeflow.

    Returns:
        tuple: JSON dict mapping run name to a dict with run id,
        description, status and experiment/pipeline references,
        and HTTP status 200.

    Raises:
        BadRequest: 400 on any Kubeflow failure.
    """
    run_dict = {}

    try:
        runs = KFCONNECT_KF_OBJ.get_kf_list_runs(
            KFCONNECT_CONFIG_OBJ.kf_dict['kfdefaultns'])

        for run in runs.runs:
            run_super_dict = {}
            run_super_dict['run_id'] = run.id
            run_super_dict['run_description'] = run.description
            run_super_dict['run_status'] = run.status
            # resource_references[0] is the experiment; [1], when present,
            # is the pipeline.
            run_super_dict['experiment_name'] = run.resource_references[0].name
            run_super_dict['experiment_id'] = run.resource_references[0].key.id

            if len(run.resource_references) > 1:
                run_super_dict['pipeline_name'] = run.resource_references[1].name
                run_super_dict['pipeline_id'] = run.resource_references[1].key.id

            run_dict[run.name] = run_super_dict
    except Exception:  # pylint: disable=broad-except
        LOGGER.error(traceback.format_exc())
        raise BadRequest('Unsupported error from Kubeflow', status.HTTP_400_BAD_REQUEST,
                         {'ext': 1}) from None

    return jsonify(run_dict), status.HTTP_200_OK


@APP.route("/runs/<run_id>", methods=['GET', 'DELETE'])
def kf_run(run_id):
    """Handle HTTP GET/DELETE for a run id.

    GET returns the run description; DELETE is not supported yet and
    answers 501.

    Args:
        run_id (str): Run id.

    Returns:
        tuple: JSON dict with 'run_id', 'run_name' and 'run_status',
        and HTTP status 200.

    Raises:
        BadRequest: 501 for DELETE, 400 on Kubeflow failures.
    """
    run_dict = {}
    try:
        if request.method == 'DELETE':
            LOGGER.error("Method not supported yet")
            raise BadRequest("Method not supported yet", status.HTTP_501_NOT_IMPLEMENTED,
                             {'ext': 1})

        run_info = KFCONNECT_KF_OBJ.get_kf_run(run_id)
        run_dict['run_id'] = run_info.run.id
        run_dict['run_name'] = run_info.run.name
        run_dict['run_status'] = run_info.run.status
        LOGGER.debug(run_dict)
    except BadRequest:
        # Propagate the deliberate 501 untouched; previously it was caught
        # by the broad handler below and remapped to a 400.
        raise
    except Exception as err:  # pylint: disable=broad-except
        LOGGER.error("Exception from KubeFlow in run")
        LOGGER.error(err)
        raise BadRequest('Unsupported error from Kubeflow', status.HTTP_400_BAD_REQUEST,
                         {'payload': {'run_id': run_id}}) from None

    return jsonify(run_dict), status.HTTP_200_OK


if __name__ == "__main__":
    KFCONNECT_CONFIG_OBJ = kfadapter_conf.KfConfiguration.get_instance()
    if KFCONNECT_CONFIG_OBJ.is_config_loaded_properly() is False:
        print("Config not loaded properly")
    else:
        KF_HOST_URI = "http://" + KFCONNECT_CONFIG_OBJ.kf_dict['kfhostname'] + \
            ":" + KFCONNECT_CONFIG_OBJ.kf_dict['kfport'] + "/pipeline"
        LOGGER = KFCONNECT_CONFIG_OBJ.logger
        LOGGER.debug(KF_HOST_URI)
        KFCONNECT_KF_OBJ = KfConnect()
        try:
            KFCONNECT_KF_OBJ.get_kf_client(KF_HOST_URI)
            LOGGER.debug(KFCONNECT_CONFIG_OBJ.appport)
            # Background thread polling Kubeflow for run status updates.
            THR = Thread(target=wait_status_thread, args=(1, KFCONNECT_KF_OBJ))
            THR.start()
            APP.run(host='0.0.0.0', port=KFCONNECT_CONFIG_OBJ.appport)
        except Exception as some_err:  # pylint: disable=broad-except
            LOGGER.error(some_err)