Enhance: Enable O2 DMS for distributed cloud 02/8502/3
authorZhang Rong(Jon) <rong.zhang@windriver.com>
Thu, 9 Jun 2022 07:16:08 +0000 (15:16 +0800)
committerBin Yang <bin.yang@windriver.com>
Sun, 12 Jun 2022 08:14:01 +0000 (08:14 +0000)
1. Multi DMS k8s auto watch into DB
2. Generate k8s config file for each DMS when helm executes
3. Update test case for DMS watcher

Issue-ID: INF-276
Signed-off-by: Zhang Rong(Jon) <rong.zhang@windriver.com>
Change-Id: If9f60697b01282b241952c2a941f995d79979b13

o2common/config/config.py
o2dms/service/nfdeployment_handler.py
o2ims/adapter/clients/ocloud_client.py
tests/integration-ocloud/test_clientdriver_stx.py

index b8186b8..8f44fa2 100644 (file)
@@ -163,3 +163,47 @@ def get_helm_cli():
 
 def get_system_controller_as_respool():
     return True
+
+
+def gen_k8s_config_dict(cluster_api_endpoint, cluster_ca_cert, admin_user,
+                        admin_client_cert, admin_client_key):
+    # KUBECONFIG environment variable
+    # reference:
+    # https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/
+    data = {
+        'apiVersion': 'v1',
+        'clusters': [
+            {
+                'cluster': {
+                    'server':
+                    cluster_api_endpoint,
+                    'certificate-authority-data':
+                    cluster_ca_cert,
+                },
+                'name': 'inf-cluster'
+            }],
+        'contexts': [
+            {
+                'context': {
+                    'cluster': 'inf-cluster',
+                    'user': 'kubernetes-admin'
+                },
+                'name': 'kubernetes-admin@inf-cluster'
+            }
+        ],
+        'current-context': 'kubernetes-admin@inf-cluster',
+        'kind': 'Config',
+        'preferences': {},
+        'users': [
+            {
+                'name': admin_user,
+                'user': {
+                    'client-certificate-data':
+                    admin_client_cert,
+                    'client-key-data':
+                    admin_client_key,
+                }
+            }]
+    }
+
+    return data
index 0fdd00a..a00e093 100644 (file)
 
 # pylint: disable=unused-argument
 from __future__ import annotations
+import os
+import json
+import random
+import string
+import yaml
+from datetime import datetime
+from helm_sdk import Helm
+from typing import Callable
+from retry import retry
+
 from o2dms.domain.states import NfDeploymentState
 # from o2common.service import messagebus
 from o2dms.domain.dms import NfDeployment, NfDeploymentDesc
 from o2dms.domain import commands
-from typing import Callable
-
 from o2dms.domain.exceptions import NfdeploymentNotFoundError
 from o2dms.domain import events
 from o2common.service.unit_of_work import AbstractUnitOfWork
-from helm_sdk import Helm
-from ruamel import yaml
-import json
 from o2common.config import config
-from retry import retry
 # if TYPE_CHECKING:
 #     from . import unit_of_work
 
@@ -121,6 +125,12 @@ def install_nfdeployment(
 
     nfdeployment.set_state(NfDeploymentState.Installing)
 
+    # Gen kube config file and set the path
+    dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId)
+    dms_res = dms.serialize()
+    p = dms_res.pop("profile", None)
+    k8sconf_path = _get_kube_config_path(nfdeployment.deploymentManagerId, p)
+
     # helm repo add
     repourl = desc.artifactRepoUrl
     helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})
@@ -162,8 +172,8 @@ def install_nfdeployment(
     #     myflags = {"name": "version", "value": tokens[1]}
     result = helm.install(
         nfdeployment.name, "{}/{}".format(repoName, chartname), flags=myflags,
-        values_file=values_file_path, kubeconfig=K8S_KUBECONFIG,
-        token=K8S_TOKEN, apiserver=K8S_APISERVER)
+        values_file=values_file_path, kubeconfig=k8sconf_path)
+    # token=K8S_TOKEN, apiserver=K8S_APISERVER)
     logger.debug('result: {}'.format(result))
 
     # in case success
