+++ /dev/null
-# ==================================================================================
-#
-# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ==================================================================================
-
-import kfp_server_api
-from kfp_server_api.models.api_run import ApiRun
-from kfp_server_api.models.api_list_runs_response import ApiListRunsResponse
-from kfp_server_api.models.api_experiment import ApiExperiment
-from kfp_server_api.models.api_list_pipelines_response import ApiListPipelinesResponse
-from kfp_server_api.models.api_pipeline import ApiPipeline
-from kfp_server_api.models.api_parameter import ApiParameter
-from kfp_server_api.models.api_resource_reference import ApiResourceReference
-from kfp_server_api.models.api_resource_key import ApiResourceKey
-from kfp_server_api.models.api_pipeline_version import ApiPipelineVersion
-
-class FakeKfConnect:
-
- def __init__(self):
- print("Initialized Fake KfConnect")
-
- def get_kf_list_experiments(self, nspace):
- explist = kfp_server_api.ApiListExperimentsResponse()
-
- exp = kfp_server_api.ApiExperiment()
- exp.name = "name"
- exp.id = "id"
- explist.experiments = [exp]
- return explist
-
- def get_pl_versions_by_pl_name(self, pipeline_name):
- version_list = []
- version_list.append("2.0.0")
- return version_list
- """
- def run_kf_pipeline(self,exp_id,arguments,
- experiment_id: str,
- job_name: str,
- pipeline_package_path: Optional[str] = None,
- params: Optional[dict] = None,
- pipeline_id: Optional[str] = None,
- version_id: Optional[str] = None,
- pipeline_root: Optional[str] = None,
- enable_caching: Optional[str] = None,
- service_account: Optional[str] = None,):
-
- run = ApiRun()
- run.id = "run_id"
- run.name = "run_name"
- rr0 = ApiResourceReference()
- rr0.name = "rr0"
- key0 = ApiResourceKey()
- key0.id = "id0"
- rr0.key = key0
- rr1 = ApiResourceReference()
- rr1.name = "rr0"
- key1 = ApiResourceKey()
- key1.id = "id1"
- rr1.key = key1
- run.status = "Running"
- run.resource_references = [rr0, rr1]
- return run
- """
-
- def delete_kf_pipeline(self, pipeline_id):
- return True
-
- def get_kf_pipeline_version_id( self,pipeline_version_name,
- pipeline_id: str,
- page_token: str = '',
- page_size: int = 10,
- sort_by: str = ''
- ):
- return "pipeline_id"
-
- def get_kf_list_runs(self,
- page_token='',
- page_size=10,
- sort_by='',
- experiment_id=None,
- namespace=None):
- listrun = ApiListRunsResponse()
- run1 = ApiRun()
- run1.id = "id"
- run1.description = "description"
- run1.status = "status"
-
- rr0 = ApiResourceReference()
- rr0.name = "rr0"
- key0 = ApiResourceKey()
- key0.id = "id"
- rr0.key = key0
-
- rr1 = ApiResourceReference()
- rr1.name = "rr1"
- key1 = ApiResourceKey()
- key1.id = "id"
- rr1.key = key1
-
- run1.resource_references = [rr0, rr1]
-
- run2 = ApiRun()
- run2.id = "id"
- run2.description = "description"
- run2.status = "status"
-
- rr2 = ApiResourceReference()
- rr2.name = "rr2"
- key2 = ApiResourceKey()
- key2.id = "id"
- rr2.key = key2
-
- rr3 = ApiResourceReference()
- rr3.name = "rr1"
- key3 = ApiResourceKey()
- key3.id = "id"
- rr3.key = key3
-
- run2.resource_references = [rr2, rr3]
-
-
- listrun.runs = [run1, run2]
-
- return listrun
-
- def get_kf_pipeline_desc(self, pipeline_id: str):
- pipeline_info = ApiPipeline()
-
- param1 = ApiParameter()
- param1.name = "param1"
- param1.value = "value1"
- param2 = ApiParameter()
- param2.name = "param2"
- param2.value = "value2"
- pipeline_info.parameters = [param1, param2]
- pipeline_info.description = 'description'
- pipeline_info.id = "id"
- pipeline_info.name = "name"
-
- param3 = ApiParameter()
- param3.name = "param3"
- param3.value = "value3"
- param4 = ApiParameter()
- param4.name = "param4"
- param4.value = "value4"
-
- default_version = ApiPipelineVersion()
- default_version.parameters = [param3, param4]
-
- pipeline_info.default_version = default_version
- return pipeline_info
-
- def get_kf_run(self, run_id: str):
- run = ApiRun()
- run.name = "run_name"
- run.status = "Running"
- run.id = "run_id"
- return run
-
- def get_kf_experiment_details(self, ex_name, nspace):
- experiment = ApiExperiment()
- experiment.name = "exp_name"
- experiment.id = "exp_id"
- return experiment
-
- def get_kf_pipeline_id(self, pipeline_name):
- return "pipeline_id"
- """
- def upload_pipeline_with_versions(self, pipeline_name, file, desc):
- pipeline_info = kfp_server_api.ApiPipelineVersion()
- pipeline_info.id("pipeline_id")
- return pipeline_info
- """
-
- def get_kf_list_pipelines(self):
- pipeline_list = ApiListPipelinesResponse()
- pipeline = ApiPipeline()
- pipeline.id = "pipeline_id"
- pipeline.description = "pipeline_description"
- parameter = ApiParameter()
- parameter.name = "param1"
- parameter.value = "value1"
- pipeline.parameters = [parameter]
- pipeline_list.pipelines = [pipeline]
- return pipeline_list
-
-
-class NegativeFakeKfConnect:
- def __init__(self):
- print("Initialized Negative Fake KfConnect")
-
- def get_kf_pipeline_id(self, pipeline_name):
- return None
-
- def get_kf_experiment_details(self, ex_name, nspace):
- return None
-
- def get_pl_versions_by_pl_name(self, pipeline_name):
- raise kfp_server_api.exceptions.ApiException
-
- def get_kf_run(self, run_id):
- """
- run = ApiRun()
- run.id = run_id
- run.name = 'run_name'
- run.status = 'Running'
- return run
- """
- raise Exception('erro')
-
- def get_kf_list_runs(self, nspace):
-
- run = ApiRun()
- run.name = "run_name"
- run.status = "Running"
- run.id = "run_id"
- run.description = "descrption"
- rr0 = ApiResourceReference()
- rr0.name = "rr0"
- key0 = ApiResourceKey()
- key0.id = "key0id"
- run.resource_references = [rr0]
-
- runs = ApiListRunsResponse()
- runs.runs = [run]
-
- return runs
-
- def get_kf_list_pipelines(self,
- page_token='',
- page_size=10,
- sort_by=''):
- raise Exception('error')
-
-
- def get_kf_pipeline_desc(self, pipeline_id):
- raise kfp_server_api.exceptions.ApiException
-
-class NegativeFakeAdditionalKfConnect:
-
- def get_kf_experiment_details(self, ex_name, nspace):
- raise Exception('error')
-
- def get_kf_pipeline_id(self, pipeline_name):
- raise kfp_server_api.exceptions.ApiException('error')
- """
- def get_pl_versions_by_pl_namee(self, pipeline_name):
- raise Exception('error')
- """
- """
- def get_kf_pipeline_version_id(self, pipeline_id, pipeline_version_name):
- raise Exception('error')
- """
- def get_kf_list_experiments(self, nspace):
- raise Exception('error')
-
- def get_kf_pipeline_desc(self, pipeline_id: str):
- raise kfp_server_api.exceptions.ApiException('error')
-
-class NegativeFakeNoneKfConnect:
-
- def get_kf_experiment_details(self, ex_name, nspace):
- experiment = ApiExperiment()
- experiment.name = "exp_name"
- experiment.id = "exp_id"
- return experiment
-
- def get_kf_pipeline_id(self, pipeline_name):
- return None
-
-
-
-
# limitations under the License.
#
# ==================================================================================
-import json
-
+import json
+import io
+from unittest import TestCase
+from mock import patch
from flask_api import status
+import kfp_server_api
+from kfp_server_api.models.api_run import ApiRun
+from kfp_server_api.models.api_list_runs_response import ApiListRunsResponse
+from kfp_server_api.models.api_experiment import ApiExperiment
+from kfp_server_api.models.api_list_pipelines_response import ApiListPipelinesResponse
+from kfp_server_api.models.api_pipeline import ApiPipeline
+from kfp_server_api.models.api_parameter import ApiParameter
+from kfp_server_api.models.api_resource_reference import ApiResourceReference
+from kfp_server_api.models.api_resource_key import ApiResourceKey
+from kfp_server_api.models.api_pipeline_version import ApiPipelineVersion
from kfadapter import kfadapter_main
-from kfadapter.tmgr_logger import TMLogger
-from .fake_kfconnect import FakeKfConnect, NegativeFakeAdditionalKfConnect, NegativeFakeKfConnect, NegativeFakeNoneKfConnect
-from .fake_kfconf import FakeKfConf
+from kfadapter import kfadapter_conf
+from kfadapter import kfadapter_kfconnect
+
+class testKfadapterApi(TestCase):
+ @classmethod
+ def setUpClass(self):
+ kfadapter_main.KFCONNECT_CONFIG_OBJ = kfadapter_conf.KfConfiguration.get_instance()
+ kfadapter_main.LOGGER = kfadapter_main.KFCONNECT_CONFIG_OBJ.logger
+ kfadapter_main.KFCONNECT_KF_OBJ = kfadapter_kfconnect.KfConnect()
-class Test_pipeline_id_operations:
-
- def setup_method(self):
- kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger
- kfadapter_main.KFCONNECT_KF_OBJ = FakeKfConnect()
- kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance()
self.client = kfadapter_main.APP.test_client(self)
- def test_get_experiment(self):
- experiment_name = "ai-server"
- response = self.client.get("/experiments/{}".format(experiment_name))
- expected_data = b'{"name": "ai-server", "id":"isj0t3jdhf"}'
- assert response.content_type == "application/json", "not equal content type"
- assert response.status_code == status.HTTP_200_OK, "not equal code"
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details")
+ def test_get_experiment(self, mock_get_kf_experiment_details):
+ # given
+ exp = ApiExperiment()
+ exp.name = "exp-name"
+ exp.id = "exp-id"
+ mock_get_kf_experiment_details.return_value = exp
+
+ # when
+ response = self.client.get("/experiments/{}".format(exp.name))
+
+ # then
+ mock_get_kf_experiment_details.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.get_json()["name"], exp.name)
+ self.assertEqual(response.get_json()["id"], exp.id)
- def test_get_all_runs(self):
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_list_runs")
+ def test_get_all_runs(self, mock_get_kf_list_runs):
+ # given
+ resources = [ ApiResourceReference() for _ in range(3)]
+ for i, resource in enumerate(resources) :
+ resource.name = "rr-name{}".format(i)
+ resource.key = ApiResourceKey()
+ resource.key.id = "rr-id{}".format(i)
+
+ runs = [ ApiRun() for _ in range(2)]
+ for i, run in enumerate(runs) :
+ run.id = "runid"
+ run.description = "description"
+ run.status = "status"
+ run.resource_references = [resources[i], resources[i+1]]
+
+ list_run = ApiListRunsResponse()
+ list_run.runs = runs
+ mock_get_kf_list_runs.return_value = list_run
+
+ # when
response = self.client.get("/runs")
- assert response.content_type == "application/json", "not equal content type"
- assert response.status == '200 OK'
-
- def test_get_pipeline_id(self):
- pipeline_name = "car-racing"
- response = self.client.get("/pipelineIds/{}".format(pipeline_name))
- expected_data = b'{"name":"car-racing", "id":"3jfidhsueuf2oj"}'
- assert response.content_type == "application/json", "not equal content type"
- assert response.status_code == status.HTTP_200_OK, "not equal code"
-
- def test_upload_pipeline(self):
- pipeline_file_name="Training-Manager/kf_connector/test/pipeline.zip"
+ # then
+ mock_get_kf_list_runs.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.get_data(), b'{"null":{"experiment_id":"rr-id1","experiment_name":"rr-name1","pipeline_id":"rr-id2","pipeline_name":"rr-name2","run_description":"description","run_id":"runid","run_status":"status"}}\n')
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_id")
+ def test_get_pipeline_id(self, mock_get_kf_pipeline_id):
+ # given
+ pipeline_name = "pipeline-name"
+ pipeline_id = "pipeline-id"
+ mock_get_kf_pipeline_id.return_value = pipeline_id
+
+ # when
+ response = self.client.get("/pipelineIds/{}".format(pipeline_name))
- response = self.client.post("/user/2/edit", data={
- "file": pipeline_file_name,
- "description": "description"
- })
+ # then
+ mock_get_kf_pipeline_id.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.get_json()["id"], pipeline_id)
+ self.assertEqual(response.get_json()["name"], pipeline_name)
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.upload_pipeline_with_versions")
+ def test_upload_pipeline(self, mock_upload_pipeline_with_versions):
+ # given
+ pipeline_name = "pipeline-name"
+ pipeline_id = "pipeline-id"
+
+ pipeline_info = ApiPipeline()
+ pipeline_info.id = pipeline_id
+ mock_upload_pipeline_with_versions.return_value = pipeline_info
+
+ files = {}
+ files['file'] = (io.BytesIO(b"pipeline-file"), 'pipeline.zip')
+ files['description'] = "pipeline-description"
+
+ # when
+ response = self.client.post("pipelineIds/{}".format(pipeline_name), data=files, content_type="multipart/form-data")
+
+ # then
+ mock_upload_pipeline_with_versions.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.get_json()['name'], pipeline_name)
+ self.assertEqual(response.get_json()['id'], pipeline_id)
- assert response.content_type == "text/html; charset=utf-8"
-
- def test_get_run(self):
- run_id = "run_id"
- response = self.client.get("/runs/{}".format(run_id))
- assert response.content_type == "application/json", "not equal content type"
- assert response.status == '200 OK'
-
- def test_get_pipeline_version(self):
- pipeline_name="car-racing"
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_pl_versions_by_pl_name")
+ def test_get_pipeline_version(self, mock_get_pl_versions_by_pl_name):
+ # given
+ pipeline_name="pipeline-name"
+ version_list = ["1.0.0"]
+ mock_get_pl_versions_by_pl_name.return_value = version_list
+ # when
response = self.client.get("/pipelines/{}/versions".format(pipeline_name))
- expected_data = ""
- assert response.content_type == "application/json", "not equal content type"
- assert response.status_code == status.HTTP_200_OK, "not equal code"
+ # then
+ mock_get_pl_versions_by_pl_name.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.get_json()["versions_list"], version_list)
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_run")
+ def test_get_run(self, mock_get_kf_run):
+ # given
+ run = ApiRun()
+ run.id = "run-id"
+ run.name = "run-name"
+ run.status = "Running"
+ mock_get_kf_run.return_value = run
+
+ # when
+ response = self.client.get("/runs/{}".format(run.id))
+
+ # then
+ mock_get_kf_run.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.get_json()["run_id"], run.id)
+ self.assertEqual(response.get_json()["run_name"], run.name)
+ self.assertEqual(response.get_json()["run_status"], run.status)
+
def test_delete_run(self):
- run_id = "run_id"
+ # given
+ run_id = "run-id"
+
+ # when
response = self.client.delete("/runs/{}".format(run_id))
- assert response.content_type == "application/json", "not equal content type"
- assert response.status == '400 BAD REQUEST'
+ # then
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
- def test_get_pipelines(self):
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_list_pipelines")
+ def test_get_pipelines(self, mock_get_kf_list_pipelines):
+ # given
+ parameter = ApiParameter()
+ parameter.name = "param1"
+ parameter.value = "value1"
+
+ pipeline = ApiPipeline()
+ pipeline.id = "pipeline-id"
+ pipeline.description = "pipeline-description"
+
+ pipeline.parameters = [parameter]
+
+ pipeline_list = ApiListPipelinesResponse()
+ pipeline_list.pipelines = [pipeline]
+
+ mock_get_kf_list_pipelines.return_value = pipeline_list
+
+ # when
response = self.client.get("/pipelines")
- assert response.content_type == "application/json", "not equal content type"
- assert response.status_code == status.HTTP_200_OK
+
+ # then
+ mock_get_kf_list_pipelines.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.get_data(), b'{"null":{"description":"pipeline-description","id":"pipeline-id","parameters":{"param1":"value1"}}}\n')
- def test_delete_pipelines(self):
- pipeline_id = "pipelineIdsample"
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_desc")
+ def test_get_pipeline(self, mock_get_kf_pipeline_desc):
+ # given
+ pipeline_name = "pipeline-name"
+ pipeline_id = "pipeline-id"
+
+ params = [ ApiParameter() for _ in range(4)]
+ for i, param in enumerate(params) :
+ param.name = "param-name{}".format(i)
+ param.value = "param-value{}".format(i)
+
+ pipeline_info = ApiPipeline()
+ pipeline_info.parameters = [params[0], params[1]]
+ pipeline_info.description = "description"
+ pipeline_info.id = pipeline_name
+ pipeline_info.name = pipeline_id
+
+ default_version = ApiPipelineVersion()
+ default_version.parameters = [params[2], params[3]]
+ pipeline_info.default_version = default_version
+ mock_get_kf_pipeline_desc.return_value = pipeline_info
+
+ # when
+ response = self.client.get("/pipelines/{}".format(pipeline_id))
+
+ # then
+ mock_get_kf_pipeline_desc.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.get_data(), b'{"arguments":{"param-name0":"param-value0","param-name1":"param-value1"},"description":"description","id":"pipeline-name","name":"pipeline-id"}\n')
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.delete_kf_pipeline")
+ def test_delete_pipelines(self, mock_delete_kf_pipeline):
+ # given
+ pipeline_id = "pipeline-id"
+ mock_delete_kf_pipeline.return_value = True
+
+ # when
response = self.client.delete("/pipelines/{}".format(pipeline_id))
- expected_data = b'{"OK"}'
- assert response.content_type == "application/json"
- assert response.status_code == status.HTTP_200_OK
-
+ # then
+ mock_delete_kf_pipeline.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.get_json()["id"], pipeline_id)
+ self.assertEqual(response.get_json()["status"], "Deleted")
+
+
def test_check_liveness(self):
+ # when
response = self.client.get("/liveness")
- expected_data = b'Okay'
- assert response.content_type == "text/html; charset=utf-8", "not equal content type"
- assert response.data == expected_data
- def test_get_experiments(self):
+ # then
+ self.assertEqual(response.content_type, "text/html; charset=utf-8")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.get_data(), b'Okay')
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_list_experiments")
+ def test_get_experiments(self, mock_get_kf_list_experiments):
+ # given
+ exp = kfp_server_api.ApiExperiment()
+ exp.name = "exp-name"
+ exp.id = "exp-id"
+
+ explist = kfp_server_api.ApiListExperimentsResponse()
+ explist.experiments = [exp]
+ mock_get_kf_list_experiments.return_value = explist
+
+ # when
response = self.client.get("/experiments")
- expected_data=""
- assert response.content_type == "application/json", "not equal content type"
- def test_execute_job(self):
+ # then
+ mock_get_kf_list_experiments.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.get_json()[exp.name], exp.id)
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details")
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_id")
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_desc")
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_version_id")
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.run_kf_pipeline")
+ def test_execute_job(self, mock_run_kf_pipeline, mock_get_kf_pipeline_version_id, mock_get_kf_pipeline_desc, mock_get_kf_pipeline_id, mock_get_kf_experiment_details):
+ # given
+ exp = ApiExperiment()
+ exp.name = "exp-name"
+ exp.id = "exp-id"
+ mock_get_kf_experiment_details.return_value = exp
+
+ pipeline_name = "pipeline-name"
+ pipeline_id = "pipeline-id"
+ mock_get_kf_pipeline_id.return_value = pipeline_id
+
+ params = [ ApiParameter() for _ in range(4)]
+ for i, param in enumerate(params) :
+ param.name = "param-name{}".format(i)
+ param.value = "param-value{}".format(i)
+
+ pipeline_info = ApiPipeline()
+ pipeline_info.parameters = [params[0], params[1]]
+ pipeline_info.description = "pipeline-description"
+ pipeline_info.id = pipeline_name
+ pipeline_info.name = pipeline_id
+
+ default_version = ApiPipelineVersion()
+ default_version.parameters = [params[2], params[3]]
+ pipeline_info.default_version = default_version
+ mock_get_kf_pipeline_desc.return_value = pipeline_info
+
+ mock_get_kf_pipeline_version_id.return_value = pipeline_id
+
+ run = ApiRun()
+ run.name = "run-name"
+ run.id = "run-id"
+
+ resources = [ ApiResourceReference() for _ in range(2)]
+ for i, resource in enumerate(resources) :
+ resource.name = "rr-name{}".format(i)
+ resource.key = ApiResourceKey()
+ resource.key.id = "rr-id{}".format(i)
+
+ run.resource_references = [resources[0], resources[1]]
+ run.status = "Running"
+ mock_run_kf_pipeline.return_value = run
+
+
job_name = "job_name"
- dict_job = {'arguments' : {'key1':'value1', 'key2':'value2'}, 'pipeline_name' : "pipeline_name", 'experiment_name' : "experiment_name", 'pipeline_version' : "pipeline_version"}
- payload = json.dumps(dict_job)
- headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}
- response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers)
- assert response.content_type == "application/json", "not equal content type"
+ dict_job = {}
+ args = {}
+ args[params[2].name] = params[2].value
+ args[params[3].name] = params[3].value
+ dict_job["arguments"] = args
+ dict_job["pipeline_name"] = pipeline_name
+ dict_job["pipeline_version"] = "2.0.0"
+ dict_job["experiment_name"] = exp.name
+
+ # when
+ response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=json.dumps(dict_job), headers={'content-type': 'application/json', 'Accept-Charset': 'UTF-8'})
+
+ # then
+ mock_get_kf_experiment_details.assert_called_once()
+ mock_get_kf_pipeline_id.assert_called_once()
+ mock_get_kf_pipeline_desc.assert_called_once()
+ mock_get_kf_pipeline_version_id.assert_called_once()
+ mock_run_kf_pipeline.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.get_data(), b'{"experiment_id":"rr-id0","experiment_name":"rr-name0","pipeline_id":"rr-id1","pipeline_name":"rr-name1","run_id":"run-id","run_name":"run-name","trainingjob_name":"job_name"}\n')
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details")
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_id")
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_desc")
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_version_id")
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.run_kf_pipeline")
+ def test_execute_job_scheduled(self, mock_run_kf_pipeline, mock_get_kf_pipeline_version_id, mock_get_kf_pipeline_desc, mock_get_kf_pipeline_id, mock_get_kf_experiment_details):
+ # given
+ exp = ApiExperiment()
+ exp.name = "exp-name"
+ exp.id = "exp-id"
+ mock_get_kf_experiment_details.return_value = exp
+
+ pipeline_name = "pipeline-name"
+ pipeline_id = "pipeline-id"
+ mock_get_kf_pipeline_id.return_value = pipeline_id
+
+ params = [ ApiParameter() for _ in range(4)]
+ for i, param in enumerate(params) :
+ param.name = "param-name{}".format(i)
+ param.value = "param-value{}".format(i)
+
+ pipeline_info = ApiPipeline()
+ pipeline_info.parameters = [params[0], params[1]]
+ pipeline_info.description = "pipeline-description"
+ pipeline_info.id = pipeline_name
+ pipeline_info.name = pipeline_id
- def test_get_pipelines_id(self):
- pipeline_id = "pipelineIdsample"
- response = self.client.get("/pipelines/{}".format(pipeline_id))
+ default_version = ApiPipelineVersion()
+ default_version.parameters = [params[2], params[3]]
+ pipeline_info.default_version = default_version
+ mock_get_kf_pipeline_desc.return_value = pipeline_info
- assert response.content_type == "application/json", "not equal content type"
- assert response.status_code == status.HTTP_200_OK, "not equal code"
+ mock_get_kf_pipeline_version_id.return_value = pipeline_id
+
+ run = ApiRun()
+ run.name = "run-name"
+ run.id = "run-id"
+
+ resources = [ ApiResourceReference() for _ in range(2)]
+ for i, resource in enumerate(resources) :
+ resource.name = "rr-name{}".format(i)
+ resource.key = ApiResourceKey()
+ resource.key.id = "rr-id{}".format(i)
+
+ run.resource_references = [resources[0], resources[1]]
+ run.status = None
+ mock_run_kf_pipeline.return_value = run
+
+ job_name = "job_name"
+ dict_job = {}
+ args = {}
+ args[params[2].name] = params[2].value
+ args[params[3].name] = params[3].value
+ dict_job["arguments"] = args
+ dict_job["pipeline_name"] = pipeline_name
+ dict_job["pipeline_version"] = "2.0.0"
+ dict_job["experiment_name"] = exp.name
+
+ # when
+ response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=json.dumps(dict_job), headers={'content-type': 'application/json', 'Accept-Charset': 'UTF-8'})
+
+ print("RRR: ", response.get_json())
+ # then
+ mock_get_kf_experiment_details.assert_called_once()
+ mock_get_kf_pipeline_id.assert_called_once()
+ mock_get_kf_pipeline_desc.assert_called_once()
+ mock_get_kf_pipeline_version_id.assert_called_once()
+ mock_run_kf_pipeline.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.get_json()['run_status'], 'scheduled')
+
+class testNegativeKfadapterApi(TestCase):
+ @classmethod
+ def setUpClass(self):
+ kfadapter_main.KFCONNECT_CONFIG_OBJ = kfadapter_conf.KfConfiguration.get_instance()
+ kfadapter_main.LOGGER = kfadapter_main.KFCONNECT_CONFIG_OBJ.logger
+ kfadapter_main.KFCONNECT_KF_OBJ = kfadapter_kfconnect.KfConnect()
-class Test_Negative:
- def setup_method(self):
self.client = kfadapter_main.APP.test_client(self)
- kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger
- kfadapter_main.KFCONNECT_KF_OBJ = NegativeFakeKfConnect()
- kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance()
- def test_negative_get_pipeline_version(self):
- pipeline_name="car-racing"
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_pl_versions_by_pl_name")
+ def test_negative_get_versions_for_pipeline_failed_with_api_exception(self, mock_get_pl_versions_by_pl_name):
+ # given
+ pipeline_name="pipeline-name"
+ mock_get_pl_versions_by_pl_name.side_effect = kfp_server_api.exceptions.ApiException
+
+ # when
+ response = self.client.get("/pipelines/{}/versions".format(pipeline_name))
+
+ # then
+ mock_get_pl_versions_by_pl_name.assert_called_once()
+ self.assertEqual(response.content_type, "text/html; charset=utf-8")
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_pl_versions_by_pl_name")
+ def test_negative_get_versions_for_pipeline_failed_with_unsupported_error(self, mock_get_pl_versions_by_pl_name):
+ # given
+ pipeline_name="pipeline-name"
+ mock_get_pl_versions_by_pl_name.side_effect = IndexError("")
+ # when
response = self.client.get("/pipelines/{}/versions".format(pipeline_name))
- assert response.content_type == "text/html; charset=utf-8", "not equal content type"
- def test_negative_get_pipelines_id(self):
- pipeline_id = "pipelineIdsample"
+ # then
+ mock_get_pl_versions_by_pl_name.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+ self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow")
+ self.assertEqual(response.get_json()["ext"], 1)
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_desc")
+ def test_negative_get_pipeline_failed_with_api_exception(self, mock_get_kf_pipeline_desc):
+ # given
+ pipeline_id = "pipeline-id"
+ mock_get_kf_pipeline_desc.side_effect = kfp_server_api.exceptions.ApiException
+
+ # when
response = self.client.get("/pipelines/{}".format(pipeline_id))
- assert response.content_type == "application/json", "not equal content type"
- assert response.status_code == 500, "not equal code"
-
- def test_negative_get_pipeline_id(self):
- pipeline_name = "car-racing"
+ # then
+ mock_get_kf_pipeline_desc.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+ self.assertEqual(response.get_json()["message"],"Unsupported error from Kubeflow")
+ self.assertEqual(response.get_json()["payload"],{"pipe_id":pipeline_id})
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_id")
+ def test_negative_get_pipeline_id_failed_with_value_error(self, mock_get_kf_pipeline_id):
+ # given
+ pipeline_name = "pipeline-name"
+ mock_get_kf_pipeline_id.return_value = None
+
+ # when
response = self.client.get("/pipelineIds/{}".format(pipeline_name))
- assert response.content_type == "application/json"
+ # then
+ mock_get_kf_pipeline_id.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
+ self.assertEqual(response.get_json()["message"],"PipeLine Name does not exist")
+ self.assertEqual(response.get_json()["payload"],{"error":"No pipeline is found with name pipeline-name","pipe_name":pipeline_name})
- def test_negative_get_experiment(self):
- experiment_name = "ai-server"
- response = self.client.get("/experiments/{}".format(experiment_name))
- expected_data = b'{"name": "ai-server", "id":"isj03jdhf"}'
- assert response.content_type == "application/json", "not equal content type"
- assert response.status_code == 400, "not equal code"
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.upload_pipeline_with_versions")
+ def test_negative_upload_pipeline_failed_with_api_exception(self, mock_upload_pipeline_with_versions):
+ # given
+ pipeline_name = "pipeline-name"
- def test_negative_get_run(self):
- run_id = "run_id"
- response = self.client.get("/runs/{}".format(run_id))
+ mock_upload_pipeline_with_versions.side_effect = kfp_server_api.exceptions.ApiException
- assert response.content_type == "application/json", "not equal content type"
- assert response.status == '400 BAD REQUEST'
-
- def test_negative_upload_pipeline(self):
- pipeline_file_name="pipeline.zip"
- description="test pipeline"
- pipeline_name="car-racing"
+ files = {}
+ files['file'] = (io.BytesIO(b"pipeline-file"), 'pipeline.zip')
+ files['description'] = "pipeline-description"
+
+ # when
+ response = self.client.post("pipelineIds/{}".format(pipeline_name), data=files, content_type="multipart/form-data")
+
+ # then
+ mock_upload_pipeline_with_versions.assert_called_once()
+ self.assertEqual(response.content_type, "text/html; charset=utf-8")
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+
+
+ def test_negative_upload_pipeline_failed_cause_empty_file_name(self):
+ # given
+ pipeline_name = "pipeline-name"
+
+ files = {}
+ files['file'] = (io.BytesIO(b"pipeline-file"), '')
- dict_pipeline = {'file' : pipeline_file_name, 'description' : description}
- headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}
+ # when
+ response = self.client.post("pipelineIds/{}".format(pipeline_name), data=files, content_type="multipart/form-data")
+
+ # then
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+ self.assertEqual(response.get_json()["message"],"Unsupported error from Kubeflow")
+ self.assertEqual(response.get_json()["ext"], 1)
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details")
+ def test_negative_get_experiment_failed_cause_no_such_experiment(self, mock_get_kf_experiment_details):
+ # given
+ exp_name = "exp-name"
+ mock_get_kf_experiment_details.return_value = None
+
+ # when
+ response = self.client.get("/experiments/{}".format(exp_name))
+
+ # then
+ mock_get_kf_experiment_details.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
+ self.assertEqual(response.get_json()["message"],"Experiment name does not exist")
+ self.assertEqual(response.get_json()["payload"],{"exp.name":exp_name})
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details")
+ def test_negative_get_experiment_failed_with_unsupported_error(self, mock_get_kf_experiment_details):
+ # given
+ exp_name = "exp-name"
+ mock_get_kf_experiment_details.return_value = IndexError("")
+
+ # when
+ response = self.client.get("/experiments/{}".format(exp_name))
+
+ # then
+ mock_get_kf_experiment_details.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+ self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow")
+ self.assertEqual(response.get_json()["ext"], 1)
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_run")
+ def test_negative_get_run_failed_with_exception(self, mock_get_kf_run):
+ # given
+ run_id = "run-id"
+ mock_get_kf_run.side_effect = Exception("")
- response = self.client.post("pipelineIds/{}".format(pipeline_name), data=json.dumps(dict_pipeline), headers=headers)
+ # when
+ response = self.client.get("/runs/{}".format(run_id))
- assert response.content_type == "application/json"
- assert response.status == '500 INTERNAL SERVER ERROR'
+ # then
+ mock_get_kf_run.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
+ self.assertEqual(response.get_json()["message"],"Unsupported error from Kubeflow")
+ self.assertEqual(response.get_json()["payload"],{"run_id":run_id})
- def test_negative_get_all_runs(self):
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_list_runs")
+ def test_negative_get_all_runs_failed_with_unsupported_error(self, mock_get_kf_list_runs):
+ # given
+ mock_get_kf_list_runs.side_effect = IndexError("")
+
+ # when
response = self.client.get("/runs")
- assert response.content_type == "application/json", "not equal content type"
- assert response.status == '400 BAD REQUEST'
- def test_negative_get_pipelines(self):
+ # then
+ mock_get_kf_list_runs.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
+ self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow")
+ self.assertEqual(response.get_json()["ext"], 1)
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_list_pipelines")
+ def test_negative_get_pipelines_failed_with_unsupported_error(self, mock_get_kf_list_pipelines):
+ # given
+ mock_get_kf_list_pipelines.side_effect = IndexError("")
+
+ # when
response = self.client.get("/pipelines")
- print(response)
- assert response.content_type == "application/json", "not equal content type"
- assert response.status_code == 500
- def test_negative_execute_job(self):
+ # then
+ mock_get_kf_list_pipelines.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+ self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow")
+ self.assertEqual(response.get_json()["ext"], 1)
+
+
+ def test_negative_execute_job_failed_cause_less_arguments(self):
+ # given
job_name = "job_name"
dict_job = {}
- dict_job['arguments'] = "param1"
- dict_job['pipeline_name'] = "pipeline_name"
- dict_job['experiment_name'] = "experiment_name"
- dict_job['pipeline_version'] = "pipeline_version"
- payload = json.dumps(dict_job)
- headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}
- response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers)
- assert response.content_type == "application/json", "not equal content type"
-
-class TestAdditionalKfConnector:
- def setup_method(self):
- self.client = kfadapter_main.APP.test_client(self)
- kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger
- kfadapter_main.KFCONNECT_KF_OBJ = NegativeFakeAdditionalKfConnect()
- kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance()
-
- def test_negative_exception_get_kf_experiment(self):
- experiment_name = "ai-server"
- response = self.client.get("/experiments/{}".format(experiment_name))
- assert response.content_type == "application/json", "not equal content type"
-
- def test_negative_exception_get_pipeline_id(self):
- pipeline_name = "car-racing"
- response = self.client.get("/pipelineIds/{}".format(pipeline_name))
+ # when
+ response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=json.dumps(dict_job), headers={'content-type': 'application/json', 'Accept-Charset': 'UTF-8'})
+
+ # then
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
+ self.assertEqual(response.get_json()["message"], "Less arguments")
+ self.assertFalse(response.get_json()["payload"])
+
- assert response.content_type == "text/html; charset=utf-8", "not equal content type"
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details")
+ def test_negative_execute_job_failed_cause_no_such_experiment(self, mock_get_kf_experiment_details):
+ mock_get_kf_experiment_details.return_value = None
+
+ job_name = "job_name"
+ dict_job = {}
+
+ params = [ ApiParameter() for _ in range(2)]
+ for i, param in enumerate(params) :
+ param.name = "param-name{}".format(i)
+ param.value = "param-value{}".format(i)
- def test_negative_exception_get_pipeline_version(self):
- pipeline_name="car-racing"
- response = self.client.get("/pipelines/{}/versions".format(pipeline_name))
- assert response.content_type == "application/json", "not equal content type"
+ args = {}
+ args[params[0].name] = params[0].value
+ args[params[1].name] = params[1].value
- def test_negative_exception_execute_job(self):
+ dict_job["arguments"] = args
+ dict_job["pipeline_name"] = "pipeline-name"
+ dict_job["pipeline_version"] = "2.0.0"
+ dict_job["experiment_name"] = "exp-name"
+
+ # when
+ response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=json.dumps(dict_job), headers={'content-type': 'application/json', 'Accept-Charset': 'UTF-8'})
+
+ # then
+ mock_get_kf_experiment_details.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+ self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow")
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details")
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_id")
+ def test_negative_execute_job_failed_cause_no_such_pipeline(self, mock_get_kf_pipeline_id, mock_get_kf_experiment_details):
+ exp = ApiExperiment()
+ exp_name = "exp-name"
+ exp.id = "exp-id"
+ mock_get_kf_experiment_details.return_value = exp
+
+ mock_get_kf_pipeline_id.return_value = None
+
job_name = "job_name"
dict_job = {}
- dict_job['arguments'] = {"param3":"value3", "param4":"value4"}
- dict_job['pipeline_name'] = 'pipeline_name'
- dict_job['experiment_name'] = 'experiment_name'
- dict_job['pipeline_version'] = 'pipeline_version'
- payload = json.dumps(dict_job)
- headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}
- response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers)
- assert response.content_type == "application/json", "not equal content type"
+
+ params = [ ApiParameter() for _ in range(2)]
+ for i, param in enumerate(params) :
+ param.name = "param-name{}".format(i)
+ param.value = "param-value{}".format(i)
- def test_negative_get_experiments(self):
- response = self.client.get("/experiments")
- expected_data=""
- assert response.content_type == "application/json", "not equal content type"
+ args = {}
+ args[params[0].name] = params[0].value
+ args[params[1].name] = params[1].value
+
+ dict_job["arguments"] = args
+ dict_job["pipeline_name"] = "pipeline-name"
+ dict_job["pipeline_version"] = "2.0.0"
+ dict_job["experiment_name"] = exp_name
+
+ # when
+ response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=json.dumps(dict_job), headers={'content-type': 'application/json', 'Accept-Charset': 'UTF-8'})
+
+ # then
+ mock_get_kf_experiment_details.assert_called_once()
+ mock_get_kf_pipeline_id.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+ self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow")
+
+
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details")
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_id")
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_desc")
+ def test_negative_execute_job_failed_cause_arguments_not_matched(self, mock_get_kf_pipeline_desc, mock_get_kf_pipeline_id, mock_get_kf_experiment_details):
+ exp = ApiExperiment()
+ exp_name = "exp-name"
+ exp.id = "exp-id"
+ mock_get_kf_experiment_details.return_value = exp
+
+ pipeline_name = "pipeline-name"
+ pipeline_id = "pipeline-id"
+ mock_get_kf_pipeline_id.return_value = pipeline_id
+
+ params = [ ApiParameter() for _ in range(4)]
+ for i, param in enumerate(params) :
+ param.name = "param-name{}".format(i)
+ param.value = "param-value{}".format(i)
+
+ pipeline_info = ApiPipeline()
+ pipeline_info.parameters = [params[0], params[1]]
+ pipeline_info.description = "pipeline-description"
+ pipeline_info.id = pipeline_name
+ pipeline_info.name = pipeline_id
+
+ default_version = ApiPipelineVersion()
+ default_version.parameters = [params[2], params[3]]
+ pipeline_info.default_version = default_version
+ mock_get_kf_pipeline_desc.return_value = pipeline_info
+
+ job_name = "job_name"
+ dict_job = {}
+ args = {}
+
+ # args_match is going to fail
+ args["nosuchname"] = "nosuchvalue"
+
+ dict_job["arguments"] = args
+ dict_job["pipeline_name"] = pipeline_name
+ dict_job["pipeline_version"] = "2.0.0"
+ dict_job["experiment_name"] = exp_name
+
+ # when
+ response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=json.dumps(dict_job), headers={'content-type': 'application/json', 'Accept-Charset': 'UTF-8'})
+
+ # then
+ mock_get_kf_experiment_details.assert_called_once()
+ mock_get_kf_pipeline_id.assert_called_once()
+ mock_get_kf_pipeline_desc.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+ self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow")
- def test_negative_exception_get_pipelines_id(self):
- pipeline_id = "pipelineIdsample"
- response = self.client.get("/pipelines/{}".format(pipeline_id))
- assert response.content_type == "application/json", "not equal content type"
- assert response.status_code == 500, "not equal code"
+ @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_list_experiments")
+ def test_negative_get_experiments_failed_with_unsupported_error(self, mock_get_kf_list_experiments):
+ # given
+ mock_get_kf_list_experiments.return_value = IndexError("")
+ # when
+ response = self.client.get("/experiments")
-class TestNoneKfConnect:
- def setup_method(self):
- self.client = kfadapter_main.APP.test_client(self)
- kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger
- kfadapter_main.KFCONNECT_KF_OBJ = NegativeFakeNoneKfConnect()
- kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance()
-
- def test_nagetive_none_execute_job(self):
- job_name = "job_name"
- dict_job = {}
- dict_job['arguments'] = "param1"
- dict_job['pipeline_name'] = "pipeline_name"
- dict_job['experiment_name'] = "experiment_name"
- dict_job['pipeline_version'] = "pipeline_version"
- payload = json.dumps(dict_job)
- headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}
- response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers)
- assert response.content_type == "application/json", "not equal content type"
-
\ No newline at end of file
+ # then
+ mock_get_kf_list_experiments.assert_called_once()
+ self.assertEqual(response.content_type, "application/json")
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+ self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow")
+ self.assertEqual(response.get_json()["ext"], 1)