X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=inline;f=o2ims%2Fdomain%2Focloud_repo.py;h=b224a68123cc07f60c7a7e264a3375e47e7369ab;hb=refs%2Fchanges%2F82%2F7282%2F1;hp=f3b43d2a29e97704401078e9d6096222398ad946;hpb=b21c925328b0c700fde9001b7a56ea55decdf088;p=pti%2Fo2.git diff --git a/o2ims/domain/ocloud_repo.py b/o2ims/domain/ocloud_repo.py index f3b43d2..b224a68 100644 --- a/o2ims/domain/ocloud_repo.py +++ b/o2ims/domain/ocloud_repo.py @@ -136,8 +136,8 @@ class ResourceRepository(abc.ABC): self.seen.add(resource) return resource - def list(self) -> List[ocloud.Resource]: - return self._list() + def list(self, resourcepool_id) -> List[ocloud.Resource]: + return self._list(resourcepool_id) def update(self, resource: ocloud.Resource): self._update(resource) @@ -154,3 +154,76 @@ class ResourceRepository(abc.ABC): @abc.abstractmethod def _update(self, resource: ocloud.Resource): raise NotImplementedError + + +class DeploymentManagerRepository(abc.ABC): + def __init__(self): + self.seen = set() # type: Set[ocloud.DeploymentManager] + + def add(self, deployment_manager: ocloud.DeploymentManager): + self._add(deployment_manager) + self.seen.add(deployment_manager) + + def get(self, deployment_manager_id) -> ocloud.DeploymentManager: + deployment_manager = self._get(deployment_manager_id) + if deployment_manager: + self.seen.add(deployment_manager) + return deployment_manager + + def list(self) -> List[ocloud.DeploymentManager]: + return self._list() + + def update(self, deployment_manager: ocloud.DeploymentManager): + self._update(deployment_manager) + + @abc.abstractmethod + def _add(self, deployment_manager: ocloud.DeploymentManager): + raise NotImplementedError + + @abc.abstractmethod + def _get(self, deployment_manager_id) -> ocloud.DeploymentManager: + raise NotImplementedError + + @abc.abstractmethod + def _update(self, deployment_manager: ocloud.DeploymentManager): + raise NotImplementedError + + +class SubscriptionRepository(abc.ABC): + def __init__(self): + self.seen = set() # type: Set[ocloud.Subscription] + + def add(self, subscription: ocloud.Subscription): + self._add(subscription) + self.seen.add(subscription) + + def get(self, subscription_id) -> ocloud.Subscription: + subscription = self._get(subscription_id) + if subscription: + self.seen.add(subscription) + return subscription + + def list(self) -> List[ocloud.Subscription]: + return self._list() + + def update(self, subscription: ocloud.Subscription): + self._update(subscription) + + def delete(self, subscription_id): + self._delete(subscription_id) + + @abc.abstractmethod + def _add(self, subscription: ocloud.Subscription): + raise NotImplementedError + + @abc.abstractmethod + def _get(self, subscription_id) -> ocloud.Subscription: + raise NotImplementedError + + @abc.abstractmethod + def _update(self, subscription: ocloud.Subscription): + raise NotImplementedError + + @abc.abstractmethod + def _delete(self, subscription_id): + raise NotImplementedError