+
+
+class DeploymentManagerSqlAlchemyRepository(DeploymentManagerRepository):
+ def __init__(self, session):
+ super().__init__()
+ self.session = session
+
+ def _add(self, deployment_manager: ocloud.DeploymentManager):
+ self.session.add(deployment_manager)
+
+ def _get(self, deployment_manager_id) -> ocloud.DeploymentManager:
+ return self.session.query(ocloud.DeploymentManager).filter_by(
+ deploymentManagerId=deployment_manager_id).first()
+
+ def _list(self) -> List[ocloud.DeploymentManager]:
+ return self.session.query(ocloud.DeploymentManager)
+
+ def _update(self, deployment_manager: ocloud.DeploymentManager):
+ self.session.add(deployment_manager)
+
+
+class SubscriptionSqlAlchemyRepository(SubscriptionRepository):
+ def __init__(self, session):
+ super().__init__()
+ self.session = session
+
+ def _add(self, subscription: subscription_obj.Subscription):
+ self.session.add(subscription)
+
+ def _get(self, subscription_id) -> subscription_obj.Subscription:
+ return self.session.query(subscription_obj.Subscription).filter_by(
+ subscriptionId=subscription_id).first()
+
+ def _list(self) -> List[subscription_obj.Subscription]:
+ return self.session.query(subscription_obj.Subscription)
+
+ def _update(self, subscription: subscription_obj.Subscription):
+ self.session.add(subscription)
+
+ def _delete(self, subscription_id):
+ self.session.query(subscription_obj.Subscription).filter_by(
+ subscriptionId=subscription_id).delete()
+
+
+class ConfigurationSqlAlchemyRepository(ConfigurationRepository):
+ def __init__(self, session):
+ super().__init__()
+ self.session = session
+
+ def _add(self, configuration: configuration_obj.Configuration):
+ self.session.add(configuration)
+
+ def _get(self, configuration_id) -> configuration_obj.Configuration:
+ return self.session.query(configuration_obj.Configuration).filter_by(
+ configurationId=configuration_id).first()
+
+ def _list(self) -> List[configuration_obj.Configuration]:
+ return self.session.query(configuration_obj.Configuration)
+
+ def _update(self, configuration: configuration_obj.Configuration):
+ self.session.add(configuration)
+
+ def _delete(self, configuration_id):
+ self.session.query(configuration_obj.Configuration).filter_by(
+ configurationId=configuration_id).delete()