1 # Copyright (C) 2021 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 # pylint: disable=unused-argument
16 from __future__ import annotations
20 from o2ims.domain import commands, events
21 from o2ims.domain.stx_object import StxGenericModel
22 from o2ims.domain.subscription_obj import NotificationEventEnum
23 from o2common.service.unit_of_work import AbstractUnitOfWork
24 from o2ims.domain.resource_type import MismatchedModel
25 from o2ims.domain.ocloud import DeploymentManager
27 # from . import unit_of_work
29 from o2common.helper import o2logging
30 logger = o2logging.get_logger(__name__)
# Module-specific exception type; it derives directly from Exception.
# NOTE(review): intended use is inferred from the name only -- confirm.
33 class InvalidResourceType(Exception):
# Handler arguments: `cmd` is the UpdateDms command (its `parentid` is passed
# on to create_by / update_by below); `uow` is the unit of work whose
# `deployment_managers` repository is read and written here.
38 cmd: commands.UpdateDms,
39 uow: AbstractUnitOfWork
# Fetch whatever is currently stored under the incoming object's id.
43 dms = uow.deployment_managers.get(stxobj.id)
# Add path: log the incoming object, build a DeploymentManager from it with
# create_by and register the new model with the repository.
45 logger.info("add dms: " + stxobj.name
46 + " update_at: " + str(stxobj.updatetime)
47 + " id: " + str(stxobj.id)
48 + " hash: " + str(stxobj.hash))
49 # ocloud = uow.oclouds.get(cmd.parent.oCloudId)
50 localmodel = create_by(stxobj, cmd.parentid)
51 uow.deployment_managers.add(localmodel)
# NOTE(review): stxobj.id is concatenated without str() here, unlike the log
# call above -- this only works if id is already a str; confirm.
53 logger.info("Add a dms: " + stxobj.id
54 + ", name: " + stxobj.name)
# Update path: the stored model is refreshed with update_by and written back
# only when is_outdated reports that its hash differs from the incoming one.
57 if is_outdated(localmodel, stxobj):
58 logger.info("update a dms: " + stxobj.name
59 + " update_at: " + str(stxobj.updatetime)
60 + " id: " + str(stxobj.id)
61 + " hash: " + str(stxobj.hash))
62 update_by(localmodel, stxobj, cmd.parentid)
63 uow.deployment_managers.update(localmodel)
# NOTE(review): same un-str()'d id concatenation as in the add path.
65 logger.info("Update a dms: " + stxobj.id
66 + ", name: " + stxobj.name)
def is_outdated(ocloud: DeploymentManager,
                stxobj: StxGenericModel) -> bool:
    """Tell whether ``ocloud`` is out of date with respect to ``stxobj``.

    Only the ``hash`` attributes of the two objects are compared; the update
    timestamps are deliberately not consulted.

    Args:
        ocloud: Deployment manager model whose ``hash`` is checked.
        stxobj: Generic model providing the reference ``hash``.

    Returns:
        ``True`` when the two hashes differ, ``False`` when they are equal.
    """
    # The comparison already yields the boolean; no conditional expression
    # is needed around it.
    return ocloud.hash != stxobj.hash
