Add capacity values for the DMS K8S profile
[pti/o2.git] / o2ims / views / alarm_view.py
1 # Copyright (C) 2021-2024 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 from datetime import datetime
16 import uuid as uuid
17
18 from o2common.service import unit_of_work, messagebus
19 from o2common.views.view import gen_filter, check_filter
20 from o2common.views.pagination_view import Pagination
21 from o2common.views.route_exception import BadRequestException, \
22     NotFoundException
23
24 from o2ims.domain import events
25 from o2ims.views.alarm_dto import SubscriptionDTO
26 from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEventRecord, \
27     AlarmNotificationEventEnum, AlarmEventRecordModifications, \
28     PerceivedSeverityEnum
29
30 from o2common.helper import o2logging
31 # from o2common.config import config
32 logger = o2logging.get_logger(__name__)
33
34
def alarm_event_records(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
    """Return one page of alarm event records.

    All keyword arguments are handed to ``Pagination``. When a ``filter``
    entry is present it is turned into query criteria by ``gen_filter``;
    otherwise no criteria are applied.

    :param uow: unit of work exposing the ``alarm_event_records`` repository.
    :param kwargs: pagination options plus an optional ``filter`` entry.
    :return: the value ``Pagination.get_result`` builds from the
        repository's ``list_with_count`` result.
    """
    pager = Pagination(**kwargs)
    page_options = pager.get_pagination()
    if 'filter' in kwargs:
        criteria = gen_filter(AlarmEventRecord, kwargs['filter'])
    else:
        criteria = []
    with uow:
        listing = uow.alarm_event_records.list_with_count(
            *criteria, **page_options)
    return pager.get_result(listing)
43
44
def alarm_event_record_one(alarmEventRecordId: str,
                           uow: unit_of_work.AbstractUnitOfWork):
    """Fetch a single alarm event record and return it serialized.

    :param alarmEventRecordId: identifier passed to the repository ``get``.
    :param uow: unit of work exposing the ``alarm_event_records`` repository.
    :return: ``record.serialize()``, or ``None`` when the repository
        returns ``None`` for the identifier.
    """
    with uow:
        record = uow.alarm_event_records.get(alarmEventRecordId)
        if record is None:
            return None
        # Serialize while the unit of work is still open.
        return record.serialize()
50
51
def alarm_event_record_ack(alarmEventRecordId: str,
                           uow: unit_of_work.AbstractUnitOfWork):
    """Mark an alarm event record as acknowledged.

    The record's ``alarmAcknowledged`` flag is set to ``True`` and
    ``alarmAcknowledgeTime`` is stamped with the current time formatted as
    ``%Y-%m-%dT%H:%M:%S`` (``datetime.now()`` without a timezone), then the
    change is committed.

    :param alarmEventRecordId: identifier passed to the repository ``get``.
    :param uow: unit of work exposing the ``alarm_event_records`` repository.
    :return: ``AlarmEventRecordModifications(True)``, or ``None`` when the
        record does not exist.
    """
    with uow:
        record = uow.alarm_event_records.get(alarmEventRecordId)
        # Only a missing record short-circuits. There is no check for an
        # earlier acknowledgement, so an already acknowledged record is
        # updated again and gets a fresh timestamp.
        if record is None:
            return None
        record.alarmAcknowledged = True
        acked_at = datetime.now()
        record.alarmAcknowledgeTime = acked_at.strftime("%Y-%m-%dT%H:%M:%S")
        uow.alarm_event_records.update(record)
        uow.commit()

        outcome = AlarmEventRecordModifications(True)
    return outcome
69
70
def alarm_event_record_clear(alarmEventRecordId: str,
                             uow: unit_of_work.AbstractUnitOfWork):
    """Request that an alarm event record be cleared.

    This function does not modify ``perceivedSeverity`` itself. It appends
    an ``AlarmEventPurged`` event (notification type ``CLEAR``) to the
    record, commits, and then dispatches collected events through the
    message bus singleton.

    :param alarmEventRecordId: identifier passed to the repository ``get``.
    :param uow: unit of work exposing the ``alarm_event_records`` repository.
    :return: ``AlarmEventRecordModifications`` built with
        ``clear=PerceivedSeverityEnum.CLEARED``, or ``None`` when the record
        does not exist.
    :raises BadRequestException: when the record's ``perceivedSeverity`` is
        already ``CLEARED``.
    """
    with uow:
        record = uow.alarm_event_records.get(alarmEventRecordId)
        if record is None:
            return None
        if record.perceivedSeverity == PerceivedSeverityEnum.CLEARED:
            raise BadRequestException(
                "Alarm Event Record {} has already been marked as CLEARED."
                .format(alarmEventRecordId))

        # NOTE(review): the event's updatetime is the acknowledge time,
        # which is presumably unset for a record that was never
        # acknowledged -- confirm this is intended.
        queued_events = record.events
        queued_events.append(events.AlarmEventPurged(
            id=record.alarmEventRecordId,
            notificationEventType=AlarmNotificationEventEnum.CLEAR,
            updatetime=record.alarmAcknowledgeTime))

        uow.alarm_event_records.update(record)
        uow.commit()

        outcome = AlarmEventRecordModifications(
            clear=PerceivedSeverityEnum.CLEARED)
    # Events are dispatched only after the unit of work block has exited.
    _handle_events(messagebus.MessageBus.get_instance())
    return outcome
94
95
def _handle_events(bus: messagebus.MessageBus):
    """Dispatch every newly collected event through the message bus.

    :param bus: message bus whose ``uow.collect_new_events()`` supplies the
        events and whose ``handle`` processes each one.
    :return: always ``True``.
    """
    for pending_event in bus.uow.collect_new_events():
        bus.handle(pending_event)
    return True
102
103
def subscriptions(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
    """Return one page of alarm subscriptions.

    All keyword arguments are handed to ``Pagination``. When a ``filter``
    entry is present it is turned into query criteria by ``gen_filter``;
    otherwise no criteria are applied.

    :param uow: unit of work exposing the ``alarm_subscriptions`` repository.
    :param kwargs: pagination options plus an optional ``filter`` entry.
    :return: the value ``Pagination.get_result`` builds from the
        repository's ``list_with_count`` result.
    """
    pager = Pagination(**kwargs)
    page_options = pager.get_pagination()
    if 'filter' in kwargs:
        criteria = gen_filter(AlarmSubscription, kwargs['filter'])
    else:
        criteria = []
    with uow:
        listing = uow.alarm_subscriptions.list_with_count(
            *criteria, **page_options)
    return pager.get_result(listing)
112
113
def subscription_one(subscriptionId: str,
                     uow: unit_of_work.AbstractUnitOfWork):
    """Fetch a single alarm subscription and return it serialized.

    :param subscriptionId: identifier passed to the repository ``get``.
    :param uow: unit of work exposing the ``alarm_subscriptions`` repository.
    :return: ``subscription.serialize()``, or ``None`` when the repository
        returns ``None`` for the identifier.
    """
    with uow:
        subscription = uow.alarm_subscriptions.get(subscriptionId)
        if subscription is None:
            return None
        # Serialize while the unit of work is still open.
        return subscription.serialize()
119
120
def subscription_create(subscriptionDto: SubscriptionDTO.subscription_create,
                        uow: unit_of_work.AbstractUnitOfWork):
    """Create an alarm subscription and return it serialized.

    The optional ``filter`` and ``consumerSubscriptionId`` entries default
    to the empty string; ``callback`` is read with item access and so must
    be present. The filter is checked against ``AlarmEventRecord`` before
    anything is stored. A subscription whose callback, filter and consumer
    subscription ID all match an existing one is rejected.

    :param subscriptionDto: mapping carrying ``callback`` and, optionally,
        ``filter`` and ``consumerSubscriptionId``.
    :param uow: unit of work exposing the ``alarm_subscriptions`` repository.
    :return: the serialized subscription, re-read from the repository after
        the commit.
    :raises BadRequestException: when an identical subscription exists.
    """
    filter_expr = subscriptionDto.get('filter', '')
    consumer_id = subscriptionDto.get('consumerSubscriptionId', '')

    # Runs before the subscription is built or the unit of work is entered.
    check_filter(AlarmEventRecord, filter_expr)

    new_id = str(uuid.uuid4())
    callback = subscriptionDto['callback']
    subscription = AlarmSubscription(
        new_id, callback, consumer_id, filter_expr)
    with uow:
        # All three fields must match for a subscription to be a duplicate.
        duplicate_criteria = [
            AlarmSubscription.callback == callback,
            AlarmSubscription.filter == filter_expr,
            AlarmSubscription.consumerSubscriptionId == consumer_id,
        ]
        count, _ = uow.alarm_subscriptions.list_with_count(
            *duplicate_criteria)
        if count > 0:
            raise BadRequestException("The value of parameters is duplicated")
        uow.alarm_subscriptions.add(subscription)
        uow.commit()
        stored = uow.alarm_subscriptions.get(new_id)
        return stored.serialize()
146
147
def subscription_delete(subscriptionId: str,
                        uow: unit_of_work.AbstractUnitOfWork):
    """Delete an alarm subscription by its identifier.

    :param subscriptionId: identifier of the subscription to remove.
    :param uow: unit of work exposing the ``alarm_subscriptions`` repository.
    :return: ``True`` once the deletion has been committed.
    :raises NotFoundException: when the repository lookup yields a falsy
        result for the identifier.
    """
    with uow:
        existing = uow.alarm_subscriptions.get(subscriptionId)
        if not existing:
            message = "Alarm Subscription {} not found.".format(
                subscriptionId)
            raise NotFoundException(message)
        uow.alarm_subscriptions.delete(subscriptionId)
        uow.commit()
    return True