1 # Copyright (C) 2022-2024 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from typing import List, Tuple
17 from o2ims.domain import alarm_obj
18 from o2ims.domain.alarm_repo import AlarmDefinitionRepository, \
19 AlarmEventRecordRepository, AlarmSubscriptionRepository, \
20 AlarmProbableCauseRepository, AlarmDictionaryRepository
21 from o2common.helper import o2logging
22 logger = o2logging.get_logger(__name__)
class AlarmEventRecordSqlAlchemyRepository(AlarmEventRecordRepository):
    """Alarm event record repository that delegates to a session object."""

    def __init__(self, session):
        """Keep the session that every method of this repository uses.

        Args:
            session: Object exposing ``add``, ``merge`` and ``query``;
                presumably a SQLAlchemy ``Session`` -- confirm with caller.
        """
        # Run the base repository initializer so that any state it sets
        # up exists before the repository is used.
        super().__init__()
        self.session = session

    def _add(self, alarm_event_record: alarm_obj.AlarmEventRecord):
        """Hand ``alarm_event_record`` to the session via ``add``."""
        self.session.add(alarm_event_record)

    def _get(self, alarm_event_record_id) -> alarm_obj.AlarmEventRecord:
        """Return the first record matching ``alarm_event_record_id``.

        The lookup filters on ``alarmEventRecordId`` and returns whatever
        ``first()`` yields for that query.
        """
        query = self.session.query(alarm_obj.AlarmEventRecord)
        return query.filter_by(
            alarmEventRecordId=alarm_event_record_id).first()

    def _list(self, *args, **kwargs) -> Tuple[
            int, List[alarm_obj.AlarmEventRecord]]:
        """Return the total match count and the (optionally paged) query.

        Args:
            *args: Filter criteria forwarded unchanged to ``filter``.
            **kwargs: Optional ``limit`` (page size; ``None`` or ``-1``
                disables paging) and ``start`` (row offset, default 0).

        Returns:
            A ``(count, query)`` tuple. ``count`` is computed before any
            paging is applied. The query object is returned as built; it
            is not materialized into a list here.
        """
        size = kwargs.pop('limit', None)
        offset = kwargs.pop('start', 0)

        result = self.session.query(alarm_obj.AlarmEventRecord).filter(
            *args).order_by('alarmEventRecordId')
        count = result.count()
        # The offset is only applied together with a limit; without a
        # usable limit the complete ordered query is returned.
        if size is None or size == -1:
            return (count, result)
        return (count, result.limit(size).offset(offset))

    def _update(self, alarm_event_record: alarm_obj.AlarmEventRecord):
        """Pass ``alarm_event_record`` to the session via ``merge``."""
        self.session.merge(alarm_event_record)

    def _delete(self, alarm_event_record_id):
        """Delete through the query every record with the given id."""
        query = self.session.query(alarm_obj.AlarmEventRecord)
        query.filter_by(
            alarmEventRecordId=alarm_event_record_id).delete()
class AlarmDefinitionSqlAlchemyRepository(AlarmDefinitionRepository):
    """Alarm definition repository that delegates to a session object."""

    def __init__(self, session):
        """Keep the session that every method of this repository uses.

        Args:
            session: Object exposing ``add`` and ``query``; presumably a
                SQLAlchemy ``Session`` -- confirm with caller.
        """
        # Run the base repository initializer so that any state it sets
        # up exists before the repository is used.
        super().__init__()
        self.session = session

    def _add(self, definition: alarm_obj.AlarmDefinition):
        """Hand ``definition`` to the session via ``add``."""
        self.session.add(definition)

    def _get(self, definition_id) -> alarm_obj.AlarmDefinition:
        """Return the first definition matching ``definition_id``.

        The lookup filters on ``alarmDefinitionId`` and returns whatever
        ``first()`` yields for that query.
        """
        query = self.session.query(alarm_obj.AlarmDefinition)
        return query.filter_by(alarmDefinitionId=definition_id).first()

    def _list(self) -> List[alarm_obj.AlarmDefinition]:
        """Return the unfiltered query over ``AlarmDefinition``.

        The query object is returned as built; it is not materialized
        into a list here.
        """
        return self.session.query(alarm_obj.AlarmDefinition)

    def _update(self, definition: alarm_obj.AlarmDefinition):
        """Hand the changed ``definition`` to the session via ``add``."""
        self.session.add(definition)

    def _delete(self, alarm_definition_id):
        """Delete through the query every definition with the given id."""
        query = self.session.query(alarm_obj.AlarmDefinition)
        query.filter_by(alarmDefinitionId=alarm_definition_id).delete()
