1 # Copyright (C) 2021-2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from urllib.parse import urlparse
21 # from o2common.config import config
22 from o2common.domain.filter import gen_orm_filter
23 from o2common.service.unit_of_work import AbstractUnitOfWork
24 from o2common.service.command.handler import get_https_conn_default
25 from o2common.service.command.handler import get_http_conn
26 from o2common.service.command.handler import get_https_conn_selfsigned
27 from o2common.service.command.handler import post_data
29 from o2ims.domain import commands, ocloud
30 from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
33 from o2common.helper import o2logging
# Module-level logger, obtained from the project's o2logging helper and keyed
# by this module's name; used by every function below.
34 logger = o2logging.get_logger(__name__)
37 # # Maybe another MQ server
38 # r = redis.Redis(**config.get_redis_host_and_port())
# Dispatch a PubMessage2SMO command to the notifier for its message type.
#
# Parameters (per the annotations below):
#   cmd: commands.PubMessage2SMO; cmd.data is forwarded to the notifier.
#   uow: AbstractUnitOfWork; passed through unchanged to the notifier.
#
# NOTE(review): the statement that binds msg_type is not among the lines
# shown here -- presumably it is read from cmd; confirm.
41 def notify_change_to_smo(
42 cmd: commands.PubMessage2SMO,
43 uow: AbstractUnitOfWork,
45 logger.debug('In notify_change_to_smo')
# Route on the message type. No else branch is visible, so a type other than
# the four handled here appears to be ignored silently -- TODO confirm.
47 if msg_type == 'ResourceType':
48 _notify_resourcetype(uow, cmd.data)
49 elif msg_type == 'ResourcePool':
50 _notify_resourcepool(uow, cmd.data)
51 elif msg_type == 'Dms':
52 _notify_dms(uow, cmd.data)
53 elif msg_type == 'Resource':
54 _notify_resource(uow, cmd.data)
# Notify subscribers about a change to the ResourceType identified by data.id.
#
# Parameters:
#   uow:  unit of work exposing the resource_types and subscriptions
#         repositories used below.
#   data: change message; data.id is used for the lookup and data is passed
#         on to callback_smo as its msg argument.
#
# NOTE(review): several control-flow lines (loop header over subs, the guards
# around the callbacks, the try/except around gen_orm_filter) are not among
# the lines shown here; comments describe only the visible statements.
57 def _notify_resourcetype(uow, data):
59 resource_type = uow.resource_types.get(data.id)
# A missing ResourceType is only logged here; whether the function returns
# afterwards is not visible -- TODO confirm.
60 if resource_type is None:
61 logger.warning('ResourceType {} does not exists.'.format(data.id))
# Plain-dict snapshot of the object; handed to callback_smo as obj_dict.
63 resource_type_dict = {
64 'resourceTypeId': resource_type.resourceTypeId,
65 'name': resource_type.name,
66 'description': resource_type.description,
67 'vendor': resource_type.vendor,
68 'model': resource_type.model,
69 'version': resource_type.version,
70 # 'alarmDictionary': resource_type.alarmDictionary.serialize()
# All subscriptions are fetched; `sub` below is presumably the loop variable
# iterating over them -- confirm.
73 subs = uow.subscriptions.list()
75 sub_data = sub.serialize()
76 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
# Keep only the parts of the subscription filter relevant to ResourceTypeInfo.
77 filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
79 callback_smo(sub, data, resource_type_dict)
# NOTE(review): the loop variable `filter` shadows the builtin of that name.
82 for filter in filters:
# Translate the filter expression into ORM criteria for ocloud.ResourceType.
84 args = gen_orm_filter(ocloud.ResourceType, filter)
87 'Subscription {} filter {} has wrong attribute '
88 'name or value. Ignore the filter.'.format(
89 sub_data['subscriptionId'],
# No ORM criteria were produced although the filter names objectType.
92 if len(args) == 0 and 'objectType' in filter:
# Restrict the query to this object's id before counting matches.
95 args.append(ocloud.ResourceType.resourceTypeId == data.id)
96 obj_count, _ = uow.resource_types.list_with_count(*args)
# Per the message text, a filter hit means this subscription is skipped.
101 logger.info('Subscription {} filter hit, skip ResourceType {}.'
102 .format(sub_data['subscriptionId'], data.id))
104 callback_smo(sub, data, resource_type_dict)
# Notify subscribers about a change to the ResourcePool identified by data.id.
#
# Parameters:
#   uow:  unit of work exposing the resource_pools and subscriptions
#         repositories used below.
#   data: change message; data.id is used for the lookup and data is passed
#         on to callback_smo as its msg argument.
#
# NOTE(review): several control-flow lines (loop header over subs, the guards
# around the callbacks, the try/except around gen_orm_filter) are not among
# the lines shown here; comments describe only the visible statements.
107 def _notify_resourcepool(uow, data):
109 resource_pool = uow.resource_pools.get(data.id)
# A missing ResourcePool is only logged here; whether the function returns
# afterwards is not visible -- TODO confirm.
110 if resource_pool is None:
111 logger.warning('ResourcePool {} does not exists.'.format(data.id))
# Plain-dict snapshot of the object; handed to callback_smo as obj_dict.
113 resource_pool_dict = {
114 'resourcePoolId': resource_pool.resourcePoolId,
115 'oCloudId': resource_pool.oCloudId,
116 'globalLocationId': resource_pool.globalLocationId,
117 'name': resource_pool.name,
118 'description': resource_pool.description
# All subscriptions are fetched; `sub` below is presumably the loop variable
# iterating over them -- confirm.
121 subs = uow.subscriptions.list()
123 sub_data = sub.serialize()
124 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
# Keep only the parts of the subscription filter relevant to ResourcePoolInfo.
125 filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo')
127 callback_smo(sub, data, resource_pool_dict)
# NOTE(review): the loop variable `filter` shadows the builtin of that name.
130 for filter in filters:
# Translate the filter expression into ORM criteria for ocloud.ResourcePool.
132 args = gen_orm_filter(ocloud.ResourcePool, filter)
135 'Subscription {} filter {} has wrong attribute '
136 'name or value. Ignore the filter.'.format(
137 sub_data['subscriptionId'],
# No ORM criteria were produced although the filter names objectType.
140 if len(args) == 0 and 'objectType' in filter:
# Restrict the query to this object's id before counting matches.
143 args.append(ocloud.ResourcePool.resourcePoolId == data.id)
144 obj_count, _ = uow.resource_pools.list_with_count(*args)
# Per the message text, a filter hit means this subscription is skipped.
149 logger.info('Subscription {} filter hit, skip ResourcePool {}.'
150 .format(sub_data['subscriptionId'], data.id))
152 callback_smo(sub, data, resource_pool_dict)
# Notify subscribers about a change to the DeploymentManager identified by
# data.id.
#
# Parameters:
#   uow:  unit of work exposing the deployment_managers and subscriptions
#         repositories used below.
#   data: change message; data.id is used for the lookup and data is passed
#         on to callback_smo as its msg argument.
#
# NOTE(review): several control-flow lines (the None check on dms, the opener
# of dms_dict, the loop header over subs, the guards around the callbacks,
# the try/except around gen_orm_filter) are not among the lines shown here;
# comments describe only the visible statements.
155 def _notify_dms(uow, data):
157 dms = uow.deployment_managers.get(data.id)
160 'DeploymentManager {} does not exists.'.format(data.id))
# Entries of the snapshot dict later passed to callback_smo as dms_dict.
163 'deploymentManagerId': dms.deploymentManagerId,
165 'description': dms.description,
166 'oCloudId': dms.oCloudId,
167 'serviceUri': dms.serviceUri
# All subscriptions are fetched; `sub` below is presumably the loop variable
# iterating over them -- confirm.
170 subs = uow.subscriptions.list()
172 sub_data = sub.serialize()
173 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
# Keep only the parts of the subscription filter relevant to
# DeploymentManagerInfo.
174 filters = handle_filter(
175 sub_data['filter'], 'DeploymentManagerInfo')
177 callback_smo(sub, data, dms_dict)
# NOTE(review): the loop variable `filter` shadows the builtin of that name.
180 for filter in filters:
# Translate the filter expression into ORM criteria for
# ocloud.DeploymentManager.
182 args = gen_orm_filter(ocloud.DeploymentManager, filter)
185 'Subscription {} filter {} has wrong attribute '
186 'name or value. Ignore the filter.'.format(
187 sub_data['subscriptionId'],
# No ORM criteria were produced although the filter names objectType.
190 if len(args) == 0 and 'objectType' in filter:
194 ocloud.DeploymentManager.deploymentManagerId == data.id)
# Count the objects matching the accumulated criteria.
195 obj_count, _ = uow.deployment_managers.list_with_count(*args)
# Per the message text, a filter hit means this subscription is skipped.
200 logger.info('Subscription {} filter hit, skip '
201 'DeploymentManager {}.'
202 .format(sub_data['subscriptionId'], data.id))
204 callback_smo(sub, data, dms_dict)
# Notify subscribers about a change to the Resource identified by data.id.
#
# Parameters:
#   uow:  unit of work exposing the resources and subscriptions repositories
#         used below.
#   data: change message; data.id is used for the lookup and data is passed
#         on to callback_smo as its msg argument.
#
# NOTE(review): several control-flow lines (the None check on resource, the
# opener of res_dict, the loop header over subs, the guards around the
# callbacks, the try/except around gen_orm_filter) are not among the lines
# shown here; comments describe only the visible statements.
207 def _notify_resource(uow, data):
209 resource = uow.resources.get(data.id)
211 logger.warning('Resource {} does not exists.'.format(data.id))
# The owning pool id is read from the serialized form of the resource.
213 res_pool_id = resource.serialize()['resourcePoolId']
214 logger.debug('res pool id is {}'.format(res_pool_id))
# Entries of the snapshot dict later passed to callback_smo as res_dict.
216 'resourceId': resource.resourceId,
217 'description': resource.description,
218 'resourceTypeId': resource.resourceTypeId,
219 'resourcePoolId': resource.resourcePoolId,
220 'globalAssetId': resource.globalAssetId
# All subscriptions are fetched; `sub` below is presumably the loop variable
# iterating over them -- confirm.
223 subs = uow.subscriptions.list()
225 sub_data = sub.serialize()
226 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
# Keep only the parts of the subscription filter relevant to ResourceInfo.
227 filters = handle_filter(sub_data['filter'], 'ResourceInfo')
229 callback_smo(sub, data, res_dict)
# NOTE(review): the loop variable `filter` shadows the builtin of that name.
232 for filter in filters:
# Translate the filter expression into ORM criteria for ocloud.Resource.
234 args = gen_orm_filter(ocloud.Resource, filter)
237 'Subscription {} filter {} has wrong attribute '
238 'name or value. Ignore the filter.'.format(
239 sub_data['subscriptionId'],
# No ORM criteria were produced although the filter names objectType.
242 if len(args) == 0 and 'objectType' in filter:
# Restrict the query to this object's id before counting matches.
245 args.append(ocloud.Resource.resourceId == data.id)
# NOTE(review): the arguments of this call are not visible here; presumably
# res_pool_id and the accumulated args are passed -- confirm.
246 obj_count, _ = uow.resources.list_with_count(
# Per the message text, a filter hit means this subscription is skipped.
252 logger.info('Subscription {} filter hit, skip Resource {}.'
253 .format(sub_data['subscriptionId'], data.id))
255 callback_smo(sub, data, res_dict)
# Select the parts of a subscription filter string that apply to one object
# type.
#
# Parameters:
#   filter: raw filter string. As parsed below: surrounding spaces and square
#           brackets are stripped, alternatives are separated by '|', the
#           expressions inside an alternative by ';', and each expression is
#           a comma-separated tuple wrapped in parentheses whose second item
#           is the attribute name and third item the value.
#   f_type: object type name to match against an 'objectType' expression
#           (callers in this file pass e.g. 'ResourceTypeInfo', 'ResourceInfo').
#
# Matching alternatives are appended to `filters`.
#
# NOTE(review): the initialisation of `filters`, the inner loop header over
# exprs and the return statement are not among the lines shown here.
# NOTE(review): the parameter name `filter` shadows the builtin.
258 def handle_filter(filter: str, f_type: str):
261 filter_strip = filter.strip(' []')
262 filter_list = filter_strip.split('|')
264 for sub_filter in filter_list:
265 exprs = sub_filter.split(';')
269 expr_strip = expr.strip(' ()')
270 items = expr_strip.split(',')
# NOTE(review): items[1] raises IndexError for an expression with fewer than
# two comma-separated items (e.g. an empty filter string); no guard is
# visible here -- confirm one exists upstream.
271 item_key = items[1].strip()
272 if item_key != 'objectType':
275 objectTypeValue = items[2].strip()
# 'ResourceInfo' gets its own branch; the condition guarding it is not
# visible -- presumably it covers alternatives with no objectType expression.
277 if f_type == 'ResourceInfo':
278 filters.append(sub_filter)
# Keep the alternative when its objectType value names the requested type.
280 if objectTypeValue == f_type:
281 filters.append(sub_filter)
# Build the notification payload for one subscription and POST it to the
# subscription's callback URL.
#
# Parameters:
#   sub:      Subscription; its serialized form supplies
#             'consumerSubscriptionId' and the 'callback' URL.
#   msg:      Message2SMO; supplies notificationEventType, objectRef and
#             updatetime.
#   obj_dict: dict snapshot of the changed object, JSON-encoded into the
#             prior/post object state fields (defaults to None).
#
# NOTE(review): the opener of the `callback` dict, the try:/else: lines and
# the status check around the logging calls are not among the lines shown
# here; comments describe only the visible statements.
285 def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None):
286 sub_data = sub.serialize()
288 'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
289 'notificationEventType': msg.notificationEventType,
290 'objectRef': msg.objectRef,
291 'updateTime': msg.updatetime
# DELETE and MODIFY carry the prior state, CREATE and MODIFY the post state.
# NOTE(review): for MODIFY both fields get the same json.dumps(obj_dict), so
# prior and post state are identical -- confirm this is intended.
293 if msg.notificationEventType in [NotificationEventEnum.DELETE,
294 NotificationEventEnum.MODIFY]:
295 callback['priorObjectState'] = json.dumps(obj_dict)
296 if msg.notificationEventType in [NotificationEventEnum.CREATE,
297 NotificationEventEnum.MODIFY]:
298 callback['postObjectState'] = json.dumps(obj_dict)
# A deleted object has no reference to offer, so the key is removed.
299 if msg.notificationEventType == NotificationEventEnum.DELETE:
300 callback.pop('objectRef')
301 callback_data = json.dumps(callback)
302 logger.info('callback URL: {}'.format(sub_data['callback']))
303 logger.debug('callback data: {}'.format(callback_data))
305 # Call SMO through the SMO callback url
# The connection helper is chosen from the URL scheme: the default-CA HTTPS
# helper for 'https', the plain HTTP helper on the other visible path.
306 o = urlparse(sub_data['callback'])
307 if o.scheme == 'https':
308 conn = get_https_conn_default(o.netloc)
310 conn = get_http_conn(o.netloc)
# NOTE(review): only o.path is posted to, so any query string in the callback
# URL is dropped -- confirm callbacks never carry one.
312 rst, status = post_data(conn, o.path, callback_data)
315 'Notify to SMO successed with status: {}'.format(status))
317 logger.error('Notify Response code is: {}'.format(status))
# Certificate verification failed: retry with the self-signed helper when the
# error text mentions a self-signed certificate.
318 except ssl.SSLCertVerificationError as e:
320 'Notify try to post data with trusted ca failed: {}'.format(e))
# NOTE(review): this matches on error-message wording, which may differ
# between OpenSSL versions (e.g. 'self-signed') -- verify, or consider
# checking e.verify_code instead.
321 if 'self signed' in str(e):
322 conn = get_https_conn_selfsigned(o.netloc)
# NOTE(review): this path returns post_data's result, while the first attempt
# above only logs the status -- return value is inconsistent across paths.
324 return post_data(conn, o.path, callback_data)
# NOTE(review): the message below uses a backslash continuation inside the
# string literal, so the next line's leading whitespace becomes part of the
# logged text.
325 except Exception as e:
327 'Notify post data with self-signed ca \
328 failed: {}'.format(e))
329 # TODO: write the status to extension db table.
# Any other failure while notifying is logged as critical; no re-raise is
# visible here.
332 except Exception as e:
333 logger.critical('Notify except: {}'.format(e))