Enhance: Enable O2 DMS for distributed cloud
[pti/o2.git] / o2dms / service / nfdeployment_handler.py
1 # Copyright (C) 2021 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 # pylint: disable=unused-argument
16 from __future__ import annotations
17 import os
18 import json
19 import random
20 import string
21 import yaml
22 from datetime import datetime
23 from helm_sdk import Helm
24 from typing import Callable
25 from retry import retry
26
27 from o2dms.domain.states import NfDeploymentState
28 # from o2common.service import messagebus
29 from o2dms.domain.dms import NfDeployment, NfDeploymentDesc
30 from o2dms.domain import commands
31 from o2dms.domain.exceptions import NfdeploymentNotFoundError
32 from o2dms.domain import events
33 from o2common.service.unit_of_work import AbstractUnitOfWork
34 from o2common.config import config
35 # if TYPE_CHECKING:
36 #     from . import unit_of_work
37
38 from o2common.helper import o2logging
# Module-wide logger, named after this module.
logger = o2logging.get_logger(__name__)

# Helm CLI location handed to every Helm() wrapper created in this module;
# resolved once, at import time.
LOCAL_HELM_BIN = config.get_helm_cli()

# Kubernetes endpoint settings, also resolved once at import time.
# NOTE(review): nothing in the visible handlers reads these three values any
# more (helm is driven through a per-DMS kubeconfig file) - confirm whether
# other modules still import them before removing.
(K8S_KUBECONFIG,
 K8S_APISERVER,
 K8S_TOKEN) = config.get_k8s_api_endpoint()
43
44
def publish_nfdeployment_state_change(
    event: events.NfDeploymentStateChanged,
    publish: Callable,
):
    """Publish a state-change event, then trace it in the debug log.

    :param event: the state-change event; its ``NfDeploymentId``,
        ``FromState`` and ``ToState`` attributes are read for the log line.
    :param publish: callable invoked as ``publish(topic, event)``.
    """
    topic = "NfDeploymentStateChanged"
    publish(topic, event)

    # Logged only after the publish call has returned.
    template = "published NfDeploymentStateChanged: {}, state from {} to {}"
    logger.debug(template.format(
        event.NfDeploymentId, event.FromState, event.ToState))
53
54
def handle_nfdeployment_statechanged(
    cmd: commands.HandleNfDeploymentStateChanged,
    uow: AbstractUnitOfWork
):
    """Dispatch a state-change command to the matching lifecycle handler.

    Handled transitions (``FromState`` -> ``ToState``):

    * Initial -> Installing: ``install_nfdeployment``
    * Initial -> Deleting: ``delete_nfdeployment``
    * Installed / Installing / Updating / Abnormal -> Uninstalling:
      ``uninstall_nfdeployment``
    * Abnormal -> Deleting: ``delete_nfdeployment``

    Every other transition is only reported at debug level.

    :param cmd: command carrying ``NfDeploymentId``, ``FromState`` and
        ``ToState``.
    :param uow: unit of work passed through to the selected handler.
    """
    from_state = cmd.FromState
    to_state = cmd.ToState

    # States from which an uninstall may be requested.
    uninstallable_states = (
        NfDeploymentState.Installed,
        NfDeploymentState.Installing,
        NfDeploymentState.Updating,
        NfDeploymentState.Abnormal,
    )

    if from_state == NfDeploymentState.Initial \
            and to_state == NfDeploymentState.Installing:
        install_nfdeployment(
            commands.InstallNfDeployment(cmd.NfDeploymentId), uow)
    elif from_state == NfDeploymentState.Initial \
            and to_state == NfDeploymentState.Deleting:
        delete_nfdeployment(
            commands.DeleteNfDeployment(cmd.NfDeploymentId), uow)
    elif from_state in uninstallable_states \
            and to_state == NfDeploymentState.Uninstalling:
        uninstall_nfdeployment(
            commands.UninstallNfDeployment(cmd.NfDeploymentId), uow)
    elif from_state == NfDeploymentState.Abnormal \
            and to_state == NfDeploymentState.Deleting:
        # This transition used to sit in a separate ``elif FromState ==
        # Abnormal`` branch that could never run, because Abnormal was
        # already matched by the uninstall branch above it.
        delete_nfdeployment(
            commands.DeleteNfDeployment(cmd.NfDeploymentId), uow)
    else:
        logger.debug("Not insterested state change: {}".format(cmd))
88
89
# Poll for the record: up to 100 attempts, 2 seconds apart (backoff=1 keeps
# the delay constant), retrying only on NfdeploymentNotFoundError.
@retry(
    NfdeploymentNotFoundError,
    tries=100,
    delay=2, max_delay=10000, backoff=1)
