# pylint: disable=unused-argument
from __future__ import annotations
+# from typing import Callable
-from o2ims.domain.stx_object import StxGenericModel
# from dataclasses import asdict
# from typing import List, Dict, Callable, Type
# TYPE_CHECKING
-from o2ims.domain import commands
+
+from o2common.config import config, conf
+# from o2common.service.messagebus import MessageBus
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain.resource_type import InvalidOcloudState
-from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain import events, commands
from o2ims.domain.ocloud import Ocloud
-from o2common.config import config
+from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.resource_type import InvalidOcloudState, MismatchedModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
# if TYPE_CHECKING:
# from . import unit_of_work
stxobj = cmd.data
with uow:
oclouds = uow.oclouds.list()
- if oclouds and len(oclouds) > 1:
+ if oclouds and oclouds.count() > 1:
raise InvalidOcloudState("More than 1 ocloud is found")
- elif not oclouds or len(oclouds) == 0:
+ elif not oclouds or oclouds.count() == 0:
logger.info("add ocloud:" + stxobj.name
+ " update_at: " + str(stxobj.updatetime)
+ " id: " + str(stxobj.id)
logger.info("Add the ocloud: " + stxobj.id
+ ", name: " + stxobj.name)
else:
- localmodel = oclouds.pop()
+ localmodel = oclouds.first()
if is_outdated(localmodel, stxobj):
logger.info("update ocloud:" + stxobj.name
+ " update_at: " + str(stxobj.updatetime)
def create_by(stxobj: StxGenericModel) -> Ocloud:
- imsendpoint = config.get_api_url() + config.get_o2ims_api_base()
- globalcloudId = stxobj.id # to be updated
+ imsendpoint = config.get_api_url() + config.get_o2ims_api_base() + '/'
+ globalcloudId = conf.DEFAULT.ocloud_global_id
description = "An ocloud"
ocloud = Ocloud(stxobj.id, stxobj.name, imsendpoint,
globalcloudId, description, 1)
ocloud.createtime = stxobj.createtime
ocloud.updatetime = stxobj.updatetime
ocloud.hash = stxobj.hash
+ ocloud.events.append(events.OcloudChanged(
+ id=stxobj.id,
+ notificationEventType=NotificationEventEnum.CREATE,
+ updatetime=stxobj.updatetime
+ ))
return ocloud
# ocloud.content = stxobj.content
ocloud.hash = stxobj.hash
ocloud.version_number = ocloud.version_number + 1
- ocloud.events = []
+ ocloud.events.append(events.OcloudChanged(
+ id=stxobj.id,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))