Fix the filter of the notifications not correct when the objectType not specific...
[pti/o2.git] / o2ims / service / command / notify_handler.py
1 # Copyright (C) 2021-2022 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 # import redis
16 # import requests
17 import json
18
19 # from o2common.config import conf
20 from o2common.domain.filter import gen_orm_filter
21 from o2common.service.unit_of_work import AbstractUnitOfWork
22 from o2common.adapter.notifications import AbstractNotifications
23
24 from o2ims.domain import commands, ocloud
25 from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
26     NotificationEventEnum
27
28 from o2common.helper import o2logging
29 logger = o2logging.get_logger(__name__)
30
31
32 # # Maybe another MQ server
33 # r = redis.Redis(**config.get_redis_host_and_port())
34
35
36 def notify_change_to_smo(
37     cmd: commands.PubMessage2SMO,
38     uow: AbstractUnitOfWork,
39     notifications: AbstractNotifications,
40 ):
41     logger.debug('In notify_change_to_smo')
42     msg_type = cmd.type
43     if msg_type == 'ResourceType':
44         _notify_resourcetype(uow, notifications, cmd.data)
45     elif msg_type == 'ResourcePool':
46         _notify_resourcepool(uow, notifications, cmd.data)
47     elif msg_type == 'Dms':
48         _notify_dms(uow, notifications, cmd.data)
49     elif msg_type == 'Resource':
50         _notify_resource(uow, notifications, cmd.data)
51
52
53 # def get_resource_dict(resource_type):
54 #    return {
55 #        'resourceTypeId': resource_type.resourceTypeId,
56 #        'name': resource_type.name,
57 #        'description': resource_type.description,
58 #        'vendor': resource_type.vendor,
59 #        'model': resource_type.model,
60 #        'version': resource_type.version,
61 #    }
62 #
63 #
64 # def handle_filter(filter: str, f_type: str):
65 #    if not filter:
66 #        return
67 #
68 #    filter_list = filter.strip(' []').split('|')
69 #
70 #    match_type_count = 0
71 #    filters = []
72 #    for sub_filter in filter_list:
73 #        objectType, objectTypeValue = get_object_type_and_value(sub_filter)
74 #        if objectTypeValue == f_type:
75 #            match_type_count += 1
76 #            filters.append(sub_filter)
77 #        elif not objectType and f_type == 'ResourceInfo':
78 #            filters.append(sub_filter)
79 #
80 #    return match_type_count, filters
81 #
82 #
83 # def get_object_type_and_value(sub_filter):
84 #    exprs = sub_filter.split(';')
85 #    for expr in exprs:
86 #        items = expr.strip(' ()').split(',')
87 #        item_key = items[1].strip()
88 #        if item_key == 'objectType':
89 #            return True, items[2].strip()
90 #    return False, ''
91 #
92 #
93 # def check_filters(filters, sub_data, uow, id):
94 #    for filter in filters[1]:
95 #        if isinstance(filter, bool) and filter:
96 #            return True
97 #
98 #        try:
99 #            args = gen_orm_filter(ocloud.ResourceType, filter)
100 #        except KeyError:
101 #            logger.warning(
102 #                'Subscription {} filter {} has wrong attribute '
103 #                'name or value. Ignore the filter.'.format(
104 #                    sub_data['subscriptionId'],
105 #                    sub_data['filter']))
106 #            continue
107 #
108 #        if len(args) == 0 and 'objectType' in filter:
109 #            return True
110 #
111 #        args.append(ocloud.ResourceType.resourceTypeId == id)
112 #        obj_count, _ = uow.resource_types.list_with_count(*args)
113 #        if obj_count > 0:
114 #            return True
115 #    return False
116 #
117 #
118 # def _notify_resourcetype(uow, notifications, data):
119 #    with uow:
120 #        resource_type = uow.resource_types.get(data.id)
121 #        if resource_type is None:
122 #            logger.warning('ResourceType {} does not exists.'.format(data.id))
123 #            return
124 #
125 #        resource_type_dict = get_resource_dict(resource_type)
126 #
127 #        subs = uow.subscriptions.list()
128 #        for sub in subs:
129 #            sub_data = sub.serialize()
130 #            filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
131 #            logger.debug(f'filters: {filters}, sub_data: {sub_data}')
132 #
133 #            if not filters or filters[0] == 0 or check_filters(
134 #                    filters, sub_data, uow, data.id):
135 #                callback_smo(notifications, sub, data, resource_type_dict)
136 #                continue
137 #
138 #            logger.info('Subscription {} filter hit, skip ResourceType {}.'
139 #                        .format(sub_data['subscriptionId'], data.id))
140
141 def _notify_resourcetype(uow, notifications, data):
142     with uow:
143         resource_type = uow.resource_types.get(data.id)
144         if resource_type is None:
145             logger.warning('ResourceType {} does not exists.'.format(data.id))
146             return
147         resource_type_dict = {
148             'resourceTypeId': resource_type.resourceTypeId,
149             'name': resource_type.name,
150             'description': resource_type.description,
151             'vendor': resource_type.vendor,
152             'model': resource_type.model,
153             'version': resource_type.version,
154             # 'alarmDictionary': resource_type.alarmDictionary.serialize()
155         }
156
157         subs = uow.subscriptions.list()
158         for sub in subs:
159             sub_data = sub.serialize()
160             logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
161             filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
162             if not filters or filters[0] == 0:
163                 callback_smo(notifications, sub, data, resource_type_dict)
164                 continue
165             if filters[0] > 0 and not filters[1]:
166                 continue
167
168             filter_hit = False
169             for filter in filters[1]:
170                 try:
171                     args = gen_orm_filter(ocloud.ResourceType, filter)
172                 except KeyError:
173                     logger.warning(
174                         'Subscription {} filter {} has wrong attribute '
175                         'name or value. Ignore the filter.'.format(
176                             sub_data['subscriptionId'],
177                             sub_data['filter']))
178                     continue
179                 if len(args) == 0 and 'objectType' in filter:
180                     filter_hit = True
181                     break
182                 args.append(ocloud.ResourceType.resourceTypeId == data.id)
183                 obj_count, _ = uow.resource_types.list_with_count(*args)
184                 if obj_count > 0:
185                     filter_hit = True
186                     break
187             if filter_hit:
188                 logger.info('Subscription {} filter hit, skip ResourceType {}.'
189                             .format(sub_data['subscriptionId'], data.id))
190             else:
191                 callback_smo(notifications, sub, data, resource_type_dict)
192
193
194 def _notify_resourcepool(uow, notifications, data):
195     with uow:
196         resource_pool = uow.resource_pools.get(data.id)
197         if resource_pool is None:
198             logger.warning('ResourcePool {} does not exists.'.format(data.id))
199             return
200         resource_pool_dict = {
201             'resourcePoolId': resource_pool.resourcePoolId,
202             'oCloudId': resource_pool.oCloudId,
203             'globalLocationId': resource_pool.globalLocationId,
204             'name': resource_pool.name,
205             'description': resource_pool.description
206         }
207
208         subs = uow.subscriptions.list()
209         for sub in subs:
210             sub_data = sub.serialize()
211             logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
212             filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo')
213             if not filters or filters[0] == 0:
214                 callback_smo(notifications, sub, data, resource_pool_dict)
215                 continue
216             if filters[0] > 0 and not filters[1]:
217                 continue
218             filter_hit = False
219             for filter in filters[1]:
220                 try:
221                     args = gen_orm_filter(ocloud.ResourcePool, filter)
222                 except KeyError:
223                     logger.warning(
224                         'Subscription {} filter {} has wrong attribute '
225                         'name or value. Ignore the filter.'.format(
226                             sub_data['subscriptionId'],
227                             sub_data['filter']))
228                     continue
229                 if len(args) == 0 and 'objectType' in filter:
230                     filter_hit = True
231                     break
232                 args.append(ocloud.ResourcePool.resourcePoolId == data.id)
233                 obj_count, _ = uow.resource_pools.list_with_count(*args)
234                 if obj_count > 0:
235                     filter_hit = True
236                     break
237             if filter_hit:
238                 logger.info('Subscription {} filter hit, skip ResourcePool {}.'
239                             .format(sub_data['subscriptionId'], data.id))
240             else:
241                 callback_smo(notifications, sub, data, resource_pool_dict)
242
243
244 def _notify_dms(uow, notifications, data):
245     with uow:
246         dms = uow.deployment_managers.get(data.id)
247         if dms is None:
248             logger.warning(
249                 'DeploymentManager {} does not exists.'.format(data.id))
250             return
251         dms_dict = {
252             'deploymentManagerId': dms.deploymentManagerId,
253             'name': dms.name,
254             'description': dms.description,
255             'oCloudId': dms.oCloudId,
256             'serviceUri': dms.serviceUri
257         }
258
259         subs = uow.subscriptions.list()
260         for sub in subs:
261             sub_data = sub.serialize()
262             logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
263             filters_rst = handle_filter(
264                 sub_data['filter'], 'DeploymentManagerInfo')
265             if not filters_rst or filters_rst[0] == 0:
266                 callback_smo(notifications, sub, data, dms_dict)
267                 continue
268             if filters_rst[0] > 0 and not filters_rst[1]:
269                 continue
270             filter_hit = False
271             for filter in filters_rst[1]:
272                 try:
273                     args = gen_orm_filter(ocloud.DeploymentManager, filter)
274                 except KeyError:
275                     logger.warning(
276                         'Subscription {} filter {} has wrong attribute '
277                         'name or value. Ignore the filter.'.format(
278                             sub_data['subscriptionId'],
279                             sub_data['filter']))
280                     continue
281                 if len(args) == 0 and 'objectType' in filter:
282                     filter_hit = True
283                     break
284                 args.append(
285                     ocloud.DeploymentManager.deploymentManagerId == data.id)
286                 obj_count, _ = uow.deployment_managers.list_with_count(*args)
287                 if obj_count > 0:
288                     filter_hit = True
289                     break
290             if filter_hit:
291                 logger.info('Subscription {} filter hit, skip '
292                             'DeploymentManager {}.'
293                             .format(sub_data['subscriptionId'], data.id))
294             else:
295                 continue
296                 # callback_smo(notifications, sub, data, dms_dict)
297
298
299 def _notify_resource(uow, notifications, data):
300     with uow:
301         resource = uow.resources.get(data.id)
302         if resource is None:
303             logger.warning('Resource {} does not exists.'.format(data.id))
304             return
305         res_pool_id = resource.serialize()['resourcePoolId']
306         logger.debug('res pool id is {}'.format(res_pool_id))
307         res_dict = {
308             'resourceId': resource.resourceId,
309             'description': resource.description,
310             'resourceTypeId': resource.resourceTypeId,
311             'resourcePoolId': resource.resourcePoolId,
312             'globalAssetId': resource.globalAssetId
313         }
314
315         subs = uow.subscriptions.list()
316         for sub in subs:
317             sub_data = sub.serialize()
318             logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
319             filters = handle_filter(sub_data['filter'], 'ResourceInfo')
320             if not filters or filters[0] == 0:
321                 callback_smo(notifications, sub, data, res_dict)
322                 continue
323             if filters[0] > 0 and not filters[1]:
324                 continue
325             filter_hit = False
326             for filter in filters[1]:
327                 try:
328                     args = gen_orm_filter(ocloud.Resource, filter)
329                 except KeyError:
330                     logger.warning(
331                         'Subscription {} filter {} has wrong attribute '
332                         'name or value. Ignore the filter.'.format(
333                             sub_data['subscriptionId'],
334                             sub_data['filter']))
335                     continue
336                 if len(args) == 0 and 'objectType' in filter:
337                     filter_hit = True
338                     break
339                 args.append(ocloud.Resource.resourceId == data.id)
340                 obj_count, _ = uow.resources.list_with_count(
341                     res_pool_id, *args)
342                 if obj_count > 0:
343                     filter_hit = True
344                     break
345             if filter_hit:
346                 logger.info('Subscription {} filter hit, skip Resource {}.'
347                             .format(sub_data['subscriptionId'], data.id))
348             else:
349                 callback_smo(notifications, sub, data, res_dict)
350
351
352 def handle_filter(filter: str, f_type: str):
353     if not filter:
354         return
355     match_type_count = 0
356     filter_strip = filter.strip(' []')
357     filter_list = filter_strip.split('|')
358     if not filter_list:
359         return
360     filters = list()
361     for sub_filter in filter_list:
362         exprs = sub_filter.split(';')
363         objectType = False
364         objectTypeValue = ''
365         for expr in exprs:
366             expr_strip = expr.strip(' ()')
367             items = expr_strip.split(',')
368             item_key = items[1].strip()
369             if item_key != 'objectType':
370                 continue
371             objectType = True
372             objectTypeValue = items[2].strip()
373         if not objectType:
374             if f_type == 'ResourceInfo':
375                 filters.append(sub_filter)
376             continue
377         if objectTypeValue == f_type:
378             match_type_count += 1
379             filters.append(sub_filter)
380     return (match_type_count, filters)
381
382
383 def callback_smo(notifications: AbstractNotifications, sub: Subscription,
384                  msg: Message2SMO, obj_dict: dict = None):
385     sub_data = sub.serialize()
386     callback = {
387         'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
388         'notificationEventType': msg.notificationEventType,
389         'objectRef': msg.objectRef,
390         'updateTime': msg.updatetime
391     }
392     if msg.notificationEventType in [NotificationEventEnum.DELETE,
393                                      NotificationEventEnum.MODIFY]:
394         callback['priorObjectState'] = json.dumps(obj_dict)
395     if msg.notificationEventType in [NotificationEventEnum.CREATE,
396                                      NotificationEventEnum.MODIFY]:
397         callback['postObjectState'] = json.dumps(obj_dict)
398     if msg.notificationEventType == NotificationEventEnum.DELETE:
399         callback.pop('objectRef')
400     logger.info('callback URL: {}'.format(sub_data['callback']))
401     logger.debug('callback data: {}'.format(json.dumps(callback)))
402
403     return notifications.send(sub_data['callback'], callback)
404
405     # Call SMO through the SMO callback url
406     # o = urlparse(sub_data['callback'])
407     # if o.scheme == 'https':
408     #     conn = get_https_conn_default(o.netloc)
409     # else:
410     #     conn = get_http_conn(o.netloc)
411     # try:
412     #     rst, status = post_data(conn, o.path, callback_data)
413     #     if rst is True:
414     #         logger.info(
415     #             'Notify to SMO successed with status: {}'.format(status))
416     #         return
417     #     logger.error('Notify Response code is: {}'.format(status))
418     # except ssl.SSLCertVerificationError as e:
419     #     logger.debug(
420     #         'Notify try to post data with trusted ca failed: {}'.format(e))
421     #     if 'self signed' in str(e):
422     #         conn = get_https_conn_selfsigned(o.netloc)
423     #         try:
424     #             return post_data(conn, o.path, callback_data)
425     #         except Exception as e:
426     #             logger.info(
427     #                 'Notify post data with self-signed ca \
428     #                 failed: {}'.format(e))
429     #             return False
430     #     return False
431     # except Exception as e:
432     #     logger.critical('Notify except: {}'.format(e))
433     #     return False