1 # Copyright (C) 2021-2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from urllib.parse import urlparse
21 # from o2common.config import config
22 from o2common.domain.filter import gen_orm_filter
23 from o2common.service.unit_of_work import AbstractUnitOfWork
24 from o2common.service.command.handler import get_https_conn_default
25 from o2common.service.command.handler import get_http_conn
26 from o2common.service.command.handler import get_https_conn_selfsigned
27 from o2common.service.command.handler import post_data
29 from o2ims.domain import commands, ocloud
30 from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
33 from o2common.helper import o2logging
34 logger = o2logging.get_logger(__name__)
37 # # Maybe another MQ server
38 # r = redis.Redis(**config.get_redis_host_and_port())
def notify_change_to_smo(
    cmd: commands.PubMessage2SMO,
    uow: AbstractUnitOfWork,
):
    """Route a publish-to-SMO command to the notifier for its object kind.

    The message kind selects one of the private ``_notify_*`` helpers, which
    is then called with the unit of work and ``cmd.data``. A kind other than
    'ResourceType', 'ResourcePool', 'Dms' or 'Resource' is ignored.

    NOTE(review): the kind is read from ``cmd.type`` -- confirm against
    ``commands.PubMessage2SMO``.

    :param cmd: command carrying the message kind and the message payload.
    :param uow: unit of work handed through to the selected notifier.
    """
    logger.debug('In notify_change_to_smo')
    notifiers = {
        'ResourceType': _notify_resourcetype,
        'ResourcePool': _notify_resourcepool,
        'Dms': _notify_dms,
        'Resource': _notify_resource,
    }
    notifier = notifiers.get(cmd.type)
    if notifier is not None:
        notifier(uow, cmd.data)
def _notify_resourcetype(uow, data):
    """Send a resource-type change notification to the subscriptions.

    Looks up the resource type ``data.id``; when it cannot be found a
    warning is logged and nothing is sent. Otherwise each stored
    subscription is examined: when none of its sub-filters applies to
    'ResourceTypeInfo' the subscription is called back unconditionally,
    and when an applicable sub-filter matches this resource type the
    notification is skipped for that subscription.

    NOTE(review): assumes ``gen_orm_filter`` raises ``KeyError`` for an
    unknown attribute and that ``list_with_count`` returns the number of
    matches as its first element -- confirm against the repositories.

    :param uow: unit of work giving access to the repositories.
    :param data: message for the SMO; ``data.id`` is the resource type id.
    """
    with uow:
        resource_type = uow.resource_types.get(data.id)
        if resource_type is None:
            logger.warning('ResourceType {} does not exists.'.format(data.id))
            return
        resource_type_dict = {
            'resourceTypeId': resource_type.resourceTypeId,
            'name': resource_type.name,
            'description': resource_type.description,
            'vendor': resource_type.vendor,
            'model': resource_type.model,
            'version': resource_type.version,
            # 'alarmDictionary': resource_type.alarmDictionary.serialize()
        }

        subs = uow.subscriptions.list()
        for sub in subs:
            sub_data = sub.serialize()
            logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
            filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
            if not filters:
                # No sub-filter addresses resource types: always notify.
                callback_smo(sub, data, resource_type_dict)
                continue
            filter_effect = 0
            for sub_filter in filters:
                try:
                    args = gen_orm_filter(ocloud.ResourceType, sub_filter)
                except KeyError:
                    logger.warning(
                        'Subscription {} filter {} has wrong attribute '
                        'name or value. Ignore the filter.'.format(
                            sub_data['subscriptionId'],
                            sub_data['filter']))
                    continue
                if not args and 'objectType' in sub_filter:
                    # The sub-filter names only the object type, so it
                    # covers every resource type.
                    filter_effect += 1
                    break
                # Restrict the query to the changed object itself.
                args.append(ocloud.ResourceType.resourceTypeId == data.id)
                ret = uow.resource_types.list_with_count(*args)
                if ret[0] > 0:
                    logger.debug(
                        'ResourceType {} skip for subscription {} because of'
                        ' the filter.'
                        .format(data.id, sub_data['subscriptionId']))
                    filter_effect += 1
                    break
            if filter_effect > 0:
                continue
            callback_smo(sub, data, resource_type_dict)
