0b2ce580cb3def0247186376b4fd52f20968dddd
[pti/o2.git] / o2ims / service / command / notify_handler.py
1 # Copyright (C) 2021-2022 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 # import redis
16 # import requests
17 import json
18
19 # from o2common.config import conf
20 from o2common.domain.filter import gen_orm_filter
21 from o2common.service.unit_of_work import AbstractUnitOfWork
22 from o2common.adapter.notifications import AbstractNotifications
23
24 from o2ims.domain import commands, ocloud
25 from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
26     NotificationEventEnum
27
28 from o2common.helper import o2logging
29 logger = o2logging.get_logger(__name__)
30
31
32 # # Maybe another MQ server
33 # r = redis.Redis(**config.get_redis_host_and_port())
34
35
def notify_change_to_smo(
    cmd: commands.PubMessage2SMO,
    uow: AbstractUnitOfWork,
    notifications: AbstractNotifications,
):
    """Route a PubMessage2SMO command to the notifier for its object type.

    ``cmd.type`` selects the handler: 'ResourceType', 'ResourcePool', 'Dms'
    or 'Resource'. The chosen handler receives ``uow``, ``notifications`` and
    ``cmd.data``. A command with any other type is ignored.
    """
    logger.debug('In notify_change_to_smo')
    kind = cmd.type
    # Checked in order; the first matching type name wins.
    routes = (
        ('ResourceType', _notify_resourcetype),
        ('ResourcePool', _notify_resourcepool),
        ('Dms', _notify_dms),
        ('Resource', _notify_resource),
    )
    for type_name, handler in routes:
        if kind == type_name:
            handler(uow, notifications, cmd.data)
            break
51
52
def _notify_resourcetype(uow, notifications, data):
    """Notify subscriptions about a change to one ResourceType.

    Looks up the resource type identified by ``data.id`` and walks every
    stored subscription. A subscription is called back through
    ``callback_smo`` unless one of its 'ResourceTypeInfo' filter expressions
    hits this resource type, in which case it is skipped.

    :param uow: unit of work, used as a context manager, providing the
        ``resource_types`` and ``subscriptions`` repositories.
    :param notifications: notification adapter passed on to ``callback_smo``.
    :param data: change message; ``data.id`` is the resource type ID.
    """
    with uow:
        res_type = uow.resource_types.get(data.id)
        if res_type is None:
            logger.warning('ResourceType {} does not exists.'.format(data.id))
            return
        # 'alarmDictionary' is not part of the payload.
        payload = {
            field: getattr(res_type, field)
            for field in ('resourceTypeId', 'name', 'description',
                          'vendor', 'model', 'version')
        }

        for sub in uow.subscriptions.list():
            sub_info = sub.serialize()
            sub_id = sub_info['subscriptionId']
            logger.debug('Subscription: {}'.format(sub_id))
            candidates = handle_filter(
                sub_info['filter'], 'ResourceTypeInfo')
            if not candidates:
                # No filter applies to this object type: always notify.
                callback_smo(notifications, sub, data, payload)
                continue
            for expr in candidates:
                try:
                    criteria = gen_orm_filter(ocloud.ResourceType, expr)
                except KeyError:
                    logger.warning(
                        'Subscription {} filter {} has wrong attribute '
                        'name or value. Ignore the filter.'.format(
                            sub_id,
                            sub_info['filter']))
                    continue
                if len(criteria) == 0 and 'objectType' in expr:
                    # No ORM criteria were produced but the expression
                    # mentions objectType: count it as a hit.
                    break
                criteria.append(
                    ocloud.ResourceType.resourceTypeId == data.id)
                hits, _ = uow.resource_types.list_with_count(*criteria)
                if hits > 0:
                    break
            else:
                # Loop finished without a hit: notify the subscriber.
                callback_smo(notifications, sub, data, payload)
                continue
            logger.info('Subscription {} filter hit, skip ResourceType {}.'
                        .format(sub_id, data.id))
