X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=o2ims%2Fservice%2Fauditor%2Fdms_handler.py;h=070c929f3bdcad23add1018132ae5627e49d6af2;hb=209906139ba3d55342e5e95dd7cdb804c7b61943;hp=b7d8e83ce09080b92bcb59d66bc0ab279e419d6c;hpb=387ee50cfe8e97ba211464c311bb7b6eb9ee2961;p=pti%2Fo2.git diff --git a/o2ims/service/auditor/dms_handler.py b/o2ims/service/auditor/dms_handler.py index b7d8e83..070c929 100644 --- a/o2ims/service/auditor/dms_handler.py +++ b/o2ims/service/auditor/dms_handler.py @@ -14,16 +14,16 @@ # pylint: disable=unused-argument from __future__ import annotations +import base64 +import json +from o2ims.domain import commands, events from o2ims.domain.stx_object import StxGenericModel -# from dataclasses import asdict -# from typing import List, Dict, Callable, Type -# TYPE_CHECKING -from o2ims.domain import commands -from o2ims.service.unit_of_work import AbstractUnitOfWork +from o2ims.domain.subscription_obj import NotificationEventEnum +from o2common.service.unit_of_work import AbstractUnitOfWork from o2ims.domain.resource_type import MismatchedModel from o2ims.domain.ocloud import DeploymentManager -from o2ims import config +from o2common.config import config # if TYPE_CHECKING: # from . import unit_of_work @@ -43,7 +43,7 @@ def update_dms( with uow: dms = uow.deployment_managers.get(stxobj.id) if not dms: - logger.info("add dms:" + stxobj.name + logger.info("add dms: " + stxobj.name + " update_at: " + str(stxobj.updatetime) + " id: " + str(stxobj.id) + " hash: " + str(stxobj.hash)) @@ -56,7 +56,7 @@ def update_dms( else: localmodel = dms if is_outdated(localmodel, stxobj): - logger.info("update a dms:" + stxobj.name + logger.info("update a dms: " + stxobj.name + " update_at: " + str(stxobj.updatetime) + " id: " + str(stxobj.id) + " hash: " + str(stxobj.hash)) @@ -76,19 +76,27 @@ def is_outdated(ocloud: DeploymentManager, stxobj: StxGenericModel): def create_by(stxobj: StxGenericModel, parentid: str) -> DeploymentManager: - dmsendpoint = config.get_api_url() + config.get_o2dms_api_base(stxobj.id) + dmsendpoint = config.get_api_url() +\ + config.get_o2dms_api_base() + "/" + stxobj.id description = "A DMS" ocloudid = parentid supportedLocations = '' capabilities = '' capacity = '' + profile = _convert_content(stxobj.content) localmodel = DeploymentManager( stxobj.id, stxobj.name, ocloudid, dmsendpoint, description, - supportedLocations, capabilities, capacity) + supportedLocations, capabilities, capacity, profile) localmodel.createtime = stxobj.createtime localmodel.updatetime = stxobj.updatetime localmodel.hash = stxobj.hash + localmodel.events.append(events.DmsChanged( + id=stxobj.id, + notificationEventType=NotificationEventEnum.CREATE, + updatetime=stxobj.updatetime + )) + return localmodel @@ -99,7 +107,43 @@ def update_by(target: DeploymentManager, stxobj: StxGenericModel, target.name = stxobj.name target.createtime = stxobj.createtime target.updatetime = stxobj.updatetime - # ocloud.content = stxobj.content target.hash = stxobj.hash target.oCloudId = parentid target.version_number = target.version_number + 1 + target.profile = _convert_content(stxobj.content) + + target.events.append(events.DmsChanged( + id=stxobj.id, + notificationEventType=NotificationEventEnum.MODIFY, + updatetime=stxobj.updatetime + )) + + +def _convert_content(stxobj_content: str): + # Convert api retrun content to dict object + content = json.loads(stxobj_content) + admin_user = content["admin_user"] + cluster_api_endpoint = content["cluster_api_endpoint"] + cluster_ca_cert = _b64_encode_str(content["cluster_ca_cert"]) + admin_client_cert = _b64_encode_str(content["admin_client_cert"]) + admin_client_key = _b64_encode_str(content["admin_client_key"]) + # admin_client_cert = base64.b64encode( + # bytes(content["admin_client_cert"], "utf-8")) + # admin_client_key = base64.b64encode( + # bytes(content["admin_client_key"], "utf-8")) + profile = { + "admin_user": admin_user, + "cluster_api_endpoint": cluster_api_endpoint, + "cluster_ca_cert": cluster_ca_cert, + "admin_client_cert": admin_client_cert, + "admin_client_key": admin_client_key + } + + return json.dumps(profile) + + +def _b64_encode_str(msg: str, encode: str = 'utf-8') -> str: + msg_bytes = msg.encode('utf-8') + base64_bytes = base64.b64encode(msg_bytes) + base64_msg = base64_bytes.decode('utf-8') + return base64_msg