--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+from typing import List\r
+from .events import Event\r
+\r
+\r
+class AgRoot:\r
+ def __init__(self) -> None:\r
+ self.events = [] # type: List[Event]\r
# limitations under the License.\r
\r
from __future__ import annotations\r
+\r
+from o2ims.domain.base import AgRoot\r
# from dataclasses import dataclass\r
# from datetime import date\r
# from typing import Optional, List, Set\r
# from uuid import UUID\r
\r
\r
-class Subscription:\r
+class Subscription(AgRoot):\r
def __init__(self, id: str, callback: str, consumersubid: list = [],\r
filter: list = []) -> None:\r
self.subscriptionId = id\r
self.extensions = []\r
\r
\r
-class ResourcePool:\r
+class ResourcePool(AgRoot):\r
def __init__(self, id: str, name: str, location: str,\r
ocloudid: str, gLocationId: str = '',\r
description: str = '') -> None:\r
self.extensions = []\r
\r
\r
-class ResourceType:\r
+class ResourceType(AgRoot):\r
def __init__(self, typeid: str, name: str, typeEnum: ResourceTypeEnum,\r
ocloudid: str, vender: str = '', model: str = '',\r
version: str = '',\r
self.extensions = []\r
\r
\r
-class Resource:\r
+class Resource(AgRoot):\r
def __init__(self, resourceId: str, resourceTypeId: str,\r
resourcePoolId: str, oCloudId: str = '',\r
parentId: str = '', elements: list = [],\r
self.extensions = []\r
\r
\r
-class Ocloud:\r
+class Ocloud(AgRoot):\r
def __init__(self, ocloudid: str, name: str, imsendpoint: str,\r
globalcloudId: str = '',\r
description: str = '', version_number: int = 0) -> None:\r
-\r
+ super().__init__()\r
self.oCloudId = ocloudid\r
self.globalcloudId = globalcloudId\r
self.version_number = version_number\r
self.deploymentManagers = []\r
self.resourceTypes = []\r
self.extensions = []\r
- self.events = []\r
+ # self.events = []\r
\r
def addDeploymentManager(self,\r
deploymentManager: DeploymentManager):\r
\r
def update(self, ocloud: ocloud.Ocloud):\r
self._update(ocloud)\r
+ self.seen.add(ocloud)\r
\r
# def update_fields(self, ocloudid: str, updatefields: dict):\r
# self._update(ocloudid, updatefields)\r
\r
def update(self, resource_type: ocloud.ResourceType):\r
self._update(resource_type)\r
+ self.seen.add(resource_type)\r
\r
@abc.abstractmethod\r
def _add(self, resource_type: ocloud.ResourceType):\r
\r
def update(self, resource_pool: ocloud.ResourcePool):\r
self._update(resource_pool)\r
+ self.seen.add(resource_pool)\r
\r
@abc.abstractmethod\r
def _add(self, resource_pool: ocloud.ResourcePool):\r
\r
def update(self, resource: ocloud.Resource):\r
self._update(resource)\r
+ self.seen.add(resource)\r
\r
@abc.abstractmethod\r
def _add(self, resource: ocloud.Resource):\r
# from dataclasses import dataclass\r
import datetime\r
import json\r
+from o2ims.domain.base import AgRoot\r
\r
from o2ims.domain.resource_type import ResourceTypeEnum\r
from o2common.helper import o2logging\r
pass\r
\r
\r
-class StxGenericModel:\r
+class StxGenericModel(AgRoot):\r
def __init__(self, type: ResourceTypeEnum,\r
api_response: dict = None, content_hash=None) -> None:\r
if api_response:\r
\r
def update(self, stx_obj: StxGenericModel):\r
self._update(stx_obj)\r
+ self.seen.add(stx_obj)\r
\r
# def update_fields(self, stx_obj_id: str, updatefields: dict):\r
# self._update(stx_obj_id, updatefields)\r
from __future__ import annotations
import abc
-from o2ims.domain.ocloud_repo import OcloudRepository
+from o2ims.domain.ocloud_repo import OcloudRepository,\
+ ResourcePoolRepository, ResourceRepository, ResourceTypeRepository
from o2ims.domain.stx_repo import StxObjectRepository
class AbstractUnitOfWork(abc.ABC):
oclouds: OcloudRepository
+ resource_types: ResourceTypeRepository
+ resource_pools: ResourcePoolRepository
+ resources: ResourceRepository
stxobjects: StxObjectRepository
def __enter__(self):
self._commit()
def collect_new_events(self):
- for ocloud in self.oclouds.seen:
- while ocloud.events:
- yield ocloud.events.pop(0)
+ for entry in self.oclouds.seen:
+ while entry.events:
+ yield entry.events.pop(0)
+ for entry in self.resource_pools.seen:
+ while entry.events:
+ yield entry.events.pop(0)
+ for entry in self.resources.seen:
+ while entry.events:
+ yield entry.events.pop(0)
+ for entry in self.resource_types.seen:
+ while entry.events:
+ yield entry.events.pop(0)
+ for entry in self.stxobjects.seen:
+ while entry.events:
+ yield entry.events.pop(0)
@abc.abstractmethod
def _commit(self):