add main.py 65/9365/1
authorYouhwan Seol <yh.seol@samsung.com>
Mon, 24 Oct 2022 04:37:17 +0000 (13:37 +0900)
committerYouhwan Seol <yh.seol@samsung.com>
Mon, 24 Oct 2022 04:37:17 +0000 (13:37 +0900)
Issue-Id: AIMLFW-2

Signed-off-by: Youhwan Seol <yh.seol@samsung.com>
Change-Id: I97a7bc49aa022b6a436c3bc180b96a059f23cbc5

kfadapter/main.py [new file with mode: 0644]

diff --git a/kfadapter/main.py b/kfadapter/main.py
new file mode 100644 (file)
index 0000000..45be5b3
--- /dev/null
@@ -0,0 +1,533 @@
+#!/usr/bin/python3
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+"""kfadapter_main.py.
+
+This is the main files and it  exposes various rest endpoints for
+HTTP GET - for getting experiments,runs,pipelines,experiment details,
+           pipeline description,run description(TBD)
+HTTP POST - for execution of pipeline
+
+
+"""
+
+import os
+import traceback
+import json
+from threading import Thread
+from flask import Flask, request, jsonify
+from flask_api import status
+import kfp_server_api
+from kfadapter_kfconnect import KfConnect
+import kfadapter_conf
+from kfadapter_util import BadRequest, wait_status_thread, keys_match, check_map
+
+#Handles to Config and Kubeflow
+KFCONNECT_CONFIG_OBJ = None
+KFCONNECT_KF_OBJ = None
+LOGGER = None
+
+
+APP = Flask(__name__)
+
+@APP.errorhandler(BadRequest)
+def handle_bad_request(error):
+    """
+    Function handling BadRequest exception globally,
+    serialize into JSON, and respond with 400.
+    """
+    print(error.payload)
+    payload = dict(error.payload or ())
+    payload['status'] = error.status
+    payload['message'] = error.message
+
+    return jsonify(payload), error.status
+
+@APP.route("/experiments/<expname>")
+def get_experiment(expname):
+    """Function handling HTTP GET rest endpoint to get experiment details from kubeflow
+
+    Args:
+        expname (str): Experiment name
+
+    Returns:
+        json dict: denoting expid for expname
+        status: HTTP status 200 or 400
+
+    Exceptions:
+        error payload describing status, message and HTTP status code
+
+    """
+    exp_dict = {}
+    try:
+        exp = KFCONNECT_KF_OBJ.get_kf_experiment_details(expname,\
+                KFCONNECT_CONFIG_OBJ.kf_dict['kfdefaultns'])
+
+        if exp is None:
+            raise ValueError("Experiment name is not correct " +expname)
+
+        LOGGER.debug("Experiment name is present")
+
+        LOGGER.debug(exp)
+        exp_dict['name'] = exp.name
+        exp_dict['id'] = exp.id
+    except ValueError as err:
+        LOGGER.error(err)
+        raise BadRequest('Experiment name does not exist', status.HTTP_400_BAD_REQUEST,\
+                {'payload': {'exp.name': expname}}) from None
+    except:# pylint: disable=bare-except
+        tbk = traceback.format_exc()
+        LOGGER.error(tbk)
+        raise BadRequest('Unsupported error from Kubeflow',\
+                status.HTTP_500_INTERNAL_SERVER_ERROR, {'ext': 1}) from None
+
+    return jsonify(exp_dict), status.HTTP_200_OK
+
+@APP.route("/pipelineIds/<pipe_name>", methods=['GET', 'POST'])
+def get_pipeline_id(pipe_name):
+    """Function handling HTTP GET rest endpoint to get pipeline id from kubeflow
+
+    Args:
+        pipe_name (str): Pipeline name
+
+    Returns:
+        json dict: denoting pipline_id for pipeline_name
+        status: HTTP status 200 or 400
+
+    Exceptions:
+        error payload describing status, message and HTTP status code
+
+    """
+    pipe_dict = {}
+    try:
+        if request.method == 'GET':
+            pipe_id = KFCONNECT_KF_OBJ.get_kf_pipeline_id(pipe_name)
+            if pipe_id is None:
+                raise ValueError("No pipeline is found with name "+ pipe_name)
+
+            LOGGER.debug(pipe_id)
+            pipe_dict['name'] = pipe_name
+            pipe_dict['id'] = pipe_id
+        else:
+            uploaded_file = request.files['file']
+            LOGGER.debug("Uploading received for %s", uploaded_file.filename)
+            if uploaded_file.filename != '':
+                uploaded_file_path = "/tmp/" + uploaded_file.filename
+                uploaded_file.save(uploaded_file_path)
+                LOGGER.debug("File uploaded :%s", uploaded_file_path)
+                description = request.form['description']
+                pipe_info = KFCONNECT_KF_OBJ.upload_pipeline_with_versions(
+                    pipe_name,
+                    uploaded_file_path,
+                    description)
+                LOGGER.debug("Pipeline uploaded :%s", pipe_name)
+                pipe_dict['name'] = pipe_name
+                pipe_dict['id'] = pipe_info.id
+                os.remove(uploaded_file_path)
+            else:
+                raise Exception("Error saving file from POST")
+    except ValueError as err:
+        LOGGER.error(err)
+        raise BadRequest('PipeLine Name does not exist', status.HTTP_400_BAD_REQUEST,\
+                {'payload': {'pipe_name': pipe_name, 'error': str(err)}}) from None
+    except kfp_server_api.exceptions.ApiException as err:
+        LOGGER.error("Exception from KubeFlow")
+        LOGGER.error(err)
+        os.remove(uploaded_file_path)
+        raise BadRequest(check_map(json.loads(err.body), "error_details"),\
+                status.HTTP_500_INTERNAL_SERVER_ERROR) from None
+    except: # pylint: disable=bare-except
+        tbk = traceback.format_exc()
+        LOGGER.error(tbk)
+        raise BadRequest('Unsupported error from Kubeflow',\
+                status.HTTP_500_INTERNAL_SERVER_ERROR, {'ext': 1}) from None
+
+    return jsonify(pipe_dict), status.HTTP_200_OK
+
+@APP.route("/pipelines/<pipeline_name>/versions", methods=['GET'])
+def get_versions_for_pipeline(pipeline_name):
+    """
+    Function handling HTTP GET rest endpoint to get pipeline versions based on
+       pipeline name.
+
+    Args:
+        pipeline_name (str): Pipeline name
+
+    Returns:
+        json dict:
+            it contains versons list for given pipeline name
+
+        status: HTTP status 200
+
+    Exceptions:
+        error payload describing status, message and HTTP status code
+    """
+    result_dict = {}
+    try:
+        versions_list = KFCONNECT_KF_OBJ.get_pl_versions_by_pl_name(
+            pipeline_name)
+    except kfp_server_api.exceptions.ApiException as err:
+        LOGGER.error("Exception from KubeFlow")
+        LOGGER.error(err)
+        raise BadRequest(check_map(json.loads(err.body), "error_details"),\
+                status.HTTP_500_INTERNAL_SERVER_ERROR) from None
+    except: # pylint: disable=bare-except
+        tbk = traceback.format_exc()
+        LOGGER.error(tbk)
+        raise BadRequest('Unsupported error from Kubeflow',\
+                status.HTTP_500_INTERNAL_SERVER_ERROR, {'ext': 1}) from None
+    result_dict['versions_list'] = versions_list
+    return jsonify(result_dict), status.HTTP_200_OK
+
+
+@APP.route("/pipelines/<pipe_id>", methods=['GET', 'DELETE'])
+def pipelinei(pipe_id):
+    """Function handling HTTP GET/DELETE rest endpoint to get/delete pipeline based on
+       pipeline id from kubeflow
+
+    Args:
+        pipe_id (str): Pipeline id
+
+    Returns:
+        json dict:
+                   denoting pipline description for pipeline_id in HTTP GET METHOD
+                   denoting pipeline and status for pipeline_id in HTTP DELETE METHOD
+
+        status: HTTP status 200 or 400
+
+    Exceptions:
+        error payload describing status, message and HTTP status code
+
+    """
+    pipe_dict = {}
+    pipe_arg = {}
+    try:
+        if request.method == 'DELETE':
+            pipeline_info = KFCONNECT_KF_OBJ.delete_kf_pipeline(pipe_id)
+            pipeline_info = {}
+            if bool(pipeline_info) is False:
+                pipe_dict['id'] = pipe_id
+                pipe_dict['status'] = "Deleted"
+        else:
+            pipeline_info = KFCONNECT_KF_OBJ.get_kf_pipeline_desc(pipe_id)
+            LOGGER.debug(pipeline_info)
+            for parameter in pipeline_info.parameters:
+                pipe_arg[parameter.name] = parameter.value
+            pipe_dict['arguments'] = pipe_arg
+            pipe_dict['description'] = pipeline_info.description
+            pipe_dict['id'] = pipeline_info.id
+            pipe_dict['name'] = pipeline_info.name
+    except kfp_server_api.exceptions.ApiException as err:
+        LOGGER.error("Exception from KubeFlow")
+        LOGGER.error(err)
+        raise BadRequest('Unsupported error from Kubeflow',\
+                status.HTTP_500_INTERNAL_SERVER_ERROR, {'payload': {'pipe_id': pipe_id}}) from None
+
+    return jsonify(pipe_dict), status.HTTP_200_OK
+
+
+@APP.route("/liveness")
+def kf_liveness():
+    """Function handling liveness probe
+
+    Args:none
+
+    Returns:
+        status: HTTP status 200
+
+    """
+    return "Okay", status.HTTP_200_OK
+
+@APP.route("/experiments")
+def list_experiments():
+    """Function handling rest endpoint to get all experiments
+       from kubeflow
+
+    Args:none
+
+    Returns:
+        json dict:
+                   denoting experiment id  for each experiment name
+
+        status: HTTP status 200 or 400
+
+    Exceptions:
+        error payload describing status, message and HTTP status code
+
+    """
+
+    exp_dict = {}
+    try:
+        exp = KFCONNECT_KF_OBJ.get_kf_list_experiments(KFCONNECT_CONFIG_OBJ.kf_dict['kfdefaultns'])
+        for experiment in exp.experiments:
+            exp_dict[experiment.name] = experiment.id
+    except:# pylint: disable=bare-except
+        tbk = traceback.format_exc()
+        LOGGER.error(tbk)
+        raise BadRequest('Unsupported error from Kubeflow',\
+                status.HTTP_500_INTERNAL_SERVER_ERROR, {'ext': 1}) from None
+
+    return jsonify(exp_dict), status.HTTP_200_OK
+
+@APP.route("/pipelines")
+def list_pipelines():
+    """Function handling rest endpoint to get all pipelines
+       from kubeflow
+
+    Args:none
+
+    Returns:
+        json dict:
+                   denoting pipeline description for each pipeline
+
+        status: HTTP status 200 or 400
+
+    Exceptions:
+        error payload describing status, message and HTTP status code
+
+    """
+    pipe_dict = {}
+    try:
+        pipeline_list = KFCONNECT_KF_OBJ.get_kf_list_pipelines()
+
+        for pipeline in pipeline_list.pipelines:
+            pipe_super_dict = {}
+            pipe_param_dict = {}
+            pipe_super_dict['id'] = pipeline.id
+            pipe_super_dict['description'] = pipeline.description
+            if pipeline.parameters is not None:
+                for parameter in pipeline.parameters:
+                    pipe_param_dict[parameter.name] = parameter.value
+
+            pipe_super_dict['parameters'] = pipe_param_dict
+            pipe_dict[pipeline.name] = pipe_super_dict
+    except:# pylint: disable=bare-except
+        tbk = traceback.format_exc()
+        LOGGER.error(tbk)
+        raise BadRequest('Unsupported error from Kubeflow',\
+                status.HTTP_500_INTERNAL_SERVER_ERROR, {'ext': 1}) from None
+
+    return jsonify(pipe_dict), status.HTTP_200_OK
+
+@APP.route('/trainingjobs/<trainingjob_name>/execution', methods=['POST'])
+def run_pipeline(trainingjob_name):
+    """Function handling HTTP POST rest endpoint to execute pipeline based on trainingjob name
+
+    Args:
+        trainingjob_name (str): Unique trainingjob_name
+        json_request_args(dict):
+                            arguments(dict) - Arguments required for pipeline to run
+                            pipeline_name(str) - name of Pipeline registered in KubeFlow
+                            experiment_name(str) - Experiment under which the pipeline
+                                                   run will happen
+
+
+    Returns:
+        json dict: denoting run for pipeline
+        status: HTTP status 200 or 400
+
+    Exceptions:
+        error payload describing status, message and HTTP status code
+
+    """
+    errcode = None
+    err_string = None
+    LOGGER.debug("run_pipeline for %s", trainingjob_name)
+    run_dict = {}
+    try:
+        errcode = status.HTTP_400_BAD_REQUEST
+        err_string = "Internal Error"
+        req = request.json
+        LOGGER.debug(req)
+        if("arguments" in req.keys() and "pipeline_name" in req.keys() and \
+                "experiment_name" in req.keys()):
+            arguments = req["arguments"]
+            pipe_name = req["pipeline_name"]
+            exp_name = req["experiment_name"]
+            pipeline_version_name = req["pipeline_version"]
+
+            errcode = status.HTTP_500_INTERNAL_SERVER_ERROR
+            err_string = "Unsupported error from Kubeflow"
+            exp = KFCONNECT_KF_OBJ.get_kf_experiment_details(exp_name,\
+                    KFCONNECT_CONFIG_OBJ.kf_dict['kfdefaultns'])
+            if exp is None:
+                raise ValueError("Experiment name is not correct " +exp_name)
+
+            LOGGER.debug(exp)
+            pipe_id = KFCONNECT_KF_OBJ.get_kf_pipeline_id(pipe_name)
+            if pipe_id is None:
+                raise ValueError("Pipeline name is not correct " +pipe_name)
+
+            LOGGER.debug("Pipeline ID = " + pipe_id)
+
+            pipe_arg = {}
+            LOGGER.debug("Getting pipeline desc")
+
+            pipeline_info = KFCONNECT_KF_OBJ.get_kf_pipeline_desc(pipe_id)
+            LOGGER.debug(pipeline_info)
+            for parameter in pipeline_info.default_version.parameters:
+                pipe_arg[parameter.name] = parameter.value
+            LOGGER.debug("Arguments provided " + str(arguments.keys()))
+            LOGGER.debug("Arguments in pipeline " + str(pipe_arg.keys()))
+            args_match = keys_match(arguments, pipe_arg)
+            if args_match is False:
+                LOGGER.error("arguments: "+str(arguments))
+                LOGGER.error("pipe_arg: "+str(pipe_arg))
+                raise ValueError("Arguments does not match with pipeline arguments")
+
+            version_id = KFCONNECT_KF_OBJ.get_kf_pipeline_version_id(pipe_id, pipeline_version_name)
+            LOGGER.debug("Running pipeline")
+            run = KFCONNECT_KF_OBJ.run_kf_pipeline(exp.id, pipe_id, arguments, version_id)
+            LOGGER.debug("Run ID = %s", run.id)
+            run_dict['trainingjob_name'] = trainingjob_name
+            run_dict['run_id'] = run.id
+            run_dict['run_name'] = run.name
+            run_dict['experiment_name'] = run.resource_references[0].name
+            run_dict['experiment_id'] = run.resource_references[0].key.id
+
+            if len(run.resource_references) > 1:
+                run_dict['pipeline_name'] = run.resource_references[1].name
+                run_dict['pipeline_id'] = run.resource_references[1].key.id
+
+            if run.status is None:
+                run_dict['run_status'] = "scheduled"
+                with kfadapter_conf.LOCK:
+                    kfadapter_conf.TRAINING_DICT[run.id] = trainingjob_name
+        else:
+            errcode = status.HTTP_400_BAD_REQUEST
+            err_string = 'Less arguments'
+            raise BadRequest('Less arguments', errcode, {'ext':1})
+    except ValueError as err:
+        LOGGER.error(err)
+        payload = {'payload': request.json}
+        raise BadRequest(err_string, errcode, payload) from None
+
+    except:# pylint: disable=bare-except
+        tbk = traceback.format_exc()
+        if err_string == 'Internal Error':
+            LOGGER.error(tbk)
+            payload = {'ext': 1}
+        else:
+            payload = {'payload': request.json}
+
+        raise BadRequest(err_string, errcode, payload) from None
+
+    return jsonify(run_dict), status.HTTP_200_OK
+
+@APP.route("/runs")
+def list_runs():
+    """Function handling rest endpoint to get all runs
+       from kubeflow
+
+    Args:none
+
+    Returns:
+        json dict:
+                   denoting run description for each pipeline run
+
+        status: HTTP status 200 or 400
+
+    Exceptions:
+        error payload describing status, message and HTTP status code
+
+    """
+    run_dict = {}
+
+    try:
+        runs = KFCONNECT_KF_OBJ.get_kf_list_runs(KFCONNECT_CONFIG_OBJ.kf_dict['kfdefaultns'])
+
+        for run in runs.runs:
+            run_super_dict = {}
+            run_super_dict['run_id'] = run.id
+            run_super_dict['run_description'] = run.description
+            run_super_dict['run_status'] = run.status
+            run_super_dict['experiment_name'] = run.resource_references[0].name
+            run_super_dict['experiment_id'] = run.resource_references[0].key.id
+
+            if len(run.resource_references) > 1:
+                run_super_dict['pipeline_name'] = run.resource_references[1].name
+                run_super_dict['pipeline_id'] = run.resource_references[1].key.id
+
+            run_dict[run.name] = run_super_dict
+    except:# pylint: disable=bare-except
+        tbk = traceback.format_exc()
+        LOGGER.error(tbk)
+        raise BadRequest('Unsupported error from Kubeflow', status.HTTP_400_BAD_REQUEST,
+                         {'ext': 1}) from None
+
+
+    return jsonify(run_dict), status.HTTP_200_OK
+
+@APP.route("/runs/<run_id>", methods=['GET', 'DELETE'])
+def kf_run(run_id):
+    """Function handling HTTP GET/DELETE rest endpoint to get/delete run based on
+       run id from kubeflow
+
+    Args:
+        run_id (str): Run id
+
+    Returns:
+        json dict:
+                   denoting run description for run_id in HTTP GET METHOD
+                   denoting run and status for pipeline_id in HTTP DELETE METHOD
+
+        status: HTTP status 200 or 400
+
+    Exceptions:
+        error payload describing status, message and HTTP status code
+
+    """
+    run_dict = {}
+    try:
+        if request.method == 'DELETE':
+            LOGGER.error("Method not supported yet")
+            raise BadRequest("Method not supported yet", status.HTTP_501_NOT_IMPLEMENTED,\
+                   {'ext': 1})
+
+        run_info = KFCONNECT_KF_OBJ.get_kf_run(run_id)
+        run_dict['run_id'] = run_info.run.id
+        run_dict['run_name'] = run_info.run.name
+        run_dict['run_status'] = run_info.run.status
+        LOGGER.debug(run_dict)
+    except Exception as err:
+        LOGGER.error("Exception from KubeFlow in run")
+        LOGGER.error(err)
+        raise BadRequest('Unsupported error from Kubeflow', status.HTTP_400_BAD_REQUEST,\
+                {'payload': {'run_id': run_id}}) from None
+
+    return jsonify(run_dict), status.HTTP_200_OK
+
+if __name__ == "__main__":
+    KFCONNECT_CONFIG_OBJ = kfadapter_conf.KfConfiguration.get_instance()
+    if KFCONNECT_CONFIG_OBJ.is_config_loaded_properly() is False:
+        print("Config not loaded properly")
+    else:
+        KF_HOST_URI = "http://"+KFCONNECT_CONFIG_OBJ.kf_dict['kfhostname']+\
+                    ":"+KFCONNECT_CONFIG_OBJ.kf_dict['kfport']+"/pipeline"
+        LOGGER = KFCONNECT_CONFIG_OBJ.logger
+        LOGGER.debug(KF_HOST_URI)
+        KFCONNECT_KF_OBJ = KfConnect()
+        try:
+            KFCONNECT_KF_OBJ.get_kf_client(KF_HOST_URI)
+            LOGGER.debug(KFCONNECT_CONFIG_OBJ.appport)
+            THR = Thread(target=wait_status_thread, args=(1, KFCONNECT_KF_OBJ))
+            THR.start()
+            APP.run(host='0.0.0.0', port=KFCONNECT_CONFIG_OBJ.appport)
+        except Exception as some_err:# pylint: disable=broad-except
+            LOGGER.error(some_err)
\ No newline at end of file