from o2common.service.unit_of_work import AbstractUnitOfWork
from o2ims.domain.resource_type import MismatchedModel
from o2ims.domain.ocloud import DeploymentManager
-from o2common.config import config
# if TYPE_CHECKING:
# from . import unit_of_work
def create_by(stxobj: StxGenericModel, parentid: str) -> DeploymentManager:
- dmsendpoint = config.get_api_url() +\
- config.get_o2dms_api_base() + "/" + stxobj.id
description = "A DMS"
ocloudid = parentid
+ content = json.loads(stxobj.content)
+ # logger.info(stxobj)
+ # logger.info(content)
supportedLocations = ''
- capabilities = ''
+ capabilities = content['capabilities']
capacity = ''
- profile = _convert_content(stxobj.content)
+ dmsendpoint = content['cluster_api_endpoint']
+ profile = _convert_content(content)
localmodel = DeploymentManager(
stxobj.id, stxobj.name, ocloudid, dmsendpoint, description,
supportedLocations, capabilities, capacity, profile)
parentid: str) -> None:
if target.deploymentManagerId != stxobj.id:
raise MismatchedModel("Mismatched Id")
+ content = json.loads(stxobj.content)
+ # logger.info(content)
target.name = stxobj.name
target.createtime = stxobj.createtime
target.updatetime = stxobj.updatetime
target.hash = stxobj.hash
target.oCloudId = parentid
+ target.capabilities = content['capabilities']
target.version_number = target.version_number + 1
- target.profile = _convert_content(stxobj.content)
+ target.profile = _convert_content(content)
target.events.append(events.DmsChanged(
id=stxobj.id,
))
-def _convert_content(stxobj_content: str):
+def _convert_content(content: str):
# Convert api retrun content to dict object
- content = json.loads(stxobj_content)
+ # content = json.loads(stxobj_content)
admin_user = content["admin_user"]
cluster_api_endpoint = content["cluster_api_endpoint"]
cluster_ca_cert = _b64_encode_str(content["cluster_ca_cert"])