def get_system_controller_as_respool():
+    # NOTE(review): hard-coded feature flag — when True, the DC system
+    # controller's own k8s cluster is also exposed as a resource pool/DMS
+    # (see getK8sList). Presumably a stub for a real config lookup; confirm.
    return True
+
+
+def gen_k8s_config_dict(cluster_api_endpoint, cluster_ca_cert, admin_user,
+ admin_client_cert, admin_client_key):
+ """Build a kubeconfig-shaped dict for a single cluster and admin user.
+
+ The mapping mirrors the YAML layout of a KUBECONFIG file (apiVersion
+ v1; one cluster named 'inf-cluster'; one context; one user) and is
+ intended to be serialized with yaml.dump().
+
+ NOTE(review): cert/key values are embedded verbatim — presumably
+ already base64-encoded PEM data, as the '-data' kubeconfig keys
+ require; confirm with callers.
+ NOTE(review): the context references user 'kubernetes-admin' while the
+ users entry is named *admin_user*; if admin_user differs, the context
+ points at a user that is not defined — verify.
+ """
+ # KUBECONFIG environment variable
+ # reference:
+ # https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/
+ data = {
+ 'apiVersion': 'v1',
+ 'clusters': [
+ {
+ 'cluster': {
+ 'server':
+ cluster_api_endpoint,
+ 'certificate-authority-data':
+ cluster_ca_cert,
+ },
+ 'name': 'inf-cluster'
+ }],
+ 'contexts': [
+ {
+ 'context': {
+ 'cluster': 'inf-cluster',
+ 'user': 'kubernetes-admin'
+ },
+ 'name': 'kubernetes-admin@inf-cluster'
+ }
+ ],
+ 'current-context': 'kubernetes-admin@inf-cluster',
+ 'kind': 'Config',
+ 'preferences': {},
+ 'users': [
+ {
+ 'name': admin_user,
+ 'user': {
+ 'client-certificate-data':
+ admin_client_cert,
+ 'client-key-data':
+ admin_client_key,
+ }
+ }]
+ }
+
+ return data
# pylint: disable=unused-argument
from __future__ import annotations
+import os
+import json
+import random
+import string
+import yaml
+from datetime import datetime
+from helm_sdk import Helm
+from typing import Callable
+from retry import retry
+
from o2dms.domain.states import NfDeploymentState
# from o2common.service import messagebus
from o2dms.domain.dms import NfDeployment, NfDeploymentDesc
from o2dms.domain import commands
-from typing import Callable
-
from o2dms.domain.exceptions import NfdeploymentNotFoundError
from o2dms.domain import events
from o2common.service.unit_of_work import AbstractUnitOfWork
-from helm_sdk import Helm
-from ruamel import yaml
-import json
from o2common.config import config
-from retry import retry
# if TYPE_CHECKING:
# from . import unit_of_work
nfdeployment.set_state(NfDeploymentState.Installing)
+ # Gen kube config file and set the path
+ dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId)
+ dms_res = dms.serialize()
+ p = dms_res.pop("profile", None)
+ k8sconf_path = _get_kube_config_path(nfdeployment.deploymentManagerId, p)
+
# helm repo add
repourl = desc.artifactRepoUrl
helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})
# myflags = {"name": "version", "value": tokens[1]}
result = helm.install(
nfdeployment.name, "{}/{}".format(repoName, chartname), flags=myflags,
- values_file=values_file_path, kubeconfig=K8S_KUBECONFIG,
- token=K8S_TOKEN, apiserver=K8S_APISERVER)
+ values_file=values_file_path, kubeconfig=k8sconf_path)
+ # token=K8S_TOKEN, apiserver=K8S_APISERVER)
logger.debug('result: {}'.format(result))
# in case success
def _create_values_file(filePath: str, content: dict):
+ # Serialize the helm values dict to *filePath* as YAML (UTF-8).
+ # Switched from ruamel's RoundTripDumper to plain yaml.dump, matching
+ # the module-level `import yaml` (PyYAML) this patch introduces.
with open(filePath, "w", encoding="utf-8") as f:
- yaml.dump(content, f, Dumper=yaml.RoundTripDumper)
+ yaml.dump(content, f)
def uninstall_nfdeployment(
entity.set_state(NfDeploymentState.Uninstalling)
uow.commit()
+ # Gen kube config file and set the path
+ dms = uow.deployment_managers.get(nfdeployment.deploymentManagerId)
+ dms_res = dms.serialize()
+ p = dms_res.pop("profile", None)
+ k8sconf_path = _get_kube_config_path(nfdeployment.deploymentManagerId, p)
+
helm = Helm(logger, LOCAL_HELM_BIN, environment_variables={})
logger.debug('Try to helm del {}'.format(
# myflags = {"name": "version", "value": tokens[1]}
result = helm.uninstall(
nfdeployment.name, flags=myflags,
- kubeconfig=K8S_KUBECONFIG,
- token=K8S_TOKEN, apiserver=K8S_APISERVER)
+ kubeconfig=k8sconf_path,)
+ # token=K8S_TOKEN, apiserver=K8S_APISERVER)
logger.debug('result: {}'.format(result))
# in case success
with uow:
uow.nfdeployments.delete(cmd.NfDeploymentId)
uow.commit()
+
+
+def _get_kube_config_path(dmId: str, kubeconfig: dict) -> str:
+ """Return the path of a kubeconfig file for the given DMS.
+
+ A stable symlink /tmp/kubeconfig_<dmId> points at a uniquely named
+ temp file containing the rendered kubeconfig; the cached file is
+ reused on later calls while the symlink target still exists.
+
+ NOTE(review): pops the credential keys out of *kubeconfig*, mutating
+ the caller's dict — confirm that is intended.
+ """
+
+ # TODO: update this kube file for each DMS k8s when it changes.
+
+ link_file_path = '/tmp/kubeconfig_' + dmId
+ # Reuse the cached kubeconfig if the symlink and its target exist.
+ # NOTE(review): os.readlink raises OSError if the path exists but is
+ # not a symlink — consider guarding with os.path.islink() first.
+ if os.path.exists(link_file_path) and \
+ os.path.exists(os.readlink(link_file_path)):
+ return link_file_path
+
+ # Generate a random key for tmp kube config file
+ letters = string.ascii_uppercase
+ random_key = ''.join(random.choice(letters) for i in range(10))
+
+ # Get datetime of now as tag of the tmp file
+ current_time = datetime.now().strftime("%Y%m%d%H%M%S")
+ tmp_file_name = random_key + "_" + current_time
+ tmp_file_path = '/tmp/kubeconfig_' + tmp_file_name
+
+ data = config.gen_k8s_config_dict(
+ kubeconfig.pop('cluster_api_endpoint', None),
+ kubeconfig.pop('cluster_ca_cert', None),
+ kubeconfig.pop('admin_user', None),
+ kubeconfig.pop('admin_client_cert', None),
+ kubeconfig.pop('admin_client_key', None),
+ )
+
+ # write down the yaml file of kubectl into tmp folder
+ with open(tmp_file_path, 'w') as file:
+ yaml.dump(data, file)
+
+ # Repoint the stable symlink at the new temp file without leaving a
+ # window where it is missing: create a uniquely named link first,
+ # then rename it over the old one (rename replaces atomically).
+ # os.symlink(tmp_file_path, link_file_path)
+ os.symlink(tmp_file_path, '/tmp/tmp_'+tmp_file_name)
+ os.rename('/tmp/tmp_'+tmp_file_name, link_file_path)
+ if os.path.realpath(link_file_path) != tmp_file_path:
+ # Symlink update failed; log it and fall through — the link path
+ # is returned regardless, pointing at whatever target it has.
+ logger.error('symlink update failed')
+
+ return link_file_path
ResourceTypeEnum.PSERVER, self._hostconverter(host))
def getK8sList(self, **filters) -> List[ocloudModel.StxGenericModel]:
+ # Collect DMS (k8s cluster) models. A standalone (non system
+ # controller) cloud returns only its local cluster; a DC system
+ # controller aggregates subcloud clusters, plus its own cluster when
+ # config.get_system_controller_as_respool() is enabled.
+ # NOTE(review): assumes isystem.list() / kube_cluster.list() return
+ # at least one element; an empty list would raise IndexError here.
- k8sclusters = self.stxclient.kube_cluster.list()
- logger.debug('k8sresources[0]:' + str(k8sclusters[0].to_dict()))
- # logger.debug('k8sresources[0] cluster_api_endpoint: ' +
- # str(k8sclusters[0].cluster_api_endpoint))
- return [ocloudModel.StxGenericModel(
- ResourceTypeEnum.DMS,
- self._k8sconverter(k8sres), self._k8shasher(k8sres))
- for k8sres in k8sclusters if k8sres]
+ systems = self.stxclient.isystem.list()
+ logger.debug('system controller distributed_cloud_role:' +
+ str(systems[0].distributed_cloud_role))
+
+ if systems[0].distributed_cloud_role is None or \
+ systems[0].distributed_cloud_role != 'systemcontroller':
+ k8sclusters = self.stxclient.kube_cluster.list()
+ # cloud_name is attached so _k8sconverter can build the unique
+ # "<cloud>.<cluster>" model name.
+ setattr(k8sclusters[0], 'cloud_name', systems[0].name)
+ logger.debug('k8sresources[0]:' + str(k8sclusters[0].to_dict()))
+ # logger.debug('k8sresources[0] cluster_api_endpoint: ' +
+ # str(k8sclusters[0].cluster_api_endpoint))
+ return [ocloudModel.StxGenericModel(
+ ResourceTypeEnum.DMS,
+ self._k8sconverter(k8sres), self._k8shasher(k8sres))
+ for k8sres in k8sclusters if k8sres]
+
+ k8s_list = []
+ if config.get_system_controller_as_respool():
+ k8sclusters = self.stxclient.kube_cluster.list()
+ setattr(k8sclusters[0], 'cloud_name', systems[0].name)
+ logger.debug('k8sresources[0]:' + str(k8sclusters[0].to_dict()))
+ # logger.debug('k8sresources[0] cluster_api_endpoint: ' +
+ # str(k8sclusters[0].cluster_api_endpoint))
+ k8s_list.append(k8sclusters[0])
+
+ subclouds = self.getSubcloudList()
+ logger.debug('subclouds numbers: %s' % len(subclouds))
+ for subcloud in subclouds:
+ try:
+ subcloud_stxclient = self.getSubcloudClient(
+ subcloud.subcloud_id)
+ # NOTE(review): rebinds the outer `systems` variable to the
+ # subcloud's isystem list from here on.
+ systems = subcloud_stxclient.isystem.list()
+ k8sclusters = subcloud_stxclient.kube_cluster.list()
+ setattr(k8sclusters[0], 'cloud_name', systems[0].name)
+ logger.debug('k8sresources[0]:' +
+ str(k8sclusters[0].to_dict()))
+ # logger.debug('k8sresources[0] cluster_api_endpoint: ' +
+ # str(k8sclusters[0].cluster_api_endpoint))
+ k8s_list.append(k8sclusters[0])
+ except Exception as ex:
+ # Best-effort aggregation: an unreachable subcloud is logged
+ # and skipped rather than failing the whole listing.
+ logger.warning('Failed get cgstclient of subcloud %s: %s' %
+ (subcloud.name, ex))
+ continue
+
+ return [ocloudModel.StxGenericModel(ResourceTypeEnum.DMS,
+ self._k8sconverter(k8sres), self._k8shasher(k8sres))
+ for k8sres in k8s_list if k8sres]
def getK8sDetail(self, name) -> ocloudModel.StxGenericModel:
+ # Resolve one k8s cluster by its composite name
+ # "<cloud_name>.<cluster_name>" (the format _k8sconverter emits);
+ # an empty name falls back to the local cloud's first cluster.
+ systems = self.stxclient.isystem.list()
if not name:
k8sclusters = self.stxclient.kube_cluster.list()
# logger.debug("k8sresources[0]:" + str(k8sclusters[0].to_dict()))
+ setattr(k8sclusters[0], 'cloud_name', systems[0].name)
k8scluster = k8sclusters.pop()
else:
- k8scluster = self.stxclient.kube_cluster.get(name)
+ # Split "<cloud>.<cluster>"; cloud names may themselves contain
+ # dots, so everything before the last dot is the cloud name.
+ sname = name.split('.')
+ cloud_name = '.'.join(sname[:-1])
+ k8s_name = sname[-1]
+ if cloud_name == systems[0].name:
+ k8scluster = self.stxclient.kube_cluster.get(k8s_name)
+ setattr(k8scluster, 'cloud_name', cloud_name)
+ else:
+ subclouds = self.getSubcloudList()
+ # NOTE(review): raises IndexError if no subcloud matches
+ # cloud_name — confirm callers handle that.
+ subcloud_id = [
+ sub.subcloud_id for sub in subclouds
+ if sub.name == cloud_name][0]
+ subcloud_stxclient = self.getSubcloudClient(subcloud_id)
+ k8scluster = subcloud_stxclient.kube_cluster.get(k8s_name)
+ setattr(k8scluster, 'cloud_name', cloud_name)
+ # logger.debug('k8sresources[0]:' +
+ # str(k8sclusters[0].to_dict()))
+ # logger.debug('k8sresources[0] cluster_api_endpoint: ' +
+ # str(k8sclusters[0].cluster_api_endpoint))
if not k8scluster:
return None
@ staticmethod
def _k8sconverter(cluster):
+ # Normalize a stx kube_cluster object in place: the model name is
+ # the unique "<cloud_name>.<cluster_name>" (cloud_name is attached
+ # by the caller) and the uuid is derived deterministically from it
+ # via uuid3, so the same cluster always maps to the same id.
- setattr(cluster, 'name', cluster.cluster_name)
+ setattr(cluster, 'name', cluster.cloud_name +
+ '.' + cluster.cluster_name)
setattr(cluster, 'uuid',
- uuid.uuid3(uuid.NAMESPACE_URL, cluster.cluster_name))
+ uuid.uuid3(uuid.NAMESPACE_URL, cluster.name))
setattr(cluster, 'updated_at', None)
setattr(cluster, 'created_at', None)
setattr(cluster, 'events', [])
@ staticmethod
def _k8shasher(cluster):
+ # Content hash used to detect DMS changes; cloud_name is included so
+ # equally named clusters on different clouds hash differently.
+ # NOTE(review): built-in hash() of strings varies across processes
+ # (PYTHONHASHSEED) — presumably only compared within one run; verify.
- return str(hash((cluster.cluster_name,
+ return str(hash((cluster.cluster_name, cluster.cloud_name,
cluster.cluster_api_endpoint, cluster.admin_user)))
assert k8s1.name == k8s2.name
assert k8s1.id == k8s2.id
+ if len(k8slist) > 1:
+ k8s3 = k8slist[1]
+ k8s4 = stxClientImp.getK8sDetail(k8s3.name)
+ assert k8s3 != k8s4
+ assert k8s3.name == k8s4.name
+ assert k8s3.id == k8s4.id
+
def test_get_cpu_list(real_stx_aio_client):
stxClientImp = StxClientImp(real_stx_aio_client)
assert port1.id == port2.id
-def test_get_subcloud_list(real_stx_aio_client, real_stx_dc_client):
- # dcClientImp = StxClientImp(real_stx_dc_client)
- dcClientImp = StxClientImp(
- stx_client=real_stx_aio_client, dc_client=real_stx_dc_client)
- sa = dcClientImp.getSubcloudList()
- assert len(sa) == 0
+def test_get_res_pool_list(real_stx_aio_client, real_stx_dc_client):
+ """Resource pools fetched via list and detail agree on name and id."""
+ stxClientImp = StxClientImp(real_stx_aio_client, real_stx_dc_client)
+ assert stxClientImp is not None
+ reslist = stxClientImp.getResourcePoolList()
+ assert reslist is not None
+ assert len(reslist) > 0
+ res1 = reslist[0]
+ res2 = stxClientImp.getResourcePoolDetail(res1.id)
+ # Detail returns a fresh object (not identical/equal to the listed
+ # one) yet carries the same identity fields.
+ assert res1 != res2
+ assert res1.name == res2.name
+ assert res1.id == res2.id
+
+ # When a second pool exists (DC setup with subclouds), check it too.
+ if len(reslist) > 1:
+ res3 = reslist[1]
+ res4 = stxClientImp.getResourcePoolDetail(res3.id)
+ assert res3 != res4
+ assert res3.name == res4.name
+ assert res3.id == res4.id