from cgtsclient.client import get_client as get_stx_client
from cgtsclient.exc import EndpointException
from dcmanagerclient.api.client import client as get_dc_client
from cgtsclient.client import get_client as get_stx_client
from cgtsclient.exc import EndpointException
from dcmanagerclient.api.client import client as get_dc_client
+from o2common.config import config
+from o2common.service.client.base_client import BaseClient
+from o2ims.domain import stx_object as ocloudModel
+from o2ims.domain.resource_type import ResourceTypeEnum
+
return self.driver.getPserver(id)
def _list(self, **filters) -> List[ocloudModel.StxGenericModel]:
return self.driver.getPserver(id)
def _list(self, **filters) -> List[ocloudModel.StxGenericModel]:
os_client_args = config.get_stx_access_info(
region_name=subcloud[0].name,
subcloud_hostname=subcloud[0].oam_floating_ip)
os_client_args = config.get_stx_access_info(
region_name=subcloud[0].name,
subcloud_hostname=subcloud[0].oam_floating_ip)
os_client_args = config.get_stx_access_info(
region_name=subcloud[0].name, sub_is_https=True,
subcloud_hostname=subcloud[0].oam_floating_ip)
os_client_args = config.get_stx_access_info(
region_name=subcloud[0].name, sub_is_https=True,
subcloud_hostname=subcloud[0].oam_floating_ip)
config_client = get_stx_client(**os_client_args)
else:
raise ValueError('Stx endpoint exception: %s' % msg)
config_client = get_stx_client(**os_client_args)
else:
raise ValueError('Stx endpoint exception: %s' % msg)
if systems[0].distributed_cloud_role is None or \
systems[0].distributed_cloud_role != 'systemcontroller':
return [ocloudModel.StxGenericModel(
if systems[0].distributed_cloud_role is None or \
systems[0].distributed_cloud_role != 'systemcontroller':
return [ocloudModel.StxGenericModel(
return [ocloudModel.StxGenericModel(
ResourceTypeEnum.RESOURCE_POOL,
return [ocloudModel.StxGenericModel(
ResourceTypeEnum.RESOURCE_POOL,
def getResourcePoolDetail(self, id):
self.setStxClient(id)
systems = self.stxclient.isystem.list()
logger.debug('systems:' + str(systems[0].to_dict()))
return ocloudModel.StxGenericModel(
def getResourcePoolDetail(self, id):
self.setStxClient(id)
systems = self.stxclient.isystem.list()
logger.debug('systems:' + str(systems[0].to_dict()))
return ocloudModel.StxGenericModel(
def getPserverList(self, **filters) -> List[ocloudModel.StxGenericModel]:
hosts = self.stxclient.ihost.list()
def getPserverList(self, **filters) -> List[ocloudModel.StxGenericModel]:
hosts = self.stxclient.ihost.list()
return [ocloudModel.StxGenericModel(
ResourceTypeEnum.PSERVER, self._hostconverter(host))
for host in hosts if host and (host.availability == 'available'
return [ocloudModel.StxGenericModel(
ResourceTypeEnum.PSERVER, self._hostconverter(host))
for host in hosts if host and (host.availability == 'available'
or host.availability == 'degraded')]
def getPserver(self, id) -> ocloudModel.StxGenericModel:
or host.availability == 'degraded')]
def getPserver(self, id) -> ocloudModel.StxGenericModel:
@ staticmethod
def _hostconverter(host):
setattr(host, 'name', host.hostname)
@ staticmethod
def _hostconverter(host):
setattr(host, 'name', host.hostname)