1 # Copyright (C) 2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from typing import List, Tuple
17 from o2ims.domain import alarm_obj
18 from o2ims.domain.alarm_repo import AlarmDefinitionRepository, \
19 AlarmEventRecordRepository, AlarmSubscriptionRepository, \
20 AlarmProbableCauseRepository
21 from o2common.helper import o2logging
22 logger = o2logging.get_logger(__name__)
class AlarmEventRecordSqlAlchemyRepository(AlarmEventRecordRepository):
    """Alarm event record repository backed by a database session.

    Every operation is delegated to the ``session`` object supplied to the
    constructor. No method in this class calls ``commit`` on that session.
    """

    def __init__(self, session):
        """Store the session used for all queries.

        :param session: object exposing ``add`` and ``query``
            (presumably a SQLAlchemy session - confirm against the caller).
        """
        # Run the base initialiser first so that whatever state
        # AlarmEventRecordRepository (o2ims.domain.alarm_repo) sets up in its
        # own __init__ exists on this instance as well.
        super().__init__()
        self.session = session

    def _add(self, alarm_event_record: alarm_obj.AlarmEventRecord):
        """Hand ``alarm_event_record`` to the session via ``session.add``."""
        self.session.add(alarm_event_record)

    def _get(self, alarm_event_record_id) -> alarm_obj.AlarmEventRecord:
        """Return the first record whose ``alarmEventRecordId`` matches.

        The result is whatever ``.first()`` yields for the filtered query.
        """
        query = self.session.query(alarm_obj.AlarmEventRecord)
        return query.filter_by(
            alarmEventRecordId=alarm_event_record_id).first()

    def _list(self, *args, **kwargs) -> Tuple[
            int, List[alarm_obj.AlarmEventRecord]]:
        """Query alarm event records, optionally paginated.

        :param args: filter criteria passed straight to ``Query.filter``.
        :param kwargs: ``limit`` (page size; ``None`` or ``-1`` means no
            paging) and ``start`` (row offset, default ``0``). Any other
            keyword argument is ignored.
        :return: ``(count, query)`` where ``count`` is the number of rows
            matching the filters before paging is applied, and ``query`` is
            the query object ordered by ``alarmEventRecordId``; it is not
            converted to a list here.
        """
        limit = kwargs.pop('limit', None)
        start = kwargs.pop('start', 0)

        query = self.session.query(alarm_obj.AlarmEventRecord).filter(
            *args).order_by('alarmEventRecordId')
        # Counted before limit/offset so callers get the overall total.
        total = query.count()
        if limit is None or limit == -1:
            # NOTE: ``start`` is deliberately left unused here, exactly as
            # before - an offset is only applied together with a limit.
            return (total, query)
        return (total, query.limit(limit).offset(start))

    def _update(self, alarm_event_record: alarm_obj.AlarmEventRecord):
        """Hand the modified record back to the session via ``session.add``."""
        self.session.add(alarm_event_record)

    def _delete(self, alarm_event_record_id):
        """Delete rows whose ``alarmEventRecordId`` matches, at query level."""
        query = self.session.query(alarm_obj.AlarmEventRecord)
        query.filter_by(alarmEventRecordId=alarm_event_record_id).delete()
class AlarmDefinitionSqlAlchemyRepository(AlarmDefinitionRepository):
    """Alarm definition repository backed by a database session.

    Every operation is delegated to the ``session`` object supplied to the
    constructor. No method in this class calls ``commit`` on that session.
    """

    def __init__(self, session):
        """Store the session used for all queries.

        :param session: object exposing ``add`` and ``query``
            (presumably a SQLAlchemy session - confirm against the caller).
        """
        # Run the base initialiser first so that whatever state
        # AlarmDefinitionRepository (o2ims.domain.alarm_repo) sets up in its
        # own __init__ exists on this instance as well.
        super().__init__()
        self.session = session

    def _add(self, definition: alarm_obj.AlarmDefinition):
        """Hand ``definition`` to the session via ``session.add``."""
        self.session.add(definition)

    def _get(self, definition_id) -> alarm_obj.AlarmDefinition:
        """Return the first definition whose ``alarmDefinitionId`` matches.

        The result is whatever ``.first()`` yields for the filtered query.
        """
        query = self.session.query(alarm_obj.AlarmDefinition)
        return query.filter_by(alarmDefinitionId=definition_id).first()

    def _list(self) -> List[alarm_obj.AlarmDefinition]:
        """Return the unfiltered query over all alarm definitions.

        The query object itself is returned; it is not converted to a list
        here.
        """
        return self.session.query(alarm_obj.AlarmDefinition)

    def _update(self, definition: alarm_obj.AlarmDefinition):
        """Hand the modified definition back via ``session.add``."""
        self.session.add(definition)

    def _delete(self, alarm_definition_id):
        """Delete rows whose ``alarmDefinitionId`` matches, at query level."""
        query = self.session.query(alarm_obj.AlarmDefinition)
        query.filter_by(alarmDefinitionId=alarm_definition_id).delete()