class AlarmDictionarySqlAlchemyRepository(AlarmDictionaryRepository):
    """Alarm dictionary repository that delegates to a session object."""

    def __init__(self, session):
        """Keep the session that every method of this repository uses.

        Args:
            session: Object exposing ``add`` and ``query``; presumably a
                SQLAlchemy ``Session`` -- confirm with caller.
        """
        # Run the base repository initializer so that any state it sets
        # up exists before the repository is used.
        super().__init__()
        self.session = session

    def _add(self, alarm_dict: alarm_obj.AlarmDictionary):
        """Hand ``alarm_dict`` to the session via ``add``."""
        self.session.add(alarm_dict)

    def _get(self, dictionary_id) -> alarm_obj.AlarmDictionary:
        """Return the first dictionary whose ``id`` is ``dictionary_id``.

        Returns whatever ``first()`` yields for that query.
        """
        query = self.session.query(alarm_obj.AlarmDictionary)
        return query.filter_by(id=dictionary_id).first()

    def _list(self) -> List[alarm_obj.AlarmDictionary]:
        """Return the unfiltered query over ``AlarmDictionary``.

        The query object is returned as built; it is not materialized
        into a list here.
        """
        return self.session.query(alarm_obj.AlarmDictionary)

    def _delete(self, dictionary_id):
        """Delete through the query every dictionary with the given id."""
        query = self.session.query(alarm_obj.AlarmDictionary)
        query.filter_by(id=dictionary_id).delete()
class AlarmSubscriptionSqlAlchemyRepository(AlarmSubscriptionRepository):
    """Alarm subscription repository that delegates to a session object."""

    def __init__(self, session):
        """Keep the session that every method of this repository uses.

        Args:
            session: Object exposing ``add`` and ``query``; presumably a
                SQLAlchemy ``Session`` -- confirm with caller.
        """
        # Run the base repository initializer so that any state it sets
        # up exists before the repository is used.
        super().__init__()
        self.session = session

    def _add(self, subscription: alarm_obj.AlarmSubscription):
        """Hand ``subscription`` to the session via ``add``."""
        self.session.add(subscription)

    def _get(self, subscription_id) -> alarm_obj.AlarmSubscription:
        """Return the first subscription matching ``subscription_id``.

        The lookup filters on ``alarmSubscriptionId`` and returns whatever
        ``first()`` yields for that query.
        """
        query = self.session.query(alarm_obj.AlarmSubscription)
        return query.filter_by(
            alarmSubscriptionId=subscription_id).first()

    def _list(self, *args, **kwargs) -> Tuple[
            int, List[alarm_obj.AlarmSubscription]]:
        """Return the total match count and the (optionally paged) query.

        Args:
            *args: Filter criteria forwarded unchanged to ``filter``.
            **kwargs: Optional ``limit`` (page size; ``None`` or ``-1``
                disables paging) and ``start`` (row offset, default 0).

        Returns:
            A ``(count, query)`` tuple. ``count`` is computed before any
            paging is applied. The query object is returned as built; it
            is not materialized into a list here.
        """
        size = kwargs.pop('limit', None)
        offset = kwargs.pop('start', 0)

        result = self.session.query(alarm_obj.AlarmSubscription).filter(
            *args).order_by('alarmSubscriptionId')
        count = result.count()
        # The offset is only applied together with a limit; without a
        # usable limit the complete ordered query is returned.
        if size is None or size == -1:
            return (count, result)
        return (count, result.limit(size).offset(offset))

    def _update(self, subscription: alarm_obj.AlarmSubscription):
        """Hand the changed ``subscription`` to the session via ``add``."""
        self.session.add(subscription)

    def _delete(self, alarm_subscription_id):
        """Delete through the query every subscription with the given id."""
        query = self.session.query(alarm_obj.AlarmSubscription)
        query.filter_by(
            alarmSubscriptionId=alarm_subscription_id).delete()
class AlarmProbableCauseSqlAlchemyRepository(AlarmProbableCauseRepository):
    """Probable cause repository that delegates to a session object."""

    def __init__(self, session):
        """Keep the session that every method of this repository uses.

        Args:
            session: Object exposing ``add`` and ``query``; presumably a
                SQLAlchemy ``Session`` -- confirm with caller.
        """
        # Run the base repository initializer so that any state it sets
        # up exists before the repository is used.
        super().__init__()
        self.session = session

    def _add(self, probable_cause: alarm_obj.ProbableCause):
        """Hand ``probable_cause`` to the session via ``add``."""
        self.session.add(probable_cause)

    def _get(self, probable_cause_id) -> alarm_obj.ProbableCause:
        """Return the first probable cause matching ``probable_cause_id``.

        The lookup filters on ``probableCauseId`` and returns whatever
        ``first()`` yields for that query.
        """
        query = self.session.query(alarm_obj.ProbableCause)
        return query.filter_by(probableCauseId=probable_cause_id).first()

    def _list(self) -> List[alarm_obj.ProbableCause]:
        """Return the unfiltered query over ``ProbableCause``.

        The query object is returned as built; it is not materialized
        into a list here.
        """
        return self.session.query(alarm_obj.ProbableCause)

    def _update(self, probable_cause: alarm_obj.ProbableCause):
        """Hand the changed ``probable_cause`` to the session via ``add``."""
        self.session.add(probable_cause)

    def _delete(self, probable_cause_id):
        """Delete through the query every probable cause with the given id."""
        query = self.session.query(alarm_obj.ProbableCause)
        query.filter_by(probableCauseId=probable_cause_id).delete()