1 # Copyright (C) 2021-2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from urllib.parse import urlparse
21 # from o2common.config import config
22 from o2common.domain.filter import gen_orm_filter
23 from o2common.service.unit_of_work import AbstractUnitOfWork
24 from o2common.service.command.handler import get_https_conn_default
25 from o2common.service.command.handler import get_http_conn
26 from o2common.service.command.handler import get_https_conn_selfsigned
27 from o2common.service.command.handler import post_data
29 from o2ims.domain import commands, ocloud
30 from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
33 from o2common.helper import o2logging
34 logger = o2logging.get_logger(__name__)
37 # # Maybe another MQ server
38 # r = redis.Redis(**config.get_redis_host_and_port())
def notify_change_to_smo(
    cmd: commands.PubMessage2SMO,
    uow: AbstractUnitOfWork,
):
    """Route a publish-to-SMO command to the notifier for its object type.

    The types 'ResourceType', 'ResourcePool', 'Dms' and 'Resource' are
    handled; a command carrying any other type is ignored.

    :param cmd: the command to publish.
    :param uow: unit of work passed through to the selected notifier.
    """
    logger.debug('In notify_change_to_smo')
    # NOTE(review): the statement that binds the message type and the body
    # of the 'Dms' branch were not visible in the reviewed text; `cmd.type`
    # and `_notify_dms` are inferred -- confirm against the original file.
    notifiers = {
        'ResourceType': _notify_resourcetype,
        'ResourcePool': _notify_resourcepool,
        'Dms': _notify_dms,
        'Resource': _notify_resource,
    }
    notifier = notifiers.get(cmd.type)
    if notifier is not None:
        notifier(uow, cmd)
def _notify_resourcetype(uow, cmd):
    """Send a ResourceType change to every subscription that accepts it.

    The resource type named by the message id is looked up first; when it is
    missing a warning is logged and nothing is sent. For each subscription:
    no filter means it is notified, a filter that cannot be turned into
    query criteria is ignored (with a warning) and the subscription is
    notified, otherwise the filter is evaluated against this resource type
    and decides whether the subscription is skipped.

    :param uow: unit of work exposing ``resource_types`` and
        ``subscriptions``.
    :param cmd: command carrying the message and its type.
    """
    # NOTE(review): several short lines of this function were missing from
    # the reviewed text (the cmd.data / cmd.type reads, `with uow:`, the
    # early return, the loop header, the `except KeyError` clause, the
    # `[0] != 0` test, the logger calls around the message strings and the
    # 'filter.' message tail). They are reconstructed from the surrounding
    # code -- confirm against the original file.
    message = cmd.data
    kind = cmd.type
    with uow:
        found = uow.resource_types.get(message.id)
        if found is None:
            logger.warning(
                'ResourceType {} does not exists.'.format(message.id))
            return
        state = {
            'resourceTypeId': found.resourceTypeId,
            'name': found.name,
            'description': found.description,
            'vendor': found.vendor,
            'model': found.model,
            'version': found.version,
            # 'alarmDictionary': resource_type.alarmDictionary.serialize()
        }

        for subscription in uow.subscriptions.list():
            info = subscription.serialize()
            logger.debug('Subscription: {}'.format(info['subscriptionId']))
            if info.get('filter'):
                try:
                    criteria = gen_orm_filter(
                        ocloud.ResourceType, info['filter'])
                except KeyError:
                    # An unusable filter is ignored: fall through and notify.
                    logger.warning(
                        'Subscription {} filter {} has wrong attribute name '
                        'or value. Ignore the filter.'.format(
                            info['subscriptionId'], info['filter']))
                else:
                    # Restrict the filtered query to this one resource type.
                    criteria.append(
                        ocloud.ResourceType.resourceTypeId == message.id)
                    outcome = uow.resource_types.list_with_count(*criteria)
                    if outcome[0] != 0:
                        logger.debug(
                            'ResourceType {} skip for subscription {} '
                            'because of the filter.'
                            .format(message.id, info['subscriptionId']))
                        continue
            callback_smo(subscription, message, kind, state)
