\r
# client talking to Stx standalone\r
\r
-from service.client.base_client import BaseClient\r
+from o2ims.service.client.base_client import BaseClient\r
from typing import List\r
# Optional, Set\r
from o2ims.domain import stx_object as ocloudModel\r
from o2ims import config\r
\r
+# from dcmanagerclient.api import client\r
+from cgtsclient.client import get_client\r
+import logging\r
+logger = logging.getLogger(__name__)\r
+\r
\r
class StxSaOcloudClient(BaseClient):\r
- def __init__(self):\r
+ def __init__(self, driver=None):\r
super().__init__()\r
- self.driver = StxSaClientImp()\r
+ self.driver = driver if driver else StxSaClientImp()\r
\r
# def list(self) -> List[ocloudModel.StxGenericModel]:\r
# return self._list()\r
# return self._get(id)\r
\r
def _get(self, id) -> ocloudModel.StxGenericModel:\r
- raise self.driver.getInstanceInfo()\r
+ return self.driver.getInstanceInfo()\r
\r
def _list(self):\r
return [self.driver.getInstanceInfo()]\r
def _list(self):\r
return self.driver.getK8sList()\r
\r
-# internal driver which implement client call to Stx Standalone instance\r
\r
-# from keystoneauth1.identity import v3\r
-# from keystoneauth1 import session\r
-# # from keystoneclient.v3 import ksclient\r
-# from starlingxclient.v3 import stxclient\r
+class StxPserverClient(BaseClient):\r
+ def __init__(self):\r
+ super().__init__()\r
+ self.driver = StxSaClientImp()\r
+\r
+ def _get(self, id) -> ocloudModel.StxGenericModel:\r
+ return self.driver.getPserver(id)\r
+\r
+ def _list(self) -> List[ocloudModel.StxGenericModel]:\r
+ return self.driver.getPserverList()\r
+\r
+# internal driver which implement client call to Stx Standalone instance\r
\r
\r
class StxSaClientImp(object):\r
- def __init__(self, access_info=None) -> None:\r
+ def __init__(self, stx_client=None):\r
super().__init__()\r
- self.access_info = access_info\r
- if self.access_info is None:\r
- self.access_info = config.get_stx_access_info()\r
- # self.auth = auth = v3.Password(\r
- # auth_url="http://example.com:5000/v3", username="admin",\r
- # password="password", project_name="admin",\r
- # user_domain_id="default", project_domain_id="default")\r
- # self.session = sess = session.Session(auth=auth)\r
- # # self.keystone = ksclient.Client(session=sess)\r
- # self.stx = stxclient.Client(session=sess)\r
+ self.stxclient = stx_client if stx_client else self.getStxClient()\r
+\r
+ def getStxClient():\r
+ os_client_args = config.get_stx_access_info()\r
+ config_client = get_client(**os_client_args)\r
+ return config_client\r
\r
def getInstanceInfo(self) -> ocloudModel.StxGenericModel:\r
- raise NotImplementedError\r
+ systems = self.stxclient.isystem.list()\r
+ logger.debug("systems:" + str(systems[0].to_dict()))\r
+ return ocloudModel.StxGenericModel(systems[0]) if systems else None\r
+\r
+ def getPserverList(self) -> List[ocloudModel.StxGenericModel]:\r
+ hosts = self.stxclient.ihost.list()\r
+ logger.debug("host 1:" + str(hosts[0].to_dict()))\r
+ return [ocloudModel.StxGenericModel(self._hostconverter(host))\r
+ for host in hosts if host]\r
+\r
+ def getPserver(self, id) -> ocloudModel.StxGenericModel:\r
+ host = self.stxclient.ihost.get(id)\r
+ logger.debug("host:" + str(host.to_dict()))\r
+ return ocloudModel.StxGenericModel(self._hostconverter(host))\r
\r
def getK8sList(self) -> List[ocloudModel.StxGenericModel]:\r
raise NotImplementedError\r
\r
def getK8sDetail(self, id) -> ocloudModel.StxGenericModel:\r
raise NotImplementedError\r
+\r
+ @staticmethod\r
+ def _hostconverter(host):\r
+ setattr(host, "name", host.hostname)\r
+ return host\r