Code Review
/
pti
/
o2.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
HelmCLI: support SOL018 Helm CLI ssh access
[pti/o2.git]
/
o2ims
/
service
/
auditor
/
ocloud_handler.py
diff --git
a/o2ims/service/auditor/ocloud_handler.py
b/o2ims/service/auditor/ocloud_handler.py
index
c0d8eaf
..
4cc8ec7
100644
(file)
--- a/
o2ims/service/auditor/ocloud_handler.py
+++ b/
o2ims/service/auditor/ocloud_handler.py
@@
-14,17
+14,20
@@
# pylint: disable=unused-argument
from __future__ import annotations
# pylint: disable=unused-argument
from __future__ import annotations
+# from typing import Callable
-from o2ims.domain.stx_object import StxGenericModel
# from dataclasses import asdict
# from typing import List, Dict, Callable, Type
# TYPE_CHECKING
# from dataclasses import asdict
# from typing import List, Dict, Callable, Type
# TYPE_CHECKING
-from o2ims.domain import commands
+
+from o2common.config import config
+# from o2common.service.messagebus import MessageBus
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain.resource_type import InvalidOcloudState
-from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain import events, commands
from o2ims.domain.ocloud import Ocloud
from o2ims.domain.ocloud import Ocloud
-from o2common.config import config
+from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.resource_type import InvalidOcloudState, MismatchedModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
# if TYPE_CHECKING:
# from . import unit_of_work
# if TYPE_CHECKING:
# from . import unit_of_work
@@
-78,7
+81,7
@@
def is_outdated(ocloud: Ocloud, stxobj: StxGenericModel):
def create_by(stxobj: StxGenericModel) -> Ocloud:
def create_by(stxobj: StxGenericModel) -> Ocloud:
- imsendpoint = config.get_api_url() + config.get_o2ims_api_base()
+ imsendpoint = config.get_api_url() + config.get_o2ims_api_base()
+ '/'
globalcloudId = stxobj.id # to be updated
description = "An ocloud"
ocloud = Ocloud(stxobj.id, stxobj.name, imsendpoint,
globalcloudId = stxobj.id # to be updated
description = "An ocloud"
ocloud = Ocloud(stxobj.id, stxobj.name, imsendpoint,
@@
-99,4
+102,8
@@
def update_by(ocloud: Ocloud, stxobj: StxGenericModel) -> None:
# ocloud.content = stxobj.content
ocloud.hash = stxobj.hash
ocloud.version_number = ocloud.version_number + 1
# ocloud.content = stxobj.content
ocloud.hash = stxobj.hash
ocloud.version_number = ocloud.version_number + 1
- ocloud.events = []
+ ocloud.events.append(events.OcloudChanged(
+ id=stxobj.id,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))