# pylint: disable=unused-argument
from __future__ import annotations
-from o2dms.domain.commands import InstallNfDeployment
+from o2dms.domain.states import NfDeploymentState
+# from o2common.service import messagebus
+from o2dms.domain.dms import NfDeployment, NfDeploymentDesc
+from o2dms.domain import commands
from typing import Callable
from o2dms.domain import events
from o2common.service.unit_of_work import AbstractUnitOfWork
+from helm_sdk import Helm
+from ruamel import yaml
+import json
+from o2common.config import config
+from retry import retry
# if TYPE_CHECKING:
# from . import unit_of_work
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
+LOCAL_HELM_BIN = config.get_helm_cli()
+K8S_KUBECONFIG, K8S_APISERVER, K8S_TOKEN = \
+ config.get_k8s_api_endpoint()
-def publish_nfdeployment_created(
- event: events.NfDeploymentCreated,
+def publish_nfdeployment_state_change(
+ event: events.NfDeploymentStateChanged,
publish: Callable,
):
- publish("NfDeploymentCreated", event)
- logger.debug("published NfDeploymentCreated: {}".format(
- event.NfDeploymentId))
+ publish("NfDeploymentStateChanged", event)
+ logger.debug(
+ "published NfDeploymentStateChanged: {}, state from {} to {}".format(
+ event.NfDeploymentId, event.FromState, event.ToState))
+
+
+def handle_nfdeployment_statechanged(
+ cmd: commands.HandleNfDeploymentStateChanged,
+ uow: AbstractUnitOfWork
+):
+ if cmd.FromState == NfDeploymentState.Initial:
+ if cmd.ToState == NfDeploymentState.NotInstalled:
+ cmd2 = commands.InstallNfDeployment(cmd.NfDeploymentId)
+ install_nfdeployment(cmd2, uow)
+ else:
+ logger.debug("Not insterested state change: {}".format(cmd))
+ elif cmd.FromState == NfDeploymentState.Installed:
+ if cmd.ToState == NfDeploymentState.Uninstalling:
+ cmd2 = commands.UninstallNfDeployment(cmd.NfDeploymentId)
+ uninstall_nfdeployment(cmd2, uow)
+ else:
+ logger.debug("Not insterested state change: {}".format(cmd))
+ elif cmd.FromState == NfDeploymentState.NotInstalled:
+ if cmd.ToState == NfDeploymentState.Initial:
+ cmd2 = commands.DeleteNfDeployment(cmd.NfDeploymentId)
+ delete_nfdeployment(cmd2, uow)
+ else:
+ logger.debug("Not insterested state change: {}".format(cmd))
+ else:
+ logger.debug("Not insterested state change: {}".format(cmd))
+
+
+# retry 10 seconds
+@retry(tries=20, max_delay=10000)
+def _retry_get_nfdeployment(
+ cmd: commands.InstallNfDeployment,
+ uow: AbstractUnitOfWork):
+ nfdeployment: NfDeployment = uow.nfdeployments.get(
+ cmd.NfDeploymentId)
+ if nfdeployment is None:
+ raise Exception("Cannot find NfDeployment: {}".format(
+ cmd.NfDeploymentId))
+ return nfdeployment
def install_nfdeployment(
- cmd: InstallNfDeployment,
+ cmd: commands.InstallNfDeployment,
uow: AbstractUnitOfWork
):
logger.info("install with NfDeploymentId: {}".format(
cmd.NfDeploymentId))
+ nfdeployment: NfDeployment = _retry_get_nfdeployment(cmd, uow)
+ if nfdeployment is None:
+ raise Exception("Cannot find NfDeployment: {}".format(
+ cmd.NfDeploymentId))
+ # get nfdeploymentdescriptor by descriptorId
+ desc: NfDeploymentDesc = uow.nfdeployment_descs.get(
+ nfdeployment.descriptorId)
+ if desc is None:
+ raise Exception(
+ "Cannot find NfDeploymentDescriptor:{} for NfDeployment:{}".format(
+ nfdeployment.descriptorId, nfdeployment.id
+ ))
+
+ # helm repo add
+ repourl = desc.artifactRepoUrl
+ helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})
+ repoName = None
+ try:
+ repolist = helm.repo_list()
+ for repo in repolist:
+ if repo['url'] == repourl:
+ repoName = repo['name']
+ break
+ except Exception:
+ # repoExisted
+ repoName = None
+
+ if not repoName:
+ repoName = "repo4{}".format(nfdeployment.name)
+ logger.debug("Trying to add repo:{}".format(repourl))
+ helm.repo_add(repoName, repourl)
+ helm.repo_update(None)
+
+ repolist = helm.repo_list()
+ logger.debug('repo list:{}'.format(repolist))
+
+ # helm install name chart
+ values_file_path = '/tmp/override_{}.yaml'.format(nfdeployment.name)
+ if len(desc.inputParams) > 0:
+ logger.info("dump override yaml:{}".format(values_file_path))
+ values = json.loads(desc.inputParams)
+ _create_values_file(values_file_path, values)
+ else:
+ values_file_path = None
+
+ logger.debug('Try to helm install {}/{} {} -f {}'.format(
+ repoName, nfdeployment.name, desc.artifactName, values_file_path))
+ tokens = desc.artifactName.split(':')
+ chartname = tokens[0]
+ myflags = None
+ # if (len(tokens) > 1):
+ # myflags = {"name": "version", "value": tokens[1]}
+ result = helm.install(
+ nfdeployment.name, "{}/{}".format(repoName, chartname), flags=myflags,
+ values_file=values_file_path, kubeconfig=K8S_KUBECONFIG,
+ token=K8S_TOKEN, apiserver=K8S_APISERVER)
+ logger.debug('result: {}'.format(result))
+
+ # in case success
+ with uow:
+ entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
+ entity.transit_state(NfDeploymentState.Installed)
+ uow.commit()
+
+
+def _create_values_file(filePath: str, content: dict):
+ with open(filePath, "w", encoding="utf-8") as f:
+ yaml.dump(content, f, Dumper=yaml.RoundTripDumper)
+
+
+def uninstall_nfdeployment(
+ cmd: commands.UninstallNfDeployment,
+ uow: AbstractUnitOfWork
+):
+ logger.info("uninstall with NfDeploymentId: {}".format(
+ cmd.NfDeploymentId))
+ nfdeployment: NfDeployment = _retry_get_nfdeployment(cmd, uow)
+ if nfdeployment is None:
+ raise Exception("Cannot find NfDeployment: {}".format(
+ cmd.NfDeploymentId))
+ # get nfdeploymentdescriptor by descriptorId
+ desc: NfDeploymentDesc = uow.nfdeployment_descs.get(
+ nfdeployment.descriptorId)
+ if desc is None:
+ raise Exception(
+ "Cannot find NfDeploymentDescriptor:{} for NfDeployment:{}".format(
+ nfdeployment.descriptorId, nfdeployment.id
+ ))
+
+ helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})
+
+ logger.debug('Try to helm del {}'.format(
+ nfdeployment.name))
+ myflags = None
+ # if (len(tokens) > 1):
+ # myflags = {"name": "version", "value": tokens[1]}
+ result = helm.uninstall(
+ nfdeployment.name, flags=myflags,
+ kubeconfig=K8S_KUBECONFIG,
+ token=K8S_TOKEN, apiserver=K8S_APISERVER)
+ logger.debug('result: {}'.format(result))
+
+ # in case success
+
+ with uow:
+ entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
+ entity.transit_state(NfDeploymentState.Initial)
+ # uow.nfdeployments.update(
+ # cmd.NfDeploymentId, status=NfDeploymentState.Initial)
+ uow.commit()
+
+
+def delete_nfdeployment(
+ cmd: commands.UninstallNfDeployment,
+ uow: AbstractUnitOfWork
+):
+ logger.info("delete with NfDeploymentId: {}".format(
+ cmd.NfDeploymentId))
+
+ # nfdeployment: NfDeployment = _retry_get_nfdeployment(cmd, uow)
+ with uow:
+ uow.nfdeployments.delete(cmd.NfDeploymentId)
+ uow.commit()