def _notify_resourcepool(uow, data):
    """Send a resource-pool change notification to the subscriptions.

    When the pool ``data.id`` cannot be found a warning is logged and
    nothing is sent. For every stored subscription the sub-filters that
    apply to 'ResourcePoolInfo' are evaluated; the subscription is called
    back unless one of them matches this pool.

    NOTE(review): assumes ``gen_orm_filter`` raises ``KeyError`` for an
    unknown attribute and that ``list_with_count`` returns the number of
    matches as its first element -- confirm against the repositories.

    :param uow: unit of work giving access to the repositories.
    :param data: message for the SMO; ``data.id`` is the resource pool id.
    """
    with uow:
        pool = uow.resource_pools.get(data.id)
        if pool is None:
            logger.warning('ResourcePool {} does not exists.'.format(data.id))
            return
        pool_state = {
            'resourcePoolId': pool.resourcePoolId,
            'oCloudId': pool.oCloudId,
            'globalLocationId': pool.globalLocationId,
            'name': pool.name,
            'description': pool.description
        }

        for subscription in uow.subscriptions.list():
            info = subscription.serialize()
            logger.debug('Subscription: {}'.format(info['subscriptionId']))
            pool_filters = handle_filter(info['filter'], 'ResourcePoolInfo')
            if not pool_filters:
                callback_smo(subscription, data, pool_state)
                continue
            for one_filter in pool_filters:
                try:
                    clauses = gen_orm_filter(ocloud.ResourcePool, one_filter)
                except KeyError:
                    logger.warning(
                        'Subscription {} filter {} has wrong attribute '
                        'name or value. Ignore the filter.'.format(
                            info['subscriptionId'],
                            info['filter']))
                    continue
                if len(clauses) == 0 and 'objectType' in one_filter:
                    # Object-type-only sub-filter: suppresses the callback.
                    break
                clauses.append(ocloud.ResourcePool.resourcePoolId == data.id)
                found = uow.resource_pools.list_with_count(*clauses)
                if found[0] > 0:
                    logger.debug(
                        'ResourcePool {} skip for subscription {} because of'
                        ' the filter.'
                        .format(data.id, info['subscriptionId']))
                    break
            else:
                # No sub-filter took effect, so the subscriber is notified.
                callback_smo(subscription, data, pool_state)
def _notify_dms(uow, data):
    """Send a deployment-manager change notification to the subscriptions.

    When the deployment manager ``data.id`` cannot be found a warning is
    logged and nothing is sent. For every stored subscription the
    sub-filters that apply to 'DeploymentManagerInfo' are evaluated; the
    subscription is called back unless one of them matches this manager.

    NOTE(review): assumes ``gen_orm_filter`` raises ``KeyError`` for an
    unknown attribute and that ``list_with_count`` returns the number of
    matches as its first element -- confirm against the repositories.

    :param uow: unit of work giving access to the repositories.
    :param data: message for the SMO; ``data.id`` is the manager id.
    """
    with uow:
        manager = uow.deployment_managers.get(data.id)
        if manager is None:
            logger.warning(
                'DeploymentManager {} does not exists.'.format(data.id))
            return
        manager_state = {
            'deploymentManagerId': manager.deploymentManagerId,
            'name': manager.name,
            'description': manager.description,
            'oCloudId': manager.oCloudId,
            'serviceUri': manager.serviceUri
        }

        for subscription in uow.subscriptions.list():
            info = subscription.serialize()
            logger.debug('Subscription: {}'.format(info['subscriptionId']))
            dms_filters = handle_filter(
                info['filter'], 'DeploymentManagerInfo')
            if not dms_filters:
                callback_smo(subscription, data, manager_state)
                continue
            suppressed = False
            for one_filter in dms_filters:
                try:
                    clauses = gen_orm_filter(
                        ocloud.DeploymentManager, one_filter)
                except KeyError:
                    logger.warning(
                        'Subscription {} filter {} has wrong attribute '
                        'name or value. Ignore the filter.'.format(
                            info['subscriptionId'],
                            info['filter']))
                    continue
                if len(clauses) == 0 and 'objectType' in one_filter:
                    # Object-type-only sub-filter: suppresses the callback.
                    suppressed = True
                    break
                clauses.append(
                    ocloud.DeploymentManager.deploymentManagerId == data.id)
                found = uow.deployment_managers.list_with_count(*clauses)
                if found[0] > 0:
                    logger.debug(
                        'DeploymentManager {} skip for subscription {} because'
                        ' of the filter.'
                        .format(data.id, info['subscriptionId']))
                    suppressed = True
                    break
            if not suppressed:
                callback_smo(subscription, data, manager_state)
