From: Youhwan Seol Date: Mon, 24 Oct 2022 04:25:26 +0000 (+0900) Subject: Adding kfconnect X-Git-Tag: 1.0.0~16^2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F64%2F9364%2F1;p=aiml-fw%2Fathp%2Ftps%2Fkubeflow-adapter.git Adding kfconnect Issue-Id: AIMLFW-2 Signed-off-by: Youhwan Seol Change-Id: I869d91eed2fbcc1129d5b83f7fdbfe9e3aa16bdc --- diff --git a/kfadapter/kfadapter_kfconnect.py b/kfadapter/kfadapter_kfconnect.py new file mode 100644 index 0000000..090ec9f --- /dev/null +++ b/kfadapter/kfadapter_kfconnect.py @@ -0,0 +1,269 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== + +"""kfadapter_kfconnect.py. + +This module is for interfacing and interworking with KubeFlow SDK + +""" +import kfp +import kfadapter_util +from kfadapter_conf import KfConfiguration + +class KfConnect: + """ + This is a class for interfacing and interworking with KubeFlow SDK. + + Attributes: None + """ + def __init__(self): + """ + The constructor for KfConnect class. + + Parameters:None + """ + self.kfp_client = None + kfc_config = KfConfiguration.get_instance() + self.logger = kfc_config.logger + self.logger.debug("Initialized KfConnect") + + + def get_kf_client(self, host=None): + """ + Function for getting kubeflow connection handle for use + + Args: + host: hostname or host ip where kubeflow is running + + Returns: None + + """ + self.kfp_client = kfp.Client(host) + + def get_kf_list_experiments(self, nspace): + """ + Function for getting list of experiments based on namespace + + Args: + nspace: namespace from which experiments can be listed + + Returns: experiment list + + """ + exp = self.kfp_client.list_experiments(page_size=10) + return exp + + def get_kf_experiment_details(self, ex_name, nspace): + """ + Function for getting experiment description based on experiment name + + Args: + ex_name: experiment name + nspace: namespace from which experiments can be listed + + Returns: + experiment details + + """ + self.logger.debug("Get Experiment details " + ex_name) + try: + exp = self.kfp_client.get_experiment(experiment_name=ex_name) + except ValueError as err: + self.logger.error(err) + return None + + return exp + + def get_kf_run(self, run_id): + """ + Function for getting status of run based on run_id + + Args: + run_id: run_id for a previous run + + Returns: run status + + """ + run = self.kfp_client.get_run(run_id) + return run + + def get_kf_list_runs(self, nspace): + """ + Function for getting list of runs based on namespace + + Args: + nspace: namespace from which runs can be listed + + Returns: runs list + + """ + runs = self.kfp_client.list_runs(page_size=20) + return runs + + def get_kf_list_pipelines(self): + """ + Function for getting list of pipelines in kubeflow + + Args: None + + Returns: list of pipeline and its description + + """ + pipeline = self.kfp_client.list_pipelines(page_size=20) + return pipeline + + def get_kf_pipeline_id(self, pipeline_name): + """ + Function for getting pipeline id for a pipeline name in kubeflow + + Args: + pipeline_name: name of pipeline + + Returns:pipeline id in a string + + """ + pipe_id = self.kfp_client.get_pipeline_id(pipeline_name) + return pipe_id + + def get_kf_pipeline_version_id(self, pipeline_id, pipeline_version_name): + """ + Function for getting pipeline's version id for a pipeline's version name + for given pipeline id in kubeflow + + Args: + pipeline_id: id of pipeline + pipeline_version_name: name of pipeline's version + + Returns:pipeline's version id in a string + + """ + version_id = None + + obj_list = self.kfp_client.list_pipeline_versions(pipeline_id, + page_size=1000000000).versions + for pipeline_version_obj in obj_list: + if pipeline_version_obj.name == pipeline_version_name: + version_id = pipeline_version_obj.id + + return version_id + + def upload_kf_pipeline(self, pipeline_name, file, desc): + """ + Function for uploading pipeline in kubeflow + + Args: + pipeline_name: name of pipeline + file: zip file containing pipeline code + + Returns:pipeline id in a string + + """ + pipe_info = self.kfp_client.upload_pipeline(file, pipeline_name, desc) + return pipe_info + + def upload_pipeline_with_versions(self, pipeline_name, file, desc): + """ + Function for uploading pipeline with version functionality + + Args: + pipeline_name: name of pipeline + file: zip file containing pipeline code + desc: description of pipeline + + Returns:object containing pipleine id and other information + + """ + versions_list = self.get_pl_versions_by_pl_name(pipeline_name) + length = len(versions_list) + pipe_info = None + if length == 0: + pipe_info = self.kfp_client.upload_pipeline(file, pipeline_name=pipeline_name, + description=desc) + else: + pipe_info = self.kfp_client.upload_pipeline_version(file, + pipeline_version_name=str(length+1), + pipeline_name=pipeline_name) + return pipe_info + + def get_pl_versions_by_pl_name(self, pipeline_name): + """ + Function for getting versions list for given pipeline name + + Args: + pipeline_name: name of pipeline + + Returns:list containing versions name + + """ + pipeline_id = self.get_kf_pipeline_id(pipeline_name) + res_obj = self.kfp_client.list_pipeline_versions(pipeline_id, + page_size=1000000000) + if res_obj.total_size is None: + return [] + obj_list = res_obj.versions + versions_list = [] + for obj in obj_list: + versions_list.append(obj.name) + return versions_list + + def get_kf_pipeline_desc(self, pipeline_id): + """ + Function for getting pipeline description for a pipeline id in kubeflow + + Args: + pipeline_id: id of pipeline + + Returns:pipeline description + + """ + pipeline = self.kfp_client.get_pipeline(pipeline_id) + return pipeline + + def delete_kf_pipeline(self, pipeline_id): + """ + Function for deleting pipeline for a pipeline id in kubeflow + + Args: + pipeline_id: id of pipeline + + Returns:pipeline description + + """ + pipeline = self.kfp_client.delete_pipeline(pipeline_id) + return pipeline + + def run_kf_pipeline(self, exp_id, pipeline_id, arguments, version_id): + """ + Function for running pipeline with arguments under an experiment in kubeflow + + Args: + exp_id: experiment under which pipeline run will occur + pipeline_id: id of pipeline + arguments: arguments to the pipeline + version id: version of the pipeline + Returns:run description + + """ + self.logger.debug("run_kf_pipeline Entered") + run = self.kfp_client.run_pipeline(exp_id, job_name="testjob_"+\ + kfadapter_util.random_suffix(), + pipeline_package_path=None, params=arguments, + pipeline_id=pipeline_id, + version_id=version_id) + self.logger.debug("run_kf_pipeline Exited") + return run \ No newline at end of file