Merge "CI: Migrate tox verify from Jenkins to GHA"
[pti/o2.git] / o2dms / service / nfdeployment_handler.py
index 6143f18..a00e093 100644 (file)
 
 # pylint: disable=unused-argument
 from __future__ import annotations
-from o2dms.domain.commands import InstallNfDeployment
+import os
+import json
+import random
+import string
+import yaml
+from datetime import datetime
+from helm_sdk import Helm
 from typing import Callable
+from retry import retry
 
+from o2dms.domain.states import NfDeploymentState
+# from o2common.service import messagebus
+from o2dms.domain.dms import NfDeployment, NfDeploymentDesc
+from o2dms.domain import commands
+from o2dms.domain.exceptions import NfdeploymentNotFoundError
 from o2dms.domain import events
 from o2common.service.unit_of_work import AbstractUnitOfWork
+from o2common.config import config
 # if TYPE_CHECKING:
 #     from . import unit_of_work
 
 from o2common.helper import o2logging
 logger = o2logging.get_logger(__name__)
+LOCAL_HELM_BIN = config.get_helm_cli()
+K8S_KUBECONFIG, K8S_APISERVER, K8S_TOKEN = \
+    config.get_k8s_api_endpoint()
 
 
-def publish_nfdeployment_created(
-    event: events.NfDeploymentCreated,
+def publish_nfdeployment_state_change(
+    event: events.NfDeploymentStateChanged,
     publish: Callable,
 ):
-    publish("NfDeploymentCreated", event)
-    logger.debug("published NfDeploymentCreated: {}".format(
-        event.NfDeploymentId))
+    publish("NfDeploymentStateChanged", event)
+    logger.debug(
+        "published NfDeploymentStateChanged: {}, state from {} to {}".format(
+            event.NfDeploymentId, event.FromState, event.ToState))
+
+
+def handle_nfdeployment_statechanged(
+    cmd: commands.HandleNfDeploymentStateChanged,
+    uow: AbstractUnitOfWork
+):
+    if cmd.FromState == NfDeploymentState.Initial:
+        if cmd.ToState == NfDeploymentState.Installing:
+            cmd2 = commands.InstallNfDeployment(cmd.NfDeploymentId)
+            install_nfdeployment(cmd2, uow)
+        elif cmd.ToState == NfDeploymentState.Deleting:
+            cmd2 = commands.DeleteNfDeployment(cmd.NfDeploymentId)
+            delete_nfdeployment(cmd2, uow)
+        else:
+            logger.debug("Not insterested state change: {}".format(cmd))
+    elif cmd.FromState == NfDeploymentState.Installed \
+            or cmd.FromState == NfDeploymentState.Installing \
+            or cmd.FromState == NfDeploymentState.Updating \
+            or cmd.FromState == NfDeploymentState.Abnormal:
+
+        if cmd.ToState == NfDeploymentState.Uninstalling:
+            cmd2 = commands.UninstallNfDeployment(cmd.NfDeploymentId)
+            uninstall_nfdeployment(cmd2, uow)
+        else:
+            logger.debug("Not insterested state change: {}".format(cmd))
+    elif cmd.FromState == NfDeploymentState.Abnormal:
+        if cmd.ToState == NfDeploymentState.Deleting:
+            # cmd2 = commands.UninstallNfDeployment(cmd.NfDeploymentId)
+            # uninstall_nfdeployment(cmd2, uow)
+            cmd2 = commands.DeleteNfDeployment(cmd.NfDeploymentId)
+            delete_nfdeployment(cmd2, uow)
+        else:
+            logger.debug("Not insterested state change: {}".format(cmd))
+    else:
+        logger.debug("Not insterested state change: {}".format(cmd))
+
+
+# retry 10 seconds
+@retry(
+    (NfdeploymentNotFoundError),
+    tries=100,
+    delay=2, max_delay=10000, backoff=1)
+def _retry_get_nfdeployment(
+        cmd: commands.InstallNfDeployment,
+        uow: AbstractUnitOfWork):
+    nfdeployment: NfDeployment = uow.nfdeployments.get(
+        cmd.NfDeploymentId)
+    if nfdeployment is None:
+        raise NfdeploymentNotFoundError(
+            "Cannot find NfDeployment: {}".format(
+                cmd.NfDeploymentId))
+    return nfdeployment
 
 
 def install_nfdeployment(
-    cmd: InstallNfDeployment,
+    cmd: commands.InstallNfDeployment,
     uow: AbstractUnitOfWork
 ):
     logger.info("install with NfDeploymentId: {}".format(
         cmd.NfDeploymentId))
+    nfdeployment: NfDeployment = _retry_get_nfdeployment(cmd, uow)
+    if nfdeployment is None:
+        raise Exception("Cannot find NfDeployment: {}".format(
+            cmd.NfDeploymentId))
+    # get nfdeploymentdescriptor by descriptorId
+    desc: NfDeploymentDesc = uow.nfdeployment_descs.get(
+        nfdeployment.descriptorId)
+    if desc is None:
+        raise Exception(
+            "Cannot find NfDeploymentDescriptor:{} for NfDeployment:{}".format(
+                nfdeployment.descriptorId, nfdeployment.id
+            ))
+
+    nfdeployment.set_state(NfDeploymentState.Installing)
+
+    # Gen kube config file and set the path
+    dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId)
+    dms_res = dms.serialize()
+    p = dms_res.pop("profile", None)
+    k8sconf_path = _get_kube_config_path(nfdeployment.deploymentManagerId, p)
+
+    # helm repo add
+    repourl = desc.artifactRepoUrl
+    helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})
+    repoName = None
+    try:
+        repolist = helm.repo_list()
+        for repo in repolist:
+            if repo['url'] == repourl:
+                repoName = repo['name']
+                break
+    except Exception:
+        # repoExisted
+        repoName = None
+
+    if not repoName:
+        repoName = "repo4{}".format(nfdeployment.name)
+        logger.debug("Trying to add repo:{}".format(repourl))
+        helm.repo_add(repoName, repourl)
+    helm.repo_update(None)
+
+    repolist = helm.repo_list()
+    logger.debug('repo list:{}'.format(repolist))
+
+    # helm install name chart
+    values_file_path = '/tmp/override_{}.yaml'.format(nfdeployment.name)
+    if len(desc.inputParams) > 0:
+        logger.info("dump override yaml:{}".format(values_file_path))
+        values = json.loads(desc.inputParams)
+        _create_values_file(values_file_path, values)
+    else:
+        values_file_path = None
+
+    logger.debug('Try to helm install {}/{} {} -f {}'.format(
+        repoName, nfdeployment.name, desc.artifactName, values_file_path))
+    tokens = desc.artifactName.split(':')
+    chartname = tokens[0]
+    myflags = None
+    # if (len(tokens) > 1):
+    #     myflags = {"name": "version", "value": tokens[1]}
+    result = helm.install(
+        nfdeployment.name, "{}/{}".format(repoName, chartname), flags=myflags,
+        values_file=values_file_path, kubeconfig=k8sconf_path)
+    # token=K8S_TOKEN, apiserver=K8S_APISERVER)
+    logger.debug('result: {}'.format(result))
+
+    # in case success
+    with uow:
+        entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
+        if entity:
+            entity.set_state(NfDeploymentState.Installed)
+            entity.transit_state(NfDeploymentState.Installed)
+        uow.commit()
+
+
+def _create_values_file(filePath: str, content: dict):
+    with open(filePath, "w", encoding="utf-8") as f:
+        yaml.dump(content, f)
+
+
+def uninstall_nfdeployment(
+    cmd: commands.UninstallNfDeployment,
+    uow: AbstractUnitOfWork
+):
+    logger.info("uninstall with NfDeploymentId: {}".format(
+        cmd.NfDeploymentId))
+    nfdeployment: NfDeployment = _retry_get_nfdeployment(cmd, uow)
+    if nfdeployment is None:
+        raise Exception("Cannot find NfDeployment: {}".format(
+            cmd.NfDeploymentId))
+    # get nfdeploymentdescriptor by descriptorId
+    desc: NfDeploymentDesc = uow.nfdeployment_descs.get(
+        nfdeployment.descriptorId)
+    if desc is None:
+        raise Exception(
+            "Cannot find NfDeploymentDescriptor:{} for NfDeployment:{}".format(
+                nfdeployment.descriptorId, nfdeployment.id
+            ))
+
+    with uow:
+        entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
+        if entity:
+            entity.set_state(NfDeploymentState.Uninstalling)
+        uow.commit()
+
+    # Gen kube config file and set the path
+    dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId)
+    dms_res = dms.serialize()
+    p = dms_res.pop("profile", None)
+    k8sconf_path = _get_kube_config_path(nfdeployment.deploymentManagerId, p)
+
+    helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})
+
+    logger.debug('Try to helm del {}'.format(
+        nfdeployment.name))
+    myflags = None
+    # if (len(tokens) > 1):
+    #     myflags = {"name": "version", "value": tokens[1]}
+    result = helm.uninstall(
+        nfdeployment.name, flags=myflags,
+        kubeconfig=k8sconf_path,)
+    # token=K8S_TOKEN, apiserver=K8S_APISERVER)
+    logger.debug('result: {}'.format(result))
+
+    # in case success
+
+    with uow:
+        entity: NfDeployment = uow.nfdeployments.get(cmd.NfDeploymentId)
+        if entity:
+            entity.set_state(NfDeploymentState.Initial)
+            entity.transit_state(NfDeploymentState.Deleting)
+        # uow.nfdeployments.update(
+        #     cmd.NfDeploymentId, status=NfDeploymentState.Initial)
+        uow.commit()
+
+
+def delete_nfdeployment(
+    cmd: commands.UninstallNfDeployment,
+    uow: AbstractUnitOfWork
+):
+    logger.info("delete with NfDeploymentId: {}".format(
+        cmd.NfDeploymentId))
+
+    # nfdeployment: NfDeployment = _retry_get_nfdeployment(cmd, uow)
+    with uow:
+        uow.nfdeployments.delete(cmd.NfDeploymentId)
+        uow.commit()
+
+
+def _get_kube_config_path(dmId: str, kubeconfig: dict) -> dict:
+
+    # TODO: update this kube file for each DMS k8s when it changes.
+
+    link_file_path = '/tmp/kubeconfig_' + dmId
+    if os.path.exists(link_file_path) and \
+            os.path.exists(os.readlink(link_file_path)):
+        return link_file_path
+
+    # Generate a random key for tmp kube config file
+    letters = string.ascii_uppercase
+    random_key = ''.join(random.choice(letters) for i in range(10))
+
+    # Get datetime of now as tag of the tmp file
+    current_time = datetime.now().strftime("%Y%m%d%H%M%S")
+    tmp_file_name = random_key + "_" + current_time
+    tmp_file_path = '/tmp/kubeconfig_' + tmp_file_name
+
+    data = config.gen_k8s_config_dict(
+        kubeconfig.pop('cluster_api_endpoint', None),
+        kubeconfig.pop('cluster_ca_cert', None),
+        kubeconfig.pop('admin_user', None),
+        kubeconfig.pop('admin_client_cert', None),
+        kubeconfig.pop('admin_client_key', None),
+    )
+
+    # write down the yaml file of kubectl into tmp folder
+    with open(tmp_file_path, 'w') as file:
+        yaml.dump(data, file)
+
+    # os.symlink(tmp_file_path, link_file_path)
+    os.symlink(tmp_file_path, '/tmp/tmp_'+tmp_file_name)
+    os.rename('/tmp/tmp_'+tmp_file_name, link_file_path)
+    if os.path.realpath(link_file_path) != tmp_file_path:
+        # Symlink was updated failed
+        logger.error('symlink update failed')
+
+    return link_file_path