1 # Copyright (C) 2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from typing import List, Tuple
17 from o2ims.domain import alarm_obj
18 from o2ims.domain.alarm_repo import AlarmDefinitionRepository, \
19 AlarmEventRecordRepository, AlarmSubscriptionRepository, \
20 AlarmProbableCauseRepository
21 from o2common.helper import o2logging
# Module-level logger, obtained from the project's o2logging helper by
# passing this module's name.
logger = o2logging.get_logger(__name__)
class AlarmEventRecordSqlAlchemyRepository(AlarmEventRecordRepository):
    """SQLAlchemy-backed implementation of ``AlarmEventRecordRepository``.

    Every operation goes through the SQLAlchemy session supplied at
    construction time. Nothing in this class commits or rolls back; that
    is left to whoever owns the session.
    """

    def __init__(self, session):
        """Bind the repository to *session*.

        :param session: SQLAlchemy session used for all queries.
        """
        # Run the base-class initialiser so any state the abstract
        # repository sets up exists on this instance.
        # NOTE(review): base class is not visible here; assumed to take
        # no constructor arguments -- confirm.
        super().__init__()
        self.session = session

    def _add(self, alarm_event_record: alarm_obj.AlarmEventRecord):
        """Add *alarm_event_record* to the session."""
        self.session.add(alarm_event_record)

    def _get(self, alarm_event_record_id) -> alarm_obj.AlarmEventRecord:
        """Return the first record whose ``alarmEventRecordId`` matches.

        :param alarm_event_record_id: identifier to look up.
        :return: the matching record, or ``None`` when no row matches.
        """
        return self.session.query(alarm_obj.AlarmEventRecord).filter_by(
            alarmEventRecordId=alarm_event_record_id).first()

    def _list(self, **kwargs) -> Tuple[int, List[alarm_obj.AlarmEventRecord]]:
        """Query alarm event records with optional filtering and paging.

        :param kwargs: ``limit`` (page size; ``None`` or ``-1`` disables
            paging) and ``start`` (row offset, default ``0``) are consumed
            here; every remaining keyword is passed to ``filter_by`` as an
            equality filter.
        :return: ``(count, result)`` where ``count`` is the number of rows
            matching the filters before paging, and ``result`` is the
            query ordered by ``alarmEventRecordId``. Note that ``result``
            is a query object that is evaluated when iterated, not a
            materialised list.
        """
        # pop() with a default replaces the membership-test-then-pop pair.
        size = kwargs.pop('limit', None)
        offset = kwargs.pop('start', 0)

        result = self.session.query(alarm_obj.AlarmEventRecord).filter_by(
            **kwargs).order_by('alarmEventRecordId')
        # Count before limit/offset so callers get the unpaged total.
        count = result.count()
        if size is not None and size != -1:
            return (count, result.limit(size).offset(offset))
        # Without an effective limit the offset is not applied either.
        return (count, result)

    def _update(self, alarm_event_record: alarm_obj.AlarmEventRecord):
        """Add *alarm_event_record* to the session (same call as ``_add``)."""
        self.session.add(alarm_event_record)

    def _delete(self, alarm_event_record_id):
        """Delete rows whose ``alarmEventRecordId`` matches, via the query.

        :param alarm_event_record_id: identifier of the record to remove.
        """
        self.session.query(alarm_obj.AlarmEventRecord).filter_by(
            alarmEventRecordId=alarm_event_record_id).delete()
class AlarmDefinitionSqlAlchemyRepository(AlarmDefinitionRepository):
    """SQLAlchemy-backed implementation of ``AlarmDefinitionRepository``.

    Every operation goes through the SQLAlchemy session supplied at
    construction time. Nothing in this class commits or rolls back.
    """

    def __init__(self, session):
        """Bind the repository to *session*.

        :param session: SQLAlchemy session used for all queries.
        """
        # Run the base-class initialiser so any state the abstract
        # repository sets up exists on this instance.
        # NOTE(review): base class is not visible here; assumed to take
        # no constructor arguments -- confirm.
        super().__init__()
        self.session = session

    def _add(self, definition: alarm_obj.AlarmDefinition):
        """Add *definition* to the session."""
        self.session.add(definition)

    def _get(self, definition_id) -> alarm_obj.AlarmDefinition:
        """Return the first definition whose ``alarmDefinitionId`` matches.

        :param definition_id: identifier to look up.
        :return: the matching definition, or ``None`` when no row matches.
        """
        return self.session.query(alarm_obj.AlarmDefinition).filter_by(
            alarmDefinitionId=definition_id).first()

    def _list(self) -> List[alarm_obj.AlarmDefinition]:
        """Return an unfiltered query over all alarm definitions.

        The returned value is a query object that is evaluated when
        iterated, not a materialised list.
        """
        return self.session.query(alarm_obj.AlarmDefinition)

    def _update(self, definition: alarm_obj.AlarmDefinition):
        """Add *definition* to the session (same call as ``_add``)."""
        self.session.add(definition)

    def _delete(self, alarm_definition_id):
        """Delete rows whose ``alarmDefinitionId`` matches, via the query.

        :param alarm_definition_id: identifier of the definition to remove.
        """
        self.session.query(alarm_obj.AlarmDefinition).filter_by(
            alarmDefinitionId=alarm_definition_id).delete()