101
102
def _notify_resourcepool(uow, notifications, data):
    """Notify subscriptions about a change to one ResourcePool.

    Looks up the resource pool identified by ``data.id`` and walks every
    stored subscription. A subscription is called back through
    ``callback_smo`` unless one of its 'ResourcePoolInfo' filter expressions
    hits this resource pool, in which case it is skipped.

    :param uow: unit of work, used as a context manager, providing the
        ``resource_pools`` and ``subscriptions`` repositories.
    :param notifications: notification adapter passed on to ``callback_smo``.
    :param data: change message; ``data.id`` is the resource pool ID.
    """
    with uow:
        pool = uow.resource_pools.get(data.id)
        if pool is None:
            logger.warning('ResourcePool {} does not exists.'.format(data.id))
            return
        payload = {
            field: getattr(pool, field)
            for field in ('resourcePoolId', 'oCloudId', 'globalLocationId',
                          'name', 'description')
        }

        for sub in uow.subscriptions.list():
            sub_info = sub.serialize()
            sub_id = sub_info['subscriptionId']
            logger.debug('Subscription: {}'.format(sub_id))
            candidates = handle_filter(
                sub_info['filter'], 'ResourcePoolInfo')
            if not candidates:
                # No filter applies to this object type: always notify.
                callback_smo(notifications, sub, data, payload)
                continue
            for expr in candidates:
                try:
                    criteria = gen_orm_filter(ocloud.ResourcePool, expr)
                except KeyError:
                    logger.warning(
                        'Subscription {} filter {} has wrong attribute '
                        'name or value. Ignore the filter.'.format(
                            sub_id,
                            sub_info['filter']))
                    continue
                if len(criteria) == 0 and 'objectType' in expr:
                    # No ORM criteria were produced but the expression
                    # mentions objectType: count it as a hit.
                    break
                criteria.append(
                    ocloud.ResourcePool.resourcePoolId == data.id)
                hits, _ = uow.resource_pools.list_with_count(*criteria)
                if hits > 0:
                    break
            else:
                # Loop finished without a hit: notify the subscriber.
                callback_smo(notifications, sub, data, payload)
                continue
            logger.info('Subscription {} filter hit, skip ResourcePool {}.'
                        .format(sub_id, data.id))
149
150
def _notify_dms(uow, notifications, data):
    """Notify subscriptions about a change to one DeploymentManager.

    Looks up the deployment manager identified by ``data.id`` and walks
    every stored subscription. A subscription is called back through
    ``callback_smo`` unless one of its 'DeploymentManagerInfo' filter
    expressions hits this deployment manager, in which case it is skipped.

    :param uow: unit of work, used as a context manager, providing the
        ``deployment_managers`` and ``subscriptions`` repositories.
    :param notifications: notification adapter passed on to ``callback_smo``.
    :param data: change message; ``data.id`` is the deployment manager ID.
    """
    with uow:
        manager = uow.deployment_managers.get(data.id)
        if manager is None:
            logger.warning(
                'DeploymentManager {} does not exists.'.format(data.id))
            return
        payload = {
            field: getattr(manager, field)
            for field in ('deploymentManagerId', 'name', 'description',
                          'oCloudId', 'serviceUri')
        }

        for sub in uow.subscriptions.list():
            sub_info = sub.serialize()
            sub_id = sub_info['subscriptionId']
            logger.debug('Subscription: {}'.format(sub_id))
            candidates = handle_filter(
                sub_info['filter'], 'DeploymentManagerInfo')
            if not candidates:
                # No filter applies to this object type: always notify.
                callback_smo(notifications, sub, data, payload)
                continue
            for expr in candidates:
                try:
                    criteria = gen_orm_filter(ocloud.DeploymentManager, expr)
                except KeyError:
                    logger.warning(
                        'Subscription {} filter {} has wrong attribute '
                        'name or value. Ignore the filter.'.format(
                            sub_id,
                            sub_info['filter']))
                    continue
                if len(criteria) == 0 and 'objectType' in expr:
                    # No ORM criteria were produced but the expression
                    # mentions objectType: count it as a hit.
                    break
                criteria.append(
                    ocloud.DeploymentManager.deploymentManagerId == data.id)
                hits, _ = uow.deployment_managers.list_with_count(*criteria)
                if hits > 0:
                    break
            else:
                # Loop finished without a hit: notify the subscriber.
                callback_smo(notifications, sub, data, payload)
                continue
            logger.info('Subscription {} filter hit, skip '
                        'DeploymentManager {}.'
                        .format(sub_id, data.id))
