Fix install O2 on subcloud failed
[pti/o2.git] / o2ims / service / command / notify_handler.py
1 # Copyright (C) 2021-2022 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 # import redis
16 # import requests
17 import json
18
19 # from o2common.config import conf
20 from o2common.domain.filter import gen_orm_filter
21 from o2common.service.unit_of_work import AbstractUnitOfWork
22 from o2common.adapter.notifications import AbstractNotifications
23
24 from o2ims.domain import commands, ocloud
25 from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
26     NotificationEventEnum
27
28 from o2common.helper import o2logging
29 logger = o2logging.get_logger(__name__)
30
31
32 # # Maybe another MQ server
33 # r = redis.Redis(**config.get_redis_host_and_port())
34
35
def notify_change_to_smo(
    cmd: commands.PubMessage2SMO,
    uow: AbstractUnitOfWork,
    notifications: AbstractNotifications,
):
    """Route a PubMessage2SMO command to the notifier for its object type.

    ``cmd.type`` selects the handler; the handler is called with the unit
    of work, the notifications adapter and ``cmd.data``. A type that is
    not listed below is ignored without any further action.
    """
    logger.debug('In notify_change_to_smo')

    # Message type -> module-level notifier that handles it.
    dispatch = {
        'ResourceType': _notify_resourcetype,
        'ResourcePool': _notify_resourcepool,
        'Dms': _notify_dms,
        'Resource': _notify_resource,
        'OCloud': _notify_ocloud,
    }
    notifier = dispatch.get(cmd.type)
    if notifier is not None:
        notifier(uow, notifications, cmd.data)
53
54
def __get_object_type_and_value(sub_filter):
    """Look up the ``objectType`` value inside a single sub-filter.

    A sub-filter is a ';'-separated list of expressions, each shaped like
    ``(operator,attribute,value)``.

    Returns a ``(found, value)`` tuple: ``(True, <value>)`` for the first
    expression whose attribute is ``objectType``, ``(False, '')`` when no
    expression names it. Blank or truncated expressions (fewer than three
    comma-separated items) are skipped rather than raising ``IndexError``.
    """
    for expr in sub_filter.split(';'):
        items = expr.strip(' ()').split(',')
        if len(items) < 3:
            # E.g. the empty string left behind by a trailing ';'.
            continue
        if items[1].strip() == 'objectType':
            return True, items[2].strip()
    return False, ''


def handle_filter(filter: str, f_type: str):
    """Pick the sub-filters of a subscription filter that apply to f_type.

    The filter string may be wrapped in ``[...]`` and holds sub-filters
    separated by ``|``. A sub-filter applies when its ``objectType`` value
    equals ``f_type``; a sub-filter without any ``objectType`` expression
    applies only when ``f_type`` is ``'ResourceInfo'``.

    Returns ``None`` when ``filter`` is empty, otherwise a tuple
    ``(count, sub_filters)`` where ``count == len(sub_filters)``.
    Blank sub-filters (for example from ``'[]'`` or a trailing ``|``) are
    ignored, so they can neither crash the parser nor count as a filter.
    """
    if not filter:
        return None

    filters = []
    # str.split always yields at least one element, so no emptiness check
    # on the split result is needed.
    for sub_filter in filter.strip(' []').split('|'):
        if not sub_filter.strip():
            continue
        found, value = __get_object_type_and_value(sub_filter)
        if value == f_type or (not found and f_type == 'ResourceInfo'):
            filters.append(sub_filter)

    return len(filters), filters
85
86
def check_filters(filters, sub_data, uow_cls, obj_cls, attr_id, id):
    """Test the object identified by ``id`` against the given sub-filters.

    ``filters`` is the ``(count, sub_filters)`` tuple produced by
    ``handle_filter``; only ``filters[1]`` is read. Each sub-filter is
    turned into ORM conditions with ``gen_orm_filter`` and combined with
    ``attr_id == id`` before querying ``uow_cls.list_with_count``.

    Returns ``True`` as soon as one sub-filter selects at least one
    object. Returns ``False`` when a sub-filter yields no conditions yet
    mentions ``objectType``, or when no sub-filter selects anything.
    Sub-filters that make ``gen_orm_filter`` raise ``KeyError`` are logged
    and skipped.

    NOTE(review): callers of this helper notify on ``True``, whereas
    ``_notify_resource`` skips the notification on a match -- confirm
    which semantics is intended.
    """
    for sub_filter in filters[1]:
        logger.debug(f'filter: {sub_filter}')
        try:
            conditions = gen_orm_filter(obj_cls, sub_filter)
        except KeyError:
            logger.warning(
                'Subscription {} filter {} has wrong attribute '
                'name or value. Ignore the filter.'.format(
                    sub_data['subscriptionId'],
                    sub_data['filter']))
            continue
        logger.debug(f'args: {conditions}')

        # Only an objectType selector and no attribute conditions.
        if len(conditions) == 0 and 'objectType' in sub_filter:
            return False

        # Restrict the query to the object the event is about.
        conditions.append(attr_id == id)
        matched, _rows = uow_cls.list_with_count(*conditions)
        if matched > 0:
            return True
    return False