def _retry_get_nfdeployment(
        cmd: commands.InstallNfDeployment,
        uow: AbstractUnitOfWork):
    """Fetch the NfDeployment named by ``cmd.NfDeploymentId``.

    :param cmd: any command object exposing ``NfDeploymentId``.
    :param uow: unit of work whose ``nfdeployments`` repository is queried.
    :returns: the NfDeployment found in the repository.
    :raises NfdeploymentNotFoundError: when the repository returns ``None``;
        the ``retry`` decorator re-invokes this function on that error.
    """
    found: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
    if found is not None:
        return found
    raise NfdeploymentNotFoundError(
        "Cannot find NfDeployment: {}".format(cmd.NfDeploymentId))
105
106
def install_nfdeployment(
    cmd: commands.InstallNfDeployment,
    uow: AbstractUnitOfWork
):
    """Install the helm chart described by an NfDeployment.

    Looks up the NfDeployment (with retry), its descriptor and its
    deployment manager, makes sure the descriptor's chart repository is
    registered with helm, writes an override values file when the descriptor
    carries input parameters, runs ``helm install`` against the deployment
    manager's kubeconfig and finally marks the NfDeployment as Installed.

    :param cmd: command carrying the ``NfDeploymentId`` to install.
    :param uow: unit of work giving access to the repositories.
    :raises Exception: when the NfDeployment, its descriptor or its
        deployment manager cannot be found.
    """
    logger.info("install with NfDeploymentId: {}".format(
        cmd.NfDeploymentId))
    nfdeployment: NfDeployment = _retry_get_nfdeployment(cmd, uow)
    if nfdeployment is None:
        raise Exception("Cannot find NfDeployment: {}".format(
            cmd.NfDeploymentId))
    # get nfdeploymentdescriptor by descriptorId
    desc: NfDeploymentDesc = uow.nfdeployment_descs.get(
        nfdeployment.descriptorId)
    if desc is None:
        raise Exception(
            "Cannot find NfDeploymentDescriptor:{} for NfDeployment:{}".format(
                nfdeployment.descriptorId, nfdeployment.id
            ))

    # NOTE(review): this state change happens outside a ``with uow`` block
    # and is not followed by a commit here - confirm whether it is persisted.
    nfdeployment.set_state(NfDeploymentState.Installing)

    # Gen kube config file and set the path
    dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId)
    if dms is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise Exception(
            "Cannot find DeploymentManager:{} for NfDeployment:{}".format(
                nfdeployment.deploymentManagerId, nfdeployment.id
            ))
    profile = dms.serialize().pop("profile", None)
    k8sconf_path = _get_kube_config_path(
        nfdeployment.deploymentManagerId, profile)

    # helm repo add: reuse a registered repo with the same URL if there is one
    repourl = desc.artifactRepoUrl
    helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})
    repoName = None
    try:
        for repo in helm.repo_list():
            if repo['url'] == repourl:
                repoName = repo['name']
                break
    except Exception as ex:
        # Best effort: if the repo list cannot be read, fall through and
        # register the repo below - but leave a trace of what went wrong.
        logger.debug("Cannot list helm repos: {}".format(ex))
        repoName = None

    if not repoName:
        repoName = "repo4{}".format(nfdeployment.name)
        logger.debug("Trying to add repo:{}".format(repourl))
        helm.repo_add(repoName, repourl)
    helm.repo_update(None)

    repolist = helm.repo_list()
    logger.debug('repo list:{}'.format(repolist))

    # helm install name chart
    # Truthiness check: a descriptor with inputParams of None or "" simply
    # installs without an override file (len(None) used to raise TypeError).
    values_file_path = None
    if desc.inputParams:
        values_file_path = '/tmp/override_{}.yaml'.format(nfdeployment.name)
        logger.info("dump override yaml:{}".format(values_file_path))
        values = json.loads(desc.inputParams)
        _create_values_file(values_file_path, values)

    logger.debug('Try to helm install {}/{} {} -f {}'.format(
        repoName, nfdeployment.name, desc.artifactName, values_file_path))
    # artifactName may look like "chart:version"; only the chart part is
    # used, the version suffix is currently ignored.
    chartname = desc.artifactName.split(':')[0]
    result = helm.install(
        nfdeployment.name, "{}/{}".format(repoName, chartname), flags=None,
        values_file=values_file_path, kubeconfig=k8sconf_path)
    logger.debug('result: {}'.format(result))

    # in case success
    with uow:
        entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
        if entity:
            entity.set_state(NfDeploymentState.Installed)
            entity.transit_state(NfDeploymentState.Installed)
        uow.commit()
186
187
def _create_values_file(filePath: str, content: dict):
    """Serialize ``content`` as YAML into ``filePath``.

    The file is opened in write mode with UTF-8 encoding, so any existing
    content at that path is replaced.
    """
    with open(filePath, mode="w", encoding="utf-8") as values_file:
        yaml.dump(content, stream=values_file)
