Test code for kfadapter_main
[aiml-fw/athp/tps/kubeflow-adapter.git] / test / fake_kfconnect.py
1 # ==================================================================================
2 #
3 #       Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 #
17 # ==================================================================================
18
19 import kfp_server_api
20 from kfp_server_api.models.api_run import ApiRun
21 from kfp_server_api.models.api_list_runs_response import ApiListRunsResponse
22
23 from kfp_server_api.models.api_experiment import ApiExperiment
24 from kfp_server_api.models.api_list_pipelines_response import ApiListPipelinesResponse
25 from kfp_server_api.models.api_pipeline import ApiPipeline
26 from kfp_server_api.models.api_parameter import ApiParameter
27 from kfp_server_api.models.api_resource_reference import ApiResourceReference
28 from kfp_server_api.models.api_resource_key import ApiResourceKey
29 from kfp_server_api.models.api_pipeline_version import ApiPipelineVersion
30
class FakeKfConnect:
    """Happy-path fake of the Kubeflow connector for unit tests.

    Every method ignores its arguments and returns a fixed, canned value:
    either a plain string/list/bool or a ``kfp_server_api`` model object
    populated with literal placeholder data.

    NOTE: ``run_kf_pipeline`` and ``upload_pipeline_with_versions`` are not
    defined on this fake (they previously existed only as disabled,
    string-quoted stubs), so tests that need them must supply their own.
    """

    def __init__(self):
        print("Initialized Fake KfConnect")

    @staticmethod
    def _make_resource_reference(name, key_id):
        """Return an ApiResourceReference named *name* whose key id is *key_id*."""
        key = ApiResourceKey()
        key.id = key_id
        reference = ApiResourceReference()
        reference.name = name
        reference.key = key
        return reference

    @staticmethod
    def _make_parameter(name, value):
        """Return an ApiParameter carrying the given *name* and *value*."""
        parameter = ApiParameter()
        parameter.name = name
        parameter.value = value
        return parameter

    def get_kf_list_experiments(self, nspace):
        """Return a list response holding one experiment (name "name", id "id")."""
        experiment = kfp_server_api.ApiExperiment()
        experiment.name = "name"
        experiment.id = "id"

        response = kfp_server_api.ApiListExperimentsResponse()
        response.experiments = [experiment]
        return response

    def get_pl_versions_by_pl_name(self, pipeline_name):
        """Return the canned version list ``["2.0.0"]``."""
        return ["2.0.0"]

    def delete_kf_pipeline(self, pipeline_id):
        """Report a successful deletion by always returning True."""
        return True

    def get_kf_pipeline_version_id(
            self,
            pipeline_version_name,
            pipeline_id: str,
            page_token: str = '',
            page_size: int = 10,
            sort_by: str = ''
    ):
        """Return the fixed string "pipeline_id" regardless of the arguments."""
        return "pipeline_id"

    def get_kf_list_runs(self,
                         page_token='',
                         page_size=10,
                         sort_by='',
                         experiment_id=None,
                         namespace=None):
        """Return a list response with two runs, each with two references.

        Both runs share id "id", description "description" and status
        "status". Their resource references are named ("rr0", "rr1") and
        ("rr2", "rr1") respectively, and every reference key has id "id".
        """
        runs = []
        # The second run's last reference is also named "rr1"; kept as-is so
        # the canned data matches what existing tests observe.
        for reference_names in (("rr0", "rr1"), ("rr2", "rr1")):
            run = ApiRun()
            run.id = "id"
            run.description = "description"
            run.status = "status"
            run.resource_references = [
                self._make_resource_reference(name, "id")
                for name in reference_names
            ]
            runs.append(run)

        response = ApiListRunsResponse()
        response.runs = runs
        return response

    def get_kf_pipeline_desc(self, pipeline_id: str):
        """Return a canned pipeline with parameters and a default version.

        The pipeline itself carries param1/param2; its default version
        carries param3/param4.
        """
        pipeline_info = ApiPipeline()
        pipeline_info.parameters = [
            self._make_parameter("param1", "value1"),
            self._make_parameter("param2", "value2"),
        ]
        pipeline_info.description = 'description'
        pipeline_info.id = "id"
        pipeline_info.name = "name"

        default_version = ApiPipelineVersion()
        default_version.parameters = [
            self._make_parameter("param3", "value3"),
            self._make_parameter("param4", "value4"),
        ]
        pipeline_info.default_version = default_version
        return pipeline_info

    def get_kf_run(self, run_id: str):
        """Return a canned run (id "run_id", name "run_name", status "Running")."""
        run = ApiRun()
        run.name = "run_name"
        run.status = "Running"
        run.id = "run_id"
        return run

    def get_kf_experiment_details(self, ex_name, nspace):
        """Return a canned experiment (name "exp_name", id "exp_id")."""
        experiment = ApiExperiment()
        experiment.name = "exp_name"
        experiment.id = "exp_id"
        return experiment

    def get_kf_pipeline_id(self, pipeline_name):
        """Return the fixed string "pipeline_id"."""
        return "pipeline_id"

    def get_kf_list_pipelines(self):
        """Return a list response holding one pipeline with one parameter."""
        pipeline = ApiPipeline()
        pipeline.id = "pipeline_id"
        pipeline.description = "pipeline_description"
        pipeline.parameters = [self._make_parameter("param1", "value1")]

        response = ApiListPipelinesResponse()
        response.pipelines = [pipeline]
        return response