# Build a new DeploymentManager from an StxGenericModel.
# The JSON text in stxobj.content supplies 'capabilities' and
# 'cluster_api_endpoint'; the profile argument comes from _convert_content.
# createtime, updatetime and hash are copied from stxobj onto the new model,
# and a DmsChanged event tagged NotificationEventEnum.CREATE is appended to
# the model's events list.
77 def create_by(stxobj: StxGenericModel, parentid: str) -> DeploymentManager:
# stxobj.content is JSON text; parse it once and read the fields from the
# resulting object.
80 content = json.loads(stxobj.content)
82 # logger.info(content)
# Supported locations are left as an empty string.
83 supportedLocations = ''
84 capabilities = content['capabilities']
86 dmsendpoint = content['cluster_api_endpoint']
# Profile is the JSON string produced by _convert_content.
87 profile = _convert_content(content)
88 localmodel = DeploymentManager(
89 stxobj.id, stxobj.name, ocloudid, dmsendpoint, description,
90 supportedLocations, capabilities, capacity, profile)
# Carry the source object's timestamps and hash over to the new model; the
# hash is what is_outdated compares later.
91 localmodel.createtime = stxobj.createtime
92 localmodel.updatetime = stxobj.updatetime
93 localmodel.hash = stxobj.hash
# Record a CREATE notification event on the model.
95 localmodel.events.append(events.DmsChanged(
97 notificationEventType=NotificationEventEnum.CREATE,
98 updatetime=stxobj.updatetime
# Refresh an existing DeploymentManager in place from an StxGenericModel.
# Raises MismatchedModel when target.deploymentManagerId differs from
# stxobj.id. Otherwise copies name, createtime, updatetime and hash from
# stxobj, sets oCloudId to parentid, takes 'capabilities' from the parsed JSON
# content, increments version_number by one, rebuilds the profile with
# _convert_content and appends a DmsChanged event tagged
# NotificationEventEnum.MODIFY. Annotated as returning None.
104 def update_by(target: DeploymentManager, stxobj: StxGenericModel,
105 parentid: str) -> None:
# Guard: refuse to apply data that belongs to a different deployment manager.
106 if target.deploymentManagerId != stxobj.id:
107 raise MismatchedModel("Mismatched Id")
# stxobj.content is JSON text; parse it before reading fields.
108 content = json.loads(stxobj.content)
109 # logger.info(content)
110 target.name = stxobj.name
111 target.createtime = stxobj.createtime
112 target.updatetime = stxobj.updatetime
# The stored hash is what is_outdated compares on the next pass.
113 target.hash = stxobj.hash
114 target.oCloudId = parentid
115 target.capabilities = content['capabilities']
# Each applied update bumps the model's version counter by one.
116 target.version_number = target.version_number + 1
117 target.profile = _convert_content(content)
# Record a MODIFY notification event on the model.
119 target.events.append(events.DmsChanged(
121 notificationEventType=NotificationEventEnum.MODIFY,
122 updatetime=stxobj.updatetime
def _convert_content(content: dict) -> str:
    """Build the JSON profile string from already-parsed API content.

    Args:
        content: Mapping obtained by parsing the API return content. It must
            provide the keys ``admin_user``, ``cluster_api_endpoint``,
            ``cluster_ca_cert``, ``admin_client_cert`` and
            ``admin_client_key``.

    Returns:
        A JSON string holding those five entries. The user and endpoint are
        copied as-is; the CA certificate, client certificate and client key
        are passed through ``_b64_encode_str`` first.

    Raises:
        KeyError: If one of the required keys is absent from ``content``.
    """
    profile = {
        "admin_user": content["admin_user"],
        "cluster_api_endpoint": content["cluster_api_endpoint"],
        # Certificate and key material is stored base64-encoded.
        "cluster_ca_cert": _b64_encode_str(content["cluster_ca_cert"]),
        "admin_client_cert": _b64_encode_str(content["admin_client_cert"]),
        "admin_client_key": _b64_encode_str(content["admin_client_key"]),
    }
    return json.dumps(profile)
def _b64_encode_str(msg: str, encode: str = 'utf-8') -> str:
    """Return the base64 representation of ``msg`` as text.

    Args:
        msg: Text to encode.
        encode: Name of the character encoding used to turn ``msg`` into
            bytes before base64-encoding it. Defaults to UTF-8.

    Returns:
        The base64-encoded data as a ``str``.
    """
    # Honour the caller-supplied encoding instead of a hard-coded UTF-8, so
    # the ``encode`` parameter actually has an effect.
    msg_bytes = msg.encode(encode)
    base64_bytes = base64.b64encode(msg_bytes)
    # The base64 alphabet is pure ASCII, so this decode cannot fail.
    return base64_bytes.decode('ascii')