ef7ae26e02cbecc9f400c18acf61908fe0236a53
[pti/o2.git] / o2ims / adapter / alarm_repository.py
1 # Copyright (C) 2022 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 from typing import List, Tuple
16
17 from o2ims.domain import alarm_obj
18 from o2ims.domain.alarm_repo import AlarmDefinitionRepository, \
19     AlarmEventRecordRepository, AlarmSubscriptionRepository, \
20     AlarmProbableCauseRepository
21 from o2common.helper import o2logging
22 logger = o2logging.get_logger(__name__)
23
24
25 class AlarmEventRecordSqlAlchemyRepository(AlarmEventRecordRepository):
26     def __init__(self, session):
27         super().__init__()
28         self.session = session
29
30     def _add(self, alarm_event_record: alarm_obj.AlarmEventRecord):
31         self.session.add(alarm_event_record)
32
33     def _get(self, alarm_event_record_id) -> alarm_obj.AlarmEventRecord:
34         return self.session.query(alarm_obj.AlarmEventRecord).filter_by(
35             alarmEventRecordId=alarm_event_record_id).first()
36
37     def _list(self, **kwargs) -> Tuple[int, List[alarm_obj.AlarmEventRecord]]:
38         size = kwargs.pop('limit') if 'limit' in kwargs else None
39         offset = kwargs.pop('start') if 'start' in kwargs else 0
40
41         result = self.session.query(alarm_obj.AlarmEventRecord).filter_by(
42             **kwargs).order_by('alarmEventRecordId')
43         count = result.count()
44         if size is not None and size != -1:
45             return (count, result.limit(size).offset(offset))
46         return (count, result)
47
48     def _update(self, alarm_event_record: alarm_obj.AlarmEventRecord):
49         self.session.add(alarm_event_record)
50
51     def _delete(self, alarm_event_record_id):
52         self.session.query(alarm_obj.AlarmEventRecord).filter_by(
53             alarmEventRecordId=alarm_event_record_id).delete()
54
55
56 class AlarmDefinitionSqlAlchemyRepository(AlarmDefinitionRepository):
57     def __init__(self, session):
58         super().__init__()
59         self.session = session
60
61     def _add(self, definition: alarm_obj.AlarmDefinition):
62         self.session.add(definition)
63
64     def _get(self, definition_id) -> alarm_obj.AlarmDefinition:
65         return self.session.query(alarm_obj.AlarmDefinition).filter_by(
66             alarmDefinitionId=definition_id).first()
67
68     def _list(self) -> List[alarm_obj.AlarmDefinition]:
69         return self.session.query(alarm_obj.AlarmDefinition)
70
71     def _update(self, definition: alarm_obj.AlarmDefinition):
72         self.session.add(definition)
73
74     def _delete(self, alarm_definition_id):
75         self.session.query(alarm_obj.AlarmDefinition).filter_by(
76             alarmDefinitionId=alarm_definition_id).delete()
77
78
79 class AlarmSubscriptionSqlAlchemyRepository(AlarmSubscriptionRepository):
80     def __init__(self, session):
81         super().__init__()
82         self.session = session
83
84     def _add(self, subscription: alarm_obj.AlarmSubscription):
85         self.session.add(subscription)
86
87     def _get(self, subscription_id) -> alarm_obj.AlarmSubscription:
88         return self.session.query(alarm_obj.AlarmSubscription).filter_by(
89             alarmSubscriptionId=subscription_id).first()
90
91     def _list(self, **kwargs) -> Tuple[int, List[alarm_obj.AlarmSubscription]]:
92         size = kwargs.pop('limit') if 'limit' in kwargs else None
93         offset = kwargs.pop('start') if 'start' in kwargs else 0
94
95         result = self.session.query(alarm_obj.AlarmSubscription).filter_by(
96             **kwargs).order_by('alarmSubscriptionId')
97         count = result.count()
98         if size is not None and size != -1:
99             return (count, result.limit(size).offset(offset))
100         return (count, result)
101
102     def _update(self, subscription: alarm_obj.AlarmSubscription):
103         self.session.add(subscription)
104
105     def _delete(self, alarm_subscription_id):
106         self.session.query(alarm_obj.AlarmSubscription).filter_by(
107             alarmSubscriptionId=alarm_subscription_id).delete()
108
109
110 class AlarmProbableCauseSqlAlchemyRepository(AlarmProbableCauseRepository):
111     def __init__(self, session):
112         super().__init__()
113         self.session = session
114
115     def _add(self, probable_cause: alarm_obj.ProbableCause):
116         self.session.add(probable_cause)
117
118     def _get(self, probable_cause_id) -> alarm_obj.ProbableCause:
119         return self.session.query(alarm_obj.ProbableCause).filter_by(
120             probableCauseId=probable_cause_id).first()
121
122     def _list(self) -> List[alarm_obj.ProbableCause]:
123         return self.session.query(alarm_obj.ProbableCause)
124
125     def _update(self, probable_cause: alarm_obj.ProbableCause):
126         self.session.add(probable_cause)
127
128     def _delete(self, probable_cause_id):
129         self.session.query(alarm_obj.ProbableCause).filter_by(
130             probableCauseId=probable_cause_id).delete()