1 # Copyright (C) 2021 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 # pylint: disable=unused-argument
16 from __future__ import annotations
22 from datetime import datetime
23 from helm_sdk import Helm
24 from typing import Callable
25 from retry import retry
27 from o2dms.domain.states import NfDeploymentState
28 # from o2common.service import messagebus
29 from o2dms.domain.dms import NfDeployment, NfDeploymentDesc
30 from o2dms.domain import commands
31 from o2dms.domain.exceptions import NfdeploymentNotFoundError
32 from o2dms.domain import events
33 from o2common.service.unit_of_work import AbstractUnitOfWork
34 from o2common.config import config
36 # from . import unit_of_work
38 from o2common.helper import o2logging
39 logger = o2logging.get_logger(__name__)
40 LOCAL_HELM_BIN = config.get_helm_cli()
41 K8S_KUBECONFIG, K8S_APISERVER, K8S_TOKEN = \
42 config.get_k8s_api_endpoint()
45 def publish_nfdeployment_state_change(
46 event: events.NfDeploymentStateChanged,
49 publish("NfDeploymentStateChanged", event)
51 "published NfDeploymentStateChanged: {}, state from {} to {}".format(
52 event.NfDeploymentId, event.FromState, event.ToState))
55 def handle_nfdeployment_statechanged(
56 cmd: commands.HandleNfDeploymentStateChanged,
57 uow: AbstractUnitOfWork
59 if cmd.FromState == NfDeploymentState.Initial:
60 if cmd.ToState == NfDeploymentState.Installing:
61 cmd2 = commands.InstallNfDeployment(cmd.NfDeploymentId)
62 install_nfdeployment(cmd2, uow)
63 elif cmd.ToState == NfDeploymentState.Deleting:
64 cmd2 = commands.DeleteNfDeployment(cmd.NfDeploymentId)
65 delete_nfdeployment(cmd2, uow)
67 logger.debug("Not insterested state change: {}".format(cmd))
68 elif cmd.FromState == NfDeploymentState.Installed \
69 or cmd.FromState == NfDeploymentState.Installing \
70 or cmd.FromState == NfDeploymentState.Updating \
71 or cmd.FromState == NfDeploymentState.Abnormal:
73 if cmd.ToState == NfDeploymentState.Uninstalling:
74 cmd2 = commands.UninstallNfDeployment(cmd.NfDeploymentId)
75 uninstall_nfdeployment(cmd2, uow)
77 logger.debug("Not insterested state change: {}".format(cmd))
78 elif cmd.FromState == NfDeploymentState.Abnormal:
79 if cmd.ToState == NfDeploymentState.Deleting:
80 # cmd2 = commands.UninstallNfDeployment(cmd.NfDeploymentId)
81 # uninstall_nfdeployment(cmd2, uow)
82 cmd2 = commands.DeleteNfDeployment(cmd.NfDeploymentId)
83 delete_nfdeployment(cmd2, uow)
85 logger.debug("Not insterested state change: {}".format(cmd))
87 logger.debug("Not insterested state change: {}".format(cmd))
92 (NfdeploymentNotFoundError),
94 delay=2, max_delay=10000, backoff=1)
95 def _retry_get_nfdeployment(
96 cmd: commands.InstallNfDeployment,
97 uow: AbstractUnitOfWork):
98 nfdeployment: NfDeployment = uow.nfdeployments.get(
100 if nfdeployment is None:
101 raise NfdeploymentNotFoundError(
102 "Cannot find NfDeployment: {}".format(
107 def install_nfdeployment(
108 cmd: commands.InstallNfDeployment,
109 uow: AbstractUnitOfWork
111 logger.info("install with NfDeploymentId: {}".format(
113 nfdeployment: NfDeployment = _retry_get_nfdeployment(cmd, uow)
114 if nfdeployment is None:
115 raise Exception("Cannot find NfDeployment: {}".format(
117 # get nfdeploymentdescriptor by descriptorId
118 desc: NfDeploymentDesc = uow.nfdeployment_descs.get(
119 nfdeployment.descriptorId)
122 "Cannot find NfDeploymentDescriptor:{} for NfDeployment:{}".format(
123 nfdeployment.descriptorId, nfdeployment.id
126 nfdeployment.set_state(NfDeploymentState.Installing)
128 # Gen kube config file and set the path
129 dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId)
130 dms_res = dms.serialize()
131 p = dms_res.pop("profile", None)
132 k8sconf_path = _get_kube_config_path(nfdeployment.deploymentManagerId, p)
135 repourl = desc.artifactRepoUrl
136 helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})
139 repolist = helm.repo_list()
140 for repo in repolist:
141 if repo['url'] == repourl:
142 repoName = repo['name']
149 repoName = "repo4{}".format(nfdeployment.name)
150 logger.debug("Trying to add repo:{}".format(repourl))
151 helm.repo_add(repoName, repourl)
152 helm.repo_update(None)
154 repolist = helm.repo_list()
155 logger.debug('repo list:{}'.format(repolist))
157 # helm install name chart
158 values_file_path = '/tmp/override_{}.yaml'.format(nfdeployment.name)
159 if len(desc.inputParams) > 0:
160 logger.info("dump override yaml:{}".format(values_file_path))
161 values = json.loads(desc.inputParams)
162 _create_values_file(values_file_path, values)
164 values_file_path = None
166 logger.debug('Try to helm install {}/{} {} -f {}'.format(
167 repoName, nfdeployment.name, desc.artifactName, values_file_path))
168 tokens = desc.artifactName.split(':')
169 chartname = tokens[0]
171 # if (len(tokens) > 1):
172 # myflags = {"name": "version", "value": tokens[1]}
173 result = helm.install(
174 nfdeployment.name, "{}/{}".format(repoName, chartname), flags=myflags,
175 values_file=values_file_path, kubeconfig=k8sconf_path)
176 # token=K8S_TOKEN, apiserver=K8S_APISERVER)
177 logger.debug('result: {}'.format(result))
181 entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
183 entity.set_state(NfDeploymentState.Installed)
184 entity.transit_state(NfDeploymentState.Installed)
188 def _create_values_file(filePath: str, content: dict):
189 with open(filePath, "w", encoding="utf-8") as f:
190 yaml.dump(content, f)
193 def uninstall_nfdeployment(
194 cmd: commands.UninstallNfDeployment,
195 uow: AbstractUnitOfWork
197 logger.info("uninstall with NfDeploymentId: {}".format(
199 nfdeployment: NfDeployment = _retry_get_nfdeployment(cmd, uow)
200 if nfdeployment is None:
201 raise Exception("Cannot find NfDeployment: {}".format(
203 # get nfdeploymentdescriptor by descriptorId
204 desc: NfDeploymentDesc = uow.nfdeployment_descs.get(
205 nfdeployment.descriptorId)
208 "Cannot find NfDeploymentDescriptor:{} for NfDeployment:{}".format(
209 nfdeployment.descriptorId, nfdeployment.id
213 entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
215 entity.set_state(NfDeploymentState.Uninstalling)
218 # Gen kube config file and set the path
219 dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId)
220 dms_res = dms.serialize()
221 p = dms_res.pop("profile", None)
222 k8sconf_path = _get_kube_config_path(nfdeployment.deploymentManagerId, p)
224 helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})
226 logger.debug('Try to helm del {}'.format(
229 # if (len(tokens) > 1):
230 # myflags = {"name": "version", "value": tokens[1]}
231 result = helm.uninstall(
232 nfdeployment.name, flags=myflags,
233 kubeconfig=k8sconf_path,)
234 # token=K8S_TOKEN, apiserver=K8S_APISERVER)
235 logger.debug('result: {}'.format(result))
240 entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
242 entity.set_state(NfDeploymentState.Initial)
243 entity.transit_state(NfDeploymentState.Deleting)
244 # uow.nfdeployments.update(
245 # cmd.NfDeploymentId, status=NfDeploymentState.Initial)
249 def delete_nfdeployment(
250 cmd: commands.UninstallNfDeployment,
251 uow: AbstractUnitOfWork
253 logger.info("delete with NfDeploymentId: {}".format(
256 # nfdeployment: NfDeployment = _retry_get_nfdeployment(cmd, uow)
258 uow.nfdeployments.delete(cmd.NfDeploymentId)
262 def _get_kube_config_path(dmId: str, kubeconfig: dict) -> dict:
264 # TODO: update this kube file for each DMS k8s when it changes.
266 link_file_path = '/tmp/kubeconfig_' + dmId
267 if os.path.exists(link_file_path) and \
268 os.path.exists(os.readlink(link_file_path)):
269 return link_file_path
271 # Generate a random key for tmp kube config file
272 letters = string.ascii_uppercase
273 random_key = ''.join(random.choice(letters) for i in range(10))
275 # Get datetime of now as tag of the tmp file
276 current_time = datetime.now().strftime("%Y%m%d%H%M%S")
277 tmp_file_name = random_key + "_" + current_time
278 tmp_file_path = '/tmp/kubeconfig_' + tmp_file_name
280 data = config.gen_k8s_config_dict(
281 kubeconfig.pop('cluster_api_endpoint', None),
282 kubeconfig.pop('cluster_ca_cert', None),
283 kubeconfig.pop('admin_user', None),
284 kubeconfig.pop('admin_client_cert', None),
285 kubeconfig.pop('admin_client_key', None),
288 # write down the yaml file of kubectl into tmp folder
289 with open(tmp_file_path, 'w') as file:
290 yaml.dump(data, file)
292 # os.symlink(tmp_file_path, link_file_path)
293 os.symlink(tmp_file_path, '/tmp/tmp_'+tmp_file_name)
294 os.rename('/tmp/tmp_'+tmp_file_name, link_file_path)
295 if os.path.realpath(link_file_path) != tmp_file_path:
296 # Symlink was updated failed
297 logger.error('symlink update failed')
299 return link_file_path