class AlarmSubscriptionSqlAlchemyRepository(AlarmSubscriptionRepository):
    """SQLAlchemy-backed implementation of ``AlarmSubscriptionRepository``.

    Every operation goes through the SQLAlchemy session supplied at
    construction time. Nothing in this class commits or rolls back.
    """

    def __init__(self, session):
        """Bind the repository to *session*.

        :param session: SQLAlchemy session used for all queries.
        """
        # Run the base-class initialiser so any state the abstract
        # repository sets up exists on this instance.
        # NOTE(review): base class is not visible here; assumed to take
        # no constructor arguments -- confirm.
        super().__init__()
        self.session = session

    def _add(self, subscription: alarm_obj.AlarmSubscription):
        """Add *subscription* to the session."""
        self.session.add(subscription)

    def _get(self, subscription_id) -> alarm_obj.AlarmSubscription:
        """Return the first subscription whose ``alarmSubscriptionId`` matches.

        :param subscription_id: identifier to look up.
        :return: the matching subscription, or ``None`` when no row matches.
        """
        return self.session.query(alarm_obj.AlarmSubscription).filter_by(
            alarmSubscriptionId=subscription_id).first()

    def _list(self, **kwargs) -> Tuple[int, List[alarm_obj.AlarmSubscription]]:
        """Query alarm subscriptions with optional filtering and paging.

        :param kwargs: ``limit`` (page size; ``None`` or ``-1`` disables
            paging) and ``start`` (row offset, default ``0``) are consumed
            here; every remaining keyword is passed to ``filter_by`` as an
            equality filter.
        :return: ``(count, result)`` where ``count`` is the number of rows
            matching the filters before paging, and ``result`` is the
            query ordered by ``alarmSubscriptionId``. Note that ``result``
            is a query object that is evaluated when iterated, not a
            materialised list.
        """
        # pop() with a default replaces the membership-test-then-pop pair.
        size = kwargs.pop('limit', None)
        offset = kwargs.pop('start', 0)

        result = self.session.query(alarm_obj.AlarmSubscription).filter_by(
            **kwargs).order_by('alarmSubscriptionId')
        # Count before limit/offset so callers get the unpaged total.
        count = result.count()
        if size is not None and size != -1:
            return (count, result.limit(size).offset(offset))
        # Without an effective limit the offset is not applied either.
        return (count, result)

    def _update(self, subscription: alarm_obj.AlarmSubscription):
        """Add *subscription* to the session (same call as ``_add``)."""
        self.session.add(subscription)

    def _delete(self, alarm_subscription_id):
        """Delete rows whose ``alarmSubscriptionId`` matches, via the query.

        :param alarm_subscription_id: identifier of the subscription to
            remove.
        """
        self.session.query(alarm_obj.AlarmSubscription).filter_by(
            alarmSubscriptionId=alarm_subscription_id).delete()
class AlarmProbableCauseSqlAlchemyRepository(AlarmProbableCauseRepository):
    """SQLAlchemy-backed implementation of ``AlarmProbableCauseRepository``.

    Every operation goes through the SQLAlchemy session supplied at
    construction time. Nothing in this class commits or rolls back.
    """

    def __init__(self, session):
        """Bind the repository to *session*.

        :param session: SQLAlchemy session used for all queries.
        """
        # Run the base-class initialiser so any state the abstract
        # repository sets up exists on this instance.
        # NOTE(review): base class is not visible here; assumed to take
        # no constructor arguments -- confirm.
        super().__init__()
        self.session = session

    def _add(self, probable_cause: alarm_obj.ProbableCause):
        """Add *probable_cause* to the session."""
        self.session.add(probable_cause)

    def _get(self, probable_cause_id) -> alarm_obj.ProbableCause:
        """Return the first probable cause whose ``probableCauseId`` matches.

        :param probable_cause_id: identifier to look up.
        :return: the matching probable cause, or ``None`` when no row
            matches.
        """
        return self.session.query(alarm_obj.ProbableCause).filter_by(
            probableCauseId=probable_cause_id).first()

    def _list(self) -> List[alarm_obj.ProbableCause]:
        """Return an unfiltered query over all probable causes.

        The returned value is a query object that is evaluated when
        iterated, not a materialised list.
        """
        return self.session.query(alarm_obj.ProbableCause)

    def _update(self, probable_cause: alarm_obj.ProbableCause):
        """Add *probable_cause* to the session (same call as ``_add``)."""
        self.session.add(probable_cause)

    def _delete(self, probable_cause_id):
        """Delete rows whose ``probableCauseId`` matches, via the query.

        :param probable_cause_id: identifier of the probable cause to
            remove.
        """
        self.session.query(alarm_obj.ProbableCause).filter_by(
            probableCauseId=probable_cause_id).delete()