201
202
def _notify_resource(uow, notifications, data):
    """Notify subscriptions about a change to one Resource.

    Looks up the resource identified by ``data.id`` and walks every stored
    subscription. A subscription is called back through ``callback_smo``
    unless one of its 'ResourceInfo' filter expressions hits this resource,
    in which case it is skipped. Filter queries are issued against the
    resource's own pool (``resourcePoolId`` from its serialized form).

    :param uow: unit of work, used as a context manager, providing the
        ``resources`` and ``subscriptions`` repositories.
    :param notifications: notification adapter passed on to ``callback_smo``.
    :param data: change message; ``data.id`` is the resource ID.
    """
    with uow:
        res = uow.resources.get(data.id)
        if res is None:
            logger.warning('Resource {} does not exists.'.format(data.id))
            return
        pool_id = res.serialize()['resourcePoolId']
        logger.debug('res pool id is {}'.format(pool_id))
        payload = {
            field: getattr(res, field)
            for field in ('resourceId', 'description', 'resourceTypeId',
                          'resourcePoolId', 'globalAssetId')
        }

        for sub in uow.subscriptions.list():
            sub_info = sub.serialize()
            sub_id = sub_info['subscriptionId']
            logger.debug('Subscription: {}'.format(sub_id))
            candidates = handle_filter(sub_info['filter'], 'ResourceInfo')
            if not candidates:
                # No filter applies to this object type: always notify.
                callback_smo(notifications, sub, data, payload)
                continue
            for expr in candidates:
                try:
                    criteria = gen_orm_filter(ocloud.Resource, expr)
                except KeyError:
                    logger.warning(
                        'Subscription {} filter {} has wrong attribute '
                        'name or value. Ignore the filter.'.format(
                            sub_id,
                            sub_info['filter']))
                    continue
                if len(criteria) == 0 and 'objectType' in expr:
                    # No ORM criteria were produced but the expression
                    # mentions objectType: count it as a hit.
                    break
                criteria.append(ocloud.Resource.resourceId == data.id)
                hits, _ = uow.resources.list_with_count(pool_id, *criteria)
                if hits > 0:
                    break
            else:
                # Loop finished without a hit: notify the subscriber.
                callback_smo(notifications, sub, data, payload)
                continue
            logger.info('Subscription {} filter hit, skip Resource {}.'
                        .format(sub_id, data.id))
252
253
def handle_filter(filter: str, f_type: str):
    """Select the sub-filters of a subscription filter that apply to f_type.

    The filter text is stripped of surrounding spaces and square brackets
    and split on '|' into sub-filters; each sub-filter is split on ';' into
    comma-separated expressions whose second field is the attribute name and
    whose third field is the value.

    A sub-filter is selected when:

    * it has an ``objectType`` expression whose value equals ``f_type`` (if
      several are present, the last one decides), or
    * it has no ``objectType`` expression and ``f_type`` is 'ResourceInfo'.

    Malformed input does not raise: blank sub-filters and blank expressions
    (for example from a stray '|' or ';') are skipped, an expression with
    fewer than two fields is not treated as an ``objectType`` expression,
    and an ``objectType`` expression without a value matches no type.

    :param filter: raw filter string of a subscription; may be empty or None.
    :param f_type: object type name to select for, e.g. 'ResourcePoolInfo'.
    :return: None when ``filter`` is empty or None, otherwise the list of
        selected sub-filter strings exactly as they appeared between the
        '|' separators (possibly empty).
    """
    if not filter:
        return None
    selected = []
    for sub_filter in filter.strip(' []').split('|'):
        # Drop blank expressions so a stray separator cannot cause an
        # out-of-range index below.
        exprs = [
            expr for expr in (
                raw.strip(' ()') for raw in sub_filter.split(';'))
            if expr
        ]
        if not exprs:
            continue
        has_object_type = False
        object_type_value = ''
        for expr in exprs:
            items = expr.split(',')
            if len(items) < 2 or items[1].strip() != 'objectType':
                continue
            has_object_type = True
            # A missing value is kept as '' so it never equals a real type.
            object_type_value = items[2].strip() if len(items) > 2 else ''
        if has_object_type:
            if object_type_value == f_type:
                selected.append(sub_filter)
        elif f_type == 'ResourceInfo':
            # Sub-filters without an objectType apply to resources only.
            selected.append(sub_filter)
    return selected
