Add the PATCH method for the monitoring API
[pti/o2.git] / o2ims / adapter / alarm_repository.py
index ef20e6a..ebfe816 100644 (file)
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-from typing import List
+from typing import List, Tuple
 
 from o2ims.domain import alarm_obj
 from o2ims.domain.alarm_repo import AlarmDefinitionRepository, \
     AlarmEventRecordRepository, AlarmSubscriptionRepository, \
-    AlarmProbableCauseRepository
+    AlarmProbableCauseRepository, AlarmDictionaryRepository
 from o2common.helper import o2logging
 logger = o2logging.get_logger(__name__)
 
@@ -34,11 +34,20 @@ class AlarmEventRecordSqlAlchemyRepository(AlarmEventRecordRepository):
         return self.session.query(alarm_obj.AlarmEventRecord).filter_by(
             alarmEventRecordId=alarm_event_record_id).first()
 
-    def _list(self) -> List[alarm_obj.AlarmEventRecord]:
-        return self.session.query(alarm_obj.AlarmEventRecord)
+    def _list(self, *args, **kwargs) -> Tuple[
+            int, List[alarm_obj.AlarmEventRecord]]:
+        size = kwargs.pop('limit') if 'limit' in kwargs else None
+        offset = kwargs.pop('start') if 'start' in kwargs else 0
+
+        result = self.session.query(alarm_obj.AlarmEventRecord).filter(
+            *args).order_by('alarmEventRecordId')
+        count = result.count()
+        if size is not None and size != -1:
+            return (count, result.limit(size).offset(offset))
+        return (count, result)
 
     def _update(self, alarm_event_record: alarm_obj.AlarmEventRecord):
-        self.session.add(alarm_event_record)
+        self.session.merge(alarm_event_record)
 
     def _delete(self, alarm_event_record_id):
         self.session.query(alarm_obj.AlarmEventRecord).filter_by(
@@ -68,6 +77,26 @@ class AlarmDefinitionSqlAlchemyRepository(AlarmDefinitionRepository):
             alarmDefinitionId=alarm_definition_id).delete()
 
 
+class AlarmDictionarySqlAlchemyRepository(AlarmDictionaryRepository):
+    def __init__(self, session):
+        super().__init__()
+        self.session = session
+
+    def _add(self, alarm_dict: alarm_obj.AlarmDictionary):
+        self.session.add(alarm_dict)
+
+    def _get(self, dictionary_id) -> alarm_obj.AlarmDictionary:
+        return self.session.query(alarm_obj.AlarmDictionary).filter_by(
+            id=dictionary_id).first()
+
+    def _list(self) -> List[alarm_obj.AlarmDictionary]:
+        return self.session.query(alarm_obj.AlarmDictionary)
+
+    def _delete(self, dictionary_id):
+        self.session.query(alarm_obj.AlarmDictionary).filter_by(
+            id=dictionary_id).delete()
+
+
 class AlarmSubscriptionSqlAlchemyRepository(AlarmSubscriptionRepository):
     def __init__(self, session):
         super().__init__()
@@ -80,8 +109,17 @@ class AlarmSubscriptionSqlAlchemyRepository(AlarmSubscriptionRepository):
         return self.session.query(alarm_obj.AlarmSubscription).filter_by(
             alarmSubscriptionId=subscription_id).first()
 
-    def _list(self) -> List[alarm_obj.AlarmSubscription]:
-        return self.session.query(alarm_obj.AlarmSubscription)
+    def _list(self, *args, **kwargs) -> Tuple[
+            int, List[alarm_obj.AlarmSubscription]]:
+        size = kwargs.pop('limit') if 'limit' in kwargs else None
+        offset = kwargs.pop('start') if 'start' in kwargs else 0
+
+        result = self.session.query(alarm_obj.AlarmSubscription).filter(
+            *args).order_by('alarmSubscriptionId')
+        count = result.count()
+        if size is not None and size != -1:
+            return (count, result.limit(size).offset(offset))
+        return (count, result)
 
     def _update(self, subscription: alarm_obj.AlarmSubscription):
         self.session.add(subscription)