def _notify_resource(uow, data):
    """Send a resource change notification to the subscriptions.

    When the resource ``data.id`` cannot be found a warning is logged and
    nothing is sent. For every stored subscription the sub-filters that
    apply to 'ResourceInfo' are evaluated within the resource's own pool;
    the subscription is called back unless one of them matches this
    resource.

    NOTE(review): assumes ``gen_orm_filter`` raises ``KeyError`` for an
    unknown attribute and that ``list_with_count`` returns the number of
    matches as its first element -- confirm against the repositories.

    :param uow: unit of work giving access to the repositories.
    :param data: message for the SMO; ``data.id`` is the resource id.
    """
    with uow:
        found_resource = uow.resources.get(data.id)
        if found_resource is None:
            logger.warning('Resource {} does not exists.'.format(data.id))
            return
        pool_id = found_resource.serialize()['resourcePoolId']
        logger.debug('res pool id is {}'.format(pool_id))
        resource_state = {
            'resourceId': found_resource.resourceId,
            'description': found_resource.description,
            'resourceTypeId': found_resource.resourceTypeId,
            'resourcePoolId': found_resource.resourcePoolId,
            'globalAssetId': found_resource.globalAssetId
        }

        for subscription in uow.subscriptions.list():
            info = subscription.serialize()
            logger.debug('Subscription: {}'.format(info['subscriptionId']))
            resource_filters = handle_filter(info['filter'], 'ResourceInfo')
            if not resource_filters:
                callback_smo(subscription, data, resource_state)
                continue
            for one_filter in resource_filters:
                try:
                    clauses = gen_orm_filter(ocloud.Resource, one_filter)
                except KeyError:
                    logger.warning(
                        'Subscription {} filter {} has wrong attribute '
                        'name or value. Ignore the filter.'.format(
                            info['subscriptionId'],
                            info['filter']))
                    continue
                if len(clauses) == 0 and 'objectType' in one_filter:
                    # Object-type-only sub-filter: suppresses the callback.
                    break
                clauses.append(ocloud.Resource.resourceId == data.id)
                found = uow.resources.list_with_count(pool_id, *clauses)
                if found[0] > 0:
                    logger.debug(
                        'Resource {} skip for subscription {} because of '
                        'the filter.'
                        .format(data.id, info['subscriptionId']))
                    break
            else:
                # No sub-filter took effect, so the subscriber is notified.
                callback_smo(subscription, data, resource_state)
def handle_filter(filter: str, f_type: str):
    """Select the sub-filters of a subscription filter that apply to *f_type*.

    *filter* is stripped of surrounding spaces and square brackets, split
    on ``|`` into sub-filters, and each sub-filter is split on ``;`` into
    ``(operator,attribute,value)`` expressions. A sub-filter is kept when
    its ``objectType`` expression names *f_type* (the last such expression
    wins); a sub-filter without an ``objectType`` expression is kept only
    when *f_type* is ``'ResourceInfo'``. Kept sub-filters are returned as
    the original substrings.

    Blank sub-filters are dropped and expressions with fewer than three
    comma-separated fields are never treated as ``objectType`` expressions,
    so input such as ``'[]'`` or ``'(eq)'`` does not raise ``IndexError``.

    :param filter: raw filter string stored with the subscription.
    :param f_type: object type name, e.g. ``'ResourcePoolInfo'``.
    :returns: list of applicable sub-filter strings, or ``None`` when
        *filter* is empty.
    """
    if not filter:
        # NOTE(review): presumably callers only test the result for
        # truthiness, so the falsy ``None`` is kept -- confirm.
        return None
    filters = []
    for sub_filter in filter.strip(' []').split('|'):
        if not sub_filter.strip():
            # e.g. '[]' or a trailing '|': nothing to evaluate.
            continue
        has_object_type = False
        object_type_value = ''
        for expr in sub_filter.split(';'):
            items = expr.strip(' ()').split(',')
            if len(items) < 3 or items[1].strip() != 'objectType':
                continue
            has_object_type = True
            object_type_value = items[2].strip()
        if not has_object_type:
            if f_type == 'ResourceInfo':
                filters.append(sub_filter)
            continue
        if object_type_value == f_type:
            filters.append(sub_filter)
    return filters