def _notify_resourcepool(uow, cmd):
    """Send a ResourcePool change to every subscription that accepts it.

    The resource pool named by the message id is looked up first; when it is
    missing a warning is logged and nothing is sent. For each subscription:
    no filter means it is notified, a filter that cannot be turned into
    query criteria is ignored (with a warning) and the subscription is
    notified, otherwise the filter is evaluated against this resource pool
    and decides whether the subscription is skipped.

    :param uow: unit of work exposing ``resource_pools`` and
        ``subscriptions``.
    :param cmd: command carrying the message and its type.
    """
    # NOTE(review): several short lines of this function were missing from
    # the reviewed text (the cmd.data / cmd.type reads, `with uow:`, the
    # early return, the loop header, the `except KeyError` clause, the
    # `ret[0] != 0` test, the logger calls around the message strings and
    # the 'filter.' message tail). They are reconstructed from the
    # surrounding code -- confirm against the original file.
    data = cmd.data
    msg_type = cmd.type
    with uow:
        resource_pool = uow.resource_pools.get(data.id)
        if resource_pool is None:
            logger.warning('ResourcePool {} does not exists.'.format(data.id))
            return
        resource_pool_dict = {
            'resourcePoolId': resource_pool.resourcePoolId,
            'oCloudId': resource_pool.oCloudId,
            'globalLocationId': resource_pool.globalLocationId,
            'name': resource_pool.name,
            'description': resource_pool.description
        }

        subs = uow.subscriptions.list()
        for sub in subs:
            sub_data = sub.serialize()
            logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
            if not sub_data.get('filter', None):
                callback_smo(sub, data, msg_type, resource_pool_dict)
                continue
            try:
                # The criteria are combined with a ResourcePool column and run
                # through the resource_pools repository below, so they must
                # be generated for ResourcePool (previously ocloud.Resource).
                args = gen_orm_filter(ocloud.ResourcePool, sub_data['filter'])
            except KeyError:
                logger.warning(
                    'Subscription {} filter {} has wrong attribute name '
                    'or value. Ignore the filter.'.format(
                        sub_data['subscriptionId'], sub_data['filter']))
                callback_smo(sub, data, msg_type, resource_pool_dict)
                continue
            # Restrict the filtered query to this one resource pool.
            args.append(ocloud.ResourcePool.resourcePoolId == data.id)
            ret = uow.resource_pools.list_with_count(*args)
            if ret[0] != 0:
                logger.debug(
                    'ResourcePool {} skip for subscription {} because of the '
                    'filter.'
                    .format(data.id, sub_data['subscriptionId']))
                continue
            callback_smo(sub, data, msg_type, resource_pool_dict)
def _notify_dms(uow, cmd):
    """Send a DeploymentManager change to every subscription accepting it.

    The deployment manager named by the message id is looked up first; when
    it is missing a warning is logged and nothing is sent. For each
    subscription: no filter means it is notified, a filter that cannot be
    turned into query criteria is ignored (with a warning) and the
    subscription is notified, otherwise the filter is evaluated against this
    deployment manager and decides whether the subscription is skipped.

    :param uow: unit of work exposing ``deployment_managers`` and
        ``subscriptions``.
    :param cmd: command carrying the message and its type.
    """
    # NOTE(review): several short lines of this function were missing from
    # the reviewed text (the cmd.data / cmd.type reads, `with uow:`, the
    # `dms is None` test and early return, the 'name' entry of dms_dict, the
    # loop header, the `except KeyError` clause, the `ret[0] != 0` test, the
    # logger calls around the message strings and the 'the filter.' message
    # tail). They are reconstructed from the surrounding code -- confirm
    # against the original file.
    data = cmd.data
    msg_type = cmd.type
    with uow:
        dms = uow.deployment_managers.get(data.id)
        if dms is None:
            logger.warning(
                'DeploymentManager {} does not exists.'.format(data.id))
            return
        dms_dict = {
            'deploymentManagerId': dms.deploymentManagerId,
            'name': dms.name,
            'description': dms.description,
            'oCloudId': dms.oCloudId,
            'serviceUri': dms.serviceUri
        }

        subs = uow.subscriptions.list()
        for sub in subs:
            sub_data = sub.serialize()
            logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
            if not sub_data.get('filter', None):
                callback_smo(sub, data, msg_type, dms_dict)
                continue
            try:
                # The criteria are combined with a DeploymentManager column
                # and run through the deployment_managers repository below,
                # so they must be generated for DeploymentManager
                # (previously ocloud.Resource).
                args = gen_orm_filter(
                    ocloud.DeploymentManager, sub_data['filter'])
            except KeyError:
                logger.warning(
                    'Subscription {} filter {} has wrong attribute name '
                    'or value. Ignore the filter.'.format(
                        sub_data['subscriptionId'], sub_data['filter']))
                callback_smo(sub, data, msg_type, dms_dict)
                continue
            # Restrict the filtered query to this one deployment manager.
            args.append(
                ocloud.DeploymentManager.deploymentManagerId == data.id)
            ret = uow.deployment_managers.list_with_count(*args)
            if ret[0] != 0:
                logger.debug(
                    'DeploymentManager {} skip for subscription {} because of '
                    'the filter.'
                    .format(data.id, sub_data['subscriptionId']))
                continue
            # Exactly one notification per subscription. A former extra
            # `callback_smo(sub, data)` call here omitted the required
            # msg_type argument and raised TypeError before this call ran.
            callback_smo(sub, data, msg_type, dms_dict)
