1 # Copyright (C) 2021 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from datetime import datetime
21 from o2common.service import unit_of_work
22 from o2ims.views.ocloud_dto import SubscriptionDTO
23 from o2ims.domain.subscription_obj import Subscription
25 from o2common.helper import o2logging
26 from o2common.config import config
27 logger = o2logging.get_logger(__name__)
def oclouds(uow: unit_of_work.AbstractUnitOfWork):
    """Return the serialized form of every entry listed by ``uow.oclouds``."""
    serialized = []
    for entry in uow.oclouds.list():
        serialized.append(entry.serialize())
    return serialized
def ocloud_one(ocloudid: str, uow: unit_of_work.AbstractUnitOfWork):
    """Look up one entry in ``uow.oclouds`` and return it serialized.

    Returns ``None`` when the lookup itself yields ``None``.
    """
    match = uow.oclouds.get(ocloudid)
    if match is None:
        return None
    return match.serialize()
def resource_types(uow: unit_of_work.AbstractUnitOfWork):
    """Return the serialized form of every entry in ``uow.resource_types``."""
    return list(map(lambda item: item.serialize(), uow.resource_types.list()))
def resource_type_one(resourceTypeId: str,
                      uow: unit_of_work.AbstractUnitOfWork):
    """Return the serialized resource type for ``resourceTypeId``.

    ``None`` is returned when ``uow.resource_types.get`` finds nothing.
    """
    found = uow.resource_types.get(resourceTypeId)
    if found is not None:
        return found.serialize()
    return None
def resource_pools(uow: unit_of_work.AbstractUnitOfWork):
    """Return the serialized form of every entry in ``uow.resource_pools``."""
    pools = uow.resource_pools.list()
    payload = []
    for pool in pools:
        payload.append(pool.serialize())
    return payload
def resource_pool_one(resourcePoolId: str,
                      uow: unit_of_work.AbstractUnitOfWork):
    """Return the serialized resource pool for ``resourcePoolId``.

    ``None`` is returned when ``uow.resource_pools.get`` finds nothing.
    """
    pool = uow.resource_pools.get(resourcePoolId)
    if pool is None:
        return None
    return pool.serialize()
def resources(resourcePoolId: str, uow: unit_of_work.AbstractUnitOfWork,
              **kwargs):
    """Return the serialized resources of one resource pool.

    Args:
        resourcePoolId: passed positionally to ``uow.resources.list``.
        uow: unit of work exposing ``resource_types`` and ``resources``.
        **kwargs: optional filters. Recognised keys:
            ``resourceTypeName`` - resolved through
            ``uow.resource_types.get_by_name`` and forwarded as the
            ``resourceTypeId`` filter;
            ``parentId`` - forwarded unchanged.

    Returns:
        A list with ``serialize()`` applied to every listed resource.
    """
    # Filter keys must be the same as the database column names.
    filter_kwargs = {}

    if 'resourceTypeName' in kwargs:
        res_type = uow.resource_types.get_by_name(kwargs['resourceTypeName'])
        # An unknown type name falls back to an empty id instead of
        # dropping the filter.
        filter_kwargs['resourceTypeId'] = (
            '' if res_type is None else res_type.resourceTypeId)

    if 'parentId' in kwargs:
        filter_kwargs['parentId'] = kwargs['parentId']

    li = uow.resources.list(resourcePoolId, **filter_kwargs)
    return [r.serialize() for r in li]
def resource_one(resourceId: str, uow: unit_of_work.AbstractUnitOfWork):
    """Return the serialized resource for ``resourceId``.

    ``None`` is returned when ``uow.resources.get`` finds nothing.
    """
    hit = uow.resources.get(resourceId)
    if hit is not None:
        return hit.serialize()
    return None
def deployment_managers(uow: unit_of_work.AbstractUnitOfWork):
    """Return the serialized form of every entry in ``uow.deployment_managers``."""
    managers = uow.deployment_managers.list()
    return list(map(lambda dm: dm.serialize(), managers))
def deployment_manager_one(deploymentManagerId: str,
                           uow: unit_of_work.AbstractUnitOfWork,
                           profile: str = 'params'):
    """Return one deployment manager in serialized form.

    Args:
        deploymentManagerId: key passed to ``uow.deployment_managers.get``.
        uow: unit of work exposing ``deployment_managers``.
        profile: controls the ``"profile"`` entry of the result:
            ``"params"`` keeps the serialized entry untouched;
            ``"file"`` replaces it with the value returned by
            ``_gen_kube_config`` when the entry is present;
            any other value (or ``"file"`` with no entry) removes it.

    Returns:
        The serialized mapping, or ``None`` when the lookup finds nothing,
        matching the other ``*_one`` helpers in this module.
    """
    first = uow.deployment_managers.get(deploymentManagerId)
    if first is None:
        return None

    result = first.serialize()

    if profile == "params":
        return result

    # Membership test instead of ``result.hasattr(...)``: the serialized
    # value is used as a mapping (``pop`` / item assignment) and mappings
    # have no ``hasattr`` method.
    if profile == "file" and "profile" in result:
        p = result.pop("profile", None)
        result["profile"] = _gen_kube_config(deploymentManagerId, p)
    else:
        result.pop("profile", None)

    return result
