50f9bb29f09315cedbd064cc3ef1313a21503308
[pti/o2.git] / o2ims / adapter / alarm_repository.py
1 # Copyright (C) 2022 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 from typing import List, Tuple
16
17 from o2ims.domain import alarm_obj
18 from o2ims.domain.alarm_repo import AlarmDefinitionRepository, \
19     AlarmEventRecordRepository, AlarmSubscriptionRepository, \
20     AlarmProbableCauseRepository
21 from o2common.helper import o2logging
22 logger = o2logging.get_logger(__name__)
23
24
25 class AlarmEventRecordSqlAlchemyRepository(AlarmEventRecordRepository):
26     def __init__(self, session):
27         super().__init__()
28         self.session = session
29
30     def _add(self, alarm_event_record: alarm_obj.AlarmEventRecord):
31         self.session.add(alarm_event_record)
32
33     def _get(self, alarm_event_record_id) -> alarm_obj.AlarmEventRecord:
34         return self.session.query(alarm_obj.AlarmEventRecord).filter_by(
35             alarmEventRecordId=alarm_event_record_id).first()
36
37     def _list(self, *args, **kwargs) -> Tuple[
38             int, List[alarm_obj.AlarmEventRecord]]:
39         size = kwargs.pop('limit') if 'limit' in kwargs else None
40         offset = kwargs.pop('start') if 'start' in kwargs else 0
41
42         result = self.session.query(alarm_obj.AlarmEventRecord).filter(
43             *args).order_by('alarmEventRecordId')
44         count = result.count()
45         if size is not None and size != -1:
46             return (count, result.limit(size).offset(offset))
47         return (count, result)
48
49     def _update(self, alarm_event_record: alarm_obj.AlarmEventRecord):
50         self.session.add(alarm_event_record)
51
52     def _delete(self, alarm_event_record_id):
53         self.session.query(alarm_obj.AlarmEventRecord).filter_by(
54             alarmEventRecordId=alarm_event_record_id).delete()
55
56
57 class AlarmDefinitionSqlAlchemyRepository(AlarmDefinitionRepository):
58     def __init__(self, session):
59         super().__init__()
60         self.session = session
61
62     def _add(self, definition: alarm_obj.AlarmDefinition):
63         self.session.add(definition)
64
65     def _get(self, definition_id) -> alarm_obj.AlarmDefinition:
66         return self.session.query(alarm_obj.AlarmDefinition).filter_by(
67             alarmDefinitionId=definition_id).first()
68
69     def _list(self) -> List[alarm_obj.AlarmDefinition]:
70         return self.session.query(alarm_obj.AlarmDefinition)
71
72     def _update(self, definition: alarm_obj.AlarmDefinition):
73         self.session.add(definition)
74
75     def _delete(self, alarm_definition_id):
76         self.session.query(alarm_obj.AlarmDefinition).filter_by(
77             alarmDefinitionId=alarm_definition_id).delete()
78
79
80 class AlarmSubscriptionSqlAlchemyRepository(AlarmSubscriptionRepository):
81     def __init__(self, session):
82         super().__init__()
83         self.session = session
84
85     def _add(self, subscription: alarm_obj.AlarmSubscription):
86         self.session.add(subscription)
87
88     def _get(self, subscription_id) -> alarm_obj.AlarmSubscription:
89         return self.session.query(alarm_obj.AlarmSubscription).filter_by(
90             alarmSubscriptionId=subscription_id).first()
91
92     def _list(self, *args, **kwargs) -> Tuple[
93             int, List[alarm_obj.AlarmSubscription]]:
94         size = kwargs.pop('limit') if 'limit' in kwargs else None
95         offset = kwargs.pop('start') if 'start' in kwargs else 0
96
97         result = self.session.query(alarm_obj.AlarmSubscription).filter(
98             *args).order_by('alarmEventRecordId')
99         count = result.count()
100         if size is not None and size != -1:
101             return (count, result.limit(size).offset(offset))
102         return (count, result)
103
104     def _update(self, subscription: alarm_obj.AlarmSubscription):
105         self.session.add(subscription)
106
107     def _delete(self, alarm_subscription_id):
108         self.session.query(alarm_obj.AlarmSubscription).filter_by(
109             alarmSubscriptionId=alarm_subscription_id).delete()
110
111
112 class AlarmProbableCauseSqlAlchemyRepository(AlarmProbableCauseRepository):
113     def __init__(self, session):
114         super().__init__()
115         self.session = session
116
117     def _add(self, probable_cause: alarm_obj.ProbableCause):
118         self.session.add(probable_cause)
119
120     def _get(self, probable_cause_id) -> alarm_obj.ProbableCause:
121         return self.session.query(alarm_obj.ProbableCause).filter_by(
122             probableCauseId=probable_cause_id).first()
123
124     def _list(self) -> List[alarm_obj.ProbableCause]:
125         return self.session.query(alarm_obj.ProbableCause)
126
127     def _update(self, probable_cause: alarm_obj.ProbableCause):
128         self.session.add(probable_cause)
129
130     def _delete(self, probable_cause_id):
131         self.session.query(alarm_obj.ProbableCause).filter_by(
132             probableCauseId=probable_cause_id).delete()