+class AlarmDictionarySqlAlchemyRepository(AlarmDictionaryRepository):
+ def __init__(self, session):
+ super().__init__()
+ self.session = session
+
+ def _add(self, alarm_dict: alarm_obj.AlarmDictionary):
+ self.session.add(alarm_dict)
+
+ def _get(self, dictionary_id) -> alarm_obj.AlarmDictionary:
+ return self.session.query(alarm_obj.AlarmDictionary).filter_by(
+ id=dictionary_id).first()
+
+ def _list(self) -> List[alarm_obj.AlarmDictionary]:
+ return self.session.query(alarm_obj.AlarmDictionary)
+
+ def _delete(self, dictionary_id):
+ self.session.query(alarm_obj.AlarmDictionary).filter_by(
+ id=dictionary_id).delete()
+
+