From 7523770508219bdc0294e0c21aa6ddc62b8d1264 Mon Sep 17 00:00:00 2001 From: Youhwan Seol Date: Fri, 28 Oct 2022 18:44:16 +0900 Subject: [PATCH] Test codes for kfadapter_main Issue-Id: AIMLFW-9 Signed-off-by: Youhwan Seol Change-Id: I95ec333a28384fb37905911b602f496615c6298f --- kfadapter/kfadapter_conf.py | 3 +- kfadapter/kfadapter_kfconnect.py | 4 +- kfadapter/kfadapter_util.py | 2 +- test/fake_kfconf.py | 44 ++++++ test/fake_kfconnect.py | 287 +++++++++++++++++++++++++++++++++++++++ test/fake_kfp.py | 246 +++++++++++++++++++++++++++++++++ test/test_kfadapter_main.py | 276 +++++++++++++++++++++++++++++++++++++ 7 files changed, 857 insertions(+), 5 deletions(-) create mode 100644 test/fake_kfconf.py create mode 100644 test/fake_kfconnect.py create mode 100644 test/fake_kfp.py create mode 100644 test/test_kfadapter_main.py diff --git a/kfadapter/kfadapter_conf.py b/kfadapter/kfadapter_conf.py index 83c7af6..e2f27ec 100644 --- a/kfadapter/kfadapter_conf.py +++ b/kfadapter/kfadapter_conf.py @@ -25,7 +25,6 @@ Application configuration - Application Port and run status interval """ - from os import getenv from threading import Lock from tmgr_logger import TMLogger @@ -66,7 +65,7 @@ class KfConfiguration: self.kf_dict = {} self.trainingmgr_dict = {} - self.tmgr_logger = TMLogger("../config/log_config.yaml") + self.tmgr_logger = TMLogger("config/log_config.yaml") self.logger = self.tmgr_logger.logger self.run_status_polling_interval_sec = 20 diff --git a/kfadapter/kfadapter_kfconnect.py b/kfadapter/kfadapter_kfconnect.py index 1436ade..a9b1396 100644 --- a/kfadapter/kfadapter_kfconnect.py +++ b/kfadapter/kfadapter_kfconnect.py @@ -22,7 +22,7 @@ This module is for interfacing and interworking with KubeFlow SDK """ import kfp -import kfadapter_util +from kfadapter_util import random_suffix from kfadapter_conf import KfConfiguration class KfConnect: @@ -263,7 +263,7 @@ class KfConnect: """ self.logger.debug("run_kf_pipeline Entered") run = self.kfp_client.run_pipeline(exp_id, job_name="testjob_"+\ - kfadapter_util.random_suffix(), + random_suffix(), pipeline_package_path=None, params=arguments, pipeline_id=pipeline_id, version_id=version_id) diff --git a/kfadapter/kfadapter_util.py b/kfadapter/kfadapter_util.py index cde187d..cc88d8a 100644 --- a/kfadapter/kfadapter_util.py +++ b/kfadapter/kfadapter_util.py @@ -28,7 +28,7 @@ import time import json import requests from flask_api import status -import kfadapter_conf +# import kfadapter_conf class BadRequest(Exception): """ diff --git a/test/fake_kfconf.py b/test/fake_kfconf.py new file mode 100644 index 0000000..5ea0d49 --- /dev/null +++ b/test/fake_kfconf.py @@ -0,0 +1,44 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== + +from kfadapter.tmgr_logger import TMLogger + +class FakeKfConf: + __instance = None + + def get_instance(): + if FakeKfConf.__instance is None: + FakeKfConf() + + return FakeKfConf.__instance + + def __init__(self): + FakeKfConf.__instance = self + + self.kf_dict = {} + self.ucmgr_dict = {} + self.tmgr_logger = TMLogger("config/log_config.yaml") + self.logger = self.tmgr_logger.logger + + self.run_status_polling_interval_sec = 20 + self.kf_dict['kfhostname'] = 'kfhostname' + self.kf_dict['kfport'] = 9999 + self.kf_dict['kfdefaultns'] = 'ai-server' + self.appport = 7777 + self.ucmgr_dict['ucmgr_host'] = '127.0.0.1' + self.ucmgr_dict['ucmgr_port'] = 30025 diff --git a/test/fake_kfconnect.py b/test/fake_kfconnect.py new file mode 100644 index 0000000..50136d1 --- /dev/null +++ b/test/fake_kfconnect.py @@ -0,0 +1,287 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== + +import kfp_server_api +from kfp_server_api.models.api_run import ApiRun +from kfp_server_api.models.api_list_runs_response import ApiListRunsResponse + +from kfp_server_api.models.api_experiment import ApiExperiment +from kfp_server_api.models.api_list_pipelines_response import ApiListPipelinesResponse +from kfp_server_api.models.api_pipeline import ApiPipeline +from kfp_server_api.models.api_parameter import ApiParameter +from kfp_server_api.models.api_resource_reference import ApiResourceReference +from kfp_server_api.models.api_resource_key import ApiResourceKey +from kfp_server_api.models.api_pipeline_version import ApiPipelineVersion + +class FakeKfConnect: + + def __init__(self): + print("Initialized Fake KfConnect") + + def get_kf_list_experiments(self, nspace): + explist = kfp_server_api.ApiListExperimentsResponse() + + exp = kfp_server_api.ApiExperiment() + exp.name = "name" + exp.id = "id" + explist.experiments = [exp] + return explist + + def get_pl_versions_by_pl_name(self, pipeline_name): + version_list = [] + version_list.append("2.0.0") + return version_list + """ + def run_kf_pipeline(self,exp_id,arguments, + experiment_id: str, + job_name: str, + pipeline_package_path: Optional[str] = None, + params: Optional[dict] = None, + pipeline_id: Optional[str] = None, + version_id: Optional[str] = None, + pipeline_root: Optional[str] = None, + enable_caching: Optional[str] = None, + service_account: Optional[str] = None,): + + run = ApiRun() + run.id = "run_id" + run.name = "run_name" + rr0 = ApiResourceReference() + rr0.name = "rr0" + key0 = ApiResourceKey() + key0.id = "id0" + rr0.key = key0 + rr1 = ApiResourceReference() + rr1.name = "rr0" + key1 = ApiResourceKey() + key1.id = "id1" + rr1.key = key1 + run.status = "Running" + run.resource_references = [rr0, rr1] + return run + """ + + def delete_kf_pipeline(self, pipeline_id): + return True + + def get_kf_pipeline_version_id( self,pipeline_version_name, + pipeline_id: str, + page_token: str = '', + page_size: int = 10, + sort_by: str = '' + ): + return "pipeline_id" + + def get_kf_list_runs(self, + page_token='', + page_size=10, + sort_by='', + experiment_id=None, + namespace=None): + listrun = ApiListRunsResponse() + run1 = ApiRun() + run1.id = "id" + run1.description = "description" + run1.status = "status" + + rr0 = ApiResourceReference() + rr0.name = "rr0" + key0 = ApiResourceKey() + key0.id = "id" + rr0.key = key0 + + rr1 = ApiResourceReference() + rr1.name = "rr1" + key1 = ApiResourceKey() + key1.id = "id" + rr1.key = key1 + + run1.resource_references = [rr0, rr1] + + run2 = ApiRun() + run2.id = "id" + run2.description = "description" + run2.status = "status" + + rr2 = ApiResourceReference() + rr2.name = "rr2" + key2 = ApiResourceKey() + key2.id = "id" + rr2.key = key2 + + rr3 = ApiResourceReference() + rr3.name = "rr1" + key3 = ApiResourceKey() + key3.id = "id" + rr3.key = key3 + + run2.resource_references = [rr2, rr3] + + + listrun.runs = [run1, run2] + + return listrun + + def get_kf_pipeline_desc(self, pipeline_id: str): + pipeline_info = ApiPipeline() + + param1 = ApiParameter() + param1.name = "param1" + param1.value = "value1" + param2 = ApiParameter() + param2.name = "param2" + param2.value = "value2" + pipeline_info.parameters = [param1, param2] + pipeline_info.description = 'description' + pipeline_info.id = "id" + pipeline_info.name = "name" + + param3 = ApiParameter() + param3.name = "param3" + param3.value = "value3" + param4 = ApiParameter() + param4.name = "param4" + param4.value = "value4" + + default_version = ApiPipelineVersion() + default_version.parameters = [param3, param4] + + pipeline_info.default_version = default_version + return pipeline_info + + def get_kf_run(self, run_id: str): + run = ApiRun() + run.name = "run_name" + run.status = "Running" + run.id = "run_id" + return run + + def get_kf_experiment_details(self, ex_name, nspace): + experiment = ApiExperiment() + experiment.name = "exp_name" + experiment.id = "exp_id" + return experiment + + def get_kf_pipeline_id(self, pipeline_name): + return "pipeline_id" + """ + def upload_pipeline_with_versions(self, pipeline_name, file, desc): + pipeline_info = kfp_server_api.ApiPipelineVersion() + pipeline_info.id("pipeline_id") + return pipeline_info + """ + + def get_kf_list_pipelines(self): + pipeline_list = ApiListPipelinesResponse() + pipeline = ApiPipeline() + pipeline.id = "pipeline_id" + pipeline.description = "pipeline_description" + parameter = ApiParameter() + parameter.name = "param1" + parameter.value = "value1" + pipeline.parameters = [parameter] + pipeline_list.pipelines = [pipeline] + return pipeline_list + + +class NegativeFakeKfConnect: + def __init__(self): + print("Initialized Negative Fake KfConnect") + + def get_kf_pipeline_id(self, pipeline_name): + return None + + def get_kf_experiment_details(self, ex_name, nspace): + return None + + def get_pl_versions_by_pl_name(self, pipeline_name): + raise kfp_server_api.exceptions.ApiException + + def get_kf_run(self, run_id): + """ + run = ApiRun() + run.id = run_id + run.name = 'run_name' + run.status = 'Running' + return run + """ + raise Exception('erro') + + def get_kf_list_runs(self, nspace): + + run = ApiRun() + run.name = "run_name" + run.status = "Running" + run.id = "run_id" + run.description = "descrption" + rr0 = ApiResourceReference() + rr0.name = "rr0" + key0 = ApiResourceKey() + key0.id = "key0id" + run.resource_references = [rr0] + + runs = ApiListRunsResponse() + runs.runs = [run] + + return runs + + def get_kf_list_pipelines(self, + page_token='', + page_size=10, + sort_by=''): + raise Exception('error') + + + def get_kf_pipeline_desc(self, pipeline_id): + raise kfp_server_api.exceptions.ApiException + +class NegativeFakeAdditionalKfConnect: + + def get_kf_experiment_details(self, ex_name, nspace): + raise Exception('error') + + def get_kf_pipeline_id(self, pipeline_name): + raise kfp_server_api.exceptions.ApiException('error') + """ + def get_pl_versions_by_pl_namee(self, pipeline_name): + raise Exception('error') + """ + """ + def get_kf_pipeline_version_id(self, pipeline_id, pipeline_version_name): + raise Exception('error') + """ + def get_kf_list_experiments(self, nspace): + raise Exception('error') + + def get_kf_pipeline_desc(self, pipeline_id: str): + raise kfp_server_api.exceptions.ApiException('error') + +class NegativeFakeNoneKfConnect: + + def get_kf_experiment_details(self, ex_name, nspace): + experiment = ApiExperiment() + experiment.name = "exp_name" + experiment.id = "exp_id" + return experiment + + def get_kf_pipeline_id(self, pipeline_name): + return None + + + + diff --git a/test/fake_kfp.py b/test/fake_kfp.py new file mode 100644 index 0000000..283d8e2 --- /dev/null +++ b/test/fake_kfp.py @@ -0,0 +1,246 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== + +from typing import Optional + +from kfp_server_api.models.api_list_pipeline_versions_response import ApiListPipelineVersionsResponse +from kfp_server_api.models.api_pipeline_version import ApiPipelineVersion +from kfp_server_api.models.api_experiment import ApiExperiment +from kfp_server_api.models.api_run_detail import ApiRunDetail +from kfp_server_api.models.api_run import ApiRun + + + +import kfp_server_api + +class FakeKfp: + def list_experiments(self, + page_token='', + page_size=10, + sort_by='', + namespace=None): + return None + + def get_experiment(self, + experiment_id=None, + experiment_name=None, + namespace=None): + + exp = ApiExperiment() + exp.id = 'ex-id' + exp.name = 'exp-name' + return exp + + + def upload_pipeline_version( + self, + pipeline_package_path, + pipeline_version_name: str, + pipeline_id: Optional[str] = None, + pipeline_name: Optional[str] = None, + description: Optional[str] = None, + ): + return None + + def get_run(self, run_id: str): + rundetail = ApiRunDetail() + run = ApiRun() + run._id = 'id' + run._name = 'name' + run._status = 'status' + rundetail._run = run + return None + """ + def list_runs(self, + page_token='', + page_size=10, + sort_by='', + experiment_id=None, + namespace=None, + filter=None): + + listrun = ApiListRunsResponse() + run1 = ApiRun() + run1.id(self, 'id') + run1.description(self, 'description') + run1.status(self, 'status') + rr0 = ApiResourceReference() + rr0.name(self, 'name') + key0 = ApiResourceKey() + key0.id(self, 'id') + rr0.key(self, key0) + + rr1 = ApiResourceReference() + rr1.name(self, 'name') + key1 = ApiResourceKey() + key1.id(self, 'id') + rr1.key(key1) + + run1.resource_references(self, rr0) + listrun.runs(self, [run1]) + + return listrun + """ + + + #def get_kf_pipeline_id(self, pipeline_name): + # return None + + def list_pipeline_versions( + self, + pipeline_id: str, + page_token: str = '', + page_size: int = 10, + sort_by: str = '' + ): + response = ApiListPipelineVersionsResponse() + version1 = ApiPipelineVersion() + version1.name = 'version_name' + version1.id = 'version_id' + version2 = ApiPipelineVersion() + version2.name = 'version_name' + version2.id = 'version_id' + response._versions = [version1, version2] + response._total_size = 1 + return response + + # def get_kf_pipeline_version_id(self, pipeline_id, pipeline_version_name): + # return None + + def run_pipeline( + self, + experiment_id: str, + job_name: str, + pipeline_package_path: Optional[str] = None, + params: Optional[dict] = None, + pipeline_id: Optional[str] = None, + version_id: Optional[str] = None, + pipeline_root: Optional[str] = None, + enable_caching: Optional[str] = None, + service_account: Optional[str] = None, + ): + return None + + + def delete_pipeline(self, pipeline_id): + return None + + def get_pipeline(self, pipeline_id: str): + return None + + def list_pipelines(self, + page_token='', + page_size=10, + sort_by=''): + return None + + + def upload_pipeline( + self, + pipeline_package_path: str = None, + pipeline_name: str = None, + description: str = None, + ): + return None + + def get_pipeline_id(self, name): + + return ['pipelin_id', 'pipelin_id2',] + + +class FakeNegativeKfp: + """ def list_pipeline_versions( + self, + pipeline_id: str, + page_token: str = '', + page_size: int = 10, + sort_by: str = '' + ): + return None """ + + """ + def upload_pipeline( + self, + pipeline_package_path: str = None, + pipeline_name: str = None, + description: str = None, + ): + return None + """ + """ + def list_pipeline_versions( + self, + pipeline_id: str, + page_token: str = '', + page_size: int = 10, + sort_by: str = '' + ): + response = ApiListPipelineVersionsResponse() + version1 = ApiPipelineVersion() + version1.name = 'version_name' + version1.id = 'version_id' + version2 = ApiPipelineVersion() + version2.name = 'version_name' + version2.id = 'version_id' + response._versions = [version1, version2] + response._total_size = None + return response + """ + def get_pipeline_id(self, name): + raise kfp_server_api.exceptions.ApiException(name) + + def get_experiment(self, + experiment_id=None, + experiment_name=None, + namespace=None): + raise ValueError( + 'Either experiment_id or experiment_name is required') + +class FakeAdditionalKfp: + """ + def list_pipeline_versions( + self, + pipeline_id: str, + page_token: str = '', + page_size: int = 10, + sort_by: str = '' + ): + response = ApiListPipelineVersionsResponse() + version1 = ApiPipelineVersion() + version1.name = 'version_name' + version1.id = 'version_id' + version2 = ApiPipelineVersion() + version2.name = 'version_name' + version2.id = 'version_id' + response._versions = [version1, version2] + response._total_size = None + return response + """ + """ + def get_pipeline_id(self, name): + return None + + def upload_pipeline( + self, + pipeline_package_path: str = None, + pipeline_name: str = None, + description: str = None, + ): + return None + """ + diff --git a/test/test_kfadapter_main.py b/test/test_kfadapter_main.py new file mode 100644 index 0000000..efe1385 --- /dev/null +++ b/test/test_kfadapter_main.py @@ -0,0 +1,276 @@ +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== + +import sys +sys.path.append("kfadapter") +import kfadapter_main +from tmgr_logger import TMLogger + +from flask_api import status +import json +from fake_kfconnect import FakeKfConnect, NegativeFakeAdditionalKfConnect, NegativeFakeKfConnect, NegativeFakeNoneKfConnect +from fake_kfconf import FakeKfConf + +class Test_pipeline_id_operations: + + def setup_method(self): + kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger + kfadapter_main.KFCONNECT_KF_OBJ = FakeKfConnect() + kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance() + self.client = kfadapter_main.APP.test_client(self) + + def test_get_experiment(self): + experiment_name = "ai-server" + response = self.client.get("/experiments/{}".format(experiment_name)) + expected_data = b'{"name": "ai-server", "id":"isj0t3jdhf"}' + + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == status.HTTP_200_OK, "not equal code" + + def test_get_all_runs(self): + response = self.client.get("/runs") + assert response.content_type == "application/json", "not equal content type" + assert response.status == '200 OK' + + def test_get_pipeline_id(self): + pipeline_name = "car-racing" + response = self.client.get("/pipelineIds/{}".format(pipeline_name)) + expected_data = b'{"name":"car-racing", "id":"3jfidhsueuf2oj"}' + + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == status.HTTP_200_OK, "not equal code" + + def test_upload_pipeline(self): + pipeline_file_name="Training-Manager/kf_connector/test/pipeline.zip" + + response = self.client.post("/user/2/edit", data={ + "file": pipeline_file_name, + "description": "description" + }) + + assert response.content_type == "text/html; charset=utf-8" + + def test_get_run(self): + run_id = "run_id" + response = self.client.get("/runs/{}".format(run_id)) + + assert response.content_type == "application/json", "not equal content type" + assert response.status == '200 OK' + + def test_get_pipeline_version(self): + pipeline_name="car-racing" + + response = self.client.get("/pipelines/{}/versions".format(pipeline_name)) + expected_data = "" + + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == status.HTTP_200_OK, "not equal code" + + def test_delete_run(self): + run_id = "run_id" + response = self.client.delete("/runs/{}".format(run_id)) + + assert response.content_type == "application/json", "not equal content type" + assert response.status == '400 BAD REQUEST' + + def test_get_pipelines(self): + response = self.client.get("/pipelines") + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == status.HTTP_200_OK + + def test_delete_pipelines(self): + pipeline_id = "pipelineIdsample" + + response = self.client.delete("/pipelines/{}".format(pipeline_id)) + expected_data = b'{"OK"}' + + assert response.content_type == "application/json" + assert response.status_code == status.HTTP_200_OK + + def test_check_liveness(self): + response = self.client.get("/liveness") + expected_data = b'Okay' + assert response.content_type == "text/html; charset=utf-8", "not equal content type" + assert response.data == expected_data + + def test_get_experiments(self): + response = self.client.get("/experiments") + expected_data="" + assert response.content_type == "application/json", "not equal content type" + + def test_execute_job(self): + job_name = "job_name" + dict_job = {'arguments' : {'key1':'value1', 'key2':'value2'}, 'pipeline_name' : "pipeline_name", 'experiment_name' : "experiment_name", 'pipeline_version' : "pipeline_version"} + payload = json.dumps(dict_job) + headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'} + response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers) + assert response.content_type == "application/json", "not equal content type" + + def test_get_pipelines_id(self): + pipeline_id = "pipelineIdsample" + response = self.client.get("/pipelines/{}".format(pipeline_id)) + + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == status.HTTP_200_OK, "not equal code" + +class Test_Negative: + def setup_method(self): + self.client = kfadapter_main.APP.test_client(self) + kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger + + kfadapter_main.KFCONNECT_KF_OBJ = NegativeFakeKfConnect() + kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance() + + def test_negative_get_pipeline_version(self): + pipeline_name="car-racing" + + response = self.client.get("/pipelines/{}/versions".format(pipeline_name)) + assert response.content_type == "text/html; charset=utf-8", "not equal content type" + + def test_negative_get_pipelines_id(self): + pipeline_id = "pipelineIdsample" + response = self.client.get("/pipelines/{}".format(pipeline_id)) + + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == 500, "not equal code" + + def test_negative_get_pipeline_id(self): + pipeline_name = "car-racing" + response = self.client.get("/pipelineIds/{}".format(pipeline_name)) + + assert response.content_type == "application/json" + + def test_negative_get_experiment(self): + experiment_name = "ai-server" + response = self.client.get("/experiments/{}".format(experiment_name)) + expected_data = b'{"name": "ai-server", "id":"isj03jdhf"}' + + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == 400, "not equal code" + + def test_negative_get_run(self): + run_id = "run_id" + response = self.client.get("/runs/{}".format(run_id)) + + assert response.content_type == "application/json", "not equal content type" + assert response.status == '400 BAD REQUEST' + + def test_negative_upload_pipeline(self): + pipeline_file_name="pipeline.zip" + description="test pipeline" + pipeline_name="car-racing" + + dict_pipeline = {'file' : pipeline_file_name, 'description' : description} + headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'} + + response = self.client.post("pipelineIds/{}".format(pipeline_name), data=json.dumps(dict_pipeline), headers=headers) + + assert response.content_type == "application/json" + assert response.status == '500 INTERNAL SERVER ERROR' + + def test_negative_get_all_runs(self): + response = self.client.get("/runs") + assert response.content_type == "application/json", "not equal content type" + assert response.status == '400 BAD REQUEST' + + def test_negative_get_pipelines(self): + response = self.client.get("/pipelines") + print(response) + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == 500 + + def test_negative_execute_job(self): + job_name = "job_name" + dict_job = {} + dict_job['arguments'] = "param1" + dict_job['pipeline_name'] = "pipeline_name" + dict_job['experiment_name'] = "experiment_name" + dict_job['pipeline_version'] = "pipeline_version" + payload = json.dumps(dict_job) + headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'} + response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers) + assert response.content_type == "application/json", "not equal content type" + +class TestAdditionalKfConnector: + def setup_method(self): + self.client = kfadapter_main.APP.test_client(self) + kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger + kfadapter_main.KFCONNECT_KF_OBJ = NegativeFakeAdditionalKfConnect() + kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance() + + def test_negative_exception_get_kf_experiment(self): + experiment_name = "ai-server" + response = self.client.get("/experiments/{}".format(experiment_name)) + + assert response.content_type == "application/json", "not equal content type" + + def test_negative_exception_get_pipeline_id(self): + pipeline_name = "car-racing" + response = self.client.get("/pipelineIds/{}".format(pipeline_name)) + + assert response.content_type == "text/html; charset=utf-8", "not equal content type" + + def test_negative_exception_get_pipeline_version(self): + pipeline_name="car-racing" + response = self.client.get("/pipelines/{}/versions".format(pipeline_name)) + assert response.content_type == "application/json", "not equal content type" + + def test_negative_exception_execute_job(self): + job_name = "job_name" + dict_job = {} + dict_job['arguments'] = {"param3":"value3", "param4":"value4"} + dict_job['pipeline_name'] = 'pipeline_name' + dict_job['experiment_name'] = 'experiment_name' + dict_job['pipeline_version'] = 'pipeline_version' + payload = json.dumps(dict_job) + headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'} + response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers) + assert response.content_type == "application/json", "not equal content type" + + def test_negative_get_experiments(self): + response = self.client.get("/experiments") + expected_data="" + assert response.content_type == "application/json", "not equal content type" + + def test_negative_exception_get_pipelines_id(self): + pipeline_id = "pipelineIdsample" + response = self.client.get("/pipelines/{}".format(pipeline_id)) + + assert response.content_type == "application/json", "not equal content type" + assert response.status_code == 500, "not equal code" + + +class TestNoneKfConnect: + def setup_method(self): + self.client = kfadapter_main.APP.test_client(self) + kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger + kfadapter_main.KFCONNECT_KF_OBJ = NegativeFakeNoneKfConnect() + kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance() + + def test_nagetive_none_execute_job(self): + job_name = "job_name" + dict_job = {} + dict_job['arguments'] = "param1" + dict_job['pipeline_name'] = "pipeline_name" + dict_job['experiment_name'] = "experiment_name" + dict_job['pipeline_version'] = "pipeline_version" + payload = json.dumps(dict_job) + headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'} + response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers) + assert response.content_type == "application/json", "not equal content type" + \ No newline at end of file -- 2.16.6