From: Zhang Rong(Jon) Date: Thu, 9 Jun 2022 07:16:08 +0000 (+0800) Subject: Enhance: Enable O2 DMS for distributed cloud X-Git-Tag: 2.0.0-rc1~46 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=d23f4cb727b41a43cdb28c3e819fa902cd4fc8cc;p=pti%2Fo2.git Enhance: Enable O2 DMS for distributed cloud 1. Multi DMS k8s auto watch into DB 2. Generate k8s config file for each DMS when helm executes 3. Update test case for DMS watcher Issue-ID: INF-276 Signed-off-by: Zhang Rong(Jon) Change-Id: If9f60697b01282b241952c2a941f995d79979b13 --- diff --git a/o2common/config/config.py b/o2common/config/config.py index b8186b8..8f44fa2 100644 --- a/o2common/config/config.py +++ b/o2common/config/config.py @@ -163,3 +163,47 @@ def get_helm_cli(): def get_system_controller_as_respool(): return True + + +def gen_k8s_config_dict(cluster_api_endpoint, cluster_ca_cert, admin_user, + admin_client_cert, admin_client_key): + # KUBECONFIG environment variable + # reference: + # https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/ + data = { + 'apiVersion': 'v1', + 'clusters': [ + { + 'cluster': { + 'server': + cluster_api_endpoint, + 'certificate-authority-data': + cluster_ca_cert, + }, + 'name': 'inf-cluster' + }], + 'contexts': [ + { + 'context': { + 'cluster': 'inf-cluster', + 'user': 'kubernetes-admin' + }, + 'name': 'kubernetes-admin@inf-cluster' + } + ], + 'current-context': 'kubernetes-admin@inf-cluster', + 'kind': 'Config', + 'preferences': {}, + 'users': [ + { + 'name': admin_user, + 'user': { + 'client-certificate-data': + admin_client_cert, + 'client-key-data': + admin_client_key, + } + }] + } + + return data diff --git a/o2dms/service/nfdeployment_handler.py b/o2dms/service/nfdeployment_handler.py index 0fdd00a..a00e093 100644 --- a/o2dms/service/nfdeployment_handler.py +++ b/o2dms/service/nfdeployment_handler.py @@ -14,20 +14,24 @@ # pylint: disable=unused-argument from __future__ import annotations +import os +import json +import random +import string +import yaml +from datetime import datetime +from helm_sdk import Helm +from typing import Callable +from retry import retry + from o2dms.domain.states import NfDeploymentState # from o2common.service import messagebus from o2dms.domain.dms import NfDeployment, NfDeploymentDesc from o2dms.domain import commands -from typing import Callable - from o2dms.domain.exceptions import NfdeploymentNotFoundError from o2dms.domain import events from o2common.service.unit_of_work import AbstractUnitOfWork -from helm_sdk import Helm -from ruamel import yaml -import json from o2common.config import config -from retry import retry # if TYPE_CHECKING: # from . import unit_of_work @@ -121,6 +125,12 @@ def install_nfdeployment( nfdeployment.set_state(NfDeploymentState.Installing) + # Gen kube config file and set the path + dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId) + dms_res = dms.serialize() + p = dms_res.pop("profile", None) + k8sconf_path = _get_kube_config_path(nfdeployment.deploymentManagerId, p) + # helm repo add repourl = desc.artifactRepoUrl helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={}) @@ -162,8 +172,8 @@ def install_nfdeployment( # myflags = {"name": "version", "value": tokens[1]} result = helm.install( nfdeployment.name, "{}/{}".format(repoName, chartname), flags=myflags, - values_file=values_file_path, kubeconfig=K8S_KUBECONFIG, - token=K8S_TOKEN, apiserver=K8S_APISERVER) + values_file=values_file_path, kubeconfig=k8sconf_path) + # token=K8S_TOKEN, apiserver=K8S_APISERVER) logger.debug('result: {}'.format(result)) # in case success @@ -177,7 +187,7 @@ def install_nfdeployment( def _create_values_file(filePath: str, content: dict): with open(filePath, "w", encoding="utf-8") as f: - yaml.dump(content, f, Dumper=yaml.RoundTripDumper) + yaml.dump(content, f) def uninstall_nfdeployment( @@ -205,6 +215,12 @@ def uninstall_nfdeployment( entity.set_state(NfDeploymentState.Uninstalling) uow.commit() + # Gen kube config file and set the path + dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId) + dms_res = dms.serialize() + p = dms_res.pop("profile", None) + k8sconf_path = _get_kube_config_path(nfdeployment.deploymentManagerId, p) + helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={}) logger.debug('Try to helm del {}'.format( @@ -214,8 +230,8 @@ def uninstall_nfdeployment( # myflags = {"name": "version", "value": tokens[1]} result = helm.uninstall( nfdeployment.name, flags=myflags, - kubeconfig=K8S_KUBECONFIG, - token=K8S_TOKEN, apiserver=K8S_APISERVER) + kubeconfig=k8sconf_path,) + # token=K8S_TOKEN, apiserver=K8S_APISERVER) logger.debug('result: {}'.format(result)) # in case success @@ -241,3 +257,43 @@ def delete_nfdeployment( with uow: uow.nfdeployments.delete(cmd.NfDeploymentId) uow.commit() + + +def _get_kube_config_path(dmId: str, kubeconfig: dict) -> dict: + + # TODO: update this kube file for each DMS k8s when it changes. + + link_file_path = '/tmp/kubeconfig_' + dmId + if os.path.exists(link_file_path) and \ + os.path.exists(os.readlink(link_file_path)): + return link_file_path + + # Generate a random key for tmp kube config file + letters = string.ascii_uppercase + random_key = ''.join(random.choice(letters) for i in range(10)) + + # Get datetime of now as tag of the tmp file + current_time = datetime.now().strftime("%Y%m%d%H%M%S") + tmp_file_name = random_key + "_" + current_time + tmp_file_path = '/tmp/kubeconfig_' + tmp_file_name + + data = config.gen_k8s_config_dict( + kubeconfig.pop('cluster_api_endpoint', None), + kubeconfig.pop('cluster_ca_cert', None), + kubeconfig.pop('admin_user', None), + kubeconfig.pop('admin_client_cert', None), + kubeconfig.pop('admin_client_key', None), + ) + + # write down the yaml file of kubectl into tmp folder + with open(tmp_file_path, 'w') as file: + yaml.dump(data, file) + + # os.symlink(tmp_file_path, link_file_path) + os.symlink(tmp_file_path, '/tmp/tmp_'+tmp_file_name) + os.rename('/tmp/tmp_'+tmp_file_name, link_file_path) + if os.path.realpath(link_file_path) != tmp_file_path: + # Symlink was updated failed + logger.error('symlink update failed') + + return link_file_path diff --git a/o2ims/adapter/clients/ocloud_client.py b/o2ims/adapter/clients/ocloud_client.py index e963987..ddc644e 100644 --- a/o2ims/adapter/clients/ocloud_client.py +++ b/o2ims/adapter/clients/ocloud_client.py @@ -299,22 +299,80 @@ class StxClientImp(object): ResourceTypeEnum.PSERVER, self._hostconverter(host)) def getK8sList(self, **filters) -> List[ocloudModel.StxGenericModel]: - k8sclusters = self.stxclient.kube_cluster.list() - logger.debug('k8sresources[0]:' + str(k8sclusters[0].to_dict())) - # logger.debug('k8sresources[0] cluster_api_endpoint: ' + - # str(k8sclusters[0].cluster_api_endpoint)) - return [ocloudModel.StxGenericModel( - ResourceTypeEnum.DMS, - self._k8sconverter(k8sres), self._k8shasher(k8sres)) - for k8sres in k8sclusters if k8sres] + systems = self.stxclient.isystem.list() + logger.debug('system controller distributed_cloud_role:' + + str(systems[0].distributed_cloud_role)) + + if systems[0].distributed_cloud_role is None or \ + systems[0].distributed_cloud_role != 'systemcontroller': + k8sclusters = self.stxclient.kube_cluster.list() + setattr(k8sclusters[0], 'cloud_name', systems[0].name) + logger.debug('k8sresources[0]:' + str(k8sclusters[0].to_dict())) + # logger.debug('k8sresources[0] cluster_api_endpoint: ' + + # str(k8sclusters[0].cluster_api_endpoint)) + return [ocloudModel.StxGenericModel( + ResourceTypeEnum.DMS, + self._k8sconverter(k8sres), self._k8shasher(k8sres)) + for k8sres in k8sclusters if k8sres] + + k8s_list = [] + if config.get_system_controller_as_respool(): + k8sclusters = self.stxclient.kube_cluster.list() + setattr(k8sclusters[0], 'cloud_name', systems[0].name) + logger.debug('k8sresources[0]:' + str(k8sclusters[0].to_dict())) + # logger.debug('k8sresources[0] cluster_api_endpoint: ' + + # str(k8sclusters[0].cluster_api_endpoint)) + k8s_list.append(k8sclusters[0]) + + subclouds = self.getSubcloudList() + logger.debug('subclouds numbers: %s' % len(subclouds)) + for subcloud in subclouds: + try: + subcloud_stxclient = self.getSubcloudClient( + subcloud.subcloud_id) + systems = subcloud_stxclient.isystem.list() + k8sclusters = subcloud_stxclient.kube_cluster.list() + setattr(k8sclusters[0], 'cloud_name', systems[0].name) + logger.debug('k8sresources[0]:' + + str(k8sclusters[0].to_dict())) + # logger.debug('k8sresources[0] cluster_api_endpoint: ' + + # str(k8sclusters[0].cluster_api_endpoint)) + k8s_list.append(k8sclusters[0]) + except Exception as ex: + logger.warning('Failed get cgstclient of subcloud %s: %s' % + (subcloud.name, ex)) + continue + + return [ocloudModel.StxGenericModel(ResourceTypeEnum.DMS, + self._k8sconverter(k8sres), self._k8shasher(k8sres)) + for k8sres in k8s_list if k8sres] def getK8sDetail(self, name) -> ocloudModel.StxGenericModel: + systems = self.stxclient.isystem.list() if not name: k8sclusters = self.stxclient.kube_cluster.list() # logger.debug("k8sresources[0]:" + str(k8sclusters[0].to_dict())) + setattr(k8sclusters[0], 'cloud_name', systems[0].name) k8scluster = k8sclusters.pop() else: - k8scluster = self.stxclient.kube_cluster.get(name) + sname = name.split('.') + cloud_name = '.'.join(sname[:-1]) + k8s_name = sname[-1] + if cloud_name == systems[0].name: + k8scluster = self.stxclient.kube_cluster.get(k8s_name) + setattr(k8scluster, 'cloud_name', cloud_name) + else: + subclouds = self.getSubcloudList() + subcloud_id = [ + sub.subcloud_id for sub in subclouds + if sub.name == cloud_name][0] + subcloud_stxclient = self.getSubcloudClient(subcloud_id) + k8scluster = subcloud_stxclient.kube_cluster.get(k8s_name) + setattr(k8scluster, 'cloud_name', cloud_name) + # logger.debug('k8sresources[0]:' + + # str(k8sclusters[0].to_dict())) + # logger.debug('k8sresources[0] cluster_api_endpoint: ' + + # str(k8sclusters[0].cluster_api_endpoint)) if not k8scluster: return None @@ -434,9 +492,10 @@ class StxClientImp(object): @ staticmethod def _k8sconverter(cluster): - setattr(cluster, 'name', cluster.cluster_name) + setattr(cluster, 'name', cluster.cloud_name + + '.' + cluster.cluster_name) setattr(cluster, 'uuid', - uuid.uuid3(uuid.NAMESPACE_URL, cluster.cluster_name)) + uuid.uuid3(uuid.NAMESPACE_URL, cluster.name)) setattr(cluster, 'updated_at', None) setattr(cluster, 'created_at', None) setattr(cluster, 'events', []) @@ -446,5 +505,5 @@ class StxClientImp(object): @ staticmethod def _k8shasher(cluster): - return str(hash((cluster.cluster_name, + return str(hash((cluster.cluster_name, cluster.cloud_name, cluster.cluster_api_endpoint, cluster.admin_user))) diff --git a/tests/integration-ocloud/test_clientdriver_stx.py b/tests/integration-ocloud/test_clientdriver_stx.py index 9c0c394..24de6c4 100644 --- a/tests/integration-ocloud/test_clientdriver_stx.py +++ b/tests/integration-ocloud/test_clientdriver_stx.py @@ -81,6 +81,13 @@ def test_get_k8s_list(real_stx_aio_client): assert k8s1.name == k8s2.name assert k8s1.id == k8s2.id + if len(k8slist) > 1: + k8s3 = k8slist[1] + k8s4 = stxClientImp.getK8sDetail(k8s3.name) + assert k8s3 != k8s4 + assert k8s3.name == k8s4.name + assert k8s3.id == k8s4.id + def test_get_cpu_list(real_stx_aio_client): stxClientImp = StxClientImp(real_stx_aio_client) @@ -155,9 +162,21 @@ def test_get_if_port_list(real_stx_aio_client): assert port1.id == port2.id -def test_get_subcloud_list(real_stx_aio_client, real_stx_dc_client): - # dcClientImp = StxClientImp(real_stx_dc_client) - dcClientImp = StxClientImp( - stx_client=real_stx_aio_client, dc_client=real_stx_dc_client) - sa = dcClientImp.getSubcloudList() - assert len(sa) == 0 +def test_get_res_pool_list(real_stx_aio_client, real_stx_dc_client): + stxClientImp = StxClientImp(real_stx_aio_client, real_stx_dc_client) + assert stxClientImp is not None + reslist = stxClientImp.getResourcePoolList() + assert reslist is not None + assert len(reslist) > 0 + res1 = reslist[0] + res2 = stxClientImp.getResourcePoolDetail(res1.id) + assert res1 != res2 + assert res1.name == res2.name + assert res1.id == res2.id + + if len(reslist) > 1: + res3 = reslist[1] + res4 = stxClientImp.getResourcePoolDetail(res3.id) + assert res3 != res4 + assert res3.name == res4.name + assert res3.id == res4.id