from o2ims.service.watcher.pserver_if_watcher import PServerIfWatcher\r
from o2ims.adapter.clients.ocloud_sa_client import StxIfClient\r
\r
-from o2ims.service.watcher.pserver_port_watcher import PServerPortWatcher\r
-from o2ims.adapter.clients.ocloud_sa_client import StxPortClient\r
+from o2ims.service.watcher.pserver_port_watcher import PServerIfPortWatcher\r
+from o2ims.adapter.clients.ocloud_sa_client import StxIfPortClient\r
+\r
+from o2ims.service.watcher.pserver_eth_watcher import PServerEthWatcher\r
+from o2ims.adapter.clients.ocloud_sa_client import StxEthClient\r
\r
from o2common.helper import o2logging\r
logger = o2logging.get_logger(__name__)\r
child_pserver.addchild(\r
PServerMemWatcher(StxMemClient(), self.bus))\r
child_pserver.addchild(\r
+ PServerEthWatcher(StxEthClient(), self.bus))\r
+ child_if = child_pserver.addchild(\r
PServerIfWatcher(StxIfClient(), self.bus))\r
- child_pserver.addchild(\r
- PServerPortWatcher(StxPortClient(), self.bus))\r
+ # child_if.addchild(\r
+ # PServerIfPortWatcher(StxIfPortClient(), self.bus))\r
\r
self.worker.add_watcher(root)\r
\r
# pylint: disable=unused-argument
from __future__ import annotations
+from o2dms.service.nfdeployment_handler import install_nfdeployment
+from o2dms.service.nfdeployment_handler import publish_nfdeployment_created
# from dataclasses import asdict
from typing import List, Dict, Callable, Type
# TYPE_CHECKING
from o2dms.domain import events as o2dms_events
from o2ims.service.auditor import ocloud_handler, dms_handler, \
resourcepool_handler, pserver_handler, pserver_cpu_handler, \
- pserver_mem_handler, pserver_port_handler, pserver_if_handler
-from o2dms.service.nfdeployment_handler import publish_nfdeployment_created
-from o2dms.service.nfdeployment_handler import install_nfdeployment
+ pserver_mem_handler, pserver_port_handler, pserver_if_handler,\
+ pserver_eth_handler
# if TYPE_CHECKING:
# from . import unit_of_work
commands.UpdatePserverCpu: pserver_cpu_handler.update_pserver_cpu,
commands.UpdatePserverMem: pserver_mem_handler.update_pserver_mem,
commands.UpdatePserverIf: pserver_if_handler.update_pserver_if,
- commands.UpdatePserverPort: pserver_port_handler.update_pserver_port,
+ commands.UpdatePserverIfPort: pserver_port_handler.update_pserver_port,
+ commands.UpdatePserverEth: pserver_eth_handler.update_pserver_eth,
o2dms_cmmands.InstallNfDeployment: install_nfdeployment
-
} # type: Dict[Type[commands.Command], Callable]
\r
from datetime import datetime\r
from typing import List\r
+from sqlalchemy.inspection import inspect\r
from .events import Event\r
\r
\r
class AgRoot:\r
+\r
def __init__(self) -> None:\r
self.hash = ""\r
self.updatetime = datetime.now()\r
self.createtime = datetime.now()\r
self.events = [] # type: List[Event]\r
# self.id = ""\r
+\r
+\r
+class Serializer(object):\r
+\r
+ def serialize(self):\r
+ # d = {c: getattr(self, c) for c in inspect(self).attrs.keys()}\r
+ # if 'createtime' in d:\r
+ # d['createtime'] = d['createtime'].isoformat()\r
+ # if 'updatetime' in d:\r
+ # d['updatetime'] = d['updatetime'].isoformat()\r
+ # return d\r
+ return {c: getattr(self, c) for c in inspect(self).attrs.keys()}\r
+\r
+ @staticmethod\r
+ def serialize_list(li):\r
+ return [m.serialize() for m in li]\r
class StxMemClient(BaseClient):\r
def __init__(self):\r
super().__init__()\r
- # self._pserver_id = pserver_id\r
self.driver = StxSaClientImp()\r
\r
def _get(self, id) -> ocloudModel.StxGenericModel:\r
return self.driver.getMemList(**filters)\r
\r
\r
-class StxPortClient(BaseClient):\r
+class StxEthClient(BaseClient):\r
def __init__(self):\r
super().__init__()\r
- # self._pserver_id = pserver_id\r
self.driver = StxSaClientImp()\r
\r
def _get(self, id) -> ocloudModel.StxGenericModel:\r
- return self.driver.getPort(id)\r
+ return self.driver.getEthernet(id)\r
\r
def _list(self, **filters) -> List[ocloudModel.StxGenericModel]:\r
- return self.driver.getPortList(**filters)\r
+ return self.driver.getEthernetList(**filters)\r
\r
\r
class StxIfClient(BaseClient):\r
def __init__(self):\r
super().__init__()\r
- # self._pserver_id = pserver_id\r
self.driver = StxSaClientImp()\r
\r
def _get(self, id) -> ocloudModel.StxGenericModel:\r
return self.driver.getIfList(**filters)\r
\r
\r
+class StxIfPortClient(BaseClient):\r
+ def __init__(self):\r
+ super().__init__()\r
+ self.driver = StxSaClientImp()\r
+\r
+ def _get(self, id) -> ocloudModel.StxGenericModel:\r
+ return self.driver.getPort(id)\r
+\r
+ def _list(self, **filters) -> List[ocloudModel.StxGenericModel]:\r
+ return self.driver.getPortList(**filters)\r
+\r
+\r
# internal driver which implement client call to Stx Standalone instance\r
class StxSaClientImp(object):\r
def __init__(self, stx_client=None):\r
\r
def getInstanceInfo(self) -> ocloudModel.StxGenericModel:\r
systems = self.stxclient.isystem.list()\r
- logger.debug("systems:" + str(systems[0].to_dict()))\r
+ logger.debug('systems:' + str(systems[0].to_dict()))\r
return ocloudModel.StxGenericModel(\r
ResourceTypeEnum.OCLOUD, systems[0]) if systems else None\r
\r
def getPserverList(self, **filters) -> List[ocloudModel.StxGenericModel]:\r
# resourcepoolid = filters.get("resourcepoolid", None)\r
hosts = self.stxclient.ihost.list()\r
- logger.debug("host 1:" + str(hosts[0].to_dict()))\r
+ logger.debug('host 1:' + str(hosts[0].to_dict()))\r
return [ocloudModel.StxGenericModel(\r
ResourceTypeEnum.PSERVER, self._hostconverter(host))\r
for host in hosts if host]\r
\r
def getPserver(self, id) -> ocloudModel.StxGenericModel:\r
host = self.stxclient.ihost.get(id)\r
- logger.debug("host:" + str(host.to_dict()))\r
+ logger.debug('host:' + str(host.to_dict()))\r
return ocloudModel.StxGenericModel(\r
ResourceTypeEnum.PSERVER, self._hostconverter(host))\r
\r
def getK8sList(self, **filters) -> List[ocloudModel.StxGenericModel]:\r
k8sclusters = self.stxclient.kube_cluster.list()\r
- logger.debug("k8sresources[0]:" + str(k8sclusters[0].to_dict()))\r
+ logger.debug('k8sresources[0]:' + str(k8sclusters[0].to_dict()))\r
return [ocloudModel.StxGenericModel(\r
ResourceTypeEnum.DMS,\r
self._k8sconverter(k8sres), self._k8shasher(k8sres))\r
\r
if not k8scluster:\r
return None\r
- logger.debug("k8sresource:" + str(k8scluster.to_dict()))\r
+ logger.debug('k8sresource:' + str(k8scluster.to_dict()))\r
return ocloudModel.StxGenericModel(\r
ResourceTypeEnum.DMS,\r
self._k8sconverter(k8scluster), self._k8shasher(k8scluster))\r
\r
def getCpuList(self, **filters) -> List[ocloudModel.StxGenericModel]:\r
- hostid = filters.get("hostid", None)\r
- assert (hostid is not None), "missing hostid to query icpu list"\r
+ hostid = filters.get('hostid', None)\r
+ assert (hostid is not None), 'missing hostid to query icpu list'\r
cpulist = self.stxclient.icpu.list(hostid)\r
return [ocloudModel.StxGenericModel(\r
ResourceTypeEnum.PSERVER_CPU,\r
ResourceTypeEnum.PSERVER_CPU, self._cpuconverter(cpuinfo))\r
\r
def getMemList(self, **filters) -> List[ocloudModel.StxGenericModel]:\r
- hostid = filters.get("hostid", None)\r
- assert (hostid is not None), "missing hostid to query imem list"\r
+ hostid = filters.get('hostid', None)\r
+ assert (hostid is not None), 'missing hostid to query imem list'\r
memlist = self.stxclient.imemory.list(hostid)\r
return [ocloudModel.StxGenericModel(\r
ResourceTypeEnum.PSERVER_RAM,\r
return ocloudModel.StxGenericModel(\r
ResourceTypeEnum.PSERVER_RAM, self._memconverter(meminfo))\r
\r
- def getPortList(self, **filters) -> List[ocloudModel.StxGenericModel]:\r
- hostid = filters.get("hostid", None)\r
- assert (hostid is not None), "missing hostid to query port list"\r
- portlist = self.stxclient.port.list(hostid)\r
+ def getEthernetList(self, **filters) -> List[ocloudModel.StxGenericModel]:\r
+ hostid = filters.get('hostid', None)\r
+ assert (hostid is not None), 'missing hostid to query port list'\r
+ ethlist = self.stxclient.ethernet_port.list(hostid)\r
return [ocloudModel.StxGenericModel(\r
- ResourceTypeEnum.PSERVER_PORT,\r
- port) for port in portlist if port]\r
+ ResourceTypeEnum.PSERVER_ETH,\r
+ self._ethconverter(eth)) for eth in ethlist if eth]\r
\r
- def getPort(self, id) -> ocloudModel.StxGenericModel:\r
- portinfo = self.stxclient.port.get(id)\r
+ def getEthernet(self, id) -> ocloudModel.StxGenericModel:\r
+ ethinfo = self.stxclient.ethernet_port.get(id)\r
return ocloudModel.StxGenericModel(\r
- ResourceTypeEnum.PSERVER_PORT, portinfo)\r
+ ResourceTypeEnum.PSERVER_ETH, self._ethconverter(ethinfo))\r
\r
def getIfList(self, **filters) -> List[ocloudModel.StxGenericModel]:\r
- hostid = filters.get("hostid", None)\r
- assert (hostid is not None), "missing hostid to query iinterface list"\r
+ hostid = filters.get('hostid', None)\r
+ assert (hostid is not None), 'missing hostid to query iinterface list'\r
iflist = self.stxclient.iinterface.list(hostid)\r
return [ocloudModel.StxGenericModel(\r
ResourceTypeEnum.PSERVER_IF,\r
return ocloudModel.StxGenericModel(\r
ResourceTypeEnum.PSERVER_IF, self._ifconverter(ifinfo))\r
\r
+ def getPortList(self, **filters) -> List[ocloudModel.StxGenericModel]:\r
+ ifid = filters.get('interfaceid', None)\r
+ assert (ifid is not None), 'missing interface id to query port list'\r
+ portlist = self.stxclient.iinterface.list_ports(ifid)\r
+ return [ocloudModel.StxGenericModel(\r
+ ResourceTypeEnum.PSERVER_IF_PORT,\r
+ port) for port in portlist if port]\r
+\r
+ def getPort(self, id) -> ocloudModel.StxGenericModel:\r
+ portinfo = self.stxclient.port.get(id)\r
+ return ocloudModel.StxGenericModel(\r
+ ResourceTypeEnum.PSERVER_IF_PORT, portinfo)\r
+\r
def _getIsystems(self):\r
return self.stxclient.isystem.list()\r
\r
\r
@ staticmethod\r
def _hostconverter(host):\r
- setattr(host, "name", host.hostname)\r
+ setattr(host, 'name', host.hostname)\r
return host\r
\r
@ staticmethod\r
def _cpuconverter(cpu):\r
- setattr(cpu, "name", "cpu-"+str(cpu.cpu))\r
+ setattr(cpu, 'name', cpu.ihost_uuid.split(\r
+ '-', 1)[0] + '-cpu-'+str(cpu.cpu))\r
return cpu\r
\r
@ staticmethod\r
def _memconverter(mem):\r
- setattr(mem, "name", "mem-node-"+str(mem.numa_node))\r
+ setattr(mem, 'name', mem.ihost_uuid.split('-', 1)[0] +\r
+ '-mem-node-'+str(mem.numa_node))\r
return mem\r
\r
+ @ staticmethod\r
+ def _ethconverter(eth):\r
+ setattr(eth, 'name', eth.host_uuid.split('-', 1)[0] + '-'+eth.name)\r
+ setattr(eth, 'updated_at', None)\r
+ setattr(eth, 'created_at', None)\r
+ return eth\r
+\r
@ staticmethod\r
def _ifconverter(ifs):\r
- setattr(ifs, "name", ifs.ifname)\r
+ setattr(ifs, 'name', ifs.ihost_uuid.split('-', 1)[0] + '-'+ifs.ifname)\r
setattr(ifs, 'updated_at', None)\r
setattr(ifs, 'created_at', None)\r
return ifs\r
\r
@ staticmethod\r
def _k8sconverter(cluster):\r
- setattr(cluster, "name", cluster.cluster_name)\r
- setattr(cluster, "uuid",\r
+ setattr(cluster, 'name', cluster.cluster_name)\r
+ setattr(cluster, 'uuid',\r
uuid.uuid3(uuid.NAMESPACE_URL, cluster.cluster_name))\r
setattr(cluster, 'updated_at', None)\r
setattr(cluster, 'created_at', None)\r
setattr(cluster, 'events', [])\r
- logger.debug("k8s cluster name/uuid:" +\r
- cluster.name + "/" + str(cluster.uuid))\r
+ logger.debug('k8s cluster name/uuid:' +\r
+ cluster.name + '/' + str(cluster.uuid))\r
return cluster\r
\r
@ staticmethod\r
# limitations under the License.
from typing import List
-# from o2ims.adapter import orm
+
from o2ims.domain import ocloud
from o2ims.domain.ocloud_repo import OcloudRepository, ResourceTypeRepository,\
ResourcePoolRepository, ResourceRepository, DeploymentManagerRepository,\
oCloudId=ocloud_id).first()
def _list(self) -> List[ocloud.Ocloud]:
- return self.session.query(ocloud.Ocloud).order_by(
- ocloud.Ocloud.name).all()
+ return self.session.query(ocloud.Ocloud)
def _update(self, ocloud: ocloud.Ocloud):
self.session.add(ocloud)
resourceTypeId=resource_type_id).first()
def _list(self) -> List[ocloud.ResourceType]:
- return self.session.query()
+ return self.session.query(ocloud.ResourceType)
def _update(self, resourceType: ocloud.ResourceType):
self.session.add(resourceType)
resourcePoolId=resource_pool_id).first()
def _list(self) -> List[ocloud.ResourcePool]:
- return self.session.query()
+ return self.session.query(ocloud.ResourcePool)
def _update(self, resourcePool: ocloud.ResourcePool):
self.session.add(resourcePool)
self.session.add(resource)
def _get(self, resource_id) -> ocloud.Resource:
+ # return self.session.query(ocloud.Resource).filter_by(
+ # resourceId=resource_id).first()
+ # topq = uow.session.query(orm.resource).filter(
+ # orm.resource.c.resourceId == resourceId).\
+ # cte('cte', recursive=True)
+ # bootomq = uow.session.query(orm.resource).join(
+ # topq, orm.resource.c.parentId == topq.c.resourceId)
+ # res = uow.session.query(topq.union(bootomq))
+ def recursive(id):
+ res = self.session.query(ocloud.Resource).filter_by(
+ resourceId=id).first()
+ if res is not None:
+ query = self.session.query(ocloud.Resource).filter_by(
+ parentId=res.resourceId)
+ children = []
+ for r in query:
+ child = recursive(r.resourceId)
+ children.append(child)
+ res.set_children(children)
+ return res
+ return recursive(resource_id)
+
+ def _list(self, resourcepool_id) -> List[ocloud.Resource]:
return self.session.query(ocloud.Resource).filter_by(
- resourceId=resource_id).first()
-
- def _list(self) -> List[ocloud.Resource]:
- return self.session.query()
+ resourcePoolId=resourcepool_id)
def _update(self, resource: ocloud.Resource):
self.session.add(resource)
deploymentManagerId=deployment_manager_id).first()
def _list(self) -> List[ocloud.DeploymentManager]:
- return self.session.query()
+ return self.session.query(ocloud.DeploymentManager)
def _update(self, deployment_manager: ocloud.DeploymentManager):
self.session.add(deployment_manager)
subscriptionId=subscription_id).first()
def _list(self) -> List[ocloud.Subscription]:
- return self.session.query()
+ return self.session.query(ocloud.Subscription)
def _update(self, subscription: ocloud.Subscription):
self.session.add(subscription)
Column("resourceId", String(255), primary_key=True),\r
Column("resourceTypeId", ForeignKey("resourcetype.resourceTypeId")),\r
Column("resourcePoolId", ForeignKey("resourcepool.resourcePoolId")),\r
+ Column("name", String(255)),\r
# Column("globalAssetId", String(255)),\r
Column("parentId", String(255)),\r
Column("description", String(255)),\r
pass
+@dataclass
+class UpdatePserverEth(UpdateResource):
+ pass
+
+
@dataclass
class UpdatePserverIf(UpdateResource):
pass
@dataclass
-class UpdatePserverPort(UpdateResource):
+class UpdatePserverIfPort(UpdateResource):
pass
# limitations under the License.\r
\r
from __future__ import annotations\r
+import json\r
\r
-from o2common.domain.base import AgRoot\r
+from o2common.domain.base import AgRoot, Serializer\r
# from dataclasses import dataclass\r
# from datetime import date\r
# from typing import Optional, List, Set\r
# from uuid import UUID\r
\r
\r
-class Subscription(AgRoot):\r
+class Subscription(AgRoot, Serializer):\r
def __init__(self, id: str, callback: str, consumersubid: str = '',\r
filter: str = '') -> None:\r
super().__init__()\r
self.filter = filter\r
\r
\r
-class DeploymentManager(AgRoot):\r
+class DeploymentManager(AgRoot, Serializer):\r
def __init__(self, id: str, name: str, ocloudid: str,\r
dmsendpoint: str, description: str = '',\r
supportedLocations: str = '', capabilities: str = '',\r
self.extensions = []\r
\r
\r
-class ResourcePool(AgRoot):\r
+class ResourcePool(AgRoot, Serializer):\r
def __init__(self, id: str, name: str, location: str,\r
ocloudid: str, gLocationId: str = '',\r
description: str = '') -> None:\r
self.extensions = []\r
\r
\r
-class ResourceType(AgRoot):\r
+class ResourceType(AgRoot, Serializer):\r
def __init__(self, typeid: str, name: str, typeEnum: ResourceTypeEnum,\r
ocloudid: str, vender: str = '', model: str = '',\r
version: str = '',\r
self.extensions = []\r
\r
\r
-class Resource(AgRoot):\r
+class Resource(AgRoot, Serializer):\r
def __init__(self, resourceId: str, resourceTypeId: str,\r
- resourcePoolId: str, parentId: str = '',\r
+ resourcePoolId: str, name: str, parentId: str = '',\r
gAssetId: str = '', elements: str = '',\r
description: str = '') -> None:\r
super().__init__()\r
self.version_number = 0\r
self.resourceTypeId = resourceTypeId\r
self.resourcePoolId = resourcePoolId\r
+ self.name = name\r
self.globalAssetId = gAssetId\r
self.parentId = parentId\r
self.elements = elements\r
self.description = description\r
+ self.children = []\r
self.extensions = []\r
\r
+ def set_children(self, children: list):\r
+ self.children = children\r
+\r
+ def serialize(self):\r
+ d = Serializer.serialize(self)\r
+\r
+ if 'elements' in d and d['elements'] != '':\r
+ d['elements'] = json.loads(d['elements'])\r
+\r
+ if not hasattr(self, 'children') or len(self.children) == 0:\r
+ return d\r
+ else:\r
+ d['children'] = []\r
+\r
+ for child in self.children:\r
+ d['children'].append(child.serialize())\r
+ return d\r
+\r
\r
-class Ocloud(AgRoot):\r
+class Ocloud(AgRoot, Serializer):\r
def __init__(self, ocloudid: str, name: str, imsendpoint: str,\r
globalcloudId: str = '',\r
description: str = '', version_number: int = 0) -> None:\r
self.seen.add(resource)\r
return resource\r
\r
- def list(self) -> List[ocloud.Resource]:\r
- return self._list()\r
+ def list(self, resourcepool_id) -> List[ocloud.Resource]:\r
+ return self._list(resourcepool_id)\r
\r
def update(self, resource: ocloud.Resource):\r
self._update(resource)\r
PSERVER = 11\r
PSERVER_CPU = 12\r
PSERVER_RAM = 13\r
- PSERVER_PORT = 14\r
- PSERVER_IF = 15\r
+ PSERVER_IF = 14\r
+ PSERVER_IF_PORT = 15\r
+ PSERVER_ETH = 16\r
\r
\r
class InvalidOcloudState(Exception):\r
stxobj = cmd.data
with uow:
oclouds = uow.oclouds.list()
- if oclouds and len(oclouds) > 1:
+ if oclouds and oclouds.count() > 1:
raise InvalidOcloudState("More than 1 ocloud is found")
- elif not oclouds or len(oclouds) == 0:
+ elif not oclouds or oclouds.count() == 0:
logger.info("add ocloud:" + stxobj.name
+ " update_at: " + str(stxobj.updatetime)
+ " id: " + str(stxobj.id)
logger.info("Add the ocloud: " + stxobj.id
+ ", name: " + stxobj.name)
else:
- localmodel = oclouds.pop()
+ localmodel = oclouds.first()
if is_outdated(localmodel, stxobj):
logger.info("update ocloud:" + stxobj.name
+ " update_at: " + str(stxobj.updatetime)
gAssetId = '' # TODO: global ID
description = "A CPU resource of the physical server"
resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
- parent_id, gAssetId, stxobj.content, description)
+ stxobj.name, parent_id, gAssetId, stxobj.content,
+ description)
resource.createtime = stxobj.createtime
resource.updatetime = stxobj.updatetime
resource.hash = stxobj.hash
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pylint: disable=unused-argument
+from __future__ import annotations
+import uuid
+# import json
+
+from o2ims.domain import commands
+from o2ims.domain.stx_object import StxGenericModel
+from o2common.service.unit_of_work import AbstractUnitOfWork
+from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain.ocloud import Resource, ResourceType
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class InvalidResourceType(Exception):
+ pass
+
+
+def update_pserver_eth(
+ cmd: commands.UpdatePserverEth,
+ uow: AbstractUnitOfWork
+):
+ stxobj = cmd.data
+ with uow:
+ p_resource = uow.resources.get(cmd.parentid)
+ resourcepool = uow.resource_pools.get(p_resource.resourcePoolId)
+
+ res = uow.session.execute(
+ '''
+ SELECT "resourceTypeId", "oCloudId", "name"
+ FROM resourcetype
+ WHERE "resourceTypeEnum" = :resource_type_enum
+ ''',
+ dict(resource_type_enum=stxobj.type.name)
+ )
+ first = res.first()
+ if first is None:
+ resourcetype_id = str(uuid.uuid4())
+ uow.resource_types.add(ResourceType(
+ resourcetype_id,
+ 'pserver_ethernet', stxobj.type,
+ resourcepool.oCloudId))
+ else:
+ resourcetype_id = first['resourceTypeId']
+
+ resource = uow.resources.get(stxobj.id)
+ if not resource:
+ logger.info("add the ethernet of pserver:" + stxobj.name
+ + " update_at: " + str(stxobj.updatetime)
+ + " id: " + str(stxobj.id)
+ + " hash: " + str(stxobj.hash))
+ localmodel = create_by(stxobj, p_resource, resourcetype_id)
+ uow.resources.add(localmodel)
+
+ logger.info("Add the ethernet of pserver: " + stxobj.id
+ + ", name: " + stxobj.name)
+ else:
+ localmodel = resource
+ if is_outdated(localmodel, stxobj):
+ logger.info("update ethernet of pserver:" + stxobj.name
+ + " update_at: " + str(stxobj.updatetime)
+ + " id: " + str(stxobj.id)
+ + " hash: " + str(stxobj.hash))
+ update_by(localmodel, stxobj, p_resource)
+ uow.resources.update(localmodel)
+
+ logger.info("Update the ethernet of pserver: " + stxobj.id
+ + ", name: " + stxobj.name)
+ uow.commit()
+
+
+def is_outdated(resource: Resource, stxobj: StxGenericModel):
+ return True if resource.hash != stxobj.hash else False
+
+
+def create_by(stxobj: StxGenericModel, parent: Resource, resourcetype_id: str)\
+ -> Resource:
+ # content = json.loads(stxobj.content)
+ resourcetype_id = resourcetype_id
+ resourcepool_id = parent.resourcePoolId
+ parent_id = parent.resourceId
+ gAssetId = '' # TODO: global ID
+ description = "An ethernet resource of the physical server"
+ resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
+ stxobj.name, parent_id, gAssetId, stxobj.content,
+ description)
+ resource.createtime = stxobj.createtime
+ resource.updatetime = stxobj.updatetime
+ resource.hash = stxobj.hash
+
+ return resource
+
+
+def update_by(target: Resource, stxobj: StxGenericModel,
+ parentid: str) -> None:
+ if target.resourceId != stxobj.id:
+ raise MismatchedModel("Mismatched Id")
+ target.createtime = stxobj.createtime
+ target.updatetime = stxobj.updatetime
+ target.hash = stxobj.hash
+ target.version_number = target.version_number + 1
+ target.events = []
gAssetId = '' # TODO: global ID
description = "A physical server resource"
resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
- parent_id, gAssetId, stxobj.content, description)
+ stxobj.name, parent_id, gAssetId, stxobj.content,
+ description)
resource.createtime = stxobj.createtime
resource.updatetime = stxobj.updatetime
resource.hash = stxobj.hash
resourcepool_id = parent.resourcePoolId
parent_id = parent.resourceId
gAssetId = '' # TODO: global ID
- description = "A interface resource of the physical server"
+ description = "An interface resource of the physical server"
resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
- parent_id, gAssetId, stxobj.content, description)
+ stxobj.name, parent_id, gAssetId, stxobj.content,
+ description)
resource.createtime = stxobj.createtime
resource.updatetime = stxobj.updatetime
resource.hash = stxobj.hash
gAssetId = '' # TODO: global ID
description = "A memory resource of the physical server"
resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
- parent_id, gAssetId, stxobj.content, description)
+ stxobj.name, parent_id, gAssetId, stxobj.content,
+ description)
resource.createtime = stxobj.createtime
resource.updatetime = stxobj.updatetime
resource.hash = stxobj.hash
def update_pserver_port(
- cmd: commands.UpdatePserverPort,
+ cmd: commands.UpdatePserverIfPort,
uow: AbstractUnitOfWork
):
stxobj = cmd.data
resourcetype_id = str(uuid.uuid4())
uow.resource_types.add(ResourceType(
resourcetype_id,
- 'pserver_port', stxobj.type,
+ 'pserver_if_port', stxobj.type,
resourcepool.oCloudId))
else:
resourcetype_id = first['resourceTypeId']
resource = uow.resources.get(stxobj.id)
if not resource:
- logger.info("add the port of pserver:" + stxobj.name
+ logger.info("add the port of pserver interface:" + stxobj.name
+ " update_at: " + str(stxobj.updatetime)
+ " id: " + str(stxobj.id)
+ " hash: " + str(stxobj.hash))
localmodel = create_by(stxobj, p_resource, resourcetype_id)
uow.resources.add(localmodel)
- logger.info("Add the port of pserver: " + stxobj.id
+ logger.info("Add the port of pserver interface: " + stxobj.id
+ ", name: " + stxobj.name)
else:
localmodel = resource
if is_outdated(localmodel, stxobj):
- logger.info("update port of pserver:" + stxobj.name
+ logger.info("update port of pserver interface:" + stxobj.name
+ " update_at: " + str(stxobj.updatetime)
+ " id: " + str(stxobj.id)
+ " hash: " + str(stxobj.hash))
update_by(localmodel, stxobj, p_resource)
uow.resources.update(localmodel)
- logger.info("Update the port of pserver: " + stxobj.id
+ logger.info("Update the port of pserver interface: " + stxobj.id
+ ", name: " + stxobj.name)
uow.commit()
resourcepool_id = parent.resourcePoolId
parent_id = parent.resourceId
gAssetId = '' # TODO: global ID
- description = "A port resource of the physical server"
+ description = "A port resource of the interface"
resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
- parent_id, gAssetId, stxobj.content, description)
+ stxobj.name, parent_id, gAssetId, stxobj.content,
+ description)
resource.createtime = stxobj.createtime
resource.updatetime = stxobj.updatetime
resource.hash = stxobj.hash
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+from o2ims.domain.stx_object import StxGenericModel\r
+from o2common.service.client.base_client import BaseClient\r
+# from o2common.service.unit_of_work import AbstractUnitOfWork\r
+from o2ims.service.watcher.resource_watcher import ResourceWatcher\r
+from o2ims.domain import commands\r
+from o2common.service.messagebus import MessageBus\r
+\r
+from o2common.helper import o2logging\r
+logger = o2logging.get_logger(__name__)\r
+\r
+\r
+class PServerEthWatcher(ResourceWatcher):\r
+ def __init__(self, client: BaseClient,\r
+ bus: MessageBus) -> None:\r
+ super().__init__(client, bus)\r
+\r
+ def _targetname(self):\r
+ return "pserver_ethernet"\r
+\r
+ def _probe(self, parent: StxGenericModel):\r
+ hostid = parent.id\r
+ newmodels = self._client.list(hostid=hostid)\r
+ return [commands.UpdatePserverEth(data=m, parentid=hostid)\r
+ for m in newmodels]\r
logger = o2logging.get_logger(__name__)\r
\r
\r
-class PServerPortWatcher(ResourceWatcher):\r
+class PServerIfPortWatcher(ResourceWatcher):\r
def __init__(self, client: BaseClient,\r
bus: MessageBus) -> None:\r
super().__init__(client, bus)\r
\r
def _targetname(self):\r
- return "pserver_port"\r
+ return "pserver_if_port"\r
\r
def _probe(self, parent: StxGenericModel):\r
- hostid = parent.id\r
- newmodels = self._client.list(hostid=hostid)\r
- return [commands.UpdatePserverPort(data=m, parentid=hostid)\r
+ interfaceid = parent.id\r
+ newmodels = self._client.list(interfaceid=interfaceid)\r
+ return [commands.UpdatePserverIfPort(data=m, parentid=interfaceid)\r
for m in newmodels]\r
description='Resource ID'),
'resourceTypeId': fields.String,
'resourcePoolId': fields.String,
+ 'name': fields.String,
'parentId': fields.String,
'description': fields.String,
}
)
- resource_get = api_ims_inventory_v1.model(
- "ResourceGetDto",
- {
+ def recursive_resource_mapping(iteration_number=2):
+ resource_json_mapping = {
'resourceId': fields.String(required=True,
description='Resource ID'),
'resourceTypeId': fields.String,
'resourcePoolId': fields.String,
+ 'name': fields.String,
'parentId': fields.String,
'description': fields.String,
+ 'elements': fields.String,
}
- )
+ if iteration_number:
+ resource_json_mapping['children'] = fields.List(
+ fields.Nested(ResourceDTO.recursive_resource_mapping(
+ iteration_number-1)))
+ return api_ims_inventory_v1.model(
+ 'ResourceGetDto' + str(iteration_number), resource_json_mapping)
+
+ def _recursive_resource_mapping(self, iteration_number=2):
+ resource_json_mapping = {
+ 'resourceId': fields.String(required=True,
+ description='Resource ID'),
+ 'resourceTypeId': fields.String,
+ 'resourcePoolId': fields.String,
+ 'name': fields.String,
+ 'parentId': fields.String,
+ 'description': fields.String,
+ }
+ if iteration_number:
+ resource_json_mapping['children'] = fields.List(
+ fields.Nested(self._recursive_resource_mapping(
+ iteration_number-1)))
+ # print(type(resource_json_mapping['children']))
+ if resource_json_mapping['children'] is None:
+ del resource_json_mapping['children']
+ return resource_json_mapping
+
+ def get_resource_get(self):
+ return api_ims_inventory_v1.model(
+ 'ResourceGetDto',
+ {
+ 'resourceId': fields.String(required=True,
+ description='Resource ID'),
+ 'resourceTypeId': fields.String,
+ 'resourcePoolId': fields.String,
+ 'name': fields.String,
+ 'parentId': fields.String,
+ 'description': fields.String,
+ 'children': fields.List(fields.Nested(
+ self._recursive_resource_mapping()))
+ }
+ )
class DeploymentManagerDTO:
@api_ims_inventory_v1.response(404, 'Resource not found')
class ResourceGetRouter(Resource):
- model = ResourceDTO.resource_get
+ # dto = ResourceDTO()
+ # model = dto.get_resource_get()
+ model = ResourceDTO.recursive_resource_mapping()
@api_ims_inventory_v1.doc('Get resource')
@api_ims_inventory_v1.marshal_with(model)
# limitations under the License.\r
\r
import uuid\r
-from sqlalchemy import select\r
\r
-from o2ims.adapter.orm import ocloud, resource, resourcetype, \\r
- resourcepool, deploymentmanager, subscription\r
from o2common.service import unit_of_work\r
from o2ims.views.ocloud_dto import SubscriptionDTO\r
from o2ims.domain.ocloud import Subscription\r
\r
def oclouds(uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
- res = uow.session.execute(select(ocloud))\r
- return [dict(r) for r in res]\r
+ li = uow.oclouds.list()\r
+ return [r.serialize() for r in li]\r
\r
\r
def ocloud_one(ocloudid: str, uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
- res = uow.session.execute(\r
- select(ocloud).where(ocloud.c.oCloudId == ocloudid))\r
- first = res.first()\r
- return None if first is None else dict(first)\r
+ first = uow.oclouds.get(ocloudid)\r
+ return first.serialize() if first is not None else None\r
\r
\r
def resource_types(uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
- res = uow.session.execute(select(resourcetype))\r
- return [dict(r) for r in res]\r
+ li = uow.resource_types.list()\r
+ return [r.serialize() for r in li]\r
\r
\r
def resource_type_one(resourceTypeId: str,\r
uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
- res = uow.session.execute(select(resourcetype).where(\r
- resourcetype.c.resourceTypeId == resourceTypeId))\r
- first = res.first()\r
- return None if first is None else dict(first)\r
+ first = uow.resource_types.get(resourceTypeId)\r
+ print(first)\r
+ return first.serialize() if first is not None else None\r
\r
\r
def resource_pools(uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
- res = uow.session.execute(select(resourcepool))\r
- return [dict(r) for r in res]\r
+ li = uow.resource_pools.list()\r
+ return [r.serialize() for r in li]\r
\r
\r
def resource_pool_one(resourcePoolId: str,\r
uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
- res = uow.session.execute(select(resourcepool).where(\r
- resourcepool.c.resourcePoolId == resourcePoolId))\r
- first = res.first()\r
- return None if first is None else dict(first)\r
+ first = uow.resource_pools.get(resourcePoolId)\r
+ return first.serialize() if first is not None else None\r
\r
\r
def resources(resourcePoolId: str, uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
- res = uow.session.execute(select(resource).where(\r
- resource.c.resourcePoolId == resourcePoolId))\r
- return [dict(r) for r in res]\r
+ li = uow.resources.list(resourcePoolId)\r
+ return [r.serialize() for r in li]\r
\r
\r
def resource_one(resourceId: str, uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
- # topq = uow.session.query(resource).filter(\r
- # resource.c.resourceId == resourceId).cte('cte', recursive=True)\r
- # bootomq = uow.session.query(resource).join(\r
- # topq, resource.c.parentId == topq.c.resourceId)\r
- # res = uow.session.query(topq.union(bootomq))\r
- # print(res)\r
- res = uow.session.execute(select(resource).where(\r
- resource.c.resourceId == resourceId))\r
- first = res.first()\r
- return None if first is None else dict(first)\r
+ first = uow.resources.get(resourceId)\r
+ return first.serialize() if first is not None else None\r
\r
\r
def deployment_managers(uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
- res = uow.session.execute(select(deploymentmanager))\r
- return [dict(r) for r in res]\r
+ li = uow.deployment_managers.list()\r
+ return [r.serialize() for r in li]\r
\r
\r
def deployment_manager_one(deploymentManagerId: str,\r
uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
- res = uow.session.execute(select(deploymentmanager).where(\r
- deploymentmanager.c.deploymentManagerId == deploymentManagerId))\r
- first = res.first()\r
- return None if first is None else dict(first)\r
+ first = uow.deployment_managers.get(deploymentManagerId)\r
+ return first.serialize() if first is not None else None\r
\r
\r
def subscriptions(uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
- res = uow.session.execute(select(subscription))\r
- return [dict(r) for r in res]\r
+ li = uow.subscriptions.list()\r
+ return [r.serialize() for r in li]\r
\r
\r
def subscription_one(subscriptionId: str,\r
uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
- res = uow.session.execute(select(subscription).where(\r
- subscription.c.subscriptionId == subscriptionId))\r
- first = res.first()\r
- return None if first is None else dict(first)\r
+ first = uow.subscriptions.get(subscriptionId)\r
+ return first.serialize() if first is not None else None\r
\r
\r
def subscription_create(subscriptionDto: SubscriptionDTO.subscription,\r
assert mem1.id == mem2.id\r
\r
\r
+def test_get_eth_list(real_stx_aio_client):\r
+ stxSaClientImp = StxSaClientImp(real_stx_aio_client)\r
+ assert stxSaClientImp is not None\r
+ hostlist = stxSaClientImp.getPserverList()\r
+ assert len(hostlist) > 0\r
+\r
+ ethlist = stxSaClientImp.getEthernetList(hostid=hostlist[0].id)\r
+ assert len(ethlist) > 0\r
+ eth1 = ethlist[0]\r
+ eth2 = stxSaClientImp.getEthernet(eth1.id)\r
+ assert eth1 != eth2\r
+ assert eth1.id == eth2.id\r
+\r
+\r
def test_get_if_list(real_stx_aio_client):\r
stxSaClientImp = StxSaClientImp(real_stx_aio_client)\r
assert stxSaClientImp is not None\r
assert if1.id == if2.id\r
\r
\r
-def test_get_port_list(real_stx_aio_client):\r
+def test_get_if_port_list(real_stx_aio_client):\r
stxSaClientImp = StxSaClientImp(real_stx_aio_client)\r
assert stxSaClientImp is not None\r
hostlist = stxSaClientImp.getPserverList()\r
assert len(hostlist) > 0\r
\r
- portlist = stxSaClientImp.getPortList(hostid=hostlist[0].id)\r
+ iflist = stxSaClientImp.getIfList(hostid=hostlist[0].id)\r
+ assert len(iflist) > 0\r
+\r
+ portlist = stxSaClientImp.getPortList(interfaceid=iflist[0].id)\r
assert len(portlist) > 0\r
port1 = portlist[0]\r
port2 = stxSaClientImp.getPort(port1.id)\r
resource_type_id1 = str(uuid.uuid4())
resource_pool_id1 = str(uuid.uuid4())
resource1 = ocloud.Resource(
- resource_id1, resource_type_id1, resource_pool_id1)
+ resource_id1, resource_type_id1, resource_pool_id1, 'resource1')
repo.add(resource1)
assert repo.get(resource_id1) == resource1
resource_type_id1 = str(uuid.uuid4())
resource_pool_id1 = str(uuid.uuid4())
resource1 = ocloud.Resource(
- resource_id1, resource_type_id1, resource_pool_id1)
+ resource_id1, resource_type_id1, resource_pool_id1, 'resource1')
uow.resources.add(resource1)
uow.commit()
resource_type_id1 = str(uuid.uuid4())
resource_pool_id1 = str(uuid.uuid4())
resource1 = ocloud.Resource(
- resource_id1, resource_type_id1, resource_pool_id1)
+ resource_id1, resource_type_id1, resource_pool_id1, 'resource1')
with sqlite_uow as uow:
uow.resources.add(resource1)
uow.commit()
resource_type_id1 = str(uuid.uuid4())
resource_pool_id1 = str(uuid.uuid4())
resource1 = ocloud.Resource(
- resource_id1, resource_type_id1, resource_pool_id1)
+ resource_id1, resource_type_id1, resource_pool_id1, 'resource1')
# Query return None
resource_res = ocloud_view.resource_one(resource_id1, sqlite_uow)
# limitations under the License.
import uuid
+from unittest.mock import MagicMock
from o2ims.domain import ocloud
from o2ims.domain import resource_type as rt
resource_type_id1 = str(uuid.uuid4())
resource_pool_id1 = str(uuid.uuid4())
resource1 = ocloud.Resource(
- resource_id1, resource_type_id1, resource_pool_id1)
+ resource_id1, resource_type_id1, resource_pool_id1, 'resource1')
assert resource_id1 is not None and resource1.resourceId == resource_id1
session, uow = mock_uow
ocloud1_UUID = str(uuid.uuid4)
- session.return_value.execute.return_value = [
- {"oCloudId": ocloud1_UUID}]
+ ocloud1 = MagicMock()
+ ocloud1.serialize.return_value = {
+ 'oCloudId': ocloud1_UUID, 'name': 'ocloud1'}
+ session.return_value.query.return_value = [ocloud1]
ocloud_list = ocloud_view.oclouds(uow)
- assert str(ocloud_list[0].get("oCloudId")) == ocloud1_UUID
+ # assert str(ocloud_list[0].get("oCloudId")) == ocloud1_UUID
+ assert len(ocloud_list) == 1
def test_view_olcoud_one(mock_uow):
session, uow = mock_uow
ocloud1_UUID = str(uuid.uuid4)
- session.return_value.execute.return_value.first.return_value = None
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = None
# Query return None
ocloud_res = ocloud_view.ocloud_one(ocloud1_UUID, uow)
assert ocloud_res is None
- session.return_value.execute.return_value.first.return_value = {
- "oCloudId": ocloud1_UUID}
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = {
+ "oCloudId": ocloud1_UUID}
ocloud_res = ocloud_view.ocloud_one(ocloud1_UUID, uow)
assert str(ocloud_res.get("oCloudId")) == ocloud1_UUID
session, uow = mock_uow
resource_type_id1 = str(uuid.uuid4())
- session.return_value.execute.return_value = [
- {"resourceTypeId": resource_type_id1}
- ]
+ restype1 = MagicMock()
+ restype1.serialize.return_value = {
+ "resourceTypeId": resource_type_id1}
+ session.return_value.query.return_value = [restype1]
resource_type_list = ocloud_view.resource_types(uow)
assert str(resource_type_list[0].get(
session, uow = mock_uow
resource_type_id1 = str(uuid.uuid4())
- session.return_value.execute.return_value.first.return_value = None
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = None
# Query return None
resource_type_res = ocloud_view.resource_type_one(
resource_type_id1, uow)
assert resource_type_res is None
- session.return_value.execute.return_value.first.return_value = {
- "resourceTypeId": resource_type_id1}
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = {
+ "resourceTypeId": resource_type_id1}
resource_type_res = ocloud_view.resource_type_one(resource_type_id1, uow)
assert str(resource_type_res.get("resourceTypeId")) == resource_type_id1
session, uow = mock_uow
resource_pool_id1 = str(uuid.uuid4())
- session.return_value.execute.return_value = [
- {"resourcePoolId": resource_pool_id1}
- ]
+ respool1 = MagicMock()
+ respool1.serialize.return_value = {
+ "resourcePoolId": resource_pool_id1}
+ session.return_value.query.return_value = [respool1]
resource_pool_list = ocloud_view.resource_pools(uow)
assert str(resource_pool_list[0].get(
session, uow = mock_uow
resource_pool_id1 = str(uuid.uuid4())
- session.return_value.execute.return_value.first.return_value = None
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = None
# Query return None
resource_pool_res = ocloud_view.resource_pool_one(
resource_pool_id1, uow)
assert resource_pool_res is None
- session.return_value.execute.return_value.first.return_value = {
- "resourcePoolId": resource_pool_id1
- }
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = {
+ "resourcePoolId": resource_pool_id1
+ }
resource_pool_res = ocloud_view.resource_pool_one(resource_pool_id1, uow)
assert str(resource_pool_res.get("resourcePoolId")) == resource_pool_id1
resource_id1 = str(uuid.uuid4())
resource_pool_id1 = str(uuid.uuid4())
- session.return_value.execute.return_value = [{
+ res1 = MagicMock()
+ res1.serialize.return_value = {
"resourceId": resource_id1,
"resourcePoolId": resource_pool_id1
- }]
+ }
+ session.return_value.query.return_value.filter_by.return_value = [res1]
resource_list = ocloud_view.resources(resource_pool_id1, uow)
assert str(resource_list[0].get("resourceId")) == resource_id1
resource_id1 = str(uuid.uuid4())
resource_pool_id1 = str(uuid.uuid4())
- session.return_value.execute.return_value.first.return_value = None
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = None
# Query return None
resource_res = ocloud_view.resource_one(resource_id1, uow)
assert resource_res is None
- session.return_value.execute.return_value.first.return_value = {
- "resourceId": resource_id1,
- "resourcePoolId": resource_pool_id1
- }
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = {
+ "resourceId": resource_id1,
+ "resourcePoolId": resource_pool_id1
+ }
resource_res = ocloud_view.resource_one(resource_id1, uow)
assert str(resource_res.get("resourceId")) == resource_id1
session, uow = mock_uow
deployment_manager_id1 = str(uuid.uuid4())
- session.return_value.execute.return_value = [{
+ dm1 = MagicMock()
+ dm1.serialize.return_value = {
"deploymentManagerId": deployment_manager_id1,
- }]
+ }
+ session.return_value.query.return_value = [dm1]
deployment_manager_list = ocloud_view.deployment_managers(uow)
assert str(deployment_manager_list[0].get(
session, uow = mock_uow
deployment_manager_id1 = str(uuid.uuid4())
- session.return_value.execute.return_value.first.return_value = None
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = None
# Query return None
deployment_manager_res = ocloud_view.deployment_manager_one(
deployment_manager_id1, uow)
assert deployment_manager_res is None
- session.return_value.execute.return_value.first.return_value = {
- "deploymentManagerId": deployment_manager_id1,
- }
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = {
+ "deploymentManagerId": deployment_manager_id1,
+ }
deployment_manager_res = ocloud_view.deployment_manager_one(
deployment_manager_id1, uow)
session, uow = mock_uow
subscription_id1 = str(uuid.uuid4())
- session.return_value.execute.return_value = [{
+ sub1 = MagicMock()
+ sub1.serialize.return_value = {
"subscriptionId": subscription_id1,
- }]
+ }
+ session.return_value.query.return_value = [sub1]
subscription_list = ocloud_view.subscriptions(uow)
assert str(subscription_list[0].get(
session, uow = mock_uow
subscription_id1 = str(uuid.uuid4())
- session.return_value.execute.return_value.first.return_value = None
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = None
# Query return None
subscription_res = ocloud_view.subscription_one(
subscription_id1, uow)
assert subscription_res is None
- session.return_value.execute.return_value.first.return_value = {
- "subscriptionId": subscription_id1,
- }
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = {
+ "subscriptionId": subscription_id1,
+ }
subscription_res = ocloud_view.subscription_one(
subscription_id1, uow)
def test_flask_get_list(mock_flask_uow):
session, app = mock_flask_uow
- session.return_value.execute.return_value = []
+ session.query.return_value = []
apibase = config.get_o2ims_api_base()
with app.test_client() as client:
def test_flask_get_one(mock_flask_uow):
session, app = mock_flask_uow
- session.return_value.execute.return_value.first.return_value = None
+
+ session.return_value.query.return_value.filter_by.return_value.\
+ first.return_value = None
apibase = config.get_o2ims_api_base()
with app.test_client() as client: