# import redis
# import requests
import json
-import ssl
-from urllib.parse import urlparse
-# from o2common.config import config
+# from o2common.config import conf
from o2common.domain.filter import gen_orm_filter
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2common.service.command.handler import get_https_conn_default
-from o2common.service.command.handler import get_http_conn
-from o2common.service.command.handler import get_https_conn_selfsigned
-from o2common.service.command.handler import post_data
+from o2common.adapter.notifications import AbstractNotifications
from o2ims.domain import commands, ocloud
from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
def notify_change_to_smo(
cmd: commands.PubMessage2SMO,
uow: AbstractUnitOfWork,
+ notifications: AbstractNotifications,
):
logger.debug('In notify_change_to_smo')
msg_type = cmd.type
if msg_type == 'ResourceType':
- _notify_resourcetype(uow, cmd.data)
+ _notify_resourcetype(uow, notifications, cmd.data)
elif msg_type == 'ResourcePool':
- _notify_resourcepool(uow, cmd.data)
+ _notify_resourcepool(uow, notifications, cmd.data)
elif msg_type == 'Dms':
- _notify_dms(uow, cmd.data)
+ _notify_dms(uow, notifications, cmd.data)
elif msg_type == 'Resource':
- _notify_resource(uow, cmd.data)
+ _notify_resource(uow, notifications, cmd.data)
-def _notify_resourcetype(uow, data):
+def _notify_resourcetype(uow, notifications, data):
with uow:
resource_type = uow.resource_types.get(data.id)
if resource_type is None:
logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
if not filters:
- callback_smo(sub, data, resource_type_dict)
+ callback_smo(notifications, sub, data, resource_type_dict)
continue
- filter_effect = 0
+ filter_hit = False
for filter in filters:
try:
args = gen_orm_filter(ocloud.ResourceType, filter)
sub_data['subscriptionId'],
sub_data['filter']))
continue
+ if len(args) == 0 and 'objectType' in filter:
+ filter_hit = True
+ break
args.append(ocloud.ResourceType.resourceTypeId == data.id)
- ret = uow.resource_types.list_with_count(*args)
- if ret[0] > 0:
- logger.debug(
- 'ResourcePool {} skip for subscription {} because of'
- ' the filter.'
- .format(data.id, sub_data['subscriptionId']))
- filter_effect += 1
+ obj_count, _ = uow.resource_types.list_with_count(*args)
+ if obj_count > 0:
+ filter_hit = True
break
- if filter_effect > 0:
- continue
- callback_smo(sub, data, resource_type_dict)
+ if filter_hit:
+ logger.info('Subscription {} filter hit, skip ResourceType {}.'
+ .format(sub_data['subscriptionId'], data.id))
+ else:
+ callback_smo(notifications, sub, data, resource_type_dict)
-def _notify_resourcepool(uow, data):
+def _notify_resourcepool(uow, notifications, data):
with uow:
resource_pool = uow.resource_pools.get(data.id)
if resource_pool is None:
logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo')
if not filters:
- callback_smo(sub, data, resource_pool_dict)
+ callback_smo(notifications, sub, data, resource_pool_dict)
continue
- filter_effect = 0
+ filter_hit = False
for filter in filters:
try:
args = gen_orm_filter(ocloud.ResourcePool, filter)
sub_data['subscriptionId'],
sub_data['filter']))
continue
+ if len(args) == 0 and 'objectType' in filter:
+ filter_hit = True
+ break
args.append(ocloud.ResourcePool.resourcePoolId == data.id)
- ret = uow.resource_pools.list_with_count(*args)
- if ret[0] > 0:
- logger.debug(
- 'ResourcePool {} skip for subscription {} because of'
- ' the filter.'
- .format(data.id, sub_data['subscriptionId']))
- filter_effect += 1
+ obj_count, _ = uow.resource_pools.list_with_count(*args)
+ if obj_count > 0:
+ filter_hit = True
break
- if filter_effect > 0:
- continue
- callback_smo(sub, data, resource_pool_dict)
+ if filter_hit:
+ logger.info('Subscription {} filter hit, skip ResourcePool {}.'
+ .format(sub_data['subscriptionId'], data.id))
+ else:
+ callback_smo(notifications, sub, data, resource_pool_dict)
-def _notify_dms(uow, data):
+def _notify_dms(uow, notifications, data):
with uow:
dms = uow.deployment_managers.get(data.id)
if dms is None:
filters = handle_filter(
sub_data['filter'], 'DeploymentManagerInfo')
if not filters:
- callback_smo(sub, data, dms_dict)
+ callback_smo(notifications, sub, data, dms_dict)
continue
- filter_effect = 0
+ filter_hit = False
for filter in filters:
try:
args = gen_orm_filter(ocloud.DeploymentManager, filter)
sub_data['subscriptionId'],
sub_data['filter']))
continue
+ if len(args) == 0 and 'objectType' in filter:
+ filter_hit = True
+ break
args.append(
ocloud.DeploymentManager.deploymentManagerId == data.id)
- ret = uow.deployment_managers.list_with_count(*args)
- if ret[0] > 0:
- logger.debug(
- 'DeploymentManager {} skip for subscription {} because'
- ' of the filter.'
- .format(data.id, sub_data['subscriptionId']))
- filter_effect += 1
+ obj_count, _ = uow.deployment_managers.list_with_count(*args)
+ if obj_count > 0:
+ filter_hit = True
break
- if filter_effect > 0:
- continue
- callback_smo(sub, data, dms_dict)
-
+ if filter_hit:
+ logger.info('Subscription {} filter hit, skip '
+ 'DeploymentManager {}.'
+ .format(sub_data['subscriptionId'], data.id))
+ else:
+ callback_smo(notifications, sub, data, dms_dict)
-class FilterNotEffect(Exception):
- pass
-
-class FilterEffect(Exception):
- pass
-
-
-def _notify_resource(uow, data):
+def _notify_resource(uow, notifications, data):
with uow:
resource = uow.resources.get(data.id)
if resource is None:
logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
filters = handle_filter(sub_data['filter'], 'ResourceInfo')
if not filters:
- callback_smo(sub, data, res_dict)
+ callback_smo(notifications, sub, data, res_dict)
continue
- filter_effect = 0
+ filter_hit = False
for filter in filters:
try:
args = gen_orm_filter(ocloud.Resource, filter)
sub_data['subscriptionId'],
sub_data['filter']))
continue
+ if len(args) == 0 and 'objectType' in filter:
+ filter_hit = True
+ break
args.append(ocloud.Resource.resourceId == data.id)
- ret = uow.resources.list_with_count(res_pool_id, *args)
- if ret[0] > 0:
- logger.debug(
- 'Resource {} skip for subscription {} because of '
- 'the filter.'
- .format(data.id, sub_data['subscriptionId']))
- filter_effect += 1
+ obj_count, _ = uow.resources.list_with_count(
+ res_pool_id, *args)
+ if obj_count > 0:
+ filter_hit = True
break
- if filter_effect > 0:
- continue
- callback_smo(sub, data, res_dict)
+ if filter_hit:
+ logger.info('Subscription {} filter hit, skip Resource {}.'
+ .format(sub_data['subscriptionId'], data.id))
+ else:
+ callback_smo(notifications, sub, data, res_dict)
def handle_filter(filter: str, f_type: str):
return filters
-def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None):
+def callback_smo(notifications: AbstractNotifications, sub: Subscription,
+ msg: Message2SMO, obj_dict: dict = None):
sub_data = sub.serialize()
callback = {
'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
}
if msg.notificationEventType in [NotificationEventEnum.DELETE,
NotificationEventEnum.MODIFY]:
- callback['priorObjectState'] = obj_dict
+ callback['priorObjectState'] = json.dumps(obj_dict)
if msg.notificationEventType in [NotificationEventEnum.CREATE,
NotificationEventEnum.MODIFY]:
- callback['postObjectState'] = obj_dict
+ callback['postObjectState'] = json.dumps(obj_dict)
if msg.notificationEventType == NotificationEventEnum.DELETE:
callback.pop('objectRef')
callback_data = json.dumps(callback)
- logger.info('URL: {}, data: {}'.format(
- sub_data['callback'], callback_data))
+ logger.info('callback URL: {}'.format(sub_data['callback']))
+ logger.debug('callback data: {}'.format(callback_data))
+
+ return notifications.send(sub_data['callback'], callback_data)
# Call SMO through the SMO callback url
- o = urlparse(sub_data['callback'])
- if o.scheme == 'https':
- conn = get_https_conn_default(o.netloc)
- else:
- conn = get_http_conn(o.netloc)
- try:
- rst, status = post_data(conn, o.path, callback_data)
- if rst is True:
- logger.info(
- 'Notify to SMO successed with status: {}'.format(status))
- return
- logger.error('Notify Response code is: {}'.format(status))
- except ssl.SSLCertVerificationError as e:
- logger.debug(
- 'Notify try to post data with trusted ca failed: {}'.format(e))
- if 'self signed' in str(e):
- conn = get_https_conn_selfsigned(o.netloc)
- try:
- return post_data(conn, o.path, callback_data)
- except Exception as e:
- logger.info(
- 'Notify post data with self-signed ca \
- failed: {}'.format(e))
- # TODO: write the status to extension db table.
- return False
- return False
- except Exception as e:
- logger.critical('Notify except: {}'.format(e))
- return False
+ # o = urlparse(sub_data['callback'])
+ # if o.scheme == 'https':
+ # conn = get_https_conn_default(o.netloc)
+ # else:
+ # conn = get_http_conn(o.netloc)
+ # try:
+ # rst, status = post_data(conn, o.path, callback_data)
+ # if rst is True:
+ # logger.info(
+ # 'Notify to SMO successed with status: {}'.format(status))
+ # return
+ # logger.error('Notify Response code is: {}'.format(status))
+ # except ssl.SSLCertVerificationError as e:
+ # logger.debug(
+ # 'Notify try to post data with trusted ca failed: {}'.format(e))
+ # if 'self signed' in str(e):
+ # conn = get_https_conn_selfsigned(o.netloc)
+ # try:
+ # return post_data(conn, o.path, callback_data)
+ # except Exception as e:
+ # logger.info(
+ # 'Notify post data with self-signed ca \
+ # failed: {}'.format(e))
+ # return False
+ # return False
+ # except Exception as e:
+ # logger.critical('Notify except: {}'.format(e))
+ # return False