126 def _gen_kube_config(dmId: str, kubeconfig: dict) -> dict:
# Builds a kubectl-style config from the entries of `kubeconfig`, dumps it as
# YAML to a uniquely named file under /tmp, and stores a download URL for that
# file in kubeconfig["kube_config_file"].
#   dmId:       deployment manager id; it is embedded in the download URL.
#   kubeconfig: mapping holding the cluster/admin entries. The keys
#               cluster_api_endpoint, cluster_ca_cert, admin_user,
#               admin_client_cert and admin_client_key are popped from it,
#               so the caller's dict is mutated.
# NOTE(review): the statements that open and nest the `data` mapping, and the
# final return, are not visible in this view — confirm against the full file.
128 # KUBECONFIG environment variable
130 # https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/
# Cluster section: endpoint and CA certificate come from `kubeconfig`; a
# missing key yields None because of the pop default.
137 kubeconfig.pop('cluster_api_endpoint', None),
138 'certificate-authority-data':
139 kubeconfig.pop('cluster_ca_cert', None),
141 'name': 'inf-cluster'
# Context section: fixed cluster, user and context names.
146 'cluster': 'inf-cluster',
147 'user': 'kubernetes-admin'
149 'name': 'kubernetes-admin@inf-cluster'
152 'current-context': 'kubernetes-admin@inf-cluster',
# User section: admin name and client credentials come from `kubeconfig`.
157 'name': kubeconfig.pop('admin_user', None),
159 'client-certificate-data':
160 kubeconfig.pop('admin_client_cert', None),
162 kubeconfig.pop('admin_client_key', None),
167 # Generate a random key for tmp kube config file
168 letters = string.ascii_uppercase
169 random_key = ''.join(random.choice(letters) for i in range(10))
# NOTE(review): random.choice is not a cryptographically secure source. If
# this key is meant to make the download URL unguessable, consider the
# `secrets` module — confirm intent.
171 # Get datetime of now as tag of the tmp file
172 current_time = datetime.now().strftime("%Y%m%d%H%M%S")
# File tag layout: <10 uppercase letters>_<YYYYmmddHHMMSS>.
173 tmp_file_name = random_key + "_" + current_time
175 # write down the yaml file of kubectl into tmp folder
176 with open('/tmp/kubeconfig_' + tmp_file_name, 'w') as file:
177 yaml.dump(data, file)
# Download URL layout: <api url><o2dms api base>/<dmId>/download/<file tag>.
179 kubeconfig["kube_config_file"] = config.get_api_url() + \
180 config.get_o2dms_api_base() + "/" + dmId + "/download/" + tmp_file_name
def subscriptions(uow: unit_of_work.AbstractUnitOfWork):
    """Return the serialized form of every entry in ``uow.subscriptions``."""
    collected = []
    for sub in uow.subscriptions.list():
        collected.append(sub.serialize())
    return collected
def subscription_one(subscriptionId: str,
                     uow: unit_of_work.AbstractUnitOfWork):
    """Return the serialized subscription for ``subscriptionId``.

    ``None`` is returned when ``uow.subscriptions.get`` finds nothing.
    """
    sub = uow.subscriptions.get(subscriptionId)
    if sub is None:
        return None
    return sub.serialize()
def subscription_create(subscriptionDto: SubscriptionDTO.subscription,
                        uow: unit_of_work.AbstractUnitOfWork):
    """Build a ``Subscription`` from the DTO and add it to ``uow.subscriptions``.

    A fresh UUID4 string becomes the subscription id. The DTO must provide
    the ``callback``, ``consumerSubscriptionId`` and ``filter`` keys.

    Returns:
        ``{"subscriptionId": <new id>}``.
    """
    new_id = str(uuid.uuid4())
    dto_fields = ('callback', 'consumerSubscriptionId', 'filter')
    record = Subscription(new_id, *(subscriptionDto[key] for key in dto_fields))

    uow.subscriptions.add(record)

    return {"subscriptionId": new_id}
def subscription_delete(subscriptionId: str,
                        uow: unit_of_work.AbstractUnitOfWork):
    """Ask ``uow.subscriptions`` to delete the entry with ``subscriptionId``."""
    repository = uow.subscriptions
    repository.delete(subscriptionId)