Adding kfconnect 64/9364/1
authorYouhwan Seol <yh.seol@samsung.com>
Mon, 24 Oct 2022 04:25:26 +0000 (13:25 +0900)
committerYouhwan Seol <yh.seol@samsung.com>
Mon, 24 Oct 2022 04:25:26 +0000 (13:25 +0900)
Issue-Id: AIMLFW-2

Signed-off-by: Youhwan Seol <yh.seol@samsung.com>
Change-Id: I869d91eed2fbcc1129d5b83f7fdbfe9e3aa16bdc

kfadapter/kfadapter_kfconnect.py [new file with mode: 0644]

diff --git a/kfadapter/kfadapter_kfconnect.py b/kfadapter/kfadapter_kfconnect.py
new file mode 100644 (file)
index 0000000..090ec9f
--- /dev/null
@@ -0,0 +1,269 @@
+# ==================================================================================
+#
+#       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+"""kfadapter_kfconnect.py.
+
+This module is for interfacing and interworking with KubeFlow SDK
+
+"""
+import kfp
+import kfadapter_util
+from kfadapter_conf import KfConfiguration
+
+class KfConnect:
+    """
+    This is a class for interfacing and interworking with KubeFlow SDK.
+
+    Attributes: None
+    """
+    def __init__(self):
+        """
+        The constructor for KfConnect class.
+
+        Parameters:None
+         """
+        self.kfp_client = None
+        kfc_config = KfConfiguration.get_instance()
+        self.logger = kfc_config.logger
+        self.logger.debug("Initialized KfConnect")
+
+
+    def get_kf_client(self, host=None):
+        """
+        Function for getting kubeflow connection handle for use
+
+        Args:
+            host: hostname or host ip where kubeflow is running
+
+        Returns: None
+
+        """
+        self.kfp_client = kfp.Client(host)
+
+    def get_kf_list_experiments(self, nspace):
+        """
+        Function for getting list of experiments based on namespace
+
+        Args:
+            nspace: namespace from which experiments can be listed
+
+        Returns: experiment list
+
+        """
+        exp = self.kfp_client.list_experiments(page_size=10)
+        return exp
+
+    def get_kf_experiment_details(self, ex_name, nspace):
+        """
+        Function for getting experiment description based on experiment name
+
+        Args:
+            ex_name: experiment name
+            nspace: namespace from which experiments can be listed
+
+        Returns:
+                experiment details
+
+        """
+        self.logger.debug("Get Experiment details " + ex_name)
+        try:
+            exp = self.kfp_client.get_experiment(experiment_name=ex_name)
+        except ValueError as err:
+            self.logger.error(err)
+            return None
+
+        return exp
+
+    def get_kf_run(self, run_id):
+        """
+        Function for getting status of run based on run_id
+
+        Args:
+            run_id: run_id for a previous run
+
+        Returns: run status
+
+        """
+        run = self.kfp_client.get_run(run_id)
+        return run
+
+    def get_kf_list_runs(self, nspace):
+        """
+        Function for getting list of runs based on namespace
+
+        Args:
+            nspace: namespace from which runs can be listed
+
+        Returns: runs list
+
+        """
+        runs = self.kfp_client.list_runs(page_size=20)
+        return runs
+
+    def get_kf_list_pipelines(self):
+        """
+        Function for getting list of pipelines in kubeflow
+
+        Args: None
+
+        Returns: list of pipeline and its description
+
+        """
+        pipeline = self.kfp_client.list_pipelines(page_size=20)
+        return pipeline
+
+    def get_kf_pipeline_id(self, pipeline_name):
+        """
+        Function for getting pipeline id for a pipeline name in kubeflow
+
+        Args:
+            pipeline_name: name of pipeline
+
+        Returns:pipeline id in a string
+
+        """
+        pipe_id = self.kfp_client.get_pipeline_id(pipeline_name)
+        return pipe_id
+
+    def get_kf_pipeline_version_id(self, pipeline_id, pipeline_version_name):
+        """
+        Function for getting pipeline's version id for a pipeline's version name
+        for given pipeline id in kubeflow
+
+        Args:
+            pipeline_id: id of pipeline
+            pipeline_version_name: name of pipeline's version
+
+        Returns:pipeline's version id in a string
+
+        """
+        version_id = None
+
+        obj_list = self.kfp_client.list_pipeline_versions(pipeline_id,
+                                                          page_size=1000000000).versions
+        for pipeline_version_obj in obj_list:
+            if pipeline_version_obj.name == pipeline_version_name:
+                version_id = pipeline_version_obj.id
+
+        return version_id
+
+    def upload_kf_pipeline(self, pipeline_name, file, desc):
+        """
+        Function for uploading pipeline in kubeflow
+
+        Args:
+            pipeline_name: name of pipeline
+            file: zip file containing pipeline code
+
+        Returns:pipeline id in a string
+
+        """
+        pipe_info = self.kfp_client.upload_pipeline(file, pipeline_name, desc)
+        return pipe_info
+
+    def upload_pipeline_with_versions(self, pipeline_name, file, desc):
+        """
+        Function for uploading pipeline with version functionality
+
+        Args:
+            pipeline_name: name of pipeline
+            file: zip file containing pipeline code
+            desc: description of pipeline
+
+        Returns:object containing pipleine id and other information
+
+        """
+        versions_list = self.get_pl_versions_by_pl_name(pipeline_name)
+        length = len(versions_list)
+        pipe_info = None
+        if length == 0:
+            pipe_info = self.kfp_client.upload_pipeline(file, pipeline_name=pipeline_name,
+                                                        description=desc)
+        else:
+            pipe_info = self.kfp_client.upload_pipeline_version(file,
+                                                                pipeline_version_name=str(length+1),
+                                                                pipeline_name=pipeline_name)
+        return pipe_info
+
+    def get_pl_versions_by_pl_name(self, pipeline_name):
+        """
+        Function for getting versions list for given pipeline name
+
+        Args:
+            pipeline_name: name of pipeline
+
+        Returns:list containing versions name
+
+        """
+        pipeline_id = self.get_kf_pipeline_id(pipeline_name)
+        res_obj = self.kfp_client.list_pipeline_versions(pipeline_id,
+                                                         page_size=1000000000)
+        if res_obj.total_size is None:
+            return []
+        obj_list = res_obj.versions
+        versions_list = []
+        for obj in obj_list:
+            versions_list.append(obj.name)
+        return versions_list
+
+    def get_kf_pipeline_desc(self, pipeline_id):
+        """
+        Function for getting pipeline description for a pipeline id in kubeflow
+
+        Args:
+            pipeline_id: id of pipeline
+
+        Returns:pipeline description
+
+        """
+        pipeline = self.kfp_client.get_pipeline(pipeline_id)
+        return pipeline
+
+    def delete_kf_pipeline(self, pipeline_id):
+        """
+        Function for deleting pipeline for a pipeline id in kubeflow
+
+        Args:
+            pipeline_id: id of pipeline
+
+        Returns:pipeline description
+
+        """
+        pipeline = self.kfp_client.delete_pipeline(pipeline_id)
+        return pipeline
+
+    def run_kf_pipeline(self, exp_id, pipeline_id, arguments, version_id):
+        """
+        Function for running pipeline with arguments under an experiment in kubeflow
+
+        Args:
+            exp_id: experiment under which pipeline run will occur
+            pipeline_id: id of pipeline
+            arguments: arguments to the pipeline
+            version id: version of the pipeline
+        Returns:run description
+
+        """
+        self.logger.debug("run_kf_pipeline Entered")
+        run = self.kfp_client.run_pipeline(exp_id, job_name="testjob_"+\
+                                           kfadapter_util.random_suffix(),
+                                           pipeline_package_path=None, params=arguments,
+                                           pipeline_id=pipeline_id,
+                                           version_id=version_id)
+        self.logger.debug("run_kf_pipeline Exited")
+        return run
\ No newline at end of file