1 # Copyright (C) 2021-2022 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 # from o2common.config import conf
20 from o2common.domain.filter import gen_orm_filter
21 from o2common.service.unit_of_work import AbstractUnitOfWork
22 from o2common.adapter.notifications import AbstractNotifications
24 from o2ims.domain import commands, ocloud
25 from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
28 from o2common.helper import o2logging
29 logger = o2logging.get_logger(__name__)
32 # # Maybe another MQ server
33 # r = redis.Redis(**config.get_redis_host_and_port())
36 def notify_change_to_smo(
37 cmd: commands.PubMessage2SMO,
38 uow: AbstractUnitOfWork,
39 notifications: AbstractNotifications,
41 logger.debug('In notify_change_to_smo')
43 if msg_type == 'ResourceType':
44 _notify_resourcetype(uow, notifications, cmd.data)
45 elif msg_type == 'ResourcePool':
46 _notify_resourcepool(uow, notifications, cmd.data)
47 elif msg_type == 'Dms':
48 _notify_dms(uow, notifications, cmd.data)
49 elif msg_type == 'Resource':
50 _notify_resource(uow, notifications, cmd.data)
53 def __get_object_type_and_value(sub_filter):
54 exprs = sub_filter.split(';')
56 items = expr.strip(' ()').split(',')
57 item_key = items[1].strip()
58 if item_key == 'objectType':
59 return True, items[2].strip()
63 def handle_filter(filter: str, f_type: str):
68 filter_list = filter.strip(' []').split('|')
74 for sub_filter in filter_list:
75 objectType, objectTypeValue = __get_object_type_and_value(sub_filter)
76 if objectTypeValue == f_type:
78 filters.append(sub_filter)
79 elif not objectType and f_type == 'ResourceInfo':
81 filters.append(sub_filter)
83 return match_type_count, filters
86 def check_filters(filters, sub_data, uow_cls, obj_cls, attr_id, id):
87 for filter in filters[1]:
88 logger.debug(f'filter: {filter}')
90 args = gen_orm_filter(obj_cls, filter)
93 'Subscription {} filter {} has wrong attribute '
94 'name or value. Ignore the filter.'.format(
95 sub_data['subscriptionId'],
98 logger.debug(f'args: {args}')
100 if len(args) == 0 and 'objectType' in filter:
103 args.append(attr_id == id)
104 obj_count, _ = uow_cls.list_with_count(*args)
110 def _notify_resourcetype(uow, notifications, data):
112 resource_type = uow.resource_types.get(data.id)
113 if resource_type is None:
114 logger.warning('ResourceType {} does not exists.'.format(data.id))
117 resource_type_dict = resource_type.get_notification_dict()
119 subs = uow.subscriptions.list()
121 sub_data = sub.serialize()
122 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
123 filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
124 logger.debug(f'filters: {filters}, sub_data: {sub_data}')
126 if not filters or filters[0] == 0 or check_filters(
127 filters, sub_data, uow.resource_types, ocloud.ResourceType,
128 ocloud.ResourceType.resourceTypeId, data.id):
129 callback_smo(notifications, sub, data, resource_type_dict)
132 logger.info('Subscription {} filter hit, skip ResourceType {}.'
133 .format(sub_data['subscriptionId'], data.id))
136 def _notify_resourcepool(uow, notifications, data):
138 resource_pool = uow.resource_pools.get(data.id)
139 if resource_pool is None:
140 logger.warning('ResourcePool {} does not exists.'.format(data.id))
143 resource_pool_dict = resource_pool.get_notification_dict()
145 subs = uow.subscriptions.list()
147 sub_data = sub.serialize()
148 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
149 filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo')
150 logger.debug(f'filters: {filters}, sub_data: {sub_data}')
152 if not filters or filters[0] == 0 or check_filters(
153 filters, sub_data, uow.resource_pools, ocloud.ResourcePool,
154 ocloud.ResourcePool.resourcePoolId, data.id):
155 callback_smo(notifications, sub, data, resource_pool_dict)
158 logger.info('Subscription {} filter hit, skip ResourcePool {}.'
159 .format(sub_data['subscriptionId'], data.id))
162 def _notify_dms(uow, notifications, data):
164 dms = uow.deployment_managers.get(data.id)
167 'DeploymentManager {} does not exists.'.format(data.id))
170 dms_dict = dms.get_notification_dict()
172 subs = uow.subscriptions.list()
174 sub_data = sub.serialize()
175 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
176 filters = handle_filter(
177 sub_data['filter'], 'DeploymentManagerInfo')
178 logger.debug(f'filters: {filters}, sub_data: {sub_data}')
180 if not filters or filters[0] == 0 or check_filters(
181 filters, sub_data, uow.deployment_managers,
182 ocloud.DeploymentManager,
183 ocloud.DeploymentManager.deploymentManagerId, data.id):
184 callback_smo(notifications, sub, data, dms_dict)
187 logger.info('Subscription {} filter hit, skip '
188 'DeploymentManager {}.'
189 .format(sub_data['subscriptionId'], data.id))
192 def _notify_resource(uow, notifications, data):
194 resource = uow.resources.get(data.id)
196 logger.warning('Resource {} does not exists.'.format(data.id))
198 res_pool_id = resource.serialize()['resourcePoolId']
199 logger.debug('res pool id is {}'.format(res_pool_id))
201 res_dict = resource.get_notification_dict()
203 subs = uow.subscriptions.list()
205 sub_data = sub.serialize()
206 logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
207 filters = handle_filter(sub_data['filter'], 'ResourceInfo')
208 if not filters or filters[0] == 0:
209 callback_smo(notifications, sub, data, res_dict)
211 if filters[0] > 0 and not filters[1]:
214 for filter in filters[1]:
216 args = gen_orm_filter(ocloud.Resource, filter)
219 'Subscription {} filter {} has wrong attribute '
220 'name or value. Ignore the filter.'.format(
221 sub_data['subscriptionId'],
224 if len(args) == 0 and 'objectType' in filter:
227 args.append(ocloud.Resource.resourceId == data.id)
228 obj_count, _ = uow.resources.list_with_count(
234 logger.info('Subscription {} filter hit, skip Resource {}.'
235 .format(sub_data['subscriptionId'], data.id))
237 callback_smo(notifications, sub, data, res_dict)
240 def callback_smo(notifications: AbstractNotifications, sub: Subscription,
241 msg: Message2SMO, obj_dict: dict = None):
242 sub_data = sub.serialize()
244 'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
245 'notificationEventType': msg.notificationEventType,
246 'objectRef': msg.objectRef,
247 'updateTime': msg.updatetime
249 if msg.notificationEventType in [NotificationEventEnum.DELETE,
250 NotificationEventEnum.MODIFY]:
251 callback['priorObjectState'] = json.dumps(obj_dict)
252 if msg.notificationEventType in [NotificationEventEnum.CREATE,
253 NotificationEventEnum.MODIFY]:
254 callback['postObjectState'] = json.dumps(obj_dict)
255 if msg.notificationEventType == NotificationEventEnum.DELETE:
256 callback.pop('objectRef')
257 logger.info('callback URL: {}'.format(sub_data['callback']))
258 logger.debug('callback data: {}'.format(json.dumps(callback)))
260 return notifications.send(sub_data['callback'], callback)