109
110
def _notify_resourcetype(uow, notifications, data):
    """Send a ResourceType change to every subscription that accepts it.

    The resource type is loaded by ``data.id``; when it is missing a
    warning is logged and nothing is sent. For each subscription the
    ``ResourceTypeInfo`` sub-filters are evaluated: the callback is sent
    when there is no applicable filter or ``check_filters`` returns True,
    otherwise the skip is logged.
    """
    with uow:
        res_type = uow.resource_types.get(data.id)
        if res_type is None:
            logger.warning('ResourceType {} does not exists.'.format(data.id))
            return

        payload = res_type.get_notification_dict()

        for sub in uow.subscriptions.list():
            sub_data = sub.serialize()
            sub_id = sub_data['subscriptionId']
            logger.debug('Subscription: {}'.format(sub_id))
            filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
            logger.debug(f'filters: {filters}, sub_data: {sub_data}')

            # Without an applicable filter the notification always goes out.
            deliver = not filters or filters[0] == 0
            if not deliver:
                deliver = check_filters(
                    filters, sub_data, uow.resource_types,
                    ocloud.ResourceType,
                    ocloud.ResourceType.resourceTypeId, data.id)

            if deliver:
                callback_smo(notifications, sub, data, payload)
            else:
                logger.info('Subscription {} filter hit, skip ResourceType {}.'
                            .format(sub_id, data.id))
135
136
def _notify_resourcepool(uow, notifications, data):
    """Send a ResourcePool change to every subscription that accepts it.

    The resource pool is loaded by ``data.id``; when it is missing a
    warning is logged and nothing is sent. For each subscription the
    ``ResourcePoolInfo`` sub-filters are evaluated: the callback is sent
    when there is no applicable filter or ``check_filters`` returns True,
    otherwise the skip is logged.
    """
    with uow:
        pool = uow.resource_pools.get(data.id)
        if pool is None:
            logger.warning('ResourcePool {} does not exists.'.format(data.id))
            return

        payload = pool.get_notification_dict()

        for sub in uow.subscriptions.list():
            sub_data = sub.serialize()
            sub_id = sub_data['subscriptionId']
            logger.debug('Subscription: {}'.format(sub_id))
            filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo')
            logger.debug(f'filters: {filters}, sub_data: {sub_data}')

            # Without an applicable filter the notification always goes out.
            deliver = not filters or filters[0] == 0
            if not deliver:
                deliver = check_filters(
                    filters, sub_data, uow.resource_pools,
                    ocloud.ResourcePool,
                    ocloud.ResourcePool.resourcePoolId, data.id)

            if deliver:
                callback_smo(notifications, sub, data, payload)
            else:
                logger.info('Subscription {} filter hit, skip ResourcePool {}.'
                            .format(sub_id, data.id))
161
162
def _notify_dms(uow, notifications, data):
    """Send a DeploymentManager change to every accepting subscription.

    The deployment manager is loaded by ``data.id``; when it is missing a
    warning is logged and nothing is sent. For each subscription the
    ``DeploymentManagerInfo`` sub-filters are evaluated: the callback is
    sent when there is no applicable filter or ``check_filters`` returns
    True, otherwise the skip is logged.
    """
    with uow:
        manager = uow.deployment_managers.get(data.id)
        if manager is None:
            logger.warning(
                'DeploymentManager {} does not exists.'.format(data.id))
            return

        payload = manager.get_notification_dict()

        for sub in uow.subscriptions.list():
            sub_data = sub.serialize()
            sub_id = sub_data['subscriptionId']
            logger.debug('Subscription: {}'.format(sub_id))
            filters = handle_filter(
                sub_data['filter'], 'DeploymentManagerInfo')
            logger.debug(f'filters: {filters}, sub_data: {sub_data}')

            # Without an applicable filter the notification always goes out.
            deliver = not filters or filters[0] == 0
            if not deliver:
                deliver = check_filters(
                    filters, sub_data, uow.deployment_managers,
                    ocloud.DeploymentManager,
                    ocloud.DeploymentManager.deploymentManagerId, data.id)

            if deliver:
                callback_smo(notifications, sub, data, payload)
            else:
                logger.info('Subscription {} filter hit, skip '
                            'DeploymentManager {}.'
                            .format(sub_id, data.id))
