From ed6776ea969d9dcdafc8fc3bd8f435804e1e7409 Mon Sep 17 00:00:00 2001 From: Youhwan Seol Date: Thu, 3 Nov 2022 00:26:53 +0900 Subject: [PATCH] update test_kfadapter_main.py Increased code coverage, improved logic verification, exception raise/catch verification, and increased intuition by method-based mocking(fake kfconnect is deleted) Issue-id: AIMLFW-9 Signed-off-by: Youhwan Seol Change-Id: I9a61d6fce943c6980e941fb96643a26781205704 --- kfadapter/kfadapter_main.py | 3 +- test/fake_kfconnect.py | 286 -------------- test/test_kfadapter_main.py | 889 ++++++++++++++++++++++++++++++++++---------- 3 files changed, 704 insertions(+), 474 deletions(-) delete mode 100644 test/fake_kfconnect.py diff --git a/kfadapter/kfadapter_main.py b/kfadapter/kfadapter_main.py index 3fe032c..5191ad8 100644 --- a/kfadapter/kfadapter_main.py +++ b/kfadapter/kfadapter_main.py @@ -35,6 +35,7 @@ from flask import Flask, request, jsonify from flask_api import status import kfp_server_api +from . import kfadapter_conf from .kfadapter_kfconnect import KfConnect from .kfadapter_util import BadRequest, wait_status_thread, keys_match, check_map @@ -197,7 +198,7 @@ def get_versions_for_pipeline(pipeline_name): @APP.route("/pipelines/", methods=['GET', 'DELETE']) -def pipelinei(pipe_id): +def get_pipeline(pipe_id): """Function handling HTTP GET/DELETE rest endpoint to get/delete pipeline based on pipeline id from kubeflow diff --git a/test/fake_kfconnect.py b/test/fake_kfconnect.py deleted file mode 100644 index 1344db6..0000000 --- a/test/fake_kfconnect.py +++ /dev/null @@ -1,286 +0,0 @@ -# ================================================================================== -# -# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ================================================================================== - -import kfp_server_api -from kfp_server_api.models.api_run import ApiRun -from kfp_server_api.models.api_list_runs_response import ApiListRunsResponse -from kfp_server_api.models.api_experiment import ApiExperiment -from kfp_server_api.models.api_list_pipelines_response import ApiListPipelinesResponse -from kfp_server_api.models.api_pipeline import ApiPipeline -from kfp_server_api.models.api_parameter import ApiParameter -from kfp_server_api.models.api_resource_reference import ApiResourceReference -from kfp_server_api.models.api_resource_key import ApiResourceKey -from kfp_server_api.models.api_pipeline_version import ApiPipelineVersion - -class FakeKfConnect: - - def __init__(self): - print("Initialized Fake KfConnect") - - def get_kf_list_experiments(self, nspace): - explist = kfp_server_api.ApiListExperimentsResponse() - - exp = kfp_server_api.ApiExperiment() - exp.name = "name" - exp.id = "id" - explist.experiments = [exp] - return explist - - def get_pl_versions_by_pl_name(self, pipeline_name): - version_list = [] - version_list.append("2.0.0") - return version_list - """ - def run_kf_pipeline(self,exp_id,arguments, - experiment_id: str, - job_name: str, - pipeline_package_path: Optional[str] = None, - params: Optional[dict] = None, - pipeline_id: Optional[str] = None, - version_id: Optional[str] = None, - pipeline_root: Optional[str] = None, - enable_caching: Optional[str] = None, - service_account: Optional[str] = None,): - - run = ApiRun() - run.id = "run_id" - run.name = "run_name" - rr0 = ApiResourceReference() - rr0.name = "rr0" - key0 = ApiResourceKey() - key0.id = "id0" - rr0.key = key0 - rr1 = ApiResourceReference() - rr1.name = "rr0" - key1 = ApiResourceKey() - key1.id = "id1" - rr1.key = key1 - run.status = "Running" - run.resource_references = [rr0, rr1] - return run - """ - - def delete_kf_pipeline(self, pipeline_id): - return True - - def get_kf_pipeline_version_id( self,pipeline_version_name, - pipeline_id: str, - page_token: str = '', - page_size: int = 10, - sort_by: str = '' - ): - return "pipeline_id" - - def get_kf_list_runs(self, - page_token='', - page_size=10, - sort_by='', - experiment_id=None, - namespace=None): - listrun = ApiListRunsResponse() - run1 = ApiRun() - run1.id = "id" - run1.description = "description" - run1.status = "status" - - rr0 = ApiResourceReference() - rr0.name = "rr0" - key0 = ApiResourceKey() - key0.id = "id" - rr0.key = key0 - - rr1 = ApiResourceReference() - rr1.name = "rr1" - key1 = ApiResourceKey() - key1.id = "id" - rr1.key = key1 - - run1.resource_references = [rr0, rr1] - - run2 = ApiRun() - run2.id = "id" - run2.description = "description" - run2.status = "status" - - rr2 = ApiResourceReference() - rr2.name = "rr2" - key2 = ApiResourceKey() - key2.id = "id" - rr2.key = key2 - - rr3 = ApiResourceReference() - rr3.name = "rr1" - key3 = ApiResourceKey() - key3.id = "id" - rr3.key = key3 - - run2.resource_references = [rr2, rr3] - - - listrun.runs = [run1, run2] - - return listrun - - def get_kf_pipeline_desc(self, pipeline_id: str): - pipeline_info = ApiPipeline() - - param1 = ApiParameter() - param1.name = "param1" - param1.value = "value1" - param2 = ApiParameter() - param2.name = "param2" - param2.value = "value2" - pipeline_info.parameters = [param1, param2] - pipeline_info.description = 'description' - pipeline_info.id = "id" - pipeline_info.name = "name" - - param3 = ApiParameter() - param3.name = "param3" - param3.value = "value3" - param4 = ApiParameter() - param4.name = "param4" - param4.value = "value4" - - default_version = ApiPipelineVersion() - default_version.parameters = [param3, param4] - - pipeline_info.default_version = default_version - return pipeline_info - - def get_kf_run(self, run_id: str): - run = ApiRun() - run.name = "run_name" - run.status = "Running" - run.id = "run_id" - return run - - def get_kf_experiment_details(self, ex_name, nspace): - experiment = ApiExperiment() - experiment.name = "exp_name" - experiment.id = "exp_id" - return experiment - - def get_kf_pipeline_id(self, pipeline_name): - return "pipeline_id" - """ - def upload_pipeline_with_versions(self, pipeline_name, file, desc): - pipeline_info = kfp_server_api.ApiPipelineVersion() - pipeline_info.id("pipeline_id") - return pipeline_info - """ - - def get_kf_list_pipelines(self): - pipeline_list = ApiListPipelinesResponse() - pipeline = ApiPipeline() - pipeline.id = "pipeline_id" - pipeline.description = "pipeline_description" - parameter = ApiParameter() - parameter.name = "param1" - parameter.value = "value1" - pipeline.parameters = [parameter] - pipeline_list.pipelines = [pipeline] - return pipeline_list - - -class NegativeFakeKfConnect: - def __init__(self): - print("Initialized Negative Fake KfConnect") - - def get_kf_pipeline_id(self, pipeline_name): - return None - - def get_kf_experiment_details(self, ex_name, nspace): - return None - - def get_pl_versions_by_pl_name(self, pipeline_name): - raise kfp_server_api.exceptions.ApiException - - def get_kf_run(self, run_id): - """ - run = ApiRun() - run.id = run_id - run.name = 'run_name' - run.status = 'Running' - return run - """ - raise Exception('erro') - - def get_kf_list_runs(self, nspace): - - run = ApiRun() - run.name = "run_name" - run.status = "Running" - run.id = "run_id" - run.description = "descrption" - rr0 = ApiResourceReference() - rr0.name = "rr0" - key0 = ApiResourceKey() - key0.id = "key0id" - run.resource_references = [rr0] - - runs = ApiListRunsResponse() - runs.runs = [run] - - return runs - - def get_kf_list_pipelines(self, - page_token='', - page_size=10, - sort_by=''): - raise Exception('error') - - - def get_kf_pipeline_desc(self, pipeline_id): - raise kfp_server_api.exceptions.ApiException - -class NegativeFakeAdditionalKfConnect: - - def get_kf_experiment_details(self, ex_name, nspace): - raise Exception('error') - - def get_kf_pipeline_id(self, pipeline_name): - raise kfp_server_api.exceptions.ApiException('error') - """ - def get_pl_versions_by_pl_namee(self, pipeline_name): - raise Exception('error') - """ - """ - def get_kf_pipeline_version_id(self, pipeline_id, pipeline_version_name): - raise Exception('error') - """ - def get_kf_list_experiments(self, nspace): - raise Exception('error') - - def get_kf_pipeline_desc(self, pipeline_id: str): - raise kfp_server_api.exceptions.ApiException('error') - -class NegativeFakeNoneKfConnect: - - def get_kf_experiment_details(self, ex_name, nspace): - experiment = ApiExperiment() - experiment.name = "exp_name" - experiment.id = "exp_id" - return experiment - - def get_kf_pipeline_id(self, pipeline_name): - return None - - - - diff --git a/test/test_kfadapter_main.py b/test/test_kfadapter_main.py index cc7c448..6537d98 100644 --- a/test/test_kfadapter_main.py +++ b/test/test_kfadapter_main.py @@ -15,260 +15,775 @@ # limitations under the License. # # ================================================================================== -import json - +import json +import io +from unittest import TestCase +from mock import patch from flask_api import status +import kfp_server_api +from kfp_server_api.models.api_run import ApiRun +from kfp_server_api.models.api_list_runs_response import ApiListRunsResponse +from kfp_server_api.models.api_experiment import ApiExperiment +from kfp_server_api.models.api_list_pipelines_response import ApiListPipelinesResponse +from kfp_server_api.models.api_pipeline import ApiPipeline +from kfp_server_api.models.api_parameter import ApiParameter +from kfp_server_api.models.api_resource_reference import ApiResourceReference +from kfp_server_api.models.api_resource_key import ApiResourceKey +from kfp_server_api.models.api_pipeline_version import ApiPipelineVersion from kfadapter import kfadapter_main -from kfadapter.tmgr_logger import TMLogger -from .fake_kfconnect import FakeKfConnect, NegativeFakeAdditionalKfConnect, NegativeFakeKfConnect, NegativeFakeNoneKfConnect -from .fake_kfconf import FakeKfConf +from kfadapter import kfadapter_conf +from kfadapter import kfadapter_kfconnect + +class testKfadapterApi(TestCase): + @classmethod + def setUpClass(self): + kfadapter_main.KFCONNECT_CONFIG_OBJ = kfadapter_conf.KfConfiguration.get_instance() + kfadapter_main.LOGGER = kfadapter_main.KFCONNECT_CONFIG_OBJ.logger + kfadapter_main.KFCONNECT_KF_OBJ = kfadapter_kfconnect.KfConnect() -class Test_pipeline_id_operations: - - def setup_method(self): - kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger - kfadapter_main.KFCONNECT_KF_OBJ = FakeKfConnect() - kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance() self.client = kfadapter_main.APP.test_client(self) - def test_get_experiment(self): - experiment_name = "ai-server" - response = self.client.get("/experiments/{}".format(experiment_name)) - expected_data = b'{"name": "ai-server", "id":"isj0t3jdhf"}' - assert response.content_type == "application/json", "not equal content type" - assert response.status_code == status.HTTP_200_OK, "not equal code" + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details") + def test_get_experiment(self, mock_get_kf_experiment_details): + # given + exp = ApiExperiment() + exp.name = "exp-name" + exp.id = "exp-id" + mock_get_kf_experiment_details.return_value = exp + + # when + response = self.client.get("/experiments/{}".format(exp.name)) + + # then + mock_get_kf_experiment_details.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.get_json()["name"], exp.name) + self.assertEqual(response.get_json()["id"], exp.id) - def test_get_all_runs(self): + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_list_runs") + def test_get_all_runs(self, mock_get_kf_list_runs): + # given + resources = [ ApiResourceReference() for _ in range(3)] + for i, resource in enumerate(resources) : + resource.name = "rr-name{}".format(i) + resource.key = ApiResourceKey() + resource.key.id = "rr-id{}".format(i) + + runs = [ ApiRun() for _ in range(2)] + for i, run in enumerate(runs) : + run.id = "runid" + run.description = "description" + run.status = "status" + run.resource_references = [resources[i], resources[i+1]] + + list_run = ApiListRunsResponse() + list_run.runs = runs + mock_get_kf_list_runs.return_value = list_run + + # when response = self.client.get("/runs") - assert response.content_type == "application/json", "not equal content type" - assert response.status == '200 OK' - - def test_get_pipeline_id(self): - pipeline_name = "car-racing" - response = self.client.get("/pipelineIds/{}".format(pipeline_name)) - expected_data = b'{"name":"car-racing", "id":"3jfidhsueuf2oj"}' - assert response.content_type == "application/json", "not equal content type" - assert response.status_code == status.HTTP_200_OK, "not equal code" - - def test_upload_pipeline(self): - pipeline_file_name="Training-Manager/kf_connector/test/pipeline.zip" + # then + mock_get_kf_list_runs.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.get_data(), b'{"null":{"experiment_id":"rr-id1","experiment_name":"rr-name1","pipeline_id":"rr-id2","pipeline_name":"rr-name2","run_description":"description","run_id":"runid","run_status":"status"}}\n') + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_id") + def test_get_pipeline_id(self, mock_get_kf_pipeline_id): + # given + pipeline_name = "pipeline-name" + pipeline_id = "pipeline-id" + mock_get_kf_pipeline_id.return_value = pipeline_id + + # when + response = self.client.get("/pipelineIds/{}".format(pipeline_name)) - response = self.client.post("/user/2/edit", data={ - "file": pipeline_file_name, - "description": "description" - }) + # then + mock_get_kf_pipeline_id.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.get_json()["id"], pipeline_id) + self.assertEqual(response.get_json()["name"], pipeline_name) + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.upload_pipeline_with_versions") + def test_upload_pipeline(self, mock_upload_pipeline_with_versions): + # given + pipeline_name = "pipeline-name" + pipeline_id = "pipeline-id" + + pipeline_info = ApiPipeline() + pipeline_info.id = pipeline_id + mock_upload_pipeline_with_versions.return_value = pipeline_info + + files = {} + files['file'] = (io.BytesIO(b"pipeline-file"), 'pipeline.zip') + files['description'] = "pipeline-description" + + # when + response = self.client.post("pipelineIds/{}".format(pipeline_name), data=files, content_type="multipart/form-data") + + # then + mock_upload_pipeline_with_versions.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.get_json()['name'], pipeline_name) + self.assertEqual(response.get_json()['id'], pipeline_id) - assert response.content_type == "text/html; charset=utf-8" - - def test_get_run(self): - run_id = "run_id" - response = self.client.get("/runs/{}".format(run_id)) - assert response.content_type == "application/json", "not equal content type" - assert response.status == '200 OK' - - def test_get_pipeline_version(self): - pipeline_name="car-racing" + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_pl_versions_by_pl_name") + def test_get_pipeline_version(self, mock_get_pl_versions_by_pl_name): + # given + pipeline_name="pipeline-name" + version_list = ["1.0.0"] + mock_get_pl_versions_by_pl_name.return_value = version_list + # when response = self.client.get("/pipelines/{}/versions".format(pipeline_name)) - expected_data = "" - assert response.content_type == "application/json", "not equal content type" - assert response.status_code == status.HTTP_200_OK, "not equal code" + # then + mock_get_pl_versions_by_pl_name.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.get_json()["versions_list"], version_list) + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_run") + def test_get_run(self, mock_get_kf_run): + # given + run = ApiRun() + run.id = "run-id" + run.name = "run-name" + run.status = "Running" + mock_get_kf_run.return_value = run + + # when + response = self.client.get("/runs/{}".format(run.id)) + + # then + mock_get_kf_run.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.get_json()["run_id"], run.id) + self.assertEqual(response.get_json()["run_name"], run.name) + self.assertEqual(response.get_json()["run_status"], run.status) + def test_delete_run(self): - run_id = "run_id" + # given + run_id = "run-id" + + # when response = self.client.delete("/runs/{}".format(run_id)) - assert response.content_type == "application/json", "not equal content type" - assert response.status == '400 BAD REQUEST' + # then + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - def test_get_pipelines(self): + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_list_pipelines") + def test_get_pipelines(self, mock_get_kf_list_pipelines): + # given + parameter = ApiParameter() + parameter.name = "param1" + parameter.value = "value1" + + pipeline = ApiPipeline() + pipeline.id = "pipeline-id" + pipeline.description = "pipeline-description" + + pipeline.parameters = [parameter] + + pipeline_list = ApiListPipelinesResponse() + pipeline_list.pipelines = [pipeline] + + mock_get_kf_list_pipelines.return_value = pipeline_list + + # when response = self.client.get("/pipelines") - assert response.content_type == "application/json", "not equal content type" - assert response.status_code == status.HTTP_200_OK + + # then + mock_get_kf_list_pipelines.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.get_data(), b'{"null":{"description":"pipeline-description","id":"pipeline-id","parameters":{"param1":"value1"}}}\n') - def test_delete_pipelines(self): - pipeline_id = "pipelineIdsample" + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_desc") + def test_get_pipeline(self, mock_get_kf_pipeline_desc): + # given + pipeline_name = "pipeline-name" + pipeline_id = "pipeline-id" + + params = [ ApiParameter() for _ in range(4)] + for i, param in enumerate(params) : + param.name = "param-name{}".format(i) + param.value = "param-value{}".format(i) + + pipeline_info = ApiPipeline() + pipeline_info.parameters = [params[0], params[1]] + pipeline_info.description = "description" + pipeline_info.id = pipeline_name + pipeline_info.name = pipeline_id + + default_version = ApiPipelineVersion() + default_version.parameters = [params[2], params[3]] + pipeline_info.default_version = default_version + mock_get_kf_pipeline_desc.return_value = pipeline_info + + # when + response = self.client.get("/pipelines/{}".format(pipeline_id)) + + # then + mock_get_kf_pipeline_desc.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.get_data(), b'{"arguments":{"param-name0":"param-value0","param-name1":"param-value1"},"description":"description","id":"pipeline-name","name":"pipeline-id"}\n') + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.delete_kf_pipeline") + def test_delete_pipelines(self, mock_delete_kf_pipeline): + # given + pipeline_id = "pipeline-id" + mock_delete_kf_pipeline.return_value = True + + # when response = self.client.delete("/pipelines/{}".format(pipeline_id)) - expected_data = b'{"OK"}' - assert response.content_type == "application/json" - assert response.status_code == status.HTTP_200_OK - + # then + mock_delete_kf_pipeline.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.get_json()["id"], pipeline_id) + self.assertEqual(response.get_json()["status"], "Deleted") + + def test_check_liveness(self): + # when response = self.client.get("/liveness") - expected_data = b'Okay' - assert response.content_type == "text/html; charset=utf-8", "not equal content type" - assert response.data == expected_data - def test_get_experiments(self): + # then + self.assertEqual(response.content_type, "text/html; charset=utf-8") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.get_data(), b'Okay') + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_list_experiments") + def test_get_experiments(self, mock_get_kf_list_experiments): + # given + exp = kfp_server_api.ApiExperiment() + exp.name = "exp-name" + exp.id = "exp-id" + + explist = kfp_server_api.ApiListExperimentsResponse() + explist.experiments = [exp] + mock_get_kf_list_experiments.return_value = explist + + # when response = self.client.get("/experiments") - expected_data="" - assert response.content_type == "application/json", "not equal content type" - def test_execute_job(self): + # then + mock_get_kf_list_experiments.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.get_json()[exp.name], exp.id) + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details") + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_id") + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_desc") + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_version_id") + @patch("kfadapter.kfadapter_kfconnect.KfConnect.run_kf_pipeline") + def test_execute_job(self, mock_run_kf_pipeline, mock_get_kf_pipeline_version_id, mock_get_kf_pipeline_desc, mock_get_kf_pipeline_id, mock_get_kf_experiment_details): + # given + exp = ApiExperiment() + exp.name = "exp-name" + exp.id = "exp-id" + mock_get_kf_experiment_details.return_value = exp + + pipeline_name = "pipeline-name" + pipeline_id = "pipeline-id" + mock_get_kf_pipeline_id.return_value = pipeline_id + + params = [ ApiParameter() for _ in range(4)] + for i, param in enumerate(params) : + param.name = "param-name{}".format(i) + param.value = "param-value{}".format(i) + + pipeline_info = ApiPipeline() + pipeline_info.parameters = [params[0], params[1]] + pipeline_info.description = "pipeline-description" + pipeline_info.id = pipeline_name + pipeline_info.name = pipeline_id + + default_version = ApiPipelineVersion() + default_version.parameters = [params[2], params[3]] + pipeline_info.default_version = default_version + mock_get_kf_pipeline_desc.return_value = pipeline_info + + mock_get_kf_pipeline_version_id.return_value = pipeline_id + + run = ApiRun() + run.name = "run-name" + run.id = "run-id" + + resources = [ ApiResourceReference() for _ in range(2)] + for i, resource in enumerate(resources) : + resource.name = "rr-name{}".format(i) + resource.key = ApiResourceKey() + resource.key.id = "rr-id{}".format(i) + + run.resource_references = [resources[0], resources[1]] + run.status = "Running" + mock_run_kf_pipeline.return_value = run + + job_name = "job_name" - dict_job = {'arguments' : {'key1':'value1', 'key2':'value2'}, 'pipeline_name' : "pipeline_name", 'experiment_name' : "experiment_name", 'pipeline_version' : "pipeline_version"} - payload = json.dumps(dict_job) - headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'} - response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers) - assert response.content_type == "application/json", "not equal content type" + dict_job = {} + args = {} + args[params[2].name] = params[2].value + args[params[3].name] = params[3].value + dict_job["arguments"] = args + dict_job["pipeline_name"] = pipeline_name + dict_job["pipeline_version"] = "2.0.0" + dict_job["experiment_name"] = exp.name + + # when + response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=json.dumps(dict_job), headers={'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}) + + # then + mock_get_kf_experiment_details.assert_called_once() + mock_get_kf_pipeline_id.assert_called_once() + mock_get_kf_pipeline_desc.assert_called_once() + mock_get_kf_pipeline_version_id.assert_called_once() + mock_run_kf_pipeline.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.get_data(), b'{"experiment_id":"rr-id0","experiment_name":"rr-name0","pipeline_id":"rr-id1","pipeline_name":"rr-name1","run_id":"run-id","run_name":"run-name","trainingjob_name":"job_name"}\n') + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details") + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_id") + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_desc") + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_version_id") + @patch("kfadapter.kfadapter_kfconnect.KfConnect.run_kf_pipeline") + def test_execute_job_scheduled(self, mock_run_kf_pipeline, mock_get_kf_pipeline_version_id, mock_get_kf_pipeline_desc, mock_get_kf_pipeline_id, mock_get_kf_experiment_details): + # given + exp = ApiExperiment() + exp.name = "exp-name" + exp.id = "exp-id" + mock_get_kf_experiment_details.return_value = exp + + pipeline_name = "pipeline-name" + pipeline_id = "pipeline-id" + mock_get_kf_pipeline_id.return_value = pipeline_id + + params = [ ApiParameter() for _ in range(4)] + for i, param in enumerate(params) : + param.name = "param-name{}".format(i) + param.value = "param-value{}".format(i) + + pipeline_info = ApiPipeline() + pipeline_info.parameters = [params[0], params[1]] + pipeline_info.description = "pipeline-description" + pipeline_info.id = pipeline_name + pipeline_info.name = pipeline_id - def test_get_pipelines_id(self): - pipeline_id = "pipelineIdsample" - response = self.client.get("/pipelines/{}".format(pipeline_id)) + default_version = ApiPipelineVersion() + default_version.parameters = [params[2], params[3]] + pipeline_info.default_version = default_version + mock_get_kf_pipeline_desc.return_value = pipeline_info - assert response.content_type == "application/json", "not equal content type" - assert response.status_code == status.HTTP_200_OK, "not equal code" + mock_get_kf_pipeline_version_id.return_value = pipeline_id + + run = ApiRun() + run.name = "run-name" + run.id = "run-id" + + resources = [ ApiResourceReference() for _ in range(2)] + for i, resource in enumerate(resources) : + resource.name = "rr-name{}".format(i) + resource.key = ApiResourceKey() + resource.key.id = "rr-id{}".format(i) + + run.resource_references = [resources[0], resources[1]] + run.status = None + mock_run_kf_pipeline.return_value = run + + job_name = "job_name" + dict_job = {} + args = {} + args[params[2].name] = params[2].value + args[params[3].name] = params[3].value + dict_job["arguments"] = args + dict_job["pipeline_name"] = pipeline_name + dict_job["pipeline_version"] = "2.0.0" + dict_job["experiment_name"] = exp.name + + # when + response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=json.dumps(dict_job), headers={'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}) + + print("RRR: ", response.get_json()) + # then + mock_get_kf_experiment_details.assert_called_once() + mock_get_kf_pipeline_id.assert_called_once() + mock_get_kf_pipeline_desc.assert_called_once() + mock_get_kf_pipeline_version_id.assert_called_once() + mock_run_kf_pipeline.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.get_json()['run_status'], 'scheduled') + +class testNegativeKfadapterApi(TestCase): + @classmethod + def setUpClass(self): + kfadapter_main.KFCONNECT_CONFIG_OBJ = kfadapter_conf.KfConfiguration.get_instance() + kfadapter_main.LOGGER = kfadapter_main.KFCONNECT_CONFIG_OBJ.logger + kfadapter_main.KFCONNECT_KF_OBJ = kfadapter_kfconnect.KfConnect() -class Test_Negative: - def setup_method(self): self.client = kfadapter_main.APP.test_client(self) - kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger - kfadapter_main.KFCONNECT_KF_OBJ = NegativeFakeKfConnect() - kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance() - def test_negative_get_pipeline_version(self): - pipeline_name="car-racing" + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_pl_versions_by_pl_name") + def test_negative_get_versions_for_pipeline_failed_with_api_exception(self, mock_get_pl_versions_by_pl_name): + # given + pipeline_name="pipeline-name" + mock_get_pl_versions_by_pl_name.side_effect = kfp_server_api.exceptions.ApiException + + # when + response = self.client.get("/pipelines/{}/versions".format(pipeline_name)) + + # then + mock_get_pl_versions_by_pl_name.assert_called_once() + self.assertEqual(response.content_type, "text/html; charset=utf-8") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_pl_versions_by_pl_name") + def test_negative_get_versions_for_pipeline_failed_with_unsupported_error(self, mock_get_pl_versions_by_pl_name): + # given + pipeline_name="pipeline-name" + mock_get_pl_versions_by_pl_name.side_effect = IndexError("") + # when response = self.client.get("/pipelines/{}/versions".format(pipeline_name)) - assert response.content_type == "text/html; charset=utf-8", "not equal content type" - def test_negative_get_pipelines_id(self): - pipeline_id = "pipelineIdsample" + # then + mock_get_pl_versions_by_pl_name.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow") + self.assertEqual(response.get_json()["ext"], 1) + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_desc") + def test_negative_get_pipeline_failed_with_api_exception(self, mock_get_kf_pipeline_desc): + # given + pipeline_id = "pipeline-id" + mock_get_kf_pipeline_desc.side_effect = kfp_server_api.exceptions.ApiException + + # when response = self.client.get("/pipelines/{}".format(pipeline_id)) - assert response.content_type == "application/json", "not equal content type" - assert response.status_code == 500, "not equal code" - - def test_negative_get_pipeline_id(self): - pipeline_name = "car-racing" + # then + mock_get_kf_pipeline_desc.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + self.assertEqual(response.get_json()["message"],"Unsupported error from Kubeflow") + self.assertEqual(response.get_json()["payload"],{"pipe_id":pipeline_id}) + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_id") + def test_negative_get_pipeline_id_failed_with_value_error(self, mock_get_kf_pipeline_id): + # given + pipeline_name = "pipeline-name" + mock_get_kf_pipeline_id.return_value = None + + # when response = self.client.get("/pipelineIds/{}".format(pipeline_name)) - assert response.content_type == "application/json" + # then + mock_get_kf_pipeline_id.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.get_json()["message"],"PipeLine Name does not exist") + self.assertEqual(response.get_json()["payload"],{"error":"No pipeline is found with name pipeline-name","pipe_name":pipeline_name}) - def test_negative_get_experiment(self): - experiment_name = "ai-server" - response = self.client.get("/experiments/{}".format(experiment_name)) - expected_data = b'{"name": "ai-server", "id":"isj03jdhf"}' - assert response.content_type == "application/json", "not equal content type" - assert response.status_code == 400, "not equal code" + @patch("kfadapter.kfadapter_kfconnect.KfConnect.upload_pipeline_with_versions") + def test_negative_upload_pipeline_failed_with_api_exception(self, mock_upload_pipeline_with_versions): + # given + pipeline_name = "pipeline-name" - def test_negative_get_run(self): - run_id = "run_id" - response = self.client.get("/runs/{}".format(run_id)) + mock_upload_pipeline_with_versions.side_effect = kfp_server_api.exceptions.ApiException - assert response.content_type == "application/json", "not equal content type" - assert response.status == '400 BAD REQUEST' - - def test_negative_upload_pipeline(self): - pipeline_file_name="pipeline.zip" - description="test pipeline" - pipeline_name="car-racing" + files = {} + files['file'] = (io.BytesIO(b"pipeline-file"), 'pipeline.zip') + files['description'] = "pipeline-description" + + # when + response = self.client.post("pipelineIds/{}".format(pipeline_name), data=files, content_type="multipart/form-data") + + # then + mock_upload_pipeline_with_versions.assert_called_once() + self.assertEqual(response.content_type, "text/html; charset=utf-8") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + + + def test_negative_upload_pipeline_failed_cause_empty_file_name(self): + # given + pipeline_name = "pipeline-name" + + files = {} + files['file'] = (io.BytesIO(b"pipeline-file"), '') - dict_pipeline = {'file' : pipeline_file_name, 'description' : description} - headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'} + # when + response = self.client.post("pipelineIds/{}".format(pipeline_name), data=files, content_type="multipart/form-data") + + # then + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + self.assertEqual(response.get_json()["message"],"Unsupported error from Kubeflow") + self.assertEqual(response.get_json()["ext"], 1) + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details") + def test_negative_get_experiment_failed_cause_no_such_experiment(self, mock_get_kf_experiment_details): + # given + exp_name = "exp-name" + mock_get_kf_experiment_details.return_value = None + + # when + response = self.client.get("/experiments/{}".format(exp_name)) + + # then + mock_get_kf_experiment_details.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.get_json()["message"],"Experiment name does not exist") + self.assertEqual(response.get_json()["payload"],{"exp.name":exp_name}) + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details") + def test_negative_get_experiment_failed_with_unsupported_error(self, mock_get_kf_experiment_details): + # given + exp_name = "exp-name" + mock_get_kf_experiment_details.return_value = IndexError("") + + # when + response = self.client.get("/experiments/{}".format(exp_name)) + + # then + mock_get_kf_experiment_details.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow") + self.assertEqual(response.get_json()["ext"], 1) + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_run") + def test_negative_get_run_failed_with_exception(self, mock_get_kf_run): + # given + run_id = "run-id" + mock_get_kf_run.side_effect = Exception("") - response = self.client.post("pipelineIds/{}".format(pipeline_name), data=json.dumps(dict_pipeline), headers=headers) + # when + response = self.client.get("/runs/{}".format(run_id)) - assert response.content_type == "application/json" - assert response.status == '500 INTERNAL SERVER ERROR' + # then + mock_get_kf_run.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.get_json()["message"],"Unsupported error from Kubeflow") + self.assertEqual(response.get_json()["payload"],{"run_id":run_id}) - def test_negative_get_all_runs(self): + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_list_runs") + def test_negative_get_all_runs_failed_with_unsupported_error(self, mock_get_kf_list_runs): + # given + mock_get_kf_list_runs.side_effect = IndexError("") + + # when response = self.client.get("/runs") - assert response.content_type == "application/json", "not equal content type" - assert response.status == '400 BAD REQUEST' - def test_negative_get_pipelines(self): + # then + mock_get_kf_list_runs.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow") + self.assertEqual(response.get_json()["ext"], 1) + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_list_pipelines") + def test_negative_get_pipelines_failed_with_unsupported_error(self, mock_get_kf_list_pipelines): + # given + mock_get_kf_list_pipelines.side_effect = IndexError("") + + # when response = self.client.get("/pipelines") - print(response) - assert response.content_type == "application/json", "not equal content type" - assert response.status_code == 500 - def test_negative_execute_job(self): + # then + mock_get_kf_list_pipelines.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow") + self.assertEqual(response.get_json()["ext"], 1) + + + def test_negative_execute_job_failed_cause_less_arguments(self): + # given job_name = "job_name" dict_job = {} - dict_job['arguments'] = "param1" - dict_job['pipeline_name'] = "pipeline_name" - dict_job['experiment_name'] = "experiment_name" - dict_job['pipeline_version'] = "pipeline_version" - payload = json.dumps(dict_job) - headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'} - response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers) - assert response.content_type == "application/json", "not equal content type" - -class TestAdditionalKfConnector: - def setup_method(self): - self.client = kfadapter_main.APP.test_client(self) - kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger - kfadapter_main.KFCONNECT_KF_OBJ = NegativeFakeAdditionalKfConnect() - kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance() - - def test_negative_exception_get_kf_experiment(self): - experiment_name = "ai-server" - response = self.client.get("/experiments/{}".format(experiment_name)) - assert response.content_type == "application/json", "not equal content type" - - def test_negative_exception_get_pipeline_id(self): - pipeline_name = "car-racing" - response = self.client.get("/pipelineIds/{}".format(pipeline_name)) + # when + response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=json.dumps(dict_job), headers={'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}) + + # then + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.get_json()["message"], "Less arguments") + self.assertFalse(response.get_json()["payload"]) + - assert response.content_type == "text/html; charset=utf-8", "not equal content type" + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details") + def test_negative_execute_job_failed_cause_no_such_experiment(self, mock_get_kf_experiment_details): + mock_get_kf_experiment_details.return_value = None + + job_name = "job_name" + dict_job = {} + + params = [ ApiParameter() for _ in range(2)] + for i, param in enumerate(params) : + param.name = "param-name{}".format(i) + param.value = "param-value{}".format(i) - def test_negative_exception_get_pipeline_version(self): - pipeline_name="car-racing" - response = self.client.get("/pipelines/{}/versions".format(pipeline_name)) - assert response.content_type == "application/json", "not equal content type" + args = {} + args[params[0].name] = params[0].value + args[params[1].name] = params[1].value - def test_negative_exception_execute_job(self): + dict_job["arguments"] = args + dict_job["pipeline_name"] = "pipeline-name" + dict_job["pipeline_version"] = "2.0.0" + dict_job["experiment_name"] = "exp-name" + + # when + response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=json.dumps(dict_job), headers={'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}) + + # then + mock_get_kf_experiment_details.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow") + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details") + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_id") + def test_negative_execute_job_failed_cause_no_such_pipeline(self, mock_get_kf_pipeline_id, mock_get_kf_experiment_details): + exp = ApiExperiment() + exp_name = "exp-name" + exp.id = "exp-id" + mock_get_kf_experiment_details.return_value = exp + + mock_get_kf_pipeline_id.return_value = None + job_name = "job_name" dict_job = {} - dict_job['arguments'] = {"param3":"value3", "param4":"value4"} - dict_job['pipeline_name'] = 'pipeline_name' - dict_job['experiment_name'] = 'experiment_name' - dict_job['pipeline_version'] = 'pipeline_version' - payload = json.dumps(dict_job) - headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'} - response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers) - assert response.content_type == "application/json", "not equal content type" + + params = [ ApiParameter() for _ in range(2)] + for i, param in enumerate(params) : + param.name = "param-name{}".format(i) + param.value = "param-value{}".format(i) - def test_negative_get_experiments(self): - response = self.client.get("/experiments") - expected_data="" - assert response.content_type == "application/json", "not equal content type" + args = {} + args[params[0].name] = params[0].value + args[params[1].name] = params[1].value + + dict_job["arguments"] = args + dict_job["pipeline_name"] = "pipeline-name" + dict_job["pipeline_version"] = "2.0.0" + dict_job["experiment_name"] = exp_name + + # when + response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=json.dumps(dict_job), headers={'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}) + + # then + mock_get_kf_experiment_details.assert_called_once() + mock_get_kf_pipeline_id.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow") + + + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_experiment_details") + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_id") + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_pipeline_desc") + def test_negative_execute_job_failed_cause_arguments_not_matched(self, mock_get_kf_pipeline_desc, mock_get_kf_pipeline_id, mock_get_kf_experiment_details): + exp = ApiExperiment() + exp_name = "exp-name" + exp.id = "exp-id" + mock_get_kf_experiment_details.return_value = exp + + pipeline_name = "pipeline-name" + pipeline_id = "pipeline-id" + mock_get_kf_pipeline_id.return_value = pipeline_id + + params = [ ApiParameter() for _ in range(4)] + for i, param in enumerate(params) : + param.name = "param-name{}".format(i) + param.value = "param-value{}".format(i) + + pipeline_info = ApiPipeline() + pipeline_info.parameters = [params[0], params[1]] + pipeline_info.description = "pipeline-description" + pipeline_info.id = pipeline_name + pipeline_info.name = pipeline_id + + default_version = ApiPipelineVersion() + default_version.parameters = [params[2], params[3]] + pipeline_info.default_version = default_version + mock_get_kf_pipeline_desc.return_value = pipeline_info + + job_name = "job_name" + dict_job = {} + args = {} + + # args_match is going to fail + args["nosuchname"] = "nosuchvalue" + + dict_job["arguments"] = args + dict_job["pipeline_name"] = pipeline_name + dict_job["pipeline_version"] = "2.0.0" + dict_job["experiment_name"] = exp_name + + # when + response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=json.dumps(dict_job), headers={'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}) + + # then + mock_get_kf_experiment_details.assert_called_once() + mock_get_kf_pipeline_id.assert_called_once() + mock_get_kf_pipeline_desc.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow") - def test_negative_exception_get_pipelines_id(self): - pipeline_id = "pipelineIdsample" - response = self.client.get("/pipelines/{}".format(pipeline_id)) - assert response.content_type == "application/json", "not equal content type" - assert response.status_code == 500, "not equal code" + @patch("kfadapter.kfadapter_kfconnect.KfConnect.get_kf_list_experiments") + def test_negative_get_experiments_failed_with_unsupported_error(self, mock_get_kf_list_experiments): + # given + mock_get_kf_list_experiments.return_value = IndexError("") + # when + response = self.client.get("/experiments") -class TestNoneKfConnect: - def setup_method(self): - self.client = kfadapter_main.APP.test_client(self) - kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger - kfadapter_main.KFCONNECT_KF_OBJ = NegativeFakeNoneKfConnect() - kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance() - - def test_nagetive_none_execute_job(self): - job_name = "job_name" - dict_job = {} - dict_job['arguments'] = "param1" - dict_job['pipeline_name'] = "pipeline_name" - dict_job['experiment_name'] = "experiment_name" - dict_job['pipeline_version'] = "pipeline_version" - payload = json.dumps(dict_job) - headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'} - response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers) - assert response.content_type == "application/json", "not equal content type" - \ No newline at end of file + # then + mock_get_kf_list_experiments.assert_called_once() + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + self.assertEqual(response.get_json()["message"], "Unsupported error from Kubeflow") + self.assertEqual(response.get_json()["ext"], 1) -- 2.16.6