INF-417 InfrastructureInventoryObject implemented
[pti/o2.git] / o2ims / service / command / notify_handler.py
1 # Copyright (C) 2021-2022 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 # import redis
16 # import requests
17 import json
18
19 # from o2common.config import conf
20 from o2common.domain.filter import gen_orm_filter
21 from o2common.service.unit_of_work import AbstractUnitOfWork
22 from o2common.adapter.notifications import AbstractNotifications
23
24 from o2ims.domain import commands, ocloud
25 from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
26     NotificationEventEnum
27
28 from o2common.helper import o2logging
29 logger = o2logging.get_logger(__name__)
30
31
32 # # Maybe another MQ server
33 # r = redis.Redis(**config.get_redis_host_and_port())
34
35
36 def notify_change_to_smo(
37     cmd: commands.PubMessage2SMO,
38     uow: AbstractUnitOfWork,
39     notifications: AbstractNotifications,
40 ):
41     logger.debug('In notify_change_to_smo')
42     msg_type = cmd.type
43     if msg_type == 'ResourceType':
44         _notify_resourcetype(uow, notifications, cmd.data)
45     elif msg_type == 'ResourcePool':
46         _notify_resourcepool(uow, notifications, cmd.data)
47     elif msg_type == 'Dms':
48         _notify_dms(uow, notifications, cmd.data)
49     elif msg_type == 'Resource':
50         _notify_resource(uow, notifications, cmd.data)
51
52
53 def __get_object_type_and_value(sub_filter):
54     exprs = sub_filter.split(';')
55     for expr in exprs:
56         items = expr.strip(' ()').split(',')
57         item_key = items[1].strip()
58         if item_key == 'objectType':
59             return True, items[2].strip()
60     return False, ''
61
62
63 def handle_filter(filter: str, f_type: str):
64     print(filter)
65     if not filter:
66         return
67
68     filter_list = filter.strip(' []').split('|')
69     if not filter_list:
70         return
71
72     match_type_count = 0
73     filters = []
74     for sub_filter in filter_list:
75         objectType, objectTypeValue = __get_object_type_and_value(sub_filter)
76         if objectTypeValue == f_type:
77             match_type_count += 1
78             filters.append(sub_filter)
79         elif not objectType and f_type == 'ResourceInfo':
80             match_type_count += 1
81             filters.append(sub_filter)
82
83     return match_type_count, filters
84
85
86 def check_filters(filters, sub_data, uow_cls, obj_cls, attr_id, id):
87     for filter in filters[1]:
88         logger.debug(f'filter: {filter}')
89         try:
90             args = gen_orm_filter(obj_cls, filter)
91         except KeyError:
92             logger.warning(
93                 'Subscription {} filter {} has wrong attribute '
94                 'name or value. Ignore the filter.'.format(
95                     sub_data['subscriptionId'],
96                     sub_data['filter']))
97             continue
98         logger.debug(f'args: {args}')
99
100         if len(args) == 0 and 'objectType' in filter:
101             return False
102
103         args.append(attr_id == id)
104         obj_count, _ = uow_cls.list_with_count(*args)
105         if obj_count > 0:
106             return True
107     return False
108
109
110 def _notify_resourcetype(uow, notifications, data):
111     with uow:
112         resource_type = uow.resource_types.get(data.id)
113         if resource_type is None:
114             logger.warning('ResourceType {} does not exists.'.format(data.id))
115             return
116
117         resource_type_dict = resource_type.get_notification_dict()
118
119         subs = uow.subscriptions.list()
120         for sub in subs:
121             sub_data = sub.serialize()
122             logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
123             filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
124             logger.debug(f'filters: {filters}, sub_data: {sub_data}')
125
126             if not filters or filters[0] == 0 or check_filters(
127                 filters, sub_data, uow.resource_types, ocloud.ResourceType,
128                     ocloud.ResourceType.resourceTypeId, data.id):
129                 callback_smo(notifications, sub, data, resource_type_dict)
130                 continue
131
132             logger.info('Subscription {} filter hit, skip ResourceType {}.'
133                         .format(sub_data['subscriptionId'], data.id))
134
135
136 def _notify_resourcepool(uow, notifications, data):
137     with uow:
138         resource_pool = uow.resource_pools.get(data.id)
139         if resource_pool is None:
140             logger.warning('ResourcePool {} does not exists.'.format(data.id))
141             return
142
143         resource_pool_dict = resource_pool.get_notification_dict()
144
145         subs = uow.subscriptions.list()
146         for sub in subs:
147             sub_data = sub.serialize()
148             logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
149             filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo')
150             logger.debug(f'filters: {filters}, sub_data: {sub_data}')
151
152             if not filters or filters[0] == 0 or check_filters(
153                 filters, sub_data, uow.resource_pools, ocloud.ResourcePool,
154                     ocloud.ResourcePool.resourcePoolId, data.id):
155                 callback_smo(notifications, sub, data, resource_pool_dict)
156                 continue
157
158             logger.info('Subscription {} filter hit, skip ResourcePool {}.'
159                         .format(sub_data['subscriptionId'], data.id))
160
161
162 def _notify_dms(uow, notifications, data):
163     with uow:
164         dms = uow.deployment_managers.get(data.id)
165         if dms is None:
166             logger.warning(
167                 'DeploymentManager {} does not exists.'.format(data.id))
168             return
169
170         dms_dict = dms.get_notification_dict()
171
172         subs = uow.subscriptions.list()
173         for sub in subs:
174             sub_data = sub.serialize()
175             logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
176             filters = handle_filter(
177                 sub_data['filter'], 'DeploymentManagerInfo')
178             logger.debug(f'filters: {filters}, sub_data: {sub_data}')
179
180             if not filters or filters[0] == 0 or check_filters(
181                     filters, sub_data, uow.deployment_managers,
182                     ocloud.DeploymentManager,
183                     ocloud.DeploymentManager.deploymentManagerId, data.id):
184                 callback_smo(notifications, sub, data, dms_dict)
185                 continue
186
187             logger.info('Subscription {} filter hit, skip '
188                         'DeploymentManager {}.'
189                         .format(sub_data['subscriptionId'], data.id))
190
191
192 def _notify_resource(uow, notifications, data):
193     with uow:
194         resource = uow.resources.get(data.id)
195         if resource is None:
196             logger.warning('Resource {} does not exists.'.format(data.id))
197             return
198         res_pool_id = resource.serialize()['resourcePoolId']
199         logger.debug('res pool id is {}'.format(res_pool_id))
200
201         res_dict = resource.get_notification_dict()
202
203         subs = uow.subscriptions.list()
204         for sub in subs:
205             sub_data = sub.serialize()
206             logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
207             filters = handle_filter(sub_data['filter'], 'ResourceInfo')
208             if not filters or filters[0] == 0:
209                 callback_smo(notifications, sub, data, res_dict)
210                 continue
211             if filters[0] > 0 and not filters[1]:
212                 continue
213             filter_hit = False
214             for filter in filters[1]:
215                 try:
216                     args = gen_orm_filter(ocloud.Resource, filter)
217                 except KeyError:
218                     logger.warning(
219                         'Subscription {} filter {} has wrong attribute '
220                         'name or value. Ignore the filter.'.format(
221                             sub_data['subscriptionId'],
222                             sub_data['filter']))
223                     continue
224                 if len(args) == 0 and 'objectType' in filter:
225                     filter_hit = True
226                     break
227                 args.append(ocloud.Resource.resourceId == data.id)
228                 obj_count, _ = uow.resources.list_with_count(
229                     res_pool_id, *args)
230                 if obj_count > 0:
231                     filter_hit = True
232                     break
233             if filter_hit:
234                 logger.info('Subscription {} filter hit, skip Resource {}.'
235                             .format(sub_data['subscriptionId'], data.id))
236             else:
237                 callback_smo(notifications, sub, data, res_dict)
238
239
240 def callback_smo(notifications: AbstractNotifications, sub: Subscription,
241                  msg: Message2SMO, obj_dict: dict = None):
242     sub_data = sub.serialize()
243     callback = {
244         'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
245         'notificationEventType': msg.notificationEventType,
246         'objectRef': msg.objectRef,
247         'updateTime': msg.updatetime
248     }
249     if msg.notificationEventType in [NotificationEventEnum.DELETE,
250                                      NotificationEventEnum.MODIFY]:
251         callback['priorObjectState'] = json.dumps(obj_dict)
252     if msg.notificationEventType in [NotificationEventEnum.CREATE,
253                                      NotificationEventEnum.MODIFY]:
254         callback['postObjectState'] = json.dumps(obj_dict)
255     if msg.notificationEventType == NotificationEventEnum.DELETE:
256         callback.pop('objectRef')
257     logger.info('callback URL: {}'.format(sub_data['callback']))
258     logger.debug('callback data: {}'.format(json.dumps(callback)))
259
260     return notifications.send(sub_data['callback'], callback)