191
192
def uninstall_nfdeployment(
    cmd: commands.UninstallNfDeployment,
    uow: AbstractUnitOfWork
):
    """Uninstall the helm release that belongs to an NfDeployment.

    Marks the NfDeployment as Uninstalling, runs ``helm uninstall`` against
    the deployment manager's kubeconfig, then resets the NfDeployment to
    Initial and transits it to Deleting.

    :param cmd: command carrying the ``NfDeploymentId`` to uninstall.
    :param uow: unit of work giving access to the repositories.
    :raises Exception: when the NfDeployment, its descriptor or its
        deployment manager cannot be found.
    """
    logger.info("uninstall with NfDeploymentId: {}".format(
        cmd.NfDeploymentId))
    nfdeployment: NfDeployment = _retry_get_nfdeployment(cmd, uow)
    if nfdeployment is None:
        raise Exception("Cannot find NfDeployment: {}".format(
            cmd.NfDeploymentId))
    # get nfdeploymentdescriptor by descriptorId
    # The descriptor itself is not used below; the lookup only verifies that
    # it still exists.
    desc: NfDeploymentDesc = uow.nfdeployment_descs.get(
        nfdeployment.descriptorId)
    if desc is None:
        raise Exception(
            "Cannot find NfDeploymentDescriptor:{} for NfDeployment:{}".format(
                nfdeployment.descriptorId, nfdeployment.id
            ))

    with uow:
        entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
        if entity:
            entity.set_state(NfDeploymentState.Uninstalling)
        uow.commit()

    # Gen kube config file and set the path
    dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId)
    if dms is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise Exception(
            "Cannot find DeploymentManager:{} for NfDeployment:{}".format(
                nfdeployment.deploymentManagerId, nfdeployment.id
            ))
    profile = dms.serialize().pop("profile", None)
    k8sconf_path = _get_kube_config_path(
        nfdeployment.deploymentManagerId, profile)

    helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})

    logger.debug('Try to helm del {}'.format(
        nfdeployment.name))
    result = helm.uninstall(
        nfdeployment.name, flags=None,
        kubeconfig=k8sconf_path)
    logger.debug('result: {}'.format(result))

    # in case success
    with uow:
        entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
        if entity:
            entity.set_state(NfDeploymentState.Initial)
            entity.transit_state(NfDeploymentState.Deleting)
        uow.commit()
247
248
def delete_nfdeployment(
    cmd: commands.DeleteNfDeployment,
    uow: AbstractUnitOfWork
):
    """Delete the NfDeployment record named by ``cmd.NfDeploymentId``.

    The deletion and the commit happen inside a single ``with uow`` block.

    :param cmd: command carrying the ``NfDeploymentId`` to delete (the state
        change handler builds a ``commands.DeleteNfDeployment`` for this).
    :param uow: unit of work whose ``nfdeployments`` repository is updated.
    """
    logger.info("delete with NfDeploymentId: {}".format(
        cmd.NfDeploymentId))

    with uow:
        uow.nfdeployments.delete(cmd.NfDeploymentId)
        uow.commit()
260
261
262 def _get_kube_config_path(dmId: str, kubeconfig: dict) -> dict:
263
264     # TODO: update this kube file for each DMS k8s when it changes.
265
266     link_file_path = '/tmp/kubeconfig_' + dmId
267     if os.path.exists(link_file_path) and \
268             os.path.exists(os.readlink(link_file_path)):
269         return link_file_path
270
271     # Generate a random key for tmp kube config file
272     letters = string.ascii_uppercase
273     random_key = ''.join(random.choice(letters) for i in range(10))
274
275     # Get datetime of now as tag of the tmp file
276     current_time = datetime.now().strftime("%Y%m%d%H%M%S")
277     tmp_file_name = random_key + "_" + current_time
278     tmp_file_path = '/tmp/kubeconfig_' + tmp_file_name
279
280     data = config.gen_k8s_config_dict(
281         kubeconfig.pop('cluster_api_endpoint', None),
282         kubeconfig.pop('cluster_ca_cert', None),
283         kubeconfig.pop('admin_user', None),
284         kubeconfig.pop('admin_client_cert', None),
285         kubeconfig.pop('admin_client_key', None),
286     )
287
288     # write down the yaml file of kubectl into tmp folder
289     with open(tmp_file_path, 'w') as file:
290         yaml.dump(data, file)
291
292     # os.symlink(tmp_file_path, link_file_path)
293     os.symlink(tmp_file_path, '/tmp/tmp_'+tmp_file_name)
294     os.rename('/tmp/tmp_'+tmp_file_name, link_file_path)
295     if os.path.realpath(link_file_path) != tmp_file_path:
296         # Symlink was updated failed
297         logger.error('symlink update failed')
298
299     return link_file_path