bus = bootstrap.bootstrap()
pubsub = r.pubsub(ignore_subscribe_messages=True)
pubsub.subscribe("NfDeploymentStateChanged")
- pubsub.subscribe('ResourceChanged')
pubsub.subscribe('OcloudChanged')
+ pubsub.subscribe('ResourceTypeChanged')
+ pubsub.subscribe('ResourcePoolChanged')
+ pubsub.subscribe('DmsChanged')
+ pubsub.subscribe('ResourceChanged')
pubsub.subscribe('AlarmEventChanged')
for m in pubsub.listen():
ToState=data['ToState']
)
bus.handle(cmd)
+ elif channel == 'ResourceTypeChanged':
+ datastr = m['data']
+ data = json.loads(datastr)
+ logger.info('ResourceTypeChanged with cmd:{}'.format(data))
+ ref = apibase + inventory_api_version + '/resourceTypes/' + \
+ data['id']
+ cmd = imscmd.PubMessage2SMO(data=Message2SMO(
+ id=data['id'], ref=ref,
+ eventtype=data['notificationEventType'],
+ updatetime=data['updatetime']),
+ type='ResourceType')
+ bus.handle(cmd)
+ elif channel == 'ResourcePoolChanged':
+ datastr = m['data']
+ data = json.loads(datastr)
+ logger.info('ResourcePoolChanged with cmd:{}'.format(data))
+ ref = apibase + inventory_api_version + '/resourcePools/' + \
+ data['id']
+ cmd = imscmd.PubMessage2SMO(data=Message2SMO(
+ id=data['id'], ref=ref,
+ eventtype=data['notificationEventType'],
+ updatetime=data['updatetime']),
+ type='ResourcePool')
+ bus.handle(cmd)
+ elif channel == 'DmsChanged':
+ datastr = m['data']
+ data = json.loads(datastr)
+ logger.info('ResourceChanged with cmd:{}'.format(data))
+ ref = apibase + inventory_api_version + '/deploymentManagers/' + \
+ data['id']
+ cmd = imscmd.PubMessage2SMO(data=Message2SMO(
+ id=data['id'], ref=ref,
+ eventtype=data['notificationEventType'],
+ updatetime=data['updatetime']),
+ type='Dms')
+ bus.handle(cmd)
elif channel == 'ResourceChanged':
datastr = m['data']
data = json.loads(datastr)
cmd = imscmd.PubMessage2SMO(data=Message2SMO(
id=data['id'], ref=ref,
eventtype=data['notificationEventType'],
- updatetime=data['updatetime']))
+ updatetime=data['updatetime']),
+ type='Resource')
bus.handle(cmd)
elif channel == 'OcloudChanged':
datastr = m['data']
from o2ims.service.command import notify_handler, registration_handler,\
notify_alarm_handler
from o2ims.service.event import ocloud_event, resource_event, \
- resource_pool_event, alarm_event
+ resource_pool_event, alarm_event, dms_event, resource_type_event
# if TYPE_CHECKING:
# from . import unit_of_work
# o2dms_events.NfDeploymentUninstalled: [
# nfdeployment_handler.publish_nfdeployment_uninstalled]
events.OcloudChanged: [ocloud_event.notify_ocloud_update],
+ events.ResourceTypeChanged: [resource_type_event.\
+ notify_resourcetype_change],
+ events.DmsChanged: [dms_event.notify_dms_change],
events.ResourceChanged: [resource_event.notify_resource_change],
events.ResourcePoolChanged: [resource_pool_event.\
notify_resourcepool_change],
@dataclass
class PubMessage2SMO(Command):
data: Message2SMO
+ type: str
@dataclass
@dataclass
class ResourceTypeChanged(Event):
id: str
+ notificationEventType: NotificationEventEnum
+ updatetime: datetime.now()
+
+
+@dataclass
+class DmsChanged(Event):
+ id: str
+ notificationEventType: NotificationEventEnum
updatetime: datetime.now()
if api_response:
self.id = str(api_response.uuid)
self.type = type
+ self.name = api_response.name
self.updatetime = datetime.datetime.strptime(
api_response.updated_at.split('.')[0], "%Y-%m-%dT%H:%M:%S") \
if api_response.updated_at else None
self.createtime = datetime.datetime.strptime(
api_response.created_at.split('.')[0], "%Y-%m-%dT%H:%M:%S") \
if api_response.created_at else None
- self.name = api_response.name
- self.hash = content_hash if content_hash \
- else str(hash((self.id, self.updatetime)))
+ self.hash = content_hash
+ if not self.hash:
+ if hasattr(api_response, 'filtered'):
+ self.filtered = api_response.filtered
+ self.hash = str(hash((self.id, str(self.filtered))))
+ else:
+ self.hash = str(hash((self.id, self.updatetime)))
self.content = json.dumps(api_response.to_dict())
if ResourceTypeEnum.RESOURCE_POOL == type:
self.res_pool_id = self.id
# pylint: disable=unused-argument
from __future__ import annotations
-
import base64
import json
+from o2ims.domain import commands, events
from o2ims.domain.stx_object import StxGenericModel
-# from dataclasses import asdict
-# from typing import List, Dict, Callable, Type
-# TYPE_CHECKING
-from o2ims.domain import commands
+from o2ims.domain.subscription_obj import NotificationEventEnum
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2ims.domain.resource_type import MismatchedModel
from o2ims.domain.ocloud import DeploymentManager
localmodel.updatetime = stxobj.updatetime
localmodel.hash = stxobj.hash
+ localmodel.events.append(events.DmsChanged(
+ id=stxobj.id,
+ notificationEventType=NotificationEventEnum.CREATE,
+ updatetime=stxobj.updatetime
+ ))
+
return localmodel
target.oCloudId = parentid
target.version_number = target.version_number + 1
target.profile = _convert_content(stxobj.content)
- target.events = []
+
+ target.events.append(events.DmsChanged(
+ id=stxobj.id,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))
def _convert_content(stxobj_content: str):
alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
if alarm_dictionary:
res_type.alarmDictionary = alarm_dictionary
+ res_type.events.append(events.ResourceTypeChanged(
+ id=res_type.resourceTypeId,
+ notificationEventType=NotificationEventEnum.CREATE,
+ updatetime=stxobj.updatetime))
uow.resource_types.add(res_type)
else:
resourcetype_id = first['resourceTypeId']
"operational", "availability", "administrative",
"boot_device", "rootfs_device", "install_state", "subfunctions",
"clock_synchronization", "max_cpu_mhz_allowed"
- ]
+ ]
filtered = dict(
filter(lambda item: item[0] in selected_keys, content.items()))
extensions = json.dumps(filtered)
resource.updatetime = stxobj.updatetime
resource.hash = stxobj.hash
+ resource.events.append(events.ResourceChanged(
+ id=stxobj.id,
+ resourcePoolId=resource.resourcePoolId,
+ notificationEventType=NotificationEventEnum.CREATE,
+ updatetime=stxobj.updatetime
+ ))
+
return resource
resourcepool.createtime = stxobj.createtime
resourcepool.updatetime = stxobj.updatetime
resourcepool.hash = stxobj.hash
-
+ resourcepool.events.append(events.ResourcePoolChanged(
+ id=stxobj.id,
+ notificationEventType=NotificationEventEnum.CREATE,
+ updatetime=stxobj.updatetime
+ ))
return resourcepool
from o2common.service.command.handler import post_data
from o2ims.domain import commands, ocloud
-from o2ims.domain.subscription_obj import Subscription, Message2SMO
+from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
+ NotificationEventEnum
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
uow: AbstractUnitOfWork,
):
logger.debug('In notify_change_to_smo')
+ msg_type = cmd.type
+ if msg_type == 'ResourceType':
+ _notify_resourcetype(uow, cmd)
+ elif msg_type == 'ResourcePool':
+ _notify_resourcepool(uow, cmd)
+ elif msg_type == 'Dms':
+ _notify_dms(uow, cmd)
+ elif msg_type == 'Resource':
+ _notify_resource(uow, cmd)
+
+
+def _notify_resourcetype(uow, cmd):
+ data = cmd.data
+ msg_type = cmd.type
+ with uow:
+ resource_type = uow.resource_types.get(data.id)
+ if resource_type is None:
+ logger.warning('ResourceType {} does not exists.'.format(data.id))
+ return
+ resource_type_dict = {
+ 'resourceTypeId': resource_type.resourceTypeId,
+ 'name': resource_type.name,
+ 'description': resource_type.description,
+ 'vendor': resource_type.vendor,
+ 'model': resource_type.model,
+ 'version': resource_type.version,
+ # 'alarmDictionary': resource_type.alarmDictionary.serialize()
+ }
+
+ subs = uow.subscriptions.list()
+ for sub in subs:
+ sub_data = sub.serialize()
+ logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
+ if not sub_data.get('filter', None):
+ callback_smo(sub, data, msg_type, resource_type_dict)
+ continue
+ try:
+ args = gen_orm_filter(ocloud.ResourceType, sub_data['filter'])
+ except KeyError:
+ logger.warning(
+ 'Subscription {} filter {} has wrong attribute name '
+ 'or value. Ignore the filter.'.format(
+ sub_data['subscriptionId'], sub_data['filter']))
+ callback_smo(sub, data, msg_type, resource_type_dict)
+ continue
+ args.append(ocloud.ResourceType.resourceTypeId == data.id)
+ ret = uow.resource_types.list_with_count(*args)
+ if ret[0] != 0:
+ logger.debug(
+ 'ResourceType {} skip for subscription {} because of the '
+ 'filter.'
+ .format(data.id, sub_data['subscriptionId']))
+ continue
+ callback_smo(sub, data, msg_type, resource_type_dict)
+
+
+def _notify_resourcepool(uow, cmd):
+ data = cmd.data
+ msg_type = cmd.type
+ with uow:
+ resource_pool = uow.resource_pools.get(data.id)
+ if resource_pool is None:
+ logger.warning('ResourcePool {} does not exists.'.format(data.id))
+ return
+ resource_pool_dict = {
+ 'resourcePoolId': resource_pool.resourcePoolId,
+ 'oCloudId': resource_pool.oCloudId,
+ 'globalLocationId': resource_pool.globalLocationId,
+ 'name': resource_pool.name,
+ 'description': resource_pool.description
+ }
+
+ subs = uow.subscriptions.list()
+ for sub in subs:
+ sub_data = sub.serialize()
+ logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
+ if not sub_data.get('filter', None):
+ callback_smo(sub, data, msg_type, resource_pool_dict)
+ continue
+ try:
+ args = gen_orm_filter(ocloud.Resource, sub_data['filter'])
+ except KeyError:
+ logger.warning(
+ 'Subscription {} filter {} has wrong attribute name '
+ 'or value. Ignore the filter.'.format(
+ sub_data['subscriptionId'], sub_data['filter']))
+ callback_smo(sub, data, msg_type, resource_pool_dict)
+ continue
+ args.append(ocloud.ResourcePool.resourcePoolId == data.id)
+ ret = uow.resource_pools.list_with_count(*args)
+ if ret[0] != 0:
+ logger.debug(
+ 'ResourcePool {} skip for subscription {} because of the '
+ 'filter.'
+ .format(data.id, sub_data['subscriptionId']))
+ continue
+ callback_smo(sub, data, msg_type, resource_pool_dict)
+
+
+def _notify_dms(uow, cmd):
data = cmd.data
+ msg_type = cmd.type
+ with uow:
+ dms = uow.deployment_managers.get(data.id)
+ if dms is None:
+ logger.warning(
+ 'DeploymentManager {} does not exists.'.format(data.id))
+ return
+ dms_dict = {
+ 'deploymentManagerId': dms.deploymentManagerId,
+ 'name': dms.name,
+ 'description': dms.description,
+ 'oCloudId': dms.oCloudId,
+ 'serviceUri': dms.serviceUri
+ }
+
+ subs = uow.subscriptions.list()
+ for sub in subs:
+ sub_data = sub.serialize()
+ logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
+ if not sub_data.get('filter', None):
+ callback_smo(sub, data, msg_type, dms_dict)
+ continue
+ try:
+ args = gen_orm_filter(ocloud.Resource, sub_data['filter'])
+ except KeyError:
+ logger.warning(
+ 'Subscription {} filter {} has wrong attribute name '
+ 'or value. Ignore the filter.'.format(
+ sub_data['subscriptionId'], sub_data['filter']))
+ callback_smo(sub, data, msg_type, dms_dict)
+ continue
+ args.append(
+ ocloud.DeploymentManager.deploymentManagerId == data.id)
+ ret = uow.deployment_managers.list_with_count(*args)
+ if ret[0] != 0:
+ logger.debug(
+ 'DeploymentManager {} skip for subscription {} because of '
+ 'the filter.'
+ .format(data.id, sub_data['subscriptionId']))
+ continue
+ callback_smo(sub, data)
+ callback_smo(sub, data, msg_type, dms_dict)
+
+
+def _notify_resource(uow, cmd):
+ data = cmd.data
+ msg_type = cmd.type
with uow:
resource = uow.resources.get(data.id)
if resource is None:
- logger.debug('Resource {} does not exists.'.format(data.id))
+ logger.warning('Resource {} does not exists.'.format(data.id))
return
res_pool_id = resource.serialize()['resourcePoolId']
logger.debug('res pool id is {}'.format(res_pool_id))
sub_data = sub.serialize()
logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
if not sub_data.get('filter', None):
- callback_smo(sub, data)
+ callback_smo(sub, data, msg_type)
continue
try:
args = gen_orm_filter(ocloud.Resource, sub_data['filter'])
except KeyError:
- logger.error(
+ logger.warning(
'Subscription {} filter {} has wrong attribute name '
'or value. Ignore the filter.'.format(
sub_data['subscriptionId'], sub_data['filter']))
- callback_smo(sub, data)
+ callback_smo(sub, data, msg_type)
continue
args.append(ocloud.Resource.resourceId == data.id)
ret = uow.resources.list_with_count(res_pool_id, *args)
'filter.'
.format(data.id, sub_data['subscriptionId']))
continue
- callback_smo(sub, data)
+ callback_smo(sub, data, msg_type)
-def callback_smo(sub: Subscription, msg: Message2SMO):
+def callback_smo(sub: Subscription, msg: Message2SMO, msg_type: str,
+ obj_dict: dict = None):
sub_data = sub.serialize()
callback = {
'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
'objectRef': msg.objectRef,
'updateTime': msg.updatetime
}
- # if msg.notificationEventType in [NotificationEventEnum.DELETE,
- # NotificationEventEnum.MODIFY]:
- # callback['priorObjectState'] = {}
- # if msg.notificationEventType in [NotificationEventEnum.CREATE,
- # NotificationEventEnum.MODIFY]:
- # callback['postObjectState'] = {}
- # logger.warning(callback)
+ if msg_type != 'Resource':
+ if msg.notificationEventType in [NotificationEventEnum.DELETE,
+ NotificationEventEnum.MODIFY]:
+ callback['priorObjectState'] = obj_dict
+ if msg.notificationEventType in [NotificationEventEnum.CREATE,
+ NotificationEventEnum.MODIFY]:
+ callback['postObjectState'] = obj_dict
callback_data = json.dumps(callback)
logger.info('URL: {}, data: {}'.format(
sub_data['callback'], callback_data))
event: events.AlarmEventChanged,
publish: Callable,
):
- logger.info('In notify_alarm_event_change')
+ logger.debug('In notify_alarm_event_change')
publish("AlarmEventChanged", event)
logger.debug("published Alarm Event Changed: {}".format(
event.id))
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Callable
+
+from o2ims.domain import events
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+def notify_dms_change(
+ event: events.DmsChanged,
+ publish: Callable,
+):
+ logger.debug('In notify_dms_change')
+ publish("DmsChanged", event)
+ logger.debug("published DMS Changed: {}".format(
+ event.id))
event: events.OcloudChanged,
publish: Callable,
):
- logger.info('In notify_ocloud_update')
+ logger.debug('In notify_ocloud_update')
publish("OcloudChanged", event)
logger.debug("published Ocloud Changed: {}".format(
event.id))
event: events.ResourceChanged,
publish: Callable,
):
- logger.info('In notify_resource_change')
+ logger.debug('In notify_resource_change')
publish("ResourceChanged", event)
logger.debug("published Resource Changed: {}".format(
event.id))
event: events.ResourcePoolChanged,
publish: Callable,
):
- logger.info('In notify_resourcepool_change')
+ logger.debug('In notify_resourcepool_change')
publish("ResourcePoolChanged", event)
logger.debug("published Resource Pool Changed: {}".format(
event.id))
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Callable
+
+from o2ims.domain import events
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+def notify_resourcetype_change(
+ event: events.ResourceTypeChanged,
+ publish: Callable,
+):
+ logger.debug('In notify_resourcetype_change')
+ publish("ResourceTypeChanged", event)
+ logger.debug("published Resource Type Changed: {}".format(
+ event.id))