1 # Copyright (C) 2021-2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from urllib.parse import urlparse
21 # from o2common.config import config
22 from o2common.domain.filter import gen_orm_filter
23 from o2common.service.unit_of_work import AbstractUnitOfWork
24 from o2common.service.command.handler import get_https_conn_default
25 from o2common.service.command.handler import get_http_conn
26 from o2common.service.command.handler import get_https_conn_selfsigned
27 from o2common.service.command.handler import post_data
29 from o2ims.domain import commands, ocloud
30 from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
33 from o2common.helper import o2logging
34 logger = o2logging.get_logger(__name__)
37 # # Maybe another MQ server
38 # r = redis.Redis(**config.get_redis_host_and_port())
41 def notify_change_to_smo(
42 cmd: commands.PubMessage2SMO,
43 uow: AbstractUnitOfWork,
45 logger.debug('In notify_change_to_smo')
47 if msg_type == 'ResourceType':
48 _notify_resourcetype(uow, cmd.data)
49 elif msg_type == 'ResourcePool':
50 _notify_resourcepool(uow, cmd.data)
51 elif msg_type == 'Dms':
52 _notify_dms(uow, cmd.data)
53 elif msg_type == 'Resource':
54 _notify_resource(uow, cmd.data)
57 def _notify_resourcetype(uow, data):
59 resource_type = uow.resource_types.get(data.id)
60 if resource_type is None:
61 logger.warning('ResourceType {} does not exists.'.format(data.id))
63 resource_type_dict = {
64 'resourceTypeId': resource_type.resourceTypeId,
65 'name': resource_type.name,
66 'description': resource_type.description,
67 'vendor': resource_type.vendor,
68 'model': resource_type.model,
69 'version': resource_type.version,
70 # 'alarmDictionary': resource_type.alarmDictionary.serialize()
73 subs = uow.subscriptions.list()
75 sub_data = sub.serialize()
76 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
77 filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
79 callback_smo(sub, data, resource_type_dict)
82 for filter in filters:
84 args = gen_orm_filter(ocloud.ResourceType, filter)
87 'Subscription {} filter {} has wrong attribute '
88 'name or value. Ignore the filter.'.format(
89 sub_data['subscriptionId'],
94 args.append(ocloud.ResourceType.resourceTypeId == data.id)
95 ret = uow.resource_types.list_with_count(*args)
98 'ResourcePool {} skip for subscription {} because of'
100 .format(data.id, sub_data['subscriptionId']))
103 if filter_effect > 0:
105 callback_smo(sub, data, resource_type_dict)
108 def _notify_resourcepool(uow, data):
110 resource_pool = uow.resource_pools.get(data.id)
111 if resource_pool is None:
112 logger.warning('ResourcePool {} does not exists.'.format(data.id))
114 resource_pool_dict = {
115 'resourcePoolId': resource_pool.resourcePoolId,
116 'oCloudId': resource_pool.oCloudId,
117 'globalLocationId': resource_pool.globalLocationId,
118 'name': resource_pool.name,
119 'description': resource_pool.description
122 subs = uow.subscriptions.list()
124 sub_data = sub.serialize()
125 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
126 filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo')
128 callback_smo(sub, data, resource_pool_dict)
131 for filter in filters:
133 args = gen_orm_filter(ocloud.ResourcePool, filter)
136 'Subscription {} filter {} has wrong attribute '
137 'name or value. Ignore the filter.'.format(
138 sub_data['subscriptionId'],
143 args.append(ocloud.ResourcePool.resourcePoolId == data.id)
144 ret = uow.resource_pools.list_with_count(*args)
147 'ResourcePool {} skip for subscription {} because of'
149 .format(data.id, sub_data['subscriptionId']))
152 if filter_effect > 0:
154 callback_smo(sub, data, resource_pool_dict)
157 def _notify_dms(uow, data):
159 dms = uow.deployment_managers.get(data.id)
162 'DeploymentManager {} does not exists.'.format(data.id))
165 'deploymentManagerId': dms.deploymentManagerId,
167 'description': dms.description,
168 'oCloudId': dms.oCloudId,
169 'serviceUri': dms.serviceUri
172 subs = uow.subscriptions.list()
174 sub_data = sub.serialize()
175 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
176 filters = handle_filter(
177 sub_data['filter'], 'DeploymentManagerInfo')
179 callback_smo(sub, data, dms_dict)
182 for filter in filters:
184 args = gen_orm_filter(ocloud.DeploymentManager, filter)
187 'Subscription {} filter {} has wrong attribute '
188 'name or value. Ignore the filter.'.format(
189 sub_data['subscriptionId'],
195 ocloud.DeploymentManager.deploymentManagerId == data.id)
196 ret = uow.deployment_managers.list_with_count(*args)
199 'DeploymentManager {} skip for subscription {} because'
201 .format(data.id, sub_data['subscriptionId']))
204 if filter_effect > 0:
206 callback_smo(sub, data, dms_dict)
209 def _notify_resource(uow, data):
211 resource = uow.resources.get(data.id)
213 logger.warning('Resource {} does not exists.'.format(data.id))
215 res_pool_id = resource.serialize()['resourcePoolId']
216 logger.debug('res pool id is {}'.format(res_pool_id))
218 'resourceId': resource.resourceId,
219 'description': resource.description,
220 'resourceTypeId': resource.resourceTypeId,
221 'resourcePoolId': resource.resourcePoolId,
222 'globalAssetId': resource.globalAssetId
225 subs = uow.subscriptions.list()
227 sub_data = sub.serialize()
228 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
229 filters = handle_filter(sub_data['filter'], 'ResourceInfo')
231 callback_smo(sub, data, res_dict)
234 for filter in filters:
236 args = gen_orm_filter(ocloud.Resource, filter)
239 'Subscription {} filter {} has wrong attribute '
240 'name or value. Ignore the filter.'.format(
241 sub_data['subscriptionId'],
246 args.append(ocloud.Resource.resourceId == data.id)
247 ret = uow.resources.list_with_count(res_pool_id, *args)
250 'Resource {} skip for subscription {} because of '
252 .format(data.id, sub_data['subscriptionId']))
255 if filter_effect > 0:
257 callback_smo(sub, data, res_dict)
260 def handle_filter(filter: str, f_type: str):
263 filter_strip = filter.strip(' []')
264 filter_list = filter_strip.split('|')
266 for sub_filter in filter_list:
267 exprs = sub_filter.split(';')
271 expr_strip = expr.strip(' ()')
272 items = expr_strip.split(',')
273 item_key = items[1].strip()
274 if item_key != 'objectType':
277 objectTypeValue = items[2].strip()
279 if f_type == 'ResourceInfo':
280 filters.append(sub_filter)
282 if objectTypeValue == f_type:
283 filters.append(sub_filter)
287 def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None):
288 sub_data = sub.serialize()
290 'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
291 'notificationEventType': msg.notificationEventType,
292 'objectRef': msg.objectRef,
293 'updateTime': msg.updatetime
295 if msg.notificationEventType in [NotificationEventEnum.DELETE,
296 NotificationEventEnum.MODIFY]:
297 callback['priorObjectState'] = obj_dict
298 if msg.notificationEventType in [NotificationEventEnum.CREATE,
299 NotificationEventEnum.MODIFY]:
300 callback['postObjectState'] = obj_dict
301 if msg.notificationEventType == NotificationEventEnum.DELETE:
302 callback.pop('objectRef')
303 callback_data = json.dumps(callback)
304 logger.info('URL: {}, data: {}'.format(
305 sub_data['callback'], callback_data))
307 # Call SMO through the SMO callback url
308 o = urlparse(sub_data['callback'])
309 if o.scheme == 'https':
310 conn = get_https_conn_default(o.netloc)
312 conn = get_http_conn(o.netloc)
314 rst, status = post_data(conn, o.path, callback_data)
317 'Notify to SMO successed with status: {}'.format(status))
319 logger.error('Notify Response code is: {}'.format(status))
320 except ssl.SSLCertVerificationError as e:
322 'Notify try to post data with trusted ca failed: {}'.format(e))
323 if 'self signed' in str(e):
324 conn = get_https_conn_selfsigned(o.netloc)
326 return post_data(conn, o.path, callback_data)
327 except Exception as e:
329 'Notify post data with self-signed ca \
330 failed: {}'.format(e))
331 # TODO: write the status to extension db table.
334 except Exception as e:
335 logger.critical('Notify except: {}'.format(e))