1 # Copyright (C) 2021-2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 # from o2common.config import conf
20 from o2common.domain.filter import gen_orm_filter
21 from o2common.service.unit_of_work import AbstractUnitOfWork
22 from o2common.adapter.notifications import AbstractNotifications
24 from o2ims.domain import commands, ocloud
25 from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
28 from o2common.helper import o2logging
29 logger = o2logging.get_logger(__name__)
32 # # Maybe another MQ server
33 # r = redis.Redis(**config.get_redis_host_and_port())
36 def notify_change_to_smo(
37 cmd: commands.PubMessage2SMO,
38 uow: AbstractUnitOfWork,
39 notifications: AbstractNotifications,
41 logger.debug('In notify_change_to_smo')
43 if msg_type == 'ResourceType':
44 _notify_resourcetype(uow, notifications, cmd.data)
45 elif msg_type == 'ResourcePool':
46 _notify_resourcepool(uow, notifications, cmd.data)
47 elif msg_type == 'Dms':
48 _notify_dms(uow, notifications, cmd.data)
49 elif msg_type == 'Resource':
50 _notify_resource(uow, notifications, cmd.data)
53 # def get_resource_dict(resource_type):
55 # 'resourceTypeId': resource_type.resourceTypeId,
56 # 'name': resource_type.name,
57 # 'description': resource_type.description,
58 # 'vendor': resource_type.vendor,
59 # 'model': resource_type.model,
60 # 'version': resource_type.version,
64 # def handle_filter(filter: str, f_type: str):
68 # filter_list = filter.strip(' []').split('|')
70 # match_type_count = 0
72 # for sub_filter in filter_list:
73 # objectType, objectTypeValue = get_object_type_and_value(sub_filter)
74 # if objectTypeValue == f_type:
75 # match_type_count += 1
76 # filters.append(sub_filter)
77 # elif not objectType and f_type == 'ResourceInfo':
78 # filters.append(sub_filter)
80 # return match_type_count, filters
83 # def get_object_type_and_value(sub_filter):
84 # exprs = sub_filter.split(';')
86 # items = expr.strip(' ()').split(',')
87 # item_key = items[1].strip()
88 # if item_key == 'objectType':
89 # return True, items[2].strip()
93 # def check_filters(filters, sub_data, uow, id):
94 # for filter in filters[1]:
95 # if isinstance(filter, bool) and filter:
99 # args = gen_orm_filter(ocloud.ResourceType, filter)
102 # 'Subscription {} filter {} has wrong attribute '
103 # 'name or value. Ignore the filter.'.format(
104 # sub_data['subscriptionId'],
105 # sub_data['filter']))
108 # if len(args) == 0 and 'objectType' in filter:
111 # args.append(ocloud.ResourceType.resourceTypeId == id)
112 # obj_count, _ = uow.resource_types.list_with_count(*args)
118 # def _notify_resourcetype(uow, notifications, data):
120 # resource_type = uow.resource_types.get(data.id)
121 # if resource_type is None:
122 # logger.warning('ResourceType {} does not exists.'.format(data.id))
125 # resource_type_dict = get_resource_dict(resource_type)
127 # subs = uow.subscriptions.list()
129 # sub_data = sub.serialize()
130 # filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
131 # logger.debug(f'filters: {filters}, sub_data: {sub_data}')
133 # if not filters or filters[0] == 0 or check_filters(
134 # filters, sub_data, uow, data.id):
135 # callback_smo(notifications, sub, data, resource_type_dict)
138 # logger.info('Subscription {} filter hit, skip ResourceType {}.'
139 # .format(sub_data['subscriptionId'], data.id))
141 def _notify_resourcetype(uow, notifications, data):
143 resource_type = uow.resource_types.get(data.id)
144 if resource_type is None:
145 logger.warning('ResourceType {} does not exists.'.format(data.id))
147 resource_type_dict = {
148 'resourceTypeId': resource_type.resourceTypeId,
149 'name': resource_type.name,
150 'description': resource_type.description,
151 'vendor': resource_type.vendor,
152 'model': resource_type.model,
153 'version': resource_type.version,
154 # 'alarmDictionary': resource_type.alarmDictionary.serialize()
157 subs = uow.subscriptions.list()
159 sub_data = sub.serialize()
160 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
161 filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
162 if not filters or filters[0] == 0:
163 callback_smo(notifications, sub, data, resource_type_dict)
165 if filters[0] > 0 and not filters[1]:
169 for filter in filters[1]:
171 args = gen_orm_filter(ocloud.ResourceType, filter)
174 'Subscription {} filter {} has wrong attribute '
175 'name or value. Ignore the filter.'.format(
176 sub_data['subscriptionId'],
179 if len(args) == 0 and 'objectType' in filter:
182 args.append(ocloud.ResourceType.resourceTypeId == data.id)
183 obj_count, _ = uow.resource_types.list_with_count(*args)
188 logger.info('Subscription {} filter hit, skip ResourceType {}.'
189 .format(sub_data['subscriptionId'], data.id))
191 callback_smo(notifications, sub, data, resource_type_dict)
194 def _notify_resourcepool(uow, notifications, data):
196 resource_pool = uow.resource_pools.get(data.id)
197 if resource_pool is None:
198 logger.warning('ResourcePool {} does not exists.'.format(data.id))
200 resource_pool_dict = {
201 'resourcePoolId': resource_pool.resourcePoolId,
202 'oCloudId': resource_pool.oCloudId,
203 'globalLocationId': resource_pool.globalLocationId,
204 'name': resource_pool.name,
205 'description': resource_pool.description
208 subs = uow.subscriptions.list()
210 sub_data = sub.serialize()
211 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
212 filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo')
213 if not filters or filters[0] == 0:
214 callback_smo(notifications, sub, data, resource_pool_dict)
216 if filters[0] > 0 and not filters[1]:
219 for filter in filters[1]:
221 args = gen_orm_filter(ocloud.ResourcePool, filter)
224 'Subscription {} filter {} has wrong attribute '
225 'name or value. Ignore the filter.'.format(
226 sub_data['subscriptionId'],
229 if len(args) == 0 and 'objectType' in filter:
232 args.append(ocloud.ResourcePool.resourcePoolId == data.id)
233 obj_count, _ = uow.resource_pools.list_with_count(*args)
238 logger.info('Subscription {} filter hit, skip ResourcePool {}.'
239 .format(sub_data['subscriptionId'], data.id))
241 callback_smo(notifications, sub, data, resource_pool_dict)
244 def _notify_dms(uow, notifications, data):
246 dms = uow.deployment_managers.get(data.id)
249 'DeploymentManager {} does not exists.'.format(data.id))
252 'deploymentManagerId': dms.deploymentManagerId,
254 'description': dms.description,
255 'oCloudId': dms.oCloudId,
256 'serviceUri': dms.serviceUri
259 subs = uow.subscriptions.list()
261 sub_data = sub.serialize()
262 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
263 filters_rst = handle_filter(
264 sub_data['filter'], 'DeploymentManagerInfo')
265 if not filters_rst or filters_rst[0] == 0:
266 callback_smo(notifications, sub, data, dms_dict)
268 if filters_rst[0] > 0 and not filters_rst[1]:
271 for filter in filters_rst[1]:
273 args = gen_orm_filter(ocloud.DeploymentManager, filter)
276 'Subscription {} filter {} has wrong attribute '
277 'name or value. Ignore the filter.'.format(
278 sub_data['subscriptionId'],
281 if len(args) == 0 and 'objectType' in filter:
285 ocloud.DeploymentManager.deploymentManagerId == data.id)
286 obj_count, _ = uow.deployment_managers.list_with_count(*args)
291 logger.info('Subscription {} filter hit, skip '
292 'DeploymentManager {}.'
293 .format(sub_data['subscriptionId'], data.id))
296 # callback_smo(notifications, sub, data, dms_dict)
299 def _notify_resource(uow, notifications, data):
301 resource = uow.resources.get(data.id)
303 logger.warning('Resource {} does not exists.'.format(data.id))
305 res_pool_id = resource.serialize()['resourcePoolId']
306 logger.debug('res pool id is {}'.format(res_pool_id))
308 'resourceId': resource.resourceId,
309 'description': resource.description,
310 'resourceTypeId': resource.resourceTypeId,
311 'resourcePoolId': resource.resourcePoolId,
312 'globalAssetId': resource.globalAssetId
315 subs = uow.subscriptions.list()
317 sub_data = sub.serialize()
318 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
319 filters = handle_filter(sub_data['filter'], 'ResourceInfo')
320 if not filters or filters[0] == 0:
321 callback_smo(notifications, sub, data, res_dict)
323 if filters[0] > 0 and not filters[1]:
326 for filter in filters[1]:
328 args = gen_orm_filter(ocloud.Resource, filter)
331 'Subscription {} filter {} has wrong attribute '
332 'name or value. Ignore the filter.'.format(
333 sub_data['subscriptionId'],
336 if len(args) == 0 and 'objectType' in filter:
339 args.append(ocloud.Resource.resourceId == data.id)
340 obj_count, _ = uow.resources.list_with_count(
346 logger.info('Subscription {} filter hit, skip Resource {}.'
347 .format(sub_data['subscriptionId'], data.id))
349 callback_smo(notifications, sub, data, res_dict)
352 def handle_filter(filter: str, f_type: str):
356 filter_strip = filter.strip(' []')
357 filter_list = filter_strip.split('|')
361 for sub_filter in filter_list:
362 exprs = sub_filter.split(';')
366 expr_strip = expr.strip(' ()')
367 items = expr_strip.split(',')
368 item_key = items[1].strip()
369 if item_key != 'objectType':
372 objectTypeValue = items[2].strip()
374 if f_type == 'ResourceInfo':
375 filters.append(sub_filter)
377 if objectTypeValue == f_type:
378 match_type_count += 1
379 filters.append(sub_filter)
380 return (match_type_count, filters)
383 def callback_smo(notifications: AbstractNotifications, sub: Subscription,
384 msg: Message2SMO, obj_dict: dict = None):
385 sub_data = sub.serialize()
387 'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
388 'notificationEventType': msg.notificationEventType,
389 'objectRef': msg.objectRef,
390 'updateTime': msg.updatetime
392 if msg.notificationEventType in [NotificationEventEnum.DELETE,
393 NotificationEventEnum.MODIFY]:
394 callback['priorObjectState'] = json.dumps(obj_dict)
395 if msg.notificationEventType in [NotificationEventEnum.CREATE,
396 NotificationEventEnum.MODIFY]:
397 callback['postObjectState'] = json.dumps(obj_dict)
398 if msg.notificationEventType == NotificationEventEnum.DELETE:
399 callback.pop('objectRef')
400 logger.info('callback URL: {}'.format(sub_data['callback']))
401 logger.debug('callback data: {}'.format(json.dumps(callback)))
403 return notifications.send(sub_data['callback'], callback)
405 # Call SMO through the SMO callback url
406 # o = urlparse(sub_data['callback'])
407 # if o.scheme == 'https':
408 # conn = get_https_conn_default(o.netloc)
410 # conn = get_http_conn(o.netloc)
412 # rst, status = post_data(conn, o.path, callback_data)
415 # 'Notify to SMO successed with status: {}'.format(status))
417 # logger.error('Notify Response code is: {}'.format(status))
418 # except ssl.SSLCertVerificationError as e:
420 # 'Notify try to post data with trusted ca failed: {}'.format(e))
421 # if 'self signed' in str(e):
422 # conn = get_https_conn_selfsigned(o.netloc)
424 # return post_data(conn, o.path, callback_data)
425 # except Exception as e:
427 # 'Notify post data with self-signed ca \
428 # failed: {}'.format(e))
431 # except Exception as e:
432 # logger.critical('Notify except: {}'.format(e))