200
201
class NegativeFakeKfConnect:
    """Failure-path fake of the Kubeflow connector for unit tests.

    Lookups return ``None`` and most other calls raise, so callers'
    error-handling branches can be exercised. Arguments are ignored.
    """

    def __init__(self):
        print("Initialized Negative Fake KfConnect")

    def get_kf_pipeline_id(self, pipeline_name):
        """Return None for every pipeline name."""
        return None

    def get_kf_experiment_details(self, ex_name, nspace):
        """Return None for every experiment."""
        return None

    def get_pl_versions_by_pl_name(self, pipeline_name):
        """Always raise ``kfp_server_api.exceptions.ApiException``."""
        raise kfp_server_api.exceptions.ApiException

    def get_kf_run(self, run_id):
        """Always raise a generic ``Exception``."""
        # Message text kept verbatim (sic) in case tests match on it.
        raise Exception('erro')

    def get_kf_list_runs(self, nspace):
        """Return a list response containing a single canned run.

        The run has one resource reference named "rr0".
        """
        reference = ApiResourceReference()
        reference.name = "rr0"
        # NOTE(review): no ApiResourceKey is attached, so reference.key stays
        # unset. Earlier code built a key with id "key0id" but never assigned
        # it; confirm whether tests rely on the missing key.

        run = ApiRun()
        run.name = "run_name"
        run.status = "Running"
        run.id = "run_id"
        run.description = "descrption"
        run.resource_references = [reference]

        response = ApiListRunsResponse()
        response.runs = [run]
        return response

    def get_kf_list_pipelines(self,
                              page_token='',
                              page_size=10,
                              sort_by=''):
        """Always raise a generic ``Exception`` with message "error"."""
        raise Exception('error')

    def get_kf_pipeline_desc(self, pipeline_id):
        """Always raise ``kfp_server_api.exceptions.ApiException``."""
        raise kfp_server_api.exceptions.ApiException
252     
class NegativeFakeAdditionalKfConnect:
    """Fake Kubeflow connector on which every defined method raises.

    Arguments are ignored. ``get_pl_versions_by_pl_name`` and
    ``get_kf_pipeline_version_id`` are not defined on this fake (they
    previously existed only as disabled, string-quoted stubs).
    """

    def get_kf_experiment_details(self, ex_name, nspace):
        """Always raise a generic ``Exception`` with message "error"."""
        raise Exception('error')

    def get_kf_pipeline_id(self, pipeline_name):
        """Always raise ``ApiException`` constructed with 'error'."""
        raise kfp_server_api.exceptions.ApiException('error')

    def get_kf_list_experiments(self, nspace):
        """Always raise a generic ``Exception`` with message "error"."""
        raise Exception('error')

    def get_kf_pipeline_desc(self, pipeline_id: str):
        """Always raise ``ApiException`` constructed with 'error'."""
        raise kfp_server_api.exceptions.ApiException('error')
273
class NegativeFakeNoneKfConnect:
    """Fake Kubeflow connector: the experiment is found, the pipeline is not.

    ``get_kf_experiment_details`` returns a canned experiment while
    ``get_kf_pipeline_id`` returns ``None``. Arguments are ignored.
    """

    def get_kf_experiment_details(self, ex_name, nspace):
        """Return a canned experiment (name "exp_name", id "exp_id")."""
        experiment = ApiExperiment()
        experiment.name = "exp_name"
        experiment.id = "exp_id"
        return experiment

    def get_kf_pipeline_id(self, pipeline_name):
        """Return None for every pipeline name."""
        return None
284     
285     
286     
287