191
192
def _notify_ocloud(uow, notifications, data):
    """Send an O-Cloud change to every subscription that accepts it.

    The O-Cloud is loaded by ``data.id``; when it is missing a warning is
    logged and nothing is sent. For each subscription the ``CloudInfo``
    sub-filters are evaluated: the callback is sent when there is no
    applicable filter or ``check_filters`` returns True, otherwise the
    skip is logged.
    """
    with uow:
        # Must not be named ``ocloud``: that would shadow the imported
        # domain module for the whole function and make ``ocloud.Ocloud``
        # below resolve against the fetched entity instead of the module.
        ocloud_obj = uow.oclouds.get(data.id)
        if ocloud_obj is None:
            logger.warning(
                'oCloud {} does not exists.'.format(data.id))
            return

        ocloud_dict = ocloud_obj.get_notification_dict()

        subs = uow.subscriptions.list()
        for sub in subs:
            sub_data = sub.serialize()
            logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
            filters = handle_filter(
                sub_data['filter'], 'CloudInfo')
            logger.debug(f'filters: {filters}, sub_data: {sub_data}')

            if not filters or filters[0] == 0 or check_filters(
                    filters, sub_data, uow.oclouds,
                    ocloud.Ocloud,
                    ocloud.Ocloud.oCloudId, data.id):
                callback_smo(notifications, sub, data, ocloud_dict)
                continue

            logger.info('Subscription {} filter hit, skip Cloud {}.'
                        .format(sub_data['subscriptionId'], data.id))
220
221
def _notify_resource(uow, notifications, data):
    """Send a Resource change to subscriptions whose filters do not hit.

    The resource is loaded by ``data.id``; when it is missing a warning is
    logged and nothing is sent. For each subscription the ``ResourceInfo``
    sub-filters are evaluated. With no applicable filter the callback is
    sent. Otherwise the callback is sent only when no sub-filter hits; a
    hit is logged and the subscription is skipped.
    """
    with uow:
        resource = uow.resources.get(data.id)
        if resource is None:
            logger.warning('Resource {} does not exists.'.format(data.id))
            return
        res_pool_id = resource.serialize()['resourcePoolId']
        logger.debug('res pool id is {}'.format(res_pool_id))

        res_dict = resource.get_notification_dict()

        def _filter_hits(sub_filter, sub_data):
            # True when this sub-filter selects the changed resource.
            try:
                conditions = gen_orm_filter(ocloud.Resource, sub_filter)
            except KeyError:
                logger.warning(
                    'Subscription {} filter {} has wrong attribute '
                    'name or value. Ignore the filter.'.format(
                        sub_data['subscriptionId'],
                        sub_data['filter']))
                return False
            # Only an objectType selector and no attribute conditions.
            if len(conditions) == 0 and 'objectType' in sub_filter:
                return True
            conditions.append(ocloud.Resource.resourceId == data.id)
            matched, _rows = uow.resources.list_with_count(
                res_pool_id, *conditions)
            return matched > 0

        for sub in uow.subscriptions.list():
            sub_data = sub.serialize()
            sub_id = sub_data['subscriptionId']
            logger.debug('Subscription: {}'.format(sub_id))
            filters = handle_filter(sub_data['filter'], 'ResourceInfo')

            if not filters or filters[0] == 0:
                callback_smo(notifications, sub, data, res_dict)
                continue
            if filters[0] > 0 and not filters[1]:
                # A positive count with no retained sub-filters: skip.
                continue

            # any() stops at the first hit, like the original break.
            if any(_filter_hits(item, sub_data) for item in filters[1]):
                logger.info('Subscription {} filter hit, skip Resource {}.'
                            .format(sub_id, data.id))
            else:
                callback_smo(notifications, sub, data, res_dict)
268
269
def callback_smo(notifications: AbstractNotifications, sub: Subscription,
                 msg: Message2SMO, obj_dict: dict = None):
    """Build the notification payload for one subscription and send it.

    The payload always carries the consumer subscription id, the event
    type and the update time. ``priorObjectState`` is added for DELETE and
    MODIFY events, ``postObjectState`` for CREATE and MODIFY events; both
    hold the JSON serialisation of ``obj_dict``. ``objectRef`` is included
    for every event type except DELETE.

    Returns whatever ``notifications.send`` returns for the subscription's
    callback URL and the payload.
    """
    sub_data = sub.serialize()
    event_type = msg.notificationEventType

    callback = {
        'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
        'notificationEventType': event_type,
        'objectRef': msg.objectRef,
        'updateTime': msg.updatetime
    }

    wants_prior = event_type in (NotificationEventEnum.DELETE,
                                 NotificationEventEnum.MODIFY)
    wants_post = event_type in (NotificationEventEnum.CREATE,
                                NotificationEventEnum.MODIFY)
    if wants_prior or wants_post:
        # The same object snapshot is used for both states.
        state = json.dumps(obj_dict)
        if wants_prior:
            callback['priorObjectState'] = state
        if wants_post:
            callback['postObjectState'] = state

    if event_type == NotificationEventEnum.DELETE:
        # A deleted object can no longer be referenced.
        del callback['objectRef']

    logger.info('callback URL: {}'.format(sub_data['callback']))
    logger.debug('callback data: {}'.format(json.dumps(callback)))

    return notifications.send(sub_data['callback'], callback)