def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None):
    """POST one change notification to a subscription's callback URL.

    The payload carries the consumer subscription id, the event type, the
    object reference and the update time. ``obj_dict`` is serialized into
    ``priorObjectState`` for DELETE/MODIFY events and into
    ``postObjectState`` for CREATE/MODIFY events; DELETE events carry no
    ``objectRef``. For an ``https`` callback the default trusted CAs are
    tried first and, if verification fails because the certificate is
    self-signed, the request is retried with the self-signed connection.

    NOTE(review): the return value is not uniform (``None`` after the first
    attempt, ``post_data``'s result after the self-signed retry, ``False``
    on failure) and the success test on ``rst`` is assumed -- callers in
    this module ignore the result; confirm before relying on it.

    :param sub: subscription whose ``callback`` URL receives the POST.
    :param msg: message describing the event.
    :param obj_dict: state of the changed object, JSON-serializable.
    """
    sub_data = sub.serialize()
    callback = {
        'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
        'notificationEventType': msg.notificationEventType,
        'objectRef': msg.objectRef,
        'updateTime': msg.updatetime
    }
    if msg.notificationEventType in [NotificationEventEnum.DELETE,
                                     NotificationEventEnum.MODIFY]:
        callback['priorObjectState'] = json.dumps(obj_dict)
    if msg.notificationEventType in [NotificationEventEnum.CREATE,
                                     NotificationEventEnum.MODIFY]:
        callback['postObjectState'] = json.dumps(obj_dict)
    if msg.notificationEventType == NotificationEventEnum.DELETE:
        callback.pop('objectRef')
    callback_data = json.dumps(callback)
    logger.info('URL: {}'.format(sub_data['callback']))
    logger.debug('callback data: {}'.format(callback_data))

    # Call SMO through the SMO callback url
    o = urlparse(sub_data['callback'])
    if o.scheme == 'https':
        conn = get_https_conn_default(o.netloc)
    else:
        conn = get_http_conn(o.netloc)
    try:
        rst, status = post_data(conn, o.path, callback_data)
        if rst is True:
            logger.info(
                'Notify to SMO successed with status: {}'.format(status))
            return
        logger.error('Notify Response code is: {}'.format(status))
    except ssl.SSLCertVerificationError as e:
        logger.debug(
            'Notify try to post data with trusted ca failed: {}'.format(e))
        # OpenSSL 1.x words the failure "self signed certificate" and
        # OpenSSL 3.x "self-signed certificate"; the numeric verify codes
        # 18 (depth-zero self-signed) and 19 (self-signed in chain) do not
        # depend on the wording.
        reason = str(e)
        self_signed = (
            getattr(e, 'verify_code', None) in (18, 19)
            or 'self signed' in reason
            or 'self-signed' in reason)
        if self_signed:
            conn = get_https_conn_selfsigned(o.netloc)
            try:
                return post_data(conn, o.path, callback_data)
            except Exception as retry_err:
                # Adjacent literals keep source indentation out of the
                # logged message.
                logger.info(
                    'Notify post data with self-signed ca '
                    'failed: {}'.format(retry_err))
                # TODO: write the status to extension db table.
                return False
        return False
    except Exception as e:
        logger.critical('Notify except: {}'.format(e))
        return False