# pylint: disable=unused-argument
from __future__ import annotations
+import os
+import json
+import random
+import string
+import yaml
+from datetime import datetime
+from helm_sdk import Helm
+from typing import Callable
+from retry import retry
+
from o2dms.domain.states import NfDeploymentState
# from o2common.service import messagebus
from o2dms.domain.dms import NfDeployment, NfDeploymentDesc
from o2dms.domain import commands
-from typing import Callable
-
from o2dms.domain.exceptions import NfdeploymentNotFoundError
from o2dms.domain import events
from o2common.service.unit_of_work import AbstractUnitOfWork
-from helm_sdk import Helm
-from ruamel import yaml
-import json
from o2common.config import config
-from retry import retry
# if TYPE_CHECKING:
# from . import unit_of_work
if cmd.ToState == NfDeploymentState.Installing:
cmd2 = commands.InstallNfDeployment(cmd.NfDeploymentId)
install_nfdeployment(cmd2, uow)
+ elif cmd.ToState == NfDeploymentState.Deleting:
+ cmd2 = commands.DeleteNfDeployment(cmd.NfDeploymentId)
+ delete_nfdeployment(cmd2, uow)
else:
logger.debug("Not insterested state change: {}".format(cmd))
elif cmd.FromState == NfDeploymentState.Installed \
uninstall_nfdeployment(cmd2, uow)
else:
logger.debug("Not insterested state change: {}".format(cmd))
- elif cmd.FromState == NfDeploymentState.Initial \
- or cmd.FromState == NfDeploymentState.Abnormal:
-
+ elif cmd.FromState == NfDeploymentState.Abnormal:
if cmd.ToState == NfDeploymentState.Deleting:
- cmd2 = commands.UninstallNfDeployment(cmd.NfDeploymentId)
- uninstall_nfdeployment(cmd2, uow)
+ # cmd2 = commands.UninstallNfDeployment(cmd.NfDeploymentId)
+ # uninstall_nfdeployment(cmd2, uow)
cmd2 = commands.DeleteNfDeployment(cmd.NfDeploymentId)
delete_nfdeployment(cmd2, uow)
else:
nfdeployment.set_state(NfDeploymentState.Installing)
+ # Gen kube config file and set the path
+ dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId)
+ dms_res = dms.serialize()
+ p = dms_res.pop("profile", None)
+ k8sconf_path = _get_kube_config_path(nfdeployment.deploymentManagerId, p)
+
# helm repo add
repourl = desc.artifactRepoUrl
helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})
# myflags = {"name": "version", "value": tokens[1]}
result = helm.install(
nfdeployment.name, "{}/{}".format(repoName, chartname), flags=myflags,
- values_file=values_file_path, kubeconfig=K8S_KUBECONFIG,
- token=K8S_TOKEN, apiserver=K8S_APISERVER)
+ values_file=values_file_path, kubeconfig=k8sconf_path)
+ # token=K8S_TOKEN, apiserver=K8S_APISERVER)
logger.debug('result: {}'.format(result))
# in case success
def _create_values_file(filePath: str, content: dict):
with open(filePath, "w", encoding="utf-8") as f:
- yaml.dump(content, f, Dumper=yaml.RoundTripDumper)
+ yaml.dump(content, f)
def uninstall_nfdeployment(
nfdeployment.descriptorId, nfdeployment.id
))
- nfdeployment.set_state(NfDeploymentState.Uninstalling)
+ with uow:
+ entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
+ if entity:
+ entity.set_state(NfDeploymentState.Uninstalling)
+ uow.commit()
+
+ # Gen kube config file and set the path
+ dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId)
+ dms_res = dms.serialize()
+ p = dms_res.pop("profile", None)
+ k8sconf_path = _get_kube_config_path(nfdeployment.deploymentManagerId, p)
+
helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})
logger.debug('Try to helm del {}'.format(
# myflags = {"name": "version", "value": tokens[1]}
result = helm.uninstall(
nfdeployment.name, flags=myflags,
- kubeconfig=K8S_KUBECONFIG,
- token=K8S_TOKEN, apiserver=K8S_APISERVER)
+ kubeconfig=k8sconf_path,)
+ # token=K8S_TOKEN, apiserver=K8S_APISERVER)
logger.debug('result: {}'.format(result))
# in case success
with uow:
entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
if entity:
- entity.transit_state(NfDeploymentState.Initial)
+ entity.set_state(NfDeploymentState.Initial)
+ entity.transit_state(NfDeploymentState.Deleting)
# uow.nfdeployments.update(
# cmd.NfDeploymentId, status=NfDeploymentState.Initial)
uow.commit()
with uow:
uow.nfdeployments.delete(cmd.NfDeploymentId)
uow.commit()
+
+
+def _get_kube_config_path(dmId: str, kubeconfig: dict) -> dict:
+
+ # TODO: update this kube file for each DMS k8s when it changes.
+
+ link_file_path = '/tmp/kubeconfig_' + dmId
+ if os.path.exists(link_file_path) and \
+ os.path.exists(os.readlink(link_file_path)):
+ return link_file_path
+
+ # Generate a random key for tmp kube config file
+ letters = string.ascii_uppercase
+ random_key = ''.join(random.choice(letters) for i in range(10))
+
+ # Get datetime of now as tag of the tmp file
+ current_time = datetime.now().strftime("%Y%m%d%H%M%S")
+ tmp_file_name = random_key + "_" + current_time
+ tmp_file_path = '/tmp/kubeconfig_' + tmp_file_name
+
+ data = config.gen_k8s_config_dict(
+ kubeconfig.pop('cluster_api_endpoint', None),
+ kubeconfig.pop('cluster_ca_cert', None),
+ kubeconfig.pop('admin_user', None),
+ kubeconfig.pop('admin_client_cert', None),
+ kubeconfig.pop('admin_client_key', None),
+ )
+
+ # write down the yaml file of kubectl into tmp folder
+ with open(tmp_file_path, 'w') as file:
+ yaml.dump(data, file)
+
+ # os.symlink(tmp_file_path, link_file_path)
+ os.symlink(tmp_file_path, '/tmp/tmp_'+tmp_file_name)
+ os.rename('/tmp/tmp_'+tmp_file_name, link_file_path)
+ if os.path.realpath(link_file_path) != tmp_file_path:
+ # Symlink was updated failed
+ logger.error('symlink update failed')
+
+ return link_file_path