X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=o2ims%2Fadapter%2Fclients%2Focloud_client.py;h=6cbeda761265e17846829393f37ef84ae4850ce4;hb=defeb292d90ce07556f0dc6d5f9ab0d16d760e42;hp=b2ea3d300334232b0b1f838d965e823a03353a24;hpb=f86db0cb8f06d0561575582fdcd38dacae16bac4;p=pti%2Fo2.git diff --git a/o2ims/adapter/clients/ocloud_client.py b/o2ims/adapter/clients/ocloud_client.py index b2ea3d3..6cbeda7 100644 --- a/o2ims/adapter/clients/ocloud_client.py +++ b/o2ims/adapter/clients/ocloud_client.py @@ -14,6 +14,7 @@ # client talking to Stx standalone +import base64 import uuid import json from typing import List @@ -22,6 +23,7 @@ from typing import List from cgtsclient.client import get_client as get_stx_client from cgtsclient.exc import EndpointException from dcmanagerclient.api.client import client as get_dc_client +from kubernetes import client as k8sclient, config as k8sconfig from o2common.config import config from o2common.service.client.base_client import BaseClient @@ -226,9 +228,11 @@ class StxClientImp(object): subcloud_additional_details(subcloud_id) logger.debug('subcloud name: %s, oam_floating_ip: %s' % (subcloud[0].name, subcloud[0].oam_floating_ip)) + if subcloud[0].oam_floating_ip == 'unavailable': + raise EnvironmentError(f"{subcloud[0].name} was unavailable") try: os_client_args = config.get_stx_access_info( - region_name=subcloud[0].name, + region_name=subcloud[0].region_name, subcloud_hostname=subcloud[0].oam_floating_ip) # logger.info(os_client_args) config_client = get_stx_client(**os_client_args) @@ -236,7 +240,7 @@ class StxClientImp(object): msg = e.format_message() if CGTSCLIENT_ENDPOINT_ERROR_MSG in msg: os_client_args = config.get_stx_access_info( - region_name=subcloud[0].name, sub_is_https=True, + region_name=subcloud[0].region_name, sub_is_https=True, subcloud_hostname=subcloud[0].oam_floating_ip) # logger.info(os_client_args) config_client = get_stx_client(**os_client_args) @@ -247,6 +251,24 @@ class StxClientImp(object): return config_client + def getK8sClient(self, k8scluster): + def _b64_encode_str(msg: str, encode: str = 'utf-8') -> str: + msg_bytes = msg.encode('utf-8') + base64_bytes = base64.b64encode(msg_bytes) + base64_msg = base64_bytes.decode('utf-8') + return base64_msg + + conf_dict = config.gen_k8s_config_dict( + k8scluster.cluster_api_endpoint, + _b64_encode_str(k8scluster.cluster_ca_cert), + k8scluster.admin_user, + _b64_encode_str(k8scluster.admin_client_cert), + _b64_encode_str(k8scluster.admin_client_key), + ) + k8sconfig.load_kube_config_from_dict(conf_dict) + v1 = k8sclient.CoreV1Api() + return v1 + def setStxClient(self, resource_pool_id): systems = self.stxclient.isystem.list() if resource_pool_id == systems[0].uuid: @@ -299,7 +321,7 @@ class StxClientImp(object): subcloud_stxclient = self.getSubcloudClient( subcloud.subcloud_id) systems = subcloud_stxclient.isystem.list() - logger.debug('systems:' + str(systems[0].to_dict())) + logger.debug('subcloud system:' + str(systems[0].to_dict())) pools.append(systems[0]) except Exception as ex: logger.warning('Failed get cgstclient of subcloud %s: %s' % @@ -363,15 +385,59 @@ class StxClientImp(object): return True return False - def _setK8sCapabilities(self, client, k8scluster): + def _getK8sNodes(self, k8sclient): + return k8sclient.list_node() + + def _getK8sNodeDetail(self, k8sclient, node_name): + return k8sclient.read_node(name=node_name) + + def _getK8sCapabilities(self, k8s_client): + k8s_capabilities = {} + nodes = self._getK8sNodes(k8s_client) + for node in nodes.items: + logger.debug(f'k8s node {node.metadata.name} allocatable: ' + f'{node.status.allocatable}') + for allocatable in node.status.allocatable: + if allocatable.startswith('intel.com/pci_sriov_net_'): + k8s_capabilities[f'{node.metadata.name}_sriov'] = True + if allocatable == 'windriver.com/isolcpus': + k8s_capabilities[f'{node.metadata.name}_isolcpus'] = True + return k8s_capabilities + + def _setK8sCapabilities(self, k8scluster, client, k8s_client): capabilities = {} label_OS_2chk = {'key': 'OS', 'value': 'low_latency'} if self._checkLabelExistOnCluster(client, label_OS_2chk, True): logger.debug("low latency host inside of the k8s cluster") capabilities[label_OS_2chk['key']] = label_OS_2chk['value'] + + # Add Kubernetes capabilities + k8s_capabilities = self._getK8sCapabilities(k8s_client) + capabilities.update(k8s_capabilities) + setattr(k8scluster, 'capabilities', json.dumps(capabilities)) return k8scluster + def _getK8sCapacity(self, k8s_client): + k8s_capacity = {} + nodes = self._getK8sNodes(k8s_client) + for node in nodes.items: + logger.debug(f'k8s node {node.metadata.name} capacity: ' + f'{node.status.capacity}') + for key, value in node.status.capacity.items(): + k8s_capacity[f'{node.metadata.name}_{key}'] = value + return k8s_capacity + + def _setK8sCapacity(self, k8scluster, client, k8s_client): + capacity = {} + + # Add Kubernetes capacity + k8s_capacity = self._getK8sCapacity(k8s_client) + capacity.update(k8s_capacity) + + setattr(k8scluster, 'capacity', json.dumps(capacity)) + return k8scluster + def getLabelList(self, **filters) -> List[ocloudModel.StxGenericModel]: hostid = filters.get('hostid', None) assert (hostid is not None), 'missing hostid to query label list' @@ -383,7 +449,12 @@ class StxClientImp(object): def getK8sList(self, **filters) -> List[ocloudModel.StxGenericModel]: def process_cluster(client, cluster): setattr(cluster, 'cloud_name', systems[0].name) - cluster = self._setK8sCapabilities(client, cluster) + setattr(cluster, 'cloud_uuid', systems[0].uuid) + + k8s_client = self.getK8sClient(cluster) + cluster = self._setK8sCapabilities(cluster, client, k8s_client) + cluster = self._setK8sCapacity(cluster, client, k8s_client) + logger.debug('k8sresources cluster_api_endpoint: ' + str(cluster.cluster_api_endpoint)) return ocloudModel.StxGenericModel(ResourceTypeEnum.DMS, @@ -430,13 +501,18 @@ class StxClientImp(object): return k8s_list def getK8sDetail(self, name) -> ocloudModel.StxGenericModel: - def process_k8s_cluster(client, k8s_cluster, cloud_name): + def process_k8s_cluster(client, k8s_cluster, cloud_name, cloud_uuid): setattr(k8s_cluster, 'cloud_name', cloud_name) - k8s_cluster = self._setK8sCapabilities(client, k8s_cluster) + setattr(k8s_cluster, 'cloud_uuid', cloud_uuid) + + k8s_client = self.getK8sClient(k8s_cluster) + cluster = self._setK8sCapabilities(k8s_cluster, client, k8s_client) + cluster = self._setK8sCapacity(cluster, client, k8s_client) return k8s_cluster systems = self.stxclient.isystem.list() system_name = systems[0].name + system_uuid = systems[0].uuid if not name: k8s_clusters = self.stxclient.kube_cluster.list() @@ -450,16 +526,20 @@ class StxClientImp(object): if cloud_name == system_name: k8s_cluster = process_k8s_cluster( self.stxclient, - self.stxclient.kube_cluster.get(k8s_name), cloud_name) + self.stxclient.kube_cluster.get(k8s_name), cloud_name, + system_uuid) else: subclouds = self.getSubcloudList() subcloud_id = next( sub.subcloud_id for sub in subclouds if sub.name == cloud_name) subcloud_stxclient = self.getSubcloudClient(subcloud_id) + systems = subcloud_stxclient.isystem.list() + system_uuid = systems[0].uuid k8s_cluster = process_k8s_cluster( subcloud_stxclient, - subcloud_stxclient.kube_cluster.get(k8s_name), cloud_name) + subcloud_stxclient.kube_cluster.get(k8s_name), cloud_name, + system_uuid) if not k8s_cluster: return None @@ -582,12 +662,12 @@ class StxClientImp(object): 'more than one system exists in the account.') return isystems[0] - @ staticmethod + @staticmethod def _respoolconverter(res_pool): setattr(res_pool, 'name', res_pool.region_name) return res_pool - @ staticmethod + @staticmethod def _hostconverter(host): selected_keys = [ "hostname", "personality", "id", "mgmt_ip", "mgmt_mac", @@ -603,7 +683,7 @@ class StxClientImp(object): setattr(host, 'name', host.hostname) return host - @ staticmethod + @staticmethod def _labelconverter(label): selected_keys = [ "uuid", "label_key", "label_value", "host_uuid" @@ -619,7 +699,7 @@ class StxClientImp(object): setattr(label, 'created_at', None) return label - @ staticmethod + @staticmethod def _cpuconverter(cpu): selected_keys = [ "cpu", "core", "thread", "allocated_function", "numa_node", @@ -633,7 +713,7 @@ class StxClientImp(object): '-', 1)[0] + '-cpu-'+str(cpu.cpu)) return cpu - @ staticmethod + @staticmethod def _memconverter(mem): selected_keys = [ "memtotal_mib", "memavail_mib", "vm_hugepages_use_1G", @@ -651,7 +731,7 @@ class StxClientImp(object): '-mem-node-'+str(mem.numa_node)) return mem - @ staticmethod + @staticmethod def _ethconverter(eth): selected_keys = [ "name", "namedisplay", "dev_id", "pdevice", "capabilities", @@ -669,7 +749,7 @@ class StxClientImp(object): setattr(eth, 'created_at', None) return eth - @ staticmethod + @staticmethod def _ifconverter(ifs): selected_keys = [ "ifname", "iftype", "imac", "vlan_id", "imtu", @@ -685,7 +765,7 @@ class StxClientImp(object): setattr(ifs, 'created_at', None) return ifs - @ staticmethod + @staticmethod def _devconverter(dev): selected_keys = [ "name", "pdevice", "pciaddr", "pvendor_id", "pvendor", @@ -699,12 +779,12 @@ class StxClientImp(object): setattr(dev, 'name', dev.host_uuid.split('-', 1)[0] + '-'+dev.name) return dev - @ staticmethod + @staticmethod def _k8sconverter(cluster): setattr(cluster, 'name', cluster.cloud_name + '.' + cluster.cluster_name) setattr(cluster, 'uuid', - uuid.uuid3(uuid.NAMESPACE_URL, cluster.cluster_name)) + uuid.uuid3(uuid.NAMESPACE_URL, cluster.cloud_uuid)) setattr(cluster, 'updated_at', None) setattr(cluster, 'created_at', None) setattr(cluster, 'events', []) @@ -712,7 +792,7 @@ class StxClientImp(object): cluster.name + '/' + str(cluster.uuid)) return cluster - @ staticmethod + @staticmethod def _k8shasher(cluster): return str(hash((cluster.cluster_name, cluster.cloud_name, cluster.cluster_api_endpoint, cluster.admin_user,