# pylint: disable=unused-argument
from __future__ import annotations
+import base64
+import json
+from o2ims.domain import commands, events
from o2ims.domain.stx_object import StxGenericModel
-# from dataclasses import asdict
-# from typing import List, Dict, Callable, Type
-# TYPE_CHECKING
-from o2ims.domain import commands
+from o2ims.domain.subscription_obj import NotificationEventEnum
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2ims.domain.resource_type import MismatchedModel
from o2ims.domain.ocloud import DeploymentManager
-from o2common.config import config
# if TYPE_CHECKING:
# from . import unit_of_work
with uow:
dms = uow.deployment_managers.get(stxobj.id)
if not dms:
- logger.info("add dms:" + stxobj.name
+ logger.info("add dms: " + stxobj.name
+ " update_at: " + str(stxobj.updatetime)
+ " id: " + str(stxobj.id)
+ " hash: " + str(stxobj.hash))
else:
localmodel = dms
if is_outdated(localmodel, stxobj):
- logger.info("update a dms:" + stxobj.name
+ logger.info("update a dms: " + stxobj.name
+ " update_at: " + str(stxobj.updatetime)
+ " id: " + str(stxobj.id)
+ " hash: " + str(stxobj.hash))
def create_by(stxobj: StxGenericModel, parentid: str) -> DeploymentManager:
- dmsendpoint = config.get_api_url() +\
- config.get_o2dms_api_base() + "/" + stxobj.id
description = "A DMS"
ocloudid = parentid
+ content = json.loads(stxobj.content)
+ # logger.info(stxobj)
+ # logger.info(content)
supportedLocations = ''
- capabilities = ''
+ capabilities = content['capabilities']
capacity = ''
+ dmsendpoint = content['cluster_api_endpoint']
+ profile = _convert_content(content)
localmodel = DeploymentManager(
stxobj.id, stxobj.name, ocloudid, dmsendpoint, description,
- supportedLocations, capabilities, capacity)
+ supportedLocations, capabilities, capacity, profile)
localmodel.createtime = stxobj.createtime
localmodel.updatetime = stxobj.updatetime
localmodel.hash = stxobj.hash
+ localmodel.events.append(events.DmsChanged(
+ id=stxobj.id,
+ notificationEventType=NotificationEventEnum.CREATE,
+ updatetime=stxobj.updatetime
+ ))
+
return localmodel
parentid: str) -> None:
if target.deploymentManagerId != stxobj.id:
raise MismatchedModel("Mismatched Id")
+ content = json.loads(stxobj.content)
+ # logger.info(content)
target.name = stxobj.name
target.createtime = stxobj.createtime
target.updatetime = stxobj.updatetime
- # ocloud.content = stxobj.content
target.hash = stxobj.hash
target.oCloudId = parentid
+ target.capabilities = content['capabilities']
target.version_number = target.version_number + 1
- target.events = []
+ target.profile = _convert_content(content)
+
+ target.events.append(events.DmsChanged(
+ id=stxobj.id,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))
+
+
+def _convert_content(content: str):
+ # Convert api retrun content to dict object
+ # content = json.loads(stxobj_content)
+ admin_user = content["admin_user"]
+ cluster_api_endpoint = content["cluster_api_endpoint"]
+ cluster_ca_cert = _b64_encode_str(content["cluster_ca_cert"])
+ admin_client_cert = _b64_encode_str(content["admin_client_cert"])
+ admin_client_key = _b64_encode_str(content["admin_client_key"])
+ # admin_client_cert = base64.b64encode(
+ # bytes(content["admin_client_cert"], "utf-8"))
+ # admin_client_key = base64.b64encode(
+ # bytes(content["admin_client_key"], "utf-8"))
+ profile = {
+ "admin_user": admin_user,
+ "cluster_api_endpoint": cluster_api_endpoint,
+ "cluster_ca_cert": cluster_ca_cert,
+ "admin_client_cert": admin_client_cert,
+ "admin_client_key": admin_client_key
+ }
+
+ return json.dumps(profile)
+
+
+def _b64_encode_str(msg: str, encode: str = 'utf-8') -> str:
+ msg_bytes = msg.encode('utf-8')
+ base64_bytes = base64.b64encode(msg_bytes)
+ base64_msg = base64_bytes.decode('utf-8')
+ return base64_msg