"""
-
from os import getenv
from threading import Lock
from tmgr_logger import TMLogger
self.kf_dict = {}
self.trainingmgr_dict = {}
- self.tmgr_logger = TMLogger("../config/log_config.yaml")
+ self.tmgr_logger = TMLogger("config/log_config.yaml")
self.logger = self.tmgr_logger.logger
self.run_status_polling_interval_sec = 20
"""
import kfp
-import kfadapter_util
+from kfadapter_util import random_suffix
from kfadapter_conf import KfConfiguration
class KfConnect:
"""
self.logger.debug("run_kf_pipeline Entered")
run = self.kfp_client.run_pipeline(exp_id, job_name="testjob_"+\
- kfadapter_util.random_suffix(),
+ random_suffix(),
pipeline_package_path=None, params=arguments,
pipeline_id=pipeline_id,
version_id=version_id)
import json
import requests
from flask_api import status
-import kfadapter_conf
+# import kfadapter_conf
class BadRequest(Exception):
"""
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+from kfadapter.tmgr_logger import TMLogger
+
+class FakeKfConf:
+ __instance = None
+
+ def get_instance():
+ if FakeKfConf.__instance is None:
+ FakeKfConf()
+
+ return FakeKfConf.__instance
+
+ def __init__(self):
+ FakeKfConf.__instance = self
+
+ self.kf_dict = {}
+ self.ucmgr_dict = {}
+ self.tmgr_logger = TMLogger("config/log_config.yaml")
+ self.logger = self.tmgr_logger.logger
+
+ self.run_status_polling_interval_sec = 20
+ self.kf_dict['kfhostname'] = 'kfhostname'
+ self.kf_dict['kfport'] = 9999
+ self.kf_dict['kfdefaultns'] = 'ai-server'
+ self.appport = 7777
+ self.ucmgr_dict['ucmgr_host'] = '127.0.0.1'
+ self.ucmgr_dict['ucmgr_port'] = 30025
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import kfp_server_api
+from kfp_server_api.models.api_run import ApiRun
+from kfp_server_api.models.api_list_runs_response import ApiListRunsResponse
+
+from kfp_server_api.models.api_experiment import ApiExperiment
+from kfp_server_api.models.api_list_pipelines_response import ApiListPipelinesResponse
+from kfp_server_api.models.api_pipeline import ApiPipeline
+from kfp_server_api.models.api_parameter import ApiParameter
+from kfp_server_api.models.api_resource_reference import ApiResourceReference
+from kfp_server_api.models.api_resource_key import ApiResourceKey
+from kfp_server_api.models.api_pipeline_version import ApiPipelineVersion
+
+class FakeKfConnect:
+
+ def __init__(self):
+ print("Initialized Fake KfConnect")
+
+ def get_kf_list_experiments(self, nspace):
+ explist = kfp_server_api.ApiListExperimentsResponse()
+
+ exp = kfp_server_api.ApiExperiment()
+ exp.name = "name"
+ exp.id = "id"
+ explist.experiments = [exp]
+ return explist
+
+ def get_pl_versions_by_pl_name(self, pipeline_name):
+ version_list = []
+ version_list.append("2.0.0")
+ return version_list
+ """
+ def run_kf_pipeline(self,exp_id,arguments,
+ experiment_id: str,
+ job_name: str,
+ pipeline_package_path: Optional[str] = None,
+ params: Optional[dict] = None,
+ pipeline_id: Optional[str] = None,
+ version_id: Optional[str] = None,
+ pipeline_root: Optional[str] = None,
+ enable_caching: Optional[str] = None,
+ service_account: Optional[str] = None,):
+
+ run = ApiRun()
+ run.id = "run_id"
+ run.name = "run_name"
+ rr0 = ApiResourceReference()
+ rr0.name = "rr0"
+ key0 = ApiResourceKey()
+ key0.id = "id0"
+ rr0.key = key0
+ rr1 = ApiResourceReference()
+ rr1.name = "rr0"
+ key1 = ApiResourceKey()
+ key1.id = "id1"
+ rr1.key = key1
+ run.status = "Running"
+ run.resource_references = [rr0, rr1]
+ return run
+ """
+
+ def delete_kf_pipeline(self, pipeline_id):
+ return True
+
+ def get_kf_pipeline_version_id( self,pipeline_version_name,
+ pipeline_id: str,
+ page_token: str = '',
+ page_size: int = 10,
+ sort_by: str = ''
+ ):
+ return "pipeline_id"
+
+ def get_kf_list_runs(self,
+ page_token='',
+ page_size=10,
+ sort_by='',
+ experiment_id=None,
+ namespace=None):
+ listrun = ApiListRunsResponse()
+ run1 = ApiRun()
+ run1.id = "id"
+ run1.description = "description"
+ run1.status = "status"
+
+ rr0 = ApiResourceReference()
+ rr0.name = "rr0"
+ key0 = ApiResourceKey()
+ key0.id = "id"
+ rr0.key = key0
+
+ rr1 = ApiResourceReference()
+ rr1.name = "rr1"
+ key1 = ApiResourceKey()
+ key1.id = "id"
+ rr1.key = key1
+
+ run1.resource_references = [rr0, rr1]
+
+ run2 = ApiRun()
+ run2.id = "id"
+ run2.description = "description"
+ run2.status = "status"
+
+ rr2 = ApiResourceReference()
+ rr2.name = "rr2"
+ key2 = ApiResourceKey()
+ key2.id = "id"
+ rr2.key = key2
+
+ rr3 = ApiResourceReference()
+ rr3.name = "rr1"
+ key3 = ApiResourceKey()
+ key3.id = "id"
+ rr3.key = key3
+
+ run2.resource_references = [rr2, rr3]
+
+
+ listrun.runs = [run1, run2]
+
+ return listrun
+
+ def get_kf_pipeline_desc(self, pipeline_id: str):
+ pipeline_info = ApiPipeline()
+
+ param1 = ApiParameter()
+ param1.name = "param1"
+ param1.value = "value1"
+ param2 = ApiParameter()
+ param2.name = "param2"
+ param2.value = "value2"
+ pipeline_info.parameters = [param1, param2]
+ pipeline_info.description = 'description'
+ pipeline_info.id = "id"
+ pipeline_info.name = "name"
+
+ param3 = ApiParameter()
+ param3.name = "param3"
+ param3.value = "value3"
+ param4 = ApiParameter()
+ param4.name = "param4"
+ param4.value = "value4"
+
+ default_version = ApiPipelineVersion()
+ default_version.parameters = [param3, param4]
+
+ pipeline_info.default_version = default_version
+ return pipeline_info
+
+ def get_kf_run(self, run_id: str):
+ run = ApiRun()
+ run.name = "run_name"
+ run.status = "Running"
+ run.id = "run_id"
+ return run
+
+ def get_kf_experiment_details(self, ex_name, nspace):
+ experiment = ApiExperiment()
+ experiment.name = "exp_name"
+ experiment.id = "exp_id"
+ return experiment
+
+ def get_kf_pipeline_id(self, pipeline_name):
+ return "pipeline_id"
+ """
+ def upload_pipeline_with_versions(self, pipeline_name, file, desc):
+ pipeline_info = kfp_server_api.ApiPipelineVersion()
+ pipeline_info.id("pipeline_id")
+ return pipeline_info
+ """
+
+ def get_kf_list_pipelines(self):
+ pipeline_list = ApiListPipelinesResponse()
+ pipeline = ApiPipeline()
+ pipeline.id = "pipeline_id"
+ pipeline.description = "pipeline_description"
+ parameter = ApiParameter()
+ parameter.name = "param1"
+ parameter.value = "value1"
+ pipeline.parameters = [parameter]
+ pipeline_list.pipelines = [pipeline]
+ return pipeline_list
+
+
+class NegativeFakeKfConnect:
+ def __init__(self):
+ print("Initialized Negative Fake KfConnect")
+
+ def get_kf_pipeline_id(self, pipeline_name):
+ return None
+
+ def get_kf_experiment_details(self, ex_name, nspace):
+ return None
+
+ def get_pl_versions_by_pl_name(self, pipeline_name):
+ raise kfp_server_api.exceptions.ApiException
+
+ def get_kf_run(self, run_id):
+ """
+ run = ApiRun()
+ run.id = run_id
+ run.name = 'run_name'
+ run.status = 'Running'
+ return run
+ """
+ raise Exception('erro')
+
+ def get_kf_list_runs(self, nspace):
+
+ run = ApiRun()
+ run.name = "run_name"
+ run.status = "Running"
+ run.id = "run_id"
+ run.description = "descrption"
+ rr0 = ApiResourceReference()
+ rr0.name = "rr0"
+ key0 = ApiResourceKey()
+ key0.id = "key0id"
+ run.resource_references = [rr0]
+
+ runs = ApiListRunsResponse()
+ runs.runs = [run]
+
+ return runs
+
+ def get_kf_list_pipelines(self,
+ page_token='',
+ page_size=10,
+ sort_by=''):
+ raise Exception('error')
+
+
+ def get_kf_pipeline_desc(self, pipeline_id):
+ raise kfp_server_api.exceptions.ApiException
+
+class NegativeFakeAdditionalKfConnect:
+
+ def get_kf_experiment_details(self, ex_name, nspace):
+ raise Exception('error')
+
+ def get_kf_pipeline_id(self, pipeline_name):
+ raise kfp_server_api.exceptions.ApiException('error')
+ """
+ def get_pl_versions_by_pl_namee(self, pipeline_name):
+ raise Exception('error')
+ """
+ """
+ def get_kf_pipeline_version_id(self, pipeline_id, pipeline_version_name):
+ raise Exception('error')
+ """
+ def get_kf_list_experiments(self, nspace):
+ raise Exception('error')
+
+ def get_kf_pipeline_desc(self, pipeline_id: str):
+ raise kfp_server_api.exceptions.ApiException('error')
+
+class NegativeFakeNoneKfConnect:
+
+ def get_kf_experiment_details(self, ex_name, nspace):
+ experiment = ApiExperiment()
+ experiment.name = "exp_name"
+ experiment.id = "exp_id"
+ return experiment
+
+ def get_kf_pipeline_id(self, pipeline_name):
+ return None
+
+
+
+
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+from typing import Optional
+
+from kfp_server_api.models.api_list_pipeline_versions_response import ApiListPipelineVersionsResponse
+from kfp_server_api.models.api_pipeline_version import ApiPipelineVersion
+from kfp_server_api.models.api_experiment import ApiExperiment
+from kfp_server_api.models.api_run_detail import ApiRunDetail
+from kfp_server_api.models.api_run import ApiRun
+
+
+
+import kfp_server_api
+
+class FakeKfp:
+ def list_experiments(self,
+ page_token='',
+ page_size=10,
+ sort_by='',
+ namespace=None):
+ return None
+
+ def get_experiment(self,
+ experiment_id=None,
+ experiment_name=None,
+ namespace=None):
+
+ exp = ApiExperiment()
+ exp.id = 'ex-id'
+ exp.name = 'exp-name'
+ return exp
+
+
+ def upload_pipeline_version(
+ self,
+ pipeline_package_path,
+ pipeline_version_name: str,
+ pipeline_id: Optional[str] = None,
+ pipeline_name: Optional[str] = None,
+ description: Optional[str] = None,
+ ):
+ return None
+
+ def get_run(self, run_id: str):
+ rundetail = ApiRunDetail()
+ run = ApiRun()
+ run._id = 'id'
+ run._name = 'name'
+ run._status = 'status'
+ rundetail._run = run
+ return None
+ """
+ def list_runs(self,
+ page_token='',
+ page_size=10,
+ sort_by='',
+ experiment_id=None,
+ namespace=None,
+ filter=None):
+
+ listrun = ApiListRunsResponse()
+ run1 = ApiRun()
+ run1.id(self, 'id')
+ run1.description(self, 'description')
+ run1.status(self, 'status')
+ rr0 = ApiResourceReference()
+ rr0.name(self, 'name')
+ key0 = ApiResourceKey()
+ key0.id(self, 'id')
+ rr0.key(self, key0)
+
+ rr1 = ApiResourceReference()
+ rr1.name(self, 'name')
+ key1 = ApiResourceKey()
+ key1.id(self, 'id')
+ rr1.key(key1)
+
+ run1.resource_references(self, rr0)
+ listrun.runs(self, [run1])
+
+ return listrun
+ """
+
+
+ #def get_kf_pipeline_id(self, pipeline_name):
+ # return None
+
+ def list_pipeline_versions(
+ self,
+ pipeline_id: str,
+ page_token: str = '',
+ page_size: int = 10,
+ sort_by: str = ''
+ ):
+ response = ApiListPipelineVersionsResponse()
+ version1 = ApiPipelineVersion()
+ version1.name = 'version_name'
+ version1.id = 'version_id'
+ version2 = ApiPipelineVersion()
+ version2.name = 'version_name'
+ version2.id = 'version_id'
+ response._versions = [version1, version2]
+ response._total_size = 1
+ return response
+
+ # def get_kf_pipeline_version_id(self, pipeline_id, pipeline_version_name):
+ # return None
+
+ def run_pipeline(
+ self,
+ experiment_id: str,
+ job_name: str,
+ pipeline_package_path: Optional[str] = None,
+ params: Optional[dict] = None,
+ pipeline_id: Optional[str] = None,
+ version_id: Optional[str] = None,
+ pipeline_root: Optional[str] = None,
+ enable_caching: Optional[str] = None,
+ service_account: Optional[str] = None,
+ ):
+ return None
+
+
+ def delete_pipeline(self, pipeline_id):
+ return None
+
+ def get_pipeline(self, pipeline_id: str):
+ return None
+
+ def list_pipelines(self,
+ page_token='',
+ page_size=10,
+ sort_by=''):
+ return None
+
+
+ def upload_pipeline(
+ self,
+ pipeline_package_path: str = None,
+ pipeline_name: str = None,
+ description: str = None,
+ ):
+ return None
+
+ def get_pipeline_id(self, name):
+
+ return ['pipelin_id', 'pipelin_id2',]
+
+
+class FakeNegativeKfp:
+ """ def list_pipeline_versions(
+ self,
+ pipeline_id: str,
+ page_token: str = '',
+ page_size: int = 10,
+ sort_by: str = ''
+ ):
+ return None """
+
+ """
+ def upload_pipeline(
+ self,
+ pipeline_package_path: str = None,
+ pipeline_name: str = None,
+ description: str = None,
+ ):
+ return None
+ """
+ """
+ def list_pipeline_versions(
+ self,
+ pipeline_id: str,
+ page_token: str = '',
+ page_size: int = 10,
+ sort_by: str = ''
+ ):
+ response = ApiListPipelineVersionsResponse()
+ version1 = ApiPipelineVersion()
+ version1.name = 'version_name'
+ version1.id = 'version_id'
+ version2 = ApiPipelineVersion()
+ version2.name = 'version_name'
+ version2.id = 'version_id'
+ response._versions = [version1, version2]
+ response._total_size = None
+ return response
+ """
+ def get_pipeline_id(self, name):
+ raise kfp_server_api.exceptions.ApiException(name)
+
+ def get_experiment(self,
+ experiment_id=None,
+ experiment_name=None,
+ namespace=None):
+ raise ValueError(
+ 'Either experiment_id or experiment_name is required')
+
+class FakeAdditionalKfp:
+ """
+ def list_pipeline_versions(
+ self,
+ pipeline_id: str,
+ page_token: str = '',
+ page_size: int = 10,
+ sort_by: str = ''
+ ):
+ response = ApiListPipelineVersionsResponse()
+ version1 = ApiPipelineVersion()
+ version1.name = 'version_name'
+ version1.id = 'version_id'
+ version2 = ApiPipelineVersion()
+ version2.name = 'version_name'
+ version2.id = 'version_id'
+ response._versions = [version1, version2]
+ response._total_size = None
+ return response
+ """
+ """
+ def get_pipeline_id(self, name):
+ return None
+
+ def upload_pipeline(
+ self,
+ pipeline_package_path: str = None,
+ pipeline_name: str = None,
+ description: str = None,
+ ):
+ return None
+ """
+
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import sys
+sys.path.append("kfadapter")
+import kfadapter_main
+from tmgr_logger import TMLogger
+
+from flask_api import status
+import json
+from fake_kfconnect import FakeKfConnect, NegativeFakeAdditionalKfConnect, NegativeFakeKfConnect, NegativeFakeNoneKfConnect
+from fake_kfconf import FakeKfConf
+
+class Test_pipeline_id_operations:
+
+ def setup_method(self):
+ kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger
+ kfadapter_main.KFCONNECT_KF_OBJ = FakeKfConnect()
+ kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance()
+ self.client = kfadapter_main.APP.test_client(self)
+
+ def test_get_experiment(self):
+ experiment_name = "ai-server"
+ response = self.client.get("/experiments/{}".format(experiment_name))
+ expected_data = b'{"name": "ai-server", "id":"isj0t3jdhf"}'
+
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == status.HTTP_200_OK, "not equal code"
+
+ def test_get_all_runs(self):
+ response = self.client.get("/runs")
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status == '200 OK'
+
+ def test_get_pipeline_id(self):
+ pipeline_name = "car-racing"
+ response = self.client.get("/pipelineIds/{}".format(pipeline_name))
+ expected_data = b'{"name":"car-racing", "id":"3jfidhsueuf2oj"}'
+
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == status.HTTP_200_OK, "not equal code"
+
+ def test_upload_pipeline(self):
+ pipeline_file_name="Training-Manager/kf_connector/test/pipeline.zip"
+
+ response = self.client.post("/user/2/edit", data={
+ "file": pipeline_file_name,
+ "description": "description"
+ })
+
+ assert response.content_type == "text/html; charset=utf-8"
+
+ def test_get_run(self):
+ run_id = "run_id"
+ response = self.client.get("/runs/{}".format(run_id))
+
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status == '200 OK'
+
+ def test_get_pipeline_version(self):
+ pipeline_name="car-racing"
+
+ response = self.client.get("/pipelines/{}/versions".format(pipeline_name))
+ expected_data = ""
+
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == status.HTTP_200_OK, "not equal code"
+
+ def test_delete_run(self):
+ run_id = "run_id"
+ response = self.client.delete("/runs/{}".format(run_id))
+
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status == '400 BAD REQUEST'
+
+ def test_get_pipelines(self):
+ response = self.client.get("/pipelines")
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == status.HTTP_200_OK
+
+ def test_delete_pipelines(self):
+ pipeline_id = "pipelineIdsample"
+
+ response = self.client.delete("/pipelines/{}".format(pipeline_id))
+ expected_data = b'{"OK"}'
+
+ assert response.content_type == "application/json"
+ assert response.status_code == status.HTTP_200_OK
+
+ def test_check_liveness(self):
+ response = self.client.get("/liveness")
+ expected_data = b'Okay'
+ assert response.content_type == "text/html; charset=utf-8", "not equal content type"
+ assert response.data == expected_data
+
+ def test_get_experiments(self):
+ response = self.client.get("/experiments")
+ expected_data=""
+ assert response.content_type == "application/json", "not equal content type"
+
+ def test_execute_job(self):
+ job_name = "job_name"
+ dict_job = {'arguments' : {'key1':'value1', 'key2':'value2'}, 'pipeline_name' : "pipeline_name", 'experiment_name' : "experiment_name", 'pipeline_version' : "pipeline_version"}
+ payload = json.dumps(dict_job)
+ headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}
+ response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers)
+ assert response.content_type == "application/json", "not equal content type"
+
+ def test_get_pipelines_id(self):
+ pipeline_id = "pipelineIdsample"
+ response = self.client.get("/pipelines/{}".format(pipeline_id))
+
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == status.HTTP_200_OK, "not equal code"
+
+class Test_Negative:
+ def setup_method(self):
+ self.client = kfadapter_main.APP.test_client(self)
+ kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger
+
+ kfadapter_main.KFCONNECT_KF_OBJ = NegativeFakeKfConnect()
+ kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance()
+
+ def test_negative_get_pipeline_version(self):
+ pipeline_name="car-racing"
+
+ response = self.client.get("/pipelines/{}/versions".format(pipeline_name))
+ assert response.content_type == "text/html; charset=utf-8", "not equal content type"
+
+ def test_negative_get_pipelines_id(self):
+ pipeline_id = "pipelineIdsample"
+ response = self.client.get("/pipelines/{}".format(pipeline_id))
+
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == 500, "not equal code"
+
+ def test_negative_get_pipeline_id(self):
+ pipeline_name = "car-racing"
+ response = self.client.get("/pipelineIds/{}".format(pipeline_name))
+
+ assert response.content_type == "application/json"
+
+ def test_negative_get_experiment(self):
+ experiment_name = "ai-server"
+ response = self.client.get("/experiments/{}".format(experiment_name))
+ expected_data = b'{"name": "ai-server", "id":"isj03jdhf"}'
+
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == 400, "not equal code"
+
+ def test_negative_get_run(self):
+ run_id = "run_id"
+ response = self.client.get("/runs/{}".format(run_id))
+
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status == '400 BAD REQUEST'
+
+ def test_negative_upload_pipeline(self):
+ pipeline_file_name="pipeline.zip"
+ description="test pipeline"
+ pipeline_name="car-racing"
+
+ dict_pipeline = {'file' : pipeline_file_name, 'description' : description}
+ headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}
+
+ response = self.client.post("pipelineIds/{}".format(pipeline_name), data=json.dumps(dict_pipeline), headers=headers)
+
+ assert response.content_type == "application/json"
+ assert response.status == '500 INTERNAL SERVER ERROR'
+
+ def test_negative_get_all_runs(self):
+ response = self.client.get("/runs")
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status == '400 BAD REQUEST'
+
+ def test_negative_get_pipelines(self):
+ response = self.client.get("/pipelines")
+ print(response)
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == 500
+
+ def test_negative_execute_job(self):
+ job_name = "job_name"
+ dict_job = {}
+ dict_job['arguments'] = "param1"
+ dict_job['pipeline_name'] = "pipeline_name"
+ dict_job['experiment_name'] = "experiment_name"
+ dict_job['pipeline_version'] = "pipeline_version"
+ payload = json.dumps(dict_job)
+ headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}
+ response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers)
+ assert response.content_type == "application/json", "not equal content type"
+
+class TestAdditionalKfConnector:
+ def setup_method(self):
+ self.client = kfadapter_main.APP.test_client(self)
+ kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger
+ kfadapter_main.KFCONNECT_KF_OBJ = NegativeFakeAdditionalKfConnect()
+ kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance()
+
+ def test_negative_exception_get_kf_experiment(self):
+ experiment_name = "ai-server"
+ response = self.client.get("/experiments/{}".format(experiment_name))
+
+ assert response.content_type == "application/json", "not equal content type"
+
+ def test_negative_exception_get_pipeline_id(self):
+ pipeline_name = "car-racing"
+ response = self.client.get("/pipelineIds/{}".format(pipeline_name))
+
+ assert response.content_type == "text/html; charset=utf-8", "not equal content type"
+
+ def test_negative_exception_get_pipeline_version(self):
+ pipeline_name="car-racing"
+ response = self.client.get("/pipelines/{}/versions".format(pipeline_name))
+ assert response.content_type == "application/json", "not equal content type"
+
+ def test_negative_exception_execute_job(self):
+ job_name = "job_name"
+ dict_job = {}
+ dict_job['arguments'] = {"param3":"value3", "param4":"value4"}
+ dict_job['pipeline_name'] = 'pipeline_name'
+ dict_job['experiment_name'] = 'experiment_name'
+ dict_job['pipeline_version'] = 'pipeline_version'
+ payload = json.dumps(dict_job)
+ headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}
+ response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers)
+ assert response.content_type == "application/json", "not equal content type"
+
+ def test_negative_get_experiments(self):
+ response = self.client.get("/experiments")
+ expected_data=""
+ assert response.content_type == "application/json", "not equal content type"
+
+ def test_negative_exception_get_pipelines_id(self):
+ pipeline_id = "pipelineIdsample"
+ response = self.client.get("/pipelines/{}".format(pipeline_id))
+
+ assert response.content_type == "application/json", "not equal content type"
+ assert response.status_code == 500, "not equal code"
+
+
+class TestNoneKfConnect:
+ def setup_method(self):
+ self.client = kfadapter_main.APP.test_client(self)
+ kfadapter_main.LOGGER = TMLogger("config/log_config.yaml").logger
+ kfadapter_main.KFCONNECT_KF_OBJ = NegativeFakeNoneKfConnect()
+ kfadapter_main.KFCONNECT_CONFIG_OBJ = FakeKfConf.get_instance()
+
+ def test_nagetive_none_execute_job(self):
+ job_name = "job_name"
+ dict_job = {}
+ dict_job['arguments'] = "param1"
+ dict_job['pipeline_name'] = "pipeline_name"
+ dict_job['experiment_name'] = "experiment_name"
+ dict_job['pipeline_version'] = "pipeline_version"
+ payload = json.dumps(dict_job)
+ headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}
+ response = self.client.post("/trainingjobs/{}/execution".format(job_name), data=payload, headers=headers)
+ assert response.content_type == "application/json", "not equal content type"
+
\ No newline at end of file