@@ -177,7 +187,7 @@ def install_nfdeployment(
 
 def _create_values_file(filePath: str, content: dict):
     with open(filePath, "w", encoding="utf-8") as f:
-        yaml.dump(content, f, Dumper=yaml.RoundTripDumper)
+        yaml.dump(content, f)
 
 
 def uninstall_nfdeployment(
@@ -205,6 +215,12 @@ def uninstall_nfdeployment(
             entity.set_state(NfDeploymentState.Uninstalling)
         uow.commit()
 
+    # Gen kube config file and set the path
+    dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId)
+    dms_res = dms.serialize()
+    p = dms_res.pop("profile", None)
+    k8sconf_path = _get_kube_config_path(nfdeployment.deploymentManagerId, p)
+
     helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})
 
     logger.debug('Try to helm del {}'.format(
@@ -214,8 +230,8 @@ def uninstall_nfdeployment(
     #     myflags = {"name": "version", "value": tokens[1]}
     result = helm.uninstall(
         nfdeployment.name, flags=myflags,
-        kubeconfig=K8S_KUBECONFIG,
-        token=K8S_TOKEN, apiserver=K8S_APISERVER)
+        kubeconfig=k8sconf_path,)
+    # token=K8S_TOKEN, apiserver=K8S_APISERVER)
     logger.debug('result: {}'.format(result))
 
     # in case success
@@ -241,3 +257,43 @@ def delete_nfdeployment(
     with uow:
         uow.nfdeployments.delete(cmd.NfDeploymentId)
         uow.commit()
+
+
+def _get_kube_config_path(dmId: str, kubeconfig: dict) -> dict:
+
+    # TODO: update this kube file for each DMS k8s when it changes.
+
+    link_file_path = '/tmp/kubeconfig_' + dmId
+    if os.path.exists(link_file_path) and \
+            os.path.exists(os.readlink(link_file_path)):
+        return link_file_path
+
+    # Generate a random key for tmp kube config file
+    letters = string.ascii_uppercase
+    random_key = ''.join(random.choice(letters) for i in range(10))
+
+    # Get datetime of now as tag of the tmp file
+    current_time = datetime.now().strftime("%Y%m%d%H%M%S")
+    tmp_file_name = random_key + "_" + current_time
+    tmp_file_path = '/tmp/kubeconfig_' + tmp_file_name
+
+    data = config.gen_k8s_config_dict(
+        kubeconfig.pop('cluster_api_endpoint', None),
+        kubeconfig.pop('cluster_ca_cert', None),
+        kubeconfig.pop('admin_user', None),
+        kubeconfig.pop('admin_client_cert', None),
+        kubeconfig.pop('admin_client_key', None),
+    )
+
+    # write down the yaml file of kubectl into tmp folder
+    with open(tmp_file_path, 'w') as file:
+        yaml.dump(data, file)
+
+    # os.symlink(tmp_file_path, link_file_path)
+    os.symlink(tmp_file_path, '/tmp/tmp_'+tmp_file_name)
+    os.rename('/tmp/tmp_'+tmp_file_name, link_file_path)
+    if os.path.realpath(link_file_path) != tmp_file_path:
+        # Symlink was updated failed
+        logger.error('symlink update failed')
+
+    return link_file_path
index e963987..ddc644e 100644 (file)
@@ -299,22 +299,80 @@ class StxClientImp(object):
             ResourceTypeEnum.PSERVER, self._hostconverter(host))
 
     def getK8sList(self, **filters) -> List[ocloudModel.StxGenericModel]:
-        k8sclusters = self.stxclient.kube_cluster.list()
-        logger.debug('k8sresources[0]:' + str(k8sclusters[0].to_dict()))
-        # logger.debug('k8sresources[0] cluster_api_endpoint: ' +
-        #  str(k8sclusters[0].cluster_api_endpoint))
-        return [ocloudModel.StxGenericModel(
-            ResourceTypeEnum.DMS,
-            self._k8sconverter(k8sres), self._k8shasher(k8sres))
-            for k8sres in k8sclusters if k8sres]
+        systems = self.stxclient.isystem.list()
+        logger.debug('system controller distributed_cloud_role:' +
+                     str(systems[0].distributed_cloud_role))
+
+        if systems[0].distributed_cloud_role is None or \
+                systems[0].distributed_cloud_role != 'systemcontroller':
+            k8sclusters = self.stxclient.kube_cluster.list()
+            setattr(k8sclusters[0], 'cloud_name', systems[0].name)
+            logger.debug('k8sresources[0]:' + str(k8sclusters[0].to_dict()))
+            # logger.debug('k8sresources[0] cluster_api_endpoint: ' +
+            #  str(k8sclusters[0].cluster_api_endpoint))
+            return [ocloudModel.StxGenericModel(
+                ResourceTypeEnum.DMS,
+                self._k8sconverter(k8sres), self._k8shasher(k8sres))
+                for k8sres in k8sclusters if k8sres]
+
+        k8s_list = []
+        if config.get_system_controller_as_respool():
+            k8sclusters = self.stxclient.kube_cluster.list()
+            setattr(k8sclusters[0], 'cloud_name', systems[0].name)
+            logger.debug('k8sresources[0]:' + str(k8sclusters[0].to_dict()))
+            # logger.debug('k8sresources[0] cluster_api_endpoint: ' +
+            #  str(k8sclusters[0].cluster_api_endpoint))
+            k8s_list.append(k8sclusters[0])
+
+        subclouds = self.getSubcloudList()
+        logger.debug('subclouds numbers: %s' % len(subclouds))
+        for subcloud in subclouds:
+            try:
+                subcloud_stxclient = self.getSubcloudClient(
+                    subcloud.subcloud_id)
+                systems = subcloud_stxclient.isystem.list()
+                k8sclusters = subcloud_stxclient.kube_cluster.list()
+                setattr(k8sclusters[0], 'cloud_name', systems[0].name)
+                logger.debug('k8sresources[0]:' +
+                             str(k8sclusters[0].to_dict()))
+                # logger.debug('k8sresources[0] cluster_api_endpoint: ' +
+                #  str(k8sclusters[0].cluster_api_endpoint))
+                k8s_list.append(k8sclusters[0])
+            except Exception as ex:
+                logger.warning('Failed get cgstclient of subcloud %s: %s' %
+                               (subcloud.name, ex))
+                continue
+
+        return [ocloudModel.StxGenericModel(ResourceTypeEnum.DMS,
+                self._k8sconverter(k8sres), self._k8shasher(k8sres))
+                for k8sres in k8s_list if k8sres]
 
     def getK8sDetail(self, name) -> ocloudModel.StxGenericModel:
+        systems = self.stxclient.isystem.list()
         if not name:
             k8sclusters = self.stxclient.kube_cluster.list()
             # logger.debug("k8sresources[0]:" + str(k8sclusters[0].to_dict()))
+            setattr(k8sclusters[0], 'cloud_name', systems[0].name)
             k8scluster = k8sclusters.pop()
         else:
-            k8scluster = self.stxclient.kube_cluster.get(name)
+            sname = name.split('.')
+            cloud_name = '.'.join(sname[:-1])
+            k8s_name = sname[-1]
+            if cloud_name == systems[0].name:
+                k8scluster = self.stxclient.kube_cluster.get(k8s_name)
+                setattr(k8scluster, 'cloud_name', cloud_name)
+            else:
+                subclouds = self.getSubcloudList()
+                subcloud_id = [
+                    sub.subcloud_id for sub in subclouds
+                    if sub.name == cloud_name][0]
+                subcloud_stxclient = self.getSubcloudClient(subcloud_id)
+                k8scluster = subcloud_stxclient.kube_cluster.get(k8s_name)
+                setattr(k8scluster, 'cloud_name', cloud_name)
+                # logger.debug('k8sresources[0]:' +
+                #  str(k8sclusters[0].to_dict()))
+                # logger.debug('k8sresources[0] cluster_api_endpoint: ' +
+                #  str(k8sclusters[0].cluster_api_endpoint))
 
         if not k8scluster:
             return None
@@ -434,9 +492,10 @@ class StxClientImp(object):
 
     @ staticmethod
     def _k8sconverter(cluster):
-        setattr(cluster, 'name', cluster.cluster_name)
+        setattr(cluster, 'name', cluster.cloud_name +
+                '.' + cluster.cluster_name)
         setattr(cluster, 'uuid',
-                uuid.uuid3(uuid.NAMESPACE_URL, cluster.cluster_name))
+                uuid.uuid3(uuid.NAMESPACE_URL, cluster.name))
         setattr(cluster, 'updated_at', None)
         setattr(cluster, 'created_at', None)
         setattr(cluster, 'events', [])
@@ -446,5 +505,5 @@ class StxClientImp(object):
 
     @ staticmethod
     def _k8shasher(cluster):
-        return str(hash((cluster.cluster_name,
+        return str(hash((cluster.cluster_name, cluster.cloud_name,
                          cluster.cluster_api_endpoint, cluster.admin_user)))
index 9c0c394..24de6c4 100644 (file)
@@ -81,6 +81,13 @@ def test_get_k8s_list(real_stx_aio_client):
     assert k8s1.name == k8s2.name
     assert k8s1.id == k8s2.id
 
+    if len(k8slist) > 1:
+        k8s3 = k8slist[1]
+        k8s4 = stxClientImp.getK8sDetail(k8s3.name)
+        assert k8s3 != k8s4
+        assert k8s3.name == k8s4.name
+        assert k8s3.id == k8s4.id
+
 
 def test_get_cpu_list(real_stx_aio_client):
     stxClientImp = StxClientImp(real_stx_aio_client)
@@ -155,9 +162,21 @@ def test_get_if_port_list(real_stx_aio_client):
     assert port1.id == port2.id
 
 
-def test_get_subcloud_list(real_stx_aio_client, real_stx_dc_client):
-    # dcClientImp = StxClientImp(real_stx_dc_client)
-    dcClientImp = StxClientImp(
-        stx_client=real_stx_aio_client, dc_client=real_stx_dc_client)
-    sa = dcClientImp.getSubcloudList()
-    assert len(sa) == 0
+def test_get_res_pool_list(real_stx_aio_client, real_stx_dc_client):
+    stxClientImp = StxClientImp(real_stx_aio_client, real_stx_dc_client)
+    assert stxClientImp is not None
+    reslist = stxClientImp.getResourcePoolList()
+    assert reslist is not None
+    assert len(reslist) > 0
+    res1 = reslist[0]
+    res2 = stxClientImp.getResourcePoolDetail(res1.id)
+    assert res1 != res2
+    assert res1.name == res2.name
+    assert res1.id == res2.id
+
+    if len(reslist) > 1:
+        res3 = reslist[1]
+        res4 = stxClientImp.getResourcePoolDetail(res3.id)
+        assert res3 != res4
+        assert res3.name == res4.name
+        assert res3.id == res4.id