# import redis
# import requests
import json
-import ssl
-from urllib.parse import urlparse
-# from o2common.config import config
+# from o2common.config import conf
from o2common.domain.filter import gen_orm_filter
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2common.service.command.handler import get_https_conn_default
-from o2common.service.command.handler import get_http_conn
-from o2common.service.command.handler import get_https_conn_selfsigned
-from o2common.service.command.handler import post_data
+from o2common.adapter.notifications import AbstractNotifications
from o2ims.domain import commands, ocloud
from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
def notify_change_to_smo(
cmd: commands.PubMessage2SMO,
uow: AbstractUnitOfWork,
+ notifications: AbstractNotifications,
):
logger.debug('In notify_change_to_smo')
msg_type = cmd.type
if msg_type == 'ResourceType':
- _notify_resourcetype(uow, cmd.data)
+ _notify_resourcetype(uow, notifications, cmd.data)
elif msg_type == 'ResourcePool':
- _notify_resourcepool(uow, cmd.data)
+ _notify_resourcepool(uow, notifications, cmd.data)
elif msg_type == 'Dms':
- _notify_dms(uow, cmd.data)
+ _notify_dms(uow, notifications, cmd.data)
elif msg_type == 'Resource':
- _notify_resource(uow, cmd.data)
+ _notify_resource(uow, notifications, cmd.data)
-def _notify_resourcetype(uow, data):
+# def get_resource_dict(resource_type):
+# return {
+# 'resourceTypeId': resource_type.resourceTypeId,
+# 'name': resource_type.name,
+# 'description': resource_type.description,
+# 'vendor': resource_type.vendor,
+# 'model': resource_type.model,
+# 'version': resource_type.version,
+# }
+#
+#
+# def handle_filter(filter: str, f_type: str):
+# if not filter:
+# return
+#
+# filter_list = filter.strip(' []').split('|')
+#
+# match_type_count = 0
+# filters = []
+# for sub_filter in filter_list:
+# objectType, objectTypeValue = get_object_type_and_value(sub_filter)
+# if objectTypeValue == f_type:
+# match_type_count += 1
+# filters.append(sub_filter)
+# elif not objectType and f_type == 'ResourceInfo':
+# filters.append(sub_filter)
+#
+# return match_type_count, filters
+#
+#
+# def get_object_type_and_value(sub_filter):
+# exprs = sub_filter.split(';')
+# for expr in exprs:
+# items = expr.strip(' ()').split(',')
+# item_key = items[1].strip()
+# if item_key == 'objectType':
+# return True, items[2].strip()
+# return False, ''
+#
+#
+# def check_filters(filters, sub_data, uow, id):
+# for filter in filters[1]:
+# if isinstance(filter, bool) and filter:
+# return True
+#
+# try:
+# args = gen_orm_filter(ocloud.ResourceType, filter)
+# except KeyError:
+# logger.warning(
+# 'Subscription {} filter {} has wrong attribute '
+# 'name or value. Ignore the filter.'.format(
+# sub_data['subscriptionId'],
+# sub_data['filter']))
+# continue
+#
+# if len(args) == 0 and 'objectType' in filter:
+# return True
+#
+# args.append(ocloud.ResourceType.resourceTypeId == id)
+# obj_count, _ = uow.resource_types.list_with_count(*args)
+# if obj_count > 0:
+# return True
+# return False
+#
+#
+# def _notify_resourcetype(uow, notifications, data):
+# with uow:
+# resource_type = uow.resource_types.get(data.id)
+# if resource_type is None:
+# logger.warning('ResourceType {} does not exists.'.format(data.id))
+# return
+#
+# resource_type_dict = get_resource_dict(resource_type)
+#
+# subs = uow.subscriptions.list()
+# for sub in subs:
+# sub_data = sub.serialize()
+# filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
+# logger.debug(f'filters: {filters}, sub_data: {sub_data}')
+#
+# if not filters or filters[0] == 0 or check_filters(
+# filters, sub_data, uow, data.id):
+# callback_smo(notifications, sub, data, resource_type_dict)
+# continue
+#
+# logger.info('Subscription {} filter hit, skip ResourceType {}.'
+# .format(sub_data['subscriptionId'], data.id))
+
+def _notify_resourcetype(uow, notifications, data):
with uow:
resource_type = uow.resource_types.get(data.id)
if resource_type is None:
sub_data = sub.serialize()
logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
- if not filters:
- callback_smo(sub, data, resource_type_dict)
+ if not filters or filters[0] == 0:
+ callback_smo(notifications, sub, data, resource_type_dict)
continue
- filter_effect = 0
- for filter in filters:
+ if filters[0] > 0 and not filters[1]:
+ continue
+
+ filter_hit = False
+ for filter in filters[1]:
try:
args = gen_orm_filter(ocloud.ResourceType, filter)
except KeyError:
sub_data['subscriptionId'],
sub_data['filter']))
continue
+ if len(args) == 0 and 'objectType' in filter:
+ filter_hit = True
+ break
args.append(ocloud.ResourceType.resourceTypeId == data.id)
- ret = uow.resource_types.list_with_count(*args)
- if ret[0] > 0:
- logger.debug(
- 'ResourcePool {} skip for subscription {} because of'
- ' the filter.'
- .format(data.id, sub_data['subscriptionId']))
- filter_effect += 1
+ obj_count, _ = uow.resource_types.list_with_count(*args)
+ if obj_count > 0:
+ filter_hit = True
break
- if filter_effect > 0:
- continue
- callback_smo(sub, data, resource_type_dict)
+ if filter_hit:
+ logger.info('Subscription {} filter hit, skip ResourceType {}.'
+ .format(sub_data['subscriptionId'], data.id))
+ else:
+ callback_smo(notifications, sub, data, resource_type_dict)
-def _notify_resourcepool(uow, data):
+def _notify_resourcepool(uow, notifications, data):
with uow:
resource_pool = uow.resource_pools.get(data.id)
if resource_pool is None:
sub_data = sub.serialize()
logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo')
- if not filters:
- callback_smo(sub, data, resource_pool_dict)
+ if not filters or filters[0] == 0:
+ callback_smo(notifications, sub, data, resource_pool_dict)
+ continue
+ if filters[0] > 0 and not filters[1]:
continue
- filter_effect = 0
- for filter in filters:
+ filter_hit = False
+ for filter in filters[1]:
try:
args = gen_orm_filter(ocloud.ResourcePool, filter)
except KeyError:
sub_data['subscriptionId'],
sub_data['filter']))
continue
+ if len(args) == 0 and 'objectType' in filter:
+ filter_hit = True
+ break
args.append(ocloud.ResourcePool.resourcePoolId == data.id)
- ret = uow.resource_pools.list_with_count(*args)
- if ret[0] > 0:
- logger.debug(
- 'ResourcePool {} skip for subscription {} because of'
- ' the filter.'
- .format(data.id, sub_data['subscriptionId']))
- filter_effect += 1
+ obj_count, _ = uow.resource_pools.list_with_count(*args)
+ if obj_count > 0:
+ filter_hit = True
break
- if filter_effect > 0:
- continue
- callback_smo(sub, data, resource_pool_dict)
+ if filter_hit:
+ logger.info('Subscription {} filter hit, skip ResourcePool {}.'
+ .format(sub_data['subscriptionId'], data.id))
+ else:
+ callback_smo(notifications, sub, data, resource_pool_dict)
-def _notify_dms(uow, data):
+def _notify_dms(uow, notifications, data):
with uow:
dms = uow.deployment_managers.get(data.id)
if dms is None:
for sub in subs:
sub_data = sub.serialize()
logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
- filters = handle_filter(
+ filters_rst = handle_filter(
sub_data['filter'], 'DeploymentManagerInfo')
- if not filters:
- callback_smo(sub, data, dms_dict)
+ if not filters_rst or filters_rst[0] == 0:
+ callback_smo(notifications, sub, data, dms_dict)
+ continue
+ if filters_rst[0] > 0 and not filters_rst[1]:
continue
- filter_effect = 0
- for filter in filters:
+ filter_hit = False
+ for filter in filters_rst[1]:
try:
args = gen_orm_filter(ocloud.DeploymentManager, filter)
except KeyError:
sub_data['subscriptionId'],
sub_data['filter']))
continue
+ if len(args) == 0 and 'objectType' in filter:
+ filter_hit = True
+ break
args.append(
ocloud.DeploymentManager.deploymentManagerId == data.id)
- ret = uow.deployment_managers.list_with_count(*args)
- if ret[0] > 0:
- logger.debug(
- 'DeploymentManager {} skip for subscription {} because'
- ' of the filter.'
- .format(data.id, sub_data['subscriptionId']))
- filter_effect += 1
+ obj_count, _ = uow.deployment_managers.list_with_count(*args)
+ if obj_count > 0:
+ filter_hit = True
break
- if filter_effect > 0:
+ if filter_hit:
+ logger.info('Subscription {} filter hit, skip '
+ 'DeploymentManager {}.'
+ .format(sub_data['subscriptionId'], data.id))
+ else:
continue
- callback_smo(sub, data, dms_dict)
-
+ # callback_smo(notifications, sub, data, dms_dict)
-class FilterNotEffect(Exception):
- pass
-
-class FilterEffect(Exception):
- pass
-
-
-def _notify_resource(uow, data):
+def _notify_resource(uow, notifications, data):
with uow:
resource = uow.resources.get(data.id)
if resource is None:
sub_data = sub.serialize()
logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
filters = handle_filter(sub_data['filter'], 'ResourceInfo')
- if not filters:
- callback_smo(sub, data, res_dict)
+ if not filters or filters[0] == 0:
+ callback_smo(notifications, sub, data, res_dict)
continue
- filter_effect = 0
- for filter in filters:
+ if filters[0] > 0 and not filters[1]:
+ continue
+ filter_hit = False
+ for filter in filters[1]:
try:
args = gen_orm_filter(ocloud.Resource, filter)
except KeyError:
sub_data['subscriptionId'],
sub_data['filter']))
continue
+ if len(args) == 0 and 'objectType' in filter:
+ filter_hit = True
+ break
args.append(ocloud.Resource.resourceId == data.id)
- ret = uow.resources.list_with_count(res_pool_id, *args)
- if ret[0] > 0:
- logger.debug(
- 'Resource {} skip for subscription {} because of '
- 'the filter.'
- .format(data.id, sub_data['subscriptionId']))
- filter_effect += 1
+ obj_count, _ = uow.resources.list_with_count(
+ res_pool_id, *args)
+ if obj_count > 0:
+ filter_hit = True
break
- if filter_effect > 0:
- continue
- callback_smo(sub, data, res_dict)
+ if filter_hit:
+ logger.info('Subscription {} filter hit, skip Resource {}.'
+ .format(sub_data['subscriptionId'], data.id))
+ else:
+ callback_smo(notifications, sub, data, res_dict)
def handle_filter(filter: str, f_type: str):
if not filter:
return
+ match_type_count = 0
filter_strip = filter.strip(' []')
filter_list = filter_strip.split('|')
+ if not filter_list:
+ return
filters = list()
for sub_filter in filter_list:
exprs = sub_filter.split(';')
filters.append(sub_filter)
continue
if objectTypeValue == f_type:
+ match_type_count += 1
filters.append(sub_filter)
- return filters
+ return (match_type_count, filters)
-def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None):
+def callback_smo(notifications: AbstractNotifications, sub: Subscription,
+ msg: Message2SMO, obj_dict: dict = None):
sub_data = sub.serialize()
callback = {
'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
}
if msg.notificationEventType in [NotificationEventEnum.DELETE,
NotificationEventEnum.MODIFY]:
- callback['priorObjectState'] = obj_dict
+ callback['priorObjectState'] = json.dumps(obj_dict)
if msg.notificationEventType in [NotificationEventEnum.CREATE,
NotificationEventEnum.MODIFY]:
- callback['postObjectState'] = obj_dict
+ callback['postObjectState'] = json.dumps(obj_dict)
if msg.notificationEventType == NotificationEventEnum.DELETE:
callback.pop('objectRef')
- callback_data = json.dumps(callback)
- logger.info('URL: {}, data: {}'.format(
- sub_data['callback'], callback_data))
+ logger.info('callback URL: {}'.format(sub_data['callback']))
+ logger.debug('callback data: {}'.format(json.dumps(callback)))
+
+ return notifications.send(sub_data['callback'], callback)
# Call SMO through the SMO callback url
- o = urlparse(sub_data['callback'])
- if o.scheme == 'https':
- conn = get_https_conn_default(o.netloc)
- else:
- conn = get_http_conn(o.netloc)
- try:
- rst, status = post_data(conn, o.path, callback_data)
- if rst is True:
- logger.info(
- 'Notify to SMO successed with status: {}'.format(status))
- return
- logger.error('Notify Response code is: {}'.format(status))
- except ssl.SSLCertVerificationError as e:
- logger.debug(
- 'Notify try to post data with trusted ca failed: {}'.format(e))
- if 'self signed' in str(e):
- conn = get_https_conn_selfsigned(o.netloc)
- try:
- return post_data(conn, o.path, callback_data)
- except Exception as e:
- logger.info(
- 'Notify post data with self-signed ca \
- failed: {}'.format(e))
- # TODO: write the status to extension db table.
- return False
- return False
- except Exception as e:
- logger.critical('Notify except: {}'.format(e))
- return False
+ # o = urlparse(sub_data['callback'])
+ # if o.scheme == 'https':
+ # conn = get_https_conn_default(o.netloc)
+ # else:
+ # conn = get_http_conn(o.netloc)
+ # try:
+ # rst, status = post_data(conn, o.path, callback_data)
+ # if rst is True:
+ # logger.info(
+ # 'Notify to SMO successed with status: {}'.format(status))
+ # return
+ # logger.error('Notify Response code is: {}'.format(status))
+ # except ssl.SSLCertVerificationError as e:
+ # logger.debug(
+ # 'Notify try to post data with trusted ca failed: {}'.format(e))
+ # if 'self signed' in str(e):
+ # conn = get_https_conn_selfsigned(o.netloc)
+ # try:
+ # return post_data(conn, o.path, callback_data)
+ # except Exception as e:
+ # logger.info(
+ # 'Notify post data with self-signed ca \
+ # failed: {}'.format(e))
+ # return False
+ # return False
+ # except Exception as e:
+ # logger.critical('Notify except: {}'.format(e))
+ # return False