class AlarmSubscriptionSqlAlchemyRepository(AlarmSubscriptionRepository):
    """Alarm subscription repository backed by a database session.

    Every operation is delegated to the ``session`` object supplied to the
    constructor. No method in this class calls ``commit`` on that session.
    """

    def __init__(self, session):
        """Store the session used for all queries.

        :param session: object exposing ``add`` and ``query``
            (presumably a SQLAlchemy session - confirm against the caller).
        """
        # Run the base initialiser first so that whatever state
        # AlarmSubscriptionRepository (o2ims.domain.alarm_repo) sets up in
        # its own __init__ exists on this instance as well.
        super().__init__()
        self.session = session

    def _add(self, subscription: alarm_obj.AlarmSubscription):
        """Hand ``subscription`` to the session via ``session.add``."""
        self.session.add(subscription)

    def _get(self, subscription_id) -> alarm_obj.AlarmSubscription:
        """Return the first subscription whose ``alarmSubscriptionId`` matches.

        The result is whatever ``.first()`` yields for the filtered query.
        """
        query = self.session.query(alarm_obj.AlarmSubscription)
        return query.filter_by(alarmSubscriptionId=subscription_id).first()

    def _list(self, *args, **kwargs) -> Tuple[
            int, List[alarm_obj.AlarmSubscription]]:
        """Query alarm subscriptions, optionally paginated.

        :param args: filter criteria passed straight to ``Query.filter``.
        :param kwargs: ``limit`` (page size; ``None`` or ``-1`` means no
            paging) and ``start`` (row offset, default ``0``). Any other
            keyword argument is ignored.
        :return: ``(count, query)`` where ``count`` is the number of rows
            matching the filters before paging is applied, and ``query`` is
            the query object ordered by ``alarmSubscriptionId``; it is not
            converted to a list here.
        """
        limit = kwargs.pop('limit', None)
        start = kwargs.pop('start', 0)

        query = self.session.query(alarm_obj.AlarmSubscription).filter(
            *args).order_by('alarmSubscriptionId')
        # Counted before limit/offset so callers get the overall total.
        total = query.count()
        if limit is None or limit == -1:
            # NOTE: ``start`` is deliberately left unused here, exactly as
            # before - an offset is only applied together with a limit.
            return (total, query)
        return (total, query.limit(limit).offset(start))

    def _update(self, subscription: alarm_obj.AlarmSubscription):
        """Hand the modified subscription back via ``session.add``."""
        self.session.add(subscription)

    def _delete(self, alarm_subscription_id):
        """Delete rows whose ``alarmSubscriptionId`` matches, at query level."""
        query = self.session.query(alarm_obj.AlarmSubscription)
        query.filter_by(alarmSubscriptionId=alarm_subscription_id).delete()
class AlarmProbableCauseSqlAlchemyRepository(AlarmProbableCauseRepository):
    """Probable cause repository backed by a database session.

    Every operation is delegated to the ``session`` object supplied to the
    constructor. No method in this class calls ``commit`` on that session.
    """

    def __init__(self, session):
        """Store the session used for all queries.

        :param session: object exposing ``add`` and ``query``
            (presumably a SQLAlchemy session - confirm against the caller).
        """
        # Run the base initialiser first so that whatever state
        # AlarmProbableCauseRepository (o2ims.domain.alarm_repo) sets up in
        # its own __init__ exists on this instance as well.
        super().__init__()
        self.session = session

    def _add(self, probable_cause: alarm_obj.ProbableCause):
        """Hand ``probable_cause`` to the session via ``session.add``."""
        self.session.add(probable_cause)

    def _get(self, probable_cause_id) -> alarm_obj.ProbableCause:
        """Return the first probable cause whose ``probableCauseId`` matches.

        The result is whatever ``.first()`` yields for the filtered query.
        """
        query = self.session.query(alarm_obj.ProbableCause)
        return query.filter_by(probableCauseId=probable_cause_id).first()

    def _list(self) -> List[alarm_obj.ProbableCause]:
        """Return the unfiltered query over all probable causes.

        The query object itself is returned; it is not converted to a list
        here.
        """
        return self.session.query(alarm_obj.ProbableCause)

    def _update(self, probable_cause: alarm_obj.ProbableCause):
        """Hand the modified probable cause back via ``session.add``."""
        self.session.add(probable_cause)

    def _delete(self, probable_cause_id):
        """Delete rows whose ``probableCauseId`` matches, at query level."""
        query = self.session.query(alarm_obj.ProbableCause)
        query.filter_by(probableCauseId=probable_cause_id).delete()