279
280
def callback_smo(notifications: AbstractNotifications, sub: Subscription,
                 msg: Message2SMO, obj_dict: dict = None):
    """Build the notification payload for one subscription and send it.

    The payload always carries the consumer subscription ID, the event type
    and the update time taken from ``msg``. Depending on the event type it
    also carries:

    * DELETE: 'priorObjectState', and 'objectRef' is removed;
    * MODIFY: 'priorObjectState' and 'postObjectState', plus 'objectRef';
    * CREATE: 'postObjectState', plus 'objectRef'.

    Both state fields are the JSON dump of ``obj_dict``.

    :param notifications: adapter whose ``send`` delivers the payload.
    :param sub: subscription; its serialized form supplies the callback URL
        and the consumer subscription ID.
    :param msg: change message describing the event.
    :param obj_dict: attributes of the changed object.
    :return: whatever ``notifications.send`` returns.
    """
    sub_data = sub.serialize()
    payload = {
        'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
        'notificationEventType': msg.notificationEventType,
        'objectRef': msg.objectRef,
        'updateTime': msg.updatetime
    }
    event = payload['notificationEventType']
    if event == NotificationEventEnum.DELETE:
        payload['priorObjectState'] = json.dumps(obj_dict)
        # A deleted object can no longer be referenced.
        del payload['objectRef']
    elif event == NotificationEventEnum.MODIFY:
        payload['priorObjectState'] = json.dumps(obj_dict)
        payload['postObjectState'] = json.dumps(obj_dict)
    elif event == NotificationEventEnum.CREATE:
        payload['postObjectState'] = json.dumps(obj_dict)

    target_url = sub_data['callback']
    logger.info('callback URL: {}'.format(target_url))
    logger.debug('callback data: {}'.format(json.dumps(payload)))

    return notifications.send(target_url, payload)
302
303     # Call SMO through the SMO callback url
304     # o = urlparse(sub_data['callback'])
305     # if o.scheme == 'https':
306     #     conn = get_https_conn_default(o.netloc)
307     # else:
308     #     conn = get_http_conn(o.netloc)
309     # try:
310     #     rst, status = post_data(conn, o.path, callback_data)
311     #     if rst is True:
312     #         logger.info(
313     #             'Notify to SMO successed with status: {}'.format(status))
314     #         return
315     #     logger.error('Notify Response code is: {}'.format(status))
316     # except ssl.SSLCertVerificationError as e:
317     #     logger.debug(
318     #         'Notify try to post data with trusted ca failed: {}'.format(e))
319     #     if 'self signed' in str(e):
320     #         conn = get_https_conn_selfsigned(o.netloc)
321     #         try:
322     #             return post_data(conn, o.path, callback_data)
323     #         except Exception as e:
324     #             logger.info(
325     #                 'Notify post data with self-signed ca \
326     #                 failed: {}'.format(e))
327     #             return False
328     #     return False
329     # except Exception as e:
330     #     logger.critical('Notify except: {}'.format(e))
331     #     return False