def _notify_resource(uow, cmd):
    """Send a Resource change to every subscription that accepts it.

    The resource named by the message id is looked up first; when it is
    missing a warning is logged and nothing is sent. For each subscription:
    no filter means it is notified, a filter that cannot be turned into
    query criteria is ignored (with a warning) and the subscription is
    notified, otherwise the filter is evaluated against this resource within
    its resource pool and decides whether the subscription is skipped.
    No object state is passed along with the notification.

    :param uow: unit of work exposing ``resources`` and ``subscriptions``.
    :param cmd: command carrying the message and its type.
    """
    # NOTE(review): several short lines of this function were missing from
    # the reviewed text (the cmd.data / cmd.type reads, `with uow:`, the
    # `resource is None` test and early return, the loop header, the
    # `except KeyError` clause, the `[0] != 0` test, the logger calls around
    # the message strings and the 'filter.' message tail). They are
    # reconstructed from the surrounding code -- confirm against the
    # original file.
    message = cmd.data
    kind = cmd.type
    with uow:
        found = uow.resources.get(message.id)
        if found is None:
            logger.warning('Resource {} does not exists.'.format(message.id))
            return
        pool_id = found.serialize()['resourcePoolId']
        logger.debug('res pool id is {}'.format(pool_id))

        for subscription in uow.subscriptions.list():
            info = subscription.serialize()
            logger.debug('Subscription: {}'.format(info['subscriptionId']))
            if info.get('filter'):
                try:
                    criteria = gen_orm_filter(ocloud.Resource, info['filter'])
                except KeyError:
                    # An unusable filter is ignored: fall through and notify.
                    logger.warning(
                        'Subscription {} filter {} has wrong attribute name '
                        'or value. Ignore the filter.'.format(
                            info['subscriptionId'], info['filter']))
                else:
                    # Restrict the filtered query to this one resource.
                    criteria.append(ocloud.Resource.resourceId == message.id)
                    outcome = uow.resources.list_with_count(
                        pool_id, *criteria)
                    if outcome[0] != 0:
                        logger.debug(
                            'Resource {} skip for subscription {} '
                            'because of the filter.'
                            .format(message.id, info['subscriptionId']))
                        continue
            callback_smo(subscription, message, kind)
def callback_smo(sub: Subscription, msg: Message2SMO, msg_type: str,
                 obj_dict: dict = None):
    """POST one change notification to a subscription's callback URL.

    The payload carries the consumer subscription id, the event type, the
    object reference and the update time of ``msg``. For every message type
    except 'Resource', ``obj_dict`` is attached as ``priorObjectState`` on
    DELETE/MODIFY events and as ``postObjectState`` on CREATE/MODIFY events.

    The callback is contacted over HTTPS when its URL scheme is 'https' and
    over plain HTTP otherwise. When certificate verification fails because
    the peer uses a self-signed certificate, the POST is retried once over
    the self-signed connection helper.

    :param sub: subscription whose callback URL is notified.
    :param msg: the change message to report.
    :param msg_type: object type of the message, e.g. 'Resource'.
    :param obj_dict: state of the changed object, or None.
    :return: None once the first POST has completed (whatever its status),
        the result of ``post_data`` when the self-signed retry is used, and
        False when posting raised. NOTE(review): these return shapes are
        kept as found; callers visible in this file ignore the result.
    """
    # NOTE(review): several short lines of this function were missing from
    # the reviewed text (`callback = {`, `else:`, both `try:` lines, the
    # `rst is True` test, the logger calls around the message strings and
    # the return statements). They are reconstructed from the surrounding
    # code -- confirm against the original file.
    sub_data = sub.serialize()
    callback = {
        'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
        'notificationEventType': msg.notificationEventType,
        'objectRef': msg.objectRef,
        'updateTime': msg.updatetime
    }
    if msg_type != 'Resource':
        if msg.notificationEventType in [NotificationEventEnum.DELETE,
                                         NotificationEventEnum.MODIFY]:
            callback['priorObjectState'] = obj_dict
        if msg.notificationEventType in [NotificationEventEnum.CREATE,
                                         NotificationEventEnum.MODIFY]:
            callback['postObjectState'] = obj_dict
    callback_data = json.dumps(callback)
    logger.info('URL: {}, data: {}'.format(
        sub_data['callback'], callback_data))

    # Call SMO through the SMO callback url
    o = urlparse(sub_data['callback'])
    if o.scheme == 'https':
        conn = get_https_conn_default(o.netloc)
    else:
        conn = get_http_conn(o.netloc)
    try:
        rst, status = post_data(conn, o.path, callback_data)
        if rst is True:
            logger.info(
                'Notify to SMO successed with status: {}'.format(status))
            return
        logger.error('Notify Response code is: {}'.format(status))
    except ssl.SSLCertVerificationError as e:
        logger.debug(
            'Notify try to post data with trusted ca failed: {}'.format(e))
        # OpenSSL 1.x words the verification failure "self signed
        # certificate", OpenSSL 3.x "self-signed certificate"; accept both so
        # the fallback keeps working regardless of the linked OpenSSL.
        reason = str(e)
        if 'self signed' in reason or 'self-signed' in reason:
            conn = get_https_conn_selfsigned(o.netloc)
            try:
                return post_data(conn, o.path, callback_data)
            except Exception as retry_err:
                # Adjacent literals instead of a backslash continuation, so
                # no source indentation leaks into the log message.
                logger.info(
                    'Notify post data with self-signed ca '
                    'failed: {}'.format(retry_err))
                # TODO: write the status to extension db table.
                return False
        return False
    except Exception as e:
        logger.critical('Notify except: {}'.format(e))
        return False