# pylint: disable=unused-argument
from __future__ import annotations
-from o2dms.domain.commands import InstallNfDeployment
+import os
+import json
+import random
+import string
+import yaml
+from datetime import datetime
+from helm_sdk import Helm
from typing import Callable
+from retry import retry
+from o2dms.domain.states import NfDeploymentState
+# from o2common.service import messagebus
+from o2dms.domain.dms import NfDeployment, NfDeploymentDesc
+from o2dms.domain import commands
+from o2dms.domain.exceptions import NfdeploymentNotFoundError
from o2dms.domain import events
from o2common.service.unit_of_work import AbstractUnitOfWork
+from o2common.config import config
# if TYPE_CHECKING:
# from . import unit_of_work
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
+LOCAL_HELM_BIN = config.get_helm_cli()
+K8S_KUBECONFIG, K8S_APISERVER, K8S_TOKEN = \
+ config.get_k8s_api_endpoint()
-def publish_nfdeployment_created(
- event: events.NfDeploymentCreated,
+def publish_nfdeployment_state_change(
+ event: events.NfDeploymentStateChanged,
publish: Callable,
):
- publish("NfDeploymentCreated", event)
- logger.debug("published NfDeploymentCreated: {}".format(
- event.NfDeploymentId))
+ publish("NfDeploymentStateChanged", event)
+ logger.debug(
+ "published NfDeploymentStateChanged: {}, state from {} to {}".format(
+ event.NfDeploymentId, event.FromState, event.ToState))
+
+
+def handle_nfdeployment_statechanged(
+ cmd: commands.HandleNfDeploymentStateChanged,
+ uow: AbstractUnitOfWork
+):
+ if cmd.FromState == NfDeploymentState.Initial:
+ if cmd.ToState == NfDeploymentState.Installing:
+ cmd2 = commands.InstallNfDeployment(cmd.NfDeploymentId)
+ install_nfdeployment(cmd2, uow)
+ elif cmd.ToState == NfDeploymentState.Deleting:
+ cmd2 = commands.DeleteNfDeployment(cmd.NfDeploymentId)
+ delete_nfdeployment(cmd2, uow)
+ else:
+ logger.debug("Not insterested state change: {}".format(cmd))
+ elif cmd.FromState == NfDeploymentState.Installed \
+ or cmd.FromState == NfDeploymentState.Installing \
+ or cmd.FromState == NfDeploymentState.Updating \
+ or cmd.FromState == NfDeploymentState.Abnormal:
+
+ if cmd.ToState == NfDeploymentState.Uninstalling:
+ cmd2 = commands.UninstallNfDeployment(cmd.NfDeploymentId)
+ uninstall_nfdeployment(cmd2, uow)
+ else:
+ logger.debug("Not insterested state change: {}".format(cmd))
+ elif cmd.FromState == NfDeploymentState.Abnormal:
+ if cmd.ToState == NfDeploymentState.Deleting:
+ # cmd2 = commands.UninstallNfDeployment(cmd.NfDeploymentId)
+ # uninstall_nfdeployment(cmd2, uow)
+ cmd2 = commands.DeleteNfDeployment(cmd.NfDeploymentId)
+ delete_nfdeployment(cmd2, uow)
+ else:
+ logger.debug("Not insterested state change: {}".format(cmd))
+ else:
+ logger.debug("Not insterested state change: {}".format(cmd))
+
+
+# retry 10 seconds
+@retry(
+ (NfdeploymentNotFoundError),
+ tries=100,
+ delay=2, max_delay=10000, backoff=1)
+def _retry_get_nfdeployment(
+ cmd: commands.InstallNfDeployment,
+ uow: AbstractUnitOfWork):
+ nfdeployment: NfDeployment = uow.nfdeployments.get(
+ cmd.NfDeploymentId)
+ if nfdeployment is None:
+ raise NfdeploymentNotFoundError(
+ "Cannot find NfDeployment: {}".format(
+ cmd.NfDeploymentId))
+ return nfdeployment
def install_nfdeployment(
- cmd: InstallNfDeployment,
+ cmd: commands.InstallNfDeployment,
uow: AbstractUnitOfWork
):
logger.info("install with NfDeploymentId: {}".format(
cmd.NfDeploymentId))
+ nfdeployment: NfDeployment = _retry_get_nfdeployment(cmd, uow)
+ if nfdeployment is None:
+ raise Exception("Cannot find NfDeployment: {}".format(
+ cmd.NfDeploymentId))
+ # get nfdeploymentdescriptor by descriptorId
+ desc: NfDeploymentDesc = uow.nfdeployment_descs.get(
+ nfdeployment.descriptorId)
+ if desc is None:
+ raise Exception(
+ "Cannot find NfDeploymentDescriptor:{} for NfDeployment:{}".format(
+ nfdeployment.descriptorId, nfdeployment.id
+ ))
+
+ nfdeployment.set_state(NfDeploymentState.Installing)
+
+ # Gen kube config file and set the path
+ dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId)
+ dms_res = dms.serialize()
+ p = dms_res.pop("profile", None)
+ k8sconf_path = _get_kube_config_path(nfdeployment.deploymentManagerId, p)
+
+ # helm repo add
+ repourl = desc.artifactRepoUrl
+ helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})
+ repoName = None
+ try:
+ repolist = helm.repo_list()
+ for repo in repolist:
+ if repo['url'] == repourl:
+ repoName = repo['name']
+ break
+ except Exception:
+ # repoExisted
+ repoName = None
+
+ if not repoName:
+ repoName = "repo4{}".format(nfdeployment.name)
+ logger.debug("Trying to add repo:{}".format(repourl))
+ helm.repo_add(repoName, repourl)
+ helm.repo_update(None)
+
+ repolist = helm.repo_list()
+ logger.debug('repo list:{}'.format(repolist))
+
+ # helm install name chart
+ values_file_path = '/tmp/override_{}.yaml'.format(nfdeployment.name)
+ if len(desc.inputParams) > 0:
+ logger.info("dump override yaml:{}".format(values_file_path))
+ values = json.loads(desc.inputParams)
+ _create_values_file(values_file_path, values)
+ else:
+ values_file_path = None
+
+ logger.debug('Try to helm install {}/{} {} -f {}'.format(
+ repoName, nfdeployment.name, desc.artifactName, values_file_path))
+ tokens = desc.artifactName.split(':')
+ chartname = tokens[0]
+ myflags = None
+ # if (len(tokens) > 1):
+ # myflags = {"name": "version", "value": tokens[1]}
+ result = helm.install(
+ nfdeployment.name, "{}/{}".format(repoName, chartname), flags=myflags,
+ values_file=values_file_path, kubeconfig=k8sconf_path)
+ # token=K8S_TOKEN, apiserver=K8S_APISERVER)
+ logger.debug('result: {}'.format(result))
+
+ # in case success
+ with uow:
+ entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
+ if entity:
+ entity.set_state(NfDeploymentState.Installed)
+ entity.transit_state(NfDeploymentState.Installed)
+ uow.commit()
+
+
+def _create_values_file(filePath: str, content: dict):
+ with open(filePath, "w", encoding="utf-8") as f:
+ yaml.dump(content, f)
+
+
+def uninstall_nfdeployment(
+ cmd: commands.UninstallNfDeployment,
+ uow: AbstractUnitOfWork
+):
+ logger.info("uninstall with NfDeploymentId: {}".format(
+ cmd.NfDeploymentId))
+ nfdeployment: NfDeployment = _retry_get_nfdeployment(cmd, uow)
+ if nfdeployment is None:
+ raise Exception("Cannot find NfDeployment: {}".format(
+ cmd.NfDeploymentId))
+ # get nfdeploymentdescriptor by descriptorId
+ desc: NfDeploymentDesc = uow.nfdeployment_descs.get(
+ nfdeployment.descriptorId)
+ if desc is None:
+ raise Exception(
+ "Cannot find NfDeploymentDescriptor:{} for NfDeployment:{}".format(
+ nfdeployment.descriptorId, nfdeployment.id
+ ))
+
+ with uow:
+ entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
+ if entity:
+ entity.set_state(NfDeploymentState.Uninstalling)
+ uow.commit()
+
+ # Gen kube config file and set the path
+ dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId)
+ dms_res = dms.serialize()
+ p = dms_res.pop("profile", None)
+ k8sconf_path = _get_kube_config_path(nfdeployment.deploymentManagerId, p)
+
+ helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})
+
+ logger.debug('Try to helm del {}'.format(
+ nfdeployment.name))
+ myflags = None
+ # if (len(tokens) > 1):
+ # myflags = {"name": "version", "value": tokens[1]}
+ result = helm.uninstall(
+ nfdeployment.name, flags=myflags,
+ kubeconfig=k8sconf_path,)
+ # token=K8S_TOKEN, apiserver=K8S_APISERVER)
+ logger.debug('result: {}'.format(result))
+
+ # in case success
+
+ with uow:
+ entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
+ if entity:
+ entity.set_state(NfDeploymentState.Initial)
+ entity.transit_state(NfDeploymentState.Deleting)
+ # uow.nfdeployments.update(
+ # cmd.NfDeploymentId, status=NfDeploymentState.Initial)
+ uow.commit()
+
+
+def delete_nfdeployment(
+ cmd: commands.UninstallNfDeployment,
+ uow: AbstractUnitOfWork
+):
+ logger.info("delete with NfDeploymentId: {}".format(
+ cmd.NfDeploymentId))
+
+ # nfdeployment: NfDeployment = _retry_get_nfdeployment(cmd, uow)
+ with uow:
+ uow.nfdeployments.delete(cmd.NfDeploymentId)
+ uow.commit()
+
+
+def _get_kube_config_path(dmId: str, kubeconfig: dict) -> dict:
+
+ # TODO: update this kube file for each DMS k8s when it changes.
+
+ link_file_path = '/tmp/kubeconfig_' + dmId
+ if os.path.exists(link_file_path) and \
+ os.path.exists(os.readlink(link_file_path)):
+ return link_file_path
+
+ # Generate a random key for tmp kube config file
+ letters = string.ascii_uppercase
+ random_key = ''.join(random.choice(letters) for i in range(10))
+
+ # Get datetime of now as tag of the tmp file
+ current_time = datetime.now().strftime("%Y%m%d%H%M%S")
+ tmp_file_name = random_key + "_" + current_time
+ tmp_file_path = '/tmp/kubeconfig_' + tmp_file_name
+
+ data = config.gen_k8s_config_dict(
+ kubeconfig.pop('cluster_api_endpoint', None),
+ kubeconfig.pop('cluster_ca_cert', None),
+ kubeconfig.pop('admin_user', None),
+ kubeconfig.pop('admin_client_cert', None),
+ kubeconfig.pop('admin_client_key', None),
+ )
+
+ # write down the yaml file of kubectl into tmp folder
+ with open(tmp_file_path, 'w') as file:
+ yaml.dump(data, file)
+
+ # os.symlink(tmp_file_path, link_file_path)
+ os.symlink(tmp_file_path, '/tmp/tmp_'+tmp_file_name)
+ os.rename('/tmp/tmp_'+tmp_file_name, link_file_path)
+ if os.path.realpath(link_file_path) != tmp_file_path:
+ # Symlink was updated failed
+ logger.error('symlink update failed')
+
+ return link_file_path