1 # Copyright (C) 2021-2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 # from o2common.config import conf
20 from o2common.domain.filter import gen_orm_filter
21 from o2common.service.unit_of_work import AbstractUnitOfWork
22 from o2common.adapter.notifications import AbstractNotifications
24 from o2ims.domain import commands, ocloud
25 from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
28 from o2common.helper import o2logging
29 logger = o2logging.get_logger(__name__)
32 # # Maybe another MQ server
33 # r = redis.Redis(**config.get_redis_host_and_port())
36 def notify_change_to_smo(
37 cmd: commands.PubMessage2SMO,
38 uow: AbstractUnitOfWork,
39 notifications: AbstractNotifications,
41 logger.debug('In notify_change_to_smo')
43 if msg_type == 'ResourceType':
44 _notify_resourcetype(uow, notifications, cmd.data)
45 elif msg_type == 'ResourcePool':
46 _notify_resourcepool(uow, notifications, cmd.data)
47 elif msg_type == 'Dms':
48 _notify_dms(uow, notifications, cmd.data)
49 elif msg_type == 'Resource':
50 _notify_resource(uow, notifications, cmd.data)
51 elif msg_type == 'OCloud':
52 _notify_ocloud(uow, notifications, cmd.data)
55 def __get_object_type_and_value(sub_filter):
56 exprs = sub_filter.split(';')
58 items = expr.strip(' ()').split(',')
59 item_key = items[1].strip()
60 if item_key == 'objectType':
61 return True, items[2].strip()
65 def handle_filter(filter: str, f_type: str):
69 filter_list = filter.strip(' []').split('|')
75 for sub_filter in filter_list:
76 objectType, objectTypeValue = __get_object_type_and_value(sub_filter)
77 if objectTypeValue == f_type:
79 filters.append(sub_filter)
80 elif not objectType and f_type == 'ResourceInfo':
82 filters.append(sub_filter)
84 return match_type_count, filters
87 def check_filters(filters, sub_data, uow_cls, obj_cls, attr_id, id):
88 for filter in filters[1]:
89 logger.debug(f'filter: {filter}')
91 args = gen_orm_filter(obj_cls, filter)
94 'Subscription {} filter {} has wrong attribute '
95 'name or value. Ignore the filter.'.format(
96 sub_data['subscriptionId'],
99 logger.debug(f'args: {args}')
101 if len(args) == 0 and 'objectType' in filter:
104 args.append(attr_id == id)
105 obj_count, _ = uow_cls.list_with_count(*args)
111 def _notify_resourcetype(uow, notifications, data):
113 resource_type = uow.resource_types.get(data.id)
114 if resource_type is None:
115 logger.warning('ResourceType {} does not exists.'.format(data.id))
118 resource_type_dict = resource_type.get_notification_dict()
120 subs = uow.subscriptions.list()
122 sub_data = sub.serialize()
123 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
124 filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
125 logger.debug(f'filters: {filters}, sub_data: {sub_data}')
127 if not filters or filters[0] == 0 or check_filters(
128 filters, sub_data, uow.resource_types, ocloud.ResourceType,
129 ocloud.ResourceType.resourceTypeId, data.id):
130 callback_smo(notifications, sub, data, resource_type_dict)
133 logger.info('Subscription {} filter hit, skip ResourceType {}.'
134 .format(sub_data['subscriptionId'], data.id))
137 def _notify_resourcepool(uow, notifications, data):
139 resource_pool = uow.resource_pools.get(data.id)
140 if resource_pool is None:
141 logger.warning('ResourcePool {} does not exists.'.format(data.id))
144 resource_pool_dict = resource_pool.get_notification_dict()
146 subs = uow.subscriptions.list()
148 sub_data = sub.serialize()
149 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
150 filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo')
151 logger.debug(f'filters: {filters}, sub_data: {sub_data}')
153 if not filters or filters[0] == 0 or check_filters(
154 filters, sub_data, uow.resource_pools, ocloud.ResourcePool,
155 ocloud.ResourcePool.resourcePoolId, data.id):
156 callback_smo(notifications, sub, data, resource_pool_dict)
159 logger.info('Subscription {} filter hit, skip ResourcePool {}.'
160 .format(sub_data['subscriptionId'], data.id))
163 def _notify_dms(uow, notifications, data):
165 dms = uow.deployment_managers.get(data.id)
168 'DeploymentManager {} does not exists.'.format(data.id))
171 dms_dict = dms.get_notification_dict()
173 subs = uow.subscriptions.list()
175 sub_data = sub.serialize()
176 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
177 filters = handle_filter(
178 sub_data['filter'], 'DeploymentManagerInfo')
179 logger.debug(f'filters: {filters}, sub_data: {sub_data}')
181 if not filters or filters[0] == 0 or check_filters(
182 filters, sub_data, uow.deployment_managers,
183 ocloud.DeploymentManager,
184 ocloud.DeploymentManager.deploymentManagerId, data.id):
185 callback_smo(notifications, sub, data, dms_dict)
188 logger.info('Subscription {} filter hit, skip '
189 'DeploymentManager {}.'
190 .format(sub_data['subscriptionId'], data.id))
193 def _notify_ocloud(uow, notifications, data):
195 ocloud = uow.oclouds.get(data.id)
198 'oCloud {} does not exists.'.format(data.id))
201 ocloud_dict = ocloud.get_notification_dict()
203 subs = uow.subscriptions.list()
205 sub_data = sub.serialize()
206 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
207 filters = handle_filter(
208 sub_data['filter'], 'CloudInfo')
209 logger.debug(f'filters: {filters}, sub_data: {sub_data}')
211 if not filters or filters[0] == 0 or check_filters(
212 filters, sub_data, uow.oclouds,
214 ocloud.Ocloud.oCloudId, data.id):
215 callback_smo(notifications, sub, data, ocloud_dict)
218 logger.info('Subscription {} filter hit, skip Cloud {}.'
219 .format(sub_data['subscriptionId'], data.id))
222 def _notify_resource(uow, notifications, data):
224 resource = uow.resources.get(data.id)
226 logger.warning('Resource {} does not exists.'.format(data.id))
228 res_pool_id = resource.serialize()['resourcePoolId']
229 logger.debug('res pool id is {}'.format(res_pool_id))
231 res_dict = resource.get_notification_dict()
233 subs = uow.subscriptions.list()
235 sub_data = sub.serialize()
236 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
237 filters = handle_filter(sub_data['filter'], 'ResourceInfo')
238 if not filters or filters[0] == 0:
239 callback_smo(notifications, sub, data, res_dict)
241 if filters[0] > 0 and not filters[1]:
244 for filter in filters[1]:
246 args = gen_orm_filter(ocloud.Resource, filter)
249 'Subscription {} filter {} has wrong attribute '
250 'name or value. Ignore the filter.'.format(
251 sub_data['subscriptionId'],
254 if len(args) == 0 and 'objectType' in filter:
257 args.append(ocloud.Resource.resourceId == data.id)
258 obj_count, _ = uow.resources.list_with_count(
264 logger.info('Subscription {} filter hit, skip Resource {}.'
265 .format(sub_data['subscriptionId'], data.id))
267 callback_smo(notifications, sub, data, res_dict)
270 def callback_smo(notifications: AbstractNotifications, sub: Subscription,
271 msg: Message2SMO, obj_dict: dict = None):
272 sub_data = sub.serialize()
274 'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
275 'notificationEventType': msg.notificationEventType,
276 'objectRef': msg.objectRef,
277 'updateTime': msg.updatetime
279 if msg.notificationEventType in [NotificationEventEnum.DELETE,
280 NotificationEventEnum.MODIFY]:
281 callback['priorObjectState'] = json.dumps(obj_dict)
282 if msg.notificationEventType in [NotificationEventEnum.CREATE,
283 NotificationEventEnum.MODIFY]:
284 callback['postObjectState'] = json.dumps(obj_dict)
285 if msg.notificationEventType == NotificationEventEnum.DELETE:
286 callback.pop('objectRef')
287 logger.info('callback URL: {}'.format(sub_data['callback']))
288 logger.debug('callback data: {}'.format(json.dumps(callback)))
290 return notifications.send(sub_data['callback'], callback)