1 # Copyright (C) 2021-2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from urllib.parse import urlparse
21 # from o2common.config import config
22 from o2common.domain.filter import gen_orm_filter
23 from o2common.service.unit_of_work import AbstractUnitOfWork
24 from o2common.service.command.handler import get_https_conn_default
25 from o2common.service.command.handler import get_http_conn
26 from o2common.service.command.handler import get_https_conn_selfsigned
27 from o2common.service.command.handler import post_data
29 from o2ims.domain import commands, ocloud
30 from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
33 from o2common.helper import o2logging
34 logger = o2logging.get_logger(__name__)
37 # # Maybe another MQ server
38 # r = redis.Redis(**config.get_redis_host_and_port())
41 def notify_change_to_smo(
42 cmd: commands.PubMessage2SMO,
43 uow: AbstractUnitOfWork,
45 logger.debug('In notify_change_to_smo')
47 if msg_type == 'ResourceType':
48 _notify_resourcetype(uow, cmd.data)
49 elif msg_type == 'ResourcePool':
50 _notify_resourcepool(uow, cmd.data)
51 elif msg_type == 'Dms':
52 _notify_dms(uow, cmd.data)
53 elif msg_type == 'Resource':
54 _notify_resource(uow, cmd.data)
57 def _notify_resourcetype(uow, data):
59 resource_type = uow.resource_types.get(data.id)
60 if resource_type is None:
61 logger.warning('ResourceType {} does not exists.'.format(data.id))
63 resource_type_dict = {
64 'resourceTypeId': resource_type.resourceTypeId,
65 'name': resource_type.name,
66 'description': resource_type.description,
67 'vendor': resource_type.vendor,
68 'model': resource_type.model,
69 'version': resource_type.version,
70 # 'alarmDictionary': resource_type.alarmDictionary.serialize()
73 subs = uow.subscriptions.list()
75 sub_data = sub.serialize()
76 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
77 filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
79 callback_smo(sub, data, resource_type_dict)
82 for filter in filters:
84 args = gen_orm_filter(ocloud.ResourceType, filter)
87 'Subscription {} filter {} has wrong attribute '
88 'name or value. Ignore the filter.'.format(
89 sub_data['subscriptionId'],
92 args.append(ocloud.ResourceType.resourceTypeId == data.id)
93 ret = uow.resource_types.list_with_count(*args)
96 'ResourcePool {} skip for subscription {} because of'
98 .format(data.id, sub_data['subscriptionId']))
101 if filter_effect > 0:
103 callback_smo(sub, data, resource_type_dict)
106 def _notify_resourcepool(uow, data):
108 resource_pool = uow.resource_pools.get(data.id)
109 if resource_pool is None:
110 logger.warning('ResourcePool {} does not exists.'.format(data.id))
112 resource_pool_dict = {
113 'resourcePoolId': resource_pool.resourcePoolId,
114 'oCloudId': resource_pool.oCloudId,
115 'globalLocationId': resource_pool.globalLocationId,
116 'name': resource_pool.name,
117 'description': resource_pool.description
120 subs = uow.subscriptions.list()
122 sub_data = sub.serialize()
123 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
124 filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo')
126 callback_smo(sub, data, resource_pool_dict)
129 for filter in filters:
131 args = gen_orm_filter(ocloud.ResourcePool, filter)
134 'Subscription {} filter {} has wrong attribute '
135 'name or value. Ignore the filter.'.format(
136 sub_data['subscriptionId'],
139 args.append(ocloud.ResourcePool.resourcePoolId == data.id)
140 ret = uow.resource_pools.list_with_count(*args)
143 'ResourcePool {} skip for subscription {} because of'
145 .format(data.id, sub_data['subscriptionId']))
148 if filter_effect > 0:
150 callback_smo(sub, data, resource_pool_dict)
153 def _notify_dms(uow, data):
155 dms = uow.deployment_managers.get(data.id)
158 'DeploymentManager {} does not exists.'.format(data.id))
161 'deploymentManagerId': dms.deploymentManagerId,
163 'description': dms.description,
164 'oCloudId': dms.oCloudId,
165 'serviceUri': dms.serviceUri
168 subs = uow.subscriptions.list()
170 sub_data = sub.serialize()
171 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
172 filters = handle_filter(
173 sub_data['filter'], 'DeploymentManagerInfo')
175 callback_smo(sub, data, dms_dict)
178 for filter in filters:
180 args = gen_orm_filter(ocloud.DeploymentManager, filter)
183 'Subscription {} filter {} has wrong attribute '
184 'name or value. Ignore the filter.'.format(
185 sub_data['subscriptionId'],
189 ocloud.DeploymentManager.deploymentManagerId == data.id)
190 ret = uow.deployment_managers.list_with_count(*args)
193 'DeploymentManager {} skip for subscription {} because'
195 .format(data.id, sub_data['subscriptionId']))
198 if filter_effect > 0:
200 callback_smo(sub, data, dms_dict)
203 class FilterNotEffect(Exception):
207 class FilterEffect(Exception):
211 def _notify_resource(uow, data):
213 resource = uow.resources.get(data.id)
215 logger.warning('Resource {} does not exists.'.format(data.id))
217 res_pool_id = resource.serialize()['resourcePoolId']
218 logger.debug('res pool id is {}'.format(res_pool_id))
220 'resourceId': resource.resourceId,
221 'description': resource.description,
222 'resourceTypeId': resource.resourceTypeId,
223 'resourcePoolId': resource.resourcePoolId,
224 'globalAssetId': resource.globalAssetId
227 subs = uow.subscriptions.list()
229 sub_data = sub.serialize()
230 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
231 filters = handle_filter(sub_data['filter'], 'ResourceInfo')
233 callback_smo(sub, data, res_dict)
236 for filter in filters:
238 args = gen_orm_filter(ocloud.Resource, filter)
241 'Subscription {} filter {} has wrong attribute '
242 'name or value. Ignore the filter.'.format(
243 sub_data['subscriptionId'],
246 args.append(ocloud.Resource.resourceId == data.id)
247 ret = uow.resources.list_with_count(res_pool_id, *args)
250 'Resource {} skip for subscription {} because of '
252 .format(data.id, sub_data['subscriptionId']))
255 if filter_effect > 0:
257 callback_smo(sub, data, res_dict)
260 def handle_filter(filter: str, f_type: str):
263 filter_strip = filter.strip(' []')
264 filter_list = filter_strip.split('|')
266 for sub_filter in filter_list:
267 exprs = sub_filter.split(';')
271 expr_strip = expr.strip(' ()')
272 items = expr_strip.split(',')
273 item_key = items[1].strip()
274 if item_key != 'objectType':
277 objectTypeValue = items[2].strip()
279 if f_type == 'ResourceInfo':
280 filters.append(sub_filter)
282 if objectTypeValue == f_type:
283 filters.append(sub_filter)
287 def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None):
288 sub_data = sub.serialize()
290 'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
291 'notificationEventType': msg.notificationEventType,
292 'objectRef': msg.objectRef,
293 'updateTime': msg.updatetime
295 if msg.notificationEventType in [NotificationEventEnum.DELETE,
296 NotificationEventEnum.MODIFY]:
297 callback['priorObjectState'] = obj_dict
298 if msg.notificationEventType in [NotificationEventEnum.CREATE,
299 NotificationEventEnum.MODIFY]:
300 callback['postObjectState'] = obj_dict
301 if msg.notificationEventType == NotificationEventEnum.DELETE:
302 callback.pop('objectRef')
303 callback_data = json.dumps(callback)
304 logger.info('URL: {}, data: {}'.format(
305 sub_data['callback'], callback_data))
307 # Call SMO through the SMO callback url
308 o = urlparse(sub_data['callback'])
309 if o.scheme == 'https':
310 conn = get_https_conn_default(o.netloc)
312 conn = get_http_conn(o.netloc)
314 rst, status = post_data(conn, o.path, callback_data)
317 'Notify to SMO successed with status: {}'.format(status))
319 logger.error('Notify Response code is: {}'.format(status))
320 except ssl.SSLCertVerificationError as e:
322 'Notify try to post data with trusted ca failed: {}'.format(e))
323 if 'self signed' in str(e):
324 conn = get_https_conn_selfsigned(o.netloc)
326 return post_data(conn, o.path, callback_data)
327 except Exception as e:
329 'Notify post data with self-signed ca \
330 failed: {}'.format(e))
331 # TODO: write the status to extension db table.
334 except Exception as e:
335 logger.critical